# üìä Daily Sales Report ‚Äî Google ADK Capstone
#### Multi-agent System, Data Pipeline Development, Agent Tools for a Daily Sales Report System Using Gemini + Google ADK Technologies

## Project Summary
This project implementation includes the following:
- A data cleaning pipeline for an online sales dataset
- A daily-report generator (metrics, anomalies, trends)
- AI agents, agent tools, and orchestration workflow (daily_report_agent, summary_agent, followup_agent)
- Local testing utilities and a Streamlit front-end prototype

Intended use: local development and testing before production deployment.

In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/online-sales-dataset/online_sales_dataset.csv


In [2]:
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
secret_value_0 = user_secrets.get_secret("GOOGLE_API_KEY")

In [3]:
import os
from kaggle_secrets import UserSecretsClient

try:
    GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("‚úÖ Setup and authentication complete.")
except Exception as e:
    print(
        f"üîë Authentication Error: Please make sure you have added 'GOOGLE_API_KEY' to your Kaggle secrets. Details: {e}"
    )

‚úÖ Setup and authentication complete.


## Configurations and Environments

* Environment check
* This cell verifies package versions and provides packages required for this to run.

In [4]:
# Import libraries for this notebook
import os
import re
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import decimal

from google.genai import types

from google.adk.agents import LlmAgent
from google.adk.models.google_llm import Gemini
from google.adk.runners import InMemoryRunner, Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import AgentTool, ToolContext

from typing import Any, Dict

from google.adk.apps.app import App, EventsCompactionConfig

print("‚úÖ ADK components imported successfully.")
print("‚úÖ Libraries installed and the dataset has been loaded too.")

‚úÖ ADK components imported successfully.
‚úÖ Libraries installed and the dataset has been loaded too.


In [5]:
APP_NAME = "default"  # Application
USER_ID = "default"  # User
SESSION = "default"  # Session

MODEL_NAME = "gemini-2.5-flash-lite"

# Set up Session Management
# InMemorySessionService stores conversations in RAM (temporary)
session_service = InMemorySessionService()

print("‚úÖ Session state created.")

‚úÖ Session state created.


## Data Loading & Exploration Process

#### The dataset was taken from Kaggle data store

The dataset comprises anonymized data on online sales transactions, capturing various aspects of product purchases, customer details, and order characteristics.

**Content of the dataset**

| Column Name        | Description                                                                 |
|--------------------|-----------------------------------------------------------------------------|
| InvoiceNo          | A unique identifier for each sales transaction (invoice).                   |
| StockCode          | The code representing the product stock-keeping unit (SKU).                 |
| Description        | A brief description of the product.                                         |
| Quantity           | The number of units of the product sold in the transaction.                 |
| InvoiceDate        | The date and time when the sale was recorded.                               |
| UnitPrice          | The price per unit of the product in the transaction currency.             |
| CustomerID         | A unique identifier for each customer.                                      |
| Country            | The customer's country.                                                     |
| Discount           | The discount applied to the transaction, if any.                            |
| PaymentMethod      | The method of payment used for the transaction (e.g. PayPal, Bank Transfer).|
| ShippingCost       | The cost of shipping for the transaction.                                   |
| Category           | The category to which the product belongs (e.g. Electronics, Apparel).      |
| SalesChannel       | The channel through which the sale was made (e.g. Online, In-store).        |
| ReturnStatus       | Indicates whether the item was returned or not.                             |
| ShipmentProvider   | The provider responsible for delivering the order (e.g. UPS, FedEx).        |
| WarehouseLocation  | The warehouse location from which the order was fulfilled.                  |
| OrderPriority      | The priority level of the order (e.g. High, Medium, Low).                   |

For more details: https://www.kaggle.com/datasets/yusufdelikkaya/online-sales-dataset

In [None]:
# Load dataset and inspect it

# -------- LOAD DATASETS --------
csv_path = "/kaggle/input/online-sales-dataset/online_sales_dataset.csv"
# ------------------------

#load for preview
df = pd.read_csv(csv_path)

