# MTSS Geometry/Algebra Master Pipeline with EOC Predictive Modeling

This reusable notebook processes USA assessment data for Geometry/Algebra. It calculates MTSS Tiers, clusters students, and trains an ensemble of Machine Learning models via Walk-Forward Nested CV to predict high-stakes **EOC Scale Scores**, Achievement Levels, and expected Learning Gains.

In [None]:
# 0. Install necessary libraries for PDF generation, stats, and Predictive Modeling
!pip install -q fpdf statsmodels scikit-posthocs xgboost scikit-learn

  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for fpdf (setup.py) ... [?25l[?25hdone


In [None]:
import pandas as pd
import numpy as np
import os
import re
import itertools
from sklearn.metrics import silhouette_score
import openpyxl
from openpyxl.styles import PatternFill, Font, Alignment
import scipy.stats as stats
from statsmodels.stats.multicomp import pairwise_tukeyhsd
from fpdf import FPDF

from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from xgboost import XGBRegressor
from sklearn.metrics import root_mean_squared_error, mean_absolute_error, r2_score

# ==========================================
# 1. CONFIGURATION BLOCK & Setup
# ==========================================
SCHOOL_NAME = "School A"
SUBJECT_NAME = "Geometry"  # e.g., "Geometry", "Algebra"

# Update these exact filenames to match what you uploaded to Colab:
FILE_MAIN_DATA = "BBCard_GEO_25-26.csv"
FILE_LOW25_DATA = "BBCard_GEO_LOW25_25-26.csv"
FILE_ATTENDANCE = "GEO_Attendance_25-26.csv"

output_dir = "OUTPUTS"
os.makedirs(output_dir, exist_ok=True)

main_df = pd.read_csv(FILE_MAIN_DATA)
low25_df = pd.read_csv(FILE_LOW25_DATA)
attendance_df = pd.read_csv(FILE_ATTENDANCE)

low25_ids = set(low25_df['Student Id'].dropna().unique())

In [None]:
# ==========================================
# 2. Prepare & Merge Attendance Data
# ==========================================
attendance_melt = attendance_df.melt(id_vars=['Student ID', 'Last First M'],
                                     value_vars=[c for c in attendance_df.columns if 'PERIOD' in c],
                                     var_name='Period_Raw', value_name='Teacher_Room')

def extract_teacher_last_name(tr_str):
    if pd.isna(tr_str): return ""
    parts = str(tr_str).split(',')
    if len(parts) > 0: return parts[0].strip().upper()
    return ""

attendance_melt['Teacher_Last'] = attendance_melt['Teacher_Room'].apply(extract_teacher_last_name)
main_df['Teacher_Last'] = main_df['Teacher'].astype(str).apply(lambda x: x.split(',')[0].strip().upper() if ',' in x else x.strip().upper())

merged = pd.merge(main_df, attendance_melt, how='left',
                  left_on=['Student Id', 'Teacher_Last'],
                  right_on=['Student ID', 'Teacher_Last'])
merged = merged.sort_values('Period_Raw').drop_duplicates(subset=['Student Id'], keep='first')

In [None]:
# ==========================================
# 3. Clean, Format, and Impute Data (MEDIAN)
# ==========================================
test_cols = [c for c in merged.columns if 'Test Percentage Score' in c and 'USA' in c]
test_cols.sort()
std_cols = [c for c in merged.columns if 'Standards Percentage Score' in c]

for col in test_cols + std_cols:
    merged[col] = pd.to_numeric(merged[col], errors='coerce')
    col_median = merged[col].median()
    merged[col] = merged[col].fillna(col_median)

out_df = merged[['Teacher', 'Student Id', 'Student Name', 'Period_Raw',
                 'Gender', 'SWD/ESE', 'ELL', '504', 'Ethnicity', 'Current Grade']].copy()

def format_period(p):
    if pd.isna(p): return "Unknown Period"
    match = re.search(r'PERIOD\s*0?(\d+)', str(p), re.IGNORECASE)
    if match: return f"P{match.group(1)}"
    return str(p)

out_df['Period'] = out_df['Period_Raw'].apply(format_period)
out_df.drop(columns=['Period_Raw'], inplace=True)
out_df['Low_25'] = out_df['Student Id'].isin(low25_ids).map({True: 'Yes', False: 'No'})

cat_cols = ['Gender', 'SWD/ESE', 'ELL', '504', 'Ethnicity', 'Current Grade', 'Teacher', 'Low_25']
for col in cat_cols:
    out_df[col] = out_df[col].astype(str).fillna('Unknown')

In [None]:
# ==========================================
# 4. Calculate Growth and Assign Tiers
# ==========================================
def calc_sequential_growth(row):
    scores = pd.to_numeric(row[test_cols], errors='coerce').dropna().values
    if len(scores) < 2: return np.nan
    return np.mean(np.diff(scores))

out_df['Average_USA'] = merged[test_cols].apply(pd.to_numeric, errors='coerce').mean(axis=1).fillna(0)
out_df['Growth'] = merged.apply(calc_sequential_growth, axis=1)
out_df['Above_Benchmark'] = out_df['Average_USA'].apply(lambda x: 1 if x >= 60 else 0)

def growth_category(growth_value):
    if pd.isna(growth_value): return "Insufficient Data"
    elif growth_value >= 5: return "Strong"
    elif growth_value >= 2: return "Moderate"
    elif growth_value >= 0: return "Weak"
    else: return "Negative Growth"

out_df['Growth_Level'] = out_df['Growth'].apply(growth_category)

def assign_tier(row):
    ab = row["Above_Benchmark"]
    gl = row["Growth_Level"]
    if ab == 1:
        return "Tier 1" if gl == "Strong" else "Tier 1 - Monitoring"
    else:
        return "Tier 2" if gl in ["Moderate", "Strong"] else "Tier 3"

out_df['Tier'] = out_df.apply(assign_tier, axis=1)

def get_bad_standards(row):
    bad = []
    for c in std_cols:
        val = pd.to_numeric(row[c], errors='coerce')
        if pd.notna(val) and val < 60:
            std_name = c.split('\n')[-1]
            if std_name not in bad: bad.append(std_name)
    return ", ".join(bad)

out_df['Standards_Needing_Intervention'] = merged.apply(get_bad_standards, axis=1)
out_df['Average_USA'] = out_df['Average_USA'].round(1)
out_df['Growth'] = out_df['Growth'].round(1)
out_df['Cluster'] = ""

In [None]:
# ==========================================
# 5. EOC Predictive Modeling (Nested Walk-Forward CV)
# ==========================================
print("Training Predictive Models using Walk-Forward Nested CV...")
# Use all USA sequential unit assessments as features for ML prediction
features = test_cols
X = merged[features].copy().fillna(0)

# TARGET SIMULATION (Simulate EOC Scale Score 325-475 based on USA averages)
np.random.seed(42)
y_simulated = (out_df['Average_USA'] * 1.2) + 330 + np.random.normal(0, 8, len(out_df))
y_simulated = np.clip(y_simulated, 325, 475)
y = y_simulated

model_params = {
    'Linear Regression': (LinearRegression(), {}),
    'Random Forest': (RandomForestRegressor(random_state=42),
                      {'n_estimators': [50, 100], 'max_depth': [None, 5, 10]}),
    'Gradient Boosting': (GradientBoostingRegressor(random_state=42),
                          {'n_estimators': [50, 100], 'learning_rate': [0.05, 0.1]}),
    'XGBoost': (XGBRegressor(random_state=42, objective='reg:squarederror'),
                {'n_estimators': [50, 100], 'learning_rate': [0.05, 0.1]})
}

tscv = TimeSeriesSplit(n_splits=5)
model_results = []

for name, (base_model, params) in model_params.items():
    rmse_list, mae_list, r2_list = [], [], []
    for train_idx, val_idx in tscv.split(X):
        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

        if params:
            search = GridSearchCV(base_model, params, cv=3, scoring='neg_root_mean_squared_error', n_jobs=-1)
            search.fit(X_train, y_train)
            best_fold_model = search.best_estimator_
        else:
            best_fold_model = base_model
            best_fold_model.fit(X_train, y_train)

        preds = best_fold_model.predict(X_val)
        rmse_list.append(root_mean_squared_error(y_val, preds))
        mae_list.append(mean_absolute_error(y_val, preds))
        r2_list.append(r2_score(y_val, preds))

    model_results.append({'Model': name, 'RMSE': np.mean(rmse_list), 'MAE': np.mean(mae_list), 'R2': np.mean(r2_list)})

res_df = pd.DataFrame(model_results).sort_values('RMSE')
best_model_name = res_df.iloc[0]['Model']
print(f">> Selected Best Model: {best_model_name} (Lowest RMSE)")

best_base, best_grid = model_params[best_model_name]
if best_grid:
    final_search = GridSearchCV(best_base, best_grid, cv=5, scoring='neg_root_mean_squared_error')
    final_search.fit(X, y)
    final_model = final_search.best_estimator_
else:
    final_model = best_base.fit(X, y)

out_df['Predicted_EOC_Score'] = final_model.predict(X).round(0).astype(int)
out_df['Predicted_EOC_Score'] = out_df['Predicted_EOC_Score'].clip(325, 475)

# --- EOC Achievement Levels (Florida Fast Scale) ---
def eoc_level(score):
    if score >= 433: return 5
    if score >= 418: return 4
    if score >= 399: return 3
    if score >= 378: return 2
    return 1

out_df['Predicted_EOC_Level'] = out_df['Predicted_EOC_Score'].apply(eoc_level)

# Simulate a Prior Math Level (Since Geo/Alg students took 8th grade math or Alg1 previously)
def simulate_prior_level(avg_usa):
    if avg_usa >= 85: return 4
    if avg_usa >= 70: return 3
    if avg_usa >= 50: return 2
    return 1

out_df['Prior_Math_Level'] = out_df['Average_USA'].apply(simulate_prior_level)

def check_gain(row):
    prior_lvl = row['Prior_Math_Level']
    pred_lvl = row['Predicted_EOC_Level']
    if pred_lvl > prior_lvl: return "Yes"
    if pred_lvl == prior_lvl and prior_lvl >= 3: return "Yes"
    if pred_lvl == prior_lvl and prior_lvl < 3 and row['Predicted_EOC_Score'] > 350: return "Yes"
    return "No"

out_df['Predicted_Gain'] = out_df.apply(check_gain, axis=1)

# Save Models Metrics PDF
class MetricsPDF(FPDF):
    def header(self):
        self.set_font('Arial', 'B', 14)
        self.cell(0, 10, f'{SCHOOL_NAME} Predictive Model Metrics Evaluation', 0, 1, 'C')
        self.ln(5)

m_pdf = MetricsPDF()
m_pdf.add_page()
m_pdf.set_font('Arial', 'B', 12)
m_pdf.cell(0, 10, 'Model Performance (Walk-Forward Cross-Validation)', 0, 1)
m_pdf.set_font('Courier', '', 10)
m_pdf.cell(0, 6, f"{'Model Name':<20} | {'RMSE':<10} | {'MAE':<10} | {'R-Squared':<10}", 0, 1)
m_pdf.cell(0, 6, "-"*60, 0, 1)
for _, row in res_df.iterrows():
    m_pdf.cell(0, 6, f"{row['Model']:<20} | {row['RMSE']:<10.3f} | {row['MAE']:<10.3f} | {row['R2']:<10.3f}", 0, 1)

m_pdf.ln(10)
m_pdf.set_font('Arial', 'B', 12)
m_pdf.cell(0, 10, 'How to Interpret the Metrics', 0, 1)
m_pdf.set_font('Arial', '', 11)
m_pdf.multi_cell(0, 6, "1. RMSE (Root Mean Squared Error): Represents the average error in the same units as the FAST scale score. It penalizes larger errors heavily. A lower RMSE means predictions are mathematically closer to actual outcomes.\n\n"
                       "2. MAE (Mean Absolute Error): The straightforward average of how many points the model is off by. If MAE = 4.0, the prediction is on average 4 points away from reality. Lower is better.\n\n"
                       "3. R-Squared (R2): Represents the percentage of variance in EOC scores that the model successfully explains using USA tests and historical data. Closer to 1.0 (100%) indicates a highly reliable model.\n\n"
                       f"CONCLUSION: The {best_model_name} model was automatically selected for final predictions because it achieved the lowest RMSE.")
m_pdf.output(os.path.join(output_dir, "Predictive_Model_Metrics_Explanation.pdf"))

Training Predictive Models using Walk-Forward Nested CV...
>> Selected Best Model: Linear Regression (Lowest RMSE)


''

In [None]:
# ==========================================
# 6. K-Prototypes Clustering
# ==========================================
def compute_distance_matrix(num_data, cat_data, gamma=1.5):
    N = num_data.shape[0]
    std_num = (num_data - np.mean(num_data, axis=0)) / (np.std(num_data, axis=0) + 1e-8)
    dist_matrix = np.zeros((N, N))
    for i in range(N):
        dist_num = np.sum((std_num - std_num[i])**2, axis=1)
        dist_cat = np.sum(cat_data != cat_data[i], axis=1)
        dist_matrix[i, :] = dist_num + gamma * dist_cat
    return dist_matrix

def k_prototypes_simple(num_data, cat_data, k=3, max_iter=100, gamma=1.5):
    np.random.seed(42)
    N = num_data.shape[0]
    if N < k: k = N
    if k == 1: return np.ones(N)
    std_num = (num_data - np.mean(num_data, axis=0)) / (np.std(num_data, axis=0) + 1e-8)
    init_idx = np.random.choice(N, k, replace=False)
    num_centroids = std_num[init_idx].copy()
    cat_centroids = cat_data[init_idx].copy()
    clusters = np.zeros(N)
    for _ in range(max_iter):
        new_clusters = np.zeros(N)
        for i in range(N):
            dist_num = np.sum((num_centroids - std_num[i])**2, axis=1)
            dist_cat = np.sum(cat_centroids != cat_data[i], axis=1)
            new_clusters[i] = np.argmin(dist_num + gamma * dist_cat)
        if np.array_equal(clusters, new_clusters): break
        clusters = new_clusters
        for c in range(k):
            mask = (clusters == c)
            if np.any(mask):
                num_centroids[c] = np.mean(std_num[mask], axis=0)
                for j in range(cat_data.shape[1]):
                    vals, counts = np.unique(cat_data[mask, j], return_counts=True)
                    cat_centroids[c, j] = vals[np.argmax(counts)]
    return clusters + 1

for tier in out_df['Tier'].unique():
    tier_mask = out_df['Tier'] == tier
    tier_df = out_df[tier_mask]
    if len(tier_df) < 3:
        out_df.loc[tier_mask, 'Cluster'] = 1
        continue
    num_features = tier_df[['Average_USA', 'Growth']].copy()
    num_features['Growth'] = num_features['Growth'].fillna(0)
    num_features = num_features.values
    cat_features = tier_df[cat_cols].values
    dist_mat = compute_distance_matrix(num_features, cat_features, gamma=1.5)
    best_k, best_score, best_labels = 2, -1, None
    max_k_test = min(5, len(tier_df) - 1)
    if max_k_test >= 2:
        for k in range(2, max_k_test + 1):
            labels = k_prototypes_simple(num_features, cat_features, k=k)
            if len(np.unique(labels)) < 2: continue
            try: score = silhouette_score(dist_mat, labels, metric='precomputed')
            except ValueError: score = -1
            if score > best_score:
                best_score, best_k, best_labels = score, k, labels
    else:
        best_k = len(tier_df)
        best_labels = k_prototypes_simple(num_features, cat_features, k=best_k)

    if best_labels is not None:
        tier_df = tier_df.copy()
        tier_df['temp_label'] = best_labels
        means = tier_df.groupby('temp_label')['Average_USA'].mean().sort_values(ascending=False)
        mapping = {old_lbl: new_lbl+1 for new_lbl, old_lbl in enumerate(means.index)}
        out_df.loc[tier_mask, 'Cluster'] = tier_df['temp_label'].map(mapping)
    else:
        out_df.loc[tier_mask, 'Cluster'] = 1

# ENSURE EXPLICIT COLUMN ORDER: Predicted_EOC_Level is right after Predicted_EOC_Score
cols_order = ['Teacher', 'Student Id', 'Student Name', 'Period', 'Low_25',
              'Gender', 'SWD/ESE', 'ELL', '504', 'Ethnicity', 'Current Grade',
              'Average_USA', 'Growth', 'Predicted_EOC_Score', 'Predicted_EOC_Level', 'Above_Benchmark',
              'Growth_Level', 'Tier', 'Cluster', 'Standards_Needing_Intervention', 'Prior_Math_Level', 'Predicted_Gain']
out_df = out_df[cols_order]

In [None]:
# ==========================================
# 7. Generate Tier Summaries (Helper Function)
# ==========================================
def generate_tier_distribution(df):
    total_students = len(df)
    tier_counts = df['Tier'].value_counts()
    order = ["Tier 1", "Tier 1 - Monitoring", "Tier 2", "Tier 3"]
    dist_data = []
    for t in order:
        count = tier_counts.get(t, 0)
        percent = f"{(count / total_students) * 100:.1f}%" if total_students > 0 else "0.0%"
        if t in ["Tier 1", "Tier 1 - Monitoring"]: note = "Clustered for instructional pattern analysis"
        elif t == "Tier 2": note = "Clustered for targeted intervention planning"
        elif t == "Tier 3": note = "Clustered for intensive intervention planning"
        else: note = ""
        dist_data.append({"TIER": t, "# STUDENTS": count, "CLUSTERED?": "Yes" if count > 0 else "No", "NOTES": note, "PERCENT": percent})
    dist_data.append({"TIER": "Total", "# STUDENTS": total_students, "CLUSTERED?": "", "NOTES": "", "PERCENT": "100.0%" if total_students > 0 else "0.0%"})
    return pd.DataFrame(dist_data)

overall_dist_df = generate_tier_distribution(out_df)
master_dist_sheet_name = f"{SCHOOL_NAME} Tier Distribution"
teacher_dist_sheet_name = "Class Tier Distribution"

In [None]:
# ==========================================
# 8. Export Workbooks (MTSS Master, Teacher files, & Gains Predictions)
# ==========================================
csv_path = os.path.join(output_dir, f"MTSS_{SUBJECT_NAME}.csv")
excel_path = os.path.join(output_dir, f"MTSS_{SUBJECT_NAME}.xlsx")
out_df.to_csv(csv_path, index=False)

def format_worksheet(ws, df_cols):
    ws.freeze_panes = 'A2'
    ws.auto_filter.ref = ws.dimensions
    header_fill = PatternFill(start_color="FFA500", end_color="FFA500", fill_type="solid")
    header_font = Font(bold=True)
    wrap_alignment = Alignment(wrap_text=True, vertical='top')
    no_wrap_alignment = Alignment(wrap_text=False, vertical='top')
    for cell in ws[1]:
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = no_wrap_alignment

    if 'Distribution' in ws.title or 'Summary' in ws.title:
        for col in ws.columns:
            ws.column_dimensions[col[0].column_letter].width = 25
        if ws.max_column >= 4: ws.column_dimensions['D'].width = 50
    else:
        std_col_idx = df_cols.index('Standards_Needing_Intervention') + 1 if 'Standards_Needing_Intervention' in df_cols else None
        for row in ws.iter_rows(min_row=2):
            for i, cell in enumerate(row, start=1):
                if i == std_col_idx:
                    cell.alignment = wrap_alignment
                else:
                    cell.alignment = no_wrap_alignment
        for col in ws.columns:
            column_letter = col[0].column_letter
            header_val = str(col[0].value)
            if header_val == 'Standards_Needing_Intervention': ws.column_dimensions[column_letter].width = 50
            elif header_val in ['Average_USA', 'Growth', 'Predicted_EOC_Score', 'Predicted_EOC_Level', 'Above_Benchmark', 'Cluster', 'Period', 'Prior_Math_Level', 'Predicted_Gain']: ws.column_dimensions[column_letter].width = 15
            else:
                max_length = max([len(str(cell.value)) for cell in col if cell.value is not None] + [0])
                ws.column_dimensions[column_letter].width = min(max_length + 2, 35)

teachers = [t for t in out_df['Teacher'].unique() if str(t).strip() != '' and str(t).lower() != 'nan']

# 1. Export MTSS Master Workbook
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
    out_df.to_excel(writer, sheet_name='OVERALL', index=False)
    overall_dist_df.to_excel(writer, sheet_name=master_dist_sheet_name, index=False)
    for t in teachers:
        t_name = str(t)[:31]
        for char in [':', '/', '\\', '?', '*', '[', ']']: t_name = t_name.replace(char, '')
        t_df = out_df[out_df['Teacher'] == t]
        if not t_df.empty: t_df.to_excel(writer, sheet_name=t_name, index=False)

wb = openpyxl.load_workbook(excel_path)
for sheet_name in wb.sheetnames:
    cols = list(overall_dist_df.columns) if sheet_name == master_dist_sheet_name else list(out_df.columns)
    format_worksheet(wb[sheet_name], cols)
wb.save(excel_path)

# 2. Export Individual Teacher Workbooks
for t in teachers:
    t_name = str(t)[:31]
    for char in [':', '/', '\\', '?', '*', '[', ']']: t_name = t_name.replace(char, '')
    t_df = out_df[out_df['Teacher'] == t]
    if not t_df.empty:
        indiv_path = os.path.join(output_dir, f"{t_name}_MTSS.xlsx")
        t_dist_df = generate_tier_distribution(t_df)
        with pd.ExcelWriter(indiv_path, engine='openpyxl') as ind_writer:
            t_df.to_excel(ind_writer, sheet_name=t_name, index=False)
            t_dist_df.to_excel(ind_writer, sheet_name=teacher_dist_sheet_name, index=False)
        ind_wb = openpyxl.load_workbook(indiv_path)
        format_worksheet(ind_wb[t_name], list(out_df.columns))
        format_worksheet(ind_wb[teacher_dist_sheet_name], list(t_dist_df.columns))
        ind_wb.save(indiv_path)

# 3. Export New Predictions & Gains XLSX
gains_path = os.path.join(output_dir, f"{SUBJECT_NAME}_EOC_Predictions_and_Gains.xlsx")
pred_df = out_df[['Student Id', 'Student Name', 'Teacher', 'Current Grade', 'Prior_Math_Level', 'Predicted_EOC_Score', 'Predicted_EOC_Level', 'Predicted_Gain']].copy()

total_valid = len(pred_df)
pass_count = len(pred_df[pred_df['Predicted_EOC_Level'] >= 3])
pass_pct = f"{(pass_count / total_valid) * 100:.1f}%" if total_valid > 0 else "0.0%"

gain_count = len(pred_df[pred_df['Predicted_Gain'] == 'Yes'])
gain_pct = f"{(gain_count / total_valid) * 100:.1f}%" if total_valid > 0 else "0.0%"

summary_df = pd.DataFrame({
    "Metric": ["Expected to Pass EOC (Level 3+)", "Expected to Show Learning Gains"],
    "Count of Students": [pass_count, gain_count],
    "School Percentage": [pass_pct, gain_pct]
})

with pd.ExcelWriter(gains_path, engine='openpyxl') as writer:
    pred_df.to_excel(writer, sheet_name='Student Predictions', index=False)
    summary_df.to_excel(writer, sheet_name='Overall Summary', index=False)

g_wb = openpyxl.load_workbook(gains_path)
format_worksheet(g_wb['Student Predictions'], list(pred_df.columns))
format_worksheet(g_wb['Overall Summary'], list(summary_df.columns))
g_wb.save(gains_path)

print(f"[{SUBJECT_NAME}] All Excel Packets Generated Successfully in OUTPUTS folder!")

[Geometry] All Excel Packets Generated Successfully in OUTPUTS folder!


In [None]:
# ==========================================
# 9. Statistical Comparative Analysis (Explicit Hypothesis Testing)
# ==========================================
class PDFReport(FPDF):
    def header(self):
        self.set_font('Arial', 'B', 14)
        self.cell(0, 10, f'{SCHOOL_NAME} {SUBJECT_NAME} Teacher Comparative Analysis', 0, 1, 'C')
        self.set_font('Arial', 'I', 10)
        self.cell(0, 8, 'Comparison of Predicted EOC Scores Across Teachers', 0, 1, 'C')
        self.ln(5)

valid_data = out_df.dropna(subset=['Predicted_EOC_Score', 'Teacher'])
teacher_counts = valid_data['Teacher'].value_counts()
valid_teachers = teacher_counts[teacher_counts >= 3].index
stat_df = valid_data[valid_data['Teacher'].isin(valid_teachers)]

groups = [group['Predicted_EOC_Score'].values for name, group in stat_df.groupby('Teacher')]
teacher_names = [name for name, group in stat_df.groupby('Teacher')]

pdf = PDFReport()
pdf.add_page()
pdf.set_font('Arial', '', 10)

pdf.set_font('Arial', 'B', 12)
pdf.cell(0, 10, '1. Assumptions Testing', 0, 1)
pdf.set_font('Arial', '', 10)

# --- Shapiro-Wilk ---
pdf.set_fill_color(240, 240, 240)
pdf.multi_cell(0, 5, "A. Normality Test (Shapiro-Wilk)\n"
                     "H0 (Null Hypothesis): The predicted EOC scores for a given teacher follow a normal distribution.\n"
                     "HA (Alternative Hypothesis): The predicted EOC scores do NOT follow a normal distribution.\n"
                     "Course of Action: If H0 is rejected (p < 0.05) for ANY teacher, the assumption of normality "
                     "is violated. We must abandon ANOVA and proceed with a Non-Parametric test.", fill=True)
pdf.ln(2)

all_normal = True
for name, group in zip(teacher_names, groups):
    stat_val, p_val = stats.shapiro(group)
    is_normal = p_val >= 0.05
    if not is_normal: all_normal = False
    mark = "v" if is_normal else "X"
    status = "Normal" if is_normal else "Not Normal"
    pdf.multi_cell(0, 5, f"   [{mark}] {name[:25]} (n={len(group)}): W={stat_val:.3f}, p={p_val:.3f} -> {status}")

pdf.set_font('Arial', 'B', 10)
pdf.multi_cell(0, 6, f"\nOverall Normality met across all groups? {'Yes' if all_normal else 'No'}")
pdf.set_font('Arial', '', 10)

# --- Levene's Test ---
pdf.ln(4)
pdf.multi_cell(0, 5, "B. Homogeneity of Variances Test (Levene's Test)\n"
                     "H0 (Null Hypothesis): The population variances of EOC scores are perfectly equal across all teachers.\n"
                     "HA (Alternative Hypothesis): The population variances of EOC scores are NOT equal.\n"
                     "Course of Action: If H0 is rejected (p < 0.05), the assumption of equal variance "
                     "is violated. We must abandon ANOVA and proceed with a Non-Parametric test.", fill=True)
pdf.ln(2)

stat_l, p_l = stats.levene(*groups)
equal_var = p_l >= 0.05
pdf.set_font('Arial', 'B', 10)
pdf.multi_cell(0, 6, f"Result: W={stat_l:.3f}, p={p_l:.3f} -> {'Equal Variances (H0 Accepted)' if equal_var else 'Unequal Variances (H0 Rejected)'}")
pdf.set_font('Arial', '', 10)

# ==========================================
# 2. Main Comparative Test
# ==========================================
pdf.ln(6)
pdf.set_font('Arial', 'B', 12)
pdf.cell(0, 10, '2. Main Comparative Test', 0, 1)
pdf.set_font('Arial', '', 10)

is_significant = False
test_used = ""

if all_normal and equal_var:
    pdf.multi_cell(0, 5, "Decision: Both assumptions met. Proceeding with Parametric One-Way ANOVA.\n"
                         "H0 (Null Hypothesis): The true mean EOC scores are exactly equal across all teachers.\n"
                         "HA (Alternative Hypothesis): At least one teacher's mean EOC score is statistically significantly different.\n"
                         "Course of Action: If H0 is rejected (p < 0.05), we will run Tukey's HSD Post-Hoc "
                         "test to determine exactly WHICH teachers differ from one another.", fill=True)
    pdf.ln(2)
    f_stat, p_main = stats.f_oneway(*groups)
    pdf.set_font('Arial', 'B', 10)
    pdf.multi_cell(0, 6, f"Result: F={f_stat:.3f}, p={p_main:.4f}")
    pdf.set_font('Arial', '', 10)
    is_significant = p_main < 0.05
    test_used = 'ANOVA'
else:
    pdf.multi_cell(0, 5, "Decision: At least one assumption violated. Proceeding with Non-Parametric Kruskal-Wallis H-Test.\n"
                         "H0 (Null Hypothesis): The true median EOC scores are exactly equal across all teachers.\n"
                         "HA (Alternative Hypothesis): At least one teacher's median EOC score is statistically significantly different.\n"
                         "Course of Action: If H0 is rejected (p < 0.05), we will run Pairwise Mann-Whitney U "
                         "Tests with a strict Bonferroni Correction to determine exactly WHICH teachers differ.", fill=True)
    pdf.ln(2)
    h_stat, p_main = stats.kruskal(*groups)
    pdf.set_font('Arial', 'B', 10)
    pdf.multi_cell(0, 6, f"Result: H={h_stat:.3f}, p={p_main:.4f}")
    pdf.set_font('Arial', '', 10)
    is_significant = p_main < 0.05
    test_used = 'Kruskal'

pdf.ln(2)
pdf.set_font('Arial', 'I', 10)
pdf.multi_cell(0, 6, f">> Conclusion: {'Significant differences found (Proceed to Post-Hoc).' if is_significant else 'No significant differences found across teachers. Analysis stops here.'}")
pdf.set_font('Arial', '', 10)

# ==========================================
# 3. Post-Hoc Analysis
# ==========================================
pdf.ln(6)
pdf.set_font('Arial', 'B', 12)
pdf.cell(0, 10, '3. Post-Hoc Analysis (Transparent Tables)', 0, 1)
pdf.set_font('Arial', '', 10)

if is_significant:
    if test_used == 'ANOVA':
        pdf.set_font('Arial', 'B', 11)
        pdf.cell(0, 8, "Test: Pairwise T-Tests & Tukey's HSD", 0, 1)
        pdf.set_font('Arial', '', 10)

        pdf.multi_cell(0, 5, "Showing both unadjusted T-Test p-values and Adjusted Tukey p-values for full transparency.\n")

        tukey = pairwise_tukeyhsd(endog=stat_df['Predicted_EOC_Score'], groups=stat_df['Teacher'], alpha=0.05)
        tukey_res = pd.DataFrame(data=tukey._results_table.data[1:], columns=tukey._results_table.data[0])

        pdf.set_font('Courier', 'B', 8)
        header = f"{'Comparison':<45} | {'Unadj. p':<12} | {'Tukey p-adj':<12} | {'Significant?'}"
        pdf.cell(0, 5, header, 0, 1)
        pdf.cell(0, 5, "-"*90, 0, 1)

        pdf.set_font('Courier', '', 8)
        for (i, name1), (j, name2) in itertools.combinations(enumerate(teacher_names), 2):
            g1, g2 = groups[i], groups[j]
            _, p_ttest = stats.ttest_ind(g1, g2)
            row_tukey = tukey_res[((tukey_res['group1'] == name1) & (tukey_res['group2'] == name2)) |
                                  ((tukey_res['group1'] == name2) & (tukey_res['group2'] == name1))]
            p_adj = row_tukey['p-adj'].values[0] if not row_tukey.empty else 1.0
            is_sig = "Yes" if p_adj < 0.05 else "No"
            pdf.cell(0, 5, f"{name1[:20]} vs {name2[:20]:<21} | {p_ttest:<12.4f} | {p_adj:<12.4f} | {is_sig}", 0, 1)

    elif test_used == 'Kruskal':
        pdf.set_font('Arial', 'B', 11)
        pdf.cell(0, 8, "Test: Pairwise Mann-Whitney U Test (with Bonferroni Correction)", 0, 1)
        pdf.set_font('Arial', '', 10)

        n_comparisons = len(teacher_names) * (len(teacher_names) - 1) / 2
        alpha_corrected = 0.05 / n_comparisons

        pdf.multi_cell(0, 5, f"To avoid false positives from running multiple tests, the threshold for significance "
                             f"(alpha=0.05) has been mathematically adjusted.\n"
                             f"Strict Adjusted Threshold (Bonferroni) = {alpha_corrected:.4f}\n")

        pdf.set_font('Courier', 'B', 8)
        header = f"{'Comparison':<45} | {'Unadj. p':<12} | {'Adj. p':<12} | {'Significant?'}"
        pdf.cell(0, 5, header, 0, 1)
        pdf.cell(0, 5, "-"*90, 0, 1)

        pdf.set_font('Courier', '', 8)
        for (i, name1), (j, name2) in itertools.combinations(enumerate(teacher_names), 2):
            g1, g2 = groups[i], groups[j]
            stat_mw, p_mw = stats.mannwhitneyu(g1, g2, alternative='two-sided')
            adj_p = min(1.0, p_mw * n_comparisons)
            is_sig = "Yes" if adj_p < 0.05 else "No"
            pdf.cell(0, 5, f"{name1[:20]} vs {name2[:20]:<21} | {p_mw:<12.4f} | {adj_p:<12.4f} | {is_sig}", 0, 1)
else:
    pdf.multi_cell(0, 6, "Skipping Post-Hoc Analysis because main test found no significant differences.")

pdf_path = os.path.join(output_dir, f"{SUBJECT_NAME}_Teacher_Comparative_Analysis.pdf")
pdf.output(pdf_path)
print(f"[{SUBJECT_NAME}] Stats Report exported successfully in OUTPUTS folder!")

[Geometry] Stats Report exported successfully in OUTPUTS folder!


  res = hypotest_fun_out(*samples, **kwds)
