# Load, Merge, and Clean Data

In [1]:
# Setup and Configuration (Updated)
import polars as pl
from pathlib import Path
import gc

# --- Enable Global String Cache for Categoricals ---
pl.enable_string_cache()
# --- ---

# --- Configuration ---
BASE_DATA_PATH = Path('/Users/john/Downloads/osfstorage-archive') # Use your actual path
EXPERIMENT_IDS_PATH = BASE_DATA_PATH / 'experiment_dataset_2021-09-23'

# Output path for the cleaned data
SAVE_CLEANED_PATH_POLARS_PARQUET = BASE_DATA_PATH / 'merged_experiment_data_cleaned_polars.parquet'
SAVE_CLEANED_PATH_POLARS_CSV = BASE_DATA_PATH / 'merged_experiment_data_cleaned_polars.csv'

print(f"Polars version: {pl.__version__}")
print(f"Base data path: {BASE_DATA_PATH}")
print(f"Experiment IDs path: {EXPERIMENT_IDS_PATH}")
print(f"Global String Cache enabled: {pl.using_string_cache()}") # Verify it's enabled

Polars version: 1.29.0
Base data path: /Users/john/Downloads/osfstorage-archive
Experiment IDs path: /Users/john/Downloads/osfstorage-archive/experiment_dataset_2021-09-23
Global String Cache enabled: True


In [2]:
# Generate File Paths
try:
    if not EXPERIMENT_IDS_PATH.is_dir():
        raise FileNotFoundError(f"Error: Directory not found at {EXPERIMENT_IDS_PATH}")
    experiment_ids = [d.name for d in EXPERIMENT_IDS_PATH.iterdir() if d.is_dir()]
    print(f"Found {len(experiment_ids)} experiment ID directories.")
except FileNotFoundError as e:
    print(e)
    experiment_ids = []

performance_file_paths = [str(EXPERIMENT_IDS_PATH / exp_id / 'exp_alogs.csv') for exp_id in experiment_ids]
problems_file_paths = [str(EXPERIMENT_IDS_PATH / exp_id / 'exp_plogs.csv') for exp_id in experiment_ids]
actions_file_paths = [str(EXPERIMENT_IDS_PATH / exp_id / 'exp_slogs.csv') for exp_id in experiment_ids]
metrics_file_paths = [str(EXPERIMENT_IDS_PATH / exp_id / 'priors.csv') for exp_id in experiment_ids]

print("Sample performance file paths:", performance_file_paths[:2])
print("Sample problems file paths:", problems_file_paths[:2])
print("Sample actions file paths:", actions_file_paths[:2])
print("Sample metrics file paths:", metrics_file_paths[:2])

Found 88 experiment ID directories.
Sample performance file paths: ['/Users/john/Downloads/osfstorage-archive/experiment_dataset_2021-09-23/PSAU85Y/exp_alogs.csv', '/Users/john/Downloads/osfstorage-archive/experiment_dataset_2021-09-23/PSAXD6K/exp_alogs.csv']
Sample problems file paths: ['/Users/john/Downloads/osfstorage-archive/experiment_dataset_2021-09-23/PSAU85Y/exp_plogs.csv', '/Users/john/Downloads/osfstorage-archive/experiment_dataset_2021-09-23/PSAXD6K/exp_plogs.csv']
Sample actions file paths: ['/Users/john/Downloads/osfstorage-archive/experiment_dataset_2021-09-23/PSAU85Y/exp_slogs.csv', '/Users/john/Downloads/osfstorage-archive/experiment_dataset_2021-09-23/PSAXD6K/exp_slogs.csv']
Sample metrics file paths: ['/Users/john/Downloads/osfstorage-archive/experiment_dataset_2021-09-23/PSAU85Y/priors.csv', '/Users/john/Downloads/osfstorage-archive/experiment_dataset_2021-09-23/PSAXD6K/priors.csv']


In [3]:
# Define Schemas and Date Parsing Information

actions_schema = {
    'experiment_id': pl.Categorical,
    'student_id': pl.Categorical,
    'problem_id': pl.Categorical,
    'problem_part': pl.Categorical, # Or Utf8 if it can have diverse values
    'scaffold_id': pl.Categorical, # Or Utf8
    'experiment_tag_path': pl.Utf8, # Can be long and varied
    'action': pl.Categorical,
    'timestamp': pl.Utf8, # Read as string, parse later
    'assistments_reference_action_log_id': pl.UInt64
}
actions_parse_dates = ['timestamp']

problems_schema = {
    'experiment_id': pl.Categorical,
    'student_id': pl.Categorical,
    'problem_id': pl.Categorical,
    'problem_part': pl.Categorical,
    'scaffold_id': pl.Categorical,
    'problem_condition': pl.Categorical, # Or Utf8
    'start_time': pl.Utf8, # Parse later
    'end_time': pl.Utf8,   # Parse later
    'session_count': pl.UInt16,
    'time_on_task': pl.Float32,
    'first_response_or_request_time': pl.Float32,
    'first_answer': pl.Utf8,
    'correct': pl.Boolean,
    'reported_score': pl.Float32,
    'answer_before_tutoring': pl.Boolean,
    'attempt_count': pl.UInt16,
    'hints_available': pl.UInt16,
    'hints_given': pl.UInt16,
    'scaffold_problems_available': pl.UInt16,
    'scaffold_problems_given': pl.UInt16,
    'explanation_available': pl.Boolean,
    'explanation_given': pl.Boolean,
    'answer_given': pl.Boolean,
    'assistments_reference_problem_log_id': pl.UInt64
}
problems_parse_dates = ['start_time', 'end_time']

