In [1]:
from __future__ import annotations

from pathlib import Path

import numpy as np
import pandas as pd

from CAD.config import Config

# Project configuration object; the directory attributes used below are read from it.
CFG = Config()
# Merged CSV extract that the notebook loads with pd.read_csv further down.
RAW_MERGED = Path(CFG.INTERIM_DATA_DIR) / "Merged_Data.csv"
# Parquet path under the processed-data directory.
# NOTE(review): nothing in the cells shown here writes to this path — confirm it is still needed.
PARQUET_OUT = Path(CFG.PROCESSED_DATA_DIR) / "FinalTrainingData.parquet"

import warnings
# Suppresses every FutureWarning for the remainder of the session, which also
# hides deprecation notices raised by the pandas calls in the cells below.
warnings.simplefilter(action='ignore', category=FutureWarning)

In [2]:
def recode_values(df: pd.DataFrame) -> pd.DataFrame:
    """Return a recoded copy of the merged frame; the input is left untouched.

    Steps:
      * drop the ``CURR_GRADE_ORD`` column;
      * keep only the part of ``SCHOOL_YEAR`` before the first ``-`` and cast
        it to ``int`` (``"2019-2020"`` -> ``2019``);
      * replace the listed codes in the categorical columns with integers.
        Values that are not listed in a mapping are left as they are.

    Parameters
    ----------
    df : pd.DataFrame
        Frame containing ``CURR_GRADE_ORD``, ``SCHOOL_YEAR`` (string) and the
        five columns named in ``value_maps`` below.

    Returns
    -------
    pd.DataFrame
        New frame with the recoded columns.
    """
    value_maps = {
        "STUDENT_GRADE_LEVEL": {"PK": -1, "KF": 0, "OS": -2},
        "ECONOMIC_CODE": {"F": 1, "R": 1, "N": 0},
        "SPECIAL_ED_CODE": {"Y": 1, "N": 0},
        "ENG_PROF_CODE": {"NT": -1},
        "HISPANIC_IND": {"Yes": 1, "No": 0},
    }

    # drop() returns a new frame, so the assignments below never reach the caller's data.
    df = df.drop(columns="CURR_GRADE_ORD")
    df["SCHOOL_YEAR"] = df["SCHOOL_YEAR"].str.split("-").str[0].astype(int)

    # Assign the result back instead of `df[col].replace(..., inplace=True)`:
    # the in-place form mutates a temporary column selection, which is deprecated
    # and does not update `df` when pandas Copy-on-Write is active.
    for col, mapping in value_maps.items():
        df[col] = df[col].replace(mapping)

    return df


