## DataPulse: AI-Powered Analytics Agent for Small Business Reports

#### Environment Setup & Gemini API Configuration

In [9]:
import warnings
warnings.filterwarnings("ignore")

import os
import uuid
import json
import logging

import pandas as pd
import google.generativeai as genai

try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

logging.basicConfig(level=logging.INFO)

# ---- Configure Gemini from environment ----
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
    print("WARNING: GEMINI_API_KEY is not set. The agent will fall back to offline text reports.")
else:
    genai.configure(api_key=api_key)
    print("GEMINI_API_KEY loaded.")

GEMINI_API_KEY loaded.


#### Model Auto-Selection Helper (Gemini API)

In [10]:
def pick_default_model_name() -> str | None:
    """
    Ask the API which models are available and pick one that supports generateContent.
    If anything fails, return None (will trigger offline fallback).
    """
    if not os.getenv("GEMINI_API_KEY"):
        return None

    try:
        models = list(genai.list_models())
    except Exception as e:
        print("Could not list models from Gemini API:", repr(e))
        return None

    # Filter models that support generateContent
    candidates = []
    for m in models:
        name = getattr(m, "name", "")
        supported = getattr(m, "supported_generation_methods", []) or []
        if "generateContent" in supported:
            candidates.append(name)

    if not candidates:
        print("No models found that support generateContent for this API key.")
        return None

    # Preference order (we match substrings like "gemini-1.5-flash")
    preferred = [
        "gemini-1.5-flash",
        "gemini-1.5-pro",
        "gemini-1.0-pro",
        "gemini-pro",
    ]
    for p in preferred:
        for name in candidates:
            if p in name:
                print("Using Gemini model:", name)
                return name

    # Fallback: just pick first candidate
    print("Using first available model:", candidates[0])
    return candidates[0]

#### Data Tools, Schema Inspection & KPI Computation

In [11]:
def load_csv(path: str) -> pd.DataFrame:
    """Safely load CSV with encoding fallbacks."""
    last_err = None
    for enc in ("utf-8", "latin1", "cp1252"):
        try:
            df = pd.read_csv(path, encoding=enc)
            logging.info(f"Loaded CSV with encoding={enc}")
            return df
        except UnicodeDecodeError as e:
            last_err = e
            continue
    try:
        df = pd.read_csv(path, engine="python")
        logging.info("Loaded CSV with engine='python'")
        return df
    except Exception as e:
        raise RuntimeError(f"Failed to read CSV {path}: {e}") from last_err


def inspect_schema(df: pd.DataFrame) -> dict:
    return {
        "n_rows": int(len(df)),
        "n_columns": int(df.shape[1]),
        "columns": list(df.columns),
        "dtypes": df.dtypes.astype(str).to_dict(),
    }


def basic_data_profile(df: pd.DataFrame) -> dict:
    return {
        "missing_values": df.isna().sum().to_dict(),
        "sample_rows": df.head(5).to_dict(orient="records"),
    }


def compute_kpis(df: pd.DataFrame) -> dict:
    """
    Compute KPIs with auto-detected revenue and optional cost.

    - Revenue column: first whose name contains one of:
      ['revenue', 'sales', 'amount', 'total', 'price', 'value']
    - Cost column (optional): name contains ['cost', 'expense', 'cogs'].
    - If no cost column: synthetic Cost = 0.7 * Revenue.
    """
    df = df.copy()

    revenue_keywords = ["revenue", "sales", "amount", "total", "price", "value"]
    cost_keywords = ["cost", "expense", "cogs"]
    product_keywords = ["product", "item", "sku", "category"]

    # --- Find revenue column ---
    revenue_candidates = [
        c for c in df.columns
        if any(k in c.lower() for k in revenue_keywords)
    ]
    if not revenue_candidates:
        raise ValueError(
            f"No revenue-like column found.\n"
            f"Tried keywords={revenue_keywords}\n"
            f"Columns={list(df.columns)}"
        )
    revenue_col = revenue_candidates[0]
    df["Revenue"] = pd.to_numeric(df[revenue_col], errors="coerce").fillna(0.0)

    # --- Find cost column (optional) ---
    cost_candidates = [
        c for c in df.columns
        if any(k in c.lower() for k in cost_keywords)
    ]
    if cost_candidates:
        cost_col = cost_candidates[0]
        df["Cost"] = pd.to_numeric(df[cost_col], errors="coerce").fillna(0.0)
        cost_source = cost_col
    else:
        df["Cost"] = df["Revenue"] * 0.7
        cost_col = None
        cost_source = "synthetic (0.7 * revenue)"

    df["Profit"] = df["Revenue"] - df["Cost"]

    total_revenue = float(df["Revenue"].sum())
    total_cost = float(df["Cost"].sum())
    total_profit = float(df["Profit"].sum())
    avg_revenue = float(df["Revenue"].mean()) if len(df) else 0.0
    avg_profit = float(df["Profit"].mean()) if len(df) else 0.0

    # --- Try to find product-like column for top products ---
    product_candidates = [
        c for c in df.columns
        if any(k in c.lower() for k in product_keywords)
    ]
    if product_candidates:
        product_col = product_candidates[0]
        top_products = (
            df.groupby(product_col)["Revenue"]
            .sum()
            .sort_values(ascending=False)
            .head(5)
            .reset_index()
            .to_dict(orient="records")
        )
    else:
        product_col = None
        top_products = []

    return {
        "Total Revenue": total_revenue,
        "Total Cost": total_cost,
        "Total Profit": total_profit,
        "Average Revenue": avg_revenue,
        "Average Profit": avg_profit,
        "Top Products": top_products,
        "meta": {
            "revenue_column_used": revenue_col,
            "cost_column_used": cost_col if cost_col else "synthetic",
            "product_column_used": product_col,
            "cost_source": cost_source,
        },
    }

