In [0]:
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'src'))

import yaml
import pandas as pd
import numpy as np

from data_pull.loaders import (
    load_user_table,
    load_task_complete_table,
    load_respondent_info_table,
    load_task_table,
    load_ditr_table,
    load_all_wonky_studies
)
from data_pull.joiners import (
    join_user_task_respondent,
    join_wonky_balance_with_task,
    merge_wonky_data_with_user_info
)
from data_pull.aggregators import (
    enrich_user_info_with_task_counts,
    union_wonky_study_dataframes,
    create_wonky_respondent_summary,
    calculate_wonky_task_ratio
)

# Load the two YAML configs that drive this notebook. The encoding is passed
# explicitly so parsing does not depend on the platform's default locale.
with open('../configs/data_paths.yaml', 'r', encoding='utf-8') as f:
    paths_config = yaml.safe_load(f)

with open('../configs/wonky_studies.yaml', 'r', encoding='utf-8') as f:
    wonky_config = yaml.safe_load(f)

# Best-effort cache reset. `spark` is not defined in this notebook
# (presumably injected by the runtime -- confirm), so a failure here is
# reported instead of being raised or silently swallowed.
try:
    spark.catalog.clearCache()
except Exception as exc:
    print(f"Spark cache not cleared: {exc!r}")

print("Setup complete")

In [0]:
# Notebook-level settings, all read from the data-paths config.
filter_settings = paths_config['filters']

SILVER_PATH = paths_config['silver_path']
COUNTRY = filter_settings['country']
MIN_DATE = filter_settings['min_date']
TASK_ORIGIN = filter_settings['task_origin']
PROJECT_REPO_PATH = paths_config['project_repository_path']

# Echo the active filters so the run is self-describing.
print(f"Country: {COUNTRY}")
print(f"Min date: {MIN_DATE}")
print(f"Task origin: {TASK_ORIGIN}")

In [0]:
# Column exclusions are needed to reduce the size of the respondent-info
# table; amend the 'cols_to_exclude_respondent_info' config entry if an
# excluded column turns out to be needed.
respondent_info_exclusions = wonky_config.get("cols_to_exclude_respondent_info", [])

s_user = load_user_table(spark, SILVER_PATH, country=COUNTRY)
s_task_complete = load_task_complete_table(
    spark,
    SILVER_PATH,
    min_date=MIN_DATE,
    task_origin=TASK_ORIGIN,
)
s_respondent_info = load_respondent_info_table(
    spark, SILVER_PATH, country=COUNTRY, exclude_cols=respondent_info_exclusions
)
# The DITR loader is given the already-loaded user table.
s_ditr = load_ditr_table(spark, SILVER_PATH, user_df=s_user)
s_task = load_task_table(spark, SILVER_PATH)

# Row counts for each loaded table.
print(f"Users: {s_user.count():,}")
print(f"Task completions: {s_task_complete.count():,}")
print(f"Respondent info: {s_respondent_info.count():,}")
print(f"DITR records: {s_ditr.count():,}")
print(f"Tasks: {s_task.count():,}")

In [0]:
# Combine the user, task-completion and respondent-info tables, passing the
# DITR table through as an optional extra input.
user_info = join_user_task_respondent(s_user, s_task_complete, s_respondent_info, ditr_df=s_ditr)

# Add task counts and time calculations on top of the joined frame.
user_info_enriched = enrich_user_info_with_task_counts(user_info)

distinct_respondents = user_info_enriched.select('respondentPk').distinct()

print(f"Joined records: {user_info_enriched.count():,}")
print(f"Unique respondents: {distinct_respondents.count():,}")

In [0]:
# Rough size check before collecting to the driver. The row count is computed
# once and reused: each .count() call runs its own Spark job.
rows_to_collect = user_info_enriched.count()
# 50 looks like an assumed average number of bytes per cell -- rough heuristic, confirm.
estimated_gb = rows_to_collect * len(user_info_enriched.columns) * 50 / 1e9

print(f"Rows to collect: {rows_to_collect:,}")
print(f"Estimated size: {estimated_gb:.2f} GB")

In [0]:
# Collect both Spark frames to pandas.
user_info_df = user_info_enriched.toPandas()
tasks_df = s_task.toPandas()

# Merge task metadata (left join keeps every task-level record).
user_info_df = user_info_df.merge(
    tasks_df[['task_pk', 'task_length_of_task']],
    left_on='taskPk',
    right_on='task_pk',
    how='left'
)

# Compute the summary numbers once; an empty frame yields NaN for the average
# instead of raising ZeroDivisionError.
n_records = len(user_info_df)
n_respondents = user_info_df['respondentPk'].nunique()
avg_tasks_per_respondent = n_records / n_respondents if n_respondents else float('nan')

print(f"Task-level records: {n_records:,}")
print(f"Unique respondents: {n_respondents:,}")
print(f"Avg tasks/respondent: {avg_tasks_per_respondent:.2f}")

In [0]:
# Best-effort cache reset; a failure is reported rather than silently ignored
# so it is visible when the cache was not actually cleared.
try:
    spark.catalog.clearCache()
    print("cache cleared boss")
