# **Data Processing of Code data** 

<a href="https://colab.research.google.com/github/IBM/data-prep-kit/blob/dev/examples/notebooks/code/sample-notebook.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

## This notebook shows a pipeline for processing code data

This sample notebook shows how to process the Hugging Face dataset `codeparrot/github-code` with Data Prep Kit.

The following transformations are applied in order.

1. HF2Parquet
2. Exact Dedup
3. Doc Id
4. Fuzzy Dedup 
5. Prog Lang Select
6. Filtering
7. Repo Level Grouping
8. Tokenization

This notebook requires at least 8 CPUs.
To run it on Google Colab, change the runtime type to a TPU runtime; this gives the notebook a larger machine with more CPUs.

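A quick way to check the CPU requirement before starting is to count the logical CPUs Python can see. This is a minimal sketch: `os.cpu_count()` reports the machine's logical CPUs, which may be more than the process is actually allowed to use (for example inside a container), and `has_enough_cpus` and `MIN_CPUS` are illustrative names, not part of data-prep-kit.

```python
import os

# Minimum CPU count stated in this notebook's requirements (illustrative constant).
MIN_CPUS = 8

def has_enough_cpus(minimum: int = MIN_CPUS) -> bool:
    """Return True if the machine reports at least `minimum` logical CPUs."""
    available = os.cpu_count() or 0  # os.cpu_count() may return None
    return available >= minimum

if not has_enough_cpus():
    print(f"Warning: only {os.cpu_count()} CPUs detected; {MIN_CPUS} are recommended.")
```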
## Setup

Install the data-prep-toolkit Ray transforms and the Hugging Face `datasets` library.


In [1]:

```python
!pip install data-prep-toolkit-transforms-ray==0.2.1.dev1
!pip install datasets
```

```
Collecting data-prep-toolkit-transforms-ray==0.2.1.dev1
  Downloading data_prep_toolkit_transforms_ray-0.2.1.dev1-py3-none-any.whl.metadata (1.7 kB)
Collecting data-prep-toolkit-transforms==0.2.1.dev1 (from data-prep-toolkit-transforms-ray==0.2.1.dev1)
  Using cached data_prep_toolkit_transforms-0.2.1.dev1-py3-none-any.whl.metadata (1.9 kB)
Collecting data-prep-toolkit-ray==0.2.1.dev0 (from data-prep-toolkit-transforms-ray==0.2.1.dev1)
  Using cached data_prep_toolkit_ray-0.2.1.dev0-py3-none-any.whl.metadata (2.0 kB)
Collecting parameterized (from data-prep-toolkit-transforms-ray==0.2.1.dev1)
  Using cached parameterized-0.9.0-py2.py3-none-any.whl.metadata (18 kB)
Collecting tqdm==4.66.3 (from data-prep-toolkit-transforms-ray==0.2.1.dev1)
  Using cached tqdm-4.66.3-py3-none-any.whl.metadata (57 kB)
Collecting mmh3==4.1.0 (from data-prep-toolkit-transforms-ray==0.2.1.dev1)
  Using cached mmh3-4.1.0-cp311-cp311-macosx_11_0_arm64.whl.metadata (13 kB)
Collecting scipy==1.12.0 (from data-pr
```


### Common Ray params for all transforms

NOTE: These parameters can be left unchanged for normal use. If you want fine-grained control over parallelization, you can tweak them.

These are the common configuration parameters used by all transforms.

Workloads can be parallelized across CPUs using the following parameters:

`runtime_num_workers`: number of parallel workers to use.

`num_cpus`: number of CPUs to allocate to each worker (passed through `runtime_worker_options`).

The option `run_locally: True` starts a local Ray cluster for running these transforms, which lets files be processed in parallel.

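As a rough guide, the CPUs reserved by a transform's workers are the number of workers times `num_cpus` per worker, so the settings below reserve 2 × 0.8 = 1.6 CPUs. The sketch computes that figure and also shows how later cells combine the common parameters with transform-specific ones using Python's dict union operator `|` (Python 3.9+): on a shared key, the right-hand dict wins, and when the two dicts share no keys, as in this notebook, the order does not matter. `reserved_cpus` is a hypothetical helper, not part of data-prep-kit, and transforms such as fuzzy dedup start extra actors with their own CPU settings that this arithmetic does not count.

```python
def reserved_cpus(num_workers: int, worker_options: dict) -> float:
    """Hypothetical helper: CPUs reserved by the transform workers alone."""
    return num_workers * worker_options.get("num_cpus", 1)

worker_options = {"num_cpus": 0.8}
common = {"run_locally": True, "runtime_num_workers": 2}
specific = {"ededup_num_hashes": 2, "runtime_num_workers": 4}

# Dict union: keys from `specific` override the same keys in `common`.
merged = common | specific

print(reserved_cpus(common["runtime_num_workers"], worker_options))  # 1.6
print(merged["runtime_num_workers"])  # 4
```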

In [2]:
```python
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing.utils import ParamsUtils

worker_options = {"num_cpus": 0.8}
#code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
common_config_params = {
    # where to run
    "run_locally": True,
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 2,
}
```




All processing happens in the `sample_data` folder. Let's create it, along with the output folder for the first step.

In [3]:

```python
# create if not created
!mkdir -p sample_data
!mkdir -p sample_data/hf_2_parquet
```

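## Let's start exploring the data processing pipeline

## 1. **Hugging Face datasets to Parquet**

This is the first component of the pipeline. It ingests the `codeparrot/github-code` dataset from Hugging Face and converts it into
Parquet files for consumption by the next steps of this data processing pipeline. Along the way, it renames the `code` column to `contents` and `path` to `title`, and adds an `ext` (file extension) column and a randomly generated `document_id`.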

For this demo we process only a few records. Increase the following values if you want to use more data:

_total_files_ = 10 <br/>
_rows_per_file_ = 10

The output of this stage of the pipeline is written to `sample_data/hf_2_parquet`.

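Each output file should hold a distinct slice of the streamed dataset: file `num` skips the first `num * rows_per_file` rows and takes the next `rows_per_file`. The sketch below models this with `itertools.islice` over a plain list standing in for the streaming dataset; that stand-in is an assumption for illustration, since the notebook itself calls `ds.skip(...).take(...)` on the Hugging Face streaming dataset. `file_slices` is a hypothetical name.

```python
from itertools import islice

def file_slices(rows, total_files, rows_per_file):
    """Hypothetical helper: split a row stream into consecutive, non-overlapping chunks.

    Mirrors ds.skip(num * rows_per_file).take(rows_per_file) for num in range(total_files).
    """
    result = []
    for num in range(total_files):
        start = num * rows_per_file  # rows to skip for file `num`
        # A fresh iterator per file, since each skip/take restarts from the beginning.
        chunk = list(islice(iter(rows), start, start + rows_per_file))
        result.append(chunk)
    return result

rows = list(range(100))  # stand-in for 100 dataset rows
chunks = file_slices(rows, total_files=10, rows_per_file=10)
print(chunks[0])  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(chunks[9])  # [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
```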
In [4]:


```python
import os
import pyarrow as pa
import pyarrow.parquet as pq

from datasets import load_dataset

import uuid
from data_processing.utils import TransformUtils
from collections import defaultdict

DATASET_NAME='codeparrot/github-code'

ds = load_dataset(DATASET_NAME, 
                  streaming=True, 
                  split="train",
                  trust_remote_code=True)

def row_mapper(row):
    return {
            'ext': TransformUtils.get_file_extension(row['path'])[1],
            'document_id': str(uuid.uuid4())
            }

parquet_data_output = "sample_data/hf_2_parquet"

def hf_dataset_to_parquet(ds, skip, nrows, file_name, mapper=None, renamed_columns=()):
    # Take rows [skip, skip + nrows) from the streaming dataset
    dst_ = ds.skip(skip).take(nrows)
    data_dict = defaultdict(list)

    # Add the extra columns returned by the mapper (e.g. 'ext', 'document_id')
    dst = dst_.map(mapper)

    # Pivot the row dicts into one list per column for Arrow
    for data in dst:
        for k, v in data.items():
            data_dict[k].append(v)

    for old, new in renamed_columns:
        data_dict[new] = data_dict.pop(old)

    table = pa.Table.from_pydict(data_dict)
    pq.write_table(table, file_name)


## Create some parquet files from HF data

total_files = 10
rows_per_file = 10
for num in range(total_files):
    file_name = os.path.join(
        f"{parquet_data_output}",
        f"data_{num}.parquet"
    )
    print (f"Writing {file_name}")
    hf_dataset_to_parquet(ds, 
                          num * rows_per_file,  # each file gets the next slice of rows
                          rows_per_file,
                          file_name=file_name,
                          mapper=row_mapper,
                          renamed_columns=[("code", "contents"),
                                           ("path", "title")])
```

```
Writing sample_data/hf_2_parquet/data_0.parquet
Writing sample_data/hf_2_parquet/data_1.parquet
Writing sample_data/hf_2_parquet/data_2.parquet
Writing sample_data/hf_2_parquet/data_3.parquet
Writing sample_data/hf_2_parquet/data_4.parquet
Writing sample_data/hf_2_parquet/data_5.parquet
Writing sample_data/hf_2_parquet/data_6.parquet
Writing sample_data/hf_2_parquet/data_7.parquet
Writing sample_data/hf_2_parquet/data_8.parquet
Writing sample_data/hf_2_parquet/data_9.parquet
```


## 2. **Exact dedup** 

This step finds documents whose content exactly duplicates another document's content and removes them.

The transform-specific parameters for ededup are:

 _ededup_hash_cpu_ - Number of CPUs per hash-storing worker <br/>
 _ededup_num_hashes_ - Number of workers used to store hashes <br/>
 _ededup_doc_column_ - Name of the column to check for duplicates <br/>

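Conceptually, exact deduplication hashes each document's `contents` and keeps only the first document seen for each hash; with several hash workers, each hash is stored by one worker chosen from the hash value. The sketch below is a single-process simplification under those assumptions: SHA-256 and modulo sharding are illustrative choices, not necessarily what the ededup transform uses, and `exact_dedup` is a hypothetical name.

```python
import hashlib

def exact_dedup(docs, doc_column="contents", num_hashes=2):
    """Keep the first document for each distinct content hash (single-process sketch)."""
    # One set per simulated hash-storing worker.
    shards = [set() for _ in range(num_hashes)]
    kept = []
    for doc in docs:
        digest = hashlib.sha256(doc[doc_column].encode("utf-8")).hexdigest()
        shard = shards[int(digest, 16) % num_hashes]  # worker responsible for this hash
        if digest in shard:
            continue  # exact duplicate of an earlier document
        shard.add(digest)
        kept.append(doc)
    return kept

docs = [
    {"title": "a.py", "contents": "print('hi')"},
    {"title": "b.py", "contents": "print('hi')"},
    {"title": "c.py", "contents": "print('bye')"},
]
print([d["title"] for d in exact_dedup(docs)])  # ['a.py', 'c.py']
```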

In [5]:
```python
import os
import sys
from ededup_transform_ray import EdedupRayTransformConfiguration

input_folder = parquet_data_output # Output of previous stage is used as input.
output_folder = "sample_data/ededup_out"

# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

ededup_params = {
    # ededup parameters
    "ededup_hash_cpu": 0.5,
    "ededup_num_hashes": 2,
    "ededup_doc_column": "contents",
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

params = common_config_params | ededup_params
sys.argv = ParamsUtils.dict_to_req(d=params)
ededup_launcher = RayTransformLauncher(EdedupRayTransformConfiguration())
ededup_launcher.launch()
```

```
12:24:03 INFO - Running locally
12:24:03 INFO - exact dedup params are {'doc_column': 'contents', 'hash_cpu': 0.5, 'num_hashes': 2}
12:24:03 INFO - data factory data_ is using local data access: input_folder - sample_data/hf_2_parquet output_folder - sample_data/ededup_out
12:24:03 INFO - data factory data_ max_files -1, n_sample -1
12:24:03 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
12:24:03 INFO - pipeline id pipeline_id
12:24:03 INFO - code location None
12:24:03 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
12:24:03 INFO - actor creation delay 0
12:24:03 INFO - job details {'job category': 'preprocessing', 'job name': 'ededup', 'job type': 'ray', 'job id': 'job_id'}
2024-08-14 12:24:08,731 INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(orchestrate pid=36795)
```

0

## 3. **DOC ID Generation**

Fuzzy deduplication requires the document IDs generated by this step.

The transform-specific parameters are:

_doc_id_doc_column_ - name of the column containing the document (required for ID generation) <br/>
_doc_id_hash_column_ - name of the column created to hold the string document ID; if None, this ID is not generated <br/>
_doc_id_int_column_ - name of the column created to hold the integer document ID; if None, this ID is not generated <br/>

At least one of `doc_id_hash_column` or `doc_id_int_column` must be specified.

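A minimal sketch of the two kinds of IDs, assuming the string ID is a content hash (SHA-256 here, an illustrative choice) and the integer ID is a running counter. The real transform assigns integer IDs across parallel workers, so its exact numbering and hash function may differ; `add_doc_ids` is a hypothetical name.

```python
import hashlib

def add_doc_ids(rows, doc_column="contents", hash_column="hash_column",
                int_column="int_id_column", start=0):
    """Return new rows with a string hash ID and/or an integer ID added."""
    if hash_column is None and int_column is None:
        raise ValueError("at least one of hash_column or int_column must be set")
    out = []
    for offset, row in enumerate(rows):
        row = dict(row)  # copy so the caller's rows are not mutated
        if hash_column is not None:
            row[hash_column] = hashlib.sha256(row[doc_column].encode("utf-8")).hexdigest()
        if int_column is not None:
            row[int_column] = start + offset
        out.append(row)
    return out

rows = add_doc_ids([{"contents": "x = 1"}, {"contents": "y = 2"}])
print([r["int_id_column"] for r in rows])  # [0, 1]
```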
In [6]:
```python
input_folder = "sample_data/ededup_out"
output_folder = "sample_data/docid_out"


from doc_id_transform_ray import DocIDRayTransformConfiguration
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

doc_id_params = {
    # doc id configuration
    "doc_id_doc_column": "contents",
    "doc_id_hash_column": "hash_column",
    "doc_id_int_column": "int_id_column",
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

params = doc_id_params | common_config_params
sys.argv = ParamsUtils.dict_to_req(d=params)
launcher = RayTransformLauncher(DocIDRayTransformConfiguration())
launcher.launch()
```

```
12:24:23 INFO - Running locally
12:24:23 INFO - Doc id parameters are : {'doc_column': 'contents', 'hash_column': 'hash_column', 'int_column': 'int_id_column'}
12:24:23 INFO - data factory data_ is using local data access: input_folder - sample_data/ededup_out output_folder - sample_data/docid_out
12:24:23 INFO - data factory data_ max_files -1, n_sample -1
12:24:23 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
12:24:23 INFO - pipeline id pipeline_id
12:24:23 INFO - code location None
12:24:23 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
12:24:23 INFO - actor creation delay 0
12:24:23 INFO - job details {'job category': 'preprocessing', 'job name': 'doc_id', 'job type': 'ray', 'job id': 'job_id'}
2024-08-14 12:24:25,295 INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
```

0

## 4. **Fuzzy Dedup**

The fdedup transform removes documents that are near-duplicates of each other within a set of Parquet files.

Some important transform-specific parameters are:

**Columns used**

_fdedup_doc_column_ - name of the column to use for deduplication <br/>
_fdedup_id_column_ - name of the column holding the integer document ID (generated by the Doc ID step) <br/>
_fdedup_cluster_column_ - name of the column holding the string document ID <br/>

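Besides the columns, the cell below sets the fuzzy-matching parameters `fdedup_num_permutations` (64), `fdedup_threshold` (0.8), `fdedup_shingles_size` (5) and `fdedup_delimiters` (a space). These are the usual knobs of MinHash-based near-duplicate detection: each document is split into word shingles, summarized by a signature of per-permutation minimum hashes, and two documents whose estimated Jaccard similarity reaches the threshold are treated as duplicates. The sketch below illustrates only that idea under stated assumptions: it compares two signatures directly instead of using the bucket lookups performed by the transform's actors, it uses salted BLAKE2b hashes in place of true permutations, and all function names are hypothetical.

```python
import hashlib

def shingles(text, size=5, delimiter=" "):
    """Set of word n-grams; a text with at most `size` words becomes a single shingle."""
    words = [w for w in text.split(delimiter) if w]
    if len(words) <= size:
        return {" ".join(words)}
    return {" ".join(words[i:i + size]) for i in range(len(words) - size + 1)}

def minhash(shingle_set, num_permutations=64):
    """Signature: for each salted hash function, the minimum hash over all shingles."""
    signature = []
    for seed in range(num_permutations):
        salt = seed.to_bytes(16, "little")  # blake2b accepts a salt of up to 16 bytes
        signature.append(min(
            int.from_bytes(
                hashlib.blake2b(s.encode("utf-8"), digest_size=8, salt=salt).digest(), "big")
            for s in shingle_set
        ))
    return signature

def estimated_jaccard(sig_a, sig_b):
    """Fraction of signature positions that agree."""
    return sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)

def is_near_duplicate(text_a, text_b, threshold=0.8, size=5, num_permutations=64):
    sig_a = minhash(shingles(text_a, size), num_permutations)
    sig_b = minhash(shingles(text_b, size), num_permutations)
    return estimated_jaccard(sig_a, sig_b) >= threshold

doc = "for i in range ( 10 ) : print ( i )"
print(is_near_duplicate(doc, doc))  # True
print(is_near_duplicate(doc, "while True : pass"))  # False
```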
In [7]:
```python
input_folder = "sample_data/docid_out"
output_folder = "sample_data/fdedup_out"


import os
import sys

from data_processing.utils import ParamsUtils
from fdedup_transform_ray import FdedupRayTransformConfiguration

# create parameters

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
fdedup_params = {
    # columns used
    "fdedup_doc_column": "contents",
    "fdedup_id_column": "int_id_column",
    "fdedup_cluster_column": "hash_column",
    # infrastructure
    "fdedup_bucket_cpu": 0.5,
    "fdedup_doc_cpu": 0.5,
    "fdedup_mhash_cpu": 0.5,
    "fdedup_num_doc_actors": 2,
    "fdedup_num_bucket_actors": 1,
    "fdedup_num_minhash_actors": 1,
    "fdedup_num_preprocessors": 2,
    # fuzzy parameters
    "fdedup_num_permutations": 64,
    "fdedup_threshold": 0.8,
    "fdedup_shingles_size": 5,
    "fdedup_delimiters": " ",
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

params = common_config_params | fdedup_params

# Pass commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch
fdedup_launcher = RayTransformLauncher(FdedupRayTransformConfiguration())
fdedup_launcher.launch()
```

```
12:24:43 INFO - Running locally
12:24:43 INFO - fuzzy dedup params are {'doc_column': 'contents', 'id_column': 'int_id_column', 'cluster_column': 'hash_column', 'bucket_cpu': 0.5, 'mhash_cpu': 0.5, 'doc_cpu': 0.5, 'num_doc_actors': 2, 'num_minhash_actors': 1, 'num_bucket_actors': 1, 'num_preprocessors': 2, 'num_permutations': 64, 'threshold': 0.8, 'shingles_size': 5, 'delimiters': ' ', 'snapshot_delay': 1, 'use_bucket_snapshot': False, 'use_doc_snapshot': False, 'random_delay_limit': 10, 'worker_options': {'num_cpus': 0.8}}
12:24:43 INFO - data factory data_ is using local data access: input_folder - sample_data/docid_out output_folder - sample_data/fdedup_out
12:24:43 INFO - data factory data_ max_files -1, n_sample -1
12:24:43 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
12:24:43 INFO - pipeline id pipeline_id
12:24:43 INFO - code location None
12:24:43 INFO - number of
```

0

## 5. **Programming Language Select**

This transform selects documents written in a chosen set of programming languages, listed in the file given by `selected_languages_file`.
As an example, we use this [file](https://github.com/IBM/data-prep-kit/blob/dev/transforms/code/proglang_select/python/test-data/languages/allowed-code-languages.txt).

It is an annotator: rather than dropping documents, it adds a new column marking whether each document's language is allowed, and the Filter step later uses that column.

The important parameters used by this transform are:

_lang_allowed_langs_file_key_ - A file with a list of allowed languages. <br/>
_lang_lang_column_key_ - Name of the column that holds the programming language. <br/>
_lang_output_column_key_ - Name of the annotation column. <br/>

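A minimal sketch of the annotation, assuming the allowed-languages file lists one language name per line and that matching is an exact string comparison (the real transform's parsing and matching rules may differ). Rows are plain dicts rather than Arrow tables, and `load_allowed_languages` and `annotate_languages` are hypothetical names; a file object from `open(selected_languages_file)` could be passed to `load_allowed_languages`, since iterating it yields lines.

```python
def load_allowed_languages(lines):
    """Parse one language per line, ignoring blank lines and surrounding whitespace."""
    return {line.strip() for line in lines if line.strip()}

def annotate_languages(rows, allowed, lang_column="language", output_column="lang_selected"):
    """Add a boolean column marking rows whose language is in `allowed`; no rows are dropped."""
    return [{**row, output_column: row.get(lang_column) in allowed} for row in rows]

allowed = load_allowed_languages(["Python\n", "Java\n", "\n"])
rows = [{"title": "a.py", "language": "Python"}, {"title": "a.rb", "language": "Ruby"}]
print([r["lang_selected"] for r in annotate_languages(rows, allowed)])  # [True, False]
```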
In [8]:
```python
input_folder = "sample_data/fdedup_out"
output_folder = "sample_data/ps_out"

# download allowed-code-languages.txt
# (if wget is not installed, as in the output below, download the same URL with
#  `!curl -LO <URL>` or urllib.request.urlretrieve(<URL>, "allowed-code-languages.txt"))
!wget https://raw.githubusercontent.com/IBM/data-prep-kit/dev/transforms/code/proglang_select/python/test-data/languages/allowed-code-languages.txt
selected_languages_file = "./allowed-code-languages.txt"

from proglang_select_transform_ray import ProgLangSelectRayConfiguration
from proglang_select_transform import (
    lang_allowed_langs_file_key,
    lang_lang_column_key,
    lang_output_column_key,
)

# create parameters
language_column_name = "language"
annotated_column_name = "lang_selected"

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
langselect_config = {
    lang_allowed_langs_file_key: selected_languages_file,
    lang_lang_column_key: language_column_name,
    lang_output_column_key: annotated_column_name,
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

params = common_config_params | langselect_config

sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(ProgLangSelectRayConfiguration())
launcher.launch()
```

```
zsh:1: command not found: wget


12:25:19 INFO - Running locally
12:25:19 INFO - data factory proglang_select_ is using local configuration without input/output path
12:25:19 INFO - data factory proglang_select_ max_files -1, n_sample -1
12:25:19 INFO - data factory proglang_select_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
12:25:19 INFO - data factory data_ is using local data access: input_folder - sample_data/fdedup_out output_folder - sample_data/ps_out
12:25:19 INFO - data factory data_ max_files -1, n_sample -1
12:25:19 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
12:25:19 INFO - pipeline id pipeline_id
12:25:19 INFO - code location None
12:25:19 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
12:25:19 INFO - actor creation delay 0
12:25:19 INFO - job details {'job category':
```

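1

The launcher returned `1` here, indicating that the run failed, most likely because `wget` was unavailable and the allowed-languages file was never downloaded.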

## 6. **Filter**

This transform filters documents based on the conditions we specify. So far, the notebook has used one annotating transform, *programming language select*. We now use the filter to keep only the documents that transform marked as selected and remove the rest.

We can specify filter criteria and also drop columns that were added earlier in the pipeline.

```python

filter_criteria = [
    "lang_selected = 1",
]
```

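The criteria are SQL-like boolean expressions combined with `filter_logical_operator` (`AND` here), after which the columns in `filter_columns_to_drop` are removed. The sketch below mimics that behaviour with Python predicates standing in for the criteria strings; it does not parse SQL, rows are plain dicts, and `apply_filter` is a hypothetical name.

```python
def apply_filter(rows, predicates, logical_operator="AND", columns_to_drop=()):
    """Keep rows satisfying all (AND) or any (OR) predicates, then drop columns."""
    if logical_operator not in ("AND", "OR"):
        raise ValueError(f"unsupported logical operator: {logical_operator}")
    combine = all if logical_operator == "AND" else any
    kept = []
    for row in rows:
        if combine(pred(row) for pred in predicates):
            kept.append({k: v for k, v in row.items() if k not in columns_to_drop})
    return kept

rows = [
    {"title": "a.py", "lang_selected": 1, "hash_column": "h1"},
    {"title": "a.rb", "lang_selected": 0, "hash_column": "h2"},
]
# Stand-in for the criterion "lang_selected = 1".
criteria = [lambda r: r["lang_selected"] == 1]
print(apply_filter(rows, criteria, "AND", ["lang_selected", "hash_column"]))
# [{'title': 'a.py'}]
```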

In [9]:
```python
input_folder = "sample_data/ps_out"
output_folder = "sample_data/filter_out"


from filter_transform import (
    filter_columns_to_drop_cli_param,
    filter_criteria_cli_param,
    filter_logical_operator_cli_param,
)
from filter_transform_ray import FilterRayTransformConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

# This is just an example criteria to filter
filter_criteria = [
    "lang_selected = 1",
]
filter_logical_operator = "AND"
filter_columns_to_drop = ["lang_selected", "hash_column"]

filter_params = {
    filter_criteria_cli_param: filter_criteria,
    filter_columns_to_drop_cli_param: filter_columns_to_drop,
    filter_logical_operator_cli_param: filter_logical_operator,
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}


sys.argv = ParamsUtils.dict_to_req(common_config_params | filter_params)
launcher = RayTransformLauncher(FilterRayTransformConfiguration())
launcher.launch()
```

```
12:25:34 INFO - Running locally
12:25:34 INFO - data factory data_ is using local data access: input_folder - sample_data/ps_out output_folder - sample_data/filter_out
12:25:34 INFO - data factory data_ max_files -1, n_sample -1
12:25:34 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
12:25:34 INFO - pipeline id pipeline_id
12:25:34 INFO - code location None
12:25:34 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
12:25:34 INFO - actor creation delay 0
12:25:34 INFO - job details {'job category': 'preprocessing', 'job name': 'filter', 'job type': 'ray', 'job id': 'job_id'}
2024-08-14 12:25:36,680 INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(orchestrate pid=37332) 12:25:37 INFO - orchestrator started at 2024-08-14 12:25:37
(orchestrate pid=37332) 12:25:37
```

0

## 7. **Repo Level Ordering**

This step orders files at the repository level, optionally semantically, and groups the output by language.

This transform outputs one Parquet file per repository. It can also sort the rows of each repository, either with a semantic algorithm or by file path, and it has a switch to organise the output by programming language, using the dominant language of each repository.
It can use local filesystem storage for small data on a single node, or a Ray-based store for scalable processing.

This transform has the following parameters:

 _repo_lvl_sorting_enabled_ - If True, the repo-level output is sorted using _repo_lvl_sorting_algo_ <br/>
 _repo_lvl_sorting_algo_ - Sorting algorithm used for repo-level sorting: one of SORT_BY_PATH, SORT_SEMANTIC, SORT_SEMANTIC_NORMALISED <br/>
 _repo_lvl_store_type_ - Store used to build the group-by information: "local" is simplest when running locally, "ray" when running on a cluster <br/>
 _repo_lvl_store_backend_dir_ - Directory to use for the local store. Needed only when repo_lvl_store_type=local <br/>
 _repo_lvl_output_by_langs_ - If True, it organises the output into one folder per programming language. <br/>
 _repo_lvl_combine_rows_ - If True, it combines the contents of each repository into a single row. <br/>

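The grouping, path sorting, dominant-language selection and row combining can be sketched in plain Python as below. This is an illustration under assumptions: repositories are identified by a `repo_name` field (present in `codeparrot/github-code`), file paths are in `title` (the renamed `path` column), combined files are joined with a newline, and the semantic sorting algorithms are not modeled. `order_repos` is a hypothetical name.

```python
from collections import Counter, defaultdict

def order_repos(rows, repo_column="repo_name", path_column="title",
                lang_column="language", content_column="contents"):
    """Group rows by repo, sort by path, pick the dominant language, combine contents."""
    repos = defaultdict(list)
    for row in rows:
        repos[row[repo_column]].append(row)

    by_language = defaultdict(list)  # dominant language -> combined repo rows
    for repo, files in repos.items():
        files.sort(key=lambda r: r[path_column])  # SORT_BY_PATH-style ordering
        dominant = Counter(f[lang_column] for f in files).most_common(1)[0][0]
        by_language[dominant].append({
            repo_column: repo,
            lang_column: dominant,
            content_column: "\n".join(f[content_column] for f in files),
        })
    return dict(by_language)

rows = [
    {"repo_name": "r1", "title": "b.py", "language": "Python", "contents": "B"},
    {"repo_name": "r1", "title": "a.py", "language": "Python", "contents": "A"},
    {"repo_name": "r1", "title": "x.sh", "language": "Shell", "contents": "X"},
]
print(order_repos(rows))
# {'Python': [{'repo_name': 'r1', 'language': 'Python', 'contents': 'A\nB\nX'}]}
```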


In [10]:
```python
input_folder = "sample_data/filter_out"
output_folder = "sample_data/rlo_out"

import tempfile
from repo_level_order_transform import RepoLevelOrderRayTransformConfiguration
with tempfile.TemporaryDirectory() as tmpdirname:

    # create parameters
    local_conf = {
        "input_folder": input_folder,
        "output_folder": output_folder,
    }

    worker_options = {"num_cpus": 0.8}
    code_location = {"github": "github", "commit_hash": "12345", "path": "path"}

    repo_level_params = {
        "repo_lvl_sorting_algo": "SORT_SEMANTIC_NORMALISED",
        "repo_lvl_store_type": "local",
        "repo_lvl_store_backend_dir": tmpdirname,
        "repo_lvl_output_by_langs": True,
        "repo_lvl_combine_rows": True,
        "repo_lvl_sorting_enabled": True,
        "data_local_config": ParamsUtils.convert_to_ast(local_conf)
    }

    sys.argv = ParamsUtils.dict_to_req(d=common_config_params | repo_level_params)
    launcher = RayTransformLauncher(RepoLevelOrderRayTransformConfiguration())
    launcher.launch()
```

```
12:25:48 INFO - Running locally
12:25:48 INFO - data factory data_ is using local data access: input_folder - sample_data/filter_out output_folder - sample_data/rlo_out
12:25:48 INFO - data factory data_ max_files -1, n_sample -1
12:25:48 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
12:25:48 INFO - pipeline id pipeline_id
12:25:48 INFO - code location None
12:25:48 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
12:25:48 INFO - actor creation delay 0
12:25:48 INFO - job details {'job category': 'preprocessing', 'job name': 'repo_lvl', 'job type': 'ray', 'job id': 'job_id'}


Creating Store Params


2024-08-14 12:25:50,695 INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(orchestrate pid=37352) 12:25:51 INFO - orchestrator started at 2024-08-14 12:25:51
(orchestrate pid=37352) 12:25:51 ERROR - No input files to process - exiting
12:26:01 INFO - Completed execution in 0.20973085165023803 min, execution result 0
```


## 8. **Tokenization**
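
This final step tokenizes the document contents. Only the input and output folders are configured here, so the transform runs with its default tokenization settings.
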
In [11]:
```python
input_folder = "sample_data/rlo_out"
output_folder = "sample_data/tokenize_out"

from tokenization_transform_ray import TokenizationRayConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

tf_params = {
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}
sys.argv = ParamsUtils.dict_to_req(d=common_config_params | tf_params)
# create launcher
launcher = RayTransformLauncher(TokenizationRayConfiguration())
# Launch the ray actor(s) to process the input
launcher.launch()
```

```
12:26:03 INFO - Running locally
12:26:03 INFO - data factory data_ is using local data access: input_folder - sample_data/rlo_out output_folder - sample_data/tokenize_out
12:26:03 INFO - data factory data_ max_files -1, n_sample -1
12:26:03 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
12:26:03 INFO - pipeline id pipeline_id
12:26:03 INFO - code location None
12:26:03 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
12:26:03 INFO - actor creation delay 0
12:26:03 INFO - job details {'job category': 'preprocessing', 'job name': 'Tokenization', 'job type': 'ray', 'job id': 'job_id'}
2024-08-14 12:26:05,186 INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(orchestrate pid=37377) 12:26:06 INFO - orchestrator started at 2024-08-14 12:26:06
(orchestrate pid=37377)
```

0