In [None]:
import ollama
import langgraph
from langgraph.graph import StateGraph, END
from IPython.display import Image, display
from typing_extensions import TypedDict
from typing import List, Annotated
import operator
import sqlite3
import pandas as pd
from langchain_ollama import ChatOllama
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool

# Define model for Ollama: this tag is used for every LLM call in the cell.
local_llm = "llama3.2:3b-instruct-fp16"
# temperature=0 keeps extraction output as deterministic as the model allows.
llm = ChatOllama(temperature=0, model=local_llm)

# Define shared graph state
class GraphState(TypedDict):
    """Shared state passed between the workflow nodes.

    Every key a node reads or writes is declared here. LangGraph builds its
    state channels from these annotations, so keys missing from the schema
    (``df1`` / ``df2`` were) would not reach the nodes when the compiled
    graph is invoked.
    """

    question: str
    column_name_str: str  # "Col:Description;Col:Description" spec
    column_names: dict  # parsed spec: column -> "<description>"
    chain: object  # prompt | llm | parser runnable built by ChainCreationAgent
    extracted_data: List[dict]  # one dict of extracted entities per review
    merged_data: pd.DataFrame
    db_engine: object
    max_retries: int
    loop_step: Annotated[int, operator.add]  # reducer: node updates are summed
    metadata: dict
    df1: pd.DataFrame  # structured records that the extracted entities are merged onto
    df2: pd.DataFrame  # reviews; EntityExtractionAgent reads its "ReviewText" column

### Define Agents
class ColumnNameAgent:
    """Parse ``state["column_name_str"]`` into a column -> "<description>" map."""

    def run(self, state):
        """Populate ``state["column_names"]`` and return the (mutated) state.

        The spec has the form ``"Col:Description;Col:Description"``. Only the
        first ``:`` of each entry separates name from description, so a
        description may itself contain colons. Blank entries (e.g. from a
        trailing ``;``) are skipped, and names/descriptions are stripped.

        Raises:
            ValueError: if a non-blank entry has no ``:`` separator.
        """
        column_names = {}
        for entry in state["column_name_str"].split(";"):
            entry = entry.strip()
            if not entry:
                continue  # tolerate "a:b;" and "a:b;;c:d"
            col_name, sep, desc = entry.partition(":")
            if not sep:
                raise ValueError(
                    f"Invalid column spec (expected 'Name:Description'): {entry!r}"
                )
            column_names[col_name.strip()] = f"<{desc.strip()}>"
        state["column_names"] = column_names
        return state

class ChainCreationAgent:
    """Build the NER runnable (prompt | llm | JSON parser) for the later extraction step."""

    def run(self, state):
        """Store a ``prompt | llm | JsonOutputParser`` chain in ``state["chain"]``.

        The prompt is filled at invoke time with ``column_names`` and
        ``review``. ``state["column_names"]`` is only looked up here, so a
        missing parse step surfaces as a KeyError.
        """
        state["column_names"]  # KeyError here if column parsing never ran
        template = """You need to act as a Named Entity Recognizer.
        Extract the following column names from the review text: {column_names}
        
        Review Text: "{review}"
        
        STRICTLY respond in JSON format like: {{"column_1": "<value 1>", ... }}
        """
        state["chain"] = PromptTemplate.from_template(template) | llm | JsonOutputParser()
        return state

class EntityExtractionAgent:
    """Run the NER chain over every review in ``state["df2"]["ReviewText"]``."""

    def run(self, state):
        """Fill ``state["extracted_data"]`` with one dict per review and return the state.

        A failure on one review (model error, unparsable JSON) is reported and
        yields ``{}`` for that review instead of aborting the whole batch.
        Non-object replies are also replaced by ``{}``. As a result,
        ``extracted_data`` always lines up row-for-row with df2.
        """
        chain = state["chain"]
        column_names = state["column_names"]
        extracted_data = []
        for idx, review in enumerate(state["df2"]["ReviewText"].tolist()):
            try:
                response = chain.invoke({"review": review, "column_names": column_names})
            except Exception as exc:  # per-review boundary: keep the batch going
                print(f"Entity extraction failed for review {idx}: {exc}")
                response = {}
            # A JSON parser can legitimately return a list or scalar; keep rows uniform.
            extracted_data.append(response if isinstance(response, dict) else {})
        state["extracted_data"] = extracted_data
        return state

class DataCombinationAgent:
    """Left-join the LLM-extracted entities onto ``state["df1"]``."""

    def run(self, state, csv_path="merged_data.csv"):
        """Merge df1 with the extracted entities on the parsed column names.

        Args:
            state: must hold ``df1``, ``extracted_data`` and ``column_names``.
            csv_path: file the merged frame is written to; ``None`` skips the
                write. Defaults to the previously hard-coded file name.

        Returns:
            The state with ``merged_data`` set.

        Key columns the model never produced are added as all-``None``. This
        keeps the merge from raising ``KeyError`` when extraction failed or
        returned partial objects.
        """
        df1 = state["df1"]
        keys = list(state["column_names"].keys())
        extracted_df = pd.DataFrame(state["extracted_data"])
        for key in keys:
            if key not in extracted_df.columns:
                extracted_df[key] = None
        merged_data = pd.merge(df1, extracted_df, left_on=keys, right_on=keys, how="left")
        if csv_path is not None:
            merged_data.to_csv(csv_path, index=False)
        state["merged_data"] = merged_data
        return state