except Exception as exc:
    print(f"Spark cache not cleared: {exc!r}")

In [0]:
# Same size estimate as earlier in the notebook. The row count is computed
# once and reused: each .count() call runs its own Spark job.
rows_to_collect = user_info_enriched.count()
# 50 looks like an assumed average number of bytes per cell -- rough heuristic, confirm.
estimated_gb = rows_to_collect * len(user_info_enriched.columns) * 50 / 1e9

print(f"Rows to collect: {rows_to_collect:,}")
print(f"Estimated size: {estimated_gb:.2f} GB")

In [0]:
def process_wonky_studies(spark, uuids, task_df, base_path, cols_to_include, wonky_flag):
    """Load and process wonky studies, returning a task-level pandas DataFrame.

    Args:
        spark: Spark session handed to ``load_all_wonky_studies``.
        uuids: Study UUIDs to load.
        task_df: Task table that each balance table is joined with.
        base_path: Base path forwarded to ``load_all_wonky_studies``.
        cols_to_include: Column subset forwarded to ``load_all_wonky_studies``.
        wonky_flag: Value written to the ``wonky_study_count`` column of
            every returned row.

    Returns:
        The unioned study data collected to pandas, with an added
        ``wonky_study_count`` column. If no study could be loaded, an empty
        ``pd.DataFrame()`` with no columns is returned.
    """
    # The second return value (failed loads) is not used here; the summary
    # line printed below shows how many of the requested studies were loaded.
    balance_dfs, _failed = load_all_wonky_studies(
        spark, uuids, base_path=base_path,
        cols_to_include=cols_to_include, verbose=True,
        max_workers=4
    )

    if not balance_dfs:
        return pd.DataFrame()

    # Join each balance table with task table and union
    joined_dfs = [join_wonky_balance_with_task(df, task_df) for df in balance_dfs]
    wonky_spark = union_wonky_study_dataframes(joined_dfs)

    # Count once and reuse: each .count() call runs its own Spark job.
    row_count = wonky_spark.count()
    # 50 looks like an assumed average number of bytes per cell -- rough heuristic, confirm.
    estimated_gb = row_count * len(wonky_spark.columns) * 50 / 1e9
    print(f"Rows to collect: {row_count:,}")
    print(f"Estimated size: {estimated_gb:.2f} GB")

    wonky_df = wonky_spark.toPandas()
    wonky_df['wonky_study_count'] = wonky_flag

    print(f"  Loaded {len(balance_dfs)}/{len(uuids)} studies | {wonky_df['respondent_pk'].nunique():,} respondents")
    return wonky_df

# Partition the configured study UUIDs by their flag: truthy values go to the
# wonky (exposed) list, falsy values to the control list.
study_flags = wonky_config['wonky_study_uuids']
uuids_wonky = [study_uuid for study_uuid, flagged in study_flags.items() if flagged]
uuids_control = [study_uuid for study_uuid, flagged in study_flags.items() if not flagged]

print("Processing wonky studies...")
wonky_map_exposed = process_wonky_studies(
    spark=spark,
    uuids=uuids_wonky,
    task_df=s_task,
    base_path=PROJECT_REPO_PATH,
    cols_to_include=wonky_config['cols_to_include_subset'],
    wonky_flag=1,
)

print("Processing control studies...")
wonky_map_control = process_wonky_studies(
    spark=spark,
    uuids=uuids_control,
    task_df=s_task,
    base_path=PROJECT_REPO_PATH,
    cols_to_include=wonky_config['cols_to_include_subset'],
    wonky_flag=0,
)

# Stack exposed and control rows into a single frame with a fresh index.
wonky_map = pd.concat([wonky_map_exposed, wonky_map_control], ignore_index=True)
print(f"\nTotal wonky map: {len(wonky_map):,} records | {wonky_map['respondent_pk'].nunique():,} respondents")

In [0]:
# Best-effort cache reset; a failure is reported rather than silently ignored
# so it is visible when the cache was not actually cleared.
try:
    spark.catalog.clearCache()
    print("cache cleared boss")
except Exception as exc:
    print(f"Spark cache not cleared: {exc!r}")

In [0]:
# Collapse the wonky map to one row per combination of the configured
# grouping columns: count the 'uuid' entries and sum the wonky flags.
grouped_wonky_map = wonky_map.groupby(wonky_config['cols_to_group'])
study_totals = grouped_wonky_map.agg({'uuid': 'count', 'wonky_study_count': 'sum'}).reset_index()
wonky_respondent_df = study_totals.rename(
    columns={'uuid': 'balance_study_count', 'respondent_pk': 'balance_respondentPk'}
)

# Drop the configured columns; names that are not present are skipped.
cols_to_drop = wonky_config.get('cols_to_drop', [])
wonky_respondent_df = wonky_respondent_df.drop(columns=cols_to_drop, errors='ignore')

print(f"Wonky respondent-level: {len(wonky_respondent_df):,} records")

