In [None]:
import json
import os
import sys
import pandas as pd
import requests
import threading
import time
from dotenv import load_dotenv

# ------------
# INSTRUCTIONS
#
# Set the FROM_DATE and TO_DATE variables to define the date range of data you want to load.
# Then, run this cell. The first time you run it, it will fetch all the Mixpanel events
# in that date range, de-duplicate them, and save the result to a JSON file in /app/analytics/.
# The next time you run this cell, it will load from that JSON file instead of calling the API
# (unless you change the dates or set FORCE_RELOAD_FROM_API = True).
#
# ------------
# Helpful Dates
#
# Completed pilots:
#   LA LWC May run:           2025-05-18 - 2025-06-30
#   AZ Constrained MAC Pilot: 2025-06-13 - 2025-08-13 7:29pm MST
# Incomplete pilots:
#   LA LWC August Run:        2025-08-17            - ??? Final day to query before publish date of report
#   AZ Expanded MAC Pilot:    2025-08-13 7:30pm MST - ??? Final day to query before publish date of report
#
# -------------
# Start and end dates of the export window. They are passed to the Mixpanel
# API as from_date / to_date and are embedded in the local cache file name.
FROM_DATE = '2025-05-18'
TO_DATE = '2025-06-30'
# When non-empty, this cell also reports how many loaded events have
# properties['client_agency_id'] equal to this value.
CLIENT_AGENCY = 'la_ldh'
# Set to True to ignore an existing local cache file and re-fetch from the API.
FORCE_RELOAD_FROM_API = False

def fetch_mixpanel_data_from_api(params):
    """Download raw event data from the Mixpanel export endpoint.

    Service-account credentials and the project id are read from environment
    variables, after loading ``.env.local`` from the parent directory of the
    current working directory.

    Args:
        params: Mapping of query parameters for the export request (the
            caller in this notebook passes ``from_date`` and ``to_date``).
            The mapping itself is left untouched; ``project_id`` is added to
            a copy.

    Returns:
        The response body as text, or ``None`` if the HTTP request failed
        (the error is printed to stderr).

    Raises:
        SystemExit: If one of the required environment variables is missing.
    """
    API_ENDPOINT = 'https://data.mixpanel.com/api/2.0/export'
    # Only the connection phase is bounded so an unreachable host cannot hang
    # the notebook forever. The read phase stays unbounded, as before, because
    # the export response may take a long time to arrive.
    CONNECT_TIMEOUT_SECONDS = 30

    try:
        project_root = os.path.dirname(os.getcwd())
        dotenv_path = os.path.join(project_root, '.env.local')
        load_dotenv(dotenv_path=dotenv_path)

        SA_USERNAME = os.environ["MIXPANEL_SERVICE_ACCOUNT_USERNAME"].strip()
        SA_SECRET = os.environ["MIXPANEL_SERVICE_ACCOUNT_SECRET"].strip()
        PROJECT_ID = os.environ["MIXPANEL_PROJECT_ID"].strip()
        AUTH_CREDS = (SA_USERNAME, SA_SECRET)
    except KeyError as e:
        print(f"Error: Environment variable {e} not found. Please check your .env.local file.", file=sys.stderr)
        sys.exit(1)

    # Work on a copy so the caller's dict is not modified as a side effect.
    query_params = dict(params)
    query_params['project_id'] = PROJECT_ID

    print("Fetching data from Mixpanel API...")
    try:
        response = requests.get(
            API_ENDPOINT,
            headers={"accept": "text/plain"},
            params=query_params,
            auth=AUTH_CREDS,
            timeout=(CONNECT_TIMEOUT_SECONDS, None),
        )
        response.raise_for_status()
        print("Successfully fetched data from API.")
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Error: API request failed. {e}", file=sys.stderr)
        return None

def parse_raw_data_to_df(raw_text):
    """Build a pandas DataFrame from newline-delimited JSON text.

    Empty lines are ignored. A line that is not valid JSON is reported on
    stderr and skipped. If no line could be parsed, an empty DataFrame is
    returned.
    """
    records = []
    for raw_line in raw_text.strip().split('\n'):
        if raw_line:
            try:
                parsed = json.loads(raw_line)
            except json.JSONDecodeError:
                print(f"Warning: Could not decode line: {raw_line}", file=sys.stderr)
            else:
                records.append(parsed)

    return pd.DataFrame(records) if records else pd.DataFrame()