class DatabaseAgent:
    """Load ``merged_data`` into an in-memory SQLite DB exposed through SQLAlchemy."""

    def run(self, state):
        """Write ``state["merged_data"]`` to table ``my_table`` and set ``state["db_engine"]``.

        The engine's creator always hands back the same sqlite3 connection
        (with a StaticPool). That way the engine sees the in-memory database
        populated here; a new ``:memory:`` connection would be an empty,
        separate database.
        """
        frame = state["merged_data"]
        conn = sqlite3.connect(":memory:", check_same_thread=False)
        frame.to_sql("my_table", conn, index=False, if_exists="replace")

        def _reuse_connection():
            return conn

        state["db_engine"] = create_engine(
            "sqlite://",
            poolclass=StaticPool,
            creator=_reuse_connection,
            connect_args={"check_same_thread": False},
        )
        return state

class MetadataExtractionAgent:
    """Summarize the structure and null counts of the two input frames."""

    def run(self, state):
        """Store a per-frame profile for ``df1`` and ``df2`` in ``state["metadata"]``.

        Each profile lists the columns, the dtype of each column as a string,
        the row count and the number of nulls per column.
        """

        def _profile(frame):
            return {
                "columns": frame.columns.tolist(),
                "dtypes": {col: str(dtype) for col, dtype in frame.dtypes.items()},
                "num_rows": len(frame),
                "missing_values": frame.isnull().sum().to_dict(),
            }

        state["metadata"] = {name: _profile(state[name]) for name in ("df1", "df2")}
        return state

class GlobalAgent:
    """Registry that dispatches a named pipeline step to its agent instance."""

    def __init__(self):
        agent_classes = (
            ColumnNameAgent,
            ChainCreationAgent,
            EntityExtractionAgent,
            DataCombinationAgent,
            DatabaseAgent,
            MetadataExtractionAgent,
        )
        # Keyed by class name: these are the names the execute_* nodes pass in.
        self.agents = {cls.__name__: cls() for cls in agent_classes}

    def run(self, agent_name, state):
        """Run the agent registered as ``agent_name`` on ``state`` and return its result.

        Raises:
            ValueError: if no agent is registered under ``agent_name``.
        """
        agent = self.agents.get(agent_name)
        if agent is None:
            raise ValueError(f"Unknown agent: {agent_name}")
        return agent.run(state)

    def monitor_agents(self):
        """Print a status line (placeholder for logging/retry monitoring)."""
        print("Monitoring agents... All agents are operational.")

### Define Graph Nodes for Workflow Execution
def _run_via_global_agent(agent_name, state):
    """Dispatch ``state`` to ``agent_name`` on a freshly built GlobalAgent."""
    return GlobalAgent().run(agent_name, state)


def execute_column_name_agent(state):
    """Graph node: parse ``column_name_str`` into ``column_names``."""
    return _run_via_global_agent("ColumnNameAgent", state)


def execute_chain_creation_agent(state):
    """Graph node: build the NER chain."""
    return _run_via_global_agent("ChainCreationAgent", state)


def execute_entity_extraction_agent(state):
    """Graph node: run the NER chain over the reviews."""
    return _run_via_global_agent("EntityExtractionAgent", state)


def execute_data_combination_agent(state):
    """Graph node: merge the extracted entities onto df1."""
    return _run_via_global_agent("DataCombinationAgent", state)


def execute_database_agent(state):
    """Graph node: load the merged data into SQLite."""
    return _run_via_global_agent("DatabaseAgent", state)


def execute_metadata_extraction_agent(state):
    """Graph node: profile df1 and df2."""
    return _run_via_global_agent("MetadataExtractionAgent", state)

# Decision function to proceed based on step completion
def decide_next_step(state):
    """Route after metadata extraction.

    Returns:
        ``"execute_database_agent"`` while ``loop_step`` is below
        ``max_retries``, otherwise ``END``.
    """
    if state["loop_step"] >= state["max_retries"]:
        return END
    return "execute_database_agent"


def decide_entry_point(state):
    """Pick the first node of a run.

    Chain creation needs the *parsed* ``column_names``; the raw
    ``column_name_str`` alone is not enough. So jump straight to chain
    creation only when ``column_names`` is already populated. Otherwise
    return ``"retry"``, which the graph maps to the column-name parsing node.
    """
    if state.get("column_names"):
        return "execute_chain_creation_agent"
    return "retry"

### Create Workflow Graph with Conditional Entry Points and Edges
workflow = StateGraph(GraphState)

# Add nodes
workflow.add_node("execute_column_name_agent", execute_column_name_agent)
workflow.add_node("execute_chain_creation_agent", execute_chain_creation_agent)
workflow.add_node("execute_entity_extraction_agent", execute_entity_extraction_agent)
workflow.add_node("execute_data_combination_agent", execute_data_combination_agent)
workflow.add_node("execute_database_agent", execute_database_agent)
workflow.add_node("execute_metadata_extraction_agent", execute_metadata_extraction_agent)


def _monitor_agents_node(state):
    """Graph node: print the GlobalAgent status line.

    Returns an empty update rather than the ``None`` that ``monitor_agents``
    returns, so the node contributes no state changes.
    """
    GlobalAgent().monitor_agents()
    return {}


# Adding GlobalAgent as a visual node for monitoring
workflow.add_node("monitor_agents", _monitor_agents_node)

# Define the entry point and edges
workflow.set_conditional_entry_point(
    decide_entry_point,
    {
        "execute_chain_creation_agent": "execute_chain_creation_agent",
        "retry": "execute_column_name_agent",
    },
)