In [0]:
# Attach the respondent-level wonky aggregates to the task-level frame, drop
# the '_merge' column if one is present, and keep only the first occurrence
# of any repeated column label.
user_info_final = (
    merge_wonky_data_with_user_info(user_info_df, wonky_respondent_df)
    .drop(columns=['_merge'], errors='ignore')
    .pipe(lambda frame: frame.loc[:, ~frame.columns.duplicated()])
)

exposed_record_count = (user_info_final['wonky_study_count'] > 0).sum()

print(f"\nFinal dataset shape: {user_info_final.shape}")
print(f"Unique respondents: {user_info_final['respondentPk'].nunique():,}")
print(f"Records with wonky exposure: {exposed_record_count:,}")

In [0]:
# Columns passed as `categorical_cols` when building the wonky respondent summary.
categorical_cols = [
    'survey_pk',
    'platform_name',
    'hardware_version',
    'yob',
    'survey_locale',
    'exposure_band',
]

In [0]:
# Distinct (respondentPk, task_completed) pairs taken from the final frame.
tasks_per_respondent = user_info_final[['respondentPk', 'task_completed']].drop_duplicates()

# Build the respondent-level summary, then left-join the task totals onto it
# so every summary row is kept.
wonky_summary = create_wonky_respondent_summary(
    wonky_respondent_df,
    respondent_id_col='balance_respondentPk',
    categorical_cols=categorical_cols,
).merge(
    tasks_per_respondent,
    how='left',
    left_on='balance_respondentPk',
    right_on='respondentPk',
)

print(f"Wonky summary: {len(wonky_summary):,} respondents")

In [0]:
# Rename the task-total columns before handing them to the ratio helper.
task_totals = tasks_per_respondent.rename(
    columns={'task_completed': 'tasks_completed', 'respondentPk': 'respondentPk_tc'}
)

wonky_counts = calculate_wonky_task_ratio(
    task_totals,
    wonky_summary,
    task_completed_col='tasks_completed',
    wonky_instances_col='total_wonky_studies',
)

# Keep the raw ratio and add a second column with values above 1 capped at 1.
raw_ratio = wonky_counts['wonky_task_ratio']
wonky_counts['wonky_task_ratio_cap'] = raw_ratio.clip(upper=1)

print(f"Avg wonky task ratio: {wonky_counts['wonky_task_ratio'].mean():.2%}")

In [0]:
# Header banner for the final printout.
banner = "=" * 60
print(banner)
print("FINAL DATA SUMMARY")
print(banner)

exposure_rows = (user_info_final['wonky_study_count'] > 0).sum()

print(f"\nuser_info_final:")
print(f"  Shape: {user_info_final.shape}")
print(f"  Unique respondents: {user_info_final['respondentPk'].nunique():,}")
print(f"  Unique tasks: {user_info_final['taskPk'].nunique():,}")
print(f"  Wonky exposures: {exposure_rows:,}")

# Device info coverage: share of non-null values for each device column that
# is actually present in the final frame. Nothing is printed if none are.
device_cols = ['hardware', 'manufacturer', 'os', 'os_version', 'app_version']
available_device_cols = [name for name in device_cols if name in user_info_final.columns]
if available_device_cols:
    print(f"\nDevice info coverage:")
    non_null_share = user_info_final[available_device_cols].notna().mean()
    for col in available_device_cols:
        coverage = non_null_share[col]
        print(f"  {col}: {coverage:.1%}")

task_times = user_info_final['task_time_taken_s']

print(f"\nTask time stats:")
print(f"  Min: {task_times.min():.0f}s")
print(f"  Median: {task_times.median():.0f}s")
print(f"  Max: {task_times.max():.0f}s")

In [0]:
# Display the final column names in alphabetical order.
sorted(user_info_final.columns.tolist())

In [0]:
# Outputs go to <parent of the working directory>/misc, created on demand.
notebook_path = os.getcwd()
repo_root = os.path.abspath(os.path.join(notebook_path, ".."))
misc_dir = os.path.join(repo_root, "misc")
os.makedirs(misc_dir, exist_ok=True)


def _misc_path(configured_path):
    """Return the path inside misc_dir with the same file name as configured_path."""
    return os.path.join(misc_dir, os.path.basename(configured_path))


# Only the file name of each configured output path is used.
output_files = paths_config['output_files']
output_path = _misc_path(output_files['user_info_df'])
wonky_respondent_df_path = _misc_path(output_files['wonky_respondent_df'])
wonky_respondent_summary_path = _misc_path(output_files['wonky_respondent_summary'])
wonky_map_path = _misc_path(output_files['wonky_map'])
wonky_counts_path = _misc_path(output_files['wonky_user_counts'])

# Keep only the first occurrence of any repeated column label before export.
user_info_export = user_info_final.loc[:, ~user_info_final.columns.duplicated()]

# (frame, destination) pairs, in the order they are written and reported.
exports = [
    (user_info_export, output_path),
    (wonky_respondent_df, wonky_respondent_df_path),
    (wonky_summary, wonky_respondent_summary_path),
    (wonky_counts, wonky_counts_path),
    (wonky_map, wonky_map_path),
]
for frame, destination in exports:
    frame.to_parquet(destination, index=False)

print("Files saved successfully:")
for _, destination in exports:
    print(f"  - {destination}")