def deduplicate_events(df):
    """Return a de-duplicated copy of a DataFrame of Mixpanel events.

    For every ``$insert_id`` found in the ``properties`` dicts, only the event
    with the latest timestamp is kept. When several events share both the
    ``$insert_id`` and the timestamp, the one that appears last in ``df`` wins.

    Args:
        df: DataFrame with a ``properties`` column of dicts. It is not
            modified.

    Returns:
        A new DataFrame sorted by a ``timestamp`` column, which is derived
        from ``properties['time']`` (falling back to
        ``properties['timestamp']``) interpreted as seconds since the epoch.
        Rows whose properties carry no ``$insert_id`` are dropped because
        they cannot be de-duplicated. An empty ``df`` is returned as-is.
    """
    if df.empty:
        return df

    print(f"Original event count: {len(df)}")

    # Copy first so the helper columns and row drops never leak into the
    # caller's DataFrame.
    events = df.copy()
    events['timestamp'] = pd.to_datetime(
        events['properties'].apply(lambda p: p.get('time') or p.get('timestamp')),
        unit='s'
    )
    events['$insert_id'] = events['properties'].apply(lambda p: p.get('$insert_id'))

    deduped = (
        events
        # Rows without an '$insert_id' cannot be de-duplicated.
        .dropna(subset=['$insert_id'])
        # A stable sort keeps the original order among equal timestamps, so
        # keep='last' picks a deterministic row.
        .sort_values('timestamp', kind='mergesort')
        .drop_duplicates(subset=['$insert_id'], keep='last')
        .drop(columns=['$insert_id'])
    )

    print(f"Event count after de-duplication: {len(deduped)}")
    return deduped

def get_mixpanel_data(from_date, to_date, force_reload=False):
    """
    Return de-duplicated Mixpanel events for the given date range.

    A local JSON-lines file named after the date range acts as a cache: when
    it exists and ``force_reload`` is false, it is read and returned directly.
    Otherwise the events are fetched from the API, parsed, de-duplicated and,
    if any remain, written to that file. An empty DataFrame is returned when
    the API request fails.
    """
    # The cache file holds the clean, de-duplicated data, not the raw export.
    cache_path = f"mixpanel_data_{from_date}_to_{to_date}.json"

    use_cache = not force_reload and os.path.exists(cache_path)
    if use_cache:
        print(f"Loading de-duplicated data from local file: {cache_path}")
        return pd.read_json(cache_path, orient='records', lines=True)

    # No usable cache: go to the API.
    raw_text = fetch_mixpanel_data_from_api({'from_date': from_date, 'to_date': to_date})
    if raw_text is None:
        return pd.DataFrame()

    clean_df = deduplicate_events(parse_raw_data_to_df(raw_text))

    # Nothing to persist when the result is empty.
    if clean_df.empty:
        return clean_df

    clean_df.to_json(cache_path, orient='records', lines=True, date_format='iso')
    print(f"Clean, de-duplicated data saved to {cache_path}")
    return clean_df


def spinning_cursor():
    """Animate a text spinner until the module-level ``is_loading`` flag is falsy.

    The flag is checked once per full pass over the four frames, and each
    frame is shown for 0.1 seconds. When the loop ends, the spinner line is
    overwritten with spaces.
    """
    frames = ('|', '/', '-', '\\')
    while True:
        if not is_loading:
            break
        for frame in frames:
            print(f"\r{frame}", end="", flush=True)
            time.sleep(0.1)
    # Wipe whatever the spinner left on the line.
    blank_line = "\r" + " " * 20 + "\r"
    print(blank_line, end="", flush=True)

# Show a spinner on a background thread while the data is being loaded.
is_loading = True
spinner_thread = threading.Thread(target=spinning_cursor)
spinner_thread.start()

try:
    df = get_mixpanel_data(FROM_DATE, TO_DATE, force_reload=FORCE_RELOAD_FROM_API)
finally:
    # Always stop the spinner, even if loading raised (including SystemExit
    # from the missing-environment-variable path); otherwise the thread would
    # keep spinning forever.
    is_loading = False
    spinner_thread.join()

# Report what was loaded and, when CLIENT_AGENCY is set, how many of the
# events belong to that agency.
if df.empty:
    print("No data available to process.")
else:
    print(f"\nTotal deduplicated events loaded: {len(df)}")
    if CLIENT_AGENCY:
        mask = df['properties'].apply(
            lambda props: props.get('client_agency_id') == CLIENT_AGENCY
        )
        df_filtered = df[mask]
        print(f"Found {len(df_filtered)} events with client_agency_id set to '{CLIENT_AGENCY}'")