performance_schema = {
    'experiment_id': pl.Categorical,
    'student_id': pl.Categorical,
    'release_date': pl.Utf8, # Will be parsed to Datetime
    'due_date': pl.Utf8,     # Will be parsed to Datetime
    'start_time': pl.Utf8,   # Will be parsed to Datetime
    'end_time': pl.Utf8,     # Will be parsed to Datetime
    
    # Counts and scores changed to Float32 for robust parsing from CSV
    'assignment_session_count': pl.Float32, # Was UInt16
    'pretest_problem_count': pl.Float32,    # Was UInt16
    'pretest_correct': pl.Float32,          # Was UInt16
    'pretest_time_on_task': pl.Float32,     # Already Float32, OK
    'pretest_average_first_response_time': pl.Float32, # Already Float32, OK
    'pretest_session_count': pl.Float32,    # Was UInt16
    
    'assigned_condition': pl.Categorical,
    
    'condition_time_on_task': pl.Float32,   # Already Float32, OK
    'condition_average_first_response_or_request_time': pl.Float32, # Already Float32, OK
    'condition_problem_count': pl.Float32,  # Changed in previous step, OK
    'condition_total_correct': pl.Float32,  # << FAILING COLUMN, NOW CHANGED (was UInt16)
    'condition_total_correct_after_wrong_response': pl.Float32, # Was UInt16
    'condition_total_correct_after_tutoring': pl.Float32,       # Was UInt16
    'condition_total_answers_before_tutoring': pl.Float32,    # Was UInt16
    'condition_total_attempt_count': pl.Float32,                # Was UInt32
    'condition_total_hints_available': pl.Float32,              # Was UInt32
    'condition_total_hints_given': pl.Float32,                  # Was UInt32
    'condition_total_scaffold_problems_available': pl.Float32,  # Was UInt32
    'condition_total_scaffold_problems_given': pl.Float32,      # Was UInt32
    'condition_total_explanations_available': pl.Float32,       # Was UInt32
    'condition_total_explanations_given': pl.Float32,           # Was UInt32
    'condition_total_answers_given': pl.Float32,                # Was UInt32
    'condition_session_count': pl.Float32,    # Was UInt16
    
    'posttest_problem_count': pl.Float32,   # Was UInt16
    'posttest_correct': pl.Float32,         # Was UInt16
    'posttest_time_on_task': pl.Float32,    # Already Float32, OK
    'posttest_average_first_response_time': pl.Float32, # Already Float32, OK
    'posttest_session_count': pl.Float32,   # Was UInt16
    
    'assistments_reference_assignment_log_id': pl.UInt64 # IDs are usually safe as integers if no decimals
}
performance_parse_dates = ['release_date', 'due_date', 'start_time', 'end_time']

metrics_schema = {
    'experiment_id': pl.Categorical,
    'student_id': pl.Categorical,
    'student_prior_started_skill_builder_count': pl.UInt32,
    'student_prior_completed_skill_builder_count': pl.UInt32,
    'student_prior_started_problem_set_count': pl.UInt32,
    'student_prior_completed_problem_set_count': pl.UInt32,
    'student_prior_completed_problem_count': pl.UInt32,
    'student_prior_median_first_response_time': pl.Float32,
    'student_prior_median_time_on_task': pl.Float32,
    'student_prior_average_correctness': pl.Float32,
    'student_prior_average_attempt_count': pl.Float32,
    'class_id': pl.Categorical,
    'class_creation_date': pl.Utf8,
    'class_student_count': pl.UInt16,
    'class_prior_skill_builder_count': pl.UInt32,
    'class_prior_problem_set_count': pl.UInt32,
    'class_prior_skill_builder_percent_started': pl.Float32,
    'class_prior_skill_builder_percent_completed': pl.Float32,
    'class_prior_problem_set_percent_started': pl.Float32,
    'class_prior_problem_set_percent_completed': pl.Float32,
    'class_prior_completed_problem_count': pl.UInt32,
    'class_prior_median_time_on_task': pl.Float32,
    'class_prior_median_first_response_time': pl.Float32,
    'class_prior_average_correctness': pl.Float32,
    'class_prior_average_attempt_count': pl.Float32,
    'teacher id': pl.Categorical, # Note: column name with space, will need quoting or renaming
    'teacher_account_creation_date': pl.Utf8,
    'district_id': pl.Categorical, # Or Utf8
    'location': pl.Categorical,    # Or Utf8
    'opportunity_zone': pl.Categorical, # Or Utf8
    'locale_description': pl.Categorical # Or Utf8
}
metrics_parse_dates = ['class_creation_date', 'teacher_account_creation_date']

In [4]:
# Helper Function for Memory-Efficient CSV Concatenation