workflow.add_edge("execute_column_name_agent", "monitor_agents")
workflow.add_edge("monitor_agents", "execute_chain_creation_agent")
workflow.add_edge("execute_chain_creation_agent", "execute_entity_extraction_agent")
workflow.add_edge("execute_entity_extraction_agent", "execute_data_combination_agent")
workflow.add_edge("execute_data_combination_agent", "execute_metadata_extraction_agent")
workflow.add_conditional_edges(
    "execute_metadata_extraction_agent",
    decide_next_step,
    {
        "execute_database_agent": "execute_database_agent",
        # decide_next_step returns END itself, so END must be a path_map key;
        # a return value missing from the map fails when the branch resolves.
        END: END,
    },
)
# The database load is the final step.
workflow.add_edge("execute_database_agent", END)

# Compile and display the graph
graph = workflow.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

# Process function to execute workflow step by step
def process_workflow(state):
    """Execute the pipeline nodes by hand in a fixed order.

    The run starts at the column-name node. The monitor status is printed
    before the run and again between column parsing and chain creation.
    After metadata extraction, ``decide_next_step`` chooses the next node,
    and the run stops after the database node.

    Raises:
        ValueError: if a transition yields a node name with no handler.
    """
    supervisor = GlobalAgent()
    supervisor.monitor_agents()  # Monitor agents before starting the workflow

    def _monitor(current_state):
        supervisor.monitor_agents()
        return current_state

    # node -> (step to run, successor node name or a function of the new state)
    transitions = {
        "execute_column_name_agent": (execute_column_name_agent, "monitor_agents"),
        "monitor_agents": (_monitor, "execute_chain_creation_agent"),
        "execute_chain_creation_agent": (
            execute_chain_creation_agent, "execute_entity_extraction_agent"),
        "execute_entity_extraction_agent": (
            execute_entity_extraction_agent, "execute_data_combination_agent"),
        "execute_data_combination_agent": (
            execute_data_combination_agent, "execute_metadata_extraction_agent"),
        "execute_metadata_extraction_agent": (
            execute_metadata_extraction_agent, decide_next_step),
        "execute_database_agent": (execute_database_agent, END),
    }

    node = "execute_column_name_agent"
    while node != END:
        if node not in transitions:
            raise ValueError("Unknown next node: {}".format(node))
        step, successor = transitions[node]
        state = step(state)
        node = successor(state) if callable(successor) else successor
    return state

# Initialize the state and run the workflow
initial_state = GraphState(
    question="What are the types of agent memory?",
    column_name_str="Name:Name of the customer;PurchaseDate:Date of purchase of product",
    column_names={},
    chain=None,
    extracted_data=[],
    merged_data=pd.DataFrame(),
    db_engine=None,
    max_retries=3,
    loop_step=0,
    metadata={},
)

# Example data for df1 and df2 (replace these with actual data).
# df1 holds structured records; df2 holds the free-text reviews to mine.
initial_state.update(
    df1=pd.DataFrame({
        'Name': ['Alice', 'Bob'],
        'PurchaseDate': ['2023-01-01', '2023-01-02'],
    }),
    df2=pd.DataFrame({
        'ReviewText': ['Alice bought a product on 2023-01-01.', 'Bob bought a product on 2023-01-02.'],
    }),
)

# Run the workflow step by step
final_state = process_workflow(initial_state)

# Output the result
print("Final state:", final_state)
print("Database engine:", final_state["db_engine"])
print("Metadata:", final_state["metadata"])

In [None]:
import json
import sqlite3
import operator
import pandas as pd
from typing import List, Dict, Annotated
from typing_extensions import TypedDict

from langchain_ollama import ChatOllama  # Import Ollama LLM
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain  # Import LLMChain
from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain_core.memory import BaseMemory

# Define model using Ollama; the same tag drives every LLM call in this cell.
local_llm = "llama3.2:3b-instruct-fp16"
# temperature=0 keeps tool selection and extraction as repeatable as possible.
llm = ChatOllama(temperature=0, model=local_llm)

# Define shared graph state (functional TypedDict form: key -> value type).
GraphState = TypedDict(
    "GraphState",
    {
        "question": str,
        "column_name_str": str,  # "Col:Description;Col:Description" spec
        "column_names": dict,  # parsed spec: column -> "<description>"
        "chain": object,  # LLMChain built by chain_creation
        "extracted_data": List[dict],  # one entity dict per review
        # NOTE(review): data_combination stores a list of records here, not a DataFrame.
        "merged_data": pd.DataFrame,
        "metadata": dict,
        "max_retries": int,
        "loop_step": Annotated[int, operator.add],
        "db_engine": sqlite3.Connection,
        "df1": Dict,  # column -> list of values
        "df2": Dict,  # expected to hold a "ReviewText" list
    },
)

# Stateful Memory Class
class StatefulMemory(BaseMemory):
    """LangChain memory that exposes a shared, manually managed state dict.

    The tools read and write ``memory.state`` directly, and ``save_context``
    does nothing, so the agent executor never writes into it.
    """

    state: Dict = {}  # shared pipeline state

    @property
    def memory_variables(self):
        """Return the keys currently held in the state."""
        return [*self.state]

    def load_memory_variables(self, inputs):
        """Return the live state dict itself (not a copy); ``inputs`` is ignored."""
        return self.state

    def save_context(self, inputs, outputs):
        """Do nothing: state updates are made by the tools themselves."""
        return None

    def clear(self):
        """Reset the state to an empty dict."""
        self.state = dict()

# Initialize memory
memory = StatefulMemory()

# SQLite database setup: one in-memory DB that lives as long as this connection.
connection = sqlite3.connect(":memory:")
memory.state["db_engine"] = connection  # expose the connection through the shared state
cursor = connection.cursor()