In [7]:
#data inspection process - check the structure of the dataset
details = df.info()
details

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 49782 entries, 0 to 49781
Data columns (total 17 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   InvoiceNo          49782 non-null  int64  
 1   StockCode          49782 non-null  object 
 2   Description        49782 non-null  object 
 3   Quantity           49782 non-null  int64  
 4   InvoiceDate        49782 non-null  object 
 5   UnitPrice          49782 non-null  float64
 6   CustomerID         44804 non-null  float64
 7   Country            49782 non-null  object 
 8   Discount           49782 non-null  float64
 9   PaymentMethod      49782 non-null  object 
 10  ShippingCost       47293 non-null  float64
 11  Category           49782 non-null  object 
 12  SalesChannel       49782 non-null  object 
 13  ReturnStatus       49782 non-null  object 
 14  ShipmentProvider   49782 non-null  object 
 15  WarehouseLocation  46297 non-null  object 
 16  OrderPriority      497

In [8]:
#data inspection process - check samples of the contents inside the dataset
preview = df.head()
preview

  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,Discount,PaymentMethod,ShippingCost,Category,SalesChannel,ReturnStatus,ShipmentProvider,WarehouseLocation,OrderPriority
0,221958,SKU_1964,White Mug,38,2020-01-01 00:00,1.71,37039.0,Australia,0.47,Bank Transfer,10.79,Apparel,In-store,Not Returned,UPS,London,Medium
1,771155,SKU_1241,White Mug,18,2020-01-01 01:00,41.25,19144.0,Spain,0.19,paypall,9.51,Electronics,Online,Not Returned,UPS,Rome,Medium
2,231932,SKU_1501,Headphones,49,2020-01-01 02:00,29.11,50472.0,Germany,0.35,Bank Transfer,23.03,Electronics,Online,Returned,UPS,Berlin,High
3,465838,SKU_1760,Desk Lamp,14,2020-01-01 03:00,76.68,96586.0,Netherlands,0.14,paypall,11.08,Accessories,Online,Not Returned,Royal Mail,Rome,Low
4,359178,SKU_1386,USB Cable,-30,2020-01-01 04:00,-68.11,,United Kingdom,1.501433,Bank Transfer,,Electronics,In-store,Not Returned,FedEx,,Medium


In [9]:
#data inspection process - check for missing values
missing_values = df.isna().sum()
missing_values

InvoiceNo               0
StockCode               0
Description             0
Quantity                0
InvoiceDate             0
UnitPrice               0
CustomerID           4978
Country                 0
Discount                0
PaymentMethod           0
ShippingCost         2489
Category                0
SalesChannel            0
ReturnStatus            0
ShipmentProvider        0
WarehouseLocation    3485
OrderPriority           0
dtype: int64

## Data Processing Pipeline

**Data cleaning: type normalization**
 - Convert InvoiceDate to datetime (coerce errors)
 - Ensure numeric columns (Quantity, UnitPrice, Discount, ShippingCost)
 - Provide a short printed summary of conversions and any missing values

**Data cleaning: fill categorical and standardize text**
 - Fill missing CustomerID, Category, SalesChannel, etc.
 - Strip strings and standardize casing if needed
 - Explain why these fills are acceptable (e.g., 'Unknown' placeholder)

**Data cleaning: normalize discount percentages and detect returns**
 - Convert 1-100 values to 0-1 where relevant
 - Create IsReturn flag from quantity and return text heuristics
 - Add comments explaining heuristics used

**Save cleaned DataFrame to a new variable**
 - For faster iteration you may want to save a cleaned copy: df.to_parquet(...) or to CSV
 - Note: make sure sanitized output does not store sensitive data

In [10]:
# Load dataset: Function to load the CSV into a DataFrame and show basic info:

def load_sales_data(csv_path: str) -> pd.DataFrame:
    """Load CSV and ensure proper types."""
    df = pd.read_csv(csv_path, low_memory=False)
    df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
    
    # Ensure numeric columns
    for col in ['Quantity', 'UnitPrice', 'Discount', 'ShippingCost']:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0.0)
    if 'Quantity' in df.columns:
        df['Quantity'] = df['Quantity'].astype(int)
    
    return df


# Clean dataset: Function to clean and preprocess the dataset

def clean_sales_data(df: pd.DataFrame) -> pd.DataFrame:
    """Fill missing values, standardize text, normalize discounts, compute revenue and returns."""
    # Fill missing categorical info
    cat_fill = {
        'CustomerID': 'Unknown',
        'WarehouseLocation': 'Unknown',
        'Description': '',
        'Category': 'Unknown',
        'SalesChannel': 'Unknown',
        'ReturnStatus': 'No Return'
    }
    for col, val in cat_fill.items():
        if col in df.columns:
            df[col] = df[col].fillna(val)
    
    # Standardize small text fields
    for col in ['PaymentMethod','Country','ShipmentProvider','OrderPriority']:
        if col in df.columns:
            df[col] = df[col].fillna('Unknown').astype(str).str.strip()
    
    # Normalize discounts (percentages to 0-1 range)
    if 'Discount' in df.columns:
        disc = df['Discount'].copy()
        mask_pct = (disc > 1) & (disc <= 100)
        disc.loc[mask_pct] = disc.loc[mask_pct] / 100.0
        df['Discount_norm'] = disc.clip(lower=0.0).fillna(0.0)
    
    # Determine returns
    return_indicators = ['return', 'returned', 'refunded', 'rma']
    df['IsReturnFlag_text'] = df['ReturnStatus'].astype(str).str.lower().fillna('')
    df['IsReturn_by_text'] = df['IsReturnFlag_text'].apply(lambda s: any(k in s for k in return_indicators))
    df['IsReturn_by_qty'] = df['Quantity'] < 0
    df['IsReturn'] = df['IsReturn_by_text'] | df['IsReturn_by_qty']
    
    # Compute line revenue once
    df['LineRevenue'] = df['Quantity'] * df['UnitPrice'] * (1.0 - df.get('Discount_norm', 0.0))
    df['NetRevenue'] = df['LineRevenue']
    
    # Drop temporary helper columns
    df.drop(columns=['IsReturnFlag_text','IsReturn_by_text','IsReturn_by_qty'], inplace=True, errors='ignore')
    
    return df


# Processing pipeline: Process the first few functions and turn them into a pipeline that can save the cleaned file

def process_sales_pipeline(csv_path: str) -> pd.DataFrame:
    """Full pipeline: load + clean sales data."""
    df = load_sales_data(csv_path)
    df_clean = clean_sales_data(df)
    return df_clean

## Daily Report Function

**Build daily report**
 - Function: collects cleaned dataframe, requested date, looks back 28 days from there to understand trends and anomalies
 - Computes metrics: total_revenue, num_orders, aov, top_products, revenue_by_category, revenue_by_channel, top_countries, return_rate, anomalies, trend
 - Ensure outputs are JSON-safe: keys are strings, values are native Python types
 - Add a short doctest or example run to show the expected output shape