def combine_polars_csvs(file_paths, schema=None, parse_dates_list=None, 
                        # CRITICAL: Provide this if your date format is consistent!
                        known_date_format_str: str = None, 
                        date_time_unit='us'):
    """
    Scans multiple CSVs lazily, concatenates them, and then collects into a Polars DataFrame.
    Includes robust date parsing and UTC conversion.
    Optimized to reduce schema collection calls and use known date format.
    """
    lazy_frames = []
    print(f"\nScanning {len(file_paths)} files...")

    # Attempt to get a common set of columns from the first file if a schema is provided
    # This helps in applying date parsing expressions more consistently.
    common_columns_from_first_file = None
    if file_paths and schema:
        try:
            common_columns_from_first_file = pl.scan_csv(
                file_paths[0], infer_schema_length=100, n_rows=10 # Scan a bit more for robustness
            ).collect_schema().names()
        except Exception as e:
            print(f"  Warning: Could not determine common columns from first file {file_paths[0]}: {e}")
            common_columns_from_first_file = list(schema.keys()) # Fallback to all schema keys

    for i, file_path_str in enumerate(file_paths):
        file_path = Path(file_path_str)
        if i % 10 == 0:
            print(f"  Scanning file {i+1}/{len(file_paths)}: {file_path.parent.name}/{file_path.name}")

        try:
            # Use the provided schema directly. scan_csv is generally tolerant of 
            # schema columns not in the file (will be null) or file columns not in schema (will be excluded or inferred).
            # For more control, use `dtypes` argument to override types for specific columns.
            lf = pl.scan_csv(file_path, 
                             schema=schema, # Use the global schema for this file type
                             infer_schema_length=100, # Allow some inference for columns not in schema
                             null_values=["", "NA", "NaN", "null"])
            
            if parse_dates_list:
                date_parsing_expressions = []
                # Use common_columns_from_first_file or fallback to current lf schema if first file scan failed
                # This avoids calling collect_schema() for *every* file inside the loop for this check
                columns_to_check = common_columns_from_first_file if common_columns_from_first_file else lf.collect_schema().names()

                for col_name in parse_dates_list:
                    if col_name in columns_to_check: 
                        # Check if column actually exists in the current specific file's lazy frame
                        # This is a compromise: call collect_schema() once per file if needed,
                        # but only if the more optimized `columns_to_check` suggests it might be there.
                        if col_name not in lf.collect_schema().names():
                            continue

                        date_expr = pl.col(col_name).cast(pl.Utf8, strict=False)
                        
                        if known_date_format_str:
                            date_expr = date_expr.str.to_datetime(
                                format=known_date_format_str,
                                strict=False,
                                time_unit=date_time_unit
                            )
                        else: # Fallback to inference if no format string is given (slower)
                            date_expr = date_expr.str.to_datetime(
                                strict=False,
                                time_unit=date_time_unit
                            )
                        
                        # Always convert to UTC after parsing
                        date_parsing_expressions.append(
                            date_expr.dt.convert_time_zone("UTC").alias(col_name)
                        )
                if date_parsing_expressions:
                    lf = lf.with_columns(date_parsing_expressions)
            
            lazy_frames.append(lf)
        except FileNotFoundError:
            print(f"  Warning: File not found, skipping: {file_path}")
        except pl.exceptions.NoDataError: 
             print(f"  Warning: File is empty, skipping: {file_path}")
        except Exception as e: 
            # Catching Polars specific ComputeError which can happen during .str.to_datetime if format is wrong
            if "strptime" in str(e).lower() or "conversion" in str(e).lower():
                 print(f"  Potential date parsing error for {file_path} (column likely {col_name if 'col_name' in locals() else 'unknown'}): {e}")
            else:
                print(f"  Error scanning {file_path} or applying initial transforms: {e}")


    if not lazy_frames:
        print("  No lazy frames were created from scanning files.")
        return None

    print(f"Concatenating {len(lazy_frames)} lazy frames...")
    try:
        combined_lf = pl.concat(lazy_frames, how="vertical_relaxed")
        print("Collecting data into DataFrame...")
        collected_df = combined_lf.collect(engine="streaming") # Enable streaming for collect
        print("Concatenation and collection complete.")
        return collected_df
    except Exception as e:
        print(f"Error during lazy concatenation or collection: {e}")
        return None

In [5]:
# Load DataFrames for each type using the Polars helper

# *** Use the VERIFIED format string from Cell 3.5 output ***
COMMON_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S%.f%z" 

print("Combining Actions Data (exp_slogs)...")
actions_df = combine_polars_csvs(
    actions_file_paths, 
    schema=actions_schema, 
    parse_dates_list=actions_parse_dates,
    known_date_format_str=COMMON_DATETIME_FORMAT 
)
if actions_df is not None:
    print(f"Actions DataFrame shape: {actions_df.shape}")

print("\nCombining Problems Data (exp_plogs)...")
# Assuming problems_df date columns also use this format. If not, adjust or pass None.
problems_df = combine_polars_csvs(
    problems_file_paths, 
    schema=problems_schema, 
    parse_dates_list=problems_parse_dates,
    known_date_format_str=COMMON_DATETIME_FORMAT 
)
if problems_df is not None:
    print(f"Problems DataFrame shape: {problems_df.shape}")

print("\nCombining Performance Data (exp_alogs)...")
# Assuming performance_df date columns also use this format.
performance_df = combine_polars_csvs(
    performance_file_paths, 
    schema=performance_schema, 
    parse_dates_list=performance_parse_dates,
    known_date_format_str=COMMON_DATETIME_FORMAT
)
if performance_df is not None:
    print(f"Performance DataFrame shape: {performance_df.shape}")