# Create tables for storing results. store_in_sqlite later writes these tables
# with if_exists="replace", so the schemas below only persist if a step never runs.
for _ddl in (
    """
CREATE TABLE IF NOT EXISTS merged_data (
    CustomerName TEXT,
    PurchaseDate TEXT
)
""",
    """
CREATE TABLE IF NOT EXISTS metadata (
    table_name TEXT,
    key TEXT,
    value TEXT
)
""",
):
    cursor.execute(_ddl)
connection.commit()

# Function to store data in SQLite
def store_in_sqlite(table_name: str, data: pd.DataFrame, conn=None):
    """Write ``data`` to ``table_name``, replacing any existing table.

    Args:
        table_name: destination table.
        data: frame to write; its index is not stored.
        conn: sqlite3 connection to write to. Defaults to this cell's
            module-level ``connection``, resolved at call time.
    """
    target = connection if conn is None else conn
    data.to_sql(table_name, target, if_exists="replace", index=False)

# Tools Definition
def column_name_extraction(input_text: str) -> str:
    """Tool: parse 'Col:Description;Col:Description' into ``state["column_names"]``.

    Surrounding whitespace and single or double quotes (as an agent's Action
    Input often carries) are stripped. Blank entries such as a trailing ';'
    are ignored, and only the first ':' in an entry splits name from
    description.

    Returns:
        A status string. If an entry is malformed or no column is found, the
        state is left untouched and "Invalid format in column_name_str." is
        returned.
    """
    state = memory.state
    spec = input_text.strip().strip("\"'").strip()
    column_names = {}
    for entry in spec.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        col_name, sep, desc = entry.partition(":")
        if not sep:
            return "Invalid format in column_name_str."
        column_names[col_name.strip()] = f"<{desc.strip()}>"
    if not column_names:
        return "Invalid format in column_name_str."
    state["column_names"] = column_names
    return f"Column names extracted: {list(column_names.keys())}"

column_name_tool = Tool(
    name="ColumnNameExtraction",
    func=column_name_extraction,
    description="Extracts column names from a string formatted as 'ColumnName:Description;ColumnName:Description'."
)

def chain_creation(input_text: str) -> str:
    """Tool: build the entity-extraction LLMChain from ``state["column_names"]``.

    ``input_text`` is ignored; the columns come from the shared state. The
    template is a plain (non f-) string. ``{review}`` is a real prompt
    variable, and the column map is bound as a partial variable. Its dict
    repr contains braces, which PromptTemplate would otherwise parse as
    template fields.
    """
    state = memory.state
    column_names = state.get("column_names", {})
    if not column_names:
        return "Error: 'column_names' not found in state."
    template = """You are a Named Entity Recognizer.
Given the following review text, extract the entities corresponding to the specified column names.

Column Names and Descriptions: {column_names}

Review Text: "{review}"

Respond in JSON format with the extracted entities, e.g., {{"column_1": "value1", "column_2": "value2"}}
"""
    try:
        prompt = PromptTemplate(
            template=template,
            input_variables=["review"],
            partial_variables={"column_names": str(column_names)},
        )
        state["chain"] = LLMChain(prompt=prompt, llm=llm)
        return "Chain created for entity extraction"
    except Exception as e:
        return f"Error in chain creation: {str(e)}"

chain_creation_tool = Tool(
    name="ChainCreation",
    func=chain_creation,
    description="Creates a chain for entity extraction based on the column names in the state."
)

def entity_extraction(input_text: str) -> str:
    """Tool: run ``state["chain"]`` over every df2 ReviewText and store the results.

    ``input_text`` is ignored. The outermost ``{...}`` span of each model
    reply is parsed as JSON. A reply that fails (LLM error or invalid JSON)
    contributes ``{}``, so ``extracted_data`` stays aligned row-for-row with
    df2, and the failure is printed rather than silently discarded.
    """

    def _parse_json_object(text):
        # Small models often wrap the JSON in prose or ``` fences.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end < start:
            raise ValueError("no JSON object in model output")
        return json.loads(text[start:end + 1])

    state = memory.state
    df2_raw = state.get("df2")
    # len() instead of truthiness: `not DataFrame` raises ValueError.
    if df2_raw is None or len(df2_raw) == 0:
        return "Error: 'df2' not found in state."

    chain = state.get("chain")
    if not chain:
        return "Error: 'chain' not found in state."

    df2 = pd.DataFrame(df2_raw)
    if "ReviewText" not in df2.columns:
        return "Error: 'ReviewText' column not found in df2."

    extracted_data = []
    for idx, review in enumerate(df2["ReviewText"].tolist()):
        try:
            extracted_data.append(_parse_json_object(chain.run({"review": review})))
        except Exception as e:  # per-review boundary: keep rows aligned with df2
            print(f"EntityExtraction: review {idx} failed ({e}); using {{}}")
            extracted_data.append({})
    state["extracted_data"] = extracted_data
    return "Entities extracted from review texts"

entity_extraction_tool = Tool(
    name="EntityExtraction",
    func=entity_extraction,
    description="Extracts entities from review texts using the chain created."
)

