In [8]:
import os
import sqlite3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from dotenv import find_dotenv, load_dotenv
import json
from utils import *
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pprint import pprint



In [9]:

load_dotenv(find_dotenv())

True

In [10]:
token= os.getenv("OPENAI_API_KEY")

In [11]:
from openai import OpenAI

client = OpenAI(api_key=token, base_url="https://chat.int.bayer.com/api/v2")

In [13]:
all_models = client.models.list()

pprint(f"{(all_models.data)}")
pprint(f"Total number of available models: {len(all_models.data)}")

("[Model(id='o4-mini', created=None, object='model', owned_by='myGenAssist', "
 "name='o4-mini', max_input_tokens=200000, max_output_tokens=200000, "
 "description='o4-mini is optimized for fast, effective reasoning with "
 "exceptionally efficient performance in coding and visual tasks.', "
 "training_data='Up to Jun 2024', model_type='chat_completion', "
 'input_cost_per_million_token=11.0, output_cost_per_million_token=44.0, '
 "supports_tools=True, supports_reasoning=True, model_status='available', "
 "supported_modalities={'text': True, 'image': {'methods': ['base64', 'url'], "
 "'mime_types': ['image/png', 'image/jpeg', 'image/webp', 'image/gif']}, "
 "'file': None, 'video': None, 'audio': None}, model='o4-mini'), "
 "Model(id='grok-3', created=None, object='model', owned_by='myGenAssist', "
 "name='grok-3', max_input_tokens=1000000, max_output_tokens=1000000, "
 'description="Grok 3 displays significant improvements in reasoning, '
 'mathematics, coding, world knowledge, and ins

In [None]:
def explore_database(db_path):
    """Explore database schema"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # Get all tables
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = [row[0] for row in cursor.fetchall()]
    
    print(f"📊 Database: {db_path}")
    print(f"📋 Tables found: {len(tables)}\n")
    
    schema = {}
    for table in tables:
        cursor.execute(f"PRAGMA table_info({table})")
        columns = cursor.fetchall()
        schema[table] = [col[1] for col in columns]
        print(f"  • {table}: {', '.join(schema[table][:5])}{'...' if len(schema[table]) > 5 else ''}")
    
    conn.close()
    return schema

# Explore chinook database
schema = explore_database("../data/chinook.db")

In [None]:
schema_str = str(schema)

In [None]:
len(schema_str)

In [None]:
context_usage=calculate_context_percentage(schema_str)

In [None]:
context_usage

In [None]:
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())

In [None]:
token = os.getenv("OPEENAI_API_KEY")
model = 'o4-mini'

llm = ChatOpenAI(
    openai_api_base="https://chat.int.bayer.com/api/v2",
    openai_api_key=token,
    model=model,
    temperature=0.0
)


In [None]:
import pandas as pd

class DataAnalysisAgent:
    """Analyzes data using LLM-generated SQL"""
    
    def __init__(self, db_path, llm, schema):
        self.db_path = db_path
        self.llm = llm
        self.schema = schema
        self.conn = sqlite3.connect(db_path)
        
    def analyze(self, user_query: str, allowed_tables: list = None) -> dict:
        """Main analysis method"""
        print(f"\n{'='*60}")
        print(f"🔍 Query: {user_query}")
        print(f"{'='*60}")
        
        sql_query = self._generate_sql(user_query, allowed_tables)
        print(f"\n Generated SQL:\n{sql_query}\n")
        
        if self._is_safe_query(sql_query):
            try:
                df = pd.read_sql_query(sql_query, self.conn)
                print(f" Query executed: {len(df)} rows returned\n")
                
                insights = self._generate_insights(user_query, df)
                
                return {
                    "status": "success",
                    "query": user_query,
                    "sql": sql_query,
                    "data": df,
                    "insights": insights,
                    "rows": len(df)
                }
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "query": user_query
                }
        else:
            return {
                "status": "error",
                "error": "Unsafe query detected",
                "query": user_query
            }
    
    def _generate_sql(self, user_query: str, allowed_tables: list = None) -> str:
        """Generate SQL from natural language"""
        tables_info = "\n".join([
            f"- {table}: {', '.join(cols)}"
            for table, cols in self.schema.items()
            if allowed_tables is None or table in allowed_tables
        ])
        
        prompt = ChatPromptTemplate.from_template(
            """You are an expert SQL generator for SQLite databases.           
                DATABASE SCHEMA:
                {schema}
                USER QUERY: {query}
                Generate a safe, efficient SELECT query. Rules:
                1. ONLY use SELECT statements (no INSERT, UPDATE, DELETE, DROP)
                2. Include LIMIT clause if not specified (default LIMIT 100)
                3. Use proper JOINs when needed
                4. Return ONLY the SQL query, no explanations
                SQL Query:""")
        
        response = self.llm.invoke(prompt.format(schema=tables_info, query=user_query))
        
        # Clean the response
        sql = response.content.strip()
        # Remove markdown code blocks if present
        sql = sql.replace("```sql", "").replace("```", "").strip()
        
        return sql
    
    def _is_safe_query(self, sql: str) -> bool:
        """Check if SQL query is safe"""
        sql_upper = sql.upper()
        dangerous_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'TRUNCATE']
        
        for keyword in dangerous_keywords:
            if keyword in sql_upper:
                print(f"⚠️  Dangerous keyword detected: {keyword}")
                return False
        return True
    
    def _generate_insights(self, query: str, df: pd.DataFrame) -> str:
        """Generate natural language insights"""
        data_summary = df.head(10).to_string()
        
        prompt = ChatPromptTemplate.from_template(
            """Based on this query and results, provide 2-3 key insights in bullet points.
                QUERY: {query}
                RESULTS (first 10 rows):
                {data}
                Key Insights (2-3 bullets):""")
        
        response = self.llm.invoke(prompt.format(query=query, data=data_summary))
        
        return response.content.strip()
    
    def close(self):
        self.conn.close()

analysis_agent = DataAnalysisAgent("../data/chinook.db", llm, schema)

In [None]:
analysis_agent.analyze("Give me top 10 artist with most albulmns")

In [None]:
# Company Style Configuration for plottinng and visualziations
COMPANY_STYLE = {
    "colors": ["#2E86AB", "#A23B72", "#F18F01", "#C73E1D", "#6A994E"],
    "font_size": 12,
    "figure_size": (10, 6),
    "dpi": 100
}

sns.set_palette(COMPANY_STYLE["colors"])
plt.rcParams['figure.figsize'] = COMPANY_STYLE["figure_size"]
plt.rcParams['figure.dpi'] = COMPANY_STYLE["dpi"]


In [None]:
class VisualizationAgent:
    """Creates visualizations with company branding"""
    
    def __init__(self, llm, style_config=COMPANY_STYLE):
        self.llm = llm
        self.style = style_config
        
    def visualize(self, data: pd.DataFrame, query: str, output_dir="../outputs") -> dict:
        """Create appropriate visualization"""
        print(f"\n{'='*60}")
        print(f"Creating Visualization")
        print(f"{'='*60}")
        
        chart_type = self._determine_chart_type(data, query)
        print(f" Chart type: {chart_type}\n")

        fig, ax = plt.subplots(figsize=self.style['figure_size'])
        
        if chart_type == 'bar':
            self._create_bar_chart(data, ax)
        elif chart_type == 'line':
            self._create_line_chart(data, ax)
        elif chart_type == 'pie':
            self._create_pie_chart(data, ax)
        elif chart_type == 'scatter':
            self._create_scatter_plot(data, ax)
        else:
            self._create_bar_chart(data, ax)  # default
        
        # Apply company branding
        self._apply_branding(ax, query)
        
        # Save
        os.makedirs(output_dir, exist_ok=True)
        filename = f"viz_{pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')}.png"
        filepath = os.path.join(output_dir, filename)
        plt.tight_layout()
        plt.savefig(filepath, dpi=self.style['dpi'], bbox_inches='tight')
        
        print(f"Saved: {filepath}\n")
        
        plt.show()
        
        return {
            "status": "success",
            "chart_type": chart_type,
            "filepath": filepath
        }
    
    def _determine_chart_type(self, df: pd.DataFrame, query: str) -> str:
        """Use LLM to determine best chart type"""
        data_info = f"Columns: {list(df.columns)}, Rows: {len(df)}, Types: {df.dtypes.to_dict()}"
        
        prompt = ChatPromptTemplate.from_template(
            """Given this data and query, suggest ONE chart type.
                DATA INFO: {data_info}
                QUERY: {query}
                Choose from: bar, line, pie, scatter
                Return ONLY the chart type, nothing else:""")
        
        response = self.llm.invoke(prompt.format(data_info=data_info, query=query))
        
        chart_type = response.content.strip().lower()
        return chart_type if chart_type in ['bar', 'line', 'pie', 'scatter'] else 'bar'
    
    def _create_bar_chart(self, df, ax):
        """Create bar chart"""
        # Use first column as x, second as y
        if len(df.columns) >= 2:
            x_col, y_col = df.columns[0], df.columns[1]
            df_plot = df.head(10)  # Limit to top 10
            ax.bar(range(len(df_plot)), df_plot[y_col], color=self.style['colors'][0])
            ax.set_xticks(range(len(df_plot)))
            ax.set_xticklabels(df_plot[x_col], rotation=45, ha='right')
            ax.set_xlabel(x_col)
            ax.set_ylabel(y_col)
    
    def _create_line_chart(self, df, ax):
        """Create line chart"""
        if len(df.columns) >= 2:
            x_col, y_col = df.columns[0], df.columns[1]
            ax.plot(df[x_col], df[y_col], marker='o', color=self.style['colors'][0], linewidth=2)
            ax.set_xlabel(x_col)
            ax.set_ylabel(y_col)
            ax.grid(True, alpha=0.3)
    
    def _create_pie_chart(self, df, ax):
        """Create pie chart"""
        if len(df.columns) >= 2:
            labels_col, values_col = df.columns[0], df.columns[1]
            df_plot = df.head(10)
            ax.pie(df_plot[values_col], labels=df_plot[labels_col], autopct='%1.1f%%',
                   colors=self.style['colors'])
    
    def _create_scatter_plot(self, df, ax):
        """Create scatter plot"""
        if len(df.columns) >= 2:
            x_col, y_col = df.columns[0], df.columns[1]
            ax.scatter(df[x_col], df[y_col], alpha=0.6, color=self.style['colors'][0])
            ax.set_xlabel(x_col)
            ax.set_ylabel(y_col)
            ax.grid(True, alpha=0.3)
    
    def _apply_branding(self, ax, query):
        """Apply company branding"""
        # Title based on query (simplified)
        title = query[:50] + "..." if len(query) > 50 else query
        ax.set_title(title, fontsize=14, fontweight='bold', pad=20)
        
        # Company watermark
        ax.text(0.99, 0.01, 'Company Analytics ©', 
                transform=ax.transAxes,
                fontsize=8, alpha=0.5,
                ha='right', va='bottom')

In [None]:
viz_agent = VisualizationAgent(llm)

In [None]:
def run_complete_workflow(query, user_role="analyst"):
    """Run complete analysis + visualization workflow"""
    
    # Define permissions (simulated)
    permissions = {
        "analyst": ["artists", "albums", "tracks", "invoices", "customers"],
        "viewer": ["artists", "albums", "tracks"],
        "admin": None  # all tables
    }
    
    allowed_tables = permissions.get(user_role)
    
    analysis_result = analysis_agent.analyze(query, allowed_tables)
    
    if analysis_result["status"] == "success":
        print(analysis_result["insights"])
        print(f"\nData shape: {analysis_result['data'].shape}")
        display(analysis_result["data"].head())
        
        viz_result = viz_agent.visualize(analysis_result["data"], query)
        
        return {
            "analysis": analysis_result,
            "visualization": viz_result
        }
    else:
        print(f"Error: {analysis_result.get('error')}")
        return analysis_result


In [None]:
print("TEST 1: Top 10 Selling Artists")
result1 = run_complete_workflow(
    "Show me the top 10 artists by total sales amount",
    user_role="analyst"
)