**Format helpers**
 - fmt_money(value) -> returns formatted string and numeric round for monetary metrics

In [11]:
#Money format helper
def fmt_money(x):
    try:
        return f"${x:,.2f}"
    except:
        return x


#first report calculator function
def build_daily_report(df_all, target_date_str, lookback_days_for_trend=28):
    """
    Returns:
      day_df: DataFrame with all transactions for target_date (date portion match)
      metrics: dict with totals, AOV, top_products, revenue_by_category/channel, return_rate, anomalies, short trend context
    """
    # parse target date as date-only
    target_dt = pd.to_datetime(target_date_str).normalize()
    # Ensure InvoiceDate is timezone-naive for date comparisions
    try:
        tzinfo = getattr(df_all['InvoiceDate'].dt, 'tz', None)
    except Exception:
        tzinfo = None
    
    if tzinfo is not None:
        df_all['InvoiceDate_naive'] = df_all['InvoiceDate'].dt.tz_convert(None)
    else:
        df_all['InvoiceDate_naive'] = df_all['InvoiceDate']
    df_all['InvoiceDate_date'] = pd.to_datetime(df_all['InvoiceDate_naive']).dt.normalize()
    
    # Subset for the exact date
    day_df = df_all[df_all['InvoiceDate_date'] == target_dt].copy().reset_index(drop=True)
    
    # Basic metrics
    total_revenue = float(day_df['NetRevenue'].sum()) if len(day_df) else 0.0
    # unique invoices for that day
    num_orders = int(day_df['InvoiceNo'].nunique()) if len(day_df) else 0
    num_lines = int(len(day_df))
    avg_order_value = (total_revenue / num_orders) if num_orders else 0.0
    avg_line_value = (day_df['NetRevenue'].mean() if num_lines else 0.0)
    
    # top products by revenue
    top_products = day_df.groupby(['StockCode','Description'])['NetRevenue'].sum().sort_values(ascending=False).head(10)
    top_products_list = [{'StockCode': sc, 'Description': desc, 'revenue': float(v)} 
                         for (sc,desc), v in top_products.items()]
    
    # revenue breakdowns
    revenue_by_category = day_df.groupby('Category')['NetRevenue'].sum().sort_values(ascending=False).to_dict()
    revenue_by_channel = day_df.groupby('SalesChannel')['NetRevenue'].sum().sort_values(ascending=False).to_dict()
    
    # top countries
    top_countries = (
        day_df.groupby('Country')['NetRevenue']
        .sum()
        .sort_values(ascending=False)
        .head(5)
    )
    top_countries_list = top_countries.reset_index().to_dict(orient='records')
    
    # return rate (by line)
    if len(day_df):
        returns_count = int(day_df['IsReturn'].sum())
        return_rate = returns_count / len(day_df)
    else:
        returns_count = 0
        return_rate = 0.0
    
    # anomaly detection (per-line) -- simple z-score threshold
    anomalies = []
    if len(day_df) >= 5:
        mean_line = day_df['NetRevenue'].mean()
        std_line = day_df['NetRevenue'].std(ddof=0)
        if pd.isna(std_line) or std_line == 0:
            std_line = 0.0
        thresh_upper = mean_line + 3 * std_line
        thresh_lower = mean_line - 3 * std_line
        anom_df = day_df[(day_df['NetRevenue'] > thresh_upper) | (day_df['NetRevenue'] < thresh_lower)]
        anomalies = anom_df[['InvoiceNo','StockCode','Description','Quantity','UnitPrice','NetRevenue']].to_dict('records')
    
    # short trend context: compare revenue to previous week/day and percentage change
    # compute summed revenue per day for lookback window
    start_trend = target_dt - pd.Timedelta(days=lookback_days_for_trend)
    mask_trend = (df_all['InvoiceDate_date'] >= start_trend) & (df_all['InvoiceDate_date'] <= target_dt)
    series_daily = df_all.loc[mask_trend].groupby('InvoiceDate_date')['NetRevenue'].sum().sort_index()
    trend = {}
    if not series_daily.empty:
        # previous day
        prev_day = target_dt - pd.Timedelta(days=1)
        prev_revenue = float(series_daily.get(prev_day, 0.0))
        prev_change_pct = ((total_revenue - prev_revenue) / prev_revenue * 100.0) if prev_revenue else None
        # 7-day avg before target day (exclude target day)
        week_start = target_dt - pd.Timedelta(days=7)
        week_mask = (series_daily.index >= week_start) & (series_daily.index < target_dt)
        week_avg = float(series_daily.loc[week_mask].mean()) if series_daily.loc[week_mask].size else None
        week_change_pct = ((total_revenue - week_avg) / week_avg * 100.0) if week_avg not in (None, 0) else None
        trend = {
            'prev_day_revenue': prev_revenue,
            'prev_day_change_pct': prev_change_pct,
            'week_avg_before': week_avg,
            'week_change_pct': week_change_pct,
            'series_daily': series_daily.to_dict()
        }
    else:
        trend = {'series_daily': {}}


    # ensure consistent key names and format money for lists
    for item in top_products_list:
        # accept 'revenue' or 'NetRevenue' and write back to 'revenue'
        raw = item.get('NetRevenue', 0.0)
        item['revenue'] = fmt_money(raw)
    
    for item in top_countries_list:
        raw = item.get('NetRevenue', 0.0)
        item['revenue'] = fmt_money(raw)

    # Top products by revenue -> dict {Description: revenue}
    top_products_series = day_df.groupby('Description')['NetRevenue'].sum().sort_values(ascending=False).head(10)
    top_products = {desc: round(float(v), 2) for desc, v in top_products_series.items()}

    # Top countries by revenue -> dict {Country: revenue}
    top_countries_series = day_df.groupby('Country')['NetRevenue'].sum().sort_values(ascending=False).head(5)
    top_countries = {country: round(float(v), 2) for country, v in top_countries_series.items()}

    metrics = {
        'date': target_dt.strftime('%Y-%m-%d'),
        'total_revenue': fmt_money(total_revenue),
        'num_orders': num_orders,
        'num_lines': num_lines,
        'avg_order_value': fmt_money(avg_order_value),
        'avg_line_value': fmt_money(avg_line_value),
        'top_products': top_products_list,
        'revenue_by_category': {k: float(v) for k,v in revenue_by_category.items()},
        'revenue_by_channel': {k: float(v) for k,v in revenue_by_channel.items()},
        'top_countries': top_countries_list,
        'returns_count': returns_count,
        'return_rate': return_rate,
        'anomalies': anomalies,
        'trend': trend
    }
    
    return day_df, metrics

