<div style="background-color: #04D7FD; padding: 20px; text-align: left;">
    <h1 style="color: #000000; font-size: 36px; margin: 0;">Data Processing for RAG with Data Prep Kit</h1>
    
</div>


## Before Running the notebook

Please complete the steps in [setting up the Python dev environment](./setup-python-dev-env.md) first.

## Overview

This notebook processes PDF documents as part of a RAG pipeline.

![](media/rag-overview-2.png)

This notebook performs steps 1, 2, and 3 of the RAG pipeline.

Here are the processing steps:

- **pdf2parquet**: Extract text from PDFs and convert it into Parquet files
- **Chunk documents**: Split the PDFs into 'meaningful sections' (paragraphs, sentences, etc.)
- **Exact Dedup**: Filter out chunks with exactly the same content
- **Doc_ID generation**: Assign each chunk a unique ID (a content hash plus an integer ID)
- **Fuzzy Dedup**: Eliminate chunks whose content is 'very similar'
- **Doc quality**: Score the documents on criteria such as the number of words and whether they contain bad words
- **Text encoder**: Convert chunks into vectors using embedding models
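Each stage below reads the previous stage's output folder and writes to a new, numbered output folder (`01_parquet_out`, `02_chunk_out`, and so on). The notebook does this bookkeeping inline in each "Set Input/output Folder" cell; the helper below is a hypothetical sketch of the same pattern. `next_stage` is not part of Data Prep Kit.

```python
import os


def next_stage(stage, prev_output_folder, output_root, suffix):
    """Hypothetical helper: advance the stage counter and derive the stage's folders.

    The previous stage's output folder becomes this stage's input folder, and the
    new output folder is named '<NN>_<suffix>' under `output_root`.
    """
    stage += 1
    input_folder = prev_output_folder
    output_folder = os.path.join(output_root, f"{stage:02}_{suffix}")
    return stage, input_folder, output_folder


# Usage: chain the first two stages
stage = 0
stage, input_folder, output_folder = next_stage(stage, "input_data/walmart-reports-1", "output", "parquet_out")
print(f"STAGE-{stage}: {input_folder} --> {output_folder}")
stage, input_folder, output_folder = next_stage(stage, output_folder, "output", "chunk_out")
print(f"STAGE-{stage}: {input_folder} --> {output_folder}")
```

On Linux or macOS this prints the same input/output paths as the STAGE-1 and STAGE-2 cells (`output/01_parquet_out`, `output/02_chunk_out`).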

## Step-1: Configuration

In [1]:
## Configuration

class MyConfig:
    pass 

MY_CONFIG = MyConfig ()

In [2]:
import os 

## Input Data - configure this to the folder we want to process
MY_CONFIG.INPUT_DATA_DIR = "input_data/walmart-reports-1"
MY_CONFIG.OUTPUT_FOLDER = "output"
MY_CONFIG.OUTPUT_FOLDER_FINAL = os.path.join(MY_CONFIG.OUTPUT_FOLDER , "output_final")

## Embedding model
MY_CONFIG.EMBEDDING_MODEL = 'sentence-transformers/all-MiniLM-L6-v2'

## RAY CONFIGURATION
num_cpus_available =  os.cpu_count()
# print (num_cpus_available)
# MY_CONFIG.RAY_NUM_CPUS = num_cpus_available // 2  ## use half the available cores for processing
MY_CONFIG.RAY_NUM_CPUS =  1
# print (MY_CONFIG.RAY_NUM_CPUS)
MY_CONFIG.RAY_MEMORY_GB = 2  # GB
# MY_CONFIG.RAY_RUNTIME_WORKERS = num_cpus_available // 3
MY_CONFIG.RAY_RUNTIME_WORKERS = 2

### Set input/output path variables for the pipeline

In [3]:
import os, sys
import shutil

if not os.path.exists(MY_CONFIG.INPUT_DATA_DIR ):
    raise Exception (f"❌ Input folder MY_CONFIG.INPUT_DATA_DIR = '{MY_CONFIG.INPUT_DATA_DIR}' not found")


## clear output folder
shutil.rmtree(MY_CONFIG.OUTPUT_FOLDER, ignore_errors=True)
os.makedirs(MY_CONFIG.OUTPUT_FOLDER, exist_ok=True)

print ("✅ Cleared output directory")

✅ Cleared output directory


### Import common Python modules

In [4]:

import os
import sys

# Main repo root
from utils import rootdir

from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.utils import ParamsUtils

STAGE = 0

In [5]:
def inspect_parquet (parquet_dir, sample_size = 5, display_columns = None):
    """Read all Parquet files in `parquet_dir` and return the first `sample_size` rows."""
    import glob
    import os
    import pandas as pd

    parquet_files = glob.glob(os.path.join(parquet_dir, '*.parquet'))
    if not parquet_files:
        raise FileNotFoundError(f"❌ No .parquet files found in '{parquet_dir}'")

    # Read each Parquet file into a DataFrame
    dfs = [pd.read_parquet(file) for file in parquet_files]

    # Concatenate all DataFrames into a single DataFrame
    data_df = pd.concat(dfs, ignore_index=True)

    # Optionally keep only the requested columns
    if display_columns is not None:
        data_df = data_df[display_columns]
    return data_df.head(sample_size)

<a id="pdf2parquet"></a>

## Step-2: pdf2parquet - Convert data from PDF to Parquet

This step reads all PDF files in the input folder and ingests them into a Parquet table using the [Docling package](https://github.com/DS4SD/docling).
Each document is converted into JSON format, which makes it easy to chunk in later steps.



### Set Input/output Folder

In [6]:
STAGE  += 1

input_folder = MY_CONFIG.INPUT_DATA_DIR
output_folder =  os.path.join(MY_CONFIG.OUTPUT_FOLDER, f"{STAGE:02}_parquet_out")

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-1: Processing input='input_data/walmart-reports-1' --> output='output/01_parquet_out'


### Execute 

In [7]:
%%time 

import ast
import os
import sys

from pdf2parquet_transform import (
    pdf2parquet_contents_type_cli_param,
    pdf2parquet_contents_types,
)
from pdf2parquet_transform_python import Pdf2ParquetPythonTransformConfiguration
from pdf2parquet_transform_ray import Pdf2ParquetRayTransformConfiguration

from data_processing.utils import GB, ParamsUtils


# create parameters
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus" : MY_CONFIG.RAY_NUM_CPUS, "memory": MY_CONFIG.RAY_MEMORY_GB * GB}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
ingest_config = {
    pdf2parquet_contents_type_cli_param: pdf2parquet_contents_types.JSON,
}

params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    "data_files_to_use": ast.literal_eval("['.pdf']"),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": MY_CONFIG.RAY_RUNTIME_WORKERS,
    "runtime_pipeline_id": "pipeline_id",
    "runtime_job_id": "job_id",
    "runtime_code_location": ParamsUtils.convert_to_ast(code_location),
}


sys.argv = ParamsUtils.dict_to_req(d=(params | ingest_config))
# create launcher
launcher = RayTransformLauncher(Pdf2ParquetRayTransformConfiguration())
# launcher = PythonTransformLauncher(Pdf2ParquetPythonTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Ray job failed")


00:37:22 INFO - Running locally
00:37:22 INFO - pdf2parquet parameters are : {'artifacts_path': None, 'contents_type': <pdf2parquet_contents_types.JSON: 'application/json'>, 'do_table_structure': True, 'do_ocr': False}
00:37:22 INFO - data factory data_ is using local data access: input_folder - input_data/walmart-reports-1 output_folder - output/01_parquet_out
00:37:22 INFO - data factory data_ max_files -1, n_sample -1
00:37:22 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.pdf'], files to checkpoint ['.parquet']
00:37:22 INFO - pipeline id pipeline_id
00:37:22 INFO - code location {'github': 'github', 'commit_hash': '12345', 'path': 'path'}
00:37:22 INFO - number of workers 2 worker options {'num_cpus': 1, 'memory': 2147483648, 'max_restarts': -1}
00:37:22 INFO - actor creation delay 0
00:37:22 INFO - job details {'job category': 'preprocessing', 'job name': 'pdf2parquet', 'job type': 'ray', 'job id': 'job_id'}
20

✅ Stage:1 completed successfully
CPU times: user 4.36 s, sys: 1.11 s, total: 5.47 s
Wall time: 11min 21s


### Inspect Generated output

In [8]:
inspect_parquet (output_folder, sample_size=5, display_columns=None)

| | filename | contents | num_pages | num_tables | num_doc_elements | document_id | ext | hash | size | date_acquired | pdf_convert_time | source_filename |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Walmart-10K-Reports-Optimized_2023.pdf | `{"_name":"","type":"pdf-document","description...` | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf |
| 1 | Walmart_2024_copy.pdf | `{"_name":"","type":"pdf-document","description...` | 100 | 82 | 1163 | 0bac9657-4f93-43e4-968a-eda641f1fb44 | pdf | 0be5657667eb7229f1389625f61b6d6dfb608c617ed5ef... | 1112050 | 2024-08-29T00:48:29.671805 | 313.25177 | Walmart_2024_copy.pdf |
| 2 | Walmart_2024.pdf | `{"_name":"","type":"pdf-document","description...` | 100 | 82 | 1163 | 4fcd34e3-8a85-4216-a175-7120feb53ac8 | pdf | dd3b262828146a536bdc0f04e7c9dfbd7406d043714989... | 1112045 | 2024-08-29T00:43:16.363577 | 327.949112 | Walmart_2024.pdf |


<a id="chunking"></a>

## Step-3: Doc chunks

Split the documents into chunks according to their layout segmentation.
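As a rough illustration of layout-based chunking, the sketch below walks a simplified, hypothetical Docling-style JSON document: a `main-text` list whose items carry a `type`, `text`, and `prov` (page and bounding box). It emits one chunk per non-heading element, prefixed with the most recent heading, and records a JSON path such as `$.main-text[6]`, in the same shape as the `doc_jsonpath`, `page_number`, and `bbox` columns this stage produces. The JSON layout, the `HEADING_TYPES` set, and the sample document are assumptions for illustration; the actual `doc_chunk` transform (run below with `chunking_type` `dl_json`) may group and label elements differently.

```python
import json

# Assumed element types that start a new section (hypothetical for this sketch)
HEADING_TYPES = {"title", "subtitle-level-1"}


def chunk_doc_json(doc_json):
    """Split a simplified Docling-style JSON document into heading-prefixed chunks."""
    doc = json.loads(doc_json)
    chunks = []
    heading = None
    for i, item in enumerate(doc.get("main-text", [])):
        text = item.get("text", "").strip()
        if not text:
            continue
        if item.get("type") in HEADING_TYPES:
            heading = text  # remember the current section heading
            continue
        prov = (item.get("prov") or [{}])[0]  # first provenance entry, if any
        chunks.append({
            "contents": f"{heading}\n{text}" if heading else text,
            "doc_jsonpath": f"$.main-text[{i}]",
            "page_number": prov.get("page"),
            "bbox": prov.get("bbox"),
        })
    return chunks


# Made-up sample document for illustration
sample = json.dumps({"main-text": [
    {"type": "subtitle-level-1", "text": "A message from our CEO",
     "prov": [{"page": 2, "bbox": [0, 0, 10, 10]}]},
    {"type": "paragraph", "text": "We started the year...",
     "prov": [{"page": 2, "bbox": [29.09, 166.58, 492.15, 266.08]}]},
]})
for chunk in chunk_doc_json(sample):
    print(chunk)
```

Prefixing each chunk with its section heading keeps some context attached to short paragraphs, which tends to help retrieval.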

### Set Input/output Folder

In [9]:
STAGE  += 1

input_folder = output_folder # previous output folder is the input folder for the current stage
output_folder =  os.path.join(MY_CONFIG.OUTPUT_FOLDER, f"{STAGE:02}_chunk_out")

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-2: Processing input='output/01_parquet_out' --> output='output/02_chunk_out'


### Execute 

In [10]:
%%time 

# Import doc_chunk transform configuration
from doc_chunk_transform_ray import DocChunkRayTransformConfiguration


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus" : MY_CONFIG.RAY_NUM_CPUS}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": MY_CONFIG.RAY_RUNTIME_WORKERS,
    # doc_chunk arguments
    # ...
}

# Pass the commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(DocChunkRayTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Ray job failed")

[nltk_data] Downloading package punkt_tab to
[nltk_data]     /home/sujee/apps/anaconda3/envs/data-prep-
[nltk_data]     kit-3-py311/lib/python3.11/site-
[nltk_data]     packages/llama_index/core/_static/nltk_cache...
[nltk_data]   Package punkt_tab is already up-to-date!
00:48:44 INFO - Running locally
00:48:44 INFO - doc_chunk parameters are : {'chunking_type': <chunking_types.DL_JSON: 'dl_json'>, 'content_column_name': 'contents', 'output_chunk_column_name': 'contents', 'output_jsonpath_column_name': 'doc_jsonpath', 'output_pageno_column_name': 'page_number', 'output_bbox_column_name': 'bbox'}
00:48:44 INFO - data factory data_ is using local data access: input_folder - output/01_parquet_out output_folder - output/02_chunk_out
00:48:44 INFO - data factory data_ max_files -1, n_sample -1
00:48:44 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
00:48:44 INFO - pipeline id pi

✅ Stage:2 completed successfully
CPU times: user 1.35 s, sys: 974 ms, total: 2.33 s
Wall time: 24.2 s


### Inspect Generated output

In [11]:
inspect_parquet (output_folder, sample_size=5, display_columns=None)

| | filename | num_pages | num_tables | num_doc_elements | document_id | ext | hash | size | date_acquired | pdf_convert_time | source_filename | contents | doc_jsonpath | page_number | bbox |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\n"We started th... | `$.main-text[6]` | 2 | [29.09, 166.58, 492.15, 266.08] |
| 1 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\nAnnual report pu... | `$.main-text[13]` | 3 | [75.32, 624.9, 420.77, 633.75] |
| 2 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\nFor the fiscal y... | `$.main-text[14]` | 3 | [214.16, 607.24, 390.6, 617.44] |
| 3 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | UNITED STATES SECURITIES AND EXCHANGE COMMISSI... | `$.main-text[19]` | 3 | [219.78, 683.32, 385.03, 683.81] |
| 4 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | FORM 10-K\n(State or other jurisdiction of inc... | `$.main-text[24]` | 3 | [136.51, 400.49, 229.6, 415.93] |


## Step-4: Exact Dedup

Remove chunks with identical content so that duplicates do not skew the data. A SHA256 hash is computed over the content of each chunk,
and records with identical hashes are de-duplicated (only one copy is kept).
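A minimal, single-process sketch of the idea: hash each chunk's `contents` with SHA256 and keep only the first chunk seen for each hash. This is not the `ededup` transform itself, which runs distributed over Ray actors; `exact_dedup` and the sample rows are hypothetical.

```python
import hashlib


def exact_dedup(rows, doc_column="contents"):
    """Keep the first row for each distinct SHA256 hash of `doc_column`."""
    seen = set()
    kept = []
    for row in rows:
        digest = hashlib.sha256(row[doc_column].encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            kept.append(row)
    return kept


rows = [
    {"filename": "a.pdf", "contents": "FORM 10-K"},
    {"filename": "b.pdf", "contents": "FORM 10-K"},  # exact duplicate of the first chunk
    {"filename": "a.pdf", "contents": "Annual report"},
]
print([r["contents"] for r in exact_dedup(rows)])  # ['FORM 10-K', 'Annual report']
```

Because only the hash is stored, memory grows with the number of distinct chunks rather than their total size, and duplicates are caught across files (for example, a PDF and its copy).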

### Set Input/output Folder

In [12]:
STAGE  += 1

input_folder = output_folder # previous output folder is the input folder for the current stage
output_folder =  os.path.join(MY_CONFIG.OUTPUT_FOLDER, f"{STAGE:02}_ededupe_out")

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-3: Processing input='output/02_chunk_out' --> output='output/03_ededupe_out'


### Execute 

In [13]:
%%time

# Import ededup transform configuration
from ededup_transform_ray import EdedupRayTransformConfiguration


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus" : MY_CONFIG.RAY_NUM_CPUS}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": MY_CONFIG.RAY_RUNTIME_WORKERS,
    # ededup parameters
    "ededup_hash_cpu": 0.5,
    "ededup_num_hashes": 2,
    "ededup_doc_column": "contents",
}

# Pass the commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(EdedupRayTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Ray job failed")

00:49:05 INFO - Running locally
00:49:05 INFO - exact dedup params are {'doc_column': 'contents', 'hash_cpu': 0.5, 'num_hashes': 2}
00:49:05 INFO - data factory data_ is using local data access: input_folder - output/02_chunk_out output_folder - output/03_ededupe_out
00:49:05 INFO - data factory data_ max_files -1, n_sample -1
00:49:05 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
00:49:05 INFO - pipeline id pipeline_id
00:49:05 INFO - code location None
00:49:05 INFO - number of workers 2 worker options {'num_cpus': 1, 'max_restarts': -1}
00:49:05 INFO - actor creation delay 0
00:49:05 INFO - job details {'job category': 'preprocessing', 'job name': 'ededup', 'job type': 'ray', 'job id': 'job_id'}
2024-08-29 00:49:07,173	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(orchestrate pid=2547949) 0

✅ Stage:3 completed successfully
CPU times: user 91.1 ms, sys: 138 ms, total: 229 ms
Wall time: 15 s


### Inspect Generated output

In [14]:
inspect_parquet (output_folder, sample_size=5, display_columns=None)

| | filename | num_pages | num_tables | num_doc_elements | document_id | ext | hash | size | date_acquired | pdf_convert_time | source_filename | contents | doc_jsonpath | page_number | bbox |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\n"We started th... | `$.main-text[6]` | 2 | [29.09, 166.58, 492.15, 266.08] |
| 1 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\nAnnual report pu... | `$.main-text[13]` | 3 | [75.32, 624.9, 420.77, 633.75] |
| 2 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\nFor the fiscal y... | `$.main-text[14]` | 3 | [214.16, 607.24, 390.6, 617.44] |
| 3 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | UNITED STATES SECURITIES AND EXCHANGE COMMISSI... | `$.main-text[19]` | 3 | [219.78, 683.32, 385.03, 683.81] |
| 4 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | FORM 10-K\n(State or other jurisdiction of inc... | `$.main-text[24]` | 3 | [136.51, 400.49, 229.6, 415.93] |


## Step-5: DOC ID generation

This transform annotates documents with document IDs. It supports the following transformations of the original data:

- Adding a document hash: this adds a hash-based document ID to the data. The hash is calculated with `hashlib.sha256(doc.encode("utf-8")).hexdigest()`. To enable this annotation, set `hash_column` to the name of the column in which to store it.
- Adding an integer document ID: this adds an integer document ID that is unique across all rows in all tables provided to the `transform()` method. To enable this annotation, set `int_id_column` to the name of the column in which to store it. **This is a prerequisite for fuzzy dedup** in the pipeline.
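The two annotations above can be sketched in plain Python, treating each "table" as a list of dicts. This is a hypothetical single-process illustration: `add_doc_ids` is not part of Data Prep Kit, and the real transform must keep integer IDs unique across parallel Ray workers, which this sketch sidesteps with one shared counter.

```python
import hashlib
from itertools import count


def add_doc_ids(tables, doc_column="contents", hash_column="hash_column",
                int_id_column="int_id_column", start=0):
    """Annotate rows with a SHA256 content hash and an integer ID unique across all tables."""
    next_id = count(start)  # one counter shared by every table
    for table in tables:
        for row in table:
            row[hash_column] = hashlib.sha256(row[doc_column].encode("utf-8")).hexdigest()
            row[int_id_column] = next(next_id)
    return tables


tables = [[{"contents": "a"}, {"contents": "b"}], [{"contents": "a"}]]
add_doc_ids(tables)
for table in tables:
    for row in table:
        print(row["int_id_column"], row["hash_column"][:12])
```

Note the difference between the two columns: identical content always gets the same hash, while every row gets its own integer ID, which is what fuzzy dedup needs to refer to individual chunks.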

In [15]:
# Input for this stage is the output of the exact dedup component.
# The output of this component (integer doc IDs) is required by the fdedup component.

STAGE  += 1

input_folder = output_folder # previous output folder is the input folder for the current stage
output_folder =  os.path.join(MY_CONFIG.OUTPUT_FOLDER, f"{STAGE:02}_doc_id_out")
print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-4: Processing input='output/03_ededupe_out' --> output='output/04_doc_id_out'


In [16]:
%%time 

from doc_id_transform_ray import DocIDRayTransformConfiguration
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus" : MY_CONFIG.RAY_NUM_CPUS}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": MY_CONFIG.RAY_RUNTIME_WORKERS,
    # doc id configuration
    "doc_id_doc_column": "contents",
    "doc_id_hash_column": "hash_column",
    "doc_id_int_column": "int_id_column",
}
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch

launcher = RayTransformLauncher(DocIDRayTransformConfiguration())

return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Ray job failed")

00:49:20 INFO - Running locally
00:49:20 INFO - Doc id parameters are : {'doc_column': 'contents', 'hash_column': 'hash_column', 'int_column': 'int_id_column'}
00:49:20 INFO - data factory data_ is using local data access: input_folder - output/03_ededupe_out output_folder - output/04_doc_id_out
00:49:20 INFO - data factory data_ max_files -1, n_sample -1
00:49:20 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
00:49:20 INFO - pipeline id pipeline_id
00:49:20 INFO - code location None
00:49:20 INFO - number of workers 2 worker options {'num_cpus': 1, 'max_restarts': -1}
00:49:20 INFO - actor creation delay 0
00:49:20 INFO - job details {'job category': 'preprocessing', 'job name': 'doc_id', 'job type': 'ray', 'job id': 'job_id'}
2024-08-29 00:49:22,288	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(o

✅ Stage:4 completed successfully
CPU times: user 102 ms, sys: 156 ms, total: 257 ms
Wall time: 15.1 s


### Inspect Generated output

In [17]:
inspect_parquet (output_folder, sample_size=5, display_columns=None)

| | filename | num_pages | num_tables | num_doc_elements | document_id | ext | hash | size | date_acquired | pdf_convert_time | source_filename | contents | doc_jsonpath | page_number | bbox | hash_column | int_id_column |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\n"We started th... | `$.main-text[6]` | 2 | [29.09, 166.58, 492.15, 266.08] | c4536c46811af1b47aa06f4fbe30d9723a5419d530f8d9... | 651 |
| 1 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\nAnnual report pu... | `$.main-text[13]` | 3 | [75.32, 624.9, 420.77, 633.75] | 1662c583f1bb69f0c82c128b907867e13f1b99584c67b8... | 652 |
| 2 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\nFor the fiscal y... | `$.main-text[14]` | 3 | [214.16, 607.24, 390.6, 617.44] | 2f26fa255117cd004e3fc8e4348d39fd265e570edfdbc7... | 653 |
| 3 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | UNITED STATES SECURITIES AND EXCHANGE COMMISSI... | `$.main-text[19]` | 3 | [219.78, 683.32, 385.03, 683.81] | d11004fae67b2815805f94e98db589acb2f7fa134cd05a... | 654 |
| 4 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | FORM 10-K\n(State or other jurisdiction of inc... | `$.main-text[24]` | 3 | [136.51, 400.49, 229.6, 415.93] | 1e999df4a8d56727948c320e191a9c6a416ac89b1547a4... | 655 |


## Step-6: Fuzzy Dedup

After exact deduplication, fuzzy deduplication finds and removes near-duplicates: chunks that differ only slightly
and therefore survived the exact-match step. This reduces redundancy in the data further. In code data, for example,
such small variations commonly take the form of different variable values or added logging statements.
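A minimal sketch of the MinHash technique that near-duplicate detection of this kind is commonly built on, using the parameter values from the cell below: 64 permutations, 5-word shingles split on spaces, and a 0.8 similarity threshold. The hash functions, the seed, and all helper names are assumptions for illustration. The sketch compares two chunks directly; the actual `fdedup` transform, judging by its bucket and minhash actor settings, also groups signatures into buckets (locality-sensitive hashing) so it does not have to compare every pair.

```python
import hashlib
import random

PRIME = (1 << 61) - 1  # Mersenne prime, larger than any 32-bit shingle hash


def shingles(text, size=5, delimiter=" "):
    """Set of word n-grams ("shingles") of `size` words."""
    words = [w for w in text.split(delimiter) if w]
    if len(words) <= size:
        return {delimiter.join(words)}
    return {delimiter.join(words[i:i + size]) for i in range(len(words) - size + 1)}


def make_permutations(num_permutations=64, seed=42):
    """Random (a, b) pairs defining hash functions h(x) = (a*x + b) mod PRIME."""
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(0, PRIME)) for _ in range(num_permutations)]


def minhash(text, perms, size=5, delimiter=" "):
    """MinHash signature: per permutation, the minimum permuted hash over all shingles."""
    base = [int.from_bytes(hashlib.sha1(s.encode("utf-8")).digest()[:4], "big")
            for s in shingles(text, size, delimiter)]
    return [min((a * x + b) % PRIME for x in base) for a, b in perms]


def estimated_jaccard(sig1, sig2):
    """The fraction of matching signature positions estimates Jaccard similarity."""
    return sum(x == y for x, y in zip(sig1, sig2)) / len(sig1)


perms = make_permutations(64)
base_text = " ".join(f"word{i}" for i in range(100))
near_dup = base_text + " extra"  # one extra word at the end
other = " ".join(f"token{i}" for i in range(100))

sim = estimated_jaccard(minhash(base_text, perms), minhash(near_dup, perms))
print(sim >= 0.8)  # True: flagged as a near-duplicate at threshold 0.8
print(estimated_jaccard(minhash(base_text, perms), minhash(other, perms)) >= 0.8)  # False
```

More permutations make the similarity estimate less noisy at the cost of more hashing per chunk; larger shingles make the comparison stricter about word order.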

### Set Input/output Folder

In [18]:
## Input to this component is the output of doc_id generator component. 

STAGE  += 1

input_folder = output_folder # previous output folder is the input folder for the current stage
output_folder =  os.path.join(MY_CONFIG.OUTPUT_FOLDER, f"{STAGE:02}_fdedupe_out")
print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-5: Processing input='output/04_doc_id_out' --> output='output/05_fdedupe_out'


### Execute 

In [19]:
%%time 

import os
import sys

from data_processing.utils import ParamsUtils
from fdedup_transform_ray import FdedupRayTransformConfiguration

# create parameters

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus" : MY_CONFIG.RAY_NUM_CPUS}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # Orchestration parameters
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": MY_CONFIG.RAY_RUNTIME_WORKERS,
    # columns used
    "fdedup_doc_column": "contents",
    "fdedup_id_column": "int_id_column",
    "fdedup_cluster_column": "hash_column",
    # infrastructure
    "fdedup_bucket_cpu": 0.5,
    "fdedup_doc_cpu": 0.5,
    "fdedup_mhash_cpu": 0.5,
    "fdedup_num_doc_actors": 2,
    "fdedup_num_bucket_actors": 1,
    "fdedup_num_minhash_actors": 1,
    "fdedup_num_preprocessors": 2,
    # fuzzy parameters
    "fdedup_num_permutations": 64,
    "fdedup_threshold": 0.8,
    "fdedup_shingles_size": 5,
    "fdedup_delimiters": " "
}

# Pass commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch

launcher = RayTransformLauncher(FdedupRayTransformConfiguration())

return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Ray job failed")

00:49:35 INFO - Running locally
00:49:35 INFO - fuzzy dedup params are {'doc_column': 'contents', 'id_column': 'int_id_column', 'cluster_column': 'hash_column', 'bucket_cpu': 0.5, 'mhash_cpu': 0.5, 'doc_cpu': 0.5, 'num_doc_actors': 2, 'num_minhash_actors': 1, 'num_bucket_actors': 1, 'num_preprocessors': 2, 'num_permutations': 64, 'threshold': 0.8, 'shingles_size': 5, 'delimiters': ' ', 'snapshot_delay': 1, 'use_bucket_snapshot': False, 'use_doc_snapshot': False, 'random_delay_limit': 10, 'worker_options': {'num_cpus': 1}}
00:49:35 INFO - data factory data_ is using local data access: input_folder - output/04_doc_id_out output_folder - output/05_fdedupe_out
00:49:35 INFO - data factory data_ max_files -1, n_sample -1
00:49:35 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
00:49:35 INFO - pipeline id pipeline_id
00:49:35 INFO - code location None
00:49:35 INFO - number of wor

✅ Stage:5 completed successfully
CPU times: user 156 ms, sys: 185 ms, total: 340 ms
Wall time: 39.1 s


### Inspect Generated output

In [20]:
inspect_parquet (output_folder, sample_size=5, display_columns=None)

| | filename | num_pages | num_tables | num_doc_elements | document_id | ext | hash | size | date_acquired | pdf_convert_time | source_filename | contents | doc_jsonpath | page_number | bbox | int_id_column | hash_column |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\n"We started th... | `$.main-text[6]` | 2 | [29.09, 166.58, 492.15, 266.08] | 651 | -1 |
| 1 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\nAnnual report pu... | `$.main-text[13]` | 3 | [75.32, 624.9, 420.77, 633.75] | 652 | -1 |
| 2 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | A A message from our r CEO\nFor the fiscal y... | `$.main-text[14]` | 3 | [214.16, 607.24, 390.6, 617.44] | 653 | -1 |
| 3 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | UNITED STATES SECURITIES AND EXCHANGE COMMISSI... | `$.main-text[19]` | 3 | [219.78, 683.32, 385.03, 683.81] | 654 | -1 |
| 4 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | Walmart-10K-Reports-Optimized_2023.pdf | FORM 10-K\n(State or other jurisdiction of inc... | `$.main-text[24]` | 3 | [136.51, 400.49, 229.6, 415.93] | 655 | -1 |


## Step-7: Language identification

This transform identifies the language of the document chunks. It is currently disabled in this notebook (the cells below are commented out).

### Set Input/output Folder

In [21]:
# STAGE  += 1

# input_folder = output_folder # previous output folder is the input folder for the current stage
# output_folder =  os.path.join(MY_CONFIG.OUTPUT_FOLDER, f"{STAGE:02}_langid_out")
# print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

### Execute 

In [22]:
## disabling this for now

# import os
# import sys

# from data_processing.utils import ParamsUtils
# from lang_id_transform import (
#     content_column_name_cli_param,
#     model_credential_cli_param,
#     model_kind_cli_param,
#     model_url_cli_param,
# )
# from lang_models import KIND_FASTTEXT
# from lang_id_transform_ray import LangIdentificationRayTransformConfiguration


# local_conf = {
#     "input_folder": input_folder,
#     "output_folder": output_folder,
# }
# worker_options = {"num_cpus" : MY_CONFIG.RAY_NUM_CPUS}
# langid_config = {
#     model_credential_cli_param: None, #"PUT YOUR OWN HUGGINGFACE CREDENTIAL",
#     model_kind_cli_param: KIND_FASTTEXT,
#     model_url_cli_param: "facebook/fasttext-language-identification",
#     # content_column_name_cli_param: "text",
# }
# params = {
#     # where to run
#     "run_locally": True,
#     # Data access. Only required parameters are specified
#     "data_local_config": ParamsUtils.convert_to_ast(local_conf),
#     # orchestrator
#     "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
#     "runtime_num_workers": 1,
#     # language selection specific parameters
#     **langid_config,
# }

# sys.argv = ParamsUtils.dict_to_req(d=params)

# # create launcher
# launcher = RayTransformLauncher(LangIdentificationRayTransformConfiguration())
# return_code = launcher.launch()

# if return_code != 0:
#     raise Exception ("❌ Ray job failed")


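## Step-8: Document Quality

This transform scores each chunk on quality signals such as mean word length, symbol-to-word ratio, sentence count, and whether it contains bad words (checked against the bad-word list passed via `bad_word_filepath`).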
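The transform's output includes columns such as `docq_mean_word_len`, `docq_symbol_to_word_ratio`, and `docq_contain_bad_word`. The sketch below approximates three of these signals in plain Python, assuming whitespace tokenization, a symbol count of `#` characters and ellipses, and a simple word lookup against the bad-word list. The transform's exact definitions may differ; `quality_signals` is a hypothetical helper for illustration, not a reimplementation.

```python
import string


def quality_signals(text, bad_words):
    """Approximate a few doc-quality signals for one chunk (illustrative only)."""
    words = text.split()
    num_words = len(words)
    mean_word_len = sum(len(w) for w in words) / num_words if num_words else 0.0
    # Assumption: "symbols" are hash marks and ellipses ("..." or the single character)
    num_symbols = text.count("#") + text.count("...") + text.count("\u2026")
    symbol_to_word_ratio = num_symbols / num_words if num_words else 0.0
    # Lowercase words with surrounding punctuation stripped, for the bad-word lookup
    normalized = {w.strip(string.punctuation).lower() for w in words}
    contain_bad_word = any(bad.lower() in normalized for bad in bad_words)
    return {
        "mean_word_len": mean_word_len,
        "symbol_to_word_ratio": symbol_to_word_ratio,
        "contain_bad_word": contain_bad_word,
    }


print(quality_signals("FORM 10-K annual report", bad_words={"badword"}))
# {'mean_word_len': 5.0, 'symbol_to_word_ratio': 0.0, 'contain_bad_word': False}
```

Signals like these are typically used downstream to filter or down-weight low-quality chunks rather than to drop them inside this stage.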

### Set Input/output Folder

In [23]:
STAGE  += 1

input_folder = output_folder # previous output folder is the input folder for the current stage
output_folder =  os.path.join(MY_CONFIG.OUTPUT_FOLDER, f"{STAGE:02}_doc_quality_out")
print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-6: Processing input='output/05_fdedupe_out' --> output='output/06_doc_quality_out'


### Execute 

In [24]:
%%time

import os
import sys
from pathlib import Path

from doc_quality_transform import (
    text_lang_cli_param,
    doc_content_column_cli_param,
    bad_word_filepath_cli_param,
)
from doc_quality_transform_ray import DocQualityRayTransformConfiguration
from data_processing.utils import ParamsUtils

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

doc_quality_basedir = os.path.join(rootdir, "transforms", "language", "doc_quality", "python")
worker_options = {"num_cpus" : MY_CONFIG.RAY_NUM_CPUS}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": MY_CONFIG.RAY_RUNTIME_WORKERS,
    "runtime_pipeline_id": "pipeline_id",
    "runtime_job_id": "job_id",
    "runtime_creation_delay": 0,
    # doc quality configuration
    text_lang_cli_param: "en",
    doc_content_column_cli_param: "contents",
    bad_word_filepath_cli_param: os.path.join(doc_quality_basedir, "ldnoobw", "en"),
}


Path(output_folder).mkdir(parents=True, exist_ok=True)

sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(DocQualityRayTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Ray job failed")

00:50:14 INFO - Running locally
00:50:14 INFO - doc_quality parameters are : {'text_lang': 'en', 'doc_content_column': 'contents', 'bad_word_filepath': '/home/sujee/my-stuff/projects/ai-alliance/data-prep-kit-sujee/transforms/language/doc_quality/python/ldnoobw/en', 's3_cred': None, 'docq_data_factory': <data_processing.data_access.data_access_factory.DataAccessFactory object at 0x798f5cde7f10>}
00:50:14 INFO - data factory docq_ is using local configuration without input/output path
00:50:14 INFO - data factory docq_ max_files -1, n_sample -1
00:50:14 INFO - data factory docq_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
00:50:14 INFO - data factory data_ is using local data access: input_folder - output/05_fdedupe_out output_folder - output/06_doc_quality_out
00:50:14 INFO - data factory data_ max_files -1, n_sample -1
00:50:14 INFO - data factory data_ Not using data sets, checkpointing False, 

✅ Stage:6 completed successfully
CPU times: user 92.8 ms, sys: 138 ms, total: 230 ms
Wall time: 15.8 s
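The doc quality stage adds `docq_*` annotation columns to each chunk. To give a feel for what a few of them measure, here is a simplified sketch of three of them. These formulas are assumptions for illustration (mean word length, share of words containing a letter, and a single-word lookup in a bad-word list); the actual `doc_quality` transform may tokenize and compute them differently. `approx_doc_quality` and `load_word_list` are hypothetical names, and the bad-word check only matches single words, whereas a list such as `ldnoobw/en` may also contain multi-word phrases.

```python
import re


def approx_doc_quality(text: str, bad_words: set[str]) -> dict:
    """Illustrative approximations of a few docq_* annotations.

    Not the doc_quality transform's actual code; its formulas may differ.
    """
    words = text.split()
    if not words:
        return {"mean_word_len": 0.0, "alphabet_word_ratio": 0.0, "contain_bad_word": False}
    # Average number of characters per whitespace-separated word
    mean_word_len = sum(len(w) for w in words) / len(words)
    # Fraction of words that contain at least one alphabetic character
    alpha_words = sum(1 for w in words if any(c.isalpha() for c in w))
    alphabet_word_ratio = alpha_words / len(words)
    # Lowercase and strip non-word characters before looking words up in the list
    normalized = {re.sub(r"\W+", "", w.lower()) for w in words}
    contain_bad_word = not normalized.isdisjoint(bad_words)
    return {
        "mean_word_len": mean_word_len,
        "alphabet_word_ratio": alphabet_word_ratio,
        "contain_bad_word": contain_bad_word,
    }


def load_word_list(path: str) -> set[str]:
    """Read a word list assumed to hold one entry per line (e.g. the ldnoobw/en file)."""
    with open(path, encoding="utf-8") as f:
        return {line.strip().lower() for line in f if line.strip()}


metrics = approx_doc_quality("The cat sat on 3 mats !", bad_words={"badword"})
print(metrics)
```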


### Inspect Generated output

In [25]:
inspect_parquet (output_folder, sample_size=5, display_columns=None)

|  | filename | num_pages | num_tables | num_doc_elements | document_id | ext | hash | size | date_acquired | pdf_convert_time | ... | docq_mean_word_len | docq_symbol_to_word_ratio | docq_sentence_count | docq_lorem_ipsum_ratio | docq_curly_bracket_ratio | docq_contain_bad_word | docq_bullet_point_ratio | docq_ellipsis_line_ratio | docq_alphabet_word_ratio | docq_contain_common_en_words |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 4.131579 | 0.0 | 3 | 0.0 | 0.0 | False | 0.0 | 0.0 | 1.0 | True |
| 1 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 4.090909 | 0.0 | 1 | 0.0 | 0.0 | False | 0.0 | 0.0 | 0.909091 | True |
| 2 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 3.625 | 0.0 | 1 | 0.0 | 0.0 | False | 0.0 | 0.0 | 0.875 | False |
| 3 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 12.285714 | 0.0 | 1 | 0.0 | 0.0 | False | 0.0 | 0.0 | 0.857143 | False |
| 4 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 6.3 | 0.0 | 1 | 0.0 | 0.0 | False | 0.0 | 0.0 | 1.0 | True |
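The sampled output above still contains chunks with `docq_contain_common_en_words` set to `False`, so this stage appears to annotate chunks rather than drop them. If you want to discard low-quality chunks before encoding, one option is a filter over these columns. The sketch below uses plain Python dicts holding values from the five sampled rows; `keep_chunk` and its thresholds are hypothetical and should be tuned for your own data. With pandas, the same logic would be a boolean mask over the DataFrame columns.

```python
def keep_chunk(row: dict,
               min_alphabet_word_ratio: float = 0.8,
               require_common_en_words: bool = True) -> bool:
    """Hypothetical filter over doc_quality annotations; thresholds are illustrative."""
    if row["docq_contain_bad_word"]:
        return False
    if row["docq_alphabet_word_ratio"] < min_alphabet_word_ratio:
        return False
    if require_common_en_words and not row["docq_contain_common_en_words"]:
        return False
    return True


# The five sampled rows above, reduced to the three annotation columns used here
rows = [
    {"docq_contain_bad_word": False, "docq_alphabet_word_ratio": 1.0, "docq_contain_common_en_words": True},
    {"docq_contain_bad_word": False, "docq_alphabet_word_ratio": 0.909091, "docq_contain_common_en_words": True},
    {"docq_contain_bad_word": False, "docq_alphabet_word_ratio": 0.875, "docq_contain_common_en_words": False},
    {"docq_contain_bad_word": False, "docq_alphabet_word_ratio": 0.857143, "docq_contain_common_en_words": False},
    {"docq_contain_bad_word": False, "docq_alphabet_word_ratio": 1.0, "docq_contain_common_en_words": True},
]
kept = [i for i, r in enumerate(rows) if keep_chunk(r)]
print(kept)  # [0, 1, 4]
```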


## Step-9: Text Encoding

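Encode each chunk's text (the `contents` column) into an embedding vector, written to a new `embeddings` column, so the chunks can be loaded into a vector database.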

In [26]:
STAGE  += 1

input_folder = output_folder # previous output folder is the input folder for the current stage
output_folder =  os.path.join(MY_CONFIG.OUTPUT_FOLDER, f"{STAGE:02}_encoder_out")
print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-7: Processing input='output/06_doc_quality_out' --> output='output/07_encoder_out'


In [27]:
%%time 

from text_encoder_transform_ray import TextEncoderRayTransformConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus" : MY_CONFIG.RAY_NUM_CPUS}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": MY_CONFIG.RAY_RUNTIME_WORKERS,
    # text_encoder
    "text_encoder_model_name": MY_CONFIG.EMBEDDING_MODEL,
}

sys.argv = ParamsUtils.dict_to_req(d=params)
# create launcher
launcher = RayTransformLauncher(TextEncoderRayTransformConfiguration())
# Launch the ray actor(s) to process the input

return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Ray job failed")

00:50:31 INFO - Running locally
00:50:31 INFO - text_encoder parameters are : {'content_column_name': 'contents', 'output_embeddings_column_name': 'embeddings', 'model_name': 'sentence-transformers/all-MiniLM-L6-v2'}
00:50:31 INFO - data factory data_ is using local data access: input_folder - output/06_doc_quality_out output_folder - output/07_encoder_out
00:50:31 INFO - data factory data_ max_files -1, n_sample -1
00:50:31 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
00:50:31 INFO - pipeline id pipeline_id
00:50:31 INFO - code location None
00:50:31 INFO - number of workers 2 worker options {'num_cpus': 1, 'max_restarts': -1}
00:50:31 INFO - actor creation delay 0
00:50:31 INFO - job details {'job category': 'preprocessing', 'job name': 'text_encoder', 'job type': 'ray', 'job id': 'job_id'}
2024-08-29 00:50:33,112	INFO worker.py:1744 -- Started a local Ray instance. Vie

✅ Stage:7 completed successfully
CPU times: user 693 ms, sys: 282 ms, total: 975 ms
Wall time: 1min 13s


### Inspect Generated output

In [28]:
inspect_parquet (output_folder, sample_size=5, display_columns=None)

|  | filename | num_pages | num_tables | num_doc_elements | document_id | ext | hash | size | date_acquired | pdf_convert_time | ... | docq_symbol_to_word_ratio | docq_sentence_count | docq_lorem_ipsum_ratio | docq_curly_bracket_ratio | docq_contain_bad_word | docq_bullet_point_ratio | docq_ellipsis_line_ratio | docq_alphabet_word_ratio | docq_contain_common_en_words | embeddings |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 0.0 | 3 | 0.0 | 0.0 | False | 0.0 | 0.0 | 1.0 | True | [-0.006206639, 0.010256912, 0.023658218, -0.02... |
| 1 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 0.0 | 1 | 0.0 | 0.0 | False | 0.0 | 0.0 | 0.909091 | True | [-0.0497427, 0.046492133, -0.02381167, 0.02798... |
| 2 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 0.0 | 1 | 0.0 | 0.0 | False | 0.0 | 0.0 | 0.875 | False | [-0.03265641, -0.040947884, 0.017305722, 0.022... |
| 3 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 0.0 | 1 | 0.0 | 0.0 | False | 0.0 | 0.0 | 0.857143 | False | [-0.024216235, -0.028661048, -0.054423526, 0.0... |
| 4 | Walmart-10K-Reports-Optimized_2023.pdf | 100 | 81 | 1163 | d626ab9b-0f53-446c-b55d-150fbbd93066 | pdf | ea5544f26fe0831ec9befbf7aaf68b1b256df6c3ae18b8... | 1159974 | 2024-08-29T00:43:21.059856 | 332.679391 | ... | 0.0 | 1 | 0.0 | 0.0 | False | 0.0 | 0.0 | 1.0 | True | [-0.022648647, -0.028276687, -0.020278638, -0.... |
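The `embeddings` column is what the vector database will index. At query time, the question is encoded with the same model (`MY_CONFIG.EMBEDDING_MODEL`) and compared against the stored vectors, commonly with cosine similarity. Below is a minimal brute-force sketch of that lookup in plain Python. It uses toy 3-dimensional vectors, whereas `all-MiniLM-L6-v2` produces 384-dimensional embeddings, and it scans every row, whereas a real vector database would use an index.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: list[float], embeddings: list[list[float]], k: int = 3) -> list[tuple[int, float]]:
    """Return (row_index, score) pairs for the k rows most similar to the query."""
    scores = [(i, cosine_similarity(query, e)) for i, e in enumerate(embeddings)]
    return sorted(scores, key=lambda s: s[1], reverse=True)[:k]


# Toy vectors standing in for the 'embeddings' column and an encoded question
embeddings = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.7, 0.7, 0.0]]
query = [1.0, 0.1, 0.0]
print(top_k(query, embeddings, k=2))
```

The returned row indices would then be used to look up the matching `contents` to pass to the LLM as context.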


## Step-10: Copy output to final output dir

In [29]:
import shutil

shutil.rmtree(MY_CONFIG.OUTPUT_FOLDER_FINAL, ignore_errors=True)
shutil.copytree(src=output_folder, dst=MY_CONFIG.OUTPUT_FOLDER_FINAL)

print (f"✅ Copied output from '{output_folder}' --> '{MY_CONFIG.OUTPUT_FOLDER_FINAL}'")

✅ Copied output from 'output/07_encoder_out' --> 'output/output_final'
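The cell above deletes the destination first because `shutil.copytree` fails if the destination folder already exists (unless `dirs_exist_ok=True` is passed, available since Python 3.8). Before loading the final folder into a vector database, a quick check that every Parquet file arrived can still be useful. A minimal sketch follows; `compare_parquet_files` is a hypothetical helper, and the demo uses throwaway temporary folders with a made-up file name rather than the real pipeline output. In the notebook it would be called as `compare_parquet_files(output_folder, MY_CONFIG.OUTPUT_FOLDER_FINAL)`.

```python
import shutil
import tempfile
from pathlib import Path


def compare_parquet_files(src: str, dst: str) -> tuple[list[str], list[str]]:
    """Hypothetical helper: compare the .parquet files under two folders.

    Returns (missing_in_dst, extra_in_dst) as sorted relative paths.
    """
    src_files = {str(p.relative_to(src)) for p in Path(src).rglob("*.parquet")}
    dst_files = {str(p.relative_to(dst)) for p in Path(dst).rglob("*.parquet")}
    return sorted(src_files - dst_files), sorted(dst_files - src_files)


# Self-contained demo with temporary folders and a made-up file name
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp, "07_encoder_out")
    src.mkdir()
    (src / "example.parquet").write_bytes(b"")  # empty placeholder, not real Parquet data
    dst = Path(tmp, "output_final")
    shutil.copytree(src, dst)
    missing, extra = compare_parquet_files(str(src), str(dst))

print(f"missing={missing} extra={extra}")
```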
