In [None]:
import sqlite3
import pandas as pd

db_path = "data/challenge.db"
# sqlite3.connect() silently creates a new, empty database when the file does not exist,
# so a wrong path would only surface later as "no such table" errors. Opening through a
# URI with mode=rw (read-write, no create) makes a wrong path fail right here instead.
conn = sqlite3.connect(f"file:{db_path}?mode=rw", uri=True)

### Data Exploration

In [None]:
# Overall date coverage of the session data (earliest and latest event_date).
start_end_query = """
SELECT MIN(event_date) AS start_date, MAX(event_date) AS end_date
FROM session_sources
"""
start_end_dates = pd.read_sql(sql=start_end_query, con=conn)
print(f"Data starts from: {start_end_dates.at[0, 'start_date']}")
print(f"Data ends on: {start_end_dates.at[0, 'end_date']}")

In [None]:
# List every table stored in the SQLite file.
query = "SELECT name FROM sqlite_master WHERE type='table';"
tables = pd.read_sql_query(sql=query, con=conn)
tables

In [None]:
# The plain table list is shown by the previous cell; instead of repeating it verbatim,
# extend it with the number of rows in each table to gauge data volume.
query = "SELECT name FROM sqlite_master WHERE type='table';"
tables = pd.read_sql_query(query, conn)
# Names come from sqlite_master itself; they are still quoted as identifiers (with any
# embedded double quote doubled) so unusual table names cannot break the statement.
tables["row_count"] = [
    pd.read_sql_query(
        'SELECT COUNT(*) AS n FROM "{}";'.format(name.replace('"', '""')), conn
    )["n"].iat[0]
    for name in tables["name"]
]
tables

In [None]:
# Column layout of attribution_customer_journey (SQLite PRAGMA table_info).
pd.read_sql_query(sql="PRAGMA table_info(attribution_customer_journey);", con=conn)

In [None]:
# session_sources table: first five rows for a quick look at its columns
pd.read_sql_query(sql="SELECT * FROM session_sources LIMIT 5;", con=conn)

In [None]:
# conversions table: first five rows for a quick look at its columns
pd.read_sql_query(sql="SELECT * FROM conversions LIMIT 5;", con=conn)

In [None]:
# session_costs table: first five rows for a quick look at its columns
pd.read_sql_query(sql="SELECT * FROM session_costs LIMIT 5;", con=conn)

### Time-Range Input
#### Analyze marketing performance for a specific time period

In [None]:
def get_valid_table_names(conn):
    """Return the names of all tables in the SQLite database behind ``conn``.

    Used to check user-supplied table names before they are referenced in a query.
    """
    table_frame = pd.read_sql_query(
        "SELECT name FROM sqlite_master WHERE type='table';", conn
    )
    return list(table_frame['name'])

def get_date_input(prompt):
    """Prompt repeatedly until the user enters a valid ``YYYY-MM-DD`` date.

    Parameters
    ----------
    prompt : str
        Text shown by ``input()``.

    Returns
    -------
    pandas.Timestamp
        The parsed date; never ``NaT``.
    """
    while True:
        # Tolerate stray surrounding whitespace from copy/paste.
        date_input = input(prompt).strip()
        try:
            date = pd.to_datetime(date_input, format='%Y-%m-%d')
        except ValueError:
            date = pd.NaT
        # pd.to_datetime maps "" and NaN-like strings ("nan", "NaT") to NaT instead of
        # raising; a NaT would only blow up later in strftime, so reject it here.
        if pd.isna(date):
            print("Invalid date format. Please enter the date in 'YYYY-MM-DD' format.")
            continue
        return date

start_date = get_date_input("Enter start date (YYYY-MM-DD): ")
end_date = get_date_input("Enter end date (YYYY-MM-DD): ")

# A reversed range makes BETWEEN match nothing, which would look like "no data";
# swap the bounds instead of silently returning an empty result.
if end_date < start_date:
    print("End date is before start date; swapping the two dates.")
    start_date, end_date = end_date, start_date

# Converting to string format for SQL query.
# NOTE(review): BETWEEN on these strings is only chronological if event_date is stored
# as zero-padded 'YYYY-MM-DD' text — TODO confirm against the schema.
start_date_str = start_date.strftime('%Y-%m-%d')
end_date_str = end_date.strftime('%Y-%m-%d')

# Tables to query from
valid_tables = get_valid_table_names(conn)
print(f"Available tables: {', '.join(valid_tables)}")

tables_input = input("Enter the tables to query from (comma separated): ")
# Skip empty entries so "conversions," or a blank answer does not yield a '' table name.
tables_to_query = [table.strip() for table in tables_input.split(',') if table.strip()]

# session_sources is always the base table; these are the only tables with a join rule.
join_clauses = {
    'conversions': " LEFT JOIN conversions ON session_sources.user_id = conversions.user_id",
    'session_costs': " LEFT JOIN session_costs ON session_sources.session_id = session_costs.session_id",
}