In [12]:
#Pipeline for the report builder function

def run_daily_report_for_date(date_str: str, df=csv_path):
    """
    Run daily report for a given date on a pre-cleaned DataFrame.
    
    Parameters:
        df : pd.DataFrame
            Cleaned sales DataFrame (output of process_sales_pipeline)
        date_str : str
            Date string in 'YYYY-MM-DD' format to run the report
    
    Returns:
        day_transactions_df : pd.DataFrame
            All transactions for the selected date
        metrics : dict
            Calculated daily metrics in flattened key-value format
    """
    # Load and clean the dataset
    # df = process_sales_pipeline(csv_path)
    
    # Ensure date column exists
    if 'InvoiceDate_date' not in df.columns:
        df['InvoiceDate_date'] = pd.to_datetime(df['InvoiceDate']).dt.normalize()
    
    # Normalize selected date
    target_dt = pd.to_datetime(date_str).normalize()
    
    # Check if date exists in dataset
    if target_dt not in df['InvoiceDate_date'].values:
        print(f"Warning: {date_str} not found in dataset. Using closest available date instead.")
        target_dt = df['InvoiceDate_date'].min()
    
    target_date_str = target_dt.strftime('%Y-%m-%d')

    # Run daily report
    day_transactions_df, metrics = build_daily_report(df, target_date_str)
    
    # Display summary
    print("Metrics summary:")
    for k, v in metrics.items():
        if k in ('top_products', 'top_countries', 'anomalies', 'trend'):
            print(f"{k}: (see structure) - {type(v)}")
        else:
            print(f"{k}: {v}")
    
    return day_transactions_df, metrics

#### Test run the above function with these codes
**Test: Run report for a specific date**

metrics = run_daily_report_for_date("2020-01-11")
metrics

## Google ADK Response Helper

In [13]:
# Define helper functions that will be reused throughout the notebook
async def run_session(
    runner_instance: Runner,
    user_queries: list[str] | str = None,
    session_name: str = "default",
):
    print(f"\n ### Session: {session_name}")

    # Get app name from the Runner
    app_name = runner_instance.app_name

    # Attempt to create a new session or retrieve an existing one
    try:
        session = await session_service.create_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )
    except:
        session = await session_service.get_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )

    # Process queries if provided
    if user_queries:
        # Convert single query to list for uniform processing
        if type(user_queries) == str:
            user_queries = [user_queries]

        # Process each query in the list sequentially
        for query in user_queries:
            print(f"\nUser > {query}")

            # Convert the query string to the ADK Content format
            query = types.Content(role="user", parts=[types.Part(text=query)])

            # Stream the agent's response asynchronously
            async for event in runner_instance.run_async(
                user_id=USER_ID, session_id=session.id, new_message=query
            ):
                # Check if the event contains valid content
                if event.content and event.content.parts:
                    # Filter out empty or "None" responses before printing
                    if (
                        event.content.parts[0].text != "None"
                        and event.content.parts[0].text
                    ):
                        print(f"{MODEL_NAME} > ", event.content.parts[0].text)
    else:
        print("No queries!")


retry_config = types.HttpRetryOptions(
    attempts=5,  # Maximum retry attempts
    exp_base=7,  # Delay multiplier
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],  # Retry on these HTTP errors
)

print("‚úÖ Retry has been successfully configured.")
print("‚úÖ Helper functions defined.")

‚úÖ Retry has been successfully configured.
‚úÖ Helper functions defined.


## Tool Sanitizers and Wrappers

**Tool sanitizers and wrappers**
 - sanitize_for_json(metrics) that converts Timestamp keys to strings and numpy scalars to Python types
 - helper to convert DataFrame slices into list-of-dicts for safe return by tools

In [14]:
from datetime import datetime, date, timedelta

