In [None]:
import time

import requests
from langchain_community.chat_models import ChatLlamaCpp
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentState

In [None]:
from langchain_openai import ChatOpenAI

In [None]:
@tool
def createStep(request: str) -> str:
    """Creates and configures a transformation step. Expects the request to be provided as a JSON string. The step is stored in memory to be later connected with other steps through hops."""
    # NOTE: the docstring above is exposed to the agent as the tool
    # description, so its wording is kept verbatim.
    #
    # POST the raw JSON string to the local step-creation endpoint.  A timeout
    # is set so an unresponsive backend cannot block the agent indefinitely.
    response = requests.post(
        "http://localhost:8080/steps/create",
        data=request,
        headers={"Content-Type": "application/json"},
        timeout=60,
    )
    # Fixed 10-second pause after each backend call.
    # NOTE(review): presumably throttling between agent steps -- confirm
    # before shortening or removing.
    time.sleep(10)
    # The body is returned regardless of HTTP status so the agent can read
    # backend error messages.
    return response.text

@tool
def defineHops(fromStep: str, toStep: str) -> str:
    """Use to create a connection (hop) between two steps in a Pentaho Kettle (PDI) transformation. The `fromStep` parameter is the name of the source step, and `toStep` is the name of the target step. This defines the execution flow so that data flows from the source step to the target step."""
    # NOTE: the docstring above is exposed to the agent as the tool
    # description, so its wording is kept verbatim.
    #
    # Step names are passed via `params` so that requests URL-encodes them;
    # names containing characters such as '&', '#' or '+' would otherwise
    # corrupt the query string.
    response = requests.get(
        "http://localhost:8080/steps/hop",
        params={"fromStep": fromStep, "toStep": toStep},
        timeout=60,  # do not hang forever on an unresponsive backend
    )
    # Fixed 10-second pause after each backend call.
    # NOTE(review): presumably throttling -- confirm before removing.
    time.sleep(10)
    return response.text

@tool
def createTransformation(name: str) -> str:
    """Creates a new Pentaho Kettle (PDI) transformation using all steps and hops defined. After the transformation is created, the in-memory steps and hops are cleared so that a new transformation can be defined independently."""
    # NOTE: the docstring above is exposed to the agent as the tool
    # description, so its wording is kept verbatim.
    #
    # The transformation name goes through `params` so it is URL-encoded
    # instead of being spliced raw into the query string.
    response = requests.get(
        "http://localhost:8080/steps/save",
        params={"transformationName": name},
        timeout=60,  # do not hang forever on an unresponsive backend
    )
    # Fixed 10-second pause after each backend call.
    # NOTE(review): presumably throttling -- confirm before removing.
    time.sleep(10)
    return response.text

@tool
def getAllStepTypes(input: str) -> str:
    """Retrieve a list of all supported step types. Use this information to determine the appropriate step type for a given scenario. Note: Tool doesn't require an input, pass an empty string."""
    # NOTE: the docstring above is exposed to the agent as the tool
    # description, so its wording is kept verbatim.  `input` is ignored; the
    # parameter exists only so the tool has an argument to be called with.
    response = requests.get(
        "http://localhost:8080/steps/stepType",
        timeout=60,  # do not hang forever on an unresponsive backend
    )
    # Fixed 10-second pause after each backend call.
    # NOTE(review): presumably throttling -- confirm before removing.
    time.sleep(10)
    return response.text

@tool
def getStepTypeInfo(stepType: str) -> str:
    """Get detailed information for a specific stepType, including its description, typical use case, and a sample payload outlining the required fields and their expected formats."""
    # NOTE: the docstring above is exposed to the agent as the tool
    # description, so its wording is kept verbatim.
    #
    # `params` lets requests URL-encode the step type rather than splicing it
    # raw into the query string.
    response = requests.get(
        "http://localhost:8080/steps/payload",
        params={"stepType": stepType},
        timeout=60,  # do not hang forever on an unresponsive backend
    )
    # Fixed 10-second pause after each backend call.
    # NOTE(review): presumably throttling -- confirm before removing.
    time.sleep(10)
    return response.text

