In [1]:
%%capture
%pip install llama-index llama-index-embeddings-openai qdrant-client llama-index-vector-stores-qdrant llama-index llama-index-llms-openai llama-index-vector-stores-faiss faiss-cpu llama-index-llms-anthropic tavily-python

In [2]:
import os
import nest_asyncio
from getpass import getpass
from dotenv import load_dotenv

# Import LlamaIndex components
from llama_index.llms.openai import OpenAI
from llama_index.core.agent import FunctionCallingAgent
from llama_index.core.workflow import Event, Workflow, Context, StopEvent, step
from llama_index.core.workflow import StartEvent


# Pull variables from a local .env file (if any) into the process environment,
# then patch asyncio so event loops can be nested inside the notebook's loop.
load_dotenv()
nest_asyncio.apply()

# Resolve the OpenAI key: prefer the environment, otherwise prompt for it
# without echoing the input.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
if not OPENAI_API_KEY:
    OPENAI_API_KEY = getpass("Enter OPENAI_API_KEY: ")

# Single LLM instance shared by every agent created in this notebook.
llm = OpenAI(
    model="gpt-4o-mini",
    api_key=OPENAI_API_KEY,
    temperature=0.5,
    max_tokens=512,
)

In [3]:
# Define events for our workflow
class DataPrepEvent(Event):
    """Event emitted by the ``setup`` step and consumed by ``data_preparation``."""
    # Location of the dataset to prepare, as supplied when the workflow was started.
    dataset_path: str
    
class DataAnalysisEvent(Event):
    """Event emitted by ``data_preparation`` and consumed by ``data_analysis``."""
    # Text reply of the data preparation agent (stringified chat response).
    prepared_data: str
    
class DataVisualizationEvent(Event):
    """Event emitted by ``data_analysis`` and consumed by ``data_visualization``."""
    # Text reply of the data analysis agent (stringified chat response).
    analysis_results: str
    
class VisualizationResults(Event):
    """Event written to the workflow's event stream by ``data_visualization``."""
    # Text reply of the visualization agent (stringified chat response).
    visualizations: str

# Define our multi-agent workflow
class DataAnalysisFlow(Workflow):
    """Three-stage agent pipeline: data preparation -> analysis -> visualization.

    The run is started with four keyword arguments, which are read from the
    ``StartEvent`` in ``setup``: ``dataset_path``, ``data_prep_agent``,
    ``data_analysis_agent`` and ``data_viz_agent``. Each stage sends a prompt
    to its agent and forwards the reply (as text) to the next stage.

    The final ``StopEvent`` result is a dict with the keys ``prepared_data``,
    ``analysis_results`` and ``visualizations``. The visualization reply is
    additionally published on the event stream as ``VisualizationResults``.
    """

    async def _ask(self, agent, prompt: str) -> str:
        """Send ``prompt`` to ``agent`` and return the reply as plain text.

        The agent's asynchronous ``achat`` is awaited (rather than calling the
        blocking ``chat``) so the event loop stays free while the LLM request
        is in flight; otherwise event streaming and the workflow timeout are
        stalled for the duration of every call.
        """
        response = await agent.achat(prompt)
        return str(response)

    @step
    async def setup(self, ev: StartEvent) -> DataPrepEvent:
        """Capture the agents from the start event and kick off preparation.

        Returns:
            A ``DataPrepEvent`` carrying ``ev.dataset_path``.
        """
        # NOTE: the agents are kept on the workflow instance (not in the
        # per-run Context), so concurrent runs of the same instance would
        # overwrite each other's agents.
        self.data_prep_agent = ev.data_prep_agent
        self.data_analysis_agent = ev.data_analysis_agent
        self.data_viz_agent = ev.data_viz_agent

        return DataPrepEvent(dataset_path=ev.dataset_path)

    @step
    async def data_preparation(self, ctx: Context, ev: DataPrepEvent) -> DataAnalysisEvent:
        """Ask the preparation agent to clean and prepare the dataset.

        Stores ``dataset_path`` and ``prepared_data`` in the context so later
        steps can reference them.
        """
        await ctx.set("dataset_path", ev.dataset_path)

        prepared_data = await self._ask(
            self.data_prep_agent,
            f"Prepare this dataset for analysis: {ev.dataset_path}. "
            "Perform cleaning, handling missing values, and feature engineering as needed.",
        )
        await ctx.set("prepared_data", prepared_data)

        return DataAnalysisEvent(prepared_data=prepared_data)

    @step
    async def data_analysis(self, ctx: Context, ev: DataAnalysisEvent) -> DataVisualizationEvent:
        """Ask the analysis agent to analyze the prepared data.

        Stores the reply in the context under ``analysis_results``.
        """
        dataset_path = await ctx.get("dataset_path")

        analysis_results = await self._ask(
            self.data_analysis_agent,
            f"Analyze this prepared data: <data>{ev.prepared_data}</data>. "
            f"The original dataset is from: {dataset_path}. "
            "Perform statistical analysis, identify patterns, and extract insights.",
        )
        await ctx.set("analysis_results", analysis_results)

        return DataVisualizationEvent(analysis_results=analysis_results)

    @step
    async def data_visualization(self, ctx: Context, ev: DataVisualizationEvent) -> StopEvent:
        """Ask the visualization agent for chart suggestions and finish the run.

        Returns:
            A ``StopEvent`` whose result is a dict with ``prepared_data``,
            ``analysis_results`` and ``visualizations``.
        """
        dataset_path = await ctx.get("dataset_path")
        prepared_data = await ctx.get("prepared_data")

        visualizations = await self._ask(
            self.data_viz_agent,
            f"Create visualizations for this analysis: <analysis>{ev.analysis_results}</analysis>. "
            f"The data was prepared as follows: <prepared_data>{prepared_data}</prepared_data>. "
            f"The original dataset is from: {dataset_path}. "
            "Suggest appropriate chart types and visualization techniques.",
        )

        # Publish the reply on the event stream so callers iterating
        # ``stream_events()`` see it before the final result is awaited.
        ctx.write_event_to_stream(VisualizationResults(visualizations=visualizations))

        return StopEvent(result={
            "prepared_data": prepared_data,
            "analysis_results": ev.analysis_results,
            "visualizations": visualizations,
        })