def sanitize_for_json(obj):
    """
    Recursively convert an object into JSON-serializable Python primitives:
      - pandas.Timestamp -> 'YYYY-MM-DD' string or ISO
      - pandas.Timedelta -> string
      - numpy scalar -> native python type
      - numpy arrays -> lists
      - decimal.Decimal -> float
      - pandas/np NaN/NaT -> None
    Also converts dict keys that are datetime-like to strings.
    """
    # primitives
    if obj is None:
        return None
    if isinstance(obj, (str, bool, int, float)):
        # cover normal Python primitives
        if isinstance(obj, float) and (np.isnan(obj) or np.isinf(obj)):
            return None
        return obj

    # pandas / numpy scalars
    if isinstance(obj, (np.generic,)):
        return obj.item()

    if isinstance(obj, (pd.Timestamp, datetime, date)):
        # choose date-only format for Timestamp to match your pipeline
        try:
            return pd.to_datetime(obj).strftime('%Y-%m-%d')
        except Exception:
            return str(obj)

    if isinstance(obj, pd.Timedelta):
        return str(obj)

    if isinstance(obj, decimal.Decimal):
        return float(obj)

    # pandas NA
    if obj is pd.NaT or (isinstance(obj, float) and np.isnan(obj)):
        return None

    # dict: sanitize keys and values
    if isinstance(obj, dict):
        new = {}
        for k, v in obj.items():
            # convert key to str if not simple type
            if isinstance(k, (pd.Timestamp, datetime, date)):
                new_key = pd.to_datetime(k).strftime('%Y-%m-%d')
            elif isinstance(k, (np.generic,)):
                new_key = str(k.item())
            elif not isinstance(k, (str, int, float, bool, type(None))):
                new_key = str(k)
            else:
                new_key = k
            new[new_key] = sanitize_for_json(v)
        return new

    # list/tuple/set -> list
    if isinstance(obj, (list, tuple, set)):
        return [sanitize_for_json(v) for v in obj]

    # pandas Series -> dict with string keys
    if isinstance(obj, pd.Series):
        return sanitize_for_json(obj.to_dict())

    # pandas DataFrame -> list of records (safe)
    if isinstance(obj, pd.DataFrame):
        # convert DataFrame to list of dicts with sanitized values
        records = obj.to_dict(orient='records')
        return sanitize_for_json(records)

    # numpy arrays
    if isinstance(obj, np.ndarray):
        return [sanitize_for_json(v) for v in obj.tolist()]

    # fallback: try to cast to primitive
    try:
        return float(obj)
    except Exception:
        try:
            return str(obj)
        except Exception:
            return None

In [15]:
# preload df_clean once
df_clean = process_sales_pipeline(csv_path)  # run this earlier

# ensure you have a df-based runner
def run_daily_report_for_date_df(df: pd.DataFrame, date_str: str):
    if 'InvoiceDate_date' not in df.columns:
        df['InvoiceDate_date'] = pd.to_datetime(df['InvoiceDate']).dt.normalize()

    target_dt = pd.to_datetime(date_str).normalize()
    if target_dt not in df['InvoiceDate_date'].values:
        target_dt = df['InvoiceDate_date'].min()
    target_date_str = target_dt.strftime('%Y-%m-%d')

    day_transactions_df, metrics = build_daily_report(df, target_date_str)
    return day_transactions_df, metrics

# ADK-friendly tool function - accepts only a date string and returns sanitized JSON-friendly dict
def daily_report_tool_for_agent(date_str: str):
    try:
        target_date = pd.to_datetime(date_str).strftime('%Y-%m-%d')
    except Exception as e:
        return {"status": "error", "message": f"Invalid date format: {date_str}"}

    try:
        _, metrics = run_daily_report_for_date_df(df_clean, target_date)
        metrics_safe = sanitize_for_json(metrics)
        return {"status": "success", "metrics": metrics_safe}
    except Exception as e:
        return {"status": "error", "message": str(e)}


## Agent Definitions

**daily_report_agent definition**
 - Calls daily_report_tool_for_agent
 - Instruction: parse date from prompt, call tool, return metrics

**summary_agent definition**
 - Calls generate_executive_summary_for_agent
 - Instruction: read sanitized metrics and return one-paragraph executive summary

**followup_agent definition**
 - Session-based conversation agent for follow-ups
 - Expects SESSION_JSON + USER_MESSAGE as input
 - Returns short strategic reply or "CONFIRMED_RECOMPUTE_REQUESTED" token for recompute flows

**orchestrator_agent definition**
 - Orchestrates daily_report_agent and summary tool
 - Instruction: parse date, call daily_report_agent, then summary tool, and merge outputs into final report text

In [16]:
# Daily report processing agent is defined here