def data_combination(input_text: str) -> str:
    """Tool: left-join the extracted entities onto df1 and persist the result.

    ``input_text`` is ignored. Every df1 column missing from the extracted
    data is added as all-``None``, so the merge keys (the shared columns)
    are always df1's columns. The result is stored in the state as a list of
    records and written to the ``merged_data`` SQLite table.
    """
    state = memory.state
    df1_raw = state.get("df1")
    # len() instead of truthiness: `not DataFrame` raises ValueError.
    if df1_raw is None or len(df1_raw) == 0:
        return "Error: 'df1' not found in state."

    extracted_data = state.get("extracted_data", [])
    if not extracted_data:
        return "Error: 'extracted_data' not found in state."

    df1 = pd.DataFrame(df1_raw)
    extracted_df = pd.DataFrame(extracted_data)
    for key in df1.columns:
        if key not in extracted_df.columns:
            extracted_df[key] = None

    merged_data = pd.merge(df1, extracted_df, how="left")
    state["merged_data"] = merged_data.to_dict(orient="records")
    # Store the frame directly instead of rebuilding it from the records.
    store_in_sqlite("merged_data", merged_data)
    return "Data combined and merged data updated in state."

data_combination_tool = Tool(
    name="DataCombination",
    func=data_combination,
    description="Combines the extracted data with the original dataframe."
)

def metadata_extraction(input_text: str) -> str:
    """Tool: profile df1/df2 and store the profile in the state and in SQLite.

    ``input_text`` is ignored. Each profile entry becomes one
    ``(table_name, key, value)`` row of the ``metadata`` table, with list and
    dict values JSON-encoded.
    """

    def _profile(frame):
        # Columns, dtypes, row count, per-column nulls and describe() stats.
        return {
            "columns": frame.columns.tolist(),
            "dtypes": frame.dtypes.apply(str).to_dict(),
            "num_rows": len(frame),
            "missing_values": frame.isnull().sum().to_dict(),
            "statistical_summary": frame.describe(include="all").to_dict(),
        }

    def _json_default(value):
        # pandas may hand back numpy scalars (e.g. np.int64 counts) or
        # Timestamps inside these dicts; json cannot encode them natively.
        if isinstance(value, pd.Timestamp):
            return value.isoformat()
        if hasattr(value, "item"):
            return value.item()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    state = memory.state
    metadata = {name: _profile(pd.DataFrame(state[name])) for name in ("df1", "df2")}
    state["metadata"] = metadata

    rows = []
    for table, details in metadata.items():
        for key, value in details.items():
            if isinstance(value, (list, dict)):
                value = json.dumps(value, default=_json_default)
            rows.append({"table_name": table, "key": key, "value": value})
    store_in_sqlite("metadata", pd.DataFrame(rows))
    return "Metadata extracted and added to state."

metadata_extraction_tool = Tool(
    name="MetadataExtraction",
    func=metadata_extraction,
    description="Extracts metadata from df1 and df2."
)

# Define the custom prompt
custom_prompt = """
You are a helpful assistant that uses tools to answer questions.

When responding, always use the following format:
Thought: [Your thought process]
Action: [The tool name]
Action Input: [The input to the tool. Do not leave this empty!]

If you have enough information to provide a final answer, use this format:
Thought: [Your thought process]
Final Answer: [Your answer]

Ensure every Action includes a valid and properly formatted Action Input.
{agent_scratchpad}
"""

# ReAct agent over the five pipeline tools; it shares `memory` with them.
# NOTE(review): ZeroShotAgent documents prefix/suffix/format_instructions as
# prompt kwargs; confirm that "agent_prompt" is actually consumed, because
# custom_prompt may be ignored.
agent = initialize_agent(
    llm=llm,
    tools=[
        column_name_tool,
        chain_creation_tool,
        entity_extraction_tool,
        data_combination_tool,
        metadata_extraction_tool,
    ],
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    agent_kwargs={"agent_prompt": custom_prompt},
    memory=memory,
    max_iterations=5,
    handle_parsing_errors=True,
    verbose=True,
)

# Global Agent
def global_agent(state):
    """Run this cell's five tools in order on ``state`` and return the merged data.

    The tools read and write ``memory.state``, so ``state`` is installed
    there first. The tools are called directly, because this cell defines no
    execute_* step functions. Only the column parser consumes its input
    string; the other tools read the shared state. A tool whose status
    string starts with "Error" or "Invalid" aborts the run.

    Returns:
        ``state["merged_data"]`` (a list of records) on success, ``None`` on failure.
    """
    pipeline = (
        ("ColumnNameExtraction", column_name_extraction),
        ("ChainCreation", chain_creation),
        ("EntityExtraction", entity_extraction),
        ("DataCombination", data_combination),
        ("MetadataExtraction", metadata_extraction),
    )
    try:
        print("> Starting global agent...")
        memory.state = state  # the tools operate on memory.state
        for step_name, tool in pipeline:
            tool_input = state.get("column_name_str", "") if tool is column_name_extraction else ""
            status = tool(tool_input)
            print(f"{step_name}: {status}")
            if status.startswith(("Error", "Invalid")):
                raise RuntimeError(f"{step_name} failed: {status}")

        print("> Finished chain.")
        print("Global agent completed successfully.")
        final_answer = state.get("merged_data", {})
        print(f"Final Answer: {final_answer}")
        return final_answer

    except Exception as e:
        print(f"Error in global agent: {e}")
        return None

# Example Data
initial_state = GraphState(
    question="Which customers purchased products",
    column_name_str="CustomerName:Name of the customer;PurchaseDate:Date of purchase of product",
    column_names={},  # filled in by the column-name step
    chain=None,
    extracted_data=[],
    merged_data=pd.DataFrame(),
    metadata={},
    max_retries=3,
    loop_step=0,
    df1={
        "CustomerName": ["Alice", "Bob"],
        "PurchaseDate": ["2023-01-01", "2023-01-02"],
    },
    df2={
        "ReviewText": [
            "Alice purchased a laptop on 2023-01-01.",
            "Bob purchased a smartphone on 2023-01-02.",
        ]
    },
)
# The tools operate on memory.state, so the run starts from this dict.
memory.state = initial_state

