# Data Analyser Workflow Tutorial

This notebook demonstrates how the workflow system in `src/agent/workflow.py` functions. The workflow is a directed graph that processes JIRA tickets through several stages:

1. Extract task description from a JIRA ticket
2. Generate SQL query based on the task
3. Validate the SQL query
4. Execute the query against a database
5. Validate the query results
6. Generate business insights from the results
7. Update the JIRA ticket with the insights

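When SQL validation or result validation fails, the workflow retries automatically, up to `max_retries` times (3 in this notebook), before it marks the analysis as failed.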
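
The retry behaviour can be pictured as a small routing decision taken after each validation step. The sketch below is illustrative only: `route_after_validation`, its return labels, and the default limit are hypothetical names chosen for this example, not the code in `src/agent/workflow.py`.

```python
def route_after_validation(is_valid: bool, retry_count: int, max_retries: int = 3) -> str:
    """Pick the next branch after a validation step (hypothetical labels)."""
    if is_valid:
        return "continue"  # move on to the next stage
    if retry_count < max_retries:
        return "retry"     # go back and try again
    return "fail"          # retry budget exhausted


print(route_after_validation(True, 0))
print(route_after_validation(False, 1))
print(route_after_validation(False, 3))
```

Keeping the decision in a pure function like this makes the retry limit easy to test without running the full graph.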

In [None]:
import os
import sys
import logging
from pathlib import Path

# Add the project root to Python path
project_root = Path().absolute().parent
sys.path.append(str(project_root))

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print(f"Project root: {project_root}")
print(f"Current working directory: {os.getcwd()}")

In [None]:
# Import required components
from src.agent.agent import DataAnalysisAgent
from src.agent.workflow import create_workflow
from src.clients.db_client import DatabaseClient
from src.clients.jira_client import JiraClient
from src.tools.sql_tool import SQLTool
from src.tools.validator_tool import ValidatorTool
from src.tools.insight_tool import InsightTool
from src.models.schemas import JiraTicket, AgentState

from dotenv import load_dotenv
load_dotenv()

## 1. Initialize Components

First, we need to initialize all the components that will be used in the workflow:

In [None]:
# Initialize the agent with configuration
agent = DataAnalysisAgent(config_path=str(project_root / "config" / "config.yaml"))

# Initialize database client
# expanduser() only expands "~", so build the path from project_root instead
DB_PATH = project_root / "data" / "porsche_analytics.db"
sqlite_connection_string = f"sqlite:///{DB_PATH}"
db_client = DatabaseClient(sqlite_connection_string)

# Initialize JIRA client
JIRA_BASE_URL = os.environ.get('JIRA_BASE_URL')
JIRA_USER_EMAIL = os.environ.get('JIRA_USER_EMAIL')
JIRA_API_TOKEN = os.environ.get('JIRA_API_TOKEN')
jira_client = JiraClient(base_url=JIRA_BASE_URL, email=JIRA_USER_EMAIL, api_token=JIRA_API_TOKEN)

# Initialize tools
sql_tool = SQLTool(llm=agent.llm)
validator_tool = ValidatorTool(llm=agent.llm, schema_dict=agent.schema)
insight_tool = InsightTool(llm=agent.llm)
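
`os.environ.get` returns `None` for a variable that is not set, so a missing `.env` entry would be passed to `JiraClient` as `None`. One way to fail earlier with a clearer message is a small helper like the one below. `require_env` and the `EXAMPLE_JIRA_SETTING` variable are hypothetical names used only for this sketch.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Demonstration with a throwaway variable (made up for this sketch).
os.environ["EXAMPLE_JIRA_SETTING"] = "https://example.invalid"
print(require_env("EXAMPLE_JIRA_SETTING"))
```

In the cell above, the three `os.environ.get(...)` calls could be swapped for `require_env(...)` if an early failure is preferred.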

## 2. Define Workflow Functions

Now we need to define the functions that will be passed to `create_workflow()`:

In [None]:
# Define the individual functions for each workflow step
def generate_sql_function(task_description):
    """Generate SQL query from task description"""
    return sql_tool.generate_query(task_description=task_description, schema_dict=agent.schema)

def validate_sql_function(sql_query, task_description):
    """Validate SQL query"""
    return validator_tool.validate_sql(sql_query=sql_query.query, task_description=task_description)

def execute_query_function(query):
    """Execute SQL query against the database"""
    return db_client.execute_query(query)

def validate_results_function(query_result, task_description):
    """Validate query results"""
    # Simple validation - check if we have data
    is_valid = query_result.row_count > 0
    errors = [] if is_valid else ["Query returned no results"]
    return validator_tool.create_validation_result(is_valid=is_valid, errors=errors)

def generate_insights_function(task_description, query_result):
    """Generate business insights from query results"""
    return insight_tool.generate_insights(task_description=task_description, query_result=query_result)

def update_jira_function(ticket_id, business_insight, failed=False):
    """Update JIRA ticket with insights"""
    status = "Failed" if failed else "Completed"
    lines = [f"**Analysis {status}**", ""]

    # On a failed run there may be no insight to report, so guard against None
    if business_insight is not None:
        lines += [f"**Summary:** {business_insight.summary}", "", "**Key Points:**"]
        lines += [f"- {point}" for point in business_insight.key_points]
    comment = "\n".join(lines) + "\n"

    # In a real implementation, use jira_client to update the ticket
    print(f"Would update ticket {ticket_id} with comment:\n{comment}")
    return True

## 3. Create the Workflow

Now we create the workflow by passing our functions to `create_workflow()`:

In [None]:
# Create the workflow
workflow = create_workflow(
    generate_sql_fn=generate_sql_function,
    validate_sql_fn=validate_sql_function,
    execute_query_fn=execute_query_function,
    validate_results_fn=validate_results_function,
    generate_insights_fn=generate_insights_function,
    update_jira_fn=update_jira_function,
    max_retries=3
)
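
The functions passed in work on individual fields, while a graph node works on the whole state, so something has to adapt one to the other. The sketch below shows one way such an adapter could look. `DemoState` and `make_generate_sql_node` are hypothetical stand-ins written for this example; the real wiring lives in `src/agent/workflow.py` and may differ.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional


@dataclass(frozen=True)
class DemoState:
    """Stand-in for AgentState with only the fields this sketch needs."""
    task_description: str
    sql_query: Optional[str] = None


def make_generate_sql_node(
    generate_sql_fn: Callable[[str], str],
) -> Callable[[DemoState], DemoState]:
    """Wrap a field-level function so it reads from and writes to the state."""
    def node(state: DemoState) -> DemoState:
        sql = generate_sql_fn(state.task_description)
        # Return a copy of the state with the new field filled in
        return replace(state, sql_query=sql)
    return node


# A fake generator keeps the example runnable without an LLM.
node = make_generate_sql_node(lambda task: f"-- query for: {task}")
result = node(DemoState(task_description="count models per category"))
print(result.sql_query)
```

Because the adapter only depends on the function's signature, the same workflow can be exercised with fake functions in tests and real tools in production.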

## 4. Run the Workflow with a Sample JIRA Ticket

Let's create a sample JIRA ticket and run it through the workflow:

In [None]:
# Create a sample JIRA ticket
sample_ticket = JiraTicket(
    ticket_id="KAN-8",
    summary="Car Models Analysis",
    description="How many unique car models do we have per car category? Sort the results in descending order!",
    created_at="2025-07-02T14:00:00",
    status="Open"
)

# Initialize the agent state with the ticket
initial_state = AgentState(
    ticket=sample_ticket,
    task_description=None,
    sql_query=None,
    validation_result=None,
    query_result=None,
    business_insight=None,
    retry_count=0,
    error_message=None
)

## 5. Execute the Workflow

Now let's execute the workflow and trace the execution:

In [None]:
# Run the workflow with our initial state
# Each streamed event is expected to expose "step" and "state" keys
for event in workflow.stream(initial_state):
    print(f"\n{'='*50}")
    step_name = event["step"]
    state = event["state"]
    
    # Print current step and key state information
    print(f"Step: {step_name}")
    
    # Print relevant state information based on the step
    if step_name == "extract_task" and state.task_description:
        print(f"Extracted task: {state.task_description}")
    
    elif step_name == "generate_sql" and state.sql_query:
        print(f"Generated SQL: {state.sql_query.query}")
    
    elif step_name == "validate_sql" and state.validation_result:
        print(f"SQL validation: {state.validation_result.is_valid}")
        if not state.validation_result.is_valid:
            print(f"Validation errors: {state.validation_result.errors}")
    
    elif step_name == "execute_query" and state.query_result:
        print(f"Query executed with {state.query_result.row_count} rows returned")
    
    elif step_name == "validate_results" and state.validation_result:
        print(f"Results validation: {state.validation_result.is_valid}")
    
    elif step_name == "generate_insights" and state.business_insight:
        print(f"Generated insights summary: {state.business_insight.summary[:100]}...")
    
    elif step_name == "update_jira":
        print(f"JIRA ticket {state.ticket.ticket_id} updated")
    
    elif step_name == "increment_retry":
        print(f"Retry count incremented to {state.retry_count}")
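
The loop above relies on every event carrying `step` and `state` keys. If `create_workflow()` instead returns a plain compiled LangGraph graph, events in the `updates` stream mode are dictionaries keyed by node name. The sketch below normalizes both shapes into `(step_name, payload)` pairs. `iter_steps` and the sample events are hypothetical and exist only for illustration.

```python
from typing import Any, Dict, Iterable, Iterator, Tuple


def iter_steps(events: Iterable[Dict[str, Any]]) -> Iterator[Tuple[str, Any]]:
    """Yield (step_name, payload) pairs from either event shape."""
    for event in events:
        if "step" in event and "state" in event:
            # Shape used by the loop in this notebook
            yield event["step"], event["state"]
        else:
            # Shape {node_name: update}, one entry per node that ran
            for node_name, update in event.items():
                yield node_name, update


# Made-up events, one in each shape
sample_events = [
    {"step": "extract_task", "state": {"task_description": "demo"}},
    {"generate_sql": {"sql_query": "SELECT 1"}},
]
steps = list(iter_steps(sample_events))
print(steps)
```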

## 6. Workflow Visualization

To better understand the workflow, let's visualize it (if graphviz is installed):

In [None]:
try:
    from IPython.display import display, Image
    import graphviz

    # Assumes create_workflow() returns a compiled LangGraph graph, which exposes get_graph()
    graph = workflow.get_graph()

    # Create a Graphviz object
    dot = graphviz.Digraph(comment='Data Analysis Workflow')

    # Add nodes (graph.nodes maps node id to node object)
    for node_id in graph.nodes:
        dot.node(node_id, node_id)

    # Add edges; conditional edges are drawn dashed and labelled when a label is available
    for edge in graph.edges:
        label = str(edge.data) if edge.data is not None else ""
        dot.edge(edge.source, edge.target, label=label,
                 style="dashed" if edge.conditional else "solid")

    # Render the graph
    dot.render('workflow_graph', format='png', cleanup=True)
    display(Image(filename='workflow_graph.png'))
except Exception as e:
    print(f"Could not generate workflow visualization: {e}")
    print("\nWorkflow structure:")
    print("- Start with extract_task")
    print("- Move to generate_sql")
    print("- Validate SQL (branch based on validation result)")
    print("- If valid, execute query; otherwise retry or fail")
    print("- Validate results (branch based on validation)")
    print("- If valid, generate insights; otherwise retry or fail")
    print("- Update JIRA ticket")
    print("- End workflow")
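
When Graphviz is not available, the same structure can be written out as Mermaid flowchart text and pasted into any Mermaid renderer. The edge list below is an assumption pieced together from the step names and the textual outline in this notebook, in particular the routing through `increment_retry`; it is not read from `workflow.py`. `to_mermaid` is a hypothetical helper.

```python
from typing import List, Tuple

# Hypothetical edge list: (source, target, label). Assumed, not taken from workflow.py.
EDGES: List[Tuple[str, str, str]] = [
    ("extract_task", "generate_sql", ""),
    ("generate_sql", "validate_sql", ""),
    ("validate_sql", "execute_query", "valid"),
    ("validate_sql", "increment_retry", "invalid"),
    ("execute_query", "validate_results", ""),
    ("validate_results", "generate_insights", "valid"),
    ("validate_results", "increment_retry", "invalid"),
    ("increment_retry", "generate_sql", "retry"),
    ("increment_retry", "update_jira", "fail"),
    ("generate_insights", "update_jira", ""),
]


def to_mermaid(edges: List[Tuple[str, str, str]]) -> str:
    """Render labelled edges as Mermaid flowchart text."""
    lines = ["flowchart TD"]
    for source, target, label in edges:
        arrow = f"-->|{label}|" if label else "-->"
        lines.append(f"    {source} {arrow} {target}")
    return "\n".join(lines)


diagram = to_mermaid(EDGES)
print(diagram)
```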

## Conclusion

This notebook demonstrates how the `workflow.py` system works. Key points:

1. The workflow is a directed graph built with LangGraph's `StateGraph`
2. It processes JIRA tickets through a series of steps (SQL generation, validation, execution, etc.)
3. It includes conditional branching for error handling and retries
4. The state is maintained throughout the process in an `AgentState` object
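5. Each workflow node takes the state and returns an updated state, while the functions passed to `create_workflow()` operate on individual fields such as `task_description` or `query_result`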
6. The `create_workflow()` function wires everything together based on the function implementations provided

This orchestration system allows for a robust and modular approach to automated data analysis tasks from JIRA tickets.