def cast_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with every non-text column cast to a numeric dtype.

    The three attendance-day columns become ``float``; the four text columns
    (``SCHOOL_NAME``, ``DISTRICT_NAME``, ``ETHNIC_CODE``, ``STUDENT_GENDER``)
    are left alone; every other column is cast to ``int``.

    The cast is done in a single ``astype`` call, so the caller's frame is
    never modified and a failed conversion cannot leave it half-converted.

    Raises
    ------
    ValueError
        If a column cannot be converted (for example a missing value in a
        column that must become ``int``).
    """
    float_cols = {
        "Total_Days_Unexcused_Absent",
        "Total_Days_Enrolled",
        "Total_Days_Present",
    }
    text_cols = {"SCHOOL_NAME", "DISTRICT_NAME", "ETHNIC_CODE", "STUDENT_GENDER"}

    target_dtypes = {
        col: (float if col in float_cols else int)
        for col in df.columns
        if col not in text_cols
    }
    return df.astype(target_dtypes)


def collapse_snapshot(df: pd.DataFrame) -> pd.DataFrame:
    """Reduce the frame to one row per (STUDENT_ID, SCHOOL_YEAR).

    Within each student-year the three attendance-day columns are summed and
    every other column takes its ``"last"`` aggregate. The result is ordered
    by student and then by year.
    """
    day_cols = ["Total_Days_Present", "Total_Days_Enrolled", "Total_Days_Unexcused_Absent"]

    # Attendance columns come first so the output keeps the same column order.
    how = dict.fromkeys(day_cols, "sum")
    how.update({col: "last" for col in df.columns if col not in day_cols})

    keys = ["STUDENT_ID", "SCHOOL_YEAR"]
    per_student_year = df.groupby(keys, as_index=False).agg(how)
    return per_student_year.sort_values(keys)


def drop_outliers(
    df: pd.DataFrame,
    *,
    max_days_enrolled: float = 300,
    manual_drop_index=(
        1688836, 731523, 1904340, 1016188, 1227776,
        1400304, 1401473, 1413193, 686159, 687907,
    ),
) -> pd.DataFrame:
    """Return a copy of ``df`` without the rows considered inconsistent.

    A row is removed when any of the following holds:

    * the student has a grade-12 record in any year (all of that student's
      rows are removed);
    * unexcused-absent days plus present days exceed enrolled days;
    * enrolled days exceed ``max_days_enrolled``;
    * the grade level is lower than the same student's previous school year;
    * the row's index label is listed in ``manual_drop_index``.

    Parameters
    ----------
    df : pd.DataFrame
        One row per student-year, with unique index labels.
    max_days_enrolled : float, default 300
        Upper bound for ``Total_Days_Enrolled``.
    manual_drop_index : iterable of index labels
        Labels flagged by hand as inconsistent data. They refer to the index
        of ``df`` as passed in, so they only stay valid while the upstream
        steps produce the same row labelling.

    Returns
    -------
    pd.DataFrame
        Surviving rows in their original order, original index labels kept.
    """
    grade12_ids = df.loc[df["STUDENT_GRADE_LEVEL"] == 12, "STUDENT_ID"].unique()
    ever_grade12 = df["STUDENT_ID"].isin(grade12_ids)

    bad_sum = (
        df["Total_Days_Unexcused_Absent"] + df["Total_Days_Present"]
    ) > df["Total_Days_Enrolled"]

    # The year-over-year difference is computed on a sorted view; reindex it to
    # df's own row order so it can be combined with the positional masks below
    # even when df is not already sorted by student and year.
    demoted = (
        df.sort_values(["STUDENT_ID", "SCHOOL_YEAR"])
        .groupby("STUDENT_ID")["STUDENT_GRADE_LEVEL"]
        .diff()
        .lt(0)
        .reindex(df.index)
    )

    too_many_days = df["Total_Days_Enrolled"] > max_days_enrolled
    flagged_by_hand = df.index.isin(list(manual_drop_index))

    drop = ever_grade12 | bad_sum | too_many_days | demoted | flagged_by_hand
    return df.loc[~drop].copy()


def harmonise_gender(df: pd.DataFrame) -> pd.DataFrame:
    """Give every student a single STUDENT_GENDER value.

    Students whose rows carry more than one distinct gender value get all of
    their rows set to that student's ``"last"`` value. The result is ordered
    by the original index labels and then re-indexed from 0.
    """
    out = df.copy()

    distinct_values = out.groupby("STUDENT_ID")["STUDENT_GENDER"].transform("nunique")
    conflicted = distinct_values > 1

    # The flag is constant within a student, so grouping only the flagged rows
    # still sees each affected student's complete history.
    resolved = (
        out.loc[conflicted]
        .groupby("STUDENT_ID")["STUDENT_GENDER"]
        .transform("last")
    )
    out.loc[conflicted, "STUDENT_GENDER"] = resolved

    out = out.sort_index()
    return out.reset_index(drop=True)


def add_attendance_rate(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with an ``Attendance_Rate`` column.

    The rate is ``Total_Days_Present / Total_Days_Enrolled``. The input frame
    is not modified. Rows with zero enrolled days are not special-cased, so
    they yield ``inf`` or ``NaN`` from the division.
    """
    rate = df["Total_Days_Present"] / df["Total_Days_Enrolled"]
    return df.assign(Attendance_Rate=rate)