daily_report_agent = LlmAgent(
    name="daily_report_agent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    instruction="""
You are a daily report assistant.
1) Extract the date from the user's prompt.
2) Convert it to 'YYYY-MM-DD'.
3) Call daily_report_tool_for_agent(date_str).
4) Return the tool output exactly if status == 'success', otherwise explain the error.
""",
    tools=[daily_report_tool_for_agent],
)

##### Test the above agent with these codes:

runner = InMemoryRunner(agent=daily_report_agent)

print("‚úÖ Runner created.")

response = await runner.run_debug("What's my report for today, 2020-01-11")

In [17]:
##### Test the above agent with these codes:

runner = InMemoryRunner(agent=daily_report_agent)

print("‚úÖ Runner created.")

response = await runner.run_debug("What's my report for today, 2020-01-11")

‚úÖ Runner created.

 ### Created new session: debug_session_id

User > What's my report for today, 2020-01-11




daily_report_agent > Here is your report for 2020-01-11:

**Overall Metrics:**
*   Total Revenue: $25,497.17
*   Number of Orders: 24
*   Number of Lines: 24
*   Average Order Value: $1,062.38
*   Average Line Value: $1,062.38
*   Return Rate: 1%

**Revenue by Category:**
*   Accessories: $5,121.69
*   Apparel: $6,513.51
*   Electronics: $5,447.78
*   Furniture: $7,302.08
*   Stationery: $1,112.11

**Revenue by Channel:**
*   In-store: $7,314.65
*   Online: $18,182.51

**Top Countries by Revenue:**
1.  United States: $4,560.67
2.  Spain: $4,121.89
3.  France: $3,325.39
4.  Portugal: $2,557.36
5.  Netherlands: $2,219.13

**Top Products:**
*   USB Cable (SKU_1961): $0.00
*   Wireless Mouse (SKU_1477): $0.00
*   Office Chair (SKU_1982): $0.00
*   Office Chair (SKU_1964): $0.00
*   Office Chair (SKU_1589): $0.00
*   Office Chair (SKU_1471): $0.00
*   Wireless Mouse (SKU_1020): $0.00
*   T-shirt (SKU_1297): $0.00
*   Wall Clock (SKU_1936): $0.00
*   USB Cable (SKU_1348): $0.00

**Trend:**
*

In [18]:
# Executive Summary processing agent is defined here

summary_agent = LlmAgent(
    name="summary_agent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    instruction="""
You are an executive summarizer. Your job is to call the prior agent to get the sanitized metrics, then turn those metrics into a single, readable executive paragraph the user can act on.

Agent behavior and process
1) Obtain metrics
   - Call the daily_report_agent tool and get back the sanitized metrics object.
   - Treat that metrics object as authoritative. Do not invent values.

2) What to read from metrics (use when present)
   - date / report_date
   - total_revenue
   - num_orders
   - avg_order_value or avg_order_value
   - revenue_by_category (dict)
   - revenue_by_channel (dict)
   - top_products (dict) pick top 3 by value
   - top_countries (dict) pick top 3 by value
   - anomalies (dict) use count and up to 2 example keys
   - trend (dict) use _prev_day_change_pct and _week_change_pct when present

3) Tone and style
   - Write in plain, engaging English aimed at a manager.
   - Use varied sentence openers and short sentences for impact.
   - Avoid dry lists. Merge related facts into compact sentences.
   - If the report contains surprising or missing numbers (for example many top product revenues are zero) call that out briefly as a data quality note. Do not speculate on causes.

4) Formatting rules
   - Produce exactly one paragraph of 4 to 6 sentences.
   - Round monetary values to two decimals and format with commas, for example $12,345.67.
   - Format percentages to two decimals and append %, for example 12.34%.
   - When stating a percentage change, also include the absolute reference if available, for example "up 15.9% (from $20,000 to $23,180)". If the absolute figure is not available, give the percent alone.

5) Sentence structure (recommended, not template locked)
   - Sentence 1: Headline. Date, total revenue, orders. Short and punchy.
     Example: "Executive summary for 2020-01-11: total revenue was $25,497.17 from 24 orders."
   - Sentence 2: Channel and AOV highlight. Mention top channel and AOV.
     Example: "Average order value was $1,062.38, with Online leading at $18,182.51."
   - Sentence 3: Category highlight. State top category and its revenue, with one short qualifier if relevant.
   - Sentence 4: Top products. List the top 3 product names with revenue. If those revenues are zero or missing, list the product names and add "revenues appear missing" as a data note.
   - Sentence 5: Geography and anomalies. Top 3 countries, then anomaly count and up to two example anomaly keys or "no anomalies detected."
   - Sentence 6: Trend insight. State day-over-day and week-over-week percent changes and what they imply for the business direction. If trend numbers are missing, say "trend data not available."

6) Output constraints
   - Return only the single paragraph string as markdown text. No bullet lists. No extra metadata. No apologies, no filler sentences.
   - If the tool call fails or returns an error, return a single sentence error message that includes the tool's message and one concrete corrective step, for example: "Tool error: <message>. Suggestion: re-run with sanitized metrics that include total_revenue and top_products."

7) Data quality guardrails
   - If more than half of the top products show $0.00, append a short note at the end: "Note: many top product revenues are zero; verify source data."
   - Never invent causal explanations. If the user asks why, respond that the metrics do not contain causal data and offer next steps to investigate.

Implementation detail
- First call the daily_report_agent tool to fetch metrics.
- Then compose the paragraph locally following the rules above.
- Return the paragraph exactly as the agent output.