# Run the Global Agent
final_result = global_agent(initial_state)

# Verify SQLite content
for _label, _table in (("Merged Data", "merged_data"), ("Metadata", "metadata")):
    print(f"\nData from SQLite - {_label}:")
    print(pd.read_sql_query(f"SELECT * FROM {_table}", connection))


In [None]:
import json
import sqlite3
import operator
import pandas as pd
from typing import List, Dict, Annotated
from typing_extensions import TypedDict

from langchain_ollama import ChatOllama  # Import Ollama LLM
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain  # Import LLMChain
from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain_core.memory import BaseMemory

# Define model using Ollama; the same tag drives every LLM call in this cell.
local_llm = "llama3.2:3b-instruct-fp16"
# temperature=0 keeps tool selection and extraction as repeatable as possible.
llm = ChatOllama(temperature=0, model=local_llm)

# Define shared graph state (functional TypedDict form: key -> value type).
GraphState = TypedDict(
    "GraphState",
    {
        "question": str,
        "column_name_str": str,  # "Col:Description;Col:Description" spec
        "column_names": dict,  # parsed spec: column -> "<description>"
        "chain": object,  # LLMChain built by chain_creation
        "extracted_data": List[dict],  # one entity dict per review
        # NOTE(review): data_combination stores a list of records here, not a DataFrame.
        "merged_data": pd.DataFrame,
        "metadata": dict,
        "max_retries": int,
        "loop_step": Annotated[int, operator.add],
        "db_engine": sqlite3.Connection,
        "df1": Dict,  # column -> list of values
        "df2": Dict,  # expected to hold a "ReviewText" list
    },
)

# Stateful Memory Class
class StatefulMemory(BaseMemory):
    """LangChain memory that exposes a shared, manually managed state dict.

    The tools read and write ``memory.state`` directly, and ``save_context``
    does nothing, so the agent executor never writes into it.
    """

    state: Dict = {}  # shared pipeline state

    @property
    def memory_variables(self):
        """Return the keys currently held in the state."""
        return [*self.state]

    def load_memory_variables(self, inputs):
        """Return the live state dict itself (not a copy); ``inputs`` is ignored."""
        return self.state

    def save_context(self, inputs, outputs):
        """Do nothing: state updates are made by the tools themselves."""
        return None

    def clear(self):
        """Reset the state to an empty dict."""
        self.state = dict()

# Initialize memory
memory = StatefulMemory()

# SQLite database setup: one in-memory DB that lives as long as this connection.
connection = sqlite3.connect(":memory:")
memory.state["db_engine"] = connection  # expose the connection through the shared state
cursor = connection.cursor()

# Create tables for storing results. store_in_sqlite later writes these tables
# with if_exists="replace", so the schemas below only persist if a step never runs.
for _ddl in (
    """
CREATE TABLE IF NOT EXISTS merged_data (
    CustomerName TEXT,
    PurchaseDate TEXT
)
""",
    """
CREATE TABLE IF NOT EXISTS metadata (
    key TEXT,
    value TEXT
)
""",
):
    cursor.execute(_ddl)
connection.commit()

# Function to store data in SQLite
def store_in_sqlite(table_name: str, data: pd.DataFrame, conn=None):
    """Write ``data`` to ``table_name``, replacing any existing table.

    Args:
        table_name: destination table.
        data: frame to write; its index is not stored.
        conn: sqlite3 connection to write to. Defaults to this cell's
            module-level ``connection``, resolved at call time.
    """
    target = connection if conn is None else conn
    data.to_sql(table_name, target, if_exists="replace", index=False)

# Tools Definition
def column_name_extraction(input_text: str) -> str:
    """Tool: parse 'Col:Description;Col:Description' into ``state["column_names"]``.

    Surrounding whitespace and single or double quotes (as an agent's Action
    Input often carries) are stripped. Blank entries such as a trailing ';'
    are ignored, and only the first ':' in an entry splits name from
    description.

    Returns:
        A status string. If an entry is malformed or no column is found, the
        state is left untouched and "Invalid format in column_name_str." is
        returned.
    """
    state = memory.state
    spec = input_text.strip().strip("\"'").strip()
    column_names = {}
    for entry in spec.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        col_name, sep, desc = entry.partition(":")
        if not sep:
            return "Invalid format in column_name_str."
        column_names[col_name.strip()] = f"<{desc.strip()}>"
    if not column_names:
        return "Invalid format in column_name_str."
    state["column_names"] = column_names
    return f"Column names extracted: {list(column_names.keys())}"

column_name_tool = Tool(
    name="ColumnNameExtraction",
    func=column_name_extraction,
    description="Extracts column names from a string formatted as 'ColumnName:Description;ColumnName:Description'."
)

def chain_creation(input_text: str) -> str:
    """Tool: build the entity-extraction LLMChain from ``state["column_names"]``.

    ``input_text`` is ignored; the columns come from the shared state. The
    template is a plain (non f-) string. ``{review}`` is a real prompt
    variable, and the column map is bound as a partial variable. Its dict
    repr contains braces, which PromptTemplate would otherwise parse as
    template fields.
    """
    state = memory.state
    column_names = state.get("column_names", {})
    if not column_names:
        return "Error: 'column_names' not found in state."
    template = """You are a Named Entity Recognizer.
Given the following review text, extract the entities corresponding to the specified column names.

Column Names and Descriptions: {column_names}

Review Text: "{review}"

Respond in JSON format with the extracted entities, e.g., {{"column_1": "value1", "column_2": "value2"}}
"""
    try:
        prompt = PromptTemplate(
            template=template,
            input_variables=["review"],
            partial_variables={"column_names": str(column_names)},
        )
        state["chain"] = LLMChain(prompt=prompt, llm=llm)
        return "Chain created for entity extraction"
    except Exception as e:
        return f"Error in chain creation: {str(e)}"