In [3]:
# Load the merged extract and run the cleaning steps in order.
base = (
    pd.read_csv(RAW_MERGED)
    .pipe(recode_values)
    .pipe(cast_numeric)
    .pipe(collapse_snapshot)
    .pipe(drop_outliers)
    .pipe(harmonise_gender)
    .pipe(add_attendance_rate)
)

# Number of distinct school years on record for each student.
years_per_stu = base.groupby("STUDENT_ID")["SCHOOL_YEAR"].nunique()


def _ids_with_years(n_years: int) -> pd.Index:
    """Student ids that have exactly ``n_years`` distinct school years."""
    return years_per_stu[years_per_stu == n_years].index


def _rows_for(student_ids: pd.Index) -> pd.DataFrame:
    """Independent copy of the ``base`` rows belonging to ``student_ids``."""
    return base[base["STUDENT_ID"].isin(student_ids)].copy()


ids_6yr = _ids_with_years(6)
ids_5yr = _ids_with_years(5)
ids_4yr = _ids_with_years(4)
ids_3yr = _ids_with_years(3)
ids_2yr = _ids_with_years(2)
ids_1yr = _ids_with_years(1)

data_6yr = _rows_for(ids_6yr)
data_5yr = _rows_for(ids_5yr)
data_4yr = _rows_for(ids_4yr)
data_3yr = _rows_for(ids_3yr)
data_2yr = _rows_for(ids_2yr)
data_1yr = _rows_for(ids_1yr)

  base = pd.read_csv(RAW_MERGED)


In [4]:
config = Config()
MIN_YEAR, MAX_YEAR = config.getMinMaxYear()
# Every school year from MIN_YEAR through MAX_YEAR inclusive.
YEARS_FULL = range(MIN_YEAR, MAX_YEAR + 1)

# Attendance-day counts that vary from year to year.
ATT_COLS = [
    "Total_Days_Present",
    "Total_Days_Enrolled",
    "Total_Days_Unexcused_Absent",
]

# Everything that is neither an attendance measure, the grade level, nor a key
# is treated as a static per-student attribute.
_NON_STATIC = {
    *ATT_COLS,
    "Attendance_Rate",
    "STUDENT_GRADE_LEVEL",
    "SCHOOL_YEAR",
    "STUDENT_ID",
}
STATIC_COLS = [col for col in base.columns if col not in _NON_STATIC]


def _pk_ids(cohort: pd.DataFrame, year: int) -> set:
    """Ids of students in ``cohort`` whose grade level is -1 ("PK") in ``year``."""
    in_pk = (cohort["SCHOOL_YEAR"] == year) & (cohort["STUDENT_GRADE_LEVEL"] == -1)
    return set(cohort.loc[in_pk, "STUDENT_ID"].unique())


pk5_ids = _pk_ids(data_5yr, 2020)
pk4_ids = _pk_ids(data_4yr, 2021)
PK_IDS = pk5_ids | pk4_ids