# Validate table names
invalid_tables = [table for table in tables_to_query if table not in valid_tables]
if invalid_tables:
    print(f"Invalid table(s): {', '.join(invalid_tables)}. Please select valid table names.")
else:
    # Tables that exist but have no join rule would otherwise be dropped silently; say so.
    ignored_tables = [
        table for table in tables_to_query
        if table != 'session_sources' and table not in join_clauses
    ]
    if ignored_tables:
        print(f"No join rule for table(s): {', '.join(ignored_tables)}; they will be ignored.")

    # Base query
    # NOTE(review): SELECT * across the joins repeats the user_id / session_id columns,
    # and the user_id join repeats a session once per matching conversion of that user.
    query = """
        SELECT *
        FROM session_sources
    """

    # JOINs if different tables are selected (fixed order, independent of input order)
    for table, clause in join_clauses.items():
        if table in tables_to_query:
            query += clause

    # The date filter applies to session_sources.event_date only; joined conversion and
    # cost rows are kept whenever their session falls inside the range.
    query += " WHERE session_sources.event_date BETWEEN ? AND ?"

    # Execute the query with parameters
    try:
        df = pd.read_sql_query(query, conn, params=(start_date_str, end_date_str))
        print("Data fetched successfully:")
        print(df.head())
    except Exception as e:
        print(f"An error occurred: {e}")

### Data Cleaning

In [None]:
session_costs = pd.read_sql_query("SELECT * FROM session_costs;", conn)

# How many sessions have no recorded cost (NULL in the database)?
total_count = session_costs.shape[0]
missing_count = session_costs['cost'].isnull().sum()

print(f"Missing values in 'cost': {missing_count}")
print(f"Total rows in 'session_costs': {total_count}")

In [None]:

# Load the two raw tables and count NULLs per column in each.
session_sources = pd.read_sql_query("SELECT * FROM session_sources;", conn)
conversions = pd.read_sql_query("SELECT * FROM conversions;", conn)

missing_sources = session_sources.isnull().sum()
missing_conversions = conversions.isnull().sum()

print("Missing values in session_sources:", missing_sources, sep="\n")
print("\nMissing values in conversions:", missing_conversions, sep="\n")

In [None]:
# Count fully duplicated rows (all columns identical) in each raw table.
duplicates_sources, duplicates_conversions, duplicates_costs = (
    frame.duplicated().sum()
    for frame in (session_sources, conversions, session_costs)
)

print(f"Duplicate rows in session_sources: {duplicates_sources}")
print(f"Duplicate rows in conversions: {duplicates_conversions}")
print(f"Duplicate rows in session_costs: {duplicates_costs}")

#### Missing `cost` values are replaced with 0.0 so these sessions stay in downstream calculations. The assumption is that such sessions come from unpaid channels — for example, a user arriving via organic search or direct traffic, which typically doesn't incur a cost.

In [None]:
# Treat a missing cost as zero spend.
session_costs = session_costs.assign(cost=session_costs['cost'].fillna(0.0))

# Sanity check: number of NaN costs left after the fill.
print(session_costs['cost'].isnull().sum())

# Data Transformation

In [None]:
# Reload session_sources and conversions straight from the database for the
# transformation stage.
session_sources, conversions = (
    pd.read_sql_query("SELECT * FROM session_sources;", conn),
    pd.read_sql_query("SELECT * FROM conversions;", conn),
)

In [None]:
# Quick look at the first rows of both reloaded tables.
print("Session Sources:", session_sources.head(), sep="\n")
print("\nConversions:", conversions.head(), sep="\n")

In [None]:
# Merging session sources with conversions on user_id: one row per (session, conversion)
# pair of the same user.
merged = session_sources.merge(conversions, on='user_id', suffixes=('_session', '_conversion'))

# Keep sessions strictly before their conversion: an earlier day, or the same day at an
# earlier time. NOTE(review): values are compared as stored; if they are TEXT this is only
# chronological for zero-padded 'YYYY-MM-DD' / 'HH:MM:SS' values — TODO confirm.
earlier_day = merged['event_date'] < merged['conv_date']
same_day_earlier_time = (
    (merged['event_date'] == merged['conv_date'])
    & (merged['event_time'] < merged['conv_time'])
)
filtered_sessions = merged.loc[earlier_day | same_day_earlier_time].sort_values(
    by=['conv_id', 'event_date', 'event_time']
)

In [None]:
# Display the sessions that precede a conversion (one row per session/conversion pair).
# NOTE(review): this renders the whole frame; consider .head() when the data is large.
filtered_sessions

In [None]:
# One entry per conversion: conv_id -> list of that conversion's prior sessions as plain
# dicts. Iterating a groupby keeps each group's rows in their existing (sorted) order.
# A dict comprehension is used instead of GroupBy.apply, whose inclusion of the grouping
# column is deprecated and would drop conv_id from the records in future pandas versions;
# the API transform step reads session["conv_id"].
grouped_journeys = {
    conv_id: journey.to_dict(orient="records")
    for conv_id, journey in filtered_sessions.groupby("conv_id")
}