Example final paragraph style:
"Executive summary for 2020-01-11: total revenue was $25,497.17 from 24 orders. Average order value was $1,062.38, with Online driving the largest share at $18,182.51. Furniture was the top category at $7,302.08. Top products were Office Chair (SKU_1982) $X, Wireless Mouse (SKU_1477) $Y, USB Cable (SKU_1961) $Z. Top countries were United States ($4,560.67), Spain ($4,121.89), France ($3,325.40); no anomalies detected. Sales are up 15.91% day-over-day and 25.88% versus the weekly average."
""",
    tools=[
        AgentTool(agent=daily_report_agent),
    ],
)

##### Test the above agent with these codes:

summary_runner = InMemoryRunner(agent=summary_agent)

print("‚úÖ Runner created.")

summary_response = await summary_runner.run_debug("What's my report for today, 2020-01-11")

In [19]:
##### Test the above agent with these codes:

summary_runner = InMemoryRunner(agent=summary_agent)

print("‚úÖ Runner created.")

summary_response = await summary_runner.run_debug("What's my report for today, 2020-01-11")

‚úÖ Runner created.

 ### Created new session: debug_session_id

User > What's my report for today, 2020-01-11




summary_agent > Executive summary for 2020-01-11: total revenue was $25,497.17 from 24 orders. Average order value was $1,062.38, with Online driving the largest share at $18,182.51. Furniture was the top category at $7,302.08. Top products include USB Cable (SKU_1961), Wireless Mouse (SKU_1477), and Office Chair (SKU_1982). Top countries were United States ($4,560.67), Spain ($4,121.89), and France ($3,325.40); no anomalies detected. Sales are up 15.91% day-over-day and 25.88% versus the weekly average.


In [20]:
# Follow-up response agent is defined here

followup_agent = LlmAgent(
    name="followup_agent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    instruction="""
You are the session-based conversational follow-up assistant for a single report session. You are to take the persona of a strategy-driven analyst.

Your responsibilities
1) Pick from the session:
   - Pick your information from the memory session provided by the system and answer the user's questions as directly as you can.

2) Use only session state for answers:
   - You should cite values from metrics, and summary from the output of the other agents.
   - Never call external data or recompute unless user explicitly requests and confirms a recompute.
   - Do not invent, infer, or extrapolate beyond the session data. If information is missing, answer by saying the exact field is "data not available".

3) Follow-up behavior and allowed actions:
   - Answer concise, manager-friendly follow-up questions (1‚Äì5 sentences).
   - Make your message strategy-driven (strategic) in nature
   - If the user asks for deeper analysis that requires re-running pipelines (for example "recompute with filter country=Spain" or "show full order list for SKU X"), ask for explicit confirmation before performing the recompute: respond with a question like "To run that analysis I will re-run the pipeline for 2020-01-11 and consume one follow-up. Reply 'yes' to confirm." Do not run anything until you receive that exact confirmation in the next user message.
   - If the user confirms (message equals "yes" or "confirm recompute"), respond with exactly: "CONFIRMED_RECOMPUTE_REQUESTED" so the caller wrapper can perform the recompute and update the session. Do not include other text.
   - If the user asks for data already present in transactions_preview, return up to 3 example rows. Format each example as a single short sentence: "Example order: InvoiceNo=<>, Description=<>, NetRevenue=$<>, Country=<>."

4) Tone and content rules:
   - Use plain English aimed at managers. Keep answers short and useful.
   - Your answers should be written in markdown format only.
   - When you reference numbers, format money with commas and two decimals (e.g., $12,345.67) and percentages to two decimals (e.g., 12.34%).
   - If more than half of top_products show $0.00, include a single-sentence data-quality note: "Note: many top product revenues are zero; verify source data."
   - Never provide causal claims. If asked "why" beyond the data, reply: "I can't determine causes from the metrics. I can run deeper analysis if you confirm."

5) Examples of allowed replies
   - Short data reply: ‚ÄúOffice Chair remained the strongest contributor with revenue of $7,302.08. This suggests demand is steady for higher-value practical items. A practical next step is to review margin performance on this product line and confirm whether inventory levels can support similar demand over the next week. If needed, I can provide example transactions from today to help you check pricing consistency.‚Äù
   - Recompute ask: "To run a deeper SKU breakdown I need to re-run the pipeline and it will consume one follow-up. Reply 'yes' to confirm."

6) Safety and fidelity
   - If the user asks exceeds 3 allowed follow-ups, refuse with the single-line message: "Follow-up limit reached. No more follow-ups allowed for this session."

Remember: The agent must never modify session storage directly. It should return plain text only and emit "CONFIRMED_RECOMPUTE_REQUESTED" when the user confirms recompute.
""",
    tools=[],
)

In [21]:
#Orchestrator agent runs all the pipeline together

orchestrator_agent = LlmAgent(
    name="orchestrator_agent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    instruction="""
You are the Orchestrator Agent. Your job is to run the daily_report_agent to fetch metrics, call the summary tool to create an executive summary, and present a single final report to the manager.

Follow these steps exactly:

1) Parse the user's prompt for a date:
   - Accept ISO "YYYY-MM-DD", common formats like "Feb 18 2024", "18/02/2024", or phrases like "today" (resolve "today" to current date in YYYY-MM-DD).
   - Convert to normalized string 'YYYY-MM-DD'. If parsing fails, return a single-line error: "Error: could not parse date. Please provide a date like YYYY-MM-DD."

2) Call the daily report tool:
   - Call the tool provided that wraps the daily_report_agent with the date string as its single argument.
   - Expect a tool response object with a `status` field. If `status != "success"`, return exactly:
     "Tool error: <message>. Suggestion: check dataset availability and date correctness."
   - On success, extract the `metrics` dict from the tool response.

3) Call the summary tool:
   - Convert `metrics` to a JSON string: `metrics_json = json.dumps(metrics)`.
   - Call `generate_executive_summary_for_agent(metrics_json)`.
   - If tool returns `status != "success"`, return exactly:
     "Summary tool error: <message>. Suggestion: ensure metrics JSON is sanitized."
   - On success, extract `summary` string.