def linear_impute_core(g: pd.DataFrame, allow_bfill=True) -> pd.DataFrame:
    """Fill the missing rows of one student's frame and flag what was imputed.

    ``g`` is modified in place and also returned. Rows are treated as equally
    spaced (one step per row), which matches how ``interpolate("linear")``
    handles the attendance columns.

    Parameters
    ----------
    g : pd.DataFrame
        One student's rows, one row per school year in ascending order, with
        missing years present as all-NaN rows.
    allow_bfill : bool, default True
        When True, gaps before the first known row are filled as well
        (static columns back-filled, attendance extended backwards, grade
        counted down). When False only gaps after a known row are filled and
        leading gaps stay NaN.

    Returns
    -------
    pd.DataFrame
        ``g`` with
        * ``STATIC_COLS`` forward- (and optionally back-) filled;
        * ``ATT_COLS`` linearly interpolated, rounded to whole days, float32;
        * ``STUDENT_GRADE_LEVEL`` (float32) filled by counting one grade per
          row away from the nearest known grade;
        * ``Attendance_Rate`` recomputed from the filled day counts;
        * ``Is_Imputed`` (int8) set to 1 on rows whose attendance was missing
          on entry and is complete on exit.
    """
    missing_before = g[ATT_COLS].isna().any(axis=1)

    static_filled = g[STATIC_COLS].ffill()
    if allow_bfill:
        static_filled = static_filled.bfill()
    g[STATIC_COLS] = static_filled

    g[ATT_COLS] = (
        g[ATT_COLS]
        .interpolate("linear", limit_direction="both" if allow_bfill else "forward")
        .round(0)
        .astype("float32")
    )

    # Grade level: advance one grade per row after the last known grade, and
    # (when back-filling) step one grade down per row before the first known
    # grade. A flat "+1" would give two consecutive missing years the same
    # grade and would place a back-filled leading year *above* the following
    # year, i.e. fabricate a demotion. No clamping to a valid grade range is
    # applied here.
    grade = g["STUDENT_GRADE_LEVEL"]
    row_pos = pd.Series(np.arange(len(g), dtype="float64"), index=g.index)
    known_pos = row_pos.where(grade.notna())

    estimate = grade.ffill() + (row_pos - known_pos.ffill())
    if allow_bfill:
        backward = grade.bfill() - (known_pos.bfill() - row_pos)
        estimate = estimate.fillna(backward)

    g["STUDENT_GRADE_LEVEL"] = grade.fillna(estimate).astype("float32")

    g["Attendance_Rate"] = g["Total_Days_Present"] / g["Total_Days_Enrolled"]

    now_filled = g[ATT_COLS].notna().all(axis=1)
    g["Is_Imputed"] = (missing_before & now_filled).astype("int8")

    return g

def impute_group(group: pd.DataFrame) -> pd.DataFrame:
    """Expand one student's rows to every year in YEARS_FULL and impute the gaps.

    Students listed in PK_IDS are imputed forward only, and every year before
    their first recorded year is left blank (NaN) with ``Is_Imputed`` 0. All
    other students are imputed in both directions.

    Returns the frame with ``SCHOOL_YEAR`` restored as a regular column.
    """
    sid = group["STUDENT_ID"].iat[0]
    is_pk = sid in PK_IDS

    by_year = group.set_index("SCHOOL_YEAR").reindex(YEARS_FULL).sort_index()
    by_year.index.name = "SCHOOL_YEAR"
    # Rows created by the reindex have no id yet.
    by_year["STUDENT_ID"] = sid

    by_year = linear_impute_core(by_year, allow_bfill=not is_pk)

    if is_pk:
        first_recorded = group["SCHOOL_YEAR"].min()
        before_first = by_year.index < first_recorded
        if before_first.any():
            # Keep only the id and the flag on the years preceding the first record.
            blank_cols = [
                name
                for name in by_year.columns
                if name not in ["STUDENT_ID", "Is_Imputed"]
            ]
            by_year.loc[before_first, blank_cols] = np.nan
            by_year.loc[before_first, "Is_Imputed"] = 0

    return by_year.reset_index()


def _impute_cohort(cohort: pd.DataFrame) -> pd.DataFrame:
    """Run ``impute_group`` on each student of ``cohort`` and stack the results.

    The groups are iterated explicitly instead of using ``groupby.apply``:
    ``impute_group`` reads the ``STUDENT_ID`` column of each group, and passing
    the grouping column to the applied function is deprecated in
    ``DataFrameGroupBy.apply``. Iterating a groupby always yields the full rows.
    """
    frames = [impute_group(rows) for _, rows in cohort.groupby("STUDENT_ID")]
    if not frames:
        # Nothing to impute: return an empty frame with the cohort's columns.
        return cohort.iloc[0:0].copy()
    return pd.concat(frames)


imputed_5yr = _impute_cohort(data_5yr)
imputed_4yr = _impute_cohort(data_4yr)