print("\nCombining Metrics Data (priors)...")
# Assuming metrics_df date columns also use this format.
if 'teacher id' in metrics_schema: 
    metrics_schema_corrected = metrics_schema.copy()
    metrics_schema_corrected['teacher_id'] = metrics_schema_corrected.pop('teacher id')
else:
    metrics_schema_corrected = metrics_schema

metrics_df = combine_polars_csvs(
    metrics_file_paths, 
    schema=metrics_schema_corrected, 
    parse_dates_list=metrics_parse_dates,
    known_date_format_str=COMMON_DATETIME_FORMAT
)
if metrics_df is not None:
    if 'teacher id' in metrics_df.columns: 
        metrics_df = metrics_df.rename({'teacher id': 'teacher_id'})
    print(f"Metrics DataFrame shape: {metrics_df.shape}")

import gc # Ensure gc is imported if you use it here
gc.collect()

Combining Actions Data (exp_slogs)...

Scanning 88 files...
  Scanning file 1/88: PSAU85Y/exp_slogs.csv
  Scanning file 11/88: PSAYCFH/exp_slogs.csv
  Scanning file 21/88: PSAZ2G4/exp_slogs.csv
  Scanning file 31/88: PSAQJFP/exp_slogs.csv
  Scanning file 41/88: PSAJVP8/exp_slogs.csv
  Scanning file 51/88: PSA9XWV/exp_slogs.csv
  Scanning file 61/88: PSAM4NK/exp_slogs.csv
  Scanning file 71/88: PSATP2Z/exp_slogs.csv
  Scanning file 81/88: PSASDZY/exp_slogs.csv
Concatenating 88 lazy frames...
Collecting data into DataFrame...
Concatenation and collection complete.
Actions DataFrame shape: (3708299, 9)

Combining Problems Data (exp_plogs)...

Scanning 88 files...
  Scanning file 1/88: PSAU85Y/exp_plogs.csv
  Scanning file 11/88: PSAYCFH/exp_plogs.csv
  Scanning file 21/88: PSAZ2G4/exp_plogs.csv
  Scanning file 31/88: PSAQJFP/exp_plogs.csv
  Scanning file 41/88: PSAJVP8/exp_plogs.csv
  Scanning file 51/88: PSA9XWV/exp_plogs.csv
  Scanning file 61/88: PSAM4NK/exp_plogs.csv
  Scanning file 7

17793

In [6]:
# Merge DataFrames into One

merged_df = None
merge_successful = True

print("\n--- Starting Merge Operations (Polars) ---")

if actions_df is None or actions_df.is_empty():
    print("Actions DataFrame is empty or None. Cannot proceed with merge.")
    merge_successful = False
else:
    merged_df = actions_df.clone()
    print(f"Starting with actions_df: {merged_df.shape}")

    # Merge Problems Data
    if problems_df is not None and not problems_df.is_empty() and merge_successful:
        try:
            print("Merging problems_df...")
            problem_keys = ['experiment_id', 'student_id', 'problem_id', 'problem_part', 'scaffold_id']
            # Ensure keys are present in both DataFrames. Polars will error if not.
            merged_df = merged_df.join(problems_df, on=problem_keys, how="left", suffix="_problem")
            print(f"After merging problems_df: {merged_df.shape}")
            del problems_df 
            gc.collect()
        except Exception as e:
            print(f"Error merging problems_df: {e}")
            merge_successful = False
    elif merge_successful: # only print skip message if not already failed
        print("Skipping problems_df merge (not loaded or empty).")

    # Merge Performance Data
    if performance_df is not None and not performance_df.is_empty() and merge_successful:
        try:
            print("Merging performance_df...")
            perf_keys = ['experiment_id', 'student_id']
            merged_df = merged_df.join(performance_df, on=perf_keys, how="left", suffix="_perf")
            print(f"After merging performance_df: {merged_df.shape}")
            del performance_df
            gc.collect()
        except Exception as e:
            print(f"Error merging performance_df: {e}")
            merge_successful = False
    elif merge_successful:
        print("Skipping performance_df merge (not loaded or empty).")

    # Merge Metrics Data
    if metrics_df is not None and not metrics_df.is_empty() and merge_successful:
        try:
            print("Merging metrics_df...")
            metrics_keys = ['experiment_id', 'student_id']
            merged_df = merged_df.join(metrics_df, on=metrics_keys, how="left", suffix="_metrics")
            print(f"After merging metrics_df: {merged_df.shape}")
            del metrics_df
            gc.collect()
        except Exception as e:
            print(f"Error merging metrics_df: {e}")
            merge_successful = False
    elif merge_successful:
        print("Skipping metrics_df merge (not loaded or empty).")

    if merged_df is not None and merge_successful:
        print("\n--- Merge Complete ---")
        print("Final Merged DataFrame Info:")
        print(f"Shape: {merged_df.shape}")
        print("\nIMPORTANT: Review these column names carefully before proceeding to Cell 7!")
        print("Columns in merged_df:", merged_df.columns)
        print(merged_df.head(3))
        print(merged_df.schema)
        
        if 'actions_df' in locals() and actions_df is not merged_df: # only delete if it's a separate object
            del actions_df
            gc.collect()
            
    elif merged_df is not None: # Merge was partially complete or some DFs were missing
        print("\n--- Merge Partially Complete or Some DataFrames Skipped ---")
        print("Columns in partially merged_df:", merged_df.columns)
    else: # merge_successful is False and merged_df might be None or the initial actions_df
        print("\n--- Merge Failed or Base DataFrame (actions_df) was not suitable ---")