4) Call the follow-up agent tool:
	‚Ä¢	Call the tool that wraps the followup_agent, passing the session memory or output from the other 2 agents and the user‚Äôs follow-up message.
	‚Ä¢	Expect a short paragraph in markdown format
	‚Ä¢	Extract the reply string from the agent and surface it directly to the user.

5) Merge and present daily report:
   - Compose the final output exactly as Markdown plain text with two parts:

     A) The executive summary paragraph as-is from the summary_agent (no changes).

     B) A compact "Key metrics" block below the paragraph (no tables, use short labeled lines). 
        Include the following fields if present:
        - date
        - total_revenue
        - num_orders
        - avg_order_value
        - top_category (name + value)
        - top_channel (name + value)
        - top_countries (top 3 with values)
        - anomalies_count (number of anomalies) and up to two anomaly names if available
        - trend_day_over_day (formatted percent), if available
        - trend_week_over_week (formatted percent), if available

        Format money with commas and two decimals (e.g., $12,345.67).
        Format percentages with two decimals (e.g., 15.91%).

   - Example final format:
     <summary paragraph>

     Key metrics:
     - Date: 2020-01-11
     - Total revenue: $25,497.17
     - Orders: 24
     - AOV: $1,062.38
     - Top category: Furniture ($7,302.08)
     - Top channel: Online ($18,182.51)
     - Top countries: United States ($4,560.67), Spain ($4,121.89), France ($3,325.40)
     - Anomalies: 0 (no anomalies detected)
     - Day-over-day change: 15.91%
     - Week-over-week change: 25.88%

5) Follow-up Answers:
    - If it is a follow-up question from the user within the same session, that is answered by the follow-up agent, only output the follow-up agent's response.
    - Do not include the daily report in the follow-up response; only the follow-up agent's answer.

6) Output constraints:
   - Return only the final report text (markdown-friendly). No extra metadata, no JSON, no code blocks.
   - If any step fails, return the single-line error messages specified above and nothing else.

7) Data fidelity:
   - Do not modify, infer, or compute metrics yourself. Use only the `metrics` returned by the daily_report_agent.
   - If values are missing, show "data not available" for that field in the Key metrics block.

8) Keep responses concise and manager-focused.

Tools available to you:
- AgentTool(daily_report_agent)
- AgentTool(summary_agent)
- AgentTool(followup_agent)

Use them in the exact sequence described.
""",
    tools=[
        AgentTool(agent=daily_report_agent),
        AgentTool(agent=summary_agent),
        AgentTool(agent=followup_agent),
    ],
)

## Agent Discussion Interface

 - Here the agent provides its responses to the prompt its given.

In [22]:
runner = Runner(agent=orchestrator_agent, app_name=APP_NAME, session_service=session_service)

print("‚úÖ Stateful agent initialized!")

‚úÖ Stateful agent initialized!


In [23]:
await run_session(
    runner,
    [
        "What's my report for today on 16th November 2022?",
        "What actions would you recommend to improve our week-over-week growth?"
    ],
    "agentic-session-01",
)


 ### Session: agentic-session-01

User > What's my report for today on 16th November 2022?




gemini-2.5-flash-lite >  On 2022-11-16, total revenue reached $21,704.63 with 24 orders, resulting in an average order value of $904.36. The In-store channel was the highest performer, generating $12,694.25, while Furniture led as the top category with $8,980.44 in sales. Sweden, the United States, and the United Kingdom were the top-performing countries, contributing $5,192.24, $4,316.90, and $2,443.86, respectively. There were no anomalies detected. Revenue saw a 5.75% increase compared to the previous day, but it was 9.60% lower than the previous week's average.

Key metrics:
- Date: 2022-11-16
- Total revenue: $21,704.63
- Orders: 24
- AOV: $904.36
- Top category: Furniture ($8,980.44)
- Top channel: In-store ($12,694.25)
- Top countries: Sweden ($5,192.24), United States ($4,316.90), United Kingdom ($2,443.86)
- Anomalies: 0 (no anomalies detected)
- Day-over-day change: 5.75%
- Week-over-week change: -9.60%

User > What actions would you recommend to improve our week-over-week gr



gemini-2.5-flash-lite >  To recommend actions, I need to understand the current growth trends. Could you please specify which metrics or areas you'd like to focus on for week-over-week growth analysis? For instance, we could examine overall revenue, sales by product category, or customer acquisition.


On 2022-11-16, total revenue reached $21,704.63 with 24 orders, resulting in an average order value of $904.36. The In-store channel was the highest performer, generating $12,694.25, while Furniture led as the top category with $8,980.44 in sales. Sweden, the United States, and the United Kingdom were the top-performing countries, contributing $5,192.24, $4,316.90, and $2,443.86, respectively. There were no anomalies detected. Revenue saw a 5.75% increase compared to the previous day, but it was 9.60% lower than the previous week's average.

Key metrics:
- Date: 2022-11-16
- Total revenue: $21,704.63
- Orders: 24
- AOV: $904.36
- Top category: Furniture ($8,980.44)
- Top channel: In-store ($12,694.25)
- Top countries: Sweden ($5,192.24), United States ($4,316.90), United Kingdom ($2,443.86)
- Anomalies: 0 (no anomalies detected)
- Day-over-day change: 5.75%
- Week-over-week change: -9.60%


To recommend actions, I need to understand the current growth trends. Could you please specify which metrics or areas you'd like to focus on for week-over-week growth analysis? For instance, we could examine overall revenue, sales by product category, or customer acquisition.