train_long = (
    pd.concat([data_6yr, imputed_5yr, imputed_4yr])
    .sort_values(["STUDENT_ID", "SCHOOL_YEAR"])
    .reset_index(drop=True)
)
# data_6yr rows never went through imputation and therefore have no flag yet.
# Assign the result back: the chained `train_long["Is_Imputed"].fillna(0, inplace=True)`
# form does not update the frame under pandas Copy-on-Write, which would make
# the int8 cast fail on the remaining NaNs.
train_long["Is_Imputed"] = train_long["Is_Imputed"].fillna(0).astype("int8")

train_long.to_csv(config.INTERIM_DATA_DIR / 'TrainLong.csv', index=False)

In [8]:
def pivot(metric: str) -> pd.DataFrame:
    """Spread one column of ``train_long`` into one column per school year.

    Returns a frame with ``STUDENT_ID`` as a regular column plus one column
    per year, each named ``"<metric>_<year>"``.
    """
    by_year = train_long.pivot(index="STUDENT_ID", columns="SCHOOL_YEAR", values=metric)
    by_year = by_year.add_prefix(f"{metric}_")
    return by_year.reset_index()

# One wide table per yearly metric, keyed by metric name.
pivot_metrics = ["Attendance_Rate", *ATT_COLS, "STUDENT_GRADE_LEVEL", "Is_Imputed"]
pivots = {metric: pivot(metric) for metric in pivot_metrics}

# For the remaining columns keep each student's "last" value.
non_static = set(ATT_COLS) | {"Attendance_Rate", "Is_Imputed", "STUDENT_ID", "SCHOOL_YEAR"}
last_value_spec = {
    col: "last" for col in train_long.columns if col not in non_static
}
static_last = train_long.groupby("STUDENT_ID").agg(last_value_spec).reset_index()

# Start from the attendance-rate table and left-join everything else on the student id.
wide = pivots["Attendance_Rate"]
for metric, metric_wide in pivots.items():
    if metric != "Attendance_Rate":
        wide = wide.merge(metric_wide, on="STUDENT_ID", how="left")
wide = wide.merge(static_last, on="STUDENT_ID", how="left")

In [9]:
# Print the column list, non-null counts and dtypes of the wide table as a sanity check.
wide.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 222863 entries, 0 to 222862
Data columns (total 48 columns):
 #   Column                            Non-Null Count   Dtype  
---  ------                            --------------   -----  
 0   STUDENT_ID                        222863 non-null  int64  
 1   Attendance_Rate_2019              210522 non-null  float64
 2   Attendance_Rate_2020              217233 non-null  float64
 3   Attendance_Rate_2021              222863 non-null  float64
 4   Attendance_Rate_2022              222863 non-null  float64
 5   Attendance_Rate_2023              222863 non-null  float64
 6   Attendance_Rate_2024              222863 non-null  float64
 7   Total_Days_Present_2019           210522 non-null  float64
 8   Total_Days_Present_2020           217233 non-null  float64
 9   Total_Days_Present_2021           222863 non-null  float64
 10  Total_Days_Present_2022           222863 non-null  float64
 11  Total_Days_Present_2023           222863 non-null  f

In [10]:
# Wide table without the single STUDENT_GRADE_LEVEL column (the per-year
# STUDENT_GRADE_LEVEL_<year> columns remain).
training_long = wide.drop(columns='STUDENT_GRADE_LEVEL')
training_long.to_csv(config.INTERIM_DATA_DIR / 'TrainWide.csv', index=False)

# Students with one to three recorded years are exported separately, ordered
# by student and year.
rolling_avg = pd.concat([data_1yr, data_2yr, data_3yr])
rolling_avg = rolling_avg.sort_values(by=['STUDENT_ID', 'SCHOOL_YEAR'])
rolling_avg = rolling_avg.reset_index(drop=True)
rolling_avg.to_csv(config.INTERIM_DATA_DIR / 'RollingAvgStudents.csv', index=False)