#### Session Memory Manager (SessionStore)

In [12]:
class SessionStore:
    def __init__(self):
        self.sessions = {}  # session_id -> dict

    def create_session(self, business_name: str, industry: str) -> str:
        sid = str(uuid.uuid4())
        self.sessions[sid] = {
            "business_profile": {
                "business_name": business_name,
                "industry": industry,
            },
            "history": [],  # list of {"report": str, "kpis": dict}
        }
        return sid

    def get_session(self, session_id: str):
        return self.sessions.get(session_id, None)

    def append_history(self, session_id: str, report: str, kpis: dict):
        session = self.get_session(session_id)
        if session is None:
            return
        session["history"].append({"report": report, "kpis": kpis})

    def get_summary(self, session_id: str) -> str:
        session = self.get_session(session_id)
        if not session or not session["history"]:
            return "No previous sessions."
        last_report = session["history"][-1]["report"]
        return f"Last report (truncated): {last_report[:400]}"

#### NarrativeAgent (Gemini-Backed Report Generator + Offline Fallback)

In [13]:
class NarrativeAgent:
    def __init__(self, model_name: str | None = None):
        """
        If a working Gemini model is available, use it.
        Otherwise, fall back to offline text generation.
        """
        self.model = None
        self.model_name = None

        if not os.getenv("GEMINI_API_KEY"):
            print("No GEMINI_API_KEY set. NarrativeAgent will use offline fallback.")
            return

        # Try to pick a model and initialize it
        try:
            if model_name is None:
                model_name = pick_default_model_name()
            if model_name is None:
                print("Could not auto-select a Gemini model. Fallback will be used.")
                return

            self.model_name = model_name
            self.model = genai.GenerativeModel(model_name)
            print(f"NarrativeAgent using Gemini model: {model_name}")
        except Exception as e:
            print("Failed to initialize Gemini model. Fallback will be used.")
            print("Error was:", repr(e))
            self.model = None
            self.model_name = None

    def _offline_report(self, business_profile, schema_summary, kpis, session_summary) -> str:
        """Fallback plain-text report if Gemini is unavailable."""
        bp = business_profile
        lines = []

        lines.append(f"Business Analytics Report for {bp.get('business_name', 'the business')}")
        lines.append(f"Industry: {bp.get('industry', 'N/A')}")
        lines.append("")
        lines.append("Executive Summary:")
        lines.append(
            f"- Total revenue: {kpis.get('Total Revenue', 0):,.2f}"
        )
        lines.append(
            f"- Total profit: {kpis.get('Total Profit', 0):,.2f}"
        )
        lines.append(
            f"- Average revenue per row: {kpis.get('Average Revenue', 0):,.2f}"
        )
        lines.append("")
        lines.append("Key Insights:")
        lines.append("- Revenue and profit figures are computed from your uploaded CSV file.")
        if kpis.get("Top Products"):
            lines.append("- The dataset has identified top-performing products by revenue.")
        else:
            lines.append("- No product column was detected, so product-level breakdown is not available.")
        lines.append(
            f"- Cost was derived using: {kpis.get('meta', {}).get('cost_source', 'N/A')}."
        )
        lines.append("")
        lines.append("Recommendations:")
        lines.append("- Focus on increasing high-revenue rows or categories in your dataset.")
        lines.append("- Investigate rows with low or negative profit, if any.")
        lines.append("- Consider enriching your dataset with more features (e.g., region, channel) to refine analysis.")

        return "\n".join(lines)

    def generate_report(
        self,
        business_profile: dict,
        schema_summary: dict,
        kpis: dict,
        session_summary: str = "",
    ) -> str:
        # If Gemini model is not available, use offline text report
        if self.model is None:
            return self._offline_report(business_profile, schema_summary, kpis, session_summary)

        # Try Gemini; if it fails, fall back
        prompt = f"""
You are an AI analytics copilot for small businesses.

Business profile:
{json.dumps(business_profile, indent=2)}

Previous session summary:
{session_summary}

Dataset schema summary:
{json.dumps(schema_summary, indent=2)}

KPIs:
{json.dumps(kpis, indent=2)}

Write a concise, non-technical business report for the owner.
Include:
1) Executive summary (2 short paragraphs).
2) 3â€“5 key insights as bullet points.
3) 3 actionable recommendations.
Keep it under 500 words.
"""
        try:
            response = self.model.generate_content(prompt)
            return response.text
        except Exception as e:
            print("Gemini generate_content failed. Falling back to offline text.")
            print("Error was:", repr(e))
            return self._offline_report(business_profile, schema_summary, kpis, session_summary)

