# Demonstrate Data-Prep-kit transforms as LangChain or llama-index tools

### This notebook is based on the Data Prep Kit demo
Link: https://github.com/IBM/data-prep-kit/blob/v0.2.3/examples/notebooks/intro/dpk_intro_1_ray.ipynb

![](https://raw.githubusercontent.com/IBM/data-prep-kit/v0.2.3/examples/notebooks/intro/images/data-prep-kit-3-workflow.png)

### Install dependencies (this can take some time)

In [None]:
%pip install -qq -r requirements.txt

In [None]:
%pip install -qq -r dpk-requirements.txt

In [None]:
! cd llm_utils/dpk/llama_index_tools/llama_index_tools_dpk && pip install -qq -e .

In [None]:
%pip install -qq llama-index

## Use langchain or llama-index

In [None]:
# Set to True to define DPK transforms as langchain tools; otherwise they will be defined as llama-index tools
define_dpk_as_langchain_tools=False

## Define the input task

In [None]:
# Set to True to execute the transforms on the local Ray cluster; otherwise, the Python implementation is used.
run_with_local_ray=False

In [None]:
# Optional phrase telling the LLM to run the transforms on a local Ray cluster
ray_text = ""
if run_with_local_ray:
    ray_text = " on a local ray cluster"

task = f"Execute pdf2parquet, doc_chunk, doc_id, ededup, text_encoder transforms{ray_text} one after the other, where the input to a transform is the output of the previous transform run."

### Set input/output paths

In [None]:
import shutil
import os
cwd = os.getcwd()

output_base_path = f"{cwd}/output"

input_folder = f"{cwd}/test-data/input/"
output_folder = f"{output_base_path}/final_1/"

# Start from a clean output directory
shutil.rmtree(output_base_path, ignore_errors=True)
print(f"✅ Cleared {output_base_path} directory")

### Set transform parameters

In [None]:
import json

def prepare_params(params: dict):
    params_json=json.dumps(params)
    # trim the surrounding curly braces
    return params_json[1:-1]
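
The trimmed string is embedded in the prompt, and `run_tool` later puts the braces back before parsing each `Action Input` as JSON. A minimal sketch of that round trip, assuming all parameter values are JSON-serializable; `restore_params` is a hypothetical helper name for the inline step the notebook performs in `run_tool`:

```python
import json


def prepare_params(params: dict) -> str:
    # Serialize to JSON, then drop the outer curly braces
    return json.dumps(params)[1:-1]


def restore_params(params_str: str) -> dict:
    # Hypothetical inverse step: wrap the trimmed string in braces again and parse it
    return json.loads("{" + params_str + "}")


doc_id_params_dict = {"doc_id_hash_column": "chunk_hash", "doc_id_int_column": "chunk_id"}
trimmed = prepare_params(doc_id_params_dict)
print(trimmed)  # "doc_id_hash_column": "chunk_hash", "doc_id_int_column": "chunk_id"
assert restore_params(trimmed) == doc_id_params_dict
```

An empty dictionary, such as `doc_chunk_params_dict`, becomes an empty string and restores to `{}`.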

In [None]:
from data_processing.utils import GB, ParamsUtils

pdf2parquet_params_dict={"data_files_to_use": "['.pdf']", "input_folder":input_folder,  "pdf2parquet_contents_type": "application/json"}
doc_chunk_params_dict={}
doc_id_params_dict={"doc_id_hash_column": "chunk_hash", "doc_id_int_column": "chunk_id"}
ededup_params_dict={"ededup_doc_column": "contents", "ededup_doc_id_column": "chunk_hash"}
text_encoder_params_dict={"text_encoder_model_name": "sentence-transformers/all-MiniLM-L6-v2", "output_folder":output_folder}

In [None]:
if run_with_local_ray:
    worker_options_str=ParamsUtils.convert_to_ast({"num_cpus" : 0.8, "memory": 2 * GB})
    ededup_params_dict=ededup_params_dict|{"ededup_hash_cpu": 0.5, 
                    "ededup_num_hashes": 2,
                    "runtime_worker_options": worker_options_str,
                    "runtime_num_workers": 2}
    
pdf2parquet_params=prepare_params(pdf2parquet_params_dict)
doc_chunk_params=prepare_params(doc_chunk_params_dict)
doc_id_params=prepare_params(doc_id_params_dict)
ededup_params=prepare_params(ededup_params_dict)
text_encoder_params=prepare_params(text_encoder_params_dict)

params=f"for pdf2parquet params use {pdf2parquet_params}. for doc_id use params {doc_id_params}. for ededup use params {ededup_params}. for text_encoder use params {text_encoder_params}"
input=f"{task} {params}"
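
The `params` sentence above is written out by hand, and `doc_chunk` is left out because its parameter dictionary is empty. If more transforms are added, a helper along these lines could assemble a similar sentence; `build_params_sentence` is a hypothetical name, and its wording ("for X use params ...") differs slightly from the notebook's hand-written text:

```python
def build_params_sentence(params_by_transform: dict) -> str:
    # One "for <transform> use params <...>" clause per transform; empty parameter
    # strings are skipped, the same way doc_chunk is left out in the notebook.
    clauses = [
        f"for {name} use params {params}"
        for name, params in params_by_transform.items()
        if params
    ]
    return ". ".join(clauses)


sentence = build_params_sentence({
    "doc_chunk": "",
    "doc_id": '"doc_id_hash_column": "chunk_hash"',
})
print(sentence)  # for doc_id use params "doc_id_hash_column": "chunk_hash"
```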

## Print input task

In [None]:
from IPython.display import HTML

print_task=f"<p><span style='color:blue; font-weight:bold; font-size:14.0pt;'>TASK: {task}</span></p>"
print_pdf2parquet=f"<p><span style='white-space: pre-wrap; color:green; font-size:10pt;'>PDF2PARQUET Params: {pdf2parquet_params}</span></p>"
print_doc_chunks=f"<p><span style='white-space: pre-wrap; color:green; font-size:10pt;'>DOC CHUNKS Params: {doc_chunk_params}</span></p>"
print_doc_id_params=f"<p><span style='white-space: pre-wrap; color:green; font-size:10pt;'>DOC_ID Params: {doc_id_params}</span></p>"
print_ededup_params=f"<p><span style='color:green; font-size:10pt;'>EDEDUP Params: {ededup_params}</span></p>"
print_text_encoder_params=f"<p><span style='color:green; font-size:10pt;'>TEXT_ENCODER Params: {text_encoder_params}</span></p>"

# Render the task and the parameters of each transform
HTML(f"{print_task}{print_pdf2parquet}{print_doc_chunks}{print_doc_id_params}{print_ededup_params}{print_text_encoder_params}")

## Define LLM models and tools

We have tested our project with the following LLM execution frameworks: [Watsonx](https://www.ibm.com/watsonx), [Replicate](https://replicate.com/), and a locally running [Ollama](https://ollama.com/).
To use one of the frameworks, uncomment its section in the cell below and comment out the others.
Note that the notebooks have been tested with the specific large language models (LLMs) named in that cell; because LLM output varies, using a different model may not produce the same results.

- To use Replicate:
  - Obtain a Replicate API token
  - Store the following value in the `.env` file located in your project directory:
    ```
        REPLICATE_API_TOKEN=<your Replicate API token>
    ```
- To use Ollama: 
  - Download [Ollama](https://ollama.com/download).
  - Download one of the supported [models](https://ollama.com/search). We tested with the `llama3.3` model.
  - Update the `model_ollama` name in the model cell if you use a different model.
- To use Watsonx:
  - Register for Watsonx
  - Obtain its API key
  - Store the following values in the `.env` file located in your project directory:
    ```
        WATSONX_URL=<WatsonX entry point, e.g. https://us-south.ml.cloud.ibm.com>
        WATSON_PROJECT_ID=<your Watsonx project ID>
        WATSONX_APIKEY=<your Watsonx API key>
    ```
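
Before creating the model, it can help to fail early when the `.env` file lacks the keys a provider needs. A minimal sketch that uses only the key names listed above; `missing_env_keys` and the provider-to-keys mapping are assumptions, not part of `llm_utils`:

```python
# Key names copied from the .env instructions above; the mapping itself is an assumption.
REQUIRED_KEYS = {
    "replicate": ["REPLICATE_API_TOKEN"],
    "watsonx": ["WATSONX_URL", "WATSON_PROJECT_ID", "WATSONX_APIKEY"],
    "ollama": [],  # runs locally, so no credentials are read from .env
}


def missing_env_keys(config: dict, provider: str) -> list:
    # Return the required keys that are absent or empty for the chosen provider
    return [key for key in REQUIRED_KEYS[provider] if not config.get(key)]


print(missing_env_keys({"REPLICATE_API_TOKEN": "dummy-token"}, "replicate"))  # []
print(missing_env_keys({}, "watsonx"))  # ['WATSONX_URL', 'WATSON_PROJECT_ID', 'WATSONX_APIKEY']
```

In the notebook, `missing_env_keys(dotenv_values(".env"), "replicate")` would list any absent keys right after the `.env` file is loaded.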

In [None]:
%load_ext autoreload
%autoreload 2

from dotenv import dotenv_values
config = dotenv_values(".env")

In [None]:
import os
from llm_utils.logging import prep_loggers

os.environ["LLM_LOG_PATH"] = "./logs/llm_log.txt"
os.environ["TOOL_CALLING_LOG_PATH"] = "./logs/tool_log.txt"
prep_loggers("llm=INFO,tool_calling=INFO")

In [None]:
### Define the model
from llm_utils.models import getChatLLM
from dotenv import dotenv_values

config = dotenv_values("./.env")

# watsonx part
# model_watsonx_id = "meta-llama/llama-3-1-70b-instruct"
# llm = getChatLLM("watsonx", model_watsonx_id, config)

# ollama part
# model_ollama = "llama3.3"
# llm = getChatLLM("ollama", model_ollama)

# replicate part
# You can use different LLM models
model_replicate = "meta/meta-llama-3-70b-instruct"
llm = getChatLLM("replicate", model_replicate, config)

### List DPK transforms

In [None]:
if define_dpk_as_langchain_tools:
    from llm_utils.dpk.langchain_tools.agent_toolkit.toolkit import DataPrepKitToolkit
    
    toolkit = DataPrepKitToolkit()  
    tools = toolkit.get_tools()
    print("-- DPK tools: --")
    print(tools)
else:
    from llama_index_dpk.tools.dpk.base import DPKTransformsToolSpec
    
    dpk_spec = DPKTransformsToolSpec()
    tools = dpk_spec.to_tool_list()
    print("-- DPK tools: --")
    for t in tools:
        print(t.metadata.name)

In [None]:
if define_dpk_as_langchain_tools:
    from langchain.tools import Tool
    from typing import Union, List
    
    def find_tool_by_name(tools: List[Tool], tool_name: str) -> Tool:
        for tool in tools:
            if tool.name == tool_name:
                return tool
        raise ValueError(f"Tool with name {tool_name} not found")
else:
    from llama_index.core.tools import FunctionTool
    from typing import Union, List
    
    def find_tool_by_name(tools: List[FunctionTool], tool_name: str) -> FunctionTool:
        for tool in tools:
            if tool.metadata.name == tool_name:
                return tool
        raise ValueError(f"Tool with name {tool_name} not found")
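
`find_tool_by_name` scans the list on every call. With only a handful of DPK tools this is fine; an alternative sketch builds a name-to-tool dictionary once. `build_tool_index` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins that mimic only the `metadata.name` attribute of llama-index tools:

```python
from types import SimpleNamespace


def build_tool_index(tools, get_name):
    # Map each tool name to its tool object once, instead of scanning the list per lookup.
    # get_name hides the difference between LangChain (tool.name) and
    # llama-index (tool.metadata.name) tools.
    index = {get_name(tool): tool for tool in tools}
    if len(index) != len(tools):
        raise ValueError("Duplicate tool names found")
    return index


# Stand-ins shaped like llama-index tools: only .metadata.name is provided
fake_tools = [SimpleNamespace(metadata=SimpleNamespace(name=n)) for n in ["ededup", "doc_id"]]
tool_index = build_tool_index(fake_tools, lambda t: t.metadata.name)
print(sorted(tool_index))  # ['doc_id', 'ededup']
```

For LangChain tools the accessor would be `lambda t: t.name`; an unknown name then raises `KeyError` rather than `ValueError`.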

In [None]:
if define_dpk_as_langchain_tools:
    from langchain.tools.render import render_text_description
    
    tools_str = render_text_description(tools)
    tool_names = ", ".join([t.name for t in tools])
else:
    tools_str = '\n'.join(dpk_spec.spec_functions)
    tool_names=", ".join(dpk_spec.spec_functions)

## Define the Prompt

In [None]:
from langchain.prompts import PromptTemplate
from langchain.tools.render import render_text_description
from langchain_core.prompts import ChatPromptTemplate


prompt_template = ChatPromptTemplate.from_template( """Answer the following questions as best you can. You have access to the following tools:

    {tools}
    
    Use the following format:
    
    Question: the input question you must answer
    Thought: you should always think about what to do
    Action: the action to take, should be one of [{tool_names}]
    Action Input: the input to the action
    Observation: the result of the action
    ... (this Thought/Action/Action Input/Observation can repeat N times)
    Thought: I now know the final answer
    Final Answer: the final answer to the original input question

    Final Answer or Action should appear in the answer but not both.
    Follow the exact Action Input format provided in the examples when crafting your response.
    Avoid numbering the output.

    Here's an example.

    If the required task was to execute ededup, doc_id transforms one after the other.
    The output directory of a transform is the input for the next transform in the transform order.
    for ededup params use: 'input_folder':'/home/user/input/ededup'
    for doc_id params use: 'output_folder':'/home/user/output/final'.
    The output should be the following:
      
    Thought: I need to execute the ededup and doc_id one after the other.
    
    Action: ededup
    Action Input: "input_folder":"/home/user/input/ededup", "output_folder":"/home/user/output/ededup"
    Observation: The output of the ededup transform is stored in "/home/user/output/ededup".

    Action: doc_id
    Action Input: "input_folder":"/home/user/output/ededup", "output_folder":"/home/user/output/final"
    Observation: The output of the doc_id transform is stored in "/home/user/output/final".

    Here's another example: 

    If the required task was to execute ededup, doc_id transforms on a local ray cluster one after the other.
    The output directory of a transform is the input for the next transform in the transform order.
    for ededup params use: 'input_folder':'/home/user/input/ededup'
    for doc_id params use: 'output_folder':'/home/user/output/final'
    The output should be the following:
      
    Thought: I need to execute the ededup and doc_id one after the other.
    
    Action: ededup
    Action Input: "runtime_type": "ray", "run_locally": "True", "input_folder":"/home/user/input/ededup", "output_folder":"/home/user/output/ededup"
    Observation: The output of the ededup transform is stored in "/home/user/output/ededup".

    Action: doc_id
    Action Input: "runtime_type": "ray", "run_locally": "True", "input_folder":"/home/user/output/ededup", "output_folder":"/home/user/output/final"
    Observation: The output of the doc_id transform is stored in "/home/user/output/final".

    
    Begin!
    
    Question: {input}
    """)

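`ChatPromptTemplate.from_template` treats `{...}` as placeholders (f-string style by default), so every placeholder in the prompt must be supplied when the chain is invoked. A stdlib-only sketch of that check using `string.Formatter`; `template_fields` is a hypothetical helper, and the short template stands in for the full prompt:

```python
from string import Formatter


def template_fields(template: str) -> set:
    # Collect the {placeholder} names an f-string-style template expects.
    # Formatter.parse yields (literal_text, field_name, format_spec, conversion);
    # field_name is None for plain literal text, including escaped {{ }} braces.
    return {field for _, field, _, _ in Formatter().parse(template) if field}


# Shortened stand-in for the real prompt
template = "Tools:\n{tools}\nAction: one of [{tool_names}]\nQuestion: {input}"
inputs = {"input": "Execute ededup", "tool_names": "ededup, doc_id", "tools": "ededup\ndoc_id"}

missing = template_fields(template) - set(inputs)
print(missing)  # set()
```

LangChain prompt templates also expose an `input_variables` attribute listing the same names.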


## Invoke the agent to create the plan

In [None]:
print(input)

# Chain the prompt template with the LLM: the rendered prompt is sent to the model
agent = prompt_template | llm

agent_step = agent.invoke(
    {
        "input": input,
        "tool_names": tool_names,
        "tools": tools_str,
    }
)

print(agent_step.content)

## Parse the plan

In [None]:
content = agent_step.content
# Some chat models return the content as a list of chunks; join it into one string
if isinstance(content, list):
    content = ''.join(content)

In [None]:
import re

regex = (r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)")
matches = re.findall(regex, content)

print("The LLM plan contains the following transforms:\n")
for match in matches:
    print(f"TRANSFORM NAME {match[0]}")
    print(f"TRANSFORM PARAMS {match[1]}")
    print("--------------------------------------")
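
To see what the regular expression extracts, here it is applied to a sample plan modeled on the example in the prompt (the sample text is illustrative, not real model output). Each match is a `(transform name, parameter string)` pair, and wrapping the parameter string in braces turns it into a JSON object:

```python
import json
import re

# Same pattern as the notebook: an "Action:" line followed by an "Action Input:" line
ACTION_REGEX = r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"

# Illustrative plan text modeled on the prompt example
sample = """Thought: I need to execute the ededup and doc_id one after the other.

Action: ededup
Action Input: "input_folder":"/home/user/input/ededup", "output_folder":"/home/user/output/ededup"
Observation: The output of the ededup transform is stored in "/home/user/output/ededup".

Action: doc_id
Action Input: "input_folder":"/home/user/output/ededup", "output_folder":"/home/user/output/final"
Observation: The output of the doc_id transform is stored in "/home/user/output/final".
"""

steps = re.findall(ACTION_REGEX, sample)
for name, params in steps:
    # Restore the braces so the parameter list parses as a JSON object
    print(name, json.loads("{" + params + "}"))
```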

In [None]:
import json
from typing import Any

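def load_from_json(js: str) -> dict[str, Any]:
    # Raise a descriptive error so the caller can report which parameters failed to parse
    try:
        return json.loads(js)
    except json.JSONDecodeError as e:
        raise ValueError(f"Failed to load parameters {js} with error {e}") from e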
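
The task asks for each transform to read the previous transform's output, but nothing forces the LLM to produce a correctly chained plan. A sketch of a pre-flight check, assuming every step lists `input_folder`/`output_folder` as in the prompt examples; `find_chain_breaks` is a hypothetical helper, and paths are normalized with `os.path.normpath` so trailing slashes do not count as mismatches:

```python
import os


def _norm(path):
    # Normalize a folder path so "/out/x/" and "/out/x" compare equal; keep None as None
    return os.path.normpath(path) if path else path


def find_chain_breaks(steps):
    # steps: list of (transform_name, params_dict) pairs in execution order.
    # Returns (previous, current) name pairs where the current transform's
    # input_folder differs from the previous transform's output_folder.
    breaks = []
    for (prev_name, prev), (name, cur) in zip(steps, steps[1:]):
        if _norm(cur.get("input_folder")) != _norm(prev.get("output_folder")):
            breaks.append((prev_name, name))
    return breaks


plan = [
    ("ededup", {"input_folder": "/in", "output_folder": "/out/ededup/"}),
    ("doc_id", {"input_folder": "/out/ededup", "output_folder": "/out/final"}),
]
print(find_chain_breaks(plan))  # []
```

Applied to the parsed plan, `find_chain_breaks([(m[0], load_from_json("{" + m[1] + "}")) for m in matches])` returns an empty list when every link in the chain is consistent.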

## Execute the transforms by calling their tool definitions

In [None]:
def run_tool(match) -> str:
    """Run one DPK transform tool parsed from the plan and check that it produced output."""

    def contains_parquet_files(dir_path):
        # True if the directory directly contains at least one .parquet file
        return any(
            file.endswith(".parquet")
            for file in os.listdir(dir_path)
            if os.path.isfile(os.path.join(dir_path, file))
        )

    tool_name = match[0]
    tool_to_use = find_tool_by_name(tools, tool_name)
    # The plan lists parameters without surrounding braces; restore them to parse as JSON
    tool_input = "{" + match[1] + "}"
    tool_input_dict = load_from_json(tool_input)
    print("=======================================================")
    print(f"🏃🏼 RUNNING {tool_name} with params: {tool_input_dict}")
    print("=======================================================")
    if define_dpk_as_langchain_tools:
        tool_result = tool_to_use.run(tool_input_dict)
    else:
        tool_result = tool_to_use.call(**tool_input_dict)
    out_dir = tool_input_dict["output_folder"]
    if not contains_parquet_files(out_dir):
        raise Exception(f"The {out_dir} directory is unexpectedly empty, indicating the job failed.")
    print(f"✅ {tool_result}")

    return tool_result
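
`contains_parquet_files` calls `os.listdir`, which raises `FileNotFoundError` when a transform never created its output folder; the execution loop still reports that as an error, just with a less specific message. A `pathlib` variant that returns `False` instead, shown as a hypothetical `has_parquet_output` helper and exercised on a temporary directory:

```python
import tempfile
from pathlib import Path


def has_parquet_output(dir_path) -> bool:
    # True if dir_path exists and directly contains at least one *.parquet file.
    # A missing directory yields False instead of raising FileNotFoundError.
    path = Path(dir_path)
    return path.is_dir() and any(p.is_file() for p in path.glob("*.parquet"))


with tempfile.TemporaryDirectory() as tmp:
    empty_result = has_parquet_output(tmp)        # empty directory
    (Path(tmp) / "part-0.parquet").touch()
    filled_result = has_parquet_output(tmp)       # one parquet file present
missing_result = has_parquet_output(tmp)          # directory has been removed

print(empty_result, filled_result, missing_result)  # False True False
```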

In [None]:
import time

error = False
for match in matches:
    try:
        tool_result = run_tool(match)
        # Short pause before starting the next transform
        time.sleep(10)
    except Exception as e:
        error = True
        print(f"❌ Error: {e}")
        break

if not error:
    print("=================================================")
    print("✅ Transforms execution completed successfully")
    print("=================================================")

## Inspect Generated Output File

You will see a column called `embeddings` added at the end. It holds the text content converted into vectors (embeddings).
We used the `sentence-transformers/all-MiniLM-L6-v2` model.

In [None]:
import glob
import pandas as pd

def read_parquet_files_as_df(parquet_dir):
    parquet_files = glob.glob(f'{parquet_dir}/*.parquet')
    if not parquet_files:
        raise FileNotFoundError(f"No parquet files found in {parquet_dir}")

    # read each parquet file into a DataFrame and store in a list
    dfs = [pd.read_parquet(f) for f in parquet_files]

    # Concatenate all DataFrames into a single DataFrame
    data_df = pd.concat(dfs, ignore_index=True)
    return data_df

In [None]:
# print the last transform output
last_transform = matches[-1]
tool_input = "{" + last_transform[1] + "}"
tool_input_dict = load_from_json(tool_input)
out_dir = tool_input_dict["output_folder"]
print(out_dir)
output_df = read_parquet_files_as_df(out_dir)

print("Output data dimensions (rows x columns) = ", output_df.shape)

output_df.head(2)
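
To confirm the `embeddings` column looks as expected, each vector's length can be compared with the model's output size. A minimal sketch assuming 384 dimensions, the embedding size of `sentence-transformers/all-MiniLM-L6-v2`; `check_embedding_dims` is a hypothetical helper:

```python
def check_embedding_dims(vectors, expected_dim=384):
    # Return the row positions whose embedding length differs from expected_dim.
    # 384 is assumed to be the output size of sentence-transformers/all-MiniLM-L6-v2.
    return [i for i, vec in enumerate(vectors) if len(vec) != expected_dim]


print(check_embedding_dims([[0.0] * 384, [0.0] * 10]))  # [1]
```

With the DataFrame loaded above, `check_embedding_dims(output_df["embeddings"])` returns the positions of any rows whose vector length differs; an empty list means all rows match.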