chain_creation_tool = Tool(
    name="ChainCreation",
    func=chain_creation,
    description="Creates a chain for entity extraction based on the column names in the state."
)

def entity_extraction(input_text: str) -> str:
    """Tool: run ``state["chain"]`` over every df2 ReviewText and store the results.

    ``input_text`` is ignored. The outermost ``{...}`` span of each model
    reply is parsed as JSON. A reply that fails (LLM error or invalid JSON)
    contributes ``{}``, so ``extracted_data`` stays aligned row-for-row with
    df2, and the failure is printed rather than silently discarded.
    """

    def _parse_json_object(text):
        # Small models often wrap the JSON in prose or ``` fences.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end < start:
            raise ValueError("no JSON object in model output")
        return json.loads(text[start:end + 1])

    state = memory.state
    df2_raw = state.get("df2")
    # len() instead of truthiness: `not DataFrame` raises ValueError.
    if df2_raw is None or len(df2_raw) == 0:
        return "Error: 'df2' not found in state."

    chain = state.get("chain")
    if not chain:
        return "Error: 'chain' not found in state."

    df2 = pd.DataFrame(df2_raw)
    if "ReviewText" not in df2.columns:
        return "Error: 'ReviewText' column not found in df2."

    extracted_data = []
    for idx, review in enumerate(df2["ReviewText"].tolist()):
        try:
            extracted_data.append(_parse_json_object(chain.run({"review": review})))
        except Exception as e:  # per-review boundary: keep rows aligned with df2
            print(f"EntityExtraction: review {idx} failed ({e}); using {{}}")
            extracted_data.append({})
    state["extracted_data"] = extracted_data
    return "Entities extracted from review texts"

entity_extraction_tool = Tool(
    name="EntityExtraction",
    func=entity_extraction,
    description="Extracts entities from review texts using the chain created."
)

def data_combination(input_text: str) -> str:
    """Tool: left-join the extracted entities onto df1 and persist the result.

    ``input_text`` is ignored. Every df1 column missing from the extracted
    data is added as all-``None``, so the merge keys (the shared columns)
    are always df1's columns. The result is stored in the state as a list of
    records and written to the ``merged_data`` SQLite table.
    """
    state = memory.state
    df1_raw = state.get("df1")
    # len() instead of truthiness: `not DataFrame` raises ValueError.
    if df1_raw is None or len(df1_raw) == 0:
        return "Error: 'df1' not found in state."

    extracted_data = state.get("extracted_data", [])
    if not extracted_data:
        return "Error: 'extracted_data' not found in state."

    df1 = pd.DataFrame(df1_raw)
    extracted_df = pd.DataFrame(extracted_data)
    for key in df1.columns:
        if key not in extracted_df.columns:
            extracted_df[key] = None

    merged_data = pd.merge(df1, extracted_df, how="left")
    state["merged_data"] = merged_data.to_dict(orient="records")
    # Store the frame directly instead of rebuilding it from the records.
    store_in_sqlite("merged_data", merged_data)
    return "Data combined and merged data updated in state."

data_combination_tool = Tool(
    name="DataCombination",
    func=data_combination,
    description="Combines the extracted data with the original dataframe."
)

def metadata_extraction(input_text: str) -> str:
    """Tool: profile df1/df2 and store the profile in the state and in SQLite.

    ``input_text`` is ignored. Each profile entry becomes one
    ``(table_name, key, value)`` row of the ``metadata`` table. List and
    dict values are JSON-encoded because sqlite3 cannot bind them as
    column values.
    """

    def _profile(frame):
        # Columns, dtypes, row count, per-column nulls and describe() stats.
        return {
            "columns": frame.columns.tolist(),
            "dtypes": frame.dtypes.apply(str).to_dict(),
            "num_rows": len(frame),
            "missing_values": frame.isnull().sum().to_dict(),
            "statistical_summary": frame.describe(include="all").to_dict(),
        }

    def _json_default(value):
        # pandas may hand back numpy scalars (e.g. np.int64 counts) or
        # Timestamps inside these dicts; json cannot encode them natively.
        if isinstance(value, pd.Timestamp):
            return value.isoformat()
        if hasattr(value, "item"):
            return value.item()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    state = memory.state
    metadata = {name: _profile(pd.DataFrame(state[name])) for name in ("df1", "df2")}
    state["metadata"] = metadata

    rows = []
    for table, details in metadata.items():
        for key, value in details.items():
            if isinstance(value, (list, dict)):
                value = json.dumps(value, default=_json_default)
            rows.append({"table_name": table, "key": key, "value": value})
    store_in_sqlite("metadata", pd.DataFrame(rows))
    return "Metadata extracted and added to state."

metadata_extraction_tool = Tool(
    name="MetadataExtraction",
    func=metadata_extraction,
    description="Extracts metadata from df1 and df2."
)

# Define the custom prompt
custom_prompt = """
You are a helpful assistant that uses tools to answer questions.

When responding, always use the following format:
Thought: [Your thought process]
Action: [The tool name]
Action Input: [The input to the tool. Do not leave this empty!]

If you have enough information to provide a final answer, use this format:
Thought: [Your thought process]
Final Answer: [Your answer]

Ensure every Action includes a valid and properly formatted Action Input.
{agent_scratchpad}
"""

