In [19]:
import os
import sys

# Root of the SQL-Analytics checkout; presumably this is what makes the
# `src.*` imports further down resolvable. Set the SQL_ANALYTICS_ROOT
# environment variable to point elsewhere on another machine; the default
# keeps the original hard-coded location so existing setups are unaffected.
PROJECT_ROOT = os.environ.get("SQL_ANALYTICS_ROOT", '/Users/Kranthi_1/SQL-Analytics')

# Re-running this cell must not stack duplicate entries on sys.path.
if PROJECT_ROOT not in sys.path:
    sys.path.insert(1, PROJECT_ROOT)

In [33]:
import logging
import uuid
from pathlib import Path
from sqlite3 import connect
from threading import Lock
from typing import Dict, List, Any, Annotated, Callable, Sequence, TypedDict, Optional, Literal, Union

from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.tools import tool
from langchain_experimental.utilities import PythonREPL
from langchain_ollama import ChatOllama
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from pydantic import BaseModel, Field

from src.db.database import ChatDatabase
from src.tools import SearchTools
logger = logging.getLogger(__name__)

In [21]:
# Single REPL instance reused by every call to python_repl_tool below.
repl = PythonREPL()


@tool
def python_repl_tool(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    # The docstring and the Annotated text above are left untouched on purpose:
    # the @tool decorator exposes them to the model as the tool/argument
    # description, so they are effectively prompt text.
    # NOTE(review): this runs model-supplied code through PythonREPL; confirm the
    # notebook is only used in an environment where that is acceptable.
    try:
        stdout = repl.run(code)
    except BaseException as exc:
        # BaseException (not just Exception) is caught so that any failure is
        # reported back as text instead of propagating out of the tool.
        return f"Failed to execute. Error: {repr(exc)}"
    else:
        return f"Successfully executed:\n```python\n{code}\n```\nStdout: {stdout}"

In [30]:
# Chat model served by a local Ollama instance; process_code calls it directly.
# temperature=0 asks for the least random sampling the model offers.
# NOTE(review): confirm the installed ChatOllama actually honours `timeout`;
# some versions expect it via client_kwargs instead.
llm = ChatOllama(
    model="llama3.2:3b",
    temperature=0,
    base_url="http://localhost:11434",
    timeout=120,
)


# Define state type
class AgentState(TypedDict):
    """Shared state dictionary passed between the graph's nodes.

    The second argument of each ``Annotated`` is a human-readable description.
    NOTE(review): these are plain strings, not reducer functions, so presumably
    a node's update replaces the previous value of a key rather than merging
    into it -- confirm against the LangGraph version in use.
    """
    # Conversation so far; the supervisor prepends its system prompt to this and
    # process_code uses the last entry as the search query.
    messages: Annotated[Sequence[BaseMessage], "The messages in the conversation"]
    # Written by process_code: {"query": ..., "results": [...]} (or {} on error).
    code_context: Annotated[Dict, "Code search results"]
    doc_context: Annotated[Dict, "Documentation search results"]
    # Written by process_code: formatted analysis text, raw LLM text, or an error message.
    code_analysis: Annotated[str, "Code analysis output"]
    doc_analysis: Annotated[str, "Documentation analysis output"]
    schema_context: Annotated[Dict, "Schema results"]
    schema_analysis: Annotated[str, "Schema analysis output"]
    sql_query: Annotated[str, "Sql query output"]
    # Written by the supervisor node: the node it routed to (or END).
    next: str
    
# Define structured outputs
class CodeAnalysis(BaseModel):
    # Structured result the LLM is asked to return for the code-analysis step;
    # code_parser is built from this model.
    # The Field descriptions end up in the model's JSON schema, so treat them as
    # prompt text. This note is a comment rather than a class docstring because
    # pydantic would add a docstring to that schema as well.
    tables_and_columns: Dict[str, List[str]] = Field(description="Dictionary of table names and their columns")
    relationships: List[str] = Field(description="List of relationships between tables")
    business_logic: str = Field(description="Description of the business logic implemented")
    technical_details: str = Field(description="Technical implementation details")

class DocAnalysis(BaseModel):
    # Structured result the LLM is asked to return for the documentation-analysis
    # step; doc_parser is built from this model.
    # The Field descriptions end up in the model's JSON schema, so treat them as
    # prompt text. This note is a comment rather than a class docstring because
    # pydantic would add a docstring to that schema as well.
    key_concepts: List[str] = Field(description="Key concepts found in documentation")
    workflows: List[str] = Field(description="Business workflows described")
    requirements: str = Field(description="Business requirements identified")
    additional_context: str = Field(description="Additional contextual information")
    
class FinalSummary(BaseModel):
    # Combined, report-style summary schema. Nothing in the cells shown here
    # instantiates or parses it yet.
    # Documented with comments rather than a class docstring so the model's JSON
    # schema stays exactly as before.
    overview: str = Field(description="High-level overview of the analyzed system")
    data_model: Dict[str, Any] = Field(description="Simplified data model with tables and relationships")
    business_processes: List[str] = Field(description="Key business processes identified")
    implementation_notes: List[str] = Field(description="Important technical implementation details")
    recommendations: List[str] = Field(description="Suggested considerations or improvements")

In [31]:
def make_supervisor_node(llm: BaseChatModel, members: list[str]) -> str:
    options = ["FINISH"] + members
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        f" following workers: {members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH."
    )

    class Router(TypedDict):
        """Worker to route to next. If no workers needed, route to FINISH."""

        next: Literal[*options]

    def supervisor_node(state: AgentState) -> Command[Literal[*members, "__end__"]]:
        """An LLM-based router."""
        messages = [
            {"role": "system", "content": system_prompt},
        ] + state["messages"]
        response = llm.with_structured_output(Router).invoke(messages)
        goto = response["next"]
        if goto == "FINISH":
            goto = END

        return Command(goto=goto, update={"next": goto})

    return supervisor_node