In [None]:
# Users who shared the income summary
# distinct_id lives inside the nested properties dict; extract it once and reuse it.
event_distinct_ids = df['properties'].apply(lambda p: p.get('distinct_id'))
is_income_event = df['event'] == 'ApplicantSharedIncomeSummary'
income_events_df = df[is_income_event]
users_who_shared_pdf = event_distinct_ids[is_income_event].unique()
print(f"Found {len(users_who_shared_pdf)} users with at least one 'ApplicantSharedIncomeSummary' event.")

# Create a df of just these users
mask = event_distinct_ids.isin(users_who_shared_pdf)
users_who_shared_pdf_df = df[mask]

# Find all the Finished*Sync events for these users (these are the "mega events")
sync_events = ['ApplicantFinishedArgyleSync', 'ApplicantFinishedPinwheelSync']
sync_events_df = users_who_shared_pdf_df[users_who_shared_pdf_df['event'].isin(sync_events)].copy()

# Extract the distinct_id and timestamp from the properties dictionary.
# The timestamp uses the same rule as the rest of this notebook: 'time', falling
# back to 'timestamp', read as epoch seconds. Reading only 'timestamp' would
# give NaT for events that carry just 'time', making the "latest" pick arbitrary.
sync_events_df['distinct_id'] = sync_events_df['properties'].apply(lambda p: p.get('distinct_id'))
sync_events_df['timestamp'] = pd.to_datetime(
    sync_events_df['properties'].apply(lambda p: p.get('time') or p.get('timestamp')),
    unit='s'
)

# Sort the DataFrame by user and then by time to ensure the most recent event is last
sync_events_df = sync_events_df.sort_values(by=['distinct_id', 'timestamp'])

# De-duplicate and keep only the most recent event for each user
latest_sync_per_user_df = sync_events_df.drop_duplicates(subset='distinct_id', keep='last').copy()

# Extract the final counts from this de-duplicated DataFrame.
# `or 0` also covers a key that is present but null, which would otherwise
# break the sums below.
latest_sync_per_user_df['total_w2_count'] = latest_sync_per_user_df['properties'].apply(
    lambda p: p.get('employment_type_w2_count') or 0
)
latest_sync_per_user_df['total_gig_count'] = latest_sync_per_user_df['properties'].apply(
    lambda p: p.get('employment_type_gig_count') or 0
)

# Create a DataFrame with the user as the index (copied so the column added
# below never touches latest_sync_per_user_df)
user_employment_counts = latest_sync_per_user_df.set_index('distinct_id')[['total_w2_count', 'total_gig_count']].copy()

# Df of users who have at least one job
users_with_jobs_df = user_employment_counts[
    (user_employment_counts['total_w2_count'] + user_employment_counts['total_gig_count'] > 0)
]

print("\nJob counts per user: ")
user_employment_counts['total_jobs'] = user_employment_counts['total_w2_count'] + user_employment_counts['total_gig_count']
job_distribution = user_employment_counts['total_jobs'].value_counts().sort_index()
print(job_distribution)

users_with_gigs_count = len(user_employment_counts[user_employment_counts['total_gig_count'] > 0])
print(f"\nFound {users_with_gigs_count} users with at least one gig source.")

In [None]:
# Debugging -- See one user's events

# NOTE(review): pprint is not used in this cell; the import is kept in case
# a later cell relies on it.
import pprint

target_user_id = 'applicant-100114'
temp_df = users_who_shared_pdf_df.copy()
temp_df['distinct_id'] = temp_df['properties'].apply(lambda p: p.get('distinct_id'))
temp_df['timestamp'] = pd.to_datetime(
    temp_df['properties'].apply(lambda p: p.get('time') or p.get('timestamp')), unit='s'
)

user_events_df = temp_df[temp_df['distinct_id'] == target_user_id]
sorted_user_events = user_events_df.sort_values(by='timestamp')

# Lift the row and column-width display limits only while printing.
# option_context restores the previous values afterwards, even if printing
# raises, instead of resetting them to the pandas defaults.
with pd.option_context('display.max_rows', None, 'display.max_colwidth', None):
    print(f"Showing all events for user '{target_user_id}', sorted by time:")
    print(sorted_user_events[['timestamp', 'event', 'properties']])