--- Starting Merge Operations (Polars) ---
Starting with actions_df: (3708299, 9)
Merging problems_df...
After merging problems_df: (3708299, 28)
Merging performance_df...
After merging performance_df: (3711215, 61)
Merging metrics_df...
After merging metrics_df: (3711215, 90)

--- Merge Complete ---
Final Merged DataFrame Info:
Shape: (3711215, 90)

IMPORTANT: Review these column names carefully before proceeding to Cell 7!
Columns in merged_df: ['experiment_id', 'student_id', 'problem_id', 'problem_part', 'scaffold_id', 'experiment_tag_path', 'action', 'timestamp', 'assistments_reference_action_log_id', 'problem_condition', 'start_time', 'end_time', 'session_count', 'time_on_task', 'first_response_or_request_time', 'first_answer', 'correct', 'reported_score', 'answer_before_tutoring', 'attempt_count', 'hints_available', 'hints_given', 'scaffold_problems_available', 'scaffold_problems_given', 'explanation_available', 'explanation_given', 'answer_given', 'assistments_reference_problem

# Data Cleaning

In [7]:
# Cell 7: Data Cleaning (Polars Style) - UPDATED

if 'merged_df' in locals() and merged_df is not None and merge_successful:
    print("\n--- Starting Data Cleaning (Polars) ---")
    print(f"Initial merged_df shape for cleaning: {merged_df.shape}")
    print("Initial columns in merged_df for cleaning:", merged_df.columns) # Good for reference

    # --- Renaming Columns ---
    print("\n--- Renaming Columns ---")
    
    # Based on your merged_df.columns output:
    rename_map = {
        'assistments_reference_action_log_id': 'action_log_id', # From actions_df
        'start_time_perf': 'assignment_start_time', # 'start_time' from performance_df became 'start_time_perf'
        'end_time_perf': 'assignment_end_time',     # 'end_time' from performance_df became 'end_time_perf'
        'assistments_reference_assignment_log_id': 'assignment_log_id' # From performance_df, NO suffix needed as per your output
    }
    actual_renames = {k: v for k, v in rename_map.items() if k in merged_df.columns}
    if actual_renames:
        print(f"Applying renames: {actual_renames}")
        merged_df = merged_df.rename(actual_renames)
    else:
        print("No columns matched for renaming based on the current rename_map.")
    
    # --- Type Transformations ---
    column_transformations = [] # Reset list for this section

    # Ensure Datetime columns are correctly parsed (most should be from load) and UTC
    # These are original names or target names from renames above.
    datetime_cols_final_check = [
        'timestamp', # from actions
        'start_time', 'end_time', # from problems
        'release_date', 'due_date', # from performance
        'assignment_start_time', 'assignment_end_time', # TARGETS of rename from _perf
        'class_creation_date', 'teacher_account_creation_date' # from metrics
    ]
    for col_name in datetime_cols_final_check:
        if col_name in merged_df.columns:
            if merged_df[col_name].dtype == pl.Utf8: # If any were missed and are still string
                column_transformations.append(
                    pl.col(col_name).str.to_datetime(format=COMMON_DATETIME_FORMAT, strict=False, time_unit='us')
                    .dt.convert_time_zone("UTC")
                    .alias(col_name)
                )
                print(f"Scheduled for datetime re-parsing: {col_name}")
            elif isinstance(merged_df[col_name].dtype, pl.Datetime): # A more robust check for Datetime dtype
                # Access the time_zone attribute from the dtype object itself
                current_column_dtype = merged_df[col_name].dtype
                if hasattr(current_column_dtype, 'time_zone'): # Check if the dtype object has time_zone (it should for pl.Datetime)
                    current_tz = current_column_dtype.time_zone
                    if current_tz is not None and current_tz != "UTC":
                        column_transformations.append(
                            pl.col(col_name).dt.convert_time_zone("UTC").alias(col_name)
                        )
                        print(f"Scheduled for UTC conversion (already datetime, was '{current_tz}'): {col_name}")
                    elif current_tz is None:
                        # This case (naive datetime) should ideally not happen if combine_polars_csvs worked correctly,
                        # as it's supposed to convert to UTC.
                        # If it does happen, you might want to assume it's UTC and localize it:
                        # column_transformations.append(
                        #     pl.col(col_name).dt.replace_time_zone("UTC", ambiguous='raise').alias(col_name)
                        # )
                        # print(f"Warning: Naive datetime found for '{col_name}'. Attempting to localize to UTC.")
                        print(f"Info: Datetime column '{col_name}' is naive (no timezone). Assuming it should be UTC from prior steps.")
                else:
                    # This should not happen for a pl.Datetime dtype from recent Polars versions
                    print(f"Warning: Datetime column '{col_name}' does not have a 'time_zone' attribute on its dtype, though dtype is {current_column_dtype}.")

    # Convert to Categorical (using original names as suffixes were minimal)
    # These names are directly from your merged_df.columns list (or schemas)
    cols_to_category_polars = [
        'experiment_id', 'student_id', 'problem_id', 'problem_part', 'scaffold_id',
        'experiment_tag_path', 'action', 'problem_condition', 'assigned_condition',
        'class_id', 'district_id', 'location', 'opportunity_zone', 
        'locale_description', 'teacher_id'
    ]
    for col_name in cols_to_category_polars:
        if col_name in merged_df.columns and merged_df[col_name].dtype != pl.Categorical :
             column_transformations.append(pl.col(col_name).cast(pl.Categorical).alias(col_name))
             print(f"Scheduled for categorical conversion: {col_name}")
    
    # Cast columns loaded as Float32 (from performance_df) back to appropriate Integer types
    # These are original column names from performance_schema
    float_to_int_casts = {
        # Col name : Target Polars Int Dtype
        'assignment_session_count': pl.UInt16,
        'pretest_problem_count': pl.UInt16,
        'pretest_correct': pl.UInt16,
        'pretest_session_count': pl.UInt16,
        'condition_problem_count': pl.UInt16,
        'condition_total_correct': pl.UInt16,
        'condition_total_correct_after_wrong_response': pl.UInt16,
        'condition_total_correct_after_tutoring': pl.UInt16,
        'condition_total_answers_before_tutoring': pl.UInt16,
        'condition_total_attempt_count': pl.UInt32, # Note: some were UInt32
        'condition_total_hints_available': pl.UInt32,
        'condition_total_hints_given': pl.UInt32,
        'condition_total_scaffold_problems_available': pl.UInt32,
        'condition_total_scaffold_problems_given': pl.UInt32,
        'condition_total_explanations_available': pl.UInt32,
        'condition_total_explanations_given': pl.UInt32,
        'condition_total_answers_given': pl.UInt32,
        'condition_session_count': pl.UInt16,
        'posttest_problem_count': pl.UInt16,
        'posttest_correct': pl.UInt16,
        'posttest_session_count': pl.UInt16,
    }
    for col_name, target_int_type in float_to_int_casts.items():
        if col_name in merged_df.columns:
            if merged_df[col_name].dtype == pl.Float32: # Ensure it's currently Float32
                column_transformations.append(
                    pl.col(col_name)
                      .fill_null(0) # Or other appropriate fill strategy for counts
                      .cast(target_int_type, strict=False) # strict=False allows 3.0 -> 3
                      .alias(col_name) 
                )
                print(f"Scheduled '{col_name}' for Float32 to {target_int_type} conversion.")
            elif merged_df[col_name].dtype != target_int_type : # If it's some other type already
                print(f"Warning: Column '{col_name}' was expected to be Float32 for int conversion, but found {merged_df[col_name].dtype}. Skipping int cast.")
        else:
            print(f"Warning: Column '{col_name}' intended for int conversion not found in merged_df.")


    # General Float64 to Float32 pass (if any remain)
    for col_name in merged_df.columns: 
        if col_name in merged_df.columns and merged_df[col_name].dtype == pl.Float64: 
            column_transformations.append(pl.col(col_name).cast(pl.Float32).alias(col_name))
            print(f"Scheduled for Float64 to Float32 conversion: {col_name}")

    if column_transformations:
        print("\nApplying column type transformations...")
        merged_df = merged_df.with_columns(column_transformations)
        print("Type transformations applied.")
        # print(merged_df.schema) # Good check here

    # --- Specific Value Cleaning ---
    print("\n--- Specific Value Cleaning (Polars) ---")
    specific_value_cleaning_expressions = []
    
    # Convert 'opportunity_zone' (from metrics_df, original name) to boolean
    if 'opportunity_zone' in merged_df.columns:
        specific_value_cleaning_expressions.append(
            pl.when(pl.col('opportunity_zone') == "Yes").then(True)
              .when(pl.col('opportunity_zone') == "No").then(False)
              .otherwise(None) 
              .cast(pl.Boolean)
              .alias('opportunity_zone_bool')
        )
        print(f"Scheduled 'opportunity_zone' to boolean 'opportunity_zone_bool' conversion.")

    # Fill NA for specific category columns (from metrics_df, original names)
    cat_cols_to_fill_info = { 
        'district_id': 'Unknown_District', 
        'location': 'Unknown_Location',    
        'locale_description': 'Unknown_Locale' 
    }
    for col_name, fill_val in cat_cols_to_fill_info.items():
        if col_name in merged_df.columns:
            # Ensure column is categorical before filling, or cast it
            if merged_df[col_name].dtype != pl.Categorical:
                merged_df = merged_df.with_columns(pl.col(col_name).cast(pl.Categorical))
                print(f"Casted '{col_name}' to Categorical before fill_null.")
            specific_value_cleaning_expressions.append(pl.col(col_name).fill_null(fill_val).alias(col_name))
            print(f"Scheduled fill_null for categorical {col_name} with '{fill_val}'.")
    
    if specific_value_cleaning_expressions:
        print("\nApplying specific value cleaning expressions...")
        merged_df = merged_df.with_columns(specific_value_cleaning_expressions)
        print("Specific value cleaning applied.")

    # --- Dropping Columns ---
    # Drop fully empty columns (likely from problems_df if not all actions had problem details)
    # These are original column names from `problems_schema`.
    empty_cols_candidates_from_problems = [] 
    pandas_identified_empty_cols = [
         'problem_condition', 'start_time', 'end_time', 'session_count', 'time_on_task',
         'first_response_or_request_time', 'first_answer', 'correct', 'reported_score',
         'answer_before_tutoring', 'attempt_count', 'hints_available', 'hints_given',
         'scaffold_problems_available', 'scaffold_problems_given', 'explanation_available',
         'explanation_given', 'answer_given',
         'assistments_reference_problem_log_id'
    ]

    actual_empty_cols_to_drop = []
    if not merged_df.is_empty():
        for col_name in pandas_identified_empty_cols:
            if col_name in merged_df.columns and merged_df[col_name].is_null().all():
                actual_empty_cols_to_drop.append(col_name)
            elif col_name in merged_df.columns:
                print(f"Info: Column '{col_name}' (candidate for empty drop) was not fully null. Nulls: {merged_df[col_name].is_null().sum()}/{merged_df.height}")
    
    if actual_empty_cols_to_drop:
        print(f"\nDropping fully empty columns (identified from pandas analysis): {actual_empty_cols_to_drop}")
        merged_df = merged_df.drop(actual_empty_cols_to_drop)
    else:
        print("\nNo fully empty columns (from the predefined list derived from pandas analysis) identified for dropping.")

    # Drop original 'opportunity_zone' if the boolean version 'opportunity_zone_bool' was created
    if 'opportunity_zone' in merged_df.columns and 'opportunity_zone_bool' in merged_df.columns:
        print(f"Dropping original opportunity zone column: 'opportunity_zone'")
        merged_df = merged_df.drop('opportunity_zone')

    print(f"\nShape after Cleaning: {merged_df.shape}")
    print("Columns after cleaning:", merged_df.columns)
    gc.collect()

else:
    print("Skipping Cell 7 cleaning: merged_df not available or previous merge failed.")


--- Starting Data Cleaning (Polars) ---
Initial merged_df shape for cleaning: (3711215, 90)
Initial columns in merged_df for cleaning: ['experiment_id', 'student_id', 'problem_id', 'problem_part', 'scaffold_id', 'experiment_tag_path', 'action', 'timestamp', 'assistments_reference_action_log_id', 'problem_condition', 'start_time', 'end_time', 'session_count', 'time_on_task', 'first_response_or_request_time', 'first_answer', 'correct', 'reported_score', 'answer_before_tutoring', 'attempt_count', 'hints_available', 'hints_given', 'scaffold_problems_available', 'scaffold_problems_given', 'explanation_available', 'explanation_given', 'answer_given', 'assistments_reference_problem_log_id', 'release_date', 'due_date', 'start_time_perf', 'end_time_perf', 'assignment_session_count', 'pretest_problem_count', 'pretest_correct', 'pretest_time_on_task', 'pretest_average_first_response_time', 'pretest_session_count', 'assigned_condition', 'condition_time_on_task', 'condition_average_first_response_

In [8]:
# Cell 8: Create Reduced DataFrame and Save (Polars)

if 'merged_df' in locals() and merged_df is not None and merge_successful: # merge_successful check might be redundant if merged_df exists
    print("\n--- Reducing DataFrame to Essential Columns (Polars) ---")

    # USER ACTION: Define this list with the EXACT final column names you want
    # from the cleaned `merged_df` (output of Cell 7).
    # Below is a *template* based on your original pandas variable names and common sense.
    # YOU MUST VERIFY EACH COLUMN NAME AGAINST THE OUTPUT OF CELL 7.
    essential_cols_to_keep_polars = [
        # Keys / Base Info from actions_df (verify actual names after rename)
        'experiment_id', 
        'student_id', 
        'timestamp', 
        'action', 
        'action_log_id', # Was 'assistments_reference_action_log_id'
        
        # From performance_df (verify actual names after rename and type cast)
        'assignment_start_time', # Was 'start_time_perf'
        'assignment_end_time',   # Was 'end_time_perf'
        'assignment_log_id',     # Was 'assistments_reference_assignment_log_id'
        'assignment_session_count', # Now UInt16
        'condition_problem_count',  # Now UInt16
        'condition_time_on_task',   # Float32
        'condition_average_first_response_or_request_time', # Float32
        'condition_total_correct',  # Now UInt16
        'condition_total_attempt_count', # Now UInt32
        'condition_total_hints_given', # Now UInt32
        'condition_total_explanations_given', # Now UInt32
        
        # From metrics_df (verify actual names)
        'student_prior_average_correctness', 
        
        # Created in Cell 7
        'opportunity_zone_bool', 
        
        # Other columns you might need from your original list of 71:
        # e.g., 'pretest_correct', 'posttest_correct', 'assigned_condition', 'class_id', 'teacher_id'
        # 'release_date', 'due_date', 
        # 'experiment_tag_path'
        # Add any other columns from the 71 that are essential for your next steps.
    ]
    
    # Filter to only include columns that actually exist in the cleaned merged_df
    final_essential_columns = [col for col in essential_cols_to_keep_polars if col in merged_df.columns]
    
    print(f"Attempting to select these {len(final_essential_columns)} essential columns: {final_essential_columns}")
    missing_essentials_for_reduction = [col for col in essential_cols_to_keep_polars if col not in final_essential_columns]

    if missing_essentials_for_reduction:
        print(f"Warning: The following conceptual essential columns were NOT FOUND in merged_df for reduction: {missing_essentials_for_reduction}")
        print("Please ensure their names are correct in the 'essential_cols_to_keep_polars' list and they exist in the output of Cell 7.")
    
    if not final_essential_columns:
        print("Error: No essential columns available for selection based on your list. Cannot create reduced DataFrame.")
        merged_df_reduced = None
    else:
        try:
            merged_df_reduced = merged_df.select(final_essential_columns)
            print(f"\nReduced DataFrame Info (Polars): Shape {merged_df_reduced.shape}")
            # print(merged_df_reduced.head())
            # print(merged_df_reduced.schema) # Good to check the schema of the final saved df

            print(f"\nAttempting to save cleaned and reduced Polars DataFrame to: {SAVE_CLEANED_PATH_POLARS_PARQUET}")
            merged_df_reduced.write_parquet(SAVE_CLEANED_PATH_POLARS_PARQUET) # Parquet is preferred
            print(f"Successfully saved to {SAVE_CLEANED_PATH_POLARS_PARQUET}")
            
        except Exception as e:
            print(f"Error during final select or save: {e}")
            merged_df_reduced = None
            
    if 'merged_df' in locals(): 
        del merged_df # Free up memory from the full cleaned DataFrame
        gc.collect()
        print("\nFull cleaned merged_df (Polars) deleted from memory.")

else:
    print("Skipping Cell 8 (reduction and save): merged_df not available from Cell 7 or previous steps failed.")
    merged_df_reduced = None


--- Reducing DataFrame to Essential Columns (Polars) ---
Attempting to select these 18 essential columns: ['experiment_id', 'student_id', 'timestamp', 'action', 'action_log_id', 'assignment_start_time', 'assignment_end_time', 'assignment_log_id', 'assignment_session_count', 'condition_problem_count', 'condition_time_on_task', 'condition_average_first_response_or_request_time', 'condition_total_correct', 'condition_total_attempt_count', 'condition_total_hints_given', 'condition_total_explanations_given', 'student_prior_average_correctness', 'opportunity_zone_bool']

Reduced DataFrame Info (Polars): Shape (3711215, 18)

Attempting to save cleaned and reduced Polars DataFrame to: /Users/john/Downloads/osfstorage-archive/merged_experiment_data_cleaned_polars.parquet
Successfully saved to /Users/john/Downloads/osfstorage-archive/merged_experiment_data_cleaned_polars.parquet

Full cleaned merged_df (Polars) deleted from memory.


In [9]:
# Cell 9: Load Cleaned Data (Polars) - Optional

if 'merged_df_reduced' in locals() and merged_df_reduced is not None and not merged_df_reduced.is_empty() and SAVE_CLEANED_PATH_POLARS_PARQUET.exists():
    print(f"\n--- Loading Cleaned Parquet File (Polars) ---")
    try:
        df_reloaded_polars = pl.read_parquet(SAVE_CLEANED_PATH_POLARS_PARQUET)
        print(f"Successfully reloaded: {SAVE_CLEANED_PATH_POLARS_PARQUET}")
        print(f"Reloaded DataFrame Shape: {df_reloaded_polars.shape}")
        print("\nReloaded DataFrame Head (from Parquet):")
        print(df_reloaded_polars.head())
        print("\nReloaded DataFrame Schema (from Parquet):")
        print(df_reloaded_polars.schema)
    except Exception as e:
        print(f"An error occurred while reloading the cleaned Parquet file: {e}")
elif SAVE_CLEANED_PATH_POLARS_PARQUET.exists():
     print(f"Cleaned Parquet file found at {SAVE_CLEANED_PATH_POLARS_PARQUET}, but merged_df_reduced may not have been successfully created or was empty in the previous step (this script might have been re-run starting from here). Consider reloading manually if needed.")
else:
    print(f"\nCleaned Parquet file not found at {SAVE_CLEANED_PATH_POLARS_PARQUET} or reduction/save step failed.")


--- Loading Cleaned Parquet File (Polars) ---
Successfully reloaded: /Users/john/Downloads/osfstorage-archive/merged_experiment_data_cleaned_polars.parquet
Reloaded DataFrame Shape: (3711215, 18)

Reloaded DataFrame Head (from Parquet):
shape: (5, 18)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ experimen ┆ student_i ┆ timestamp ┆ action    ┆ … ┆ condition ┆ condition ┆ student_p ┆ opportun │
│ t_id      ┆ d         ┆ ---       ┆ ---       ┆   ┆ _total_hi ┆ _total_ex ┆ rior_aver ┆ ity_zone │
│ ---       ┆ ---       ┆ datetime[ ┆ cat       ┆   ┆ nts_given ┆ planation ┆ age_corre ┆ _bool    │
│ cat       ┆ cat       ┆ μs, UTC]  ┆           ┆   ┆ ---       ┆ s_g…      ┆ ctn…      ┆ ---      │
│           ┆           ┆           ┆           ┆   ┆ u32       ┆ ---       ┆ ---       ┆ bool     │
│           ┆           ┆           ┆           ┆   ┆           ┆ u32       ┆ f32       ┆          │
╞═══════════╪═══════════╪═══════════╪═══