# ReAct agent over the five pipeline tools; it shares `memory` with them.
# NOTE(review): ZeroShotAgent documents prefix/suffix/format_instructions as
# prompt kwargs; confirm that "agent_prompt" is actually consumed, because
# custom_prompt may be ignored.
agent = initialize_agent(
    llm=llm,
    tools=[
        column_name_tool,
        chain_creation_tool,
        entity_extraction_tool,
        data_combination_tool,
        metadata_extraction_tool,
    ],
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    agent_kwargs={"agent_prompt": custom_prompt},
    memory=memory,
    max_iterations=5,
    handle_parsing_errors=True,
    verbose=True,
)

# Define individual agents
def execute_column_name_agent(state):
    """Ask the ReAct agent to parse ``state["column_name_str"]``; return the shared state."""
    input_text = state["column_name_str"]
    agent.run(input_text)
    return memory.state


def execute_chain_creation_agent(state):
    """Ask the agent to build the extraction chain; return the shared state.

    Raises:
        ValueError: if ``column_names`` is missing or empty.
    """
    if not state.get("column_names"):
        raise ValueError("Error: 'column_names' is missing or empty in the state.")

    input_text = f"Create a chain for entity extraction using these column names: {state['column_names']}"
    agent.run(input_text)
    return memory.state


def execute_entity_extraction_agent(state):
    """Ask the agent to extract entities from df2's reviews; return the shared state.

    Raises:
        ValueError: if df2 is missing/empty or has no non-empty ReviewText column.
    """
    df2_raw = state.get("df2")
    # len() instead of truthiness: `not DataFrame` raises ValueError.
    if df2_raw is None or len(df2_raw) == 0:
        raise ValueError("Error: 'df2' is missing or empty in the state.")

    df2 = pd.DataFrame(df2_raw)
    # Check the column explicitly: DataFrame.get with a list default would
    # return a plain list, which has no .tolist().
    if "ReviewText" not in df2.columns or df2["ReviewText"].empty:
        raise ValueError("Error: 'ReviewText' column is missing or empty in df2.")
    review_texts = df2["ReviewText"].tolist()

    input_text = f"Extract entities from the following review texts: {review_texts}"
    agent.run(input_text)
    return memory.state


def execute_data_combination_agent(state):
    """Ask the agent to merge the extracted data with df1; return the shared state."""
    input_text = "Combine extracted data with the original dataframe."
    agent.run(input_text)
    return memory.state


def execute_metadata_extraction_agent(state):
    """Ask the agent to profile df1/df2; return the shared state."""
    input_text = "Extract metadata from the dataframes."
    agent.run(input_text)
    return memory.state

# Global Agent
def global_agent(state):
    """Run the five execute_* steps in order, retrying each up to ``max_retries`` times.

    Before each step, the state key that step needs is validated. If every
    attempt of a step fails, the run is aborted instead of re-entering the
    same step forever.

    Returns:
        ``state["merged_data"]`` on success, ``None`` if validation or a step fails.
    """
    max_retries = 3  # Set the maximum number of retries for failed steps
    # (step name, state key that must be non-empty first or None, message tail, runner)
    steps = (
        ("execute_column_name_agent", "column_name_str",
         "'column_name_str' is missing.", execute_column_name_agent),
        ("execute_chain_creation_agent", "column_names",
         "'column_names' are missing.", execute_chain_creation_agent),
        ("execute_entity_extraction_agent", "df2",
         "'df2' is missing.", execute_entity_extraction_agent),
        ("execute_data_combination_agent", "df1",
         "'df1' is missing.", execute_data_combination_agent),
        ("execute_metadata_extraction_agent", None, None,
         execute_metadata_extraction_agent),
    )
    try:
        print("> Starting global agent...")
        for step_name, required_key, missing_msg, run_step in steps:
            if required_key is not None and (required_key not in state or not state[required_key]):
                raise ValueError(f"State validation failed: {missing_msg}")
            last_error = None
            for _ in range(max_retries):  # Retry logic
                try:
                    state = run_step(state)
                    break
                except Exception as e:
                    last_error = e
                    print(f"Retry failed: {e}")
            else:
                raise RuntimeError(
                    f"{step_name} failed after {max_retries} attempts: {last_error}"
                ) from last_error

        print("> Finished chain.")
        print("Global agent completed successfully.")
        final_answer = state.get("merged_data", {})
        print(f"Final Answer: {final_answer}")
        return final_answer

    except Exception as e:
        print(f"Error in global agent: {e}")
        return None

# Example Data
initial_state = GraphState(
    question="Which customers purchased products",
    column_name_str="CustomerName:Name of the customer;PurchaseDate:Date of purchase of product",
    column_names={},  # filled in by the column-name step
    chain=None,
    extracted_data=[],
    merged_data=pd.DataFrame(),
    metadata={},
    max_retries=3,
    loop_step=0,
    df1={
        "CustomerName": ["Alice", "Bob"],
        "PurchaseDate": ["2023-01-01", "2023-01-02"],
    },
    df2={
        "ReviewText": [
            "Alice purchased a laptop on 2023-01-01.",
            "Bob purchased a smartphone on 2023-01-02.",
        ]
    },
)
# The tools operate on memory.state, so the run starts from this dict.
memory.state = initial_state

# Run the Global Agent
final_result = global_agent(initial_state)

# Verify SQLite content
for _label, _table in (("Merged Data", "merged_data"), ("Metadata", "metadata")):
    print(f"\nData from SQLite - {_label}:")
    print(pd.read_sql_query(f"SELECT * FROM {_table}", connection))
