In [2]:
%%capture
!pip install llama-index llama-index-embeddings-openai qdrant-client llama-index-vector-stores-qdrant llama-index llama-index-llms-openai llama-index-vector-stores-faiss faiss-cpu llama-index-llms-anthropic tavily-python

In [4]:
import os
import nest_asyncio
from getpass import getpass
from dotenv import load_dotenv

# Preferred location of the .env file on the original author's machine.
_DOTENV_PATH = r"C:\Users\anteb\PycharmProjects\JupyterProject\.env"

# When that machine-specific file is absent, pass None so python-dotenv
# searches for a .env file itself instead of silently loading nothing.
load_dotenv(_DOTENV_PATH if os.path.isfile(_DOTENV_PATH) else None)

# Allow nested event loops so asyncio.run() works inside the notebook kernel.
nest_asyncio.apply()


def _env_or_prompt(name: str) -> str:
    """Return the environment variable `name`, prompting (hidden input) if unset or empty."""
    return os.environ.get(name) or getpass(f"Enter {name}: ")


CO_API_KEY = _env_or_prompt('CO_API_KEY')
OPENAI_API_KEY = _env_or_prompt('OPENAI_API_KEY')
QDRANT_URL = _env_or_prompt('QDRANT_URL')
QDRANT_API_KEY = _env_or_prompt('QDRANT_API_KEY')

In [5]:
import os
import nest_asyncio
from getpass import getpass
from dotenv import load_dotenv
import pandas as pd
import numpy as np
from llama_index.core.workflow import Context
import asyncio
from llama_index.core.workflow import Workflow, Context, StartEvent, step

from ai_agents.agents import data_prep_agent
from events.events import DataLoadedEvent, InputRequiredEvent, DataPreparedEvent, HumanResponseEvent



In [7]:

async def load_csv(ctx: Context, filename: str) -> pd.DataFrame:
    """Read `filename` with pandas and report the outcome on stdout.

    Args:
        ctx: Workflow context; not used by this function.
        filename: Path of the CSV file to read.

    Returns:
        The loaded DataFrame, or None when reading fails for any reason
        (the error is printed rather than raised).
    """
    try:
        frame = pd.read_csv(filename)
        n_rows, n_cols = frame.shape
        print(f"Successfully loaded {filename} with {n_rows} rows and {n_cols} columns")
    except Exception as exc:
        # Best-effort loader: callers check for None instead of catching.
        print(f"Error loading CSV: {exc}")
        return None
    return frame


async def analyze_data_quality(ctx: Context, dataframe: pd.DataFrame) -> dict:
    """Build a data-quality report for `dataframe`.

    Args:
        ctx: Workflow context; not used by this function.
        dataframe: Frame to inspect, or None.

    Returns:
        ``{"error": "No dataframe provided"}`` when `dataframe` is None,
        otherwise a dict with four entries:

        - ``missing_values``: null count per column, only for columns that
          have at least one null.
        - ``data_types``: dtype of every column, as a string.
        - ``unique_values``: value counts for object/category columns that
          have fewer than 10 distinct values.
        - ``statistics``: min/max/mean/median/std (as floats) for every
          numeric column.
    """
    if dataframe is None:
        return {"error": "No dataframe provided"}

    # Columns with at least one null, mapped to their null count.
    null_counts = dataframe.isnull().sum()
    missing_values = {
        name: int(n_null) for name, n_null in null_counts[null_counts > 0].items()
    }

    # Column name -> dtype rendered as text.
    data_types = dict(zip(dataframe.columns, map(str, dataframe.dtypes)))

    # Value counts only for low-cardinality categorical/text columns.
    categorical_names = dataframe.select_dtypes(include=['object', 'category']).columns
    unique_values = {
        name: dataframe[name].value_counts().to_dict()
        for name in categorical_names
        if len(dataframe[name].unique()) < 10
    }

    # Summary statistics, each obtained from the Series method of the same name.
    stat_names = ("min", "max", "mean", "median", "std")
    statistics = {
        name: {stat: float(getattr(dataframe[name], stat)()) for stat in stat_names}
        for name in dataframe.select_dtypes(include=['number']).columns
    }

    return {
        "missing_values": missing_values,
        "data_types": data_types,
        "unique_values": unique_values,
        "statistics": statistics,
    }