In [32]:
    # Create parsers
# One parser per structured-output model. Each is used twice: its
# get_format_instructions() text is baked into the matching prompt template,
# and (for code_parser) parse() validates the model's reply in process_code.
code_parser = PydanticOutputParser(pydantic_object=CodeAnalysis)
doc_parser = PydanticOutputParser(pydantic_object=DocAnalysis)

    # Create prompt templates
# Prompt text for the code-analysis step.
# Placeholders: {code_context} is supplied on each call; {format_instructions}
# is pre-filled from code_parser when code_prompt is built.
# The wording (including its indentation) is sent to the model verbatim.
code_template = """
    Analyze the following code and provide structured information about it.
    
    CODE TO ANALYZE:
    {code_context}
    
    Guidelines:
    - Focus on SQL and Python code structure
    - Identify tables, columns and their relationships
    - Explain technical implementation details
    - Describe the business logic 
    - Provide the Column level lineage which is relavent to the ask and code.
    
    Your response MUST be in the following JSON format:
    {format_instructions}
    
    Make sure to include:
    1. All tables and their columns in the tables_and_columns field
    2. All relationships between tables in the relationships field
    3. Clear business logic description in the business_logic field
    4. Implementation details in the technical_details field
    
    Response:
    """

# Prompt text for the documentation-analysis step.
# Placeholders: {doc_context} is supplied on each call; {format_instructions}
# is pre-filled from doc_parser when doc_prompt is built.
# The wording (including its indentation) is sent to the model verbatim.
doc_template = """
    Analyze the following documentation and provide structured information about it.
    
    DOCUMENTATION TO ANALYZE:
    {doc_context}
    
    Guidelines:
    - Focus on business requirements and workflows
    - Identify key concepts and terminology
    - Extract business rules and processes
    - Note any important considerations
    - Make sure revist, anlyze and double heck if you miss any table or columns before you confirm the output.
    
    Your response MUST be in the following JSON format:
    {format_instructions}
    
    Response:
    """

# Code-analysis prompt: callers only pass `code_context`; the JSON format
# instructions are fixed once here from code_parser.
code_prompt = PromptTemplate(
    template=code_template,
    input_variables=["code_context"],
    partial_variables={
        "format_instructions": code_parser.get_format_instructions(),
    },
)

# Documentation-analysis prompt: callers only pass `doc_context`; the JSON
# format instructions are fixed once here from doc_parser.
doc_prompt = PromptTemplate(
    template=doc_template,
    input_variables=["doc_context"],
    partial_variables={
        "format_instructions": doc_parser.get_format_instructions(),
    },
)


In [34]:
def process_code(state: AgentState) -> Union[Dict, Command]:
    """Search the code base for the latest message and analyse the hits with the LLM.

    Args:
        state: Current graph state. Only ``state['messages']`` is read; its last
            entry is used as the search query.

    Returns:
        * ``state`` itself, unchanged, when there are no messages.
        * On success, a ``Command`` that updates ``code_context`` (the query and
          raw search results) and ``code_analysis`` (formatted text, or the raw
          LLM reply when it could not be parsed) and routes onward.
        * When anything else raises, a plain dict with an error message in
          ``code_analysis`` and an empty ``code_context``.
    """
    try:
        messages = state['messages']
        if not messages:
            return state

        last_message = messages[-1]
        query = last_message.content if isinstance(last_message, BaseMessage) else str(last_message)

        # Search code; keep one handle on the hits so the formatted context and
        # the stored context are guaranteed to describe the same results.
        search_results = SearchTools.search_code(query)
        results = search_results.get('results', [])

        # Format code context
        code_context = "\n".join(
            f"Source: {result['source']}\n"
            f"Code:\n{result['content']}\n"
            f"File Info: {result['file_info']}\n"
            for result in results
        )

        # Generate analysis
        formatted_prompt = code_prompt.format(code_context=code_context)
        response = llm.invoke(formatted_prompt)
        response_text = response.content if isinstance(response, BaseMessage) else str(response)

        try:
            analysis = code_parser.parse(response_text)
            output = f"""
                Code Analysis Results:
                
                1. Tables and Columns:
                {analysis.tables_and_columns}
                
                2. Relationships:
                {analysis.relationships}
                
                3. Business Logic:
                {analysis.business_logic}
                
                4. Technical Details:
                {analysis.technical_details}
                """
        except Exception as parse_error:
            # Best effort: an unparseable reply is still passed on as raw text.
            logger.warning(f"Failed to parse code output: {str(parse_error)}")
            output = response_text

        # Command's fields must be given by keyword. The original passed this
        # dict positionally, so it was never received as `update`.
        # NOTE(review): goto="s" is kept as written, but it looks like an
        # unfinished node name -- confirm the target (e.g. the supervisor node)
        # against the graph definition.
        return Command(
            update={
                "code_context": {"query": query, "results": results},
                "code_analysis": output,
            },
            goto="s",
        )

    except Exception as e:
        # exc_info adds the traceback to the log; the message itself is unchanged.
        logger.error(f"Error in code processing: {str(e)}", exc_info=True)
        return {
            "code_analysis": f"Error during code analysis: {str(e)}",
            "code_context": {}
        }