@tool
def createJob(request: str) -> str:
    """Creates a Pentaho job by taking inputs such as job name, transformation files (.ktr), job files (.kjb), and hop definitions. It automatically builds the job structure, connects transformations and jobs as per the provided hops, and saves a fully configured .kjb file."""
    # NOTE: the docstring above is exposed to the agent as the tool
    # description, so its wording is kept verbatim.
    #
    # POST the raw JSON string to the local job-creation endpoint.
    response = requests.post(
        "http://localhost:8080/jobs/create",
        data=request,
        headers={"Content-Type": "application/json"},
        timeout=60,  # do not hang forever on an unresponsive backend
    )
    # Fixed 10-second pause after each backend call.
    # NOTE(review): presumably throttling -- confirm before removing.
    time.sleep(10)
    # Return the response body (as the other tools in this file do) rather
    # than the Response object, whose string form is only "<Response [200]>"
    # and carries no information for the agent.
    return response.text

@tool
def getJobSamplePayload(input: str) -> str:
    """Fetches a sample payload for creating a job. Note: Tool doesn't require an input, pass an empty string."""
    # NOTE: the docstring above is exposed to the agent as the tool
    # description, so its wording is kept verbatim.  `input` is ignored.
    response = requests.get(
        "http://localhost:8080/jobs/payload",
        timeout=60,  # do not hang forever on an unresponsive backend
    )
    # Fixed 10-second pause around each backend call.
    # NOTE(review): presumably throttling -- confirm before removing.
    time.sleep(10)
    # Return the payload text rather than the Response object so the agent
    # actually receives the sample payload.
    return response.text

@tool
def completeJob(input: str) -> bytes:
    """After generating the transformations or jobs, this tool packages all the created .ktr and .kjb files into a single ZIP archive and returns it for download. The tool requires no input — simply pass an empty string when invoking it."""
    # NOTE: the docstring above is exposed to the agent as the tool
    # description, so its wording is kept verbatim.  `input` is ignored.
    response = requests.get(
        "http://localhost:8080/jobs/complete",
        stream=True,
        timeout=60,  # do not hang forever on an unresponsive backend
    )
    # Fixed 10-second pause after each backend call.
    # NOTE(review): presumably throttling -- confirm before removing.
    time.sleep(10)
    if response.status_code != 200:
        # RuntimeError is a subclass of Exception, so existing
        # `except Exception` handlers keep working.
        raise RuntimeError(
            f"Failed to download ZIP file. Status: {response.status_code}"
        )
    # Raw bytes of the response body (the archive).
    # NOTE(review): this is returned as a tool result; confirm how the agent
    # framework serializes bytes before relying on it downstream.
    return response.content

def humanInterrupt(query: str):
    "Use this to review the plan for building the entire job or transformation before starting. You can also use it whenever you get stuck or need additional input from the user."
    # Imported locally because the file's import section does not bring these
    # names into scope; without them every call fails with NameError.
    from langgraph.prebuilt.interrupt import (
        ActionRequest,
        HumanInterrupt,
        HumanInterruptConfig,
    )
    from langgraph.types import interrupt

    # NOTE(review): this function is neither decorated with @tool nor listed
    # in `tools` in this file, so the agent cannot currently call it.
    request = HumanInterrupt(
        action_request=ActionRequest(
            action="run_command",
            # Forward the agent's question/plan so the reviewer can see what
            # is being asked; previously `query` was silently dropped.
            args={"query": query},
        ),
        config=HumanInterruptConfig(
            allow_ignore=True,
            allow_respond=True,
            allow_edit=False,
            allow_accept=True,
        ),
        description="Please review the plan before execution",
    )
    # interrupt() is given a one-element list; the first element of what it
    # returns is taken as the human's response.
    response = interrupt([request])[0]
    print('HumanInterrupt called')

    # Return the value actually received (the original returned the undefined
    # name `human_response`, which raised NameError).
    return response

