# Part 1

### Data exploration

##### Required Python packages

In [1]:
import importlib
import subprocess
import sys

# We list special packages that don't exist in jupyter installation
special_required_packages = {
    "duckdb": "duckdb",
    "pandas": "pandas"
}

# Verify special packages
for module_name, pip_name in special_required_packages.items():
    try:
        importlib.import_module(module_name)
        print(f"{module_name} already installed")
    except ImportError:
        print(f"Installing {pip_name}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name])

Installing duckdb...
Collecting duckdb
  Downloading duckdb-1.3.2-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (7.0 kB)
Downloading duckdb-1.3.2-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (21.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m21.1/21.1 MB[0m [31m32.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: duckdb
Successfully installed duckdb-1.3.2
pandas already installed


In [69]:
import os
import json
import duckdb
import shutil
import requests
import pandas as pd
from datetime import datetime

##### Verify system version and set input data path, database path

In [3]:
# Input data path and database path within container
CSV_PATH = "/home/jovyan/challenge/data/ads_spend.csv"
DB_PATH = "/home/jovyan/challenge/database/warehouse.db"

# Verify directory path and python version
print(f"Working directory: {os.getcwd()}")
print(f"Python version: {sys.version}")

Working directory: /home/jovyan/challenge
Python version: 3.11.6 | packaged by conda-forge | (main, Oct  3 2023, 10:40:35) [GCC 12.3.0]


##### Explore first 5 rows within input data (ads_spend.csv)

In [5]:
# Verify if csv exists
if os.path.exists(CSV_PATH):
    
    # Take first 5 rows into pandas format, just for inspection
    df_sample = pd.read_csv(CSV_PATH, nrows=5)
    display(df_sample)
    
else:
    print("CSV file not found")

# Delete dataframe
del df_sample

Unnamed: 0,date,platform,account,campaign,country,device,spend,clicks,impressions,conversions
0,2025-01-01,Meta,AcctA,Prospecting,MX,Desktop,1115.94,360,15840,29
1,2025-01-01,Google,AcctA,Brand_Search,CA,Mobile,789.43,566,22640,28
2,2025-01-01,Google,AcctA,Prospecting,BR,Desktop,381.4,133,10241,12
3,2025-01-01,Google,AcctC,Prospecting,US,Desktop,1268.34,891,49005,36
4,2025-01-01,Google,AcctA,Brand_Search,BR,Desktop,1229.7,628,21352,31


##### Some features and big picture within input data (ads_spend.csv)

In [6]:
# Verify if csv exists
if os.path.exists(CSV_PATH):
    
    # Full data set info
    df_full = pd.read_csv(CSV_PATH)
    print(f"Dataset shape: {df_full.shape}")
    print(f"Columns: {list(df_full.columns)}")
    print(f"Total spent: ${df_full['spend'].sum():,.2f}")
    print(f"Date range: {df_full['date'].min()} a {df_full['date'].max()}")
    print(f"Platforms: {df_full['platform'].unique()}")
    print(f"Unique accounts: {df_full['account'].unique()}")
    
    # Verify data types
    for col, dtype in df_full.dtypes.items():
        print(f"  {col}: {dtype}")
        
else:
    print("CSV file not found")

# Delete dataframe
del df_full

Dataset shape: (2000, 10)
Columns: ['date', 'platform', 'account', 'campaign', 'country', 'device', 'spend', 'clicks', 'impressions', 'conversions']
Total spent: $1,690,764.32
Date range: 2025-01-01 a 2025-06-30
Platforms: ['Meta' 'Google']
Unique accounts: ['AcctA' 'AcctC' 'AcctB']
  date: object
  platform: object
  account: object
  campaign: object
  country: object
  device: object
  spend: float64
  clicks: int64
  impressions: int64
  conversions: int64


### Data ingestion process

##### Function to ingest data in database

In [7]:
def ingest_data():

    # Error handling
    try:

        # Verify csv file again
        if not os.path.exists(CSV_PATH):
            raise FileNotFoundError(f"{CSV_PATH} csv not found")
            
        # Read CSV and convert to dataframe
        input_data_df = pd.read_csv(CSV_PATH)
        print(f"csv loaded: {input_data_df.shape[0]:,} rows, {input_data_df.shape[1]} columns")
        
        # Validate if csv is empty
        if input_data_df.empty:
            raise ValueError("csv is empty")

        # Validate columns name (we put the list of columns that we already know exist)    
        required_columns = ['date', 'platform', 'account', 'campaign', 
                           'country', 'device', 'spend', 'clicks', 
                           'impressions', 'conversions']
        
        # Check if we have missing columns
        missing_cols = [col for col in required_columns if col not in input_data_df.columns]
        if missing_cols:
            raise ValueError(f"Missing columns: {missing_cols}")

        # Check null or empty values in columns
        null_empty_columns = [col for col in required_columns if input_data_df[col].isnull().any() or input_data_df[col].apply(lambda x: isinstance(x, str) and x.strip() == "").any()]

        if null_empty_columns:
            raise ValueError(f"Columnas con valores nulos o vacíos: {null_empty_columns}")
        
        # Add metadata
        # Date
        input_data_df['load_date'] = datetime.now()
        # Filename
        input_data_df['source_file_name'] = 'ads_spend.csv'
        
        print(f"load_date: {input_data_df['load_date'].iloc[0]}")
        print(f"source_file_name: {input_data_df['source_file_name'].iloc[0]}")
        
        # Connect to duckDB
        conn = duckdb.connect(DB_PATH)
        
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS ads_spend_db (
            date DATE,
            platform VARCHAR,
            account VARCHAR,
            campaign VARCHAR,
            country VARCHAR,
            device VARCHAR,
            spend DECIMAL(12,2),
            clicks INTEGER,
            impressions INTEGER,
            conversions INTEGER,
            -- Metadata challenge required
            load_date TIMESTAMP,
            source_file_name VARCHAR
        );
        """
        # Verify table is already created or just verify
        conn.execute(create_table_sql)
        print("Table ads_spend_db verified/created")
        
        # Count register before add
        count_before = conn.execute("SELECT COUNT(*) FROM ads_spend_db").fetchone()[0]
        
        # Insert data in append mode to demostrate persistence
        conn.register('df_new', input_data_df)
        conn.execute("INSERT INTO ads_spend_db SELECT * FROM df_new")
        
        # Count register after add
        count_after = conn.execute("SELECT COUNT(*) FROM ads_spend_db").fetchone()[0]
        
        conn.close()
        
        # Result for n8n in JSON format
        result = {
            "status": "success",
            "timestamp": datetime.now().isoformat(),
            "rows_inserted": len(input_data_df),
            "total_rows_before": count_before,
            "total_rows_after": count_after,
            "source_file": "ads_spend.csv",
            "message": f"Successfully ingested {len(input_data_df):,} rows into warehouse"
        }
        
        print(f"Register added: {len(input_data_df):,}")
        print(f"Total regiser in DB now: {count_after:,}")
        print(f"Increment: +{count_after - count_before:,}")
        
        return result

    # Error handling    
    except Exception as error:
        error_result = {
            "status": "error",
            "timestamp": datetime.now().isoformat(),
            "error_message": str(error),
            "error_type": type(error).__name__
        }
        print(f"ERROR: {error}")
        return error_result

##### Execute data ingest

In [8]:
# Ingest data to database
print("Data Ingest in Execution")
result = ingest_data()

# Get result for n8n
print(f"\nFinal Result: ")
print(json.dumps(result, indent=2))

Data Ingest in Execution
csv loaded: 2,000 rows, 10 columns
load_date: 2025-08-28 04:59:58.325771
source_file_name: ads_spend.csv
Table ads_spend_db verified/created
Register added: 2,000
Total regiser in DB now: 16,000
Increment: +2,000

Final Result: 
{
  "status": "success",
  "timestamp": "2025-08-28T04:59:58.669323",
  "rows_inserted": 2000,
  "total_rows_before": 14000,
  "total_rows_after": 16000,
  "source_file": "ads_spend.csv",
  "message": "Successfully ingested 2,000 rows into warehouse"
}


### Verify data persistence

##### Function to verify persistence

In [9]:
def data_persistence():

    # Connect to database    
    conn = duckdb.connect(DB_PATH)
    
    # Get general statistics
    total_records = conn.execute("SELECT COUNT(*) FROM ads_spend_db").fetchone()[0]
    
    # Get load info
    load_info = conn.execute("""
        SELECT 
            source_file_name,
            strftime(load_date,'%Y-%m-%d %H:%M') as load_minute,
            COUNT(*) as record_count,
            MIN(load_date) as first_load_time,
            MAX(load_date) as last_load_time
        FROM ads_spend_db 
        GROUP BY source_file_name, load_minute
        ORDER BY last_load_time DESC
    """).fetchall()
    
    print(f"Total records persisted {total_records:,}")
    print(f"Registered load sessions {len(load_info)}")
    
    print(f"\nLoad history")
    # Get last 5 five loads
    for load in load_info[:5]:
        filename, load_date, count, first_time, last_time = load
        print(f"{filename} | {load_date}")
        print(f"Registers: {count:,}")
        print(f"Datetime: {first_time}")
        print()
    
    # Verify data quality
    data_quality = conn.execute("""
        SELECT 
            COUNT(*) as total_records,
            COUNT(DISTINCT date) as unique_dates,
            COUNT(DISTINCT platform) as unique_platforms,
            COUNT(DISTINCT account) as unique_accounts,
            SUM(spend) as total_spend,
            SUM(conversions) as total_conversions,
            AVG(spend) as avg_spend_per_record
        FROM ads_spend_db
    """).fetchone()
    
    print(f"Data queality:")
    print(f"Total registers: {data_quality[0]:,}")
    print(f"Unique dates: {data_quality[1]:,}")  
    print(f"Unique platforms: {data_quality[2]}")
    print(f"Unique accounts: {data_quality[3]:,}")
    print(f"Total spent: ${data_quality[4]:,.2f}")
    print(f"Total conversions: {data_quality[5]:,}")
    print(f"Average spent per register: ${data_quality[6]:.2f}")
    
    # Get last data
    sample_data = conn.execute("""
        SELECT date, platform, account, spend, conversions, load_date
        FROM ads_spend_db 
        ORDER BY load_date DESC 
        LIMIT 5
    """).fetchall()
    
    print(f"\nLast data:")
    for row in sample_data:
        date, platform, account, spend, conversions, load_date = row
        print(f"   {date} | {platform} | {account} | ${spend} | {conversions} conv | {load_date}")
    
    conn.close()
    
    return {
        "status": "verified", 
        "total_records": total_records, 
        "load_sessions": len(load_info),
        "data_quality_check": "passed"
    }

##### Execute persistence verification

In [10]:
# Execute verification
persistence_result = data_persistence()
print(f"\nVerification result: {persistence_result}")

Total records persisted 16,000
Registered load sessions 8

Load history
ads_spend.csv | 2025-08-28 04:59
Registers: 2,000
Datetime: 2025-08-28 04:59:58.325771

ads_spend.csv | 2025-08-28 04:24
Registers: 2,000
Datetime: 2025-08-28 04:24:44.721864

ads_spend.csv | 2025-08-28 04:23
Registers: 2,000
Datetime: 2025-08-28 04:23:47.079539

ads_spend.csv | 2025-08-28 01:59
Registers: 2,000
Datetime: 2025-08-28 01:59:55.243061

ads_spend.csv | 2025-08-28 01:56
Registers: 2,000
Datetime: 2025-08-28 01:56:01.048397

Data queality:
Total registers: 16,000
Unique dates: 181
Unique platforms: 2
Unique accounts: 3
Total spent: $13,526,114.56
Total conversions: 439,336
Average spent per register: $845.38

Last data:
   2025-01-01 | Meta | AcctA | $1115.94 | 29 conv | 2025-08-28 04:59:58.325771
   2025-01-01 | Google | AcctA | $789.43 | 28 conv | 2025-08-28 04:59:58.325771
   2025-01-01 | Google | AcctA | $381.40 | 12 conv | 2025-08-28 04:59:58.325771
   2025-01-01 | Google | AcctC | $1268.34 | 36 con

### Create n8n executable script

##### Create script for n8n in python

In [21]:
# Python script content
script_content_ingest_data = '''#!/usr/bin/env python3

import importlib
import subprocess
import sys

# We list special packages that don't exist in jupyter installation
special_required_packages = {
    "duckdb": "duckdb",
    "pandas": "pandas"
}

# Verify special packages
for module_name, pip_name in special_required_packages.items():
    try:
        importlib.import_module(module_name)
        print(f"{module_name} already installed")
    except ImportError:
        print(f"Installing {pip_name}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name])

#import sys
import pandas as pd
import duckdb
from datetime import datetime
import json
import os

def main():
    try:

        # Localprobe
        #csv_path = "/home/jovyan/challenge/data/ads_spend.csv"
        #db_path = "/home/jovyan/challenge/database/warehouse.db"
        # Production
        csv_path = "/data/ads_spend.csv"
        db_path = "/database/warehouse.db"

        # Verify csv file again
        if not os.path.exists(csv_path):
            raise FileNotFoundError(f"{csv_path} csv not found")
            
        # Read CSV and convert to dataframe
        input_data_df = pd.read_csv(csv_path)
        print(f"csv loaded: {input_data_df.shape[0]:,} rows, {input_data_df.shape[1]} columns")
        
        # Validate if csv is empty
        if input_data_df.empty:
            raise ValueError("csv is empty")

        # Validate columns name (we put the list of columns that we already know exist)    
        required_columns = ['date', 'platform', 'account', 'campaign', 
                           'country', 'device', 'spend', 'clicks', 
                           'impressions', 'conversions']
        
        # Check if we have missing columns
        missing_cols = [col for col in required_columns if col not in input_data_df.columns]
        if missing_cols:
            raise ValueError(f"Missing columns: {missing_cols}")

        # Check null or empty values in columns
        null_empty_columns = [col for col in required_columns if input_data_df[col].isnull().any() or input_data_df[col].apply(lambda x: isinstance(x, str) and x.strip() == "").any()]

        if null_empty_columns:
            raise ValueError(f"Columnas con valores nulos o vacíos: {null_empty_columns}")
        
        # Add metadata
        # Date
        input_data_df['load_date'] = datetime.now()
        # Filename
        input_data_df['source_file_name'] = 'ads_spend.csv'
        
        print(f"load_date: {input_data_df['load_date'].iloc[0]}")
        print(f"source_file_name: {input_data_df['source_file_name'].iloc[0]}")
        
        # Connect to duckDB
        conn = duckdb.connect(db_path)
        
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS ads_spend_db (
            date DATE,
            platform VARCHAR,
            account VARCHAR,
            campaign VARCHAR,
            country VARCHAR,
            device VARCHAR,
            spend DECIMAL(12,2),
            clicks INTEGER,
            impressions INTEGER,
            conversions INTEGER,
            -- Metadata challenge required
            load_date TIMESTAMP,
            source_file_name VARCHAR
        );
        """
        # Verify table is already created or just verify
        conn.execute(create_table_sql)
        print("Table ads_spend_db verified/created")
        
        # Count register before add
        count_before = conn.execute("SELECT COUNT(*) FROM ads_spend_db").fetchone()[0]
        
        # Insert data in append mode to demostrate persistence
        conn.register('df_new', input_data_df)
        conn.execute("INSERT INTO ads_spend_db SELECT * FROM df_new")
        
        # Count register after add
        count_after = conn.execute("SELECT COUNT(*) FROM ads_spend_db").fetchone()[0]
        
        conn.close()
        
        # Result for n8n in JSON format
        result = {
            "status": "success",
            "timestamp": datetime.now().isoformat(),
            "rows_inserted": len(input_data_df),
            "total_rows_before": count_before,
            "total_rows_after": count_after,
            "source_file": "ads_spend.csv",
            "message": f"Successfully ingested {len(input_data_df):,} rows into warehouse"
        }
        
        print(f"Register added: {len(input_data_df):,}")
        print(f"Total regiser in DB now: {count_after:,}")
        print(f"Increment: +{count_after - count_before:,}")
        
        print(json.dumps(result))
        return 0
        
    except Exception as e:
        error = {
            "status": "error",
            "error_message": str(e),
            "error_type": type(e).__name__,
            "timestamp": datetime.now().isoformat()
        }
        print(json.dumps(error))
        return 1

if __name__ == "__main__":
    sys.exit(main())
'''

##### Function to create python script

In [70]:
def create_n8n_executable_script(script_path, script_content):
    
    with open(script_path, 'w') as file:
        file.write(script_content)
    
    # Make the script executable for n8n
    os.chmod(script_path, 0o755)
        
    print(f"Script created: {script_path}")
    print("Can execute by n8n with the node 'Execute Command'")
    
    return script_path

##### Execute function and extract path to probe locally after this execution

In [40]:
# Create script
script_path = "/home/jovyan/challenge/notebooks/n8n_ingest_script.py"
script_path_ingest_data = create_n8n_executable_script(script_path, script_content_ingest_data)

Script created: /home/jovyan/challenge/notebooks/n8n_ingest_script.py
Can execute by n8n with the node 'Execute Command'
Command to n8n: python3 /notebooks/n8n_ingest_script.py


##### Probe python script locally

In [34]:
# Probar el script localmente
print(f"\nProbe script locally")
import subprocess
del conn
result = subprocess.run(['python3', script_path_ingest_data], 
                       capture_output=True, text=True, cwd='/home/jovyan/challenge')

print(f"Return code: {result.returncode}")
print(f"Output: {result.stdout}")
if result.stderr:
    print(f"Errors: {result.stderr}")


Probe script locally
Return code: 0
Output: duckdb already installed
pandas already installed
csv loaded: 2,000 rows, 10 columns
load_date: 2025-08-28 17:17:17.018153
source_file_name: ads_spend.csv
Table ads_spend_db verified/created
Register added: 2,000
Total regiser in DB now: 26,000
Increment: +2,000
{"status": "success", "timestamp": "2025-08-28T17:17:17.300740", "rows_inserted": 2000, "total_rows_before": 24000, "total_rows_after": 26000, "source_file": "ads_spend.csv", "message": "Successfully ingested 2,000 rows into warehouse"}



# Part 2

### Data validation requeried for metrics

##### Verify if we have enough daterange for metrics

In [4]:
#localprobe
db_path = "/home/jovyan/challenge/database/warehouse.db"

# Verify database exists
if not os.path.exists(db_path):
    raise FileNotFoundError(f"Database not found: {db_path}")

# Connect to DuckDB
conn = duckdb.connect(db_path)
print("Connected to warehouse database")

# Check if we have enough data for 60 days analysis
days_available = conn.execute("""
    SELECT COUNT(DISTINCT date) as unique_days 
    FROM ads_spend_db
""").fetchone()[0]

print(f"\nUnique days in dataset: {days_available}")

if days_available < 30:
    print("Warning: Less than 30 days of data available")
elif days_available < 60:
    print("Warning: Less than 60 days of data available for comparison")

Connected to warehouse database

Unique days in dataset: 181


##### Probe metrics queries and final table

In [8]:
kpi_query = """
WITH last_30_days AS (
    SELECT 
        SUM(spend) as total_spend,
        SUM(conversions) as total_conversions,
        SUM(conversions * 100) as total_revenue,
        COUNT(DISTINCT date) as days_count
    FROM ads_spend_db 
    WHERE date >= (SELECT MAX(date) - INTERVAL 29 DAY FROM ads_spend_db)
        AND date <= (SELECT MAX(date) FROM ads_spend_db)
),

prior_30_days AS (
    SELECT 
        SUM(spend) as total_spend,
        SUM(conversions) as total_conversions,
        SUM(conversions * 100) as total_revenue,
        COUNT(DISTINCT date) as days_count
    FROM ads_spend_db 
    WHERE date >= (SELECT MAX(date) - INTERVAL 59 DAY FROM ads_spend_db)
        AND date <= (SELECT MAX(date) - INTERVAL 30 DAY FROM ads_spend_db)
),

metrics_comparison AS (
    SELECT 
        -- Last 30 Days Metrics
        ROUND(l.total_spend / NULLIF(l.total_conversions, 0), 2) as cac_last_30,
        ROUND(l.total_revenue / NULLIF(l.total_spend, 0), 2) as roas_last_30,
        
        -- Prior 30 Days Metrics  
        ROUND(p.total_spend / NULLIF(p.total_conversions, 0), 2) as cac_prior_30,
        ROUND(p.total_revenue / NULLIF(p.total_spend, 0), 2) as roas_prior_30,
        
        -- Raw values for context
        l.total_spend as spend_last_30,
        l.total_conversions as conversions_last_30,
        l.days_count as days_last_30,
        p.total_spend as spend_prior_30,
        p.total_conversions as conversions_prior_30,
        p.days_count as days_prior_30
        
    FROM last_30_days l
    CROSS JOIN prior_30_days p
)

SELECT 
    'CAC (Cost per Acquisition)' as metric,
    '$' || CAST(cac_last_30 AS VARCHAR) as last_30_days,
    '$' || CAST(cac_prior_30 AS VARCHAR) as prior_30_days,
    CASE 
        WHEN cac_prior_30 = 0 OR cac_prior_30 IS NULL THEN 'N/A'
        ELSE CAST(ROUND(((cac_last_30 - cac_prior_30) / cac_prior_30) * 100, 1) AS VARCHAR) || '%'
    END as percent_change
FROM metrics_comparison

UNION ALL

SELECT 
    'ROAS (Return on Ad Spend)' as metric,
    CAST(roas_last_30 AS VARCHAR) as last_30_days,
    CAST(roas_prior_30 AS VARCHAR) as prior_30_days,
    CASE 
        WHEN roas_prior_30 = 0 OR roas_prior_30 IS NULL THEN 'N/A'
        ELSE CAST(ROUND(((roas_last_30 - roas_prior_30) / roas_prior_30) * 100, 1) AS VARCHAR) || '%'
    END as percent_change
FROM metrics_comparison;
"""

kpi_results = conn.execute(kpi_query).fetchdf()
print(kpi_results.to_string(index=False))

                    metric last_30_days prior_30_days percent_change
CAC (Cost per Acquisition)       $29.81        $32.27          -7.6%
 ROAS (Return on Ad Spend)         3.35           3.1           8.1%


##### Function to get metrics

In [None]:
def analyze_kpis():

    # Error handling
    try:

        # Localprobe
        db_path = "/home/jovyan/challenge/database/warehouse.db"
        # Production
        #db_path = "/database/warehouse.db"
        
        # Verify database exists
        if not os.path.exists(db_path):
            raise FileNotFoundError(f"Database not found: {db_path}")
        
        # Connect to DuckDB
        conn = duckdb.connect(db_path)
        print("Connected to warehouse database")
        
        # Check if we have enough data for 60 days analysis
        days_available = conn.execute("""
            SELECT COUNT(DISTINCT date) as unique_days 
            FROM ads_spend_db
        """).fetchone()[0]
        
        print(f"\nUnique days in dataset: {days_available}")
        
        if days_available < 30:
            print("Warning: Less than 30 days of data available")
        elif days_available < 60:
            print("Warning: Less than 60 days of data available for comparison")

        # Final kpi query        
        kpi_query = """
        WITH last_30_days AS (
            SELECT 
                SUM(spend) as total_spend,
                SUM(conversions) as total_conversions,
                SUM(conversions * 100) as total_revenue,
                COUNT(DISTINCT date) as days_count
            FROM ads_spend_db 
            WHERE date >= (SELECT MAX(date) - INTERVAL 29 DAY FROM ads_spend_db)
              AND date <= (SELECT MAX(date) FROM ads_spend_db)
        ),
        
        prior_30_days AS (
            SELECT 
                SUM(spend) as total_spend,
                SUM(conversions) as total_conversions,
                SUM(conversions * 100) as total_revenue,
                COUNT(DISTINCT date) as days_count
            FROM ads_spend_db 
            WHERE date >= (SELECT MAX(date) - INTERVAL 59 DAY FROM ads_spend_db)
              AND date <= (SELECT MAX(date) - INTERVAL 30 DAY FROM ads_spend_db)
        ),
        
        metrics_comparison AS (
            SELECT 
                -- Last 30 Days Metrics
                ROUND(l.total_spend / NULLIF(l.total_conversions, 0), 2) as cac_last_30,
                ROUND(l.total_revenue / NULLIF(l.total_spend, 0), 2) as roas_last_30,
                
                -- Prior 30 Days Metrics  
                ROUND(p.total_spend / NULLIF(p.total_conversions, 0), 2) as cac_prior_30,
                ROUND(p.total_revenue / NULLIF(p.total_spend, 0), 2) as roas_prior_30,
                
                -- Raw values for context
                l.total_spend as spend_last_30,
                l.total_conversions as conversions_last_30,
                l.days_count as days_last_30,
                p.total_spend as spend_prior_30,
                p.total_conversions as conversions_prior_30,
                p.days_count as days_prior_30
                
            FROM last_30_days l
            CROSS JOIN prior_30_days p
        )
        
        SELECT 
            'CAC (Cost per Acquisition)' as metric,
            '$' || CAST(cac_last_30 AS VARCHAR) as last_30_days,
            '$' || CAST(cac_prior_30 AS VARCHAR) as prior_30_days,
            CASE 
                WHEN cac_prior_30 = 0 OR cac_prior_30 IS NULL THEN 'N/A'
                ELSE CAST(ROUND(((cac_last_30 - cac_prior_30) / cac_prior_30) * 100, 1) AS VARCHAR) || '%'
            END as percent_change
        FROM metrics_comparison
        
        UNION ALL
        
        SELECT 
            'ROAS (Return on Ad Spend)' as metric,
            CAST(roas_last_30 AS VARCHAR) as last_30_days,
            CAST(roas_prior_30 AS VARCHAR) as prior_30_days,
            CASE 
                WHEN roas_prior_30 = 0 OR roas_prior_30 IS NULL THEN 'N/A'
                ELSE CAST(ROUND(((roas_last_30 - roas_prior_30) / roas_prior_30) * 100, 1) AS VARCHAR) || '%'
            END as percent_change
        FROM metrics_comparison;
        """
        
        # Get kpi table
        kpi_results = conn.execute(kpi_query).fetchdf()
        print(kpi_results.to_string(index=False))

        # Get some database info
        data_overview = conn.execute("""
            SELECT 
                MIN(date) as earliest_date,
                MAX(date) as latest_date,
                COUNT(*) as total_records,
                COUNT(DISTINCT platform) as platforms,
                SUM(spend) as total_spend,
                SUM(conversions) as total_conversions
            FROM ads_spend_db
        """).fetchdf()

        # Results in dictionary format
        results_summary = {
            "analysis_date": datetime.now().isoformat(),
            "data_range": {
                "earliest_date": str(data_overview['earliest_date'].iloc[0]),
                "latest_date": str(data_overview['latest_date'].iloc[0]),
                "total_records": int(data_overview['total_records'].iloc[0])
            },
            "kpi_comparison": kpi_results.to_dict('records')
        }
        print(json.dumps(results_summary))
                
        conn.close()
        return
        
    except Exception as e:
        error = {
            "status": "error",
            "error_message": str(e),
            "error_type": type(e).__name__,
            "timestamp": datetime.now().isoformat()
        }
        print(json.dumps(error))
        return

##### Execute metric function

In [35]:
analyze_kpis()

Connected to warehouse database

Unique days in dataset: 181
                    metric last_30_days prior_30_days percent_change
CAC (Cost per Acquisition)       $29.81        $32.27          -7.6%
 ROAS (Return on Ad Spend)         3.35           3.1           8.1%
{"analysis_date": "2025-08-28T17:17:40.210086", "data_range": {"earliest_date": "2025-01-01 00:00:00", "latest_date": "2025-06-30 00:00:00", "total_records": 26000}, "kpi_comparison": [{"metric": "CAC (Cost per Acquisition)", "last_30_days": "$29.81", "prior_30_days": "$32.27", "percent_change": "-7.6%"}, {"metric": "ROAS (Return on Ad Spend)", "last_30_days": "3.35", "prior_30_days": "3.1", "percent_change": "8.1%"}]}


##### Create script for n8n in python

In [47]:
# Python script content
script_content_metrics = '''#!/usr/bin/env python3

import importlib
import subprocess
import sys

# We list special packages that don't exist in jupyter installation
special_required_packages = {
    "duckdb": "duckdb",
    "pandas": "pandas"
}

# Verify special packages
for module_name, pip_name in special_required_packages.items():
    try:
        importlib.import_module(module_name)
        #print(f"{module_name} already installed")
    except ImportError:
        #print(f"Installing {pip_name}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name])

#import sys
import pandas as pd
import duckdb
from datetime import datetime
import json
import os

def main():
        # Error handling
    try:

        # Localprobe
        #db_path = "/home/jovyan/challenge/database/warehouse.db"
        # Production
        db_path = "/database/warehouse.db"
        
        # Verify database exists
        if not os.path.exists(db_path):
            raise FileNotFoundError(f"Database not found: {db_path}")
        
        # Connect to DuckDB
        conn = duckdb.connect(db_path)
        #print("Connected to warehouse database")
        
        # Check if we have enough data for 60 days analysis
        days_available = conn.execute("""
            SELECT COUNT(DISTINCT date) as unique_days 
            FROM ads_spend_db
        """).fetchone()[0]
        
        print(f"Unique days in dataset: {days_available}")
        
        if days_available < 30:
            print("Warning: Less than 30 days of data available")
        elif days_available < 60:
            print("Warning: Less than 60 days of data available for comparison")

        # Final kpi query        
        kpi_query = """
        WITH last_30_days AS (
            SELECT 
                SUM(spend) as total_spend,
                SUM(conversions) as total_conversions,
                SUM(conversions * 100) as total_revenue,
                COUNT(DISTINCT date) as days_count
            FROM ads_spend_db 
            WHERE date >= (SELECT MAX(date) - INTERVAL 29 DAY FROM ads_spend_db)
              AND date <= (SELECT MAX(date) FROM ads_spend_db)
        ),
        
        prior_30_days AS (
            SELECT 
                SUM(spend) as total_spend,
                SUM(conversions) as total_conversions,
                SUM(conversions * 100) as total_revenue,
                COUNT(DISTINCT date) as days_count
            FROM ads_spend_db 
            WHERE date >= (SELECT MAX(date) - INTERVAL 59 DAY FROM ads_spend_db)
              AND date <= (SELECT MAX(date) - INTERVAL 30 DAY FROM ads_spend_db)
        ),
        
        metrics_comparison AS (
            SELECT 
                -- Last 30 Days Metrics
                ROUND(l.total_spend / NULLIF(l.total_conversions, 0), 2) as cac_last_30,
                ROUND(l.total_revenue / NULLIF(l.total_spend, 0), 2) as roas_last_30,
                
                -- Prior 30 Days Metrics  
                ROUND(p.total_spend / NULLIF(p.total_conversions, 0), 2) as cac_prior_30,
                ROUND(p.total_revenue / NULLIF(p.total_spend, 0), 2) as roas_prior_30,
                
                -- Raw values for context
                l.total_spend as spend_last_30,
                l.total_conversions as conversions_last_30,
                l.days_count as days_last_30,
                p.total_spend as spend_prior_30,
                p.total_conversions as conversions_prior_30,
                p.days_count as days_prior_30
                
            FROM last_30_days l
            CROSS JOIN prior_30_days p
        )
        
        SELECT 
            'CAC (Cost per Acquisition)' as metric,
            '$' || CAST(cac_last_30 AS VARCHAR) as last_30_days,
            '$' || CAST(cac_prior_30 AS VARCHAR) as prior_30_days,
            CASE 
                WHEN cac_prior_30 = 0 OR cac_prior_30 IS NULL THEN 'N/A'
                ELSE CAST(ROUND(((cac_last_30 - cac_prior_30) / cac_prior_30) * 100, 1) AS VARCHAR) || '%'
            END as percent_change
        FROM metrics_comparison
        
        UNION ALL
        
        SELECT 
            'ROAS (Return on Ad Spend)' as metric,
            CAST(roas_last_30 AS VARCHAR) as last_30_days,
            CAST(roas_prior_30 AS VARCHAR) as prior_30_days,
            CASE 
                WHEN roas_prior_30 = 0 OR roas_prior_30 IS NULL THEN 'N/A'
                ELSE CAST(ROUND(((roas_last_30 - roas_prior_30) / roas_prior_30) * 100, 1) AS VARCHAR) || '%'
            END as percent_change
        FROM metrics_comparison;
        """
        
        # Get kpi table
        kpi_results = conn.execute(kpi_query).fetchdf()
        #print(kpi_results.to_string(index=False))

        # Get some database info
        data_overview = conn.execute("""
            SELECT 
                MIN(date) as earliest_date,
                MAX(date) as latest_date,
                COUNT(*) as total_records,
                COUNT(DISTINCT platform) as platforms,
                SUM(spend) as total_spend,
                SUM(conversions) as total_conversions
            FROM ads_spend_db
        """).fetchdf()

        # Results in dictionary format
        results_summary = {
            "analysis_date": datetime.now().isoformat(),
            "data_range": {
                "earliest_date": str(data_overview['earliest_date'].iloc[0]),
                "latest_date": str(data_overview['latest_date'].iloc[0]),
                "total_records": int(data_overview['total_records'].iloc[0])
            },
            "kpi_comparison": kpi_results.to_dict('records')
        }
        print(json.dumps(results_summary))
                
        conn.close()
        return 0
        
    except Exception as e:
        error = {
            "status": "error",
            "error_message": str(e),
            "error_type": type(e).__name__,
            "timestamp": datetime.now().isoformat()
        }
        print(json.dumps(error))
        return 1

if __name__ == "__main__":
    sys.exit(main())
'''

##### Execute function and extract path to probe locally after this execution

In [48]:
# Create script
script_path = "/home/jovyan/challenge/notebooks/n8n_metrics_script.py"
script_path_metrics = create_n8n_executable_script(script_path, script_content_metrics)

Script created: /home/jovyan/challenge/notebooks/n8n_metrics_script.py
Can execute by n8n with the node 'Execute Command'
Command to n8n: python3 /notebooks/n8n_ingest_script.py


##### Probe python script locally

In [46]:
# Probar el script localmente
print(f"\nProbe script locally")
import subprocess
result = subprocess.run(['python3', script_path_metrics], 
                       capture_output=True, text=True, cwd='/home/jovyan/challenge')

print(f"Return code: {result.returncode}")
print(f"Output: {result.stdout}")
if result.stderr:
    print(f"Errors: {result.stderr}")


Probe script locally
Return code: 0
Output: duckdb already installed
pandas already installed
Connected to warehouse database
Unique days in dataset: 181
                    metric last_30_days prior_30_days percent_change
CAC (Cost per Acquisition)       $29.81        $32.27          -7.6%
 ROAS (Return on Ad Spend)         3.35           3.1           8.1%
{"analysis_date": "2025-08-28T17:25:48.337979", "data_range": {"earliest_date": "2025-01-01 00:00:00", "latest_date": "2025-06-30 00:00:00", "total_records": 26000}, "kpi_comparison": [{"metric": "CAC (Cost per Acquisition)", "last_30_days": "$29.81", "prior_30_days": "$32.27", "percent_change": "-7.6%"}, {"metric": "ROAS (Return on Ad Spend)", "last_30_days": "3.35", "prior_30_days": "3.1", "percent_change": "8.1%"}]}



# Part 3

### Get Data with API

In [116]:
url = "http://n8n:5678/webhook-test/api?endpoint=metrics"

try:
    # Call API
    response = requests.get(url)

    print("Status code:", response.status_code)
    print("Data:\n\n", response.json())

except requests.exceptions.RequestException as error:
    print("Error:", error)

Status code: 200
Data:

 {'analysis_date': '2025-08-29T19:50:54.614379', 'data_range': {'earliest_date': '2025-01-01 00:00:00', 'latest_date': '2025-06-30 00:00:00', 'total_records': 26000}, 'kpi_comparison': [{'metric': 'CAC (Cost per Acquisition)', 'last_30_days': '$29.81', 'prior_30_days': '$32.27', 'percent_change': '-7.6%'}, {'metric': 'ROAS (Return on Ad Spend)', 'last_30_days': '3.35', 'prior_30_days': '3.1', 'percent_change': '8.1%'}]}


# Part 4

### Experiment with api

##### Verify response

In [20]:
url = "http://n8n:5678/webhook-test/api"
params = {
    "question": "hola que tal",
    "agent": "yes"
}

try:
    # Call API
    response = requests.get(url, params=params)

    print("Status code:", response.status_code)
    print("Data:\n\n", response.json())

except requests.exceptions.RequestException as error:
    print("Error:", error)

Status code: 200
Error: Expecting value: line 1 column 1 (char 0)


##### Integer NL simple example

In [None]:
def format_response(data):
    
    # Transform to human lenguage
    if "error" in data:
        return f"Error getting data: {data['error']}"
    
    answer = "Marketing Performance Analysis\n\n"
    
    # Summary
    if "kpi_comparison" in data:
        s_cac = data.get("kpi_comparison", [{}])[0]
        s_roas = data.get("kpi_comparison", [{}])[0]
        answer += "**Key Metrics:**\n"
        answer += f"• last_30_days CAC: ${s_cac.get('last_30_days', 'N/A')}\n"
        answer += f"• last_30_days ROAS: {s_roas.get('last_30_days', 'N/A')}x\n"
        answer += f"• prior_30_days CAC: ${s_cac.get('prior_30_days', 'N/A')}\n"
        answer += f"• prior_30_days ROAS: {s_roas.get('prior_30_days', 'N/A')}x\n"
        
        # Percent Change
        cac_change = s_cac.get('percent_change', 'N/A')
        roas_change = s_roas.get('percent_change', 'N/A')

        # Try to parse to float if it's a string with %
        def parse_change(val):
            if val is None:
                return None
            if isinstance(val, (int, float)):
                return val
            try:
                return float(str(val).replace("%", "").strip())
            except ValueError:
                return None

        cac_change_val = parse_change(cac_change)
        roas_change_val = parse_change(roas_change)

        
        if cac_change_val is not None:
            trend = "📉" if cac_change_val < 0 else "📈"
            answer += f"• CAC Change: {trend} {cac_change}%\n"
            
        if roas_change_val is not None:
            trend = "📈" if roas_change_val > 0 else "📉"  
            answer += f"• ROAS Change: {trend} {roas_change}%\n"
    
    answer += "\n **Lower CAC + Higher ROAS = Better Performance!**"
    
    return answer

def main():
    
    # Default question if the petition doesn't have argument
    question = sys.argv[1] if len(sys.argv) > 1 else "percent change"
    params = {
        "question": "hola que tal",
        "agent": "yes"
    }
    
    try:
        # Call Data
        response = requests.get("http://n8n:5678/webhook-test/api", params=params)
        api_data = response.json()
        
        # Format human response
        answer = format_response(api_data)
        
        # Return JSON for webhook
        result = {
            "success": True,
            "question": question,
            "answer": answer,
            "timestamp": datetime.now().isoformat()
        }
        
        print(json.dumps(result))
        
    except Exception as e:
        # Error response
        error_result = {
            "success": False,
            "question": question,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
        
        print(json.dumps(error_result))

In [68]:
main()

{"success": true, "question": "-f", "answer": "Marketing Performance Analysis\n\n**Key Metrics:**\n\u2022 last_30_days CAC: $$29.81\n\u2022 last_30_days ROAS: $29.81x\n\u2022 prior_30_days CAC: $$32.27\n\u2022 prior_30_days ROAS: $32.27x\n\u2022 CAC Change: \ud83d\udcc9 -7.6%%\n\u2022 ROAS Change: \ud83d\udcc9 -7.6%%\n\n **Lower CAC + Higher ROAS = Better Performance!**", "timestamp": "2025-08-29T18:56:28.955134"}


##### Create script for n8n in python

In [None]:
# Python script content
script_content_agent = '''#!/usr/bin/env python3

import json
import sys
from datetime import datetime

def main():
    
    # Default question if the petition doesn't have argument
    question = sys.argv[2] if len(sys.argv) > 1 else "percent change"
    response = sys.argv[1]
        
    try:
        # Call Data
        #response = requests.get("http://n8n:5678/webhook-test/api")
        data = json.loads(response)
        
        # Transform to human lenguage
        if "error" in data:
            return f"Error getting data: {data['error']}"
        
        answer = "Marketing Performance Analysis"
        
        # Summary
        if "kpi_comparison" in data:
            s_cac = data.get("kpi_comparison", [{}])[0]
            s_roas = data.get("kpi_comparison", [{}])[1]
            answer += "**Key Metrics:**"
            answer += f"• last_30_days CAC: ${s_cac.get('last_30_days', 'N/A')}"
            answer += f"• last_30_days ROAS: {s_roas.get('last_30_days', 'N/A')}x"
            answer += f"• prior_30_days CAC: ${s_cac.get('prior_30_days', 'N/A')}"
            answer += f"• prior_30_days ROAS: {s_roas.get('prior_30_days', 'N/A')}x"
            
            # Percent Change
            cac_change = s_cac.get('percent_change', 'N/A')
            roas_change = s_roas.get('percent_change', 'N/A')

            # Try to parse to float if it's a string with %
            def parse_change(val):
                if val is None:
                    return None
                if isinstance(val, (int, float)):
                    return val
                try:
                    return float(str(val).replace("%", "").strip())
                except ValueError:
                    return None

            cac_change_val = parse_change(cac_change)
            roas_change_val = parse_change(roas_change)

            
            if cac_change_val is not None:
                trend = "📉" if cac_change_val < 0 else "📈"
                answer += f"• CAC Change: {trend} {cac_change}%"
                
            if roas_change_val is not None:
                trend = "📈" if roas_change_val > 0 else "📉"  
                answer += f"• ROAS Change: {trend} {roas_change}%"
        
        answer += "**Lower CAC + Higher ROAS = Better Performance!**"
        
        # Return JSON for webhook
        result = {
            "success": True,
            "question": question,
            "answer": answer,
            "timestamp": datetime.now().isoformat()
        }
        
        print(json.dumps(result))
        return 0
        
    except Exception as e:
        # Error response
        error_result = {
            "success": False,
            "question": question,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
        
        print(json.dumps(error_result))
        return 1

if __name__ == "__main__":
    sys.exit(main())
'''

##### Execute function and extract

In [104]:
# Create script
script_path = "/home/jovyan/challenge/notebooks/n8n_agent_script.py"
script_path_ingest_data = create_n8n_executable_script(script_path, script_content_agent)

Script created: /home/jovyan/challenge/notebooks/n8n_agent_script.py
Can execute by n8n with the node 'Execute Command'


##### Final test

In [115]:
url = "http://n8n:5678/webhook-test/api"
params = {
    "question": "percentage change",
    "agent": "yes"
}

try:
    # Call API
    response = requests.get(url, params=params)
    response = response.json()

    print("Data:\n\n", response["answer"])

except requests.exceptions.RequestException as error:
    print("Error:", error)

Data:

 Marketing Performance Analysis**Key Metrics:**• last_30_days CAC: $$29.81• last_30_days ROAS: 3.35x• prior_30_days CAC: $$32.27• prior_30_days ROAS: 3.1x• CAC Change: 📉 -7.6%%• ROAS Change: 📈 8.1%%**Lower CAC + Higher ROAS = Better Performance!**