async def clean_data(ctx: Context, dataframe: pd.DataFrame, cleaning_actions: dict) -> tuple:
    """Apply the requested cleaning actions to a copy of `dataframe`.

    Actions are applied in a fixed order: missing-value handling, type
    conversion, outlier removal, column renaming, column dropping. Because
    renaming runs before dropping, ``drop_columns`` must use the new names.

    Args:
        ctx: Workflow context; not used by this function.
        dataframe: Frame to clean, or None. It is never modified in place.
        cleaning_actions: Dict that may contain any of these keys:

            - ``handle_missing``: ``{column: action}`` where action is one of
              ``"drop"``, ``"mean"``, ``"median"``, ``"mode"``, ``"zero"``.
              Unknown actions are ignored; mean/median are skipped for
              non-numeric columns.
            - ``convert_types``: ``{column: dtype}`` passed to ``astype``.
            - ``remove_outliers``: iterable of numeric column names filtered
              with the 1.5 * IQR rule.
            - ``rename_columns``: mapping passed to ``DataFrame.rename``.
            - ``drop_columns``: column labels passed to ``DataFrame.drop``.

    Returns:
        ``(cleaned_dataframe, cleaning_summary)``. When `dataframe` is None
        the result is ``(None, {"error": "No dataframe provided"})``.

    Raises:
        KeyError: if a column named under ``handle_missing``,
            ``remove_outliers`` or ``drop_columns`` does not exist.
    """
    if dataframe is None:
        return None, {"error": "No dataframe provided"}

    cleaning_actions = cleaning_actions or {}
    df = dataframe.copy()
    cleaning_summary = {}

    # Handle missing values
    for col, action in (cleaning_actions.get("handle_missing") or {}).items():
        if action == "drop":
            before_count = len(df)
            df = df.dropna(subset=[col])
            cleaning_summary[f"dropped_rows_{col}"] = before_count - len(df)
            continue

        if action in ("mean", "median"):
            if not pd.api.types.is_numeric_dtype(df[col]):
                continue  # mean/median are undefined for non-numeric columns
            fill_value = df[col].mean() if action == "mean" else df[col].median()
        elif action == "mode":
            modes = df[col].mode()
            # mode() is empty when every value is missing; nothing to fill with.
            fill_value = modes.iloc[0] if not modes.empty else None
        elif action == "zero":
            fill_value = 0
        else:
            continue

        # Count against the working frame, not the input: earlier "drop"
        # actions may already have removed rows that were missing here.
        missing_before = int(df[col].isnull().sum())
        if fill_value is not None and not pd.isna(fill_value):
            df[col] = df[col].fillna(fill_value)
        missing_after = int(df[col].isnull().sum())
        cleaning_summary[f"filled_{action}_{col}"] = missing_before - missing_after

    # Convert data types
    for col, new_type in (cleaning_actions.get("convert_types") or {}).items():
        try:
            df[col] = df[col].astype(new_type)
            cleaning_summary[f"converted_{col}"] = new_type
        except Exception:
            # Record the failure and carry on; KeyboardInterrupt/SystemExit
            # are deliberately not swallowed.
            cleaning_summary[f"failed_convert_{col}"] = new_type

    # Remove outliers
    for col in cleaning_actions.get("remove_outliers") or []:
        if not pd.api.types.is_numeric_dtype(df[col]):
            continue
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        before_count = len(df)
        # Missing values are not outliers: keep them so only rows that truly
        # fall outside the bounds are removed and counted.
        keep = df[col].between(lower_bound, upper_bound) | df[col].isna()
        df = df[keep]
        cleaning_summary[f"removed_outliers_{col}"] = before_count - len(df)

    # Rename columns
    if "rename_columns" in cleaning_actions:
        df = df.rename(columns=cleaning_actions["rename_columns"])
        cleaning_summary["renamed_columns"] = cleaning_actions["rename_columns"]

    # Drop columns
    if "drop_columns" in cleaning_actions:
        df = df.drop(columns=cleaning_actions["drop_columns"])
        cleaning_summary["dropped_columns"] = cleaning_actions["drop_columns"]

    return df, cleaning_summary