In [None]:
import os

# OpenAI credential read from the process environment; None when unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")


In [None]:
# Chat model that drives the agent; temperature=0 keeps sampling randomness
# to a minimum.
model = ChatOpenAI(
    api_key=OPENAI_API_KEY,
    model="gpt-4o-mini",
    temperature=0,
)

In [None]:
# Instructions given to the agent as its prompt (text kept verbatim).
system_prompt = """
You are an assistant for creating Pentaho Kettle (PDI) workflows.
- Plan workflows based on user prompts.
- Define jobs, transformations, steps and hops using available tools.
- Generate transformations from the defined steps and hops.
- Generate jobs from defined jobs, transformations and hops.
Use the getAllStepTypes tool if needed to fetch all available step types and getStepTypeInfo to fetch the request structure of a given step type.
You can use getJobSamplePayload tool to fetch a sample payload for creating jobs.
Before you start creating the steps and transformations, come up with a overall flow first and must seek approval or suggestions from user.
"""
# Before you start creating the steps, transformations, or jobs, come up with an overall flow first.
# Define the hop between two steps only once, and then create the transformation file once all hops are defined.
# For jobs, hops between transformations and other jobs must be provided in the createJob tool request payload.

# Tools the agent may call, in the order they are handed to the agent.
tools = [
    createStep,
    defineHops,
    createTransformation,
    getAllStepTypes,
    getStepTypeInfo,
    getJobSamplePayload,
    createJob,
    completeJob,
]

# Checkpointer passed to the agent, plus the run configuration that pins every
# invocation using `config` to the single thread id "test-thread".
memory = MemorySaver()
config = {
    "configurable": {
        "thread_id": "test-thread",
    },
}

# ReAct-style agent wired to the model, tools, prompt and checkpointer above.
langgraph_agent_executor = create_react_agent(
    model,
    tools=tools,
    prompt=system_prompt,
    checkpointer=memory,
)

In [None]:
from flask import Flask, request, jsonify, send_file
import io

# Flask application object; the /predict route is registered on it.
app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict_response():
    """Run the agent on the prompt in the request body and return its reply.

    Expects a JSON object with a ``prompt`` key.

    Returns:
        200 with ``{"response": <agent output>}`` on success (or a ZIP
        attachment when the agent output is bytes), 400 when the body is not
        a JSON object containing ``prompt``, and 500 when processing fails.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising, so client mistakes are reported as 400 rather than 500.
    req = request.get_json(silent=True)
    if not isinstance(req, dict) or 'prompt' not in req:
        return jsonify({"response": "Request body must be a JSON object with a 'prompt' field."}), 400

    try:
        inputs = {"messages": [("user", req['prompt'])]}
        response = langgraph_agent_executor.invoke(inputs, config)
        # The content of the last message is treated as the agent's answer.
        agent_output = response['messages'][-1].content
        # NOTE(review): this branch only fires if the final message content is
        # bytes; confirm that the agent can actually produce that, otherwise
        # the ZIP download path is unreachable.
        if isinstance(agent_output, bytes):
            return send_file(
                io.BytesIO(agent_output),
                mimetype='application/zip',
                as_attachment=True,
                download_name="pentaho_jobs.zip"
            )

        return jsonify({"response": agent_output}), 200
    except Exception:
        # Top-level boundary: log the full traceback (print(e) lost it) and
        # return a generic message so internals are not leaked to the client.
        app.logger.exception("Error while processing /predict request")
        return jsonify({"response": "Error processing the request please try again later."}), 500
    
# Start Flask's built-in server only when this file is executed directly.
if __name__ == "__main__":
    app.run()