# Demonstrate Data-Prep-kit transforms as LangChain tools

### This notebook is based on Data Prep Kit Demo
link: https://github.com/IBM/data-prep-kit/blob/dev/examples/notebooks/intro/dpk_intro_1_ray.ipynb

![](https://raw.githubusercontent.com/IBM/data-prep-kit/dev/examples/notebooks/intro/images/data-prep-kit-3-workflow.png)

### Install dependencies (this can take some time)

In [1]:
# Install the notebook's Python dependencies listed in requirements.txt (-qq keeps pip output quiet).
%pip install -qq -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [2]:
# Install the packages listed in dpk-requirements.txt (presumably the Data Prep Kit transforms — confirm).
# NOTE(review): in the recorded run pip reported a `requests` version conflict with ibm-cos-sdk-core;
# pinning mutually compatible versions in the requirements files would avoid it.
%pip install -qq -r dpk-requirements.txt

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ibm-cos-sdk-core 2.13.6 requires requests<2.32.3,>=2.32.0, but you have requests 2.32.3 which is incompatible.[0m[31m
Note: you may need to restart the kernel to use updated packages.


## Define the input task

In [3]:
# Execution backend toggle:
#   True  -> every transform runs on a local Ray cluster
#   False -> the pure-Python transform implementations are used instead
run_with_local_ray = True

In [4]:
# Extra phrase describing where the transforms run; empty for the pure-Python runtime.
# It carries its own trailing space, so the template below must not add another one
# (previously the task read "... cluster  one after ..." with a doubled space).
ray_text = "on a local ray cluster " if run_with_local_ray else ""

# Natural-language task handed to the LLM agent; the transform order here is the pipeline order.
task = (
    "Execute pdf2parquet, doc_chunk, doc_id, ededup, text_encoder transforms "
    f"{ray_text}one after the other where the input to a transform is the output "
    "of the previous transform run."
)

### Set input/output paths

In [5]:
import os
import shutil

# Every path below is rooted at the notebook's current working directory.
cwd = os.getcwd()

output_base_path = f"{cwd}/output"

# Source PDFs for the first transform (pdf2parquet) ...
input_folder = f"{cwd}/input/test-input/"
# ... and the destination of the last transform (text_encoder).
output_folder = f"{output_base_path}/final_1/"

# Wipe results of any previous run so stale files cannot be mistaken for fresh output.
shutil.rmtree(output_base_path, ignore_errors=True)
print(f"✅ Cleared {output_base_path} directory")

✅ Cleared /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output directory


### Set transforms parameters

In [6]:
import json

def prepare_params(params: dict):
    """Serialize ``params`` to JSON and strip the enclosing curly braces.

    The result is a bare ``"key": value, ...`` list that can be embedded in the
    natural-language task sent to the LLM. An empty dict yields ``""``.
    """
    serialized = json.dumps(params)
    # A dict always serializes as "{...}", so dropping the first and last
    # characters removes exactly the outer braces (nested braces are kept).
    return serialized[1:len(serialized) - 1]

In [7]:
from data_processing.utils import GB, ParamsUtils

# Per-transform parameters, keyed by each transform's option names. Key order is kept
# because it determines the order in which params appear in the LLM task text.
pdf2parquet_params_dict = dict(
    data_files_to_use="['.pdf']",  # string form of a list, not a Python list
    input_folder=input_folder,
    pdf2parquet_contents_type="application/json",
)
doc_chunk_params_dict = dict()  # doc_chunk runs with its defaults
doc_id_params_dict = dict(
    doc_id_hash_column="chunk_hash",
    doc_id_int_column="chunk_id",
)
ededup_params_dict = dict(
    ededup_doc_column="contents",
    ededup_doc_id_column="chunk_hash",
)
text_encoder_params_dict = dict(
    text_encoder_model_name="sentence-transformers/all-MiniLM-L6-v2",
    output_folder=output_folder,
)

In [8]:
if run_with_local_ray:
    # Ray worker sizing is only supplied to ededup; the other transforms use their defaults.
    worker_options_str = ParamsUtils.convert_to_ast({"num_cpus": 0.8, "memory": 2 * GB})
    ededup_params_dict = ededup_params_dict | {
        "ededup_hash_cpu": 0.5,
        "ededup_num_hashes": 2,
        "runtime_worker_options": worker_options_str,
        "runtime_num_workers": 2,
    }

pdf2parquet_params = prepare_params(pdf2parquet_params_dict)
doc_chunk_params = prepare_params(doc_chunk_params_dict)
doc_id_params = prepare_params(doc_id_params_dict)
ededup_params = prepare_params(ededup_params_dict)
text_encoder_params = prepare_params(text_encoder_params_dict)

# One sentence per transform. doc_chunk used to be computed but never added to the
# prompt, so anything put in doc_chunk_params_dict was silently ignored; it is now
# included whenever it is non-empty. With empty doc_chunk params, the text is unchanged.
param_sentences = [f"for pdf2parquet params use {pdf2parquet_params}"]
if doc_chunk_params:
    param_sentences.append(f"for doc_chunk use params {doc_chunk_params}")
param_sentences += [
    f"for doc_id use params {doc_id_params}",
    f"for ededup use params {ededup_params}",
    f"for text_encoder use params {text_encoder_params}",
]
params = ". ".join(param_sentences)

# NOTE: `input` shadows Python's built-in input(); the name is kept because later
# cells pass it to the agent under this name.
input = f"{task} {params}"

## Print input task

In [9]:
import html

from IPython.display import HTML


def _param_line(label: str, value: str) -> str:
    """Render one green, wrapping HTML paragraph showing a transform's parameters.

    ``value`` is HTML-escaped so characters such as ``<`` or ``&`` in paths/params
    cannot break the markup.
    """
    return (
        "<p><span style='white-space: pre-wrap; color:green; font-size:10pt;'>"
        f"{label} Params: {html.escape(value)}</span></p>"
    )


print_task = (
    "<p><span style='color:blue; font-weight:bold; font-size:14.0pt;'>"
    f"TASK: {html.escape(task)}</span></p>"
)
print_pdf2parquet = _param_line("PDF2PARQUET", pdf2parquet_params)
print_doc_chunks = _param_line("DOC CHUNKS", doc_chunk_params)
print_doc_id_params = _param_line("DOC_ID", doc_id_params)
print_ededup_params = _param_line("EDEDUP", ededup_params)
print_text_encoder_params = _param_line("TEXT_ENCODER", text_encoder_params)

# Well-formed markup only: the previous version appended an unmatched "</span>".
HTML(
    print_task
    + print_pdf2parquet
    + print_doc_chunks
    + print_doc_id_params
    + print_ededup_params
    + print_text_encoder_params
)

## Define model and tools

In [10]:
# Automatically reload edited modules (e.g. the local llm_utils package) before each cell runs.
%load_ext autoreload
%autoreload 2

from dotenv import dotenv_values
# Settings are read from a local .env file instead of being hardcoded in the notebook;
# `config` is later passed to getChatLLM (presumably holding provider credentials — confirm).
config = dotenv_values(".env")

In [11]:
import os
from llm_utils.logging import prep_loggers

# Log file locations for LLM and tool-calling traces (presumably consumed by prep_loggers — confirm).
os.environ.update(
    LLM_LOG_PATH="./logs/llm_log.txt",
    TOOL_CALLING_LOG_PATH="./logs/tool_log.txt",
)
prep_loggers("llm=INFO,tool_calling=INFO")

0

### Define the model

In [12]:
from llm_utils.models import getChatLLM

# Candidate model ids; the plan in this notebook is generated with model_id2.
model_id1 = "ibm/granite-3-8b-instruct"
model_id2 = "meta-llama/llama-3-3-70b-instruct"
model_id3 = "mistralai/mixtral-8x7b-instruct-v01"

llm = getChatLLM("watsonx", model_id2, config)

# Alternative backend: a locally served Ollama model.
# model_ollama = "llama3.1:70b"
# llm = getChatLLM("ollama", model_ollama)

### List DPK transforms

In [13]:
from llm_utils.dpk.langchain_tools.agent_toolkit.toolkit import DataPrepKitToolkit

# Build the toolkit and collect the LangChain tools it exposes (one per DPK transform, see output).
toolkit = DataPrepKitToolkit()
tools = toolkit.get_tools()

print("-- DPK tools: --")
tools  # displayed as the cell's output

-- DPK tools: --


[FdedupTransform(),
 EdedupTransform(),
 FilterTransform(),
 ResizeTransform(),
 TokenizationTransform(),
 DocIDTransform(),
 Pdf2parquetTransform(),
 CodeQualityTransform(),
 ProgLangSelectTransform(),
 DocChunkTransform(),
 DocQualityTransform(),
 Code2ParquetTransform(),
 LangIdentificationTransform(),
 TextEncoderTransform(),
 PIIRedactorTransform()]

In [14]:
from langchain.tools import Tool
from typing import Union, List

def find_tool_by_name(tools: List[Tool], tool_name: str) -> Tool:
    """Return the first tool in ``tools`` whose ``name`` equals ``tool_name``.

    Raises:
        ValueError: if no tool in ``tools`` has that name.
    """
    found = next((candidate for candidate in tools if candidate.name == tool_name), None)
    if found is None:
        raise ValueError(f"Tool with name {tool_name} not found")
    return found

In [15]:
from langchain.tools.render import render_text_description

# Plain-text "name - description" listing of every tool, injected into the prompt as {tools}.
tools_str = render_text_description(tools)
print("-- Print tools descriptions --")
print(tools_str)

# Comma-separated tool names for the prompt's "one of [{tool_names}]" slot. This must be a
# plain string: the previous trailing comma made it a 1-tuple, so the prompt rendered
# "[('fdedup, ededup, ...',)]".
tool_names = ", ".join(t.name for t in tools)
print(tool_names)

-- Print tools descriptions --
fdedup - Apply Fdedup transform on files in input folder
ededup - Apply Ededup transform on files in input folder
filter - Apply filter transform on files in input folder
resize - Apply resize transform on files in input folder
tokenization - Apply Tokenization transform on files in input folder
doc_id - Apply doc_id transform on files in input folder
pdf2parquet - Apply pdf2parquet transform on files in input folder
code_quality - Apply code_quality transform on files in input folder
proglang_select - Apply proglang_select transform on files in input folder
doc_chunk - Apply DocChunk transform on files in input folder
doc_quality - Apply DocQuality transform on files in input folder
code2parquet - Apply code2parquet transform on files in input folder
lang_id - Apply LangIdentification transform on files in input folder
text_encoder - Apply text_encoder transform on files in input folder
pii_redactor - Apply PIIRedactor transform on files in input folder


## Define the Prompt

In [16]:
from langchain.prompts import PromptTemplate
from langchain.tools.render import render_text_description
from langchain_core.prompts import ChatPromptTemplate


# Few-shot ReAct-style prompt. {tools}, {tool_names} and {input} are filled in when the
# agent is invoked. The examples pin down the "Action:" / "Action Input:" layout that is
# later parsed with a regex, and show each transform's output_folder becoming the next
# transform's input_folder. (Fixed: the first example's observation referred to
# "/home/eres/..." instead of the "/home/user/..." path used in its own Action Input.)
prompt_template = ChatPromptTemplate.from_template( """Answer the following questions as best you can. You have access to the following tools:

    {tools}
    
    Use the following format:
    
    Question: the input question you must answer
    Thought: you should always think about what to do
    Action: the action to take, should be one of [{tool_names}]
    Action Input: the input to the action
    Observation: the result of the action
    ... (this Thought/Action/Action Input/Observation can repeat N times)
    Thought: I now know the final answer
    Final Answer: the final answer to the original input question

    Final Answer or Action should appear in the answer but not both.
    Follow the exact Action Input format provided in the examples when crafting your response.

    Here's an example.

    For example, if the required task was to execute ededup, doc_id transforms one after the other. 
    The output directory of a transform is the input for the next transform in the transform order. 
    for ededup params use: 'input_folder':'/home/user/input/ededup'
    for doc_id params use : 'output_folder':'/home/user/output/final'. 
    The output should be the following:
      
    Thought: I need to execute the ededup and doc_id one after the other.
    
    Action: ededup
    Action Input: "input_folder":"/home/user/input/ededup", "output_folder":"/home/user/output/ededup"
    Observation: The output of the ededup transform is stored in "/home/user/output/ededup".

    Action: doc_id
    Action Input: "input_folder":"/home/user/output/ededup", "output_folder":"/home/user/output/final"
    Observation: The output of the doc_id transform is stored in "/home/user/output/final".

    Here's another example: 

    If the required task was to execute ededup, doc_id transforms on a local ray cluster one after the other. 
    The output directory of a transform is the input for the next transform in the transform order. 
    for ededup params use: 'input_folder':'/home/user/input/ededup'
    for doc_id params use : 'output_folder':'/home/user/output/final'
    The output should be the following:
      
    Thought: I need to execute the ededup and doc_id one after the other.
    
    Action: ededup
    Action Input: "runtime_type": "ray", "run_locally": "True", "input_folder":"/home/user/input/ededup", "output_folder":"/home/user/output/ededup"
    Observation: The output of the ededup transform is stored in "/home/user/output/ededup".

    Action: doc_id
    Action Input: "runtime_type": "ray", "run_locally": "True", "input_folder":"/home/user/output/ededup", "output_folder":"/home/user/output/final"
    Observation: The output of the doc_id transform is stored in "/home/user/output/final".

    
    Begin!
    
    Question: {input}
    """)



## Invoke the agent to create the plan

In [17]:
from langchain.prompts import PromptTemplate
print(input)

# Chain the prompt template into the chat model; the reply text is the transform plan.
agent = prompt_template | llm

# (The previous `agent_step = ""` initializer was a dead store: it was always overwritten.)
agent_step = agent.invoke(
    {
        "input": input,
        "tool_names": tool_names,
        "tools": tools_str,
    }
)

print(agent_step.content)

Execute pdf2parquet, doc_chunk, doc_id, ededup, text_encoder transforms on a local ray cluster  one after the other where the input to a transform is the output of the previous transform run. for pdf2parquet params use "data_files_to_use": "['.pdf']", "input_folder": "/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/input/test-input/", "pdf2parquet_contents_type": "application/json". for doc_id use params "doc_id_hash_column": "chunk_hash", "doc_id_int_column": "chunk_id". for ededup use params "ededup_doc_column": "contents", "ededup_doc_id_column": "chunk_hash", "ededup_hash_cpu": 0.5, "ededup_num_hashes": 2, "runtime_worker_options": "{'num_cpus': 0.8, 'memory': 2147483648}", "runtime_num_workers": 2. for text_encoder use params "text_encoder_model_name": "sentence-transformers/all-MiniLM-L6-v2", "output_folder": "/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/final_1/"
1. Thought: I

## Parse the plan

In [18]:
import re

# Capture "Action: <transform name>" followed (possibly after blank lines) by
# "Action Input: <params>". The optional digits also accept numbered variants such as
# "Action 1:" / "Action 1 Input 1:".
regex = r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
matches = re.findall(regex, agent_step.content)

print("LLM result contain the following transforms:\n")
for transform_name, transform_params in matches:
    print(f"TRANSFORM NAME {transform_name}")
    print(f"TRANSFORM PARAMS {transform_params}")
    print("--------------------------------------")

LLM result contain the following transforms:

TRANSFORM NAME pdf2parquet
TRANSFORM PARAMS "runtime_type": "ray", "run_locally": "True", "data_files_to_use": "['.pdf']", "input_folder": "/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/input/test-input/", "pdf2parquet_contents_type": "application/json", "output_folder": "/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/pdf2parquet"
--------------------------------------
TRANSFORM NAME doc_chunk
TRANSFORM PARAMS "runtime_type": "ray", "run_locally": "True", "input_folder": "/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/pdf2parquet", "output_folder": "/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_chunk"
--------------------------------------
TRANSFORM NAME doc_id
TRANSFORM PARAMS "runtime_type": "ray", "run_locally": "True", "i

In [19]:
import json
from typing import Any

def load_from_json(js: str) -> "dict[str, Any] | None":
    """Parse JSON text, reporting (not raising) failures.

    Args:
        js: JSON text. In this notebook it is the LLM-produced parameter list
            wrapped in curly braces.

    Returns:
        The decoded value, or ``None`` if ``js`` is not valid JSON (or not a
        str/bytes). Callers must check for ``None`` before using the result.
    """
    try:
        return json.loads(js)
    except (ValueError, TypeError) as e:
        # ValueError covers json.JSONDecodeError; TypeError covers non-string input.
        # Best-effort by design: LLM output is not guaranteed to be valid JSON, so the
        # failure is reported and the caller decides what to do with the None.
        print(f"Failed to load parameters {js} with error {e}")
        return None

## Execute the transforms by calling their tool definitions

In [20]:
def run_tool(match) -> str:
    """Run one DPK transform tool parsed from the LLM plan and check that it produced output.

    Args:
        match: a ``(tool_name, params)`` pair as produced by ``re.findall`` on the plan,
            where ``params`` is a comma-separated ``"key": value`` list (a JSON object
            body without the surrounding braces).

    Returns:
        The value returned by the tool's ``run`` method.

    Raises:
        ValueError: if the tool name is unknown, the params are not valid JSON, or
            they lack an ``output_folder`` entry (checked before the tool runs).
        RuntimeError: if, after the run, the output folder is missing or contains no
            ``.parquet`` files, which is taken as a sign that the job failed.
    """

    def contains_parquet_files(dir_path: str) -> bool:
        # Only direct children are inspected. A folder that was never created counts as
        # "no output" instead of letting os.listdir raise FileNotFoundError.
        if not os.path.isdir(dir_path):
            return False
        return any(
            entry.endswith(".parquet") and os.path.isfile(os.path.join(dir_path, entry))
            for entry in os.listdir(dir_path)
        )

    tool_name = match[0].strip()
    tool_to_use = find_tool_by_name(tools, tool_name)

    # The plan lists params without braces; wrap them so they parse as a JSON object.
    tool_input = "{" + match[1] + "}"
    tool_input_dict = load_from_json(tool_input)
    if not isinstance(tool_input_dict, dict):
        raise ValueError(f"Could not parse parameters for {tool_name}: {tool_input}")
    out_dir = tool_input_dict.get("output_folder")
    if not out_dir:
        raise ValueError(f"No output_folder given for {tool_name}; cannot verify its result.")

    print("=======================================================")
    print(f"🏃🏼 RUNNING {tool_name} with params: {tool_input_dict}")
    print("=======================================================")
    tool_result = tool_to_use.run(tool_input_dict)
    if not contains_parquet_files(out_dir):
        raise RuntimeError(f"The {out_dir} directory is unexpectedly empty, indicating the job failed.")
    print(f"✅ {tool_result}")

    return tool_result

In [21]:
import time

# Pause between consecutive transforms.
# NOTE(review): presumably this gives the local Ray instance started by the previous
# transform time to shut down — confirm the value is still needed.
PAUSE_BETWEEN_TRANSFORMS_SEC = 10

error = False
if not matches:
    # Nothing could be parsed from the LLM plan; do not report an empty run as a success.
    error = True
    print("❌ Error: no transforms were found in the LLM plan")

# Run the transforms in plan order and stop at the first failure.
for position, match in enumerate(matches, start=1):
    try:
        tool_result = run_tool(match)
    except Exception as e:
        error = True
        print("❌ Error: " + str(e))
        break
    # No need to wait after the last transform.
    if position < len(matches):
        time.sleep(PAUSE_BETWEEN_TRANSFORMS_SEC)

if not error:
    print("=================================================")
    print("✅ Transforms execution completed successfully")
    print("=================================================")

🏃🏼 RUNNING pdf2parquet with params: {'runtime_type': 'ray', 'run_locally': 'True', 'data_files_to_use': "['.pdf']", 'input_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/input/test-input/', 'pdf2parquet_contents_type': 'application/json', 'output_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/pdf2parquet'}


11:54:44 INFO - pdf2parquet parameters are : {'batch_size': -1, 'artifacts_path': None, 'contents_type': <pdf2parquet_contents_types.JSON: 'application/json'>, 'do_table_structure': True, 'do_ocr': True, 'ocr_engine': <pdf2parquet_ocr_engine.EASYOCR: 'easyocr'>, 'bitmap_area_threshold': 0.05, 'pdf_backend': <pdf2parquet_pdf_backend.DLPARSE_V2: 'dlparse_v2'>, 'double_precision': 8}
INFO:dpk_pdf2parquet.transform:pdf2parquet parameters are : {'batch_size': -1, 'artifacts_path': None, 'contents_type': <pdf2parquet_contents_types.JSON: 'application/json'>, 'do_table_structure': True, 'do_ocr': True, 'ocr_engine': <pdf2parquet_ocr_engine.EASYOCR: 'easyocr'>, 'bitmap_area_threshold': 0.05, 'pdf_backend': <pdf2parquet_pdf_backend.DLPARSE_V2: 'dlparse_v2'>, 'double_precision': 8}
11:54:44 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
11:54:44 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code locatio

✅ pdf2parquet transform successfully applied with input_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/input/test-input/ output_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/pdf2parquet.
🏃🏼 RUNNING doc_chunk with params: {'runtime_type': 'ray', 'run_locally': 'True', 'input_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/pdf2parquet', 'output_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_chunk'}


11:56:07 INFO - doc_chunk parameters are : {'chunking_type': <chunking_types.DL_JSON: 'dl_json'>, 'content_column_name': 'contents', 'doc_id_column_name': 'document_id', 'output_chunk_column_name': 'contents', 'output_source_doc_id_column_name': 'source_document_id', 'output_jsonpath_column_name': 'doc_jsonpath', 'output_pageno_column_name': 'page_number', 'output_bbox_column_name': 'bbox', 'chunk_size_tokens': 128, 'chunk_overlap_tokens': 30, 'dl_min_chunk_len': None}
INFO:dpk_doc_chunk.transformcfg:doc_chunk parameters are : {'chunking_type': <chunking_types.DL_JSON: 'dl_json'>, 'content_column_name': 'contents', 'doc_id_column_name': 'document_id', 'output_chunk_column_name': 'contents', 'output_source_doc_id_column_name': 'source_document_id', 'output_jsonpath_column_name': 'doc_jsonpath', 'output_pageno_column_name': 'page_number', 'output_bbox_column_name': 'bbox', 'chunk_size_tokens': 128, 'chunk_overlap_tokens': 30, 'dl_min_chunk_len': None}
11:56:07 INFO - pipeline id pipeline

launching transform with params: {'run_locally': 'True', 'data_local_config': "{'input_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/pdf2parquet', 'output_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_chunk'}"}


2025-01-16 11:56:08,921	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
[36m(orchestrate pid=74013)[0m 11:56:11 INFO - orchestrator started at 2025-01-16 11:56:11
[36m(orchestrate pid=74013)[0m 11:56:11 INFO - Number of files is 1, source profile {'max_file_size': 0.00682830810546875, 'min_file_size': 0.00682830810546875, 'total_file_size': 0.00682830810546875}
[36m(orchestrate pid=74013)[0m 11:56:11 INFO - Cluster resources: {'cpus': 10, 'gpus': 0, 'memory': 36.21767272986472, 'object_store': 2.0}
[36m(orchestrate pid=74013)[0m 11:56:11 INFO - Number of workers - 1 with {'num_cpus': 0.8, 'max_restarts': -1} each
[36m(orchestrate pid=74013)[0m 11:56:13 INFO - Completed 0 files (0.0%)  in 0.0 min. Waiting for completion
[36m(orchestrate pid=74013)[0m 11:56:14 INFO - Completed processing 1 files in 0.021 min
[36m(orchestrate pid=74013)[0m 11:56:14 INFO - done flushing in 0.001 sec
11:56:24 INFO - Completed

✅ doc_chunk transform successfully applied with input_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/pdf2parquet output_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_chunk.


11:56:36 INFO - Doc id parameters are : {'doc_column': 'contents', 'hash_column': 'chunk_hash', 'int_column': 'chunk_id', 'start_id': 0}
INFO:dpk_doc_id.transform:Doc id parameters are : {'doc_column': 'contents', 'hash_column': 'chunk_hash', 'int_column': 'chunk_id', 'start_id': 0}
11:56:36 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
11:56:36 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code location None
11:56:36 INFO - number of workers 1 worker options {'num_cpus': 0.8, 'max_restarts': -1}
INFO:data_processing_ray.runtime.ray.execution_configuration:number of workers 1 worker options {'num_cpus': 0.8, 'max_restarts': -1}
11:56:36 INFO - actor creation delay 0
INFO:data_processing_ray.runtime.ray.execution_configuration:actor creation delay 0
11:56:36 INFO - job details {'job category': 'preprocessing', 'job name': 'doc_id', 'job type': 'ray', 'job id': 'job_id'}
INFO:data_processing_ra

🏃🏼 RUNNING doc_id with params: {'runtime_type': 'ray', 'run_locally': 'True', 'input_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_chunk', 'doc_id_hash_column': 'chunk_hash', 'doc_id_int_column': 'chunk_id', 'output_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_id'}
launching transform with params: {'run_locally': 'True', 'data_local_config': "{'input_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_chunk', 'output_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_id'}", 'doc_id_hash_column': 'chunk_hash', 'doc_id_int_column': 'chunk_id'}


2025-01-16 11:56:37,163	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
[36m(orchestrate pid=74137)[0m 11:56:38 INFO - orchestrator started at 2025-01-16 11:56:38
[36m(orchestrate pid=74137)[0m 11:56:38 INFO - Number of files is 1, source profile {'max_file_size': 0.008722305297851562, 'min_file_size': 0.008722305297851562, 'total_file_size': 0.008722305297851562}
[36m(orchestrate pid=74137)[0m 11:56:38 INFO - Cluster resources: {'cpus': 10, 'gpus': 0, 'memory': 35.934898376464844, 'object_store': 2.0}
[36m(orchestrate pid=74137)[0m 11:56:38 INFO - Number of workers - 1 with {'num_cpus': 0.8, 'max_restarts': -1} each
[36m(orchestrate pid=74137)[0m 11:56:39 INFO - Completed 0 files (0.0%)  in 0.0 min. Waiting for completion
[36m(orchestrate pid=74137)[0m 11:56:39 INFO - Completed processing 1 files in 0.003 min
[36m(orchestrate pid=74137)[0m 11:56:39 INFO - done flushing in 0.001 sec
11:56:49 INFO - Compl

✅ doc_id transform successfully applied with input_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_chunk output_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_id.


11:57:00 INFO - exact dedup params are {'doc_column': 'contents', 'doc_id_column': 'chunk_hash', 'use_snapshot': False, 'snapshot_directory': None, 'hash_cpu': 0.5, 'num_hashes': 2}
INFO:dpk_ededup.transform_base:exact dedup params are {'doc_column': 'contents', 'doc_id_column': 'chunk_hash', 'use_snapshot': False, 'snapshot_directory': None, 'hash_cpu': 0.5, 'num_hashes': 2}
11:57:00 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
11:57:00 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code location None
11:57:00 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'memory': 2147483648, 'max_restarts': -1}
INFO:data_processing_ray.runtime.ray.execution_configuration:number of workers 2 worker options {'num_cpus': 0.8, 'memory': 2147483648, 'max_restarts': -1}
11:57:00 INFO - actor creation delay 0
INFO:data_processing_ray.runtime.ray.execution_configuration:actor creation delay 0
11:57:0

🏃🏼 RUNNING ededup with params: {'runtime_type': 'ray', 'run_locally': 'True', 'input_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_id', 'ededup_doc_column': 'contents', 'ededup_doc_id_column': 'chunk_hash', 'ededup_hash_cpu': 0.5, 'ededup_num_hashes': 2, 'runtime_worker_options': "{'num_cpus': 0.8, 'memory': 2147483648}", 'runtime_num_workers': 2, 'output_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/ededup'}
dict_keys(['runtime_type', 'run_locally', 'runtime_num_workers', 'runtime_worker_options', 'ededup_doc_column', 'ededup_doc_id_column', 'ededup_num_hashes', 'ededup_hash_cpu'])
launching transform with params: {'run_locally': 'True', 'runtime_num_workers': 2, 'runtime_worker_options': "{'num_cpus': 0.8, 'memory': 2147483648}", 'data_local_config': "{'input_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agen

2025-01-16 11:57:01,857	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
[36m(orchestrate pid=74254)[0m 11:57:02 INFO - orchestrator started at 2025-01-16 11:57:02
[36m(orchestrate pid=74254)[0m 11:57:02 INFO - Number of files is 1, source profile {'max_file_size': 0.010000228881835938, 'min_file_size': 0.010000228881835938, 'total_file_size': 0.010000228881835938}
[36m(orchestrate pid=74254)[0m 11:57:02 INFO - Cluster resources: {'cpus': 10, 'gpus': 0, 'memory': 35.87061462458223, 'object_store': 2.0}
[36m(orchestrate pid=74254)[0m 11:57:02 INFO - Number of workers - 2 with {'num_cpus': 0.8, 'memory': 2147483648, 'max_restarts': -1} each
[36m(orchestrate pid=74254)[0m 11:57:03 INFO - Completed 0 files (0.0%)  in 0.0 min. Waiting for completion
[36m(orchestrate pid=74254)[0m 11:57:04 INFO - Completed processing 1 files in 0.003 min
[36m(orchestrate pid=74254)[0m 11:57:04 INFO - done flushing in 0.001 sec


✅ Ededup transform successfully applied with input_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/doc_id output_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/ededup.
🏃🏼 RUNNING text_encoder with params: {'runtime_type': 'ray', 'run_locally': 'True', 'input_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/ededup', 'text_encoder_model_name': 'sentence-transformers/all-MiniLM-L6-v2', 'output_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/final_1/'}


11:57:26 INFO - text_encoder parameters are : {'content_column_name': 'contents', 'output_embeddings_column_name': 'embeddings', 'model_name': 'sentence-transformers/all-MiniLM-L6-v2'}
INFO:dpk_text_encoder.transformcfg:text_encoder parameters are : {'content_column_name': 'contents', 'output_embeddings_column_name': 'embeddings', 'model_name': 'sentence-transformers/all-MiniLM-L6-v2'}
11:57:26 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
11:57:26 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code location None
11:57:26 INFO - number of workers 1 worker options {'num_cpus': 0.8, 'max_restarts': -1}
INFO:data_processing_ray.runtime.ray.execution_configuration:number of workers 1 worker options {'num_cpus': 0.8, 'max_restarts': -1}
11:57:26 INFO - actor creation delay 0
INFO:data_processing_ray.runtime.ray.execution_configuration:actor creation delay 0
11:57:26 INFO - job details {'job categor

launching transform with params: {'run_locally': 'True', 'data_local_config': "{'input_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/ededup', 'output_folder': '/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/final_1/'}", 'text_encoder_model_name': 'sentence-transformers/all-MiniLM-L6-v2'}


2025-01-16 11:57:27,722	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
[36m(orchestrate pid=74345)[0m 11:57:31 INFO - orchestrator started at 2025-01-16 11:57:31
[36m(orchestrate pid=74345)[0m 11:57:31 INFO - Number of files is 1, source profile {'max_file_size': 0.010349273681640625, 'min_file_size': 0.010349273681640625, 'total_file_size': 0.010349273681640625}
[36m(orchestrate pid=74345)[0m 11:57:31 INFO - Cluster resources: {'cpus': 10, 'gpus': 0, 'memory': 35.83247833326459, 'object_store': 2.0}
[36m(orchestrate pid=74345)[0m 11:57:31 INFO - Number of workers - 1 with {'num_cpus': 0.8, 'max_restarts': -1} each
[36m(orchestrate pid=74345)[0m 11:57:36 INFO - Completed 0 files (0.0%)  in 0.0 min. Waiting for completion
[36m(orchestrate pid=74345)[0m 11:57:37 INFO - Completed processing 1 files in 0.01 min
[36m(orchestrate pid=74345)[0m 11:57:37 INFO - done flushing in 0.001 sec
11:57:47 INFO - Complet

✅ text_encoder transform successfully applied with input_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/ededup output_folder /Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/final_1/.
✅ Transforms execution completed successfully


## Inspect Generated Output File

You will see a column called `embeddings` added at the end. It holds the text content converted into vectors (embeddings).
We used the model `sentence-transformers/all-MiniLM-L6-v2`.

In [22]:
# Presumably silences the Hugging Face tokenizers fork/parallelism warning — confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Download the helper module that provides read_parquet_files_as_df (imported in the next cell).
# NOTE(review): this fetches, and later imports, unpinned code from the `dev` branch; consider
# pinning a commit SHA in the URL or vendoring the file.
!wget -O 'my_utils.py'  'https://raw.githubusercontent.com/IBM/data-prep-kit/dev/examples/notebooks/intro/my_utils.py'

--2025-01-16 11:57:59--  https://raw.githubusercontent.com/IBM/data-prep-kit/dev/examples/notebooks/intro/my_utils.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8003::154, 2606:50c0:8002::154, 2606:50c0:8001::154, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8003::154|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1856 (1.8K) [text/plain]
Saving to: ‘my_utils.py’


2025-01-16 11:57:59 (2.00 MB/s) - ‘my_utils.py’ saved [1856/1856]



In [23]:
from my_utils import read_parquet_files_as_df

# Show the output of the last transform in the plan; its output_folder holds the pipeline result.
# (Previously this read the loop variable `match` left over from the execution cell, which
# points at the failing transform, not the last one, if that loop stopped early.)
last_transform = matches[-1]
tool_input = "{" + last_transform[1] + "}"
tool_input_dict = load_from_json(tool_input)
output_dir = tool_input_dict["output_folder"]  # avoids shadowing the built-in dir()
print(output_dir)
output_df = read_parquet_files_as_df(output_dir)

print ("Output data dimensions (rows x columns)= ", output_df.shape)

output_df.head(2)

/Users/alexey/goWork/src/github.com/data-preprocessing/data-prep-lab/examples/agentic-workflow/output/final_1/
Output data dimensions (rows x columns)=  (6, 21)


Unnamed: 0,filename,num_pages,num_tables,num_doc_elements,document_hash,ext,hash,size,date_acquired,pdf_convert_time,...,source_document_id,contents,doc_jsonpath,page_number,bbox,document_id,chunk_hash,chunk_id,removed,embeddings
0,earth.pdf,1,0,11,17915699055171962696,pdf,fc2c603509e668f99cfeede83e0137fe7f711515765dc7...,4994,2025-01-16T11:55:38.984640,0.960712,...,fd2f14b5-8513-42ab-ab4b-e2f348af250f,Our solar system is a vast and fascinating exp...,#/texts/2,1,"[132.66474915, 588.23297119, 479.40899658, 626...",4d2e06dbe1d7a07f3722947cb6e7f9e2487d4a536c73dc...,4d2e06dbe1d7a07f3722947cb6e7f9e2487d4a536c73dc...,0,[],"[0.002112108, -0.010703988, 0.011550161, 0.020..."
1,earth.pdf,1,0,11,17915699055171962696,pdf,fc2c603509e668f99cfeede83e0137fe7f711515765dc7...,4994,2025-01-16T11:55:38.984640,0.960712,...,fd2f14b5-8513-42ab-ab4b-e2f348af250f,For more details about our Solar system see Ch...,#/texts/3,1,"[133.18649292, 570.30102539, 375.57901001, 584...",6b1799aadb8131a788033660ec73788c9f4165eda5e339...,6b1799aadb8131a788033660ec73788c9f4165eda5e339...,1,[],"[-0.060079582, 0.014746683, 0.025538277, 0.048..."