# Preview the first journey (an empty list if no session precedes any conversion).
next(iter(grouped_journeys.values()), [])


In [None]:
# Number of conversions that have at least one session before them.
len(grouped_journeys)

In [None]:
# Channel frequency across all pre-conversion sessions. Printing one line per session
# floods the notebook; a count per channel carries the same information compactly and
# makes it easy to check the labels against the redistribution channel lists.
pd.Series(
    [session['channel_name'] for sessions in grouped_journeys.values() for session in sessions],
    name='channel_name',
    dtype=object,
).value_counts()

### Transform Customer Journey Data for API Input

In [None]:
# Reshape each journey into the session records expected by the attribution API.
grouped_journeys_transformed = {
    journey_id: [
        {
            "conversion_id": journey_id,
            "session_id": touchpoint["session_id"],
            # Event date and time joined with a space into a single timestamp string.
            "timestamp": f"{touchpoint['event_date']} {touchpoint['event_time']}",
            "channel_label": touchpoint["channel_name"],
            "holder_engagement": touchpoint["holder_engagement"],
            "closer_engagement": touchpoint["closer_engagement"],
            # NOTE(review): journeys are grouped by conv_id, so this comparison is always
            # true and every session gets 1. If the API expects the flag only on the
            # converting session, this needs revisiting — TODO confirm against the API spec.
            "conversion": int(touchpoint["conv_id"] == journey_id),
            "impression_interaction": touchpoint["impression_interaction"],
        }
        for touchpoint in journey
    ]
    for journey_id, journey in grouped_journeys.items()
}

# Redistribution rules sent with the journeys, one entry per role
# (initializer / holder / closer). Each entry gives the direction credit may move in,
# the receive threshold, and the channel labels eligible to receive redistributed credit.
redistribution_parameter = dict(
    initializer=dict(
        direction='earlier_sessions_only',
        receive_threshold=0,
        redistribution_channel_labels=[
            'Direct Traffic',
            'Newsletter & Email',
            'FB & IG Ads',
            'TikTok Ads',
            'Paid Search Brand',
            'Organic Traffic',
            'Referral',
            'Affiliate & Partnerships',
            'Performance Max',
            'Paid Search Non Brand',
            'Microsoft Ads',
            'Social Organic',
        ],
    ),
    holder=dict(
        direction='any_session',
        receive_threshold=0,
        redistribution_channel_labels=[
            'Direct Traffic',
            'Newsletter & Email',
            'Organic Traffic',
            'FB & IG Ads',
            'TikTok Ads',
            'Referral',
            'Paid Search Non Brand',
            'Social Organic',
            'Affiliate & Partnerships',
        ],
    ),
    closer=dict(
        direction='later_sessions_only',
        receive_threshold=0.1,
        redistribution_channel_labels=[
            'Paid Search Brand',
            'Organic Traffic',
            'Referral',
            'Performance Max',
            'Social Organic',
        ],
    ),
)

api_data = {
    'customer_journeys': grouped_journeys_transformed,
    'redistribution_parameter': redistribution_parameter
}

import json
# json.dump escapes non-ASCII by default, so the file content is ASCII; the explicit
# encoding just removes any dependence on the platform's default.
with open('customer_journeys.json', 'w', encoding='utf-8') as f:
    json.dump(api_data, f, indent=4)

# Printing the entire payload floods the notebook; summarise what was written instead.
n_sessions = sum(len(sessions) for sessions in grouped_journeys_transformed.values())
print("Data Ready for API:")
print(
    f"{len(grouped_journeys_transformed)} customer journeys, {n_sessions} sessions; "
    f"redistribution roles: {', '.join(redistribution_parameter)}"
)

In [None]:
import json

# Read back the payload from the same relative path the export cell writes to. A
# hard-coded absolute path (e.g. under a personal home directory) only works on one
# machine and can silently read a stale file.
file_path = 'customer_journeys.json'

with open(file_path, 'r', encoding='utf-8') as f:
    api_data = json.load(f)

print(api_data.keys())

In [None]:
import json
import csv

json_file_path = 'customer_journeys.json'
csv_file_path = 'customer_journeys.csv'

# Column order of the output CSV; also the set of keys copied from each session record.
csv_headers = [
    "conversion_id", "session_id", "timestamp", "channel_label",
    "holder_engagement", "closer_engagement", "conversion", "impression_interaction"
]

with open(json_file_path, 'r', encoding='utf-8') as file:
    api_data = json.load(file)

customer_journeys = api_data['customer_journeys']

# Flatten {conv_id: [session, ...]} into one row per session. Building each row from
# csv_headers keeps the selected fields and the header in a single place.
flattened_data = [
    {key: session[key] for key in csv_headers}
    for sessions in customer_journeys.values()
    for session in sessions
]

# Explicit UTF-8: json.load turns \u escapes back into real characters, which the
# platform default encoding (e.g. cp1252 on Windows) may not be able to write.
with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
    writer.writeheader()
    writer.writerows(flattened_data)

print(f"CSV file created at: {csv_file_path}")