In [8]:
from ai_agents.data_preparation_workflow import DataPreparationWorkflow


async def run_data_preparation_pipeline(filename):
    """Run the data preparation workflow with human-in-the-loop interactions.

    Every InputRequiredEvent streamed by the workflow is shown to the user,
    the answer is read with ``input()``, and it is sent back to the workflow
    as a HumanResponseEvent carrying the same ``step_name``.

    Args:
        filename: Path of the input file, forwarded to the workflow's ``run``.

    Returns:
        Whatever the workflow returns as its final result.
    """
    # Initialize and start the workflow
    workflow = DataPreparationWorkflow(timeout=300)
    handler = workflow.run(filename=filename)

    # Handle events
    async for event in handler.stream_events():
        if not isinstance(event, InputRequiredEvent):
            continue

        # Display question to the user
        print(f"\n[QUESTION] {event.question}")

        # Display context if available
        if event.context:
            print("\nContext:")
            for key, value in event.context.items():
                print(f"- {key}: {value}")

        # Get user response (blocks until the user answers)
        response = input("\nYour response: ")

        # Send response back to the workflow
        handler.ctx.send_event(
            HumanResponseEvent(
                response=response,
                step_name=event.step_name
            )
        )

    # Get final result
    result = await handler
    print("\nWorkflow completed!")

    # The result is read as a mapping here but as an object with attributes
    # by the calling cell, so accept either shape (and a None result).
    if isinstance(result, dict):
        summary = result.get('cleaning_summary', {})
    else:
        summary = getattr(result, 'cleaning_summary', {})
    print(f"Data preparation summary: {summary}")

    # Pass an explicit default so a missing key yields None, which the
    # check below expects, instead of an error from the context store.
    prepared_df = await handler.ctx.get("prepared_dataframe", default=None)
    if prepared_df is not None:
        print(f"Processed dataframe shape: {prepared_df.shape}")

    return result

In [9]:
# Apply nest_asyncio so asyncio.run() works inside the notebook's running loop
nest_asyncio.apply()

# Run the data preparation pipeline
result = asyncio.run(run_data_preparation_pipeline(r"/data/Commute_Times_V1.csv"))

# Examine the prepared data
if result:
    # The pipeline reads the result as a mapping while this cell reads an
    # attribute, so accept either shape and skip the display when no
    # dataframe is present.
    if isinstance(result, dict):
        df = result.get("dataframe")
    else:
        df = getattr(result, "dataframe", None)

    if df is not None:
        print("\nFirst 5 rows of prepared data:")
        display(df.head())

        print("\nData summary:")
        display(df.describe())
    else:
        print("\nWorkflow result does not contain a prepared dataframe.")

Exception in callback Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result(span_id='Workflow.run...-6858db605965', bound_args=<BoundArgumen...mes_V1.csv'})>, instance=<ai_agents.da...002886441B680>, context=<_contextvars...00288639C3440>)(<WorkflowHand...async_chat'")>) at C:\Users\anteb\anaconda3\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py:274
handle: <Handle Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result(span_id='Workflow.run...-6858db605965', bound_args=<BoundArgumen...mes_V1.csv'})>, instance=<ai_agents.da...002886441B680>, context=<_contextvars...00288639C3440>)(<WorkflowHand...async_chat'")>) at C:\Users\anteb\anaconda3\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py:274>
Traceback (most recent call last):
  File "C:\Users\anteb\anaconda3\Lib\site-packages\llama_index\core\workflow\context.py", line 500, in _step_worker
    new_ev = await instrumented_step(**kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  Fi

WorkflowRuntimeError: Error in step 'load_data': 'FunctionAgent' object has no attribute 'async_chat'