In [4]:
# Create the agents
def create_agents():
    """Build the three tool-less agents used by the workflow.

    All agents share the module-level ``llm`` and differ only in their system
    prompt.

    Returns:
        A ``(data_prep_agent, data_analysis_agent, data_viz_agent)`` tuple.
    """
    # One system prompt per role, in the order the agents are returned.
    role_prompts = (
        "You are a data preparation agent. Your job is to clean, transform, and prepare data for analysis. "
        "You handle tasks like dealing with missing values, normalizing data, feature engineering, and ensuring data quality.",
        "You are a data analysis agent. Your job is to perform statistical analysis on prepared data. "
        "You identify patterns, correlations, and insights from the data to help answer business or research questions.",
        "You are a data visualization agent. Your job is to recommend appropriate visualizations for data analysis results. "
        "You suggest chart types, plotting techniques, and visualization approaches to effectively communicate insights.",
    )

    # The generator is consumed left to right, so the agents are constructed
    # in the same order as the prompts above; each gets its own empty tool list.
    prep_agent, analysis_agent, viz_agent = (
        FunctionCallingAgent.from_tools(
            tools=[],
            llm=llm,
            verbose=False,
            system_prompt=prompt,
        )
        for prompt in role_prompts
    )

    return prep_agent, analysis_agent, viz_agent

In [None]:
async def run_workflow(dataset_path, timeout=60):
    """Run the data analysis workflow on the given dataset and print a summary.

    Args:
        dataset_path: Location of the dataset; passed through to the workflow
            and embedded in the agents' prompts.
        timeout: Timeout passed to the ``DataAnalysisFlow`` constructor.
            Defaults to 60; raise it when the three sequential LLM calls need
            more time.

    Returns:
        The workflow's final result: a dict with the keys ``prepared_data``,
        ``analysis_results`` and ``visualizations``.
    """
    data_prep_agent, data_analysis_agent, data_viz_agent = create_agents()

    workflow = DataAnalysisFlow(timeout=timeout, verbose=True)

    # run() returns a handler immediately; the keyword arguments become the
    # attributes of the StartEvent read by the workflow's setup step.
    handler = workflow.run(
        dataset_path=dataset_path,
        data_prep_agent=data_prep_agent,
        data_analysis_agent=data_analysis_agent,
        data_viz_agent=data_viz_agent,
    )

    # Surface streamed visualization recommendations as soon as they arrive.
    async for ev in handler.stream_events():
        if isinstance(ev, VisualizationResults):
            print("==== Visualization Recommendations ====")
            print(ev.visualizations)

    # Awaiting the handler yields the StopEvent's result.
    final_result = await handler

    print("\n==== Complete Analysis Results ====")
    # Each section shows at most the first 500 characters of the agent reply.
    sections = (
        ("1. Data Preparation", "prepared_data"),
        ("2. Analysis Results", "analysis_results"),
        ("3. Visualization Recommendations", "visualizations"),
    )
    for heading, key in sections:
        print(f"{heading}:\n{final_result[key][:500]}...\n")

    return final_result