#### IngestionAgent & AnalyticsAgent

In [14]:
class IngestionAgent:
    def ingest(self, csv_path: str) -> dict:
        df = load_csv(csv_path)
        schema = inspect_schema(df)
        profile = basic_data_profile(df)
        return {"df": df, "schema": schema, "profile": profile}


class AnalyticsAgent:
    def analyze(self, df: pd.DataFrame) -> dict:
        return compute_kpis(df)

#### Orchestrator (End-to-End Pipeline Controller)

In [15]:
class DataPulseOrchestrator:
    def __init__(self):
        self.session_store = SessionStore()
        self.ingestion_agent = IngestionAgent()
        self.analytics_agent = AnalyticsAgent()
        self.narrative_agent = NarrativeAgent()  # auto-picks Gemini + fallback

    def run_sales_report(
        self,
        csv_path: str,
        business_name: str,
        industry: str = "Retail",
    ) -> dict:
        logging.info("=== Starting DataPulse report run ===")

        # 1. Create session
        session_id = self.session_store.create_session(business_name, industry)
        session = self.session_store.get_session(session_id)

        # 2. Ingest
        logging.info(f"Ingesting CSV: {csv_path}")
        data_bundle = self.ingestion_agent.ingest(csv_path)
        df = data_bundle["df"]
        schema = data_bundle["schema"]
        profile = data_bundle["profile"]

        logging.info(f"Data shape: {schema['n_rows']} rows x {schema['n_columns']} cols")

        # 3. KPIs
        logging.info("Computing KPIs...")
        kpis = self.analytics_agent.analyze(df)

        # 4. Memory summary
        session_summary = self.session_store.get_summary(session_id)

        # 5. Narrative
        logging.info("Generating narrative report...")
        report = self.narrative_agent.generate_report(
            business_profile=session["business_profile"],
            schema_summary={**schema, **profile},
            kpis=kpis,
            session_summary=session_summary,
        )

        # 6. Save
        self.session_store.append_history(session_id, report, kpis)

        logging.info("=== DataPulse report generation completed ===")

        return {
            "session_id": session_id,
            "kpis": kpis,
            "report": report,
        }

#### Run the Full Pipeline

In [16]:
csv_path = "data/retail_sales_dataset.csv"   # adjust if needed (relative to notebook)

orchestrator = DataPulseOrchestrator()

result = orchestrator.run_sales_report(
    csv_path=csv_path,
    business_name="DataPulse Retail Co.",
    industry="Retail",
)

print("SESSION ID:", result["session_id"])
print("\n=== KPIs ===")
print(json.dumps(result["kpis"], indent=2))

print("\n=== REPORT ===\n")
print(result["report"])

INFO:root:=== Starting DataPulse report run ===
INFO:root:Ingesting CSV: data/retail_sales_dataset.csv
INFO:root:Loaded CSV with encoding=latin1
INFO:root:Data shape: 2823 rows x 25 cols
INFO:root:Computing KPIs...
INFO:root:Generating narrative report...


Using Gemini model: models/gemini-pro-latest
NarrativeAgent using Gemini model: models/gemini-pro-latest


INFO:root:=== DataPulse report generation completed ===


SESSION ID: be369d60-56a8-442e-afe7-5e3c0293d8f7

=== KPIs ===
{
  "Total Revenue": 236168.07,
  "Total Cost": 165317.64899999998,
  "Total Profit": 70850.421,
  "Average Revenue": 83.65854410201914,
  "Average Profit": 25.09756323060574,
  "Top Products": [
    {
      "PRODUCTLINE": "Classic Cars",
      "Revenue": 84453.7
    },
    {
      "PRODUCTLINE": "Vintage Cars",
      "Revenue": 47435.96
    },
    {
      "PRODUCTLINE": "Motorcycles",
      "Revenue": 27472.19
    },
    {
      "PRODUCTLINE": "Trucks and Buses",
      "Revenue": 26345.91
    },
    {
      "PRODUCTLINE": "Planes",
      "Revenue": 25012.72
    }
  ],
  "meta": {
    "revenue_column_used": "PRICEEACH",
    "cost_column_used": "synthetic",
    "product_column_used": "PRODUCTLINE",
    "cost_source": "synthetic (0.7 * revenue)"
  }
}

=== REPORT ===

**To:** Owner, DataPulse Retail Co.
**From:** Your AI Analytics Copilot
**Subject:** Business Performance and Growth Opportunities Report

### Executive Summary