# In [None]:
from __future__ import annotations 

import os 
import sys 
import json 
import time 
import random 
import platform 
import logging 
from pathlib import Path 
from typing import Dict ,Any ,Optional ,Tuple ,List 
import joblib 
import csv 
import glob 
import math 
import re 
import shutil 
import uuid 
import warnings 
from datetime import datetime 
from sklearn .model_selection import train_test_split 
import numpy as np 
import pandas as pd 

# Global random seed; passed to set_global_seed(), make_splits(),
# balanced_stratified_sample() and the Ridge estimator further down.
SEED =42 

def set_global_seed(seed: int = 42) -> None:
    """Seed Python, NumPy and (if importable) PyTorch for best-effort determinism.

    Args:
        seed: Value handed to every random number generator.

    Notes:
        ``PYTHONHASHSEED`` is read by the interpreter at start-up, so setting it
        here only affects child processes that inherit the environment, not the
        hash randomisation of the running process.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)

    log = logging.getLogger("airline_rating_project")
    try:
        import torch
    except Exception as exc:
        # torch is optional; a missing or broken install must not abort seeding.
        log.debug("torch not seeded (import failed): %r", exc)
        return

    try:
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    except Exception as exc:
        # Still best-effort, but leave a trace instead of failing silently.
        log.debug("torch seeding incomplete: %r", exc)

def detect_project_root() -> Path:
    """Return the project root derived from the current working directory.

    If the working directory is named ``notebooks`` (case-insensitive), its
    parent is returned; otherwise the working directory itself.
    """
    here = Path.cwd().resolve()
    inside_notebooks_dir = here.name.lower() == "notebooks"
    return here.parent if inside_notebooks_dir else here

def ensure_project_structure(root: Path) -> Dict[str, Path]:
    """Create the project's directory layout below ``root`` and return it.

    Args:
        root: Project root directory (created if missing).

    Returns:
        Mapping from a short key (``"data_raw"``, ``"models"``, ...) to the
        corresponding directory path; ``"project_root"`` maps to ``root``.
    """
    layout = (
        ("data_raw", ("data", "raw")),
        ("data_processed", ("data", "processed")),
        ("data_features", ("data", "features")),
        ("models", ("models",)),
        ("reports", ("reports",)),
        ("figs", ("figs",)),
        ("notebooks", ("notebooks",)),
    )
    paths = {"project_root": root}
    for key, parts in layout:
        paths[key] = root.joinpath(*parts)

    # exist_ok makes repeated calls harmless.
    for directory in paths.values():
        directory.mkdir(parents=True, exist_ok=True)
    return paths

def setup_logging(log_path: Path) -> logging.Logger:
    """Configure the project logger to write to ``log_path`` and to stdout.

    Handlers left over from an earlier call are closed and removed first, so
    calling this repeatedly neither duplicates log lines nor leaks the
    previously opened log file.

    Args:
        log_path: File that receives the log records (UTF-8).

    Returns:
        The logger named ``"airline_rating_project"`` at INFO level, with
        propagation to the root logger disabled.
    """
    logger = logging.getLogger("airline_rating_project")
    logger.setLevel(logging.INFO)

    # Clearing the handler list alone would leave the old FileHandler's stream open.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()

    fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")

    fh = logging.FileHandler(log_path, encoding="utf-8")
    sh = logging.StreamHandler(sys.stdout)
    for handler in (fh, sh):
        handler.setFormatter(fmt)
        handler.setLevel(logging.INFO)
        logger.addHandler(handler)

    logger.propagate = False
    return logger

def capture_environment() -> Dict[str, Any]:
    """Collect a JSON-friendly snapshot of the runtime environment.

    Returns:
        Dict with interpreter/platform details, the current UTC time, a
        ``"packages"`` mapping of package name to version (``None`` when the
        package cannot be imported, ``"unknown"`` when it has no
        ``__version__``) and a ``"torch"`` entry describing accelerator
        availability (``None`` when torch cannot be queried).
    """
    packages: Dict[str, Any] = {
        "numpy": getattr(np, "__version__", None),
        "pandas": getattr(pd, "__version__", None),
    }
    env: Dict[str, Any] = {
        "python_version": sys.version.replace("\n", " "),
        "platform": platform.platform(),
        "machine": platform.machine(),
        "processor": platform.processor(),
        "executable": sys.executable,
        "cwd": str(Path.cwd().resolve()),
        "time_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "packages": packages,
    }

    # Optional dependencies: record the version when importable, None otherwise.
    optional_modules = (
        "scipy",
        "sklearn",
        "matplotlib",
        "sentence_transformers",
        "transformers",
        "torch",
        "datasets",
        "accelerate",
        "jsonschema",
        "requests",
        "pyarrow",
    )
    for name in optional_modules:
        try:
            packages[name] = getattr(__import__(name), "__version__", "unknown")
        except Exception:
            packages[name] = None

    try:
        import torch

        cuda_ok = torch.cuda.is_available()
        mps_ok = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
        cuda_devices = torch.cuda.device_count() if cuda_ok else 0
        env["torch"] = {
            "cuda_available": cuda_ok,
            "mps_available": mps_ok,
            "device_count_cuda": cuda_devices,
        }
    except Exception:
        env["torch"] = None

    return env

def check_ollama(base_url: str = "http://localhost:11434") -> Dict[str, Any]:
    """Probe an Ollama server without ever raising.

    Sends ``GET {base_url}/api/tags`` with a 2 second timeout.

    Args:
        base_url: Base URL of the Ollama HTTP API.

    Returns:
        Dict with ``reachable`` (bool), ``base_url``, ``models`` (list of model
        names on HTTP 200, else ``None``) and ``error`` (``"HTTP <code>"`` or
        the ``repr`` of the caught exception, else ``None``).
    """
    result: Dict[str, Any] = {
        "reachable": False,
        "base_url": base_url,
        "models": None,
        "error": None,
    }
    try:
        import requests

        response = requests.get(f"{base_url}/api/tags", timeout=2)
        if response.status_code != 200:
            result["error"] = f"HTTP {response.status_code}"
            return result

        result["reachable"] = True
        payload = response.json()
        result["models"] = [entry.get("name") for entry in payload.get("models", [])]
    except Exception as exc:
        # Non-fatal by design: a missing `requests` package or an unreachable
        # server is reported in the result instead of propagating.
        result["error"] = repr(exc)
    return result

# --- Run bootstrap ---------------------------------------------------------
# Seed the RNGs before anything else runs.
set_global_seed (SEED )

# Create the folder layout below the detected project root and start logging
# into reports/run.log (and stdout).
PATHS =ensure_project_structure (detect_project_root ())
LOG_PATH =PATHS ["reports"]/"run.log"
LOGGER =setup_logging (LOG_PATH )

# RUN_ID is a UTC timestamp; savefig_with_runid() uses it as a filename prefix.
RUN_ID =time .strftime ("%Y%m%d_%H%M%S",time .gmtime ())
RUN_TAG ="default"
LOGGER .info (f"RUN_ID={RUN_ID} RUN_TAG={RUN_TAG}")

# Run configuration; written to reports/config.json below.
CONFIG ={
"seed":SEED ,
"llm_subset_size":800 ,
"rating_bins_default":[[1 ,3 ],[4 ,7 ],[8 ,10 ]],
"ollama_model":"phi3:mini",
"ollama_base_url":"http://localhost:11434",
"created_utc":time .strftime ("%Y-%m-%dT%H:%M:%SZ",time .gmtime ()),
}

ENV =capture_environment ()
OLLAMA =check_ollama (CONFIG ["ollama_base_url"])

# Persist configuration and environment snapshot next to the log.
config_path =PATHS ["reports"]/"config.json"
env_path =PATHS ["reports"]/"environment.json"
with open (config_path ,"w",encoding ="utf-8")as f :
    json .dump (CONFIG ,f ,indent =2 ,ensure_ascii =False )
with open (env_path ,"w",encoding ="utf-8")as f :
    json .dump (ENV ,f ,indent =2 ,ensure_ascii =False )

# The Ollama probe is informational only; an unreachable server does not stop the run.
if OLLAMA ["reachable"]:
    print ("Ollama models (first 10):",(OLLAMA ["models"]or [])[:10 ])
else :
    print ("Ollama check error:",OLLAMA ["error"])


# Read both JSON files back (this fails here if they were not written as valid JSON).
# NOTE(review): `_cfg` is not used in the visible code and `_env` is overwritten
# two statements below with the AIRLINE_CSV_PATH environment value.
with open (PATHS ["reports"]/"config.json","r",encoding ="utf-8")as f :
    _cfg =json .load (f )
with open (PATHS ["reports"]/"environment.json","r",encoding ="utf-8")as f :
    _env =json .load (f )

# --- Locate the input CSV --------------------------------------------------
# Candidate order: explicit AIRLINE_CSV_PATH, data/raw, project root, current
# working directory, then a hard-coded absolute path.
CSV_CANDIDATES =[]
_env =os .getenv ("AIRLINE_CSV_PATH")
if _env :
    CSV_CANDIDATES .append (Path (_env ).expanduser ())

# NOTE(review): the last entry is a machine-specific absolute path; its
# .expanduser() has no effect because the path does not start with "~".
CSV_CANDIDATES +=[
PATHS ["data_raw"]/"BA_AirlineReviews.csv",
PATHS ["project_root"]/"BA_AirlineReviews.csv",
Path ("BA_AirlineReviews.csv"),
Path ("/Users/jackgilbert/Desktop/ML_Praktikum/BA_AirlineReviews.csv").expanduser (),
]

# The first existing candidate wins; if resolving a path fails, the unresolved
# path is checked instead.
CSV_PATH =None 
for _p in CSV_CANDIDATES :
    try :
        _pp =_p .expanduser ().resolve ()
    except Exception :
        _pp =_p 
    if Path (_pp ).exists ():
        CSV_PATH =Path (_pp )
        break 

# Without an input file the rest of the script cannot run.
if CSV_PATH is None :
    raise FileNotFoundError (
    "CSV nicht gefunden. Setze AIRLINE_CSV_PATH oder lege die Datei unter data/raw/BA_AirlineReviews.csv ab.\n"
    f"Geprüfte Pfade: {[str(p) for p in CSV_CANDIDATES]}"
    )

def sniff_csv_format(path: Path, bytes_to_read: int = 50_000) -> Dict[str, Any]:
    """Guess encoding, delimiter and quote character from the head of a CSV file.

    Args:
        path: CSV file to inspect.
        bytes_to_read: Size of the leading byte window that is examined.

    Returns:
        Dict with ``encoding_guess`` (``"utf-8"`` or ``"latin-1"``),
        ``delimiter`` and ``quotechar``.
    """
    import codecs  # local import: only this helper needs it

    with open(path, "rb") as f:
        raw = f.read(bytes_to_read)

    # The window may end in the middle of a multi-byte UTF-8 character. Decoding
    # incrementally with final=False keeps such a dangling tail buffered instead
    # of raising, so a valid UTF-8 file is not misreported as latin-1. A short
    # read (fewer bytes than requested) is treated as the real end of the data.
    reached_eof = len(raw) < bytes_to_read
    try:
        sample = codecs.getincrementaldecoder("utf-8")().decode(raw, final=reached_eof)
        enc = "utf-8"
    except UnicodeDecodeError:
        sample = raw.decode("latin-1", errors="replace")
        enc = "latin-1"

    try:
        dialect = csv.Sniffer().sniff(sample)
        delim = dialect.delimiter
        quotechar = dialect.quotechar
    except Exception:
        # Sniffing is a heuristic; on any failure fall back to the candidate
        # delimiter that occurs most often in the first line.
        lines = sample.splitlines()
        first_line = lines[0] if lines else sample
        candidates = [",", ";", "\t", "|"]
        delim = max(candidates, key=first_line.count) if first_line else ","
        quotechar = '"'
    return {"encoding_guess": enc, "delimiter": delim, "quotechar": quotechar}

def read_csv_robust(path: Path) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Read a CSV, trying several encodings with the sniffed delimiter.

    Encodings are tried in the order utf-8, utf-8-sig, latin-1. Malformed
    lines are skipped (``on_bad_lines="skip"``), so the loaded row count can
    be lower than the number of lines in the file.

    Args:
        path: CSV file to load.

    Returns:
        ``(df, info)`` where ``info`` is the result of ``sniff_csv_format``
        extended by ``encoding_used`` and ``rows_loaded``.

    Raises:
        RuntimeError: If every encoding fails; the last underlying error is
            attached as ``__cause__``.
    """
    info = sniff_csv_format(path)
    delim = info["delimiter"]

    last_err: Optional[Exception] = None
    for enc in ("utf-8", "utf-8-sig", "latin-1"):
        try:
            df = pd.read_csv(
                path,
                sep=delim if delim else None,  # None lets pandas infer the separator
                engine="python",
                encoding=enc,
                on_bad_lines="skip",
            )
        except Exception as e:
            last_err = e
            continue
        info["encoding_used"] = enc
        info["rows_loaded"] = len(df)
        return df, info

    # Chain the cause so the original traceback is not lost.
    raise RuntimeError(f"Konnte CSV nicht lesen. Letzter Fehler: {last_err}") from last_err

# Load the raw reviews; csv_info records the sniffed format and the encoding used.
df_raw ,csv_info =read_csv_robust (CSV_PATH )

# Drop columns whose name starts with "Unnamed".
# NOTE(review): presumably index columns pandas names automatically — confirm against the CSV.
unnamed_cols =[c for c in df_raw .columns if str (c ).startswith ('Unnamed')]
if unnamed_cols :
    df_raw =df_raw .drop (columns =unnamed_cols )

# Keep a copy of the input under data/raw unless the input already is that file.
raw_copy_path =PATHS ["data_raw"]/CSV_PATH .name 
try :
    _src =CSV_PATH .resolve ()
    _dst =raw_copy_path .resolve ()
except Exception :
    _src ,_dst =CSV_PATH ,raw_copy_path 

# An existing copy is never overwritten, even if the source file changed.
if _src !=_dst :
    if not raw_copy_path .exists ():
        shutil .copy2 (CSV_PATH ,raw_copy_path )

def norm_col(c: str) -> str:
    """Normalise a column label: trim, lowercase, keep only ``a-z`` and ``0-9``.

    Underscores, spaces and every other character are removed, so
    ``"Overall_Rating"`` becomes ``"overallrating"``.
    """
    lowered = str(c).strip().lower()
    return re.sub(r"[^a-z0-9]+", "", lowered)

def coerce_rating_series(s: pd.Series) -> pd.Series:
    """Convert a rating column to floats.

    Numeric input is cast directly. Otherwise the first (optionally signed,
    optionally decimal) number found in each value is used, so ``'9'``,
    ``'9.0'``, ``'9/10'`` and ``'Rating: 9'`` all become ``9.0``.

    Returns:
        Float series; values without a parsable number are NaN.
    """
    if not pd.api.types.is_numeric_dtype(s):
        as_text = s.astype(str).str.strip()
        # Only the first number in each string is kept.
        s = as_text.str.extract(r"(-?\d+(?:\.\d+)?)", expand=False)
    return pd.to_numeric(s, errors="coerce").astype(float)

def pick_best_target(df: pd.DataFrame) -> Tuple[str, pd.Series, Dict[str, Any]]:
    """Choose the column that most plausibly holds the overall rating.

    Candidates are columns whose normalised name equals one of the priority
    names, followed by columns whose normalised name contains "overall",
    "rating", "score" or "stars"; if there are none, every column is a
    candidate. Each candidate is parsed with ``coerce_rating_series`` and
    scored as ``valid_rate * range_score`` where ``range_score`` is 1.2 when
    the 1%/99% quantiles lie in [0.5, 10.5], 1.0 when they lie in
    [-0.5, 10.5] and 0.0 otherwise. Candidates with fewer than 20% parsable
    values are skipped. On equal scores the earlier candidate wins.

    Returns:
        ``(column, parsed_series, meta)``; ``meta`` holds the column name,
        valid rate, both quantiles and the score.

    Raises:
        RuntimeError: If no candidate reaches the 20% parse rate.
    """
    cols = list(df.columns)
    ncols = {c: norm_col(c) for c in cols}

    # NOTE(review): norm_col() removes underscores, so "overall_rating" can
    # never equal a normalised name; "overallrating" covers that spelling.
    priority = [
        "overall_rating", "overallrating", "overall", "rating", "score",
        "overallscore", "overallrate", "overallstars", "stars",
    ]
    keywords = ["overall", "rating", "score", "stars"]

    candidates = [c for p in priority for c in cols if ncols[c] == p]
    candidates.extend(c for c in cols if any(k in ncols[c] for k in keywords))
    # De-duplicate while keeping first-seen order; fall back to all columns.
    ordered = list(dict.fromkeys(candidates)) or cols[:]

    best = None
    best_meta = None
    best_series = None

    for col in ordered:
        parsed = coerce_rating_series(df[col])
        valid = parsed.notna().mean()
        if valid < 0.2:
            continue

        q01, q99 = parsed.quantile(0.01), parsed.quantile(0.99)
        if q99 <= 10.5 and q01 >= 0.5:
            range_score = 1.2
        elif q99 <= 10.5 and q01 >= -0.5:
            range_score = 1.0
        else:
            range_score = 0.0

        score = valid * range_score
        if best is None or score > best_meta["score"]:
            best = col
            best_series = parsed
            best_meta = {
                "col": col,
                "valid_rate": float(valid),
                "q01": float(q01),
                "q99": float(q99),
                "score": float(score),
            }

    if best is None:
        raise RuntimeError("Konnte kein plausibles Target-Feld identifizieren (zu wenig parsebare Werte).")

    return best, best_series, best_meta

def pick_text_columns(df: pd.DataFrame) -> Tuple[Optional[str], str, Dict[str, Any]]:
    """Find the review title column (optional) and review text column (required).

    If columns normalising to ``reviewheader`` and ``reviewbody`` both exist,
    they are used as title and text directly. Otherwise columns are matched by
    normalised name against priority lists; when no text column is found by
    name, the object/string column with the largest mean string length is
    used, and the runner-up may serve as title if it is strictly shorter.

    Returns:
        ``(title_col, text_col, meta)``; ``title_col`` may be ``None``.

    Raises:
        RuntimeError: If no text column is found by name and the frame has no
            object/string columns.
    """
    cols = list(df.columns)
    ncols = {c: norm_col(c) for c in cols}

    header_cols = [c for c in cols if ncols[c] == "reviewheader"]
    body_cols = [c for c in cols if ncols[c] == "reviewbody"]
    if header_cols and body_cols:
        title_col, text_col = header_cols[0], body_cols[0]
        meta = {"title_col": title_col, "text_col": text_col, "forced": "ReviewHeader+ReviewBody"}
        return title_col, text_col, meta

    def _first_by_priority(names):
        """Return the first column matching the earliest priority name, else None."""
        found = None
        for wanted in names:
            found = next((c for c in cols if ncols[c] == wanted), found)
            if found:
                break
        return found

    # NOTE(review): names containing "_" cannot match a normalised label
    # (norm_col strips underscores); the underscore-free spellings cover them.
    title_col = _first_by_priority(["review_title", "reviewtitle", "title", "summary", "headline"])
    text_col = _first_by_priority(
        ["review_text", "reviewtext", "review", "text", "content", "body", "comment", "comments", "reviewbody"]
    )

    obj_cols = [c for c in cols if df[c].dtype == "object" or str(df[c].dtype).startswith("string")]
    if text_col is None:
        if not obj_cols:
            raise RuntimeError("Keine Textspalten (object/string) gefunden. Kann review_text nicht bestimmen.")
        # Rank textual columns by mean string length, longest first.
        lengths = sorted(
            (
                (c, float(df[c].astype(str).str.len().replace({np.inf: np.nan}).fillna(0).mean()))
                for c in obj_cols
            ),
            key=lambda item: item[1],
            reverse=True,
        )
        text_col = lengths[0][0]
        if title_col is None and len(lengths) > 1:
            runner_up, runner_up_len = lengths[1]
            title_col = runner_up if runner_up_len < lengths[0][1] else None

    meta = {"title_col": title_col, "text_col": text_col}
    return title_col, text_col, meta

# Pick the rating column and the title/text columns from the raw frame.
target_col ,target_s ,target_meta =pick_best_target (df_raw )
title_col ,text_col ,text_meta =pick_text_columns (df_raw )

LOGGER .info (f"Target gewählt: {target_col} | meta={target_meta}")
LOGGER .info (f"Text gewählt: title={title_col} | text={text_col}")

# Work on a copy so df_raw keeps the untouched input.
df =df_raw .copy ()

# title_part is a plain "" (not a Series) when no title column was found.
# astype(str) turns missing values into literal strings such as "nan";
# clean_str_series() maps those placeholders back to "".
title_part =df [title_col ].astype (str )if title_col is not None else ""
text_part =df [text_col ].astype (str )

def clean_str_series(s: pd.Series) -> pd.Series:
    """Normalise a text series.

    Missing values become ``""``, every whitespace run collapses to a single
    space, the ends are trimmed, and values that are exactly ``"nan"``,
    ``"None"``, ``"null"``, ``"NULL"`` or ``"NaN"`` are replaced by ``""``.
    """
    placeholders = {"nan": "", "None": "", "null": "", "NULL": "", "NaN": ""}
    as_text = s.fillna("").astype(str)
    collapsed = as_text.str.replace(r"\s+", " ", regex=True)
    trimmed = collapsed.str.strip()
    # Series.replace with a dict swaps whole values, not substrings.
    return trimmed.replace(placeholders)

# Clean both parts; title_part stays a plain string when there is no title column.
title_part =clean_str_series (title_part )if isinstance (title_part ,pd .Series )else title_part 
text_part =clean_str_series (text_part )

# Model input: "<title>\n\n<body>" when a title column exists, otherwise the body only.
df ["text"]=(
(title_part +"\n\n"+text_part )if isinstance (title_part ,pd .Series )
else text_part 
).str .strip ()

# NOTE(review): the loop below only assigns _t/_b/_x and does nothing with them
# in the visible code — looks like leftover inspection code.
_sample =df [[c for c in [title_col ,text_col ]if c is not None ]+["text"]].head (3 ).copy ()
if title_col is not None and text_col is not None :
    for _i ,_r in _sample .iterrows ():
        _t =str (_r .get (title_col ,""))
        _b =str (_r .get (text_col ,""))
        _x =str (_r .get ("text",""))

# Keep the original target values next to the parsed float target.
df ["target_raw"]=df [target_col ]
df ["target"]=coerce_rating_series (df [target_col ])

before =len (df )

# Rule 1: drop rows with empty text.
df ["text_len"]=df ["text"].fillna ("").astype (str ).str .len ()
df =df [df ["text_len"]>0 ].copy ()

# Rule 2: drop rows without a parsable target.
df =df [df ["target"].notna ()].copy ()

# Rule 3: keep only 1 <= target <= 10; zeros are counted and logged before they are dropped.
n_zero =int ((df ["target"]==0 ).sum ())
if n_zero :
    LOGGER .warning (f"Found {n_zero} rows with target==0; dropping them to enforce 1..10.")
df =df [(df ["target"]>=1 )&(df ["target"]<=10 )].copy ()

after =len (df )
dropped =before -after 

processed_path_parquet =PATHS ["data_processed"]/"reviews_processed.parquet"
processed_path_csv =PATHS ["data_processed"]/"reviews_processed.csv"
schema_path =PATHS ["reports"]/"data_schema.json"

# Persist the cleaned data as Parquet and CSV.
df .reset_index (drop =True ,inplace =True )
df .to_parquet (processed_path_parquet ,index =False )
df .to_csv (processed_path_csv ,index =False )

# Description of the processing run, written to reports/data_schema.json.
# NOTE(review): created_utc uses datetime.utcnow() (naive datetime, includes
# microseconds) while CONFIG uses time.strftime(..., time.gmtime()).
schema ={
"created_utc":datetime .utcnow ().isoformat ()+"Z",
"raw_csv":str (raw_copy_path ),
"raw_shape":list (df_raw .shape ),
"processed_shape":list (df .shape ),
"chosen_columns":{
"title_col":title_col ,
"text_col":text_col ,
"target_col":target_col ,
},
"target_meta":target_meta ,
"cleaning":{
"dropped_rows":int (dropped ),
"rules":[
"drop empty text",
"drop missing target",
"keep 1 <= target <= 10",
],
},
"columns":{c :str (df [c ].dtype )for c in df .columns },
}

with open (schema_path ,"w",encoding ="utf-8")as f :
    json .dump (schema ,f ,indent =2 ,ensure_ascii =False )

# Distribution of the target over the three rating bins (1-3, 4-7, 8-10).
bins =pd .cut (df ["target"],bins =[-0.1 ,3 ,7 ,10.1 ],labels =["1-3","4-7","8-10"])
dist =bins .value_counts (dropna =False ).sort_index ()

sample_i =0 

# NOTE(review): the next three statements repeat the three above verbatim.
bins =pd .cut (df ["target"],bins =[-0.1 ,3 ,7 ,10.1 ],labels =["1-3","4-7","8-10"])
dist =bins .value_counts (dropna =False ).sort_index ()

sample_i =0 

# --- Split preparation -----------------------------------------------------
processed_path_parquet =PATHS ["data_processed"]/"reviews_processed.parquet"

# Reload the processed data from disk.
df_all =pd .read_parquet (processed_path_parquet ).copy ()

# row_id is a stable per-row key; created from the row position when missing.
if "row_id"not in df_all .columns :
    df_all .insert (0 ,"row_id",np .arange (len (df_all ),dtype =np .int64 ))
else :
    df_all ["row_id"]=pd .to_numeric (df_all ["row_id"],errors ="coerce").astype ("Int64")

# Right-closed bin edges: a rating of 3 falls into "1-3", 7 into "4-7".
BIN_EDGES =[-0.1 ,3 ,7 ,10.1 ]
BIN_LABELS =["1-3","4-7","8-10"]

df_all ["rating_bin"]=pd .cut (df_all ["target"],bins =BIN_EDGES ,labels =BIN_LABELS ,include_lowest =True ,right =True )

split_dir =PATHS ["data_processed"]/"splits"
split_dir .mkdir (parents =True ,exist_ok =True )

train_path =split_dir /"train.parquet"
val_path =split_dir /"val.parquet"
test_path =split_dir /"test.parquet"

splits_meta_path =PATHS ["reports"]/"splits_meta.json"

def make_splits(df: pd.DataFrame, seed: int = 42,
                test_size: float = 0.20, val_size_of_trainval: float = 0.20):
    """Split ``df`` into train/val/test, stratified on ``rating_bin``.

    The test set is cut off first; the validation set is then cut from the
    remaining rows. With the defaults this yields test=20%, val=16% (20% of
    the remaining 80%) and train=64% of all rows.

    Args:
        df: Frame with a ``rating_bin`` column.
        seed: ``random_state`` used for both cuts.
        test_size: Fraction of ``df`` held out as test set.
        val_size_of_trainval: Fraction of the non-test rows used for validation.

    Returns:
        ``(train, val, test)``, each with a fresh 0..n-1 index.
    """

    def _stratified_cut(frame: pd.DataFrame, holdout_fraction: float):
        return train_test_split(
            frame,
            test_size=holdout_fraction,
            random_state=seed,
            stratify=frame["rating_bin"],
        )

    trainval, test = _stratified_cut(df, test_size)
    train, val = _stratified_cut(trainval, val_size_of_trainval)
    return tuple(part.reset_index(drop=True) for part in (train, val, test))

def save_split (df_split :pd .DataFrame ,path :Path )->None :
    """Write one split to ``path`` as Parquet, without the DataFrame index."""
    df_split .to_parquet (path ,index =False )

def load_split (path :Path )->pd .DataFrame :
    """Read a split previously stored as Parquet at ``path``."""
    return pd .read_parquet (path )

# Reuse the splits on disk when all three files exist; otherwise create and store them.
# NOTE(review): existing files are reused without checking that they still
# match df_all — delete them to force a re-split after the processed data changes.
if train_path .exists ()and val_path .exists ()and test_path .exists ():
    df_train =load_split (train_path )
    df_val =load_split (val_path )
    df_test =load_split (test_path )
else :
    df_train ,df_val ,df_test =make_splits (df_all ,seed =SEED )
    save_split (df_train ,train_path )
    save_split (df_val ,val_path )
    save_split (df_test ,test_path )

def _relpath(p: Path) -> str:
    """Return ``p`` relative to the project root as a string.

    If the path cannot be resolved or does not lie below the project root,
    ``str(p)`` is returned unchanged.
    """
    try:
        project_root = PATHS["project_root"].resolve()
        resolved = Path(p).resolve()
        return str(resolved.relative_to(project_root))
    except Exception:
        return str(p)

# Metadata describing how the splits were produced; written to reports/splits_meta.json.
# NOTE(review): "split_scheme" repeats the make_splits() defaults as literals,
# and the sizes describe whatever was loaded from disk if the splits were reused.
meta ={

"seed":SEED ,
"bin_edges":BIN_EDGES ,
"bin_labels":BIN_LABELS ,
"rating_bin_cut":{
"bins":BIN_EDGES ,
"labels":BIN_LABELS ,
"right":True ,
"include_lowest":True ,
"boundary_note":"3→low, 7→mid",
},
"train_test_split":{
"shuffle":True ,
"stratify_on":"rating_bin",
"random_state":SEED ,
},
"split_scheme":{"test_size":0.20 ,"val_size_of_trainval":0.20 },
"sizes":{"all":int (len (df_all )),"train":int (len (df_train )),"val":int (len (df_val )),"test":int (len (df_test ))},
"paths":{"train":_relpath (train_path ),"val":_relpath (val_path ),"test":_relpath (test_path )},
}
with open (splits_meta_path ,"w",encoding ="utf-8")as f :
    json .dump (meta ,f ,indent =2 ,ensure_ascii =False )

# Locations of the balanced subset S (drawn from the test split further down) and its metadata.
subset_path =split_dir /"subset_S.parquet"
subset_meta_path =PATHS ["reports"]/"subset_S_meta.json"

def balanced_stratified_sample(df: pd.DataFrame, n: int, seed: int = 42, strata_col: str = "rating_bin") -> pd.DataFrame:
    """Draw up to ``n`` rows, balanced as evenly as possible across strata.

    Each stratum present in ``df`` (considered in ``BIN_LABELS`` order) gets
    roughly ``n / num_strata`` rows. If a stratum has fewer rows than its
    share, the shortfall is handed to the other strata in order, as far as
    they have rows left. Rows whose label is not in ``BIN_LABELS`` are ignored.

    Args:
        df: Source frame; its index labels must be unique.
        n: Requested sample size.
        seed: Seed for the row selection and the final shuffle.
        strata_col: Column holding the stratum label.

    Returns:
        Shuffled sample with a fresh 0..m-1 index, where ``m <= n``. An empty
        frame with the columns of ``df`` is returned when ``n <= 0`` or no
        row carries a known label.
    """
    rng = np.random.default_rng(seed)

    # Convert the labels once instead of once per stratum.
    labels = df[strata_col].astype(str)
    present = set(labels.unique())
    bins = [b for b in BIN_LABELS if b in present]

    # Nothing to sample: avoids division by zero and concatenating an empty list.
    if not bins or n <= 0:
        return df.iloc[0:0].reset_index(drop=True)

    groups = {b: df[labels == b] for b in bins}
    available = {b: len(groups[b]) for b in bins}

    # Even base allocation; the first `remainder` strata get one extra row.
    base, remainder = divmod(n, len(bins))
    alloc = {b: base + (1 if i < remainder else 0) for i, b in enumerate(bins)}

    # Cap each stratum at its size and collect the shortfall.
    deficit = 0
    for b in bins:
        if alloc[b] > available[b]:
            deficit += alloc[b] - available[b]
            alloc[b] = available[b]

    # Hand the shortfall to strata with spare rows, in order. One pass is
    # enough: whatever is left afterwards cannot be placed anywhere.
    for b in bins:
        if deficit <= 0:
            break
        take = min(available[b] - alloc[b], deficit)
        if take > 0:
            alloc[b] += take
            deficit -= take

    parts = []
    for b in bins:
        if alloc[b] <= 0:
            continue
        idx = rng.choice(groups[b].index.to_numpy(), size=alloc[b], replace=False)
        parts.append(df.loc[idx])

    out = pd.concat(parts, axis=0).sample(frac=1.0, random_state=seed).reset_index(drop=True)
    return out

# Subset S: a balanced sample of the test split, capped at CONFIG["llm_subset_size"].
# NOTE(review): an existing subset file is reused as-is, even if the test split
# or the requested size changed since it was written.
N_S =min (CONFIG ["llm_subset_size"],len (df_test ))
if subset_path .exists ():
    df_S =pd .read_parquet (subset_path )
else :
    df_S =balanced_stratified_sample (df_test ,n =N_S ,seed =SEED ,strata_col ="rating_bin")
    df_S .to_parquet (subset_path ,index =False )

subset_meta ={
"seed":SEED ,
"n_requested":int (N_S ),
"n_actual":int (len (df_S )),
"source":"test",
"balanced_bins":BIN_LABELS ,
"path":str (subset_path ),
}
with open (subset_meta_path ,"w",encoding ="utf-8")as f :
    json .dump (subset_meta ,f ,indent =2 ,ensure_ascii =False )

# Row-id sets per split.
# NOTE(review): these sets and union_ids are computed but not compared in the
# visible code — no disjointness or coverage check follows.
ids_all =set (df_all ["row_id"].tolist ())
ids_train =set (df_train ["row_id"].tolist ())
ids_val =set (df_val ["row_id"].tolist ())
ids_test =set (df_test ["row_id"].tolist ())

union_ids =ids_train |ids_val |ids_test 

def bin_dist (df_ ):
    """Return the relative frequency of each BIN_LABELS entry in ``df_`` (0.0 when absent)."""
    return df_ ["rating_bin"].astype (str ).value_counts (normalize =True ).reindex (BIN_LABELS ).fillna (0.0 )

dist_all =bin_dist (df_all )
dist_train =bin_dist (df_train )
dist_val =bin_dist (df_val )
dist_test =bin_dist (df_test )

# One column per split, one row per rating bin.
dist_table =pd .DataFrame ({
"all":dist_all ,
"train":dist_train ,
"val":dist_val ,
"test":dist_test ,
}).fillna (0.0 )

# scikit-learn is required from here on; fail with installation instructions.
try :
    from sklearn .pipeline import Pipeline 
    from sklearn .feature_extraction .text import TfidfVectorizer 
    from sklearn .linear_model import Ridge 
    from sklearn .preprocessing import StandardScaler ,Normalizer 
except Exception as e :
    raise ImportError (
    "scikit-learn fehlt. Installiere lokal z.B.:\n"
    "  pip install scikit-learn joblib\n"
    f"Original error: {e}"
    )

# matplotlib is optional; plotting is skipped when HAS_MPL is False.
HAS_MPL =True 
try :
    import matplotlib .pyplot as plt 
except Exception :
    HAS_MPL =False 
    plt =None 

def savefig_with_runid(path: Path, **kwargs) -> Path:
    """Save the current matplotlib figure to ``path`` and to a RUN_ID-prefixed copy.

    Args:
        path: Target file; the copy is written next to it as
            ``<RUN_ID>_<filename>``.
        **kwargs: Forwarded to ``plt.savefig``.

    Returns:
        Path of the run-specific copy, or ``path`` unchanged (nothing saved)
        when matplotlib is unavailable.
    """
    if HAS_MPL:
        plt.savefig(path, **kwargs)
        stamped = path.with_name(f"{RUN_ID}_{path.name}")
        if str(stamped) != str(path):
            plt.savefig(stamped, **kwargs)
        return stamped
    return path

# Reload all splits and subset S from disk, so the modelling code below works
# on the persisted files rather than on the in-memory frames.
split_dir =PATHS ["data_processed"]/"splits"
df_train =pd .read_parquet (split_dir /"train.parquet")
df_val =pd .read_parquet (split_dir /"val.parquet")
df_test =pd .read_parquet (split_dir /"test.parquet")
df_S =pd .read_parquet (split_dir /"subset_S.parquet")


def mae(y_true, y_pred) -> float:
    """Return the mean absolute error between two numeric array-likes."""
    truth = np.asarray(y_true, dtype=float)
    guess = np.asarray(y_pred, dtype=float)
    abs_err = np.abs(truth - guess)
    return float(abs_err.mean())

def rmse(y_true, y_pred) -> float:
    """Return the root mean squared error between two numeric array-likes."""
    residual = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
    mean_sq = np.mean(np.square(residual))
    return float(np.sqrt(mean_sq))

def spearman_corr(y_true, y_pred) -> float:
    """Return Spearman's rank correlation, computed without scipy.

    Both inputs are ranked (ties get their average rank) and the Pearson
    correlation of the ranks is returned. If either rank vector has zero
    standard deviation the result is NaN.
    """

    def _average_ranks(values) -> np.ndarray:
        return pd.Series(np.asarray(values, dtype=float)).rank(method="average").to_numpy()

    ranks_true = _average_ranks(y_true)
    ranks_pred = _average_ranks(y_pred)

    spread_true = np.std(ranks_true)
    spread_pred = np.std(ranks_pred)
    if spread_true == 0 or spread_pred == 0:
        return float("nan")

    corr_matrix = np.corrcoef(ranks_true, ranks_pred)
    return float(corr_matrix[0, 1])

# Bounds of the rating scale; clip_rating_pred() clips predictions to this interval.
RATING_MIN =1.0 
RATING_MAX =10.0 

def clip_rating_pred(y: np.ndarray) -> np.ndarray:
    """Return ``y`` as a float array clipped to ``[RATING_MIN, RATING_MAX]``."""
    return np.clip(np.asarray(y, dtype=float), RATING_MIN, RATING_MAX)

def rounded_mae(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Return the MAE after clipping predictions to the rating scale and rounding them.

    Rounding uses ``np.rint`` (round half to even).
    """
    truth = np.asarray(y_true, dtype=float)
    nearest_int = np.rint(clip_rating_pred(y_pred))
    return float(np.abs(nearest_int - truth).mean())

def evaluate_regression(y_true, y_pred) -> Dict[str, float]:
    """Return ``mae``, ``rmse`` and ``spearman`` for the given predictions."""
    metrics = (("mae", mae), ("rmse", rmse), ("spearman", spearman_corr))
    return {name: metric(y_true, y_pred) for name, metric in metrics}

def runtime_s_per_100(samples: int, seconds: float) -> float:
    """Scale a measured runtime to seconds per 100 samples (NaN if ``samples <= 0``)."""
    return float("nan") if samples <= 0 else float(seconds * 100.0 / samples)


# In [None]:
# Inputs for the TF-IDF + Ridge baseline: raw text lists and float targets.
X_train =df_train ["text"].astype (str ).tolist ()
y_train =df_train ["target"].astype (float ).to_numpy ()

X_val =df_val ["text"].astype (str ).tolist ()
y_val =df_val ["target"].astype (float ).to_numpy ()

# Full parameter grid: 2 n-gram ranges x 2 vocabulary sizes x 2 min_df x 3 alphas
# = 24 configurations; max_df is fixed at 0.95.
# Keys use the Pipeline "<step>__<param>" convention expected by make_pipeline().
grid =[]
for ngram in [(1 ,1 ),(1 ,2 )]:
    for max_features in [20000 ,50000 ]:
        for min_df in [2 ,5 ]:
            for alpha in [1.0 ,10.0 ,50.0 ]:
                grid .append ({
                "tfidf__ngram_range":ngram ,
                "tfidf__max_features":max_features ,
                "tfidf__min_df":min_df ,
                "tfidf__max_df":0.95 ,
                "ridge__alpha":alpha ,
                })

def make_pipeline(params: Dict[str, Any]) -> Pipeline:
    """Build the TF-IDF + Ridge pipeline and apply ``params`` to it.

    Args:
        params: Pipeline parameters in ``"<step>__<name>"`` form, with the
            step names ``tfidf`` and ``ridge``.

    Returns:
        An unfitted pipeline with ``params`` set.
    """
    vectorizer = TfidfVectorizer(
        lowercase=True,
        strip_accents="unicode",
        stop_words=None,
        sublinear_tf=True,
        dtype=np.float32,
    )
    regressor = Ridge(alpha=1.0, random_state=SEED, solver="lsqr")
    steps = [("tfidf", vectorizer), ("ridge", regressor)]
    # set_params returns the pipeline itself.
    return Pipeline(steps=steps).set_params(**params)

# --- Validation grid search: fit every configuration on train, score it on val. ---
records =[]
best =None 

t0_all =time .perf_counter ()
for i ,params in enumerate (grid ,start =1 ):
    pipe =make_pipeline (params )

    t0 =time .perf_counter ()
    pipe .fit (X_train ,y_train )
    fit_s =time .perf_counter ()-t0 

    t1 =time .perf_counter ()
    pred_val =pipe .predict (X_val )
    pred_s =time .perf_counter ()-t1 

    # Diagnostics on the raw (unclipped) predictions: value range and share outside 1..10.
    pred_min ,pred_max =float (np .min (pred_val )),float (np .max (pred_val ))
    oor =float (np .mean ((pred_val <1 )|(pred_val >10 )))
    print (f"[tfidf+ridge grid] pred range: {pred_min:.3f}..{pred_max:.3f} | out_of_range_rate: {oor:.3f}")

    # Validation metrics are computed on the unclipped predictions.
    m =evaluate_regression (y_val ,pred_val )
    rec ={
    "i":i ,
    **params ,
    **m ,
    "fit_s":float (fit_s ),
    "pred_val_s":float (pred_s ),
    "pred_val_s_per_100":runtime_s_per_100 (len (X_val ),pred_s ),
    }
    records .append (rec )

    # Model selection: lowest MAE wins; RMSE breaks ties within 1e-12.
    if (best is None )or (rec ["mae"]<best ["mae"]-1e-12 )or (abs (rec ["mae"]-best ["mae"])<=1e-12 and rec ["rmse"]<best ["rmse"]):
        best =rec 

t_all =time .perf_counter ()-t0_all 

# All grid results, best configuration first.
df_grid =pd .DataFrame (records ).sort_values (["mae","rmse"],ascending =True ).reset_index (drop =True )

METHOD ="tfidf_ridge"

# --- Final model: refit the best configuration on train+val. ---
X_trainval =pd .concat ([df_train ["text"],df_val ["text"]],axis =0 ).astype (str ).tolist ()
y_trainval =pd .concat ([df_train ["target"],df_val ["target"]],axis =0 ).astype (float ).to_numpy ()

X_test =df_test ["text"].astype (str ).tolist ()
y_test =df_test ["target"].astype (float ).to_numpy ()

X_S =df_S ["text"].astype (str ).tolist ()
y_S =df_S ["target"].astype (float ).to_numpy ()

# Keep only the pipeline parameters of the winning record (drops metrics and timings).
best_params ={k :best [k ]for k in best .keys ()if k .startswith ("tfidf__")or k .startswith ("ridge__")}
final_pipe =make_pipeline (best_params )

t0 =time .perf_counter ()
final_pipe .fit (X_trainval ,y_trainval )
fit_trainval_s =time .perf_counter ()-t0 

# Test predictions are clipped to the rating scale before evaluation.
# NOTE(review): the measured inference time includes the copy and the clipping;
# pred_test_raw is not used again in the visible code.
t1 =time .perf_counter ()
pred_test =final_pipe .predict (X_test )
pred_test_raw =pred_test .copy ()
pred_test =clip_rating_pred (pred_test )
pred_test_s =time .perf_counter ()-t1 

# Same procedure for subset S.
t2 =time .perf_counter ()
pred_S =final_pipe .predict (X_S )
pred_S_raw =pred_S .copy ()
pred_S =clip_rating_pred (pred_S )
pred_S_s =time .perf_counter ()-t2 

m_test =evaluate_regression (y_test ,pred_test )
m_S =evaluate_regression (y_S ,pred_S )

# Reliability columns are constants for this method.
# NOTE(review): out_of_range_rate is fixed at 0.0 and therefore describes the
# clipped predictions; the rate of the raw predictions is not recorded.
reliability ={
"parse_success_rate":1.0 ,
"schema_adherence_rate":1.0 ,
"out_of_range_rate":0.0 ,
"empty_refusal_rate":0.0 ,
}
coverage =1.0 

# Persist the final (train+val) model.
model_path =PATHS ["models"]/f"{METHOD}.joblib"
joblib .dump (final_pipe ,model_path )

# Validation predictions come from a separate fit on train only, so the
# validation rows are not part of the model that predicts them.
# NOTE(review): these validation predictions are stored unclipped, whereas the
# test and S predictions below are clipped.
tuned_pipe =make_pipeline (best_params )
tuned_pipe .fit (X_train ,y_train )
pred_val_tuned =tuned_pipe .predict (X_val )

# Long-format prediction table: one row per (row_id, split).
pred_path =PATHS ["reports"]/f"predictions_{METHOD}.parquet"
pred_df =pd .concat ([
pd .DataFrame ({
"row_id":df_val ["row_id"].to_numpy (),
"split":"val",
"method":METHOD ,
"y_true":y_val ,
"y_pred":pred_val_tuned ,
}),
pd .DataFrame ({
"row_id":df_test ["row_id"].to_numpy (),
"split":"test",
"method":METHOD ,
"y_true":y_test ,
"y_pred":pred_test ,
}),
pd .DataFrame ({
"row_id":df_S ["row_id"].to_numpy (),
"split":"S",
"method":METHOD ,
"y_true":y_S ,
"y_pred":pred_S ,
}),
],axis =0 ).reset_index (drop =True )

pred_df .to_parquet (pred_path ,index =False )

results_path =PATHS ["reports"]/"results.csv"

def upsert_results(existing: Optional[pd.DataFrame], new_rows: pd.DataFrame, key_cols: List[str]) -> pd.DataFrame:
    """Merge ``new_rows`` into ``existing``, keeping the last row per key.

    Rows are identified by the values in ``key_cols``. Keys are compared via
    their string form, so values whose dtype differs between the two frames
    (e.g. ``1`` and ``"1"``) still match. Duplicate keys inside ``new_rows``
    are collapsed as well, also when ``existing`` is empty or ``None``.

    Args:
        existing: Previously stored results, or ``None``/empty.
        new_rows: Rows to insert or to replace existing rows with.
        key_cols: Columns that together identify a row.

    Returns:
        New frame sorted by ``key_cols`` with a fresh index; the inputs are
        not modified.
    """
    if existing is None or len(existing) == 0:
        out = new_rows.copy()
    else:
        out = pd.concat([existing, new_rows], axis=0, ignore_index=True)

    if len(out) > 1:
        # The key lives in a separate Series, so no helper column is ever
        # added to (or can collide with) the result columns.
        dupkey = out[key_cols].astype(str).agg("||".join, axis=1)
        out = out.loc[~dupkey.duplicated(keep="last")]

    return out.sort_values(key_cols).reset_index(drop=True)

timestamp_utc =time .strftime ("%Y-%m-%dT%H:%M:%SZ",time .gmtime ())

# One result row per evaluation split ("test" and "S"); both rows describe the
# same fitted model and share the hyper-parameters and the fit time.
new_rows =pd .DataFrame ([
{
"timestamp_utc":timestamp_utc ,
"method":METHOD ,
"split":"test",
**best_params ,
**m_test ,
**reliability ,
"coverage":coverage ,
"risk_mae":m_test ["mae"],
"risk_rmse":m_test ["rmse"],
"fit_trainval_s":float (fit_trainval_s ),
"inference_s":float (pred_test_s ),
"sec_per_100":runtime_s_per_100 (len (X_test ),pred_test_s ),
"n_samples":int (len (X_test )),
"notes":"tuned_on=val; final_fit=train+val",
},
{
"timestamp_utc":timestamp_utc ,
"method":METHOD ,
"split":"S",
**best_params ,
**m_S ,
**reliability ,
"coverage":coverage ,
"risk_mae":m_S ["mae"],
"risk_rmse":m_S ["rmse"],
"fit_trainval_s":float (fit_trainval_s ),
"inference_s":float (pred_S_s ),
"sec_per_100":runtime_s_per_100 (len (X_S ),pred_S_s ),
"n_samples":int (len (X_S )),
"notes":"tuned_on=val; final_fit=train+val",
},
])

# Merge into results.csv: rows with the same (method, split) are replaced.
if results_path .exists ():
    df_exist =pd .read_csv (results_path )
else :
    df_exist =pd .DataFrame ()

df_results =upsert_results (df_exist ,new_rows ,key_cols =["method","split"])
df_results .to_csv (results_path ,index =False )

# Scatter plot of true vs. predicted ratings on the test split (skipped without matplotlib).
plot_path =PATHS ["figs"]/f"{METHOD}_pred_vs_true_test.png"
if HAS_MPL :
    plt .figure ()
    plt .scatter (y_test ,pred_test ,alpha =0.5 )
    plt .xlabel ("y_true")
    plt .ylabel ("y_pred")
    plt .title ("TF-IDF + Ridge: Test y_true vs y_pred")
    plt .tight_layout ()
    savefig_with_runid (plot_path ,dpi =150 )
    plt .close ()

# --- Re-read the stored artefacts ---
# The four assignments below repeat values already set above.
METHOD ="tfidf_ridge"
model_path =PATHS ["models"]/f"{METHOD}.joblib"
pred_path =PATHS ["reports"]/f"predictions_{METHOD}.parquet"
results_path =PATHS ["reports"]/"results.csv"

pred_df_check =pd .read_parquet (pred_path )

# Row counts per split in the stored predictions.
n_val =(pred_df_check ["split"]=="val").sum ()
n_test =(pred_df_check ["split"]=="test").sum ()
n_S =(pred_df_check ["split"]=="S").sum ()

# Recompute the test metrics from the stored predictions.
# NOTE(review): the counts and `m` are not compared against anything in the visible code.
tmp =pred_df_check [pred_df_check ["split"]=="test"]
m =evaluate_regression (tmp ["y_true"].to_numpy (),tmp ["y_pred"].to_numpy ())

df_res =pd .read_csv (results_path )


# In [None]:
# Suppress every UserWarning from here on.
# NOTE(review): this filter is process-wide, not limited to the embedding libraries.
warnings .filterwarnings ("ignore",category =UserWarning )

# torch and transformers are required for the embedding model; fail with
# installation instructions when they are missing.
try :
    import torch 
    from transformers import AutoTokenizer ,AutoModel 
except Exception as e :
    raise ImportError (
    "Fehlende Dependencies für Embeddings.\n"
    "Installiere lokal (Terminal):\n"
    "  pip install torch transformers\n"
    f"Original error: {e}"
    )

# sentence-transformers is optional; HAS_ST records whether it could be imported.
HAS_ST =True 
try :
    from sentence_transformers import SentenceTransformer 
except Exception :
    HAS_ST =False 


# Embedding model identifier and the method name for this model.
EMB_MODEL_NAME ="all-MiniLM-L6-v2"
METHOD ="minilm_ridge"

def get_device() -> str:
    """Return the preferred torch device name: "cuda", then "mps", then "cpu"."""
    cuda_ok = torch.cuda.is_available()
    if cuda_ok:
        return "cuda"
    # Older torch builds have no `torch.backends.mps`; probe before calling it.
    mps_ok = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
    return "mps" if mps_ok else "cpu"

DEVICE =get_device ()


# Locations of the processed review table (parquet is read; CSV is only
# rewritten below if it already exists).
processed_parquet =PATHS ["data_processed"]/"reviews_processed.parquet"
processed_csv =PATHS ["data_processed"]/"reviews_processed.csv"

df_all =pd .read_parquet (processed_parquet ).copy ()


df_all =df_all .reset_index (drop =True )

# Ensure a stable integer `row_id` column exists.
if "row_id"not in df_all .columns :
    # No ids yet: assign 0..n-1 in current row order and persist them back to
    # the processed file(s). NOTE: this overwrites the processed parquet on disk.
    df_all .insert (0 ,"row_id",np .arange (len (df_all ),dtype =np .int64 ))
    df_all .to_parquet (processed_parquet ,index =False )
    if processed_csv .exists ():
        df_all .to_csv (processed_csv ,index =False )
else :
    # Existing ids: coerce to numeric, then to plain int64.
    # NOTE(review): a non-numeric id becomes <NA> in the first step and the
    # int64 cast on the next line will then raise.
    df_all ["row_id"]=pd .to_numeric (df_all ["row_id"],errors ="coerce").astype ("Int64")
    df_all ["row_id"]=df_all ["row_id"].astype (np .int64 )

# Pre-computed train/val/test splits.
split_dir =PATHS ["data_processed"]/"splits"
df_train =pd .read_parquet (split_dir /"train.parquet")
df_val =pd .read_parquet (split_dir /"val.parquet")
df_test =pd .read_parquet (split_dir /"test.parquet")


# Canonical ordering by row_id; the embedding cache is validated against this order.
df_all =df_all .sort_values ("row_id").reset_index (drop =True )

# Embedding cache (.npz) and its metadata (.json); '/' in the model name is
# replaced so the name is usable as a file name.
features_dir =PATHS ["data_features"]
emb_path =features_dir /f"emb_{EMB_MODEL_NAME.replace('/','_')}.npz"
emb_meta_path =features_dir /f"emb_{EMB_MODEL_NAME.replace('/','_')}_meta.json"

def mean_pooling(last_hidden_state: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
    """Average the hidden states along dim 1, counting only positions where the mask is set.

    The mask gets a trailing singleton axis and is cast to the hidden-state
    tensor's type so it can be multiplied in. The divisor is clamped to 1e-9,
    so an all-zero mask row produces a zero vector instead of a division by zero.
    """
    weights = attention_mask.unsqueeze(-1).type_as(last_hidden_state)
    weighted_sum = torch.sum(last_hidden_state * weights, dim=1)
    denom = torch.clamp(weights.sum(dim=1), min=1e-9)
    return weighted_sum / denom

@torch.no_grad()
def embed_texts_transformers(texts: List[str], batch_size: int = 32, max_length: int = 256) -> np.ndarray:
    """Fallback encoder: Hugging Face model + masked mean pooling + L2 normalisation.

    Used when sentence-transformers is not installed. The tokenizer and model
    named by EMB_MODEL_NAME are loaded on every call and run on DEVICE.

    Args:
        texts: Input strings to embed.
        batch_size: Number of texts tokenised and encoded per forward pass.
        max_length: Token limit per text; longer inputs are truncated.

    Returns:
        float32 array with one L2-normalised row per input text. For an empty
        ``texts`` an array with zero rows is returned (previously ``np.vstack``
        raised ``ValueError`` on the empty batch list).
    """
    tokenizer = AutoTokenizer.from_pretrained(EMB_MODEL_NAME)
    model = AutoModel.from_pretrained(EMB_MODEL_NAME).to(DEVICE)
    model.eval()

    all_vecs = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        enc = tokenizer(
            batch,
            padding=True,
            truncation=True,
            max_length=max_length,
            return_tensors="pt",
        )
        enc = {k: v.to(DEVICE) for k, v in enc.items()}
        out = model(**enc)
        pooled = mean_pooling(out.last_hidden_state, enc["attention_mask"])
        pooled = torch.nn.functional.normalize(pooled, p=2, dim=1)
        all_vecs.append(pooled.cpu().numpy().astype(np.float32))

    if not all_vecs:
        # No batches were produced: return an empty matrix whose width matches
        # the model's hidden size when the config exposes it.
        dim = int(getattr(model.config, "hidden_size", 0) or 0)
        return np.empty((0, dim), dtype=np.float32)
    return np.vstack(all_vecs)

def embed_texts_sentence_transformers(texts: List[str], batch_size: int = 32) -> np.ndarray:
    """Encode texts with the sentence-transformers model EMB_MODEL_NAME on DEVICE.

    Embeddings are requested as normalised NumPy arrays (with a progress bar)
    and returned as float32.
    """
    encoder = SentenceTransformer(EMB_MODEL_NAME, device=DEVICE)
    encode_options = dict(
        batch_size=batch_size,
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=True,
    )
    return encoder.encode(texts, **encode_options).astype(np.float32)

def load_embeddings_npz(path: Path) -> Tuple[np.ndarray, np.ndarray]:
    """Load cached embeddings from an ``.npz`` archive.

    Args:
        path: Archive containing the arrays ``row_id`` and ``emb``.

    Returns:
        ``(row_ids, emb)`` as int64 and float32 arrays respectively.

    The archive is opened as a context manager so the underlying file handle
    is closed before returning; the original left the ``NpzFile`` open, which
    can block overwriting the cache file later on some platforms.
    """
    with np.load(path, allow_pickle=False) as data:
        # astype() materialises independent copies, so they outlive the archive.
        row_ids = data["row_id"].astype(np.int64)
        emb = data["emb"].astype(np.float32)
    return row_ids, emb

def save_embeddings_npz(path: Path, row_ids: np.ndarray, emb: np.ndarray) -> None:
    """Write row ids (as int64) and embeddings (as float32) to a compressed ``.npz`` archive."""
    arrays = {
        "row_id": row_ids.astype(np.int64),
        "emb": emb.astype(np.float32),
    }
    np.savez_compressed(path, **arrays)

def embeddings_cache_valid(path: Path, df: pd.DataFrame) -> bool:
    """Return True only if the cache at ``path`` lines up row-for-row with ``df``.

    The cache is accepted when the file exists, its row ids equal
    ``df["row_id"]`` in the same order, and the embedding array is 2-D with one
    row per data-frame row. Any error while loading or comparing counts as an
    invalid cache.
    """
    if not path.exists():
        return False
    try:
        cached_ids, cached_emb = load_embeddings_npz(path)
        expected_ids = df["row_id"]
        if len(cached_ids) != len(df):
            return False
        # Cheap range check before the element-wise comparison.
        if cached_ids.min() != expected_ids.min() or cached_ids.max() != expected_ids.max():
            return False
        if not np.array_equal(cached_ids, expected_ids.to_numpy(dtype=np.int64)):
            return False
        return bool(cached_emb.shape[0] == len(df) and cached_emb.ndim == 2)
    except Exception:
        return False

# Texts and row ids in the canonical (row_id-sorted) order of df_all.
texts_all =df_all ["text"].astype (str ).tolist ()
row_ids_all =df_all ["row_id"].to_numpy (dtype =np .int64 )

if embeddings_cache_valid (emb_path ,df_all ):
    # Cache hit: reuse the stored embeddings; no embedding time is measured in this run.
    row_ids_cached ,emb_all =load_embeddings_npz (emb_path )
    cached =True 
    embed_total_s =None 
else :
    cached =False 

    MAX_LENGTH =256 
    # Smaller batches on CPU.
    BATCH_SIZE =32 if DEVICE !="cpu"else 16 

    # Time the full embedding pass; the backend depends on whether
    # sentence-transformers could be imported.
    t0 =time .perf_counter ()
    if HAS_ST :
        emb_all =embed_texts_sentence_transformers (texts_all ,batch_size =BATCH_SIZE )
        backend ="sentence_transformers"
    else :
        emb_all =embed_texts_transformers (texts_all ,batch_size =BATCH_SIZE ,max_length =MAX_LENGTH )
        backend ="hf_transformers_mean_pool"
    embed_total_s =float (time .perf_counter ()-t0 )

    save_embeddings_npz (emb_path ,row_ids_all ,emb_all )

    # Sidecar metadata; total_embed_s and n_samples are read back later to
    # estimate per-split feature time.
    meta ={
    "created_utc":time .strftime ("%Y-%m-%dT%H:%M:%SZ",time .gmtime ()),
    "model":EMB_MODEL_NAME ,
    "backend":backend ,
    "device":DEVICE ,
    "batch_size":int (BATCH_SIZE ),
    "max_length_if_hf_fallback":int (MAX_LENGTH ),
    "n_samples":int (len (df_all )),
    "emb_dim":int (emb_all .shape [1 ]),
    "total_embed_s":embed_total_s ,
    }
    with open (emb_meta_path ,"w",encoding ="utf-8")as f :
        json .dump (meta ,f ,indent =2 ,ensure_ascii =False )

    LOGGER .info (f"Embeddings berechnet: n={len(df_all)} dim={emb_all.shape[1]} total_s={embed_total_s:.2f} backend={backend}")


# Reload all splits (including subset S), each sorted by row_id.
split_dir =PATHS ["data_processed"]/"splits"
df_train =pd .read_parquet (split_dir /"train.parquet").sort_values ("row_id").reset_index (drop =True )
df_val =pd .read_parquet (split_dir /"val.parquet").sort_values ("row_id").reset_index (drop =True )
df_test =pd .read_parquet (split_dir /"test.parquet").sort_values ("row_id").reset_index (drop =True )
df_S =pd .read_parquet (split_dir /"subset_S.parquet").sort_values ("row_id").reset_index (drop =True )

# Map each row_id to its position in row_ids_all (and thus its row in emb_all).
row_to_ix ={rid :i for i ,rid in enumerate (row_ids_all .tolist ())}

def get_X_from_row_ids(df_split: pd.DataFrame) -> np.ndarray:
    """Return the rows of ``emb_all`` for the row ids in ``df_split``, in that order.

    Raises KeyError if a row id is missing from ``row_to_ix``.
    """
    positions = np.fromiter(
        (row_to_ix[int(rid)] for rid in df_split["row_id"].to_numpy()),
        dtype=np.int64,
    )
    return emb_all[positions]

# Embedding features and float targets for each split.
X_train =get_X_from_row_ids (df_train )
y_train =df_train ["target"].astype (float ).to_numpy ()

X_val =get_X_from_row_ids (df_val )
y_val =df_val ["target"].astype (float ).to_numpy ()

# Train+val stacked for the final refit after model selection.
X_trainval =np .vstack ([X_train ,X_val ])
y_trainval =np .concatenate ([y_train ,y_val ])

X_test =get_X_from_row_ids (df_test )
y_test =df_test ["target"].astype (float ).to_numpy ()

X_S =get_X_from_row_ids (df_S )
y_S =df_S ["target"].astype (float ).to_numpy ()


# Candidate feature preprocessors for the grid search, keyed by name;
# None means the embeddings are passed to the regressor unchanged.
# NOTE: each value is a single estimator instance. Any pipeline that embeds
# one of them directly (rather than a copy) shares its fitted state with every
# other pipeline built from the same key.
preprocs ={
"none":None ,
"scaler":StandardScaler (with_mean =True ,with_std =True ),
"l2norm":Normalizer (norm ="l2"),
}

def build_ridge_pipe(preproc_name: str, alpha: float) -> Pipeline:
    """Build a fresh ``[preprocessor ->] Ridge`` pipeline.

    Args:
        preproc_name: Key into ``preprocs``; a ``None`` entry means no
            preprocessing step.
        alpha: Ridge regularisation strength.

    Returns:
        An unfitted Pipeline with its own preprocessor instance.

    Raises:
        KeyError: If ``preproc_name`` is not a key of ``preprocs``.

    The preprocessor is cloned per pipeline. Previously every pipeline built
    from the same key held the *same* estimator object, so fitting a second
    pipeline (e.g. the train-only refit) silently overwrote the fitted scaler
    inside an already-fitted pipeline (e.g. the train+val model that is saved).
    """
    # Local import: keeps this block self-contained; sklearn is already a
    # dependency of the file.
    from sklearn.base import clone

    steps = []
    template = preprocs[preproc_name]
    if template is not None:
        steps.append(("pre", clone(template)))
    steps.append(("ridge", Ridge(alpha=float(alpha), random_state=SEED)))
    return Pipeline(steps)

# Grid: Ridge regularisation strength x feature preprocessor.
alphas = [0.1, 1.0, 10.0, 50.0, 100.0, 300.0]
preproc_names = ["none", "scaler", "l2norm"]

best = None      # best grid record so far (lowest val MAE, ties broken by RMSE)
records = []     # one record per grid point

t0_grid = time.perf_counter()
for pp in preproc_names:
    for a in alphas:
        pipe = build_ridge_pipe(pp, a)

        t0 = time.perf_counter()
        pipe.fit(X_train, y_train)
        fit_s = time.perf_counter() - t0

        t1 = time.perf_counter()
        pred_val = pipe.predict(X_val)
        pred_val_raw = pred_val.copy()
        pred_val = clip_rating_pred(pred_val)
        pred_s = time.perf_counter() - t1

        # Diagnostics use the RAW predictions. The original computed them on
        # the output of clip_rating_pred, which (assuming it clips to the
        # rating range) makes the range and out-of-range rate uninformative,
        # and left pred_val_raw unused.
        print("[minilm ridge grid]", "preproc=", pp, "alpha=", a,
              "| pred range:", float(np.min(pred_val_raw)), float(np.max(pred_val_raw)),
              "| out_of_range_rate:", float(np.mean((pred_val_raw < 1) | (pred_val_raw > 10))))

        # Model selection metrics are computed on the clipped predictions.
        m = evaluate_regression(y_val, pred_val)
        rec = {
            "preproc": pp,
            "ridge__alpha": float(a),
            **m,
            "fit_s": float(fit_s),
            "pred_val_s": float(pred_s),
            "pred_val_s_per_100": runtime_s_per_100(len(X_val), pred_s),
        }
        records.append(rec)

        # Keep the record with strictly lower MAE; on a (near-)tie prefer lower RMSE.
        if (best is None) or (rec["mae"] < best["mae"] - 1e-12) or (abs(rec["mae"] - best["mae"]) <= 1e-12 and rec["rmse"] < best["rmse"]):
            best = rec

grid_s = time.perf_counter() - t0_grid
df_grid = pd.DataFrame(records).sort_values(["mae", "rmse"], ascending=True).reset_index(drop=True)

# Refit the selected configuration on train+val and time the fit.
final_pipe =build_ridge_pipe (best ["preproc"],best ["ridge__alpha"])
t0 =time .perf_counter ()
final_pipe .fit (X_trainval ,y_trainval )
fit_trainval_s =time .perf_counter ()-t0 

# Test predictions: keep a raw copy, then pass through clip_rating_pred.
# The measured time includes the clipping step.
t1 =time .perf_counter ()
pred_test =final_pipe .predict (X_test )
pred_test_raw =pred_test .copy ()
pred_test =clip_rating_pred (pred_test )
pred_test_s =time .perf_counter ()-t1 

# Same for subset S.
t2 =time .perf_counter ()
pred_S =final_pipe .predict (X_S )
pred_S_raw =pred_S .copy ()
pred_S =clip_rating_pred (pred_S )
pred_S_s =time .perf_counter ()-t2 

# Metrics are computed on the clipped predictions.
m_test =evaluate_regression (y_test ,pred_test )
m_S =evaluate_regression (y_S ,pred_S )


import joblib

# Persist the train+val model FIRST. build_ridge_pipe may hand out a
# preprocessor instance that is shared between pipelines; fitting tuned_pipe
# below would then re-fit that shared object and change final_pipe's state
# before it is written to disk.
model_path = PATHS["models"] / f"{METHOD}.joblib"
joblib.dump(final_pipe, model_path)

# Validation predictions must come from a model that has not seen the
# validation data, so refit the selected configuration on train only.
tuned_pipe = build_ridge_pipe(best["preproc"], best["ridge__alpha"])
tuned_pipe.fit(X_train, y_train)
# Clip like the test/S predictions (and like the grid-search metrics) so all
# splits in the predictions file are post-processed the same way.
pred_val_tuned = clip_rating_pred(tuned_pipe.predict(X_val))

# Long-format predictions table: one row per (row_id, split).
pred_path = PATHS["reports"] / f"predictions_{METHOD}.parquet"
pred_df = pd.concat([
    pd.DataFrame({
        "row_id": df_val["row_id"].to_numpy(),
        "split": "val",
        "method": METHOD,
        "y_true": y_val,
        "y_pred": pred_val_tuned,
    }),
    pd.DataFrame({
        "row_id": df_test["row_id"].to_numpy(),
        "split": "test",
        "method": METHOD,
        "y_true": y_test,
        "y_pred": pred_test,
    }),
    pd.DataFrame({
        "row_id": df_S["row_id"].to_numpy(),
        "split": "S",
        "method": METHOD,
        "y_true": y_S,
        "y_pred": pred_S,
    }),
], axis=0).reset_index(drop=True)
pred_df.to_parquet(pred_path, index=False)

# Reliability fields written to the results table for this method.
# All values are fixed constants here; out_of_range_rate is 0.0 because the
# reported predictions were passed through clip_rating_pred above.
reliability ={
"parse_success_rate":1.0 ,
"schema_adherence_rate":1.0 ,
"out_of_range_rate":0.0 ,
"empty_refusal_rate":0.0 ,
}
coverage =1.0 

# Load the embedding sidecar metadata if present (it may stem from an earlier
# run when the embeddings were served from cache).
embed_meta =None 
if emb_meta_path .exists ():
    with open (emb_meta_path ,"r",encoding ="utf-8")as f :
        embed_meta =json .load (f )

def estimated_embed_s(n_split: int) -> float:
    """Estimate embedding seconds for ``n_split`` rows from the recorded total.

    Scales ``embed_meta["total_embed_s"]`` by the share ``n_split / n_samples``.
    Returns NaN when no metadata is loaded or the recorded total is missing/zero.
    """
    recorded_total = embed_meta.get("total_embed_s") if embed_meta else None
    if not recorded_total:
        return float("nan")
    n_all = int(embed_meta["n_samples"])
    share = n_split / max(1, n_all)  # guard against n_samples == 0
    return float(float(recorded_total) * share)

timestamp_utc =time .strftime ("%Y-%m-%dT%H:%M:%SZ",time .gmtime ())
# Estimated embedding time attributable to each split (NaN if unknown).
feature_s_test =estimated_embed_s (len (df_test ))
feature_s_S =estimated_embed_s (len (df_S ))

# Seconds per 100 samples = (estimated feature time, treated as 0 when unknown)
# + measured prediction time.
sec_per_100_test =runtime_s_per_100 (len (df_test ),(0.0 if math .isnan (feature_s_test )else feature_s_test )+pred_test_s )
sec_per_100_S =runtime_s_per_100 (len (df_S ),(0.0 if math .isnan (feature_s_S )else feature_s_S )+pred_S_s )

# One results row per evaluated split ("test" and "S").
new_rows =pd .DataFrame ([
{
"timestamp_utc":timestamp_utc ,
"method":METHOD ,
"split":"test",
"ridge__alpha":float (best ["ridge__alpha"]),
**m_test ,
**reliability ,
"coverage":coverage ,
"risk_mae":m_test ["mae"],
"risk_rmse":m_test ["rmse"],
"fit_trainval_s":float (fit_trainval_s ),
"feature_s_est":float (feature_s_test )if not math .isnan (feature_s_test )else np .nan ,
"inference_s":float (pred_test_s ),
"sec_per_100":float (sec_per_100_test ),
"n_samples":int (len (df_test )),
"notes":f"emb_model={EMB_MODEL_NAME}; preproc={best.get('preproc')}; cached={cached}",
},
{
"timestamp_utc":timestamp_utc ,
"method":METHOD ,
"split":"S",
"ridge__alpha":float (best ["ridge__alpha"]),
**m_S ,
**reliability ,
"coverage":coverage ,
"risk_mae":m_S ["mae"],
"risk_rmse":m_S ["rmse"],
"fit_trainval_s":float (fit_trainval_s ),
"feature_s_est":float (feature_s_S )if not math .isnan (feature_s_S )else np .nan ,
"inference_s":float (pred_S_s ),
"sec_per_100":float (sec_per_100_S ),
"n_samples":int (len (df_S )),
"notes":f"emb_model={EMB_MODEL_NAME}; preproc={best.get('preproc')}; cached={cached}",
},
])

# Shared results table across methods.
results_path =PATHS ["reports"]/"results.csv"

def upsert_results(existing: Optional[pd.DataFrame], new_rows: pd.DataFrame, key_cols: List[str]) -> pd.DataFrame:
    """Merge ``new_rows`` into ``existing``, replacing rows that share the same key.

    Rows are identified by the string-joined values of ``key_cols``; when a key
    occurs more than once the last occurrence (i.e. the new row) wins. With no
    existing rows, ``new_rows`` is used as-is. The result is sorted by
    ``key_cols`` with a fresh index.
    """
    have_existing = existing is not None and len(existing) != 0
    if not have_existing:
        merged = new_rows.copy()
    else:
        stacked = pd.concat([existing, new_rows], axis=0, ignore_index=True)
        # Temporary composite key column used only for de-duplication.
        stacked["_dupkey"] = stacked[key_cols].astype(str).agg("||".join, axis=1)
        deduped = stacked.drop_duplicates(subset="_dupkey", keep="last")
        merged = deduped.drop(columns=["_dupkey"])
    ordered = merged.sort_values(key_cols)
    return ordered.reset_index(drop=True)

# Merge this method's rows into the shared results table and write it back.
df_exist =pd .read_csv (results_path )if results_path .exists ()else pd .DataFrame ()
df_results =upsert_results (df_exist ,new_rows ,key_cols =["method","split"])
df_results .to_csv (results_path ,index =False )

# Scatter plot of true vs. predicted test targets (only when HAS_MPL is truthy).
plot_path =PATHS ["figs"]/f"{METHOD}_pred_vs_true_test.png"
if HAS_MPL :
    plt .figure ()
    plt .scatter (y_test ,pred_test ,alpha =0.5 )
    plt .xlabel ("y_true")
    plt .ylabel ("y_pred")
    plt .title ("MiniLM Embeddings + Ridge: Test y_true vs y_pred")
    plt .tight_layout ()
    savefig_with_runid (plot_path ,dpi =150 )
    plt .close ()

# Read-back checks: reload the embedding cache, the saved predictions and the
# results table, and recompute test metrics from the persisted predictions.
row_ids_chk ,emb_chk =load_embeddings_npz (emb_path )

pred_df_check =pd .read_parquet (pred_path )

tmp =pred_df_check [pred_df_check ["split"]=="test"]
m =evaluate_regression (tmp ["y_true"].to_numpy (),tmp ["y_pred"].to_numpy ())

df_res =pd .read_csv (results_path )


In [None]:
from torch import nn 
from torch .utils .data import Dataset ,DataLoader 

from transformers import AutoTokenizer ,AutoModelForSequenceClassification 

# Best-effort seeding through transformers' own helper; any failure
# (e.g. the helper not being importable) is deliberately ignored.
try :
    from transformers import set_seed as hf_set_seed 
    hf_set_seed (SEED )
except Exception :
    pass 

def get_device() -> str:
    """Pick the torch device name, preferring "cuda", then "mps", falling back to "cpu"."""
    if torch.cuda.is_available():
        return "cuda"
    # `torch.backends.mps` is absent on older torch versions, hence the hasattr probe.
    has_mps_backend = hasattr(torch.backends, "mps")
    if has_mps_backend and torch.backends.mps.is_available():
        return "mps"
    return "cpu"

DEVICE =get_device ()

# Load the pre-computed splits (train/val/test and subset S).
split_dir =PATHS ["data_processed"]/"splits"
df_train =pd .read_parquet (split_dir /"train.parquet")
df_val =pd .read_parquet (split_dir /"val.parquet")
df_test =pd .read_parquet (split_dir /"test.parquet")
df_S =pd .read_parquet (split_dir /"subset_S.parquet")


# Base checkpoint to fine-tune and its tokenizer.
MODEL_NAME ="distilbert-base-uncased"
tokenizer =AutoTokenizer .from_pretrained (MODEL_NAME )

def estimate_token_lengths(texts, max_n: int = 2000):
    """Return token counts (int32 array) for a seeded random sample of ``texts``.

    At most ``max_n`` texts are drawn without replacement using a generator
    seeded with SEED. Each text is tokenised with the module-level
    ``tokenizer`` including special tokens and without truncation.
    """
    sample_size = min(max_n, len(texts))
    picker = np.random.default_rng(SEED)
    chosen = picker.choice(np.arange(len(texts)), size=sample_size, replace=False)
    token_counts = [
        len(tokenizer(texts[pos], add_special_tokens=True, truncation=False)["input_ids"])
        for pos in chosen
    ]
    return np.array(token_counts, dtype=np.int32)

# Token-length distribution of (a sample of up to 2000) training texts.
train_texts =df_train ["text"].astype (str ).tolist ()
lens =estimate_token_lengths (train_texts ,max_n =2000 )

# Median, 90th, 95th and 99th percentile plus the maximum observed length.
q50 ,q90 ,q95 ,q99 =np .quantile (lens ,[0.50 ,0.90 ,0.95 ,0.99 ])
max_len_raw =int (lens .max ())

def choose_max_length(p95: float, cap: int = 384, floor: int = 128) -> int:
    """Round ``p95`` up to the next multiple of 8, then limit it to ``[floor, cap]``.

    ``cap`` is applied last, so it wins if ``floor`` exceeds ``cap``.
    """
    rounded_up = int(math.ceil(p95 / 8) * 8)
    return min(cap, max(floor, rounded_up))

# Sequence length for training/inference, derived from the 95th percentile of
# the sampled token lengths and limited to [128, 384].
MAX_LENGTH =choose_max_length (q95 ,cap =384 ,floor =128 )

# Method name used for artifact file names and result rows.
METHOD ="distilbert_reg"

class TextRegDataset(Dataset):
    """Dataset pairing texts with float32 regression targets.

    Each item is tokenised on access, padded/truncated to ``max_length``, and
    returned as a dict of the tokenizer's tensors (leading batch axis removed)
    plus a scalar float32 ``labels`` tensor.
    """

    def __init__(self, texts, targets, tokenizer, max_length: int):
        self.texts = list(map(str, texts))
        self.targets = np.asarray(targets, dtype=np.float32)
        self.tokenizer = tokenizer
        self.max_length = int(max_length)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx: int):
        encoded = self.tokenizer(
            self.texts[idx],
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        )
        sample = {}
        for name, tensor in encoded.items():
            # The tokenizer returns tensors with a leading batch axis of size 1; drop it.
            sample[name] = tensor.squeeze(0)
        sample["labels"] = torch.tensor(self.targets[idx], dtype=torch.float32)
        return sample

def build_model(model_name: str) -> nn.Module:
    """Load ``model_name`` as a sequence-classification model with a single output.

    The config's ``problem_type`` is set to "regression" on a best-effort
    basis; failure to set it is ignored.
    """
    net = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=1)
    try:
        net.config.problem_type = "regression"
    except Exception:
        # Best effort only: keep the model usable even if the config rejects the attribute.
        pass
    return net

# Instantiate the regression model and move it to the selected device.
model =build_model (MODEL_NAME ).to (DEVICE )

# Number of leading transformer layers whose parameters are frozen below.
FREEZE_FIRST_N_LAYERS =4 

def freeze_distilbert_layers(model: nn.Module, n_freeze: int) -> int:
    """Disable gradients for the first ``n_freeze`` layers in ``model.distilbert.transformer.layer``.

    Returns the number of layers frozen. If the layer list cannot be reached
    (or freezing fails part-way), a message is printed and the count reached so
    far is returned.
    """
    n_frozen = 0
    try:
        for position, block in enumerate(model.distilbert.transformer.layer):
            if position >= n_freeze:
                continue
            for param in block.parameters():
                param.requires_grad = False
            n_frozen += 1
    except Exception as err:
        print("Konnte Layers nicht sauber freezen:", repr(err))
    return n_frozen

frozen_layers =freeze_distilbert_layers (model ,FREEZE_FIRST_N_LAYERS )

# Training hyper-parameters.
EPOCHS =3 
BATCH_SIZE =8 if DEVICE !="cpu"else 4 
LR =2e-5 
WEIGHT_DECAY =0.01 
PATIENCE =1 # early-stopping patience; see the `bad_epochs >PATIENCE` check in the loop
MAX_GRAD_NORM =1.0 # gradient-norm clipping threshold

train_ds =TextRegDataset (df_train ["text"].tolist (),df_train ["target"].to_numpy (),tokenizer ,MAX_LENGTH )
val_ds =TextRegDataset (df_val ["text"].tolist (),df_val ["target"].to_numpy (),tokenizer ,MAX_LENGTH )

# Only the training loader shuffles; no batches are dropped.
train_loader =DataLoader (train_ds ,batch_size =BATCH_SIZE ,shuffle =True ,drop_last =False )
val_loader =DataLoader (val_ds ,batch_size =BATCH_SIZE ,shuffle =False ,drop_last =False )

# Optimise only parameters that still require gradients (frozen layers are excluded).
optimizer =torch .optim .AdamW ([p for p in model .parameters ()if p .requires_grad ],lr =LR ,weight_decay =WEIGHT_DECAY )

def forward_batch(model, batch):
    """Run one forward pass on DEVICE and return ``(loss, detached predictions)``.

    Every tensor in ``batch`` is moved to DEVICE and passed to the model as a
    keyword argument. Predictions are the logits with the last axis squeezed,
    detached from the graph; the loss keeps its graph for backpropagation.
    """
    on_device = {}
    for key, tensor in batch.items():
        on_device[key] = tensor.to(DEVICE)
    result = model(**on_device)
    return result.loss, result.logits.squeeze(-1).detach()

@torch.no_grad()
def eval_loader(model, loader) -> Tuple[np.ndarray, np.ndarray, float]:
    """Evaluate ``model`` over ``loader`` without gradients.

    Returns ``(y_true, y_pred, avg_loss)`` where the arrays are concatenated
    over all batches and ``avg_loss`` is the mean of per-batch losses weighted
    by batch size.
    """
    model.eval()
    pred_chunks, label_chunks = [], []
    loss_sum = 0.0
    seen = 0
    for batch in loader:
        loss, preds = forward_batch(model, batch)
        count = preds.shape[0]
        # Batch losses are means; weight by batch size to average over samples.
        loss_sum += float(loss.item()) * count
        seen += count
        pred_chunks.append(preds.cpu().numpy())
        label_chunks.append(batch["labels"].cpu().numpy())
    y_pred = np.concatenate(pred_chunks, axis=0)
    y_true = np.concatenate(label_chunks, axis=0)
    return y_true, y_pred, loss_sum / max(1, seen)

# Early-stopping bookkeeping: best weights (by validation MAE) and per-epoch history.
best_state = None
best_val_mae = float("inf")
bad_epochs = 0
history = []

t_train0 = time.perf_counter()
for epoch in range(1, EPOCHS + 1):
    model.train()
    running = 0.0
    n_seen = 0

    t0 = time.perf_counter()
    for batch in train_loader:
        optimizer.zero_grad(set_to_none=True)
        loss, _ = forward_batch(model, batch)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), MAX_GRAD_NORM)
        optimizer.step()

        # Batch loss is a mean; weight it by batch size for the epoch average.
        bs = batch["labels"].shape[0]
        running += float(loss.item()) * bs
        n_seen += bs

    train_loss = running / max(1, n_seen)
    epoch_s = time.perf_counter() - t0

    yv, pv, val_loss = eval_loader(model, val_loader)
    m_val = evaluate_regression(yv, pv)

    history.append({
        "epoch": epoch,
        "train_loss": float(train_loss),
        "val_loss": float(val_loss),
        **m_val,
        "epoch_s": float(epoch_s),
    })

    print(f"Epoch {epoch}/{EPOCHS} | train_loss={train_loss:.4f} | val_mae={m_val['mae']:.4f} val_rmse={m_val['rmse']:.4f} val_spearman={m_val['spearman']:.4f} | epoch_s={epoch_s:.1f}s")

    if m_val["mae"] < best_val_mae - 1e-6:
        # New best: snapshot the weights on CPU so later epochs cannot alter them.
        best_val_mae = m_val["mae"]
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        bad_epochs = 0
    else:
        bad_epochs += 1
        # Stops once the number of consecutive non-improving epochs exceeds PATIENCE.
        if bad_epochs > PATIENCE:
            print("Early stopping triggered.")
            break

total_train_s = time.perf_counter() - t_train0
hist_df = pd.DataFrame(history)

# Restore the best checkpoint. best_state stays None when no epoch ever
# improved on the initial +inf (e.g. validation MAE was NaN every epoch, or
# EPOCHS == 0); load_state_dict(None) would raise, so keep the current weights
# and say so instead.
if best_state is not None:
    model.load_state_dict(best_state)
else:
    print("Warning: no improving epoch was recorded; keeping the current model weights.")
model.to(DEVICE)
model.eval()

@torch.no_grad()
def predict_texts_e2e(model, texts: List[str], batch_size: int, max_length: int) -> Tuple[np.ndarray, float]:
    """Tokenise and predict ``texts`` end to end, timing the whole pass.

    Args:
        model: Model returning an object with ``.logits`` when called with the
            tokenizer output.
        texts: Inputs; each element is converted with ``str()``.
        batch_size: Number of texts per forward pass.
        max_length: Token limit; longer inputs are truncated.

    Returns:
        ``(predictions, seconds)``: predictions concatenated over batches and
        the wall-clock time for tokenisation plus inference. For empty
        ``texts`` an empty float32 array is returned (previously
        ``np.concatenate`` raised ``ValueError`` on the empty batch list).
    """
    model.eval()
    t0 = time.perf_counter()
    preds_all = []
    for i in range(0, len(texts), batch_size):
        batch_texts = [str(t) for t in texts[i:i + batch_size]]
        enc = tokenizer(
            batch_texts,
            padding=True,
            truncation=True,
            max_length=max_length,
            return_tensors="pt",
        )
        enc = {k: v.to(DEVICE) for k, v in enc.items()}
        out = model(**enc)
        preds = out.logits.squeeze(-1).detach().cpu().numpy()
        preds_all.append(preds)
    secs = float(time.perf_counter() - t0)
    if not preds_all:
        return np.empty((0,), dtype=np.float32), secs
    return np.concatenate(preds_all, axis=0), secs

# Texts and float targets per split.
X_val_text =df_val ["text"].astype (str ).tolist ()
y_val_true =df_val ["target"].astype (float ).to_numpy ()

X_test_text =df_test ["text"].astype (str ).tolist ()
y_test_true =df_test ["target"].astype (float ).to_numpy ()

X_S_text =df_S ["text"].astype (str ).tolist ()
y_S_true =df_S ["target"].astype (float ).to_numpy ()

# End-to-end (tokenise + forward) predictions with wall-clock timings.
pred_val ,t_val =predict_texts_e2e (model ,X_val_text ,batch_size =BATCH_SIZE ,max_length =MAX_LENGTH )
pred_test ,t_test =predict_texts_e2e (model ,X_test_text ,batch_size =BATCH_SIZE ,max_length =MAX_LENGTH )
pred_S ,t_S =predict_texts_e2e (model ,X_S_text ,batch_size =BATCH_SIZE ,max_length =MAX_LENGTH )

# NOTE: metrics here are computed on the raw model outputs; no clipping step
# is applied in this section.
m_test =evaluate_regression (y_test_true ,pred_test )
m_S =evaluate_regression (y_S_true ,pred_S )

# Save model and tokenizer in Hugging Face format.
model_dir =PATHS ["models"]/f"{METHOD}_{MODEL_NAME.replace('/','_')}"
model_dir .mkdir (parents =True ,exist_ok =True )
model .save_pretrained (model_dir )
tokenizer .save_pretrained (model_dir )

# Run metadata: configuration, timings and the per-epoch validation history.
meta_path =PATHS ["reports"]/f"{METHOD}_meta.json"
meta ={
"timestamp_utc":time .strftime ("%Y-%m-%dT%H:%M:%SZ",time .gmtime ()),
"method":METHOD ,
"model_name":MODEL_NAME ,
"device":DEVICE ,
"seed":SEED ,
"max_length":int (MAX_LENGTH ),
"batch_size":int (BATCH_SIZE ),
"epochs_ran":int (len (history )),
"epochs_planned":int (EPOCHS ),
"lr":float (LR ),
"weight_decay":float (WEIGHT_DECAY ),
"freeze_first_n_layers":int (FREEZE_FIRST_N_LAYERS ),
"frozen_layers_effective":int (frozen_layers ),
"total_train_s":float (total_train_s ),
"val_history":history ,
}
with open (meta_path ,"w",encoding ="utf-8")as f :
    json .dump (meta ,f ,indent =2 ,ensure_ascii =False )

# Long-format predictions table: one row per (row_id, split).
pred_path =PATHS ["reports"]/f"predictions_{METHOD}.parquet"
pred_df =pd .concat ([
pd .DataFrame ({
"row_id":df_val ["row_id"].to_numpy (),
"split":"val",
"method":METHOD ,
"y_true":y_val_true ,
"y_pred":pred_val ,
}),
pd .DataFrame ({
"row_id":df_test ["row_id"].to_numpy (),
"split":"test",
"method":METHOD ,
"y_true":y_test_true ,
"y_pred":pred_test ,
}),
pd .DataFrame ({
"row_id":df_S ["row_id"].to_numpy (),
"split":"S",
"method":METHOD ,
"y_true":y_S_true ,
"y_pred":pred_S ,
}),
],axis =0 ).reset_index (drop =True )
pred_df .to_parquet (pred_path ,index =False )

def _reliability_for(preds: np.ndarray) -> Dict[str, float]:
    """Reliability fields for one split's raw predictions.

    ``out_of_range_rate`` is the share of predictions outside the 1..10 rating
    scale. The original used ``< 0`` as the lower bound, which disagrees with
    the 1..10 scale used elsewhere in this file.
    """
    return {
        "parse_success_rate": 1.0,
        "schema_adherence_rate": 1.0,
        "out_of_range_rate": float(np.mean((preds < 1) | (preds > 10))),
        "empty_refusal_rate": 0.0,
    }


# Computed per split: previously the rate measured on the test predictions was
# also written into the subset-S row.
reliability_test = _reliability_for(pred_test)
reliability_S = _reliability_for(pred_S)
reliability = reliability_test  # name kept in case later cells read `reliability`
coverage = 1.0

results_path = PATHS["reports"] / "results.csv"


def upsert_results(existing: Optional[pd.DataFrame], new_rows: pd.DataFrame, key_cols: List[str]) -> pd.DataFrame:
    """Merge ``new_rows`` into ``existing``; rows with the same key are replaced by the new ones.

    The key is the string-joined values of ``key_cols``. With no existing rows,
    ``new_rows`` is used as-is. The result is sorted by ``key_cols`` with a
    fresh index.
    """
    if existing is None or len(existing) == 0:
        out = new_rows.copy()
    else:
        out = pd.concat([existing, new_rows], axis=0, ignore_index=True)
        # Temporary composite key column used only for de-duplication.
        out["_dupkey"] = out[key_cols].astype(str).agg("||".join, axis=1)
        out = out.drop_duplicates(subset="_dupkey", keep="last").drop(columns=["_dupkey"])
    return out.sort_values(key_cols).reset_index(drop=True)


timestamp_utc = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

# One results row per evaluated split ("test" and "S").
new_rows = pd.DataFrame([
    {
        "timestamp_utc": timestamp_utc,
        "method": METHOD,
        "split": "test",
        "model_name": MODEL_NAME,
        "max_length": int(MAX_LENGTH),
        "batch_size": int(BATCH_SIZE),
        "epochs": int(len(history)),
        "freeze_first_n_layers": int(FREEZE_FIRST_N_LAYERS),
        **m_test,
        **reliability_test,
        "coverage": coverage,
        "risk_mae": m_test["mae"],
        "risk_rmse": m_test["rmse"],
        "fit_trainval_s": float(total_train_s),
        "inference_s": float(t_test),
        "sec_per_100": runtime_s_per_100(len(df_test), t_test),
        "n_samples": int(len(df_test)),
        "notes": f"hf_savedir={model_dir.name}",
    },
    {
        "timestamp_utc": timestamp_utc,
        "method": METHOD,
        "split": "S",
        "model_name": MODEL_NAME,
        "max_length": int(MAX_LENGTH),
        "batch_size": int(BATCH_SIZE),
        "epochs": int(len(history)),
        "freeze_first_n_layers": int(FREEZE_FIRST_N_LAYERS),
        **m_S,
        **reliability_S,
        "coverage": coverage,
        "risk_mae": m_S["mae"],
        "risk_rmse": m_S["rmse"],
        "fit_trainval_s": float(total_train_s),
        "inference_s": float(t_S),
        "sec_per_100": runtime_s_per_100(len(df_S), t_S),
        "n_samples": int(len(df_S)),
        "notes": f"hf_savedir={model_dir.name}",
    },
])

# Merge into the shared results table and write it back.
df_exist = pd.read_csv(results_path) if results_path.exists() else pd.DataFrame()
df_results = upsert_results(df_exist, new_rows, key_cols=["method", "split"])
df_results.to_csv(results_path, index=False)

# Best-effort scatter plot of true vs. predicted test targets; any failure
# (including matplotlib not being importable) only sets plot_saved to False.
plot_path =PATHS ["figs"]/f"{METHOD}_pred_vs_true_test.png"
try :
    import matplotlib .pyplot as plt 
    plt .figure ()
    plt .scatter (y_test_true ,pred_test ,alpha =0.5 )
    plt .xlabel ("y_true")
    plt .ylabel ("y_pred")
    plt .title ("DistilBERT Regression: Test y_true vs y_pred")
    plt .tight_layout ()
    savefig_with_runid (plot_path ,dpi =150 )
    plt .close ()
    plot_saved =True 
except Exception :
    plot_saved =False 

# Read-back checks: reload saved predictions and results, and recompute test
# metrics from the persisted predictions.
pred_df_check =pd .read_parquet (pred_path )

tmp =pred_df_check [pred_df_check ["split"]=="test"]
m =evaluate_regression (tmp ["y_true"].to_numpy (),tmp ["y_pred"].to_numpy ())

df_res =pd .read_csv (results_path )


In [None]:
import requests 

# Ollama endpoint and model, read from CONFIG with local defaults.
OLLAMA_BASE_URL =CONFIG .get ("ollama_base_url","http://localhost:11434")
OLLAMA_MODEL =CONFIG .get ("ollama_model","phi3:mini")

def ollama_tags(base_url: str) -> Dict[str, Any]:
    """GET ``{base_url}/api/tags`` (3 s timeout) and return the decoded JSON body.

    Raises whatever ``requests`` raises for connection problems or a non-2xx status.
    """
    response = requests.get(f"{base_url}/api/tags", timeout=3)
    response.raise_for_status()
    return response.json()

# Hard pre-flight check: the Ollama server must be reachable AND the configured
# model must be listed. The original printed "Model available" without ever
# looking the model up, although the error message below promises that check.
try:
    tags = ollama_tags(OLLAMA_BASE_URL)
    models = [m.get("name") for m in tags.get("models", [])]
    print("Ollama reachable:", OLLAMA_BASE_URL)
    print("Models (first 20):", models[:20])

    # A model configured without an explicit tag is listed by Ollama as "<name>:latest".
    wanted = {OLLAMA_MODEL}
    if ":" not in str(OLLAMA_MODEL):
        wanted.add(f"{OLLAMA_MODEL}:latest")
    if not wanted.intersection(models):
        raise LookupError(f"Model {OLLAMA_MODEL!r} not found in Ollama model list")
    print("Model available:", OLLAMA_MODEL)
except Exception as e:
    print("Ollama hard check failed:", repr(e))
    raise RuntimeError(
        "Ollama ist nicht erreichbar oder phi3:mini ist nicht installiert.\n"
        "Bitte starte Ollama lokal und installiere das Modell (siehe Terminal-Commands unterhalb dieses Schritts),\n"
        "dann Zelle erneut ausführen."
    ) from e

import hashlib 

# Per-run LLM response cache plus a fixed-name "latest" cache path.
CACHE_PATH =PATHS ["reports"]/f"llm_cache_{RUN_ID}.jsonl"
CACHE_PATH_LATEST =PATHS ["reports"]/"llm_cache.jsonl"
CACHE_PATH .parent .mkdir (parents =True ,exist_ok =True )

# Prompting/output modes handled by make_prompt.
MODES =["M1_FREE","M2_JSON_PROMPT","M3_SCHEMA_PROMPT","M3_SCHEMA_ENFORCED","M4_REPAIR"]

# JSON Schema for the expected answer: an object with exactly one key,
# "rating", holding an integer from 1 to 10.
SCHEMA_OBJ ={
"type":"object",
"properties":{"rating":{"type":"integer","minimum":1 ,"maximum":10 }},
"required":["rating"],
"additionalProperties":False ,
}

# jsonschema is a hard requirement here; fail with install instructions.
try :
    import jsonschema 
except Exception as e :
    raise ImportError (
    "Package 'jsonschema' fehlt (wird für M3_SCHEMA/M4_REPAIR benötigt).\n"
    "Installiere lokal (Terminal):\n"
    "  pip install jsonschema\n"
    f"Original error: {e}"
    )

def prompt_hash(s: str) -> str:
    """Return the first 12 hex characters of the SHA-256 digest of ``s`` (UTF-8 encoded)."""
    digest = hashlib.sha256(s.encode("utf-8"))
    return digest.hexdigest()[:12]

# Active truncation profile; truncate_text() falls back to these values for
# any argument left as None.
PROMPT_PROFILE = {"max_chars": 4000, "tail_chars": 500, "marker": "\n...\n"}


def set_prompt_profile(max_chars: int = 4000, tail_chars: int = 500, marker: str = "\n...\n") -> None:
    """Replace the module-level truncation profile (handy for parameter sweeps)."""
    global PROMPT_PROFILE
    PROMPT_PROFILE = dict(
        max_chars=int(max_chars),
        tail_chars=int(tail_chars),
        marker=str(marker),
    )


def truncate_text(
    text: str,
    max_chars: Optional[int] = None,
    tail_chars: Optional[int] = None,
    marker: Optional[str] = None,
) -> Tuple[str, bool]:
    """Deterministically shorten ``text`` to at most ``max_chars`` characters.

    The result keeps a head and a tail of the text joined by ``marker``; head,
    marker and tail together never exceed the budget. Arguments left as None
    are taken from PROMPT_PROFILE.

    Returns:
        ``(output, was_truncated)``.
    """
    source = str(text or "")
    budget = int(max_chars if max_chars is not None else PROMPT_PROFILE["max_chars"])
    tail_len = int(tail_chars if tail_chars is not None else PROMPT_PROFILE["tail_chars"])
    sep = str(marker) if marker is not None else PROMPT_PROFILE["marker"]

    # Nothing may be kept: report truncation only if there was something to drop.
    if budget <= 0:
        return ("", bool(source))
    if len(source) <= budget:
        return (source, False)

    tail_len = max(0, min(tail_len, budget))
    head_len = max(0, budget - tail_len - len(sep))

    if head_len > 0:
        if tail_len > 0:
            shortened = source[:head_len] + sep + source[-tail_len:]
        else:
            # No tail requested: plain head cut, no marker.
            shortened = source[:budget]
    elif tail_len > 0 and (len(sep) + tail_len) <= budget:
        # No room for a head, but marker + tail still fit.
        shortened = sep + source[-tail_len:]
    else:
        # Not even marker + tail fit: keep only the last `budget` characters.
        shortened = source[-budget:]

    return (shortened[:budget], True)


set_prompt_profile(max_chars=4000, tail_chars=500, marker="\n...\n")

def make_prompt(mode: str, review_text: str) -> str:
    """Build the LLM prompt for ``mode`` around the (truncated) review text.

    The review is first shortened with ``truncate_text`` using the active
    prompt profile. All modes share the same task header and the trailing
    ``REVIEW:`` block; they differ only in the output-format instructions.
    ``M2_JSON_PROMPT`` and ``M4_REPAIR`` use the same prompt text.

    Raises:
        ValueError: If ``mode`` is not one of the known modes.
    """
    review_text, _ = truncate_text(review_text)

    header = (
        "You are a strict regressor.\n"
        "Task: Predict the overall airline review rating as an INTEGER from 1 to 10.\n"
    )
    no_extras = "No extra keys, no commentary, no code fences.\n\n"
    review_block = f"REVIEW:\n{review_text}\n"

    if mode == "M1_FREE":
        fmt = "Output format: ONLY the number (e.g., 7) OR 'Rating: 7'. No other text.\n\n"
        return header + fmt + review_block

    if mode == "M2_JSON_PROMPT" or mode == "M4_REPAIR":
        fmt = "Output format: ONLY valid JSON exactly like: {\"rating\": 7}\n"
        return header + fmt + no_extras + review_block

    if mode == "M3_SCHEMA_PROMPT":
        schema_str = json.dumps(SCHEMA_OBJ, ensure_ascii=False)
        fmt = (
            "You MUST output ONLY JSON that validates against this JSON Schema:\n"
            f"{schema_str}\n"
        )
        return header + fmt + no_extras + review_block

    if mode == "M3_SCHEMA_ENFORCED":
        fmt = "Return ONLY JSON with key 'rating' as integer 1..10. No extra keys, no commentary, no code fences.\n\n"
        return header + fmt + review_block

    raise ValueError(f"Unknown mode: {mode}")
def is_refusal(text: str) -> bool:
    """Heuristically decide whether ``text`` reads like a model refusal.

    Matching is by case-insensitive substring. Any "hard" phrase marks a
    refusal on its own; "soft" phrases count only when the text also contains
    "as an ai" or "as a language model". Empty or whitespace-only text is not
    a refusal.
    """
    lowered = (text or "").strip().lower()
    if not lowered:
        return False

    hard_phrases = (
        "i can't", "i cannot", "cannot", "can't",
        "i’m sorry", "i am sorry", "sorry",
        "unable to", "not able to",
        "can't help", "cannot help",
        "won't comply", "i won't",
    )
    for phrase in hard_phrases:
        if phrase in lowered:
            return True

    mentions_ai = ("as an ai" in lowered) or ("as a language model" in lowered)
    if not mentions_ai:
        return False

    soft_phrases = ("cannot", "can't", "unable", "not able", "won't", "sorry")
    for phrase in soft_phrases:
        if phrase in lowered:
            return True
    return False

def parse_m1_free(raw: str) -> Tuple[Optional[int], Optional[str]]:
    """Pull an integer rating (1..10) out of a free-text model answer.

    Candidate order: an explicit ``rating: N`` label first, then every
    integer-looking number in order of appearance. The first candidate inside
    1..10 wins. Numbers written with a decimal point or exponent are never
    accepted as ratings; if one lies in 1..10 the answer is reported as a
    ``"type"`` error.

    NOTE(review): the label pattern captures only the leading digits, so
    "Rating: 8.5" yields 8.

    Returns:
        ``(rating, None)`` on success, otherwise ``(None, error)`` with error
        in {"empty", "type", "refusal", "out_of_range", "parse_fail"}.
    """
    text = (raw or "").strip()
    if not text:
        return None, "empty"

    int_tokens = []
    float_tokens = []

    labelled = re.search(r"\brating\s*:\s*(\d+)\b", text, flags=re.IGNORECASE)
    if labelled is not None:
        int_tokens.append(labelled.group(1))

    for token in re.findall(r"(-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?)", text):
        looks_float = "." in token or "e" in token or "E" in token
        bucket = float_tokens if looks_float else int_tokens
        bucket.append(token)

    # First in-range integer wins.
    for token in int_tokens:
        try:
            value = int(token)
        except Exception:
            continue
        if 1 <= value <= 10:
            return value, None

    # A plausible rating written as a float is a type violation.
    for token in float_tokens:
        try:
            number = float(token)
        except Exception:
            continue
        if 1.0 <= number <= 10.0:
            return None, "type"

    if is_refusal(text):
        return None, "refusal"

    if int_tokens:
        return None, "out_of_range"
    return None, "parse_fail"

# Matches a response that consists of exactly one ``` fenced block (optionally tagged "json").
CODEFENCE_RE = re.compile(r"^\s*```(?:json)?\s*([\s\S]*?)\s*```\s*$", re.IGNORECASE)


def strip_code_fences(text: str) -> str:
    """Return the content of a fully fenced response, or the stripped text itself.

    Only a response that is one fenced block from start to end is unwrapped;
    text with prose around a fence is returned unchanged (apart from
    surrounding whitespace).
    """
    stripped = (text or "").strip()
    fenced = CODEFENCE_RE.match(stripped)
    if fenced is None:
        return stripped
    return (fenced.group(1) or "").strip()

def extract_first_json_object(text: str) -> Optional[str]:
    """Return the first brace-balanced ``{...}`` substring of *text*.

    Scanning starts at the first ``{``. Braces inside double-quoted strings
    (including escaped quotes) are ignored. Returns None when there is no
    ``{`` or when the object opened there is never closed.
    """
    source = text or ""
    begin = source.find("{")
    if begin < 0:
        return None

    open_braces = 0
    inside_string = False
    escaped = False

    for pos, char in enumerate(source[begin:], start=begin):
        if inside_string:
            # Within a string only escapes and the closing quote matter.
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                inside_string = False
        elif char == '"':
            inside_string = True
        elif char == "{":
            open_braces += 1
        elif char == "}":
            open_braces -= 1
            if open_braces == 0:
                return source[begin:pos + 1]

    return None

def preprocess_relaxed_json(raw: str) -> str:
    """Prepare a raw model response for lenient JSON decoding.

    Code fences are removed first. If the result is not already wrapped in
    ``{`` ... ``}``, the first balanced JSON object is cut out; when none is
    found the fence-stripped text is returned as is.
    """
    cleaned = strip_code_fences(raw)
    already_object = cleaned.startswith("{") and cleaned.endswith("}")
    if already_object:
        return cleaned
    candidate = extract_first_json_object(cleaned)
    return (candidate if candidate else cleaned).strip()

def _coerce_rating_to_int(rating: Any) -> Optional[int]:
    """Convert a decoded JSON value to an int rating, or None if it is not integral.

    Accepted: ``int`` values and finite floats within 1e-9 of an integer.
    Rejected: ``bool``, non-finite floats and every other type.

    ``json.loads`` accepts the non-standard literals ``NaN``, ``Infinity``
    and ``-Infinity``; ``round()`` raises on those, so they are filtered out
    before rounding instead of letting the exception escape to the caller.
    """
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(rating, bool):
        return None
    if isinstance(rating, int):
        return rating
    if isinstance(rating, float):
        if not math.isfinite(rating):
            return None
        nearest = round(rating)
        if abs(rating - nearest) < 1e-9:
            return int(nearest)
        return None
    return None

def parse_json_rating_strict(raw: str) -> Tuple[Optional[int], Optional[str], Optional[Dict[str, Any]]]:
    """Parse a rating from a response that must be a bare JSON object.

    No fence stripping or substring extraction is attempted: the stripped
    response is handed to ``json.loads`` directly.

    Returns:
        ``(rating, error, decoded)``. On success ``error`` is None. Otherwise
        ``rating`` is None and ``error`` is one of "empty", "refusal",
        "json_decode", "type", "missing_key", "out_of_range". ``decoded`` is
        the decoded JSON value whenever decoding succeeded.
    """
    payload = (raw or "").strip()
    if not payload:
        return None, "empty", None
    if is_refusal(payload):
        return None, "refusal", None

    try:
        decoded = json.loads(payload)
    except Exception:
        return None, "json_decode", None

    if not isinstance(decoded, dict):
        return None, "type", decoded
    if "rating" not in decoded:
        return None, "missing_key", decoded

    rating = _coerce_rating_to_int(decoded.get("rating"))
    if rating is None:
        failure = "type"
    elif rating < 1 or rating > 10:
        failure = "out_of_range"
    else:
        return int(rating), None, decoded
    return None, failure, decoded

def parse_json_rating_relaxed(raw: str) -> Tuple[Optional[int], Optional[str], Optional[Dict[str, Any]]]:
    """Parse a rating while tolerating code fences and surrounding text.

    The response is first passed through ``preprocess_relaxed_json`` and the
    result is decoded as JSON.

    Returns:
        ``(rating, error, decoded)`` with the same error vocabulary as the
        strict parser: "empty", "refusal", "json_decode", "type",
        "missing_key", "out_of_range".
    """
    original = (raw or "").strip()
    if not original:
        return None, "empty", None
    if is_refusal(original):
        return None, "refusal", None

    candidate = preprocess_relaxed_json(original)
    try:
        decoded = json.loads(candidate)
    except Exception:
        return None, "json_decode", None

    if not isinstance(decoded, dict):
        return None, "type", decoded
    if "rating" not in decoded:
        return None, "missing_key", decoded

    rating = _coerce_rating_to_int(decoded.get("rating"))
    if rating is None:
        failure = "type"
    elif rating < 1 or rating > 10:
        failure = "out_of_range"
    else:
        return int(rating), None, decoded
    return None, failure, decoded

def parse_json_rating(raw: str, *, relaxed: bool = False) -> Tuple[Optional[int], Optional[str], Optional[Dict[str, Any]]]:
    """Dispatch to the relaxed or the strict JSON rating parser."""
    if relaxed:
        return parse_json_rating_relaxed(raw)
    return parse_json_rating_strict(raw)

def validate_schema(obj: Dict[str, Any]) -> Optional[str]:
    """Validate *obj* against ``SCHEMA_OBJ``.

    Returns:
        None when the object validates, "out_of_range" when a numeric bound
        keyword failed, and "schema" for every other validation problem
        (including unexpected exceptions raised by the validator).
    """
    try:
        jsonschema.validate(instance=obj, schema=SCHEMA_OBJ)
    except jsonschema.ValidationError as e:
        # Classify by the keyword that failed. The string form of the error
        # embeds the schema itself, so searching it for "minimum"/"maximum"
        # would also match unrelated failures (e.g. additionalProperties).
        if e.validator in ("minimum", "maximum", "exclusiveMinimum", "exclusiveMaximum"):
            return "out_of_range"
        return "schema"
    except Exception:
        return "schema"
    return None

def ollama_generate(prompt: str, model: str, base_url: str, temperature: float = 0.0, timeout_s: int = 120) -> str:
    """Deprecated placeholder that always raises.

    Raises:
        RuntimeError: unconditionally, pointing the caller to the streaming
            implementation.
    """
    message = "Deprecated: run SCHRITT 6.3a and use the patched streaming ollama_generate (supports format/stream/retry)."
    raise RuntimeError(message)

def load_cache_index(path: Path) -> Dict[Tuple[int, str], Dict[str, Any]]:
    """Load a JSONL cache file into a dict keyed by ``(row_id, mode)``.

    Blank lines and lines that cannot be decoded or lack ``row_id``/``mode``
    are skipped. When a key occurs more than once, the last record wins.
    A missing file yields an empty index.
    """
    index = {}
    if not path.exists():
        return index

    with open(path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            entry = raw_line.strip()
            if entry:
                try:
                    record = json.loads(entry)
                    index[(int(record["row_id"]), str(record["mode"]))] = record
                except Exception:
                    # Best effort: a corrupt line must not abort the load.
                    pass
    return index

def append_cache(path: Path, rec: Dict[str, Any]) -> None:
    """Append *rec* as one JSON line to *path* unless it duplicates the indexed record.

    The record currently stored in the module-level ``cache_index`` under the
    same ``(row_id, mode)`` key is compared on model, prompt hash, format
    fingerprint, raw output and error type; if all of them match, nothing is
    written.
    """
    lookup_key = (int(rec.get("row_id")), str(rec.get("mode")))
    previous = cache_index.get(lookup_key)
    if previous is not None:
        compared_fields = (
            "model",
            "prompt_hash",
            "format_kind",
            "format_fp",
            "raw_output",
            "error_type",
        )
        if all(previous.get(name) == rec.get(name) for name in compared_fields):
            return

    with open(path, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(rec, ensure_ascii=False) + "\n")

# In-memory view of the JSONL cache, keyed by (row_id, mode); read by
# append_cache and read/updated by run_one.
cache_index =load_cache_index (CACHE_PATH )

import requests 
# NOTE: this ConnectionError shadows the Python builtin of the same name from here on.
from requests .exceptions import ReadTimeout ,ConnectionError 

# Defaults for ollama_generate_stream (timeouts in seconds; num_predict/num_ctx
# are forwarded in the request "options").
OLLAMA_CONNECT_TIMEOUT_S =5 
OLLAMA_READ_TIMEOUT_S =900 
OLLAMA_NUM_PREDICT =32 
OLLAMA_NUM_CTX =2048 

# Passed to set_prompt_profile below (defined elsewhere in the file).
PROMPT_MAX_CHARS =1800 
PROMPT_TAIL_CHARS =350 

set_prompt_profile (max_chars =PROMPT_MAX_CHARS ,tail_chars =PROMPT_TAIL_CHARS ,marker ="\n...\n")

# Master switch read by mode_to_format: when False no mode sends a 'format' spec.
USE_OLLAMA_FORMAT =True 

# Mixed into the prompt hash for M4_REPAIR in run_one, so changing it
# invalidates cached M4_REPAIR records.
M4_REPAIR_PROMPT_VERSION ="patch4_review_context_v1"

def mode_to_format(mode: str):
    """Return the Ollama ``format`` spec to send for *mode*.

    Only "M3_SCHEMA_ENFORCED" gets ``SCHEMA_OBJ``, and only while
    ``USE_OLLAMA_FORMAT`` is true. Every other mode returns None so that the
    model stays free to produce invalid output.
    """
    enforce = USE_OLLAMA_FORMAT and mode == "M3_SCHEMA_ENFORCED"
    return SCHEMA_OBJ if enforce else None

def format_fingerprint(fmt) -> tuple[str, str]:
    """Describe a ``format`` spec as ``(kind, fingerprint)`` for cache versioning.

    * ``None``   -> ``("none", "")``
    * ``"json"`` -> ``("json", "json")``
    * otherwise  -> ``("schema", <first 12 hex chars of a SHA-256>)`` computed
      over the key-sorted JSON dump, or over ``str(fmt)`` if it cannot be dumped.
    """
    if fmt is None:
        return "none", ""
    if fmt == "json":
        return "json", "json"

    try:
        canonical = json.dumps(fmt, sort_keys=True)
    except Exception:
        canonical = str(fmt)
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return "schema", digest[:12]

def ollama_generate_stream(
    prompt: str,
    model: str,
    base_url: str,
    temperature: float = 0.0,
    timeout_connect_s: int = OLLAMA_CONNECT_TIMEOUT_S,
    timeout_read_s: int = OLLAMA_READ_TIMEOUT_S,
    num_predict: int = OLLAMA_NUM_PREDICT,
    num_ctx: int = OLLAMA_NUM_CTX,
    stop: Optional[List[str]] = None,
    fmt=None,
) -> str:
    """Call ``{base_url}/api/generate`` in streaming mode and return the joined text.

    The ``response`` fragments of the JSON-lines stream are collected until a
    chunk with ``done=True`` arrives or the stream ends.

    Args:
        prompt, model, base_url: request target and content.
        temperature, num_predict, num_ctx: forwarded in ``options``.
        timeout_connect_s, timeout_read_s: passed to requests as a
            ``(connect, read)`` timeout tuple.
        stop: optional stop sequences, added to ``options`` when non-empty.
        fmt: optional ``format`` value for structured output; omitted when None.

    Returns:
        The concatenated ``response`` fragments.

    Raises:
        requests.HTTPError: for a non-2xx status (via ``raise_for_status``).
        requests exceptions for connection and timeout failures.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": True,
        "options": {
            "temperature": float(temperature),
            "num_predict": int(num_predict),
            "num_ctx": int(num_ctx),
        },
    }
    if stop:
        payload["options"]["stop"] = stop
    if fmt is not None:
        payload["format"] = fmt

    out = []
    # The context manager closes the streamed response (and releases its
    # connection) even when we stop reading early or an exception is raised.
    with requests.post(
        f"{base_url}/api/generate",
        json=payload,
        stream=True,
        timeout=(timeout_connect_s, timeout_read_s),
    ) as r:
        r.raise_for_status()
        for line in r.iter_lines(decode_unicode=True):
            if not line:
                continue
            try:
                j = json.loads(line)
            except Exception:
                # Skip a malformed chunk instead of aborting the whole stream.
                continue
            if not isinstance(j, dict):
                continue
            # A null "response" would break the final join.
            out.append(j.get("response") or "")
            if j.get("done", False):
                break
    return "".join(out)

def ollama_generate(prompt: str, model: str, base_url: str, temperature: float = 0.0, timeout_s: int = 120, fmt=None) -> str:
    """Thin wrapper around ``ollama_generate_stream`` that forwards ``fmt``.

    The read timeout is ``timeout_s`` but never less than 300 seconds; the
    connect timeout, ``num_predict`` and ``num_ctx`` come from the module
    constants, and no stop sequences are sent.
    """
    read_timeout = max(timeout_s, 300)
    call_args = dict(
        prompt=prompt,
        model=model,
        base_url=base_url,
        temperature=temperature,
        timeout_connect_s=OLLAMA_CONNECT_TIMEOUT_S,
        timeout_read_s=read_timeout,
        num_predict=OLLAMA_NUM_PREDICT,
        num_ctx=OLLAMA_NUM_CTX,
        stop=None,
        fmt=fmt,
    )
    return ollama_generate_stream(**call_args)

def ollama_generate_with_transport_retry(
    prompt: str,
    *,
    model: str,
    base_url: str,
    temperature: float = 0.0,
    fmt=None,
) -> Tuple[str, Optional[str]]:
    """Generate once and retry a single time after a read timeout or connection error.

    The retry sends the prompt shortened via ``truncate_text(..., max_chars=1200)``
    with a 1200 second read timeout.

    Returns:
        ``(text, None)`` when the first call succeeds,
        ``(text, "transport_retry")`` when the retry succeeds, and
        ``("", "transport_timeout:<ExceptionName>")`` when the retry fails too.
        Exceptions of the first call other than ReadTimeout/ConnectionError
        propagate to the caller.
    """
    try:
        first_answer = ollama_generate(
            prompt,
            model=model,
            base_url=base_url,
            temperature=temperature,
            timeout_s=OLLAMA_READ_TIMEOUT_S,
            fmt=fmt,
        )
        return first_answer, None
    except (ReadTimeout, ConnectionError):
        shortened, _ = truncate_text(prompt, max_chars=1200)
        try:
            retry_answer = ollama_generate_stream(
                prompt=shortened,
                model=model,
                base_url=base_url,
                temperature=temperature,
                timeout_connect_s=OLLAMA_CONNECT_TIMEOUT_S,
                timeout_read_s=1200,
                num_predict=OLLAMA_NUM_PREDICT,
                num_ctx=OLLAMA_NUM_CTX,
                fmt=fmt,
            )
            return retry_answer, "transport_retry"
        except Exception as retry_exc:
            return "", f"transport_timeout:{type(retry_exc).__name__}"

# --- Warm-up: one request with the schema-enforced format before the real runs ---
warmup_prompt ='Return ONLY JSON exactly like: {"rating": 7}'

# SCHEMA_OBJ while USE_OLLAMA_FORMAT is True, otherwise None (see mode_to_format).
warmup_fmt =mode_to_format ("M3_SCHEMA_ENFORCED")

t0 =time .perf_counter ()
resp ,retry_flag =ollama_generate_with_transport_retry (
warmup_prompt ,
model =OLLAMA_MODEL ,
base_url =OLLAMA_BASE_URL ,
temperature =0.0 ,
fmt =warmup_fmt ,
)
# Wall-clock duration of the warm-up call in seconds.
dt =time .perf_counter ()-t0 

# Strict parse of the warm-up answer; obj is None when decoding failed.
pr ,err ,obj =parse_json_rating (resp ,relaxed =False )

# Schema validation only when an actual schema (not None / "json") was sent.
if warmup_fmt not in (None ,"json"):
    sch_err =validate_schema (obj )

# Number of rows processed between progress prints in run_mode_chunked.
CHUNK_SIZE =25 
# When True, cached records that ended in a timeout are not reused
# (read by should_skip_cached and cache_hit_ok).
RETRY_TIMEOUT_RECORDS =True 

def should_skip_cached(rec: Dict[str, Any]) -> bool:
    """Tell whether a cached record is final and needs no re-run.

    Records with a parsed rating or without an error are always final.
    Records whose error is "timeout" or starts with "transport_timeout" are
    final only when ``RETRY_TIMEOUT_RECORDS`` is off. Every other error is
    final.
    """
    if rec.get("parsed_rating", None) is not None:
        return True

    err = rec.get("error_type")
    timed_out = err is not None and (err.startswith("transport_timeout") or err == "timeout")
    if timed_out:
        return not RETRY_TIMEOUT_RECORDS
    return True

def cache_hit_ok(rec: Dict[str, Any], *, model: str, prompt_hash: str, fmt_kind: str, fmt_fp: str) -> bool:
    """Return True when a cached record matches the current configuration exactly.

    Model, prompt hash and format fingerprint must all agree. Legacy records
    without ``format_*`` fields count as kind "none" with an empty
    fingerprint, so they only match when no format is currently in use.
    Records that ended in a timeout are rejected while
    ``RETRY_TIMEOUT_RECORDS`` is enabled.
    """
    comparisons = (
        (rec.get("model"), model),
        (rec.get("prompt_hash"), prompt_hash),
        (rec.get("format_kind", "none"), fmt_kind),
        (rec.get("format_fp", ""), fmt_fp),
    )
    if any(stored != current for stored, current in comparisons):
        return False

    err = rec.get("error_type")
    timed_out = bool(err) and (
        str(err).startswith("transport_timeout") or err in ("timeout", "transport_timeout")
    )
    if timed_out:
        return not RETRY_TIMEOUT_RECORDS
    return True

def parse_dual_for_mode(raw: str, *, mode: str) -> Dict[str, Any]:
    """Parse *raw* with the strict and the relaxed JSON parser and report both.

    Intended for the modes "M2_JSON_PROMPT", "M3_SCHEMA_PROMPT",
    "M3_SCHEMA_ENFORCED" and "M4_REPAIR". For every mode except
    "M2_JSON_PROMPT" a successfully parsed object must additionally pass
    ``validate_schema``; a schema failure clears the rating and becomes the
    error.

    Returns:
        A dict with ``parsed_rating_*``, ``error_type_*``, ``schema_ok_*`` and
        ``valid_*`` for the suffixes ``strict`` and ``relaxed`` (in that
        order). ``schema_ok_*`` is None when parsing itself failed.
    """

    def _evaluate(parser):
        # Run one parser and, where the mode requires it, the schema check.
        rating, error, decoded = parser(raw)
        schema_ok = None
        if error is None:
            if mode == "M2_JSON_PROMPT":
                schema_ok = True
            else:
                schema_error = validate_schema(decoded)
                if schema_error is None:
                    schema_ok = True
                else:
                    rating, error, schema_ok = None, schema_error, False
        return rating, error, schema_ok

    fields = {}
    variants = (
        ("strict", parse_json_rating_strict),
        ("relaxed", parse_json_rating_relaxed),
    )
    for suffix, parser in variants:
        rating, error, schema_ok = _evaluate(parser)
        is_valid = error is None
        fields[f"parsed_rating_{suffix}"] = rating if is_valid else None
        fields[f"error_type_{suffix}"] = None if is_valid else error
        fields[f"schema_ok_{suffix}"] = bool(schema_ok) if schema_ok is not None else None
        fields[f"valid_{suffix}"] = bool(is_valid)
    return fields

def run_one (row_id :int ,text :str ,mode :str )->Dict [str ,Any ]:
    """Produce the result record for one (row_id, mode) pair, using the cache when possible.

    Flow:
      1. Build prompt, prompt hash and format fingerprint.
      2. If ``cache_index`` holds a record for (row_id, mode) that passes
         ``cache_hit_ok``, return a copy of it marked ``from_cache=True``.
      3. Otherwise call the model; a transport failure without any output is
         recorded and returned immediately.
      4. Parse the output according to ``mode``; "M4_REPAIR" issues one
         repair request when the first output is not strictly valid.
      5. Append the record to the cache file, update ``cache_index`` and
         return the record.

    Raises:
        ValueError: if ``mode`` is not one of the handled modes.
    """
    t0 =time .perf_counter ()

    # NOTE(review): both branches of this conditional evaluate to `mode`.
    base_prompt =make_prompt (mode if mode !="M4_REPAIR"else "M4_REPAIR",text )
    hash_in =base_prompt 
    if mode =="M4_REPAIR":
        # The repair prompt version is part of the hash for M4_REPAIR.
        hash_in =base_prompt +"\n__repair_prompt_version__:"+M4_REPAIR_PROMPT_VERSION 
    p_hash =prompt_hash (hash_in )
    fmt =mode_to_format (mode )
    fmt_kind ,fmt_fp =format_fingerprint (fmt )

    # --- Cache lookup ---
    key =(int (row_id ),str (mode ))
    if key in cache_index and cache_hit_ok (cache_index [key ],model =OLLAMA_MODEL ,prompt_hash =p_hash ,fmt_kind =fmt_kind ,fmt_fp =fmt_fp ):
        rec =cache_index [key ].copy ()
        rec ["from_cache"]=True 

        # A cache hit costs no inference time; the original runtime is kept separately.
        rec ["runtime_ms_effective"]=0 
        rec ["runtime_ms_cached"]=rec .get ("runtime_ms")

        # Records lacking the strict/relaxed fields are re-parsed from their raw output.
        if mode in ("M2_JSON_PROMPT","M3_SCHEMA_PROMPT","M3_SCHEMA_ENFORCED","M4_REPAIR")and ("parsed_rating_strict"not in rec or "valid_strict"not in rec ):
            dual =parse_dual_for_mode (rec .get ("raw_output",""),mode =mode )
            rec .update (dual )

            # The primary fields mirror the strict variant.
            rec ["parsed_rating"]=rec .get ("parsed_rating_strict")
            rec ["error_type"]=rec .get ("error_type_strict")
            rec ["schema_ok"]=rec .get ("schema_ok_strict")

        return rec 

    # --- Cache miss: restart the timer so runtime_ms excludes the lookup above ---
    t0 =time .perf_counter ()

    # NOTE(review): the next five statements recompute base_prompt / hash_in / p_hash
    # with the same inputs as at the top of the function.
    base_prompt =make_prompt (mode if mode !="M4_REPAIR"else "M4_REPAIR",text )
    hash_in =base_prompt 
    if mode =="M4_REPAIR":
        hash_in =base_prompt +"\n__repair_prompt_version__:"+M4_REPAIR_PROMPT_VERSION 
    p_hash =prompt_hash (hash_in )
    raw1 ,transport_err =ollama_generate_with_transport_retry (
    base_prompt ,
    model =OLLAMA_MODEL ,
    base_url =OLLAMA_BASE_URL ,
    temperature =0.0 ,
    fmt =fmt ,
    )

    raw1s =(raw1 or "").strip ()

    # Defaults; overwritten by the mode-specific parsing below.
    parsed =None 
    error_type =None 
    schema_ok =None 
    repaired =False 
    raw_final =raw1s 
    raw_repair =None 
    raw_first =raw1s 

    dual_first_pref ={}
    dual_final =None 
    # Transport failure without any output: record the error and stop here.
    if transport_err is not None and raw1s =="":
        error_type =transport_err 
        runtime_ms =int ((time .perf_counter ()-t0 )*1000 )
        rec ={
        "timestamp_utc":time .strftime ("%Y-%m-%dT%H:%M:%SZ",time .gmtime ()),
        "row_id":int (row_id ),
        "mode":mode ,
        "model":OLLAMA_MODEL ,
        "temperature":0.0 ,
        "prompt_hash":p_hash ,
        "raw_output":"",
        "raw_output_first":None ,
        "raw_output_repair":None ,
        "parsed_rating":None ,
        "error_type":error_type ,
        "schema_ok":None ,
        "runtime_ms":runtime_ms ,
        "repaired":False ,
        "from_cache":False ,
        "format_kind":fmt_kind ,
        "format_fp":fmt_fp ,
        "runtime_ms_effective":runtime_ms ,
        "runtime_ms_cached":None ,
        }
        # append_cache compares against the previous index entry, so write first, then update.
        append_cache (CACHE_PATH ,rec )
        cache_index [(int (row_id ),mode )]=rec 
        return rec 

    # --- Mode-specific parsing ---
    if mode =="M1_FREE":
        parsed ,error_type =parse_m1_free (raw1s )
        schema_ok =None 

    elif mode =="M2_JSON_PROMPT":
        dual =parse_dual_for_mode (raw1s ,mode =mode )
        parsed =dual ["parsed_rating_strict"]
        error_type =dual ["error_type_strict"]
        schema_ok =dual ["schema_ok_strict"]

    elif mode in ("M3_SCHEMA_PROMPT","M3_SCHEMA_ENFORCED"):
        dual =parse_dual_for_mode (raw1s ,mode =mode )

        parsed =dual ["parsed_rating_strict"]
        error_type =dual ["error_type_strict"]
        schema_ok =dual ["schema_ok_strict"]

    elif mode =="M4_REPAIR":
        dual_first =parse_dual_for_mode (raw1s ,mode =mode )

        # Keep the first-attempt outcome under "*_first" keys.
        dual_first_pref ={f"{k}_first":v for k ,v in dual_first .items ()}
        dual_final =dual_first 
        parsed =dual_first ["parsed_rating_strict"]
        error_type =dual_first ["error_type_strict"]
        schema_ok =dual_first ["schema_ok_strict"]

        # One repair attempt when the first output is not strictly valid.
        if not dual_first ["valid_strict"]:
            repaired =True 

            review_text ,_ =truncate_text (text )

            repair_prompt =(
            "You produced INVALID output for a rating prediction task.\n"
            f"Error type: {error_type}\n\n"
            "Task: Predict the overall airline review rating as an INTEGER 1..10.\n"
            "Return ONLY JSON that matches the schema. No commentary, no code fences.\n"
            f"Schema: {json.dumps(SCHEMA_OBJ, ensure_ascii=False)}\n\n"
            "REVIEW:\n"
            f"{review_text}\n\n"
            "PREVIOUS_OUTPUT:\n"
            f"{raw1s}\n"
            )

            # mode_to_format returns None for "M4_REPAIR", so the repair call is prompt-only.
            fmt_repair =mode_to_format ("M4_REPAIR")

            raw2 ,transport_err2 =ollama_generate_with_transport_retry (
            repair_prompt ,
            model =OLLAMA_MODEL ,
            base_url =OLLAMA_BASE_URL ,
            temperature =0.0 ,
            fmt =fmt_repair ,
            )
            raw2s =(raw2 or "").strip ()
            raw_repair =raw2s 
            raw_final =raw2s 
            raw_first =raw1s 

            if transport_err2 is not None and raw2s =="":
                # Repair call failed at transport level: no final strict/relaxed fields.
                parsed =None 
                error_type =transport_err2 
                schema_ok =False 
                dual_final =None 
            else :
                dual_final =parse_dual_for_mode (raw2s ,mode =mode )
                parsed =dual_final ["parsed_rating_strict"]
                error_type =dual_final ["error_type_strict"]
                schema_ok =dual_final ["schema_ok_strict"]

    else :
        raise ValueError (mode )

    # Covers the first call plus, for M4_REPAIR, the repair call.
    runtime_ms =int ((time .perf_counter ()-t0 )*1000 )

    rec ={
    "timestamp_utc":time .strftime ("%Y-%m-%dT%H:%M:%SZ",time .gmtime ()),
    "row_id":int (row_id ),
    "mode":mode ,
    "model":OLLAMA_MODEL ,
    "temperature":0.0 ,
    "prompt_hash":p_hash ,

    "raw_output":raw_final ,
    "raw_output_first":raw_first if repaired else None ,
    "raw_output_repair":raw_repair if repaired else None ,

    "parsed_rating":parsed ,
    "error_type":error_type ,
    "schema_ok":schema_ok ,

    "transport_error_first":transport_err ,
    # transport_err2 is only bound when the repair branch ran, hence the locals() lookup.
    "transport_error_repair":(locals ().get ("transport_err2")if repaired else None ),
    # Strict/relaxed detail fields: direct for M2/M3, "*_first" plus final for M4_REPAIR.
    **(dual if mode in ("M2_JSON_PROMPT","M3_SCHEMA_PROMPT","M3_SCHEMA_ENFORCED")else {}),
    **(dual_first_pref if mode =="M4_REPAIR"else {}),
    **(dual_final if (mode =="M4_REPAIR"and 'dual_final'in locals ()and dual_final is not None )else {}),

    "runtime_ms":runtime_ms ,
    "runtime_ms_effective":runtime_ms ,
    "repaired":bool (repaired ),
    "from_cache":False ,

    "format_kind":fmt_kind ,
    "format_fp":fmt_fp ,
    "runtime_ms_cached":None ,

    }

    # append_cache compares against the previous index entry, so write first, then update.
    append_cache (CACHE_PATH ,rec )
    cache_index [(int (row_id ),mode )]=rec 
    return rec 

def run_mode_chunked(mode: str) -> pd.DataFrame:
    """Run ``run_one`` for every row of ``df_S`` and return the records as a DataFrame.

    Rows are processed in slices of ``CHUNK_SIZE``; after each slice a
    progress line with the running valid rate is printed, and a summary line
    is printed at the end.
    """
    records = []
    started = time.perf_counter()
    total = len(df_S)

    for lo in range(0, total, CHUNK_SIZE):
        hi = min(total, lo + CHUNK_SIZE)
        for row in df_S.iloc[lo:hi].itertuples(index=False):
            records.append(run_one(int(row.row_id), row.text, mode))
        parsed_so_far = pd.Series([item.get("parsed_rating") for item in records])
        valid_now = parsed_so_far.notna().mean()
        print(f"{mode} progress: {hi}/{total} | valid_rate_so_far={valid_now:.3f}")

    elapsed = time.perf_counter() - started
    rec_df = pd.DataFrame(records)
    valid = rec_df["parsed_rating"].notna().mean()
    print(f"{mode}: done | seconds={elapsed:.1f} | valid_rate={valid:.3f} | repaired_rate={rec_df['repaired'].mean():.3f} | from_cache={rec_df['from_cache'].mean():.3f}")
    return rec_df

# Run every configured mode over df_S; one DataFrame of records per mode.
mode_frames ={}
for mode in MODES :
    mode_frames [mode ]=run_mode_chunked (mode )

# Smoke test: one direct (uncached) request per mode with that mode's format spec;
# prints the transport flag and the first 80 characters of the answer.
# NOTE: rebinds the module-level names mode, fmt, resp and err.
for mode in ["M1_FREE","M2_JSON_PROMPT","M3_SCHEMA_PROMPT","M3_SCHEMA_ENFORCED","M4_REPAIR"]:
    fmt =mode_to_format (mode )
    resp ,err =ollama_generate_with_transport_retry (
    'Return ONLY {"rating": 7}',model =OLLAMA_MODEL ,base_url =OLLAMA_BASE_URL ,temperature =0.0 ,fmt =fmt 
    )
    print (mode ,"| err=",err ,"| resp=",resp [:80 ])

# Best-effort snapshot of the cache file. A failure must not stop the run,
# but it is reported instead of being swallowed silently.
try:
    import shutil
    shutil.copy2(CACHE_PATH, CACHE_PATH_LATEST)
except Exception as _copy_exc:
    print(f"WARN: could not copy cache file to latest snapshot: {_copy_exc!r}")

def compute_llm_metrics(df_rec: pd.DataFrame, y_true_by_id: Dict[int, float], variant: str = "strict") -> Dict[str, Any]:
    """Aggregate quality, validity, runtime and repair statistics for one record frame.

    Args:
        df_rec: One row per prediction record (as produced by ``run_one``).
        y_true_by_id: Ground-truth target per ``row_id``.
        variant: "strict" or "relaxed". Columns suffixed with the variant are
            preferred; the unsuffixed column is used when the suffixed one is
            absent.

    Returns:
        A dict of scalar metrics. Regression metrics are computed on valid
        rows only and are NaN when no row is valid. ``repair_success_rate``
        and ``repair_trigger_rate`` are relative to all ``n`` rows.
    """
    available = df_rec.columns

    def _resolve(stem, fallback):
        # Prefer "<stem>_<variant>" when the frame has it.
        candidate = f"{stem}_{variant}"
        return candidate if candidate in available else fallback

    parsed_col = _resolve("parsed_rating", "parsed_rating")
    err_col = _resolve("error_type", "error_type")
    schema_col = _resolve("schema_ok", "schema_ok")
    valid_col = _resolve("valid", None)

    y_true = df_rec["row_id"].astype(int).map(y_true_by_id).astype(float).to_numpy()
    y_pred = pd.to_numeric(df_rec.get(parsed_col), errors="coerce").to_numpy()

    if valid_col is None:
        # No explicit flag: valid means "no error and a numeric prediction".
        valid_mask = pd.isna(df_rec.get(err_col)).to_numpy() & ~pd.isna(y_pred)
    else:
        valid_mask = df_rec[valid_col].fillna(False).astype(bool).to_numpy()

    n = int(len(df_rec))
    coverage = float(valid_mask.mean()) if n else 0.0

    mae = rmse = spearman = float("nan")
    if valid_mask.sum() > 0:
        scores = evaluate_regression(y_true[valid_mask], y_pred[valid_mask])
        mae = float(scores["mae"])
        rmse = float(scores["rmse"])
        spearman = float(scores["spearman"])
    risk_mae, risk_rmse = mae, rmse

    if err_col in available:
        errors = df_rec.get(err_col)
        parse_success = float(pd.isna(errors).mean())
        empty_refusal = float(errors.isin(["empty", "refusal"]).mean())
        out_of_range = float(errors.eq("out_of_range").mean())
    else:
        parse_success = float("nan")
        empty_refusal = 0.0
        out_of_range = 0.0

    # Without any schema verdicts, schema adherence falls back to the parse success rate.
    schema_adherence = parse_success
    if schema_col in available and df_rec[schema_col].notna().any():
        schema_adherence = float(df_rec[schema_col].fillna(False).mean())

    runtime_col = "runtime_ms_effective" if "runtime_ms_effective" in available else "runtime_ms"
    total_runtime_s = float(df_rec[runtime_col].fillna(0).sum() / 1000.0)
    sec_per_100 = float(runtime_s_per_100(n, total_runtime_s))

    invalid_final_n = int((~valid_mask).sum()) if n else 0

    first_col = f"valid_{variant}_first"
    if first_col in available:
        valid_first = df_rec[first_col].fillna(False).astype(bool).to_numpy()
        invalid_first_n = int((~valid_first).sum()) if n else 0
    else:
        # No first-attempt information: treat the first attempt as the final one.
        valid_first = valid_mask
        invalid_first_n = invalid_final_n

    repaired_raw = df_rec.get("repaired", False)
    if hasattr(repaired_raw, "fillna"):
        repaired_mask = repaired_raw.fillna(False).astype(bool).to_numpy()
    else:
        repaired_mask = np.zeros(n, dtype=bool)

    repair_trigger_n = int(repaired_mask.sum()) if n else 0
    recovered = repaired_mask & (~valid_first) & valid_mask
    repair_success_n = int(recovered.sum()) if n else 0
    repair_trigger_rate = (repair_trigger_n / n) if n else 0.0
    repair_success_rate = (repair_success_n / n) if n else 0.0

    repaired_rate = float(df_rec["repaired"].mean()) if "repaired" in available else 0.0
    cache_hit_rate = float(df_rec["from_cache"].mean()) if "from_cache" in available else 0.0

    return {
        "coverage": coverage,
        "mae": mae,
        "rmse": rmse,
        "spearman": spearman,
        "risk_mae": risk_mae,
        "risk_rmse": risk_rmse,
        "parse_success_rate": parse_success,
        "schema_adherence_rate": schema_adherence,
        "out_of_range_rate": out_of_range,
        "empty_refusal_rate": empty_refusal,
        "runtime_s_total": total_runtime_s,
        "sec_per_100": sec_per_100,
        "n_samples": n,
        "repaired_rate": repaired_rate,
        "cache_hit_rate": cache_hit_rate,
        "invalid_first_n": invalid_first_n,
        "invalid_final_n": invalid_final_n,
        "repair_trigger_n": repair_trigger_n,
        "repair_trigger_rate": float(repair_trigger_rate),
        "repair_success_n": repair_success_n,
        "repair_success_rate": float(repair_success_rate),
    }

# Ground-truth target per row_id for df_S and for df_test.
y_true_S ={int (r .row_id ):float (r .target )for r in df_S .itertuples (index =False )}
y_true_test ={int (r .row_id ):float (r .target )for r in df_test .itertuples (index =False )}

def predictions_table(df_rec: pd.DataFrame, split_name: str, y_true_map: Dict[int, float], method_name: str) -> pd.DataFrame:
    """Build the per-row prediction table that is written to parquet.

    Strict fields are the primary ones (``y_pred``, ``valid``,
    ``error_type``); strict and relaxed variants are also emitted side by
    side. A variant-specific column is used when present, otherwise the
    unsuffixed column.

    Optional columns may be missing from ``df_rec`` (for example frames
    without ``repaired`` / ``from_cache`` or without any error column); they
    are replaced by neutral defaults instead of raising.

    Args:
        df_rec: Record frame; must contain ``row_id``.
        split_name: Value for the ``split`` column.
        y_true_map: Ground truth per ``row_id``.
        method_name: Value for the ``method`` column.
    """

    def _first_present(*names):
        # First listed column that exists, else an all-None column.
        for name in names:
            if name in df_rec.columns:
                return df_rec[name]
        return pd.Series([None] * len(df_rec), index=df_rec.index, dtype=object)

    def _flag(name):
        # Boolean column with missing values (or a missing column) read as False.
        if name in df_rec.columns:
            return df_rec[name].fillna(False).astype(bool)
        return pd.Series(False, index=df_rec.index, dtype=bool)

    def _valid(variant, y_pred):
        # Explicit flag first, then "no error recorded", then "has a numeric prediction".
        flag_col = f"valid_{variant}"
        if flag_col in df_rec.columns:
            return df_rec[flag_col].fillna(False).astype(bool)
        for err_name in (f"error_type_{variant}", "error_type"):
            if err_name in df_rec.columns:
                return pd.isna(df_rec[err_name]).astype(bool)
        return y_pred.notna()

    rid = df_rec["row_id"].astype(int)

    y_pred_strict = pd.to_numeric(_first_present("parsed_rating_strict", "parsed_rating"), errors="coerce")
    y_pred_relaxed = pd.to_numeric(_first_present("parsed_rating_relaxed", "parsed_rating"), errors="coerce")

    valid_strict = _valid("strict", y_pred_strict)
    valid_relaxed = _valid("relaxed", y_pred_relaxed)

    error_type_strict = _first_present("error_type_strict", "error_type")
    error_type_relaxed = _first_present("error_type_relaxed", "error_type")

    out = pd.DataFrame({
        "row_id": rid,
        "split": split_name,
        "method": method_name,
        "y_true": rid.map(y_true_map).astype(float),

        "y_pred": y_pred_strict,
        "valid": valid_strict,
        "error_type": error_type_strict,

        "y_pred_strict": y_pred_strict,
        "y_pred_relaxed": y_pred_relaxed,
        "valid_strict": valid_strict,
        "valid_relaxed": valid_relaxed,
        "error_type_strict": error_type_strict,
        "error_type_relaxed": error_type_relaxed,

        "runtime_ms": pd.to_numeric(_first_present("runtime_ms"), errors="coerce"),
        "runtime_ms_effective": pd.to_numeric(_first_present("runtime_ms_effective", "runtime_ms"), errors="coerce"),

        "repaired": _flag("repaired"),
        "from_cache": _flag("from_cache"),
    })
    return out

# Accumulators for the evaluation below.
results_rows =[]
pred_files =[]
fig_paths =[]

# The "test" split is only evaluated when df_S covers exactly the same
# row_ids as df_test.
S_ids =set (df_S ["row_id"].astype (int ).tolist ())
test_ids_set =set (df_test ["row_id"].astype (int ).tolist ())
LLM_HAS_FULL_TEST =(S_ids ==test_ids_set )

# Per mode: compute strict and relaxed metrics, write the prediction table and
# collect one results row per evaluated split.
for mode in MODES:
    df_rec = mode_frames[mode].copy()

    method = f"phi3mini_{mode.lower()}"

    metrics_S_strict = compute_llm_metrics(df_rec, y_true_S, variant="strict")
    metrics_S_relaxed = compute_llm_metrics(df_rec, y_true_S, variant="relaxed")
    pred_S = predictions_table(df_rec, "S", y_true_S, method)

    pred_parts = [pred_S]
    results_splits = [("S", metrics_S_strict, metrics_S_relaxed)]

    if LLM_HAS_FULL_TEST:
        # Re-order the records to follow df_test; ids without a record become all-NaN rows.
        df_rec_by_id = df_rec.set_index("row_id")
        ordered = df_test["row_id"].astype(int).tolist()
        df_rec_test = df_rec_by_id.reindex(ordered).reset_index()

        if "mode" in df_rec_test.columns and df_rec_test["mode"].isna().any():
            # Fill placeholders ONLY into the rows that have no record. Assigning
            # whole columns here would overwrite every real prediction as soon as
            # a single row is missing.
            _missing_rows = df_rec_test["mode"].isna()
            _placeholder = {
                "mode": mode,
                "model": OLLAMA_MODEL,
                "temperature": 0.0,
                "raw_output": "",
                "parsed_rating": np.nan,
                "error_type": "missing_prediction",
                "schema_ok": False,
                "runtime_ms": 0,
                "repaired": False,
                "from_cache": True,
                "parsed_rating_strict": np.nan,
                "error_type_strict": "missing_prediction",
                "schema_ok_strict": False,
                "valid_strict": False,
                "valid_strict_first": False,
                "error_type_strict_first": "missing_prediction",
                "schema_ok_strict_first": False,
                "parsed_rating_relaxed": np.nan,
                "error_type_relaxed": "missing_prediction",
                "schema_ok_relaxed": False,
                "valid_relaxed": False,
                "valid_relaxed_first": False,
                "error_type_relaxed_first": "missing_prediction",
                "schema_ok_relaxed_first": False,
                "runtime_ms_effective": 0,
            }
            for _col, _val in _placeholder.items():
                df_rec_test.loc[_missing_rows, _col] = _val

        metrics_test_strict = compute_llm_metrics(df_rec_test, y_true_test, variant="strict")
        metrics_test_relaxed = compute_llm_metrics(df_rec_test, y_true_test, variant="relaxed")
        pred_test = predictions_table(df_rec_test, "test", y_true_test, method)

        pred_parts.append(pred_test)
        results_splits.append(("test", metrics_test_strict, metrics_test_relaxed))

    pred_path = PATHS["reports"] / f"predictions_{method}.parquet"
    pd.concat(pred_parts, axis=0).reset_index(drop=True).to_parquet(pred_path, index=False)
    pred_files.append(pred_path)

    timestamp_utc = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    for split_name, met_strict, met_relaxed in results_splits:
        results_rows.append({
            "timestamp_utc": timestamp_utc,
            "method": method,
            "split": split_name,
            "llm_mode": mode,
            "model_name": OLLAMA_MODEL,
            "temperature": 0.0,

            # Strict metrics are the primary columns.
            **{k: met_strict[k] for k in [
                "mae", "rmse", "spearman", "parse_success_rate", "schema_adherence_rate",
                "out_of_range_rate", "empty_refusal_rate", "coverage", "risk_mae", "risk_rmse",
                "sec_per_100", "n_samples",
                "repaired_rate", "cache_hit_rate",
                "invalid_first_n", "invalid_final_n", "repair_trigger_n", "repair_trigger_rate",
                "repair_success_n", "repair_success_rate",
            ]},

            # Relaxed metrics are reported with a "_relaxed" suffix.
            "coverage_relaxed": met_relaxed["coverage"],
            "parse_success_rate_relaxed": met_relaxed["parse_success_rate"],
            "schema_adherence_rate_relaxed": met_relaxed["schema_adherence_rate"],
            "mae_relaxed": met_relaxed["mae"],
            "rmse_relaxed": met_relaxed["rmse"],
            "spearman_relaxed": met_relaxed["spearman"],

            "invalid_first_n_relaxed": met_relaxed.get("invalid_first_n"),
            "invalid_final_n_relaxed": met_relaxed.get("invalid_final_n"),
            "repair_trigger_rate_relaxed": met_relaxed.get("repair_trigger_rate"),
            "repair_success_rate_relaxed": met_relaxed.get("repair_success_rate"),

            "inference_s": met_strict["runtime_s_total"],
            "notes": ("evaluated_on=S_only" if not LLM_HAS_FULL_TEST else f"cache={CACHE_PATH.name}"),
        })

# Tag every results row with the current run, without overwriting existing values.
for _r in results_rows :
    _r .setdefault ("run_id",RUN_ID )
    _r .setdefault ("run_tag",RUN_TAG )

# Cumulative results file and the per-run file for this RUN_ID.
results_path =PATHS ["reports"]/"results.csv"
results_path_run =PATHS ["reports"]/f"results_{RUN_ID}.csv"

def upsert_results(existing: Optional[pd.DataFrame], new_rows: pd.DataFrame, key_cols: List[str]) -> pd.DataFrame:
    """Merge ``new_rows`` into ``existing``, keeping exactly one row per key.

    Rows are identified by the string form of their ``key_cols`` values, so a
    key that changed dtype on a CSV round trip (``1`` vs ``"1"``) still
    matches. When several rows share a key the last one wins, i.e. rows from
    ``new_rows`` replace rows from ``existing``.

    Args:
        existing: Previously stored rows, or ``None``/an empty frame.
        new_rows: Freshly produced rows.
        key_cols: Columns that together identify a row.

    Returns:
        A new frame sorted by ``key_cols`` with a fresh ``RangeIndex``.
    """
    if existing is None or len(existing) == 0:
        out = new_rows.copy()
    else:
        out = pd.concat([existing, new_rows], axis=0, ignore_index=True)

    # Nothing to de-duplicate or sort; also avoids a KeyError when the empty
    # frame does not even have the key columns.
    if len(out) == 0:
        return out.reset_index(drop=True)

    # De-duplicate in both branches so duplicates inside `new_rows` alone are
    # collapsed too (previously only done when `existing` was non-empty).
    dupkey = out[key_cols].astype(str).agg("||".join, axis=1)
    out = out.loc[~dupkey.duplicated(keep="last")]

    try:
        out = out.sort_values(key_cols)
    except TypeError:
        # Key columns can hold mixed types (e.g. ints read back from CSV next
        # to fresh strings); fall back to ordering by their string form.
        out = out.sort_values(key_cols, key=lambda col: col.astype(str))
    return out.reset_index(drop=True)

# Merge this run's rows into the cumulative results table (keyed by
# method/split/run_id) and also write them to the run-specific CSV.
df_new =pd .DataFrame (results_rows )
df_exist =pd .read_csv (results_path )if results_path .exists ()else pd .DataFrame ()
df_results =upsert_results (df_exist ,df_new ,key_cols =["method","split","run_id"])
df_new .to_csv (results_path_run ,index =False )
df_results .to_csv (results_path ,index =False )

# NOTE(review): summary_cols is not referenced again in the visible part of this file.
summary_cols =["method","split","mae","rmse","spearman","coverage","parse_success_rate","schema_adherence_rate","out_of_range_rate","empty_refusal_rate","sec_per_100","n_samples"]

# Per-mode bar charts for the test split. Any exception raised in here
# (including a failing matplotlib import) is printed and the plots are skipped.
try :
    import matplotlib .pyplot as plt 

    # Only plot when this run produced rows for the "test" split.
    if (df_new ["split"]=="test").any ():
        test_rows =df_new [df_new ["split"]=="test"].copy ()
        test_rows ["mode"]=test_rows ["llm_mode"]
        test_rows =test_rows .sort_values ("mode")

        # MAE per prompting mode.
        fig1 =PATHS ["figs"]/"phi3mini_mae_by_mode_test.png"
        plt .figure ()
        plt .bar (test_rows ["mode"].astype (str ),test_rows ["mae"].astype (float ))
        plt .ylabel ("MAE (lower is better)")
        plt .title ("phi3:mini — MAE by Mode (Test)")
        plt .tight_layout ()
        savefig_with_runid (fig1 ,dpi =150 )
        plt .close ()

        # Coverage per prompting mode; the y-axis is fixed to [0, 1.05].
        fig2 =PATHS ["figs"]/"phi3mini_coverage_by_mode_test.png"
        plt .figure ()
        plt .bar (test_rows ["mode"].astype (str ),test_rows ["coverage"].astype (float ))
        plt .ylabel ("Coverage / Valid Rate")
        plt .title ("phi3:mini — Coverage by Mode (Test)")
        plt .ylim (0 ,1.05 )
        plt .tight_layout ()
        savefig_with_runid (fig2 ,dpi =150 )
        plt .close ()

        fig_paths =[fig1 ,fig2 ]
        print ("Saved figs:",[str (p )for p in fig_paths ])
    else :
        print ("No full LLM test predictions; skipping test plots.")
except Exception as e :
    print ("Plotting skipped:",repr (e ))


In [None]:
# Build "latest result per method/split" tables and leaderboards from results.csv.
results_path =PATHS ["reports"]/"results.csv"

df_res =pd .read_csv (results_path )

# Parse timestamps as UTC; values that cannot be parsed become NaT.
df_res ["timestamp_utc"]=df_res ["timestamp_utc"].astype (str )
df_res ["_ts"]=pd .to_datetime (df_res ["timestamp_utc"],errors ="coerce",utc =True )

# Keep the last row of every (method, split) group after sorting by timestamp.
# NOTE(review): with pandas' default na_position="last", a row whose timestamp
# failed to parse sorts last and would be picked as the "latest" — confirm
# that this is acceptable.
df_latest =(
df_res .sort_values (["method","split","_ts"])
.groupby (["method","split"],as_index =False )
.tail (1 )
.drop (columns =["_ts"])
.reset_index (drop =True )
)

# Coerce metric columns to numbers; unparseable entries become NaN.
num_cols =["mae","rmse","spearman","coverage","parse_success_rate","schema_adherence_rate",
"out_of_range_rate","empty_refusal_rate","sec_per_100","n_samples","risk_mae","risk_rmse"]
for c in num_cols :
    if c in df_latest .columns :
        df_latest [c ]=pd .to_numeric (df_latest [c ],errors ="coerce")

# Leaderboard columns, restricted to those that actually exist in the table.
keep_cols =[
"method","split","mae","rmse","spearman","coverage","risk_mae","risk_rmse",
"parse_success_rate","schema_adherence_rate","out_of_range_rate","empty_refusal_rate",
"sec_per_100","n_samples","timestamp_utc"
]
keep_cols =[c for c in keep_cols if c in df_latest .columns ]

# Rank by MAE, then RMSE, then seconds per 100 samples (all ascending).
lb_test =(df_latest [df_latest ["split"]=="test"][keep_cols ]
.sort_values (["mae","rmse","sec_per_100"],ascending =[True ,True ,True ])
.reset_index (drop =True ))

lb_S =(df_latest [df_latest ["split"]=="S"][keep_cols ]
.sort_values (["mae","rmse","sec_per_100"],ascending =[True ,True ,True ])
.reset_index (drop =True ))

lb_test_path =PATHS ["reports"]/"leaderboard_test.csv"
lb_S_path =PATHS ["reports"]/"leaderboard_S.csv"
df_latest_path =PATHS ["reports"]/"results_latest.csv"

lb_test .to_csv (lb_test_path ,index =False )
lb_S .to_csv (lb_S_path ,index =False )
df_latest .to_csv (df_latest_path ,index =False )

# All prediction files written to the reports directory, in sorted order.
pred_files =sorted (glob .glob (str (PATHS ["reports"]/"predictions_*.parquet")))

# Map every row_id to a rating bin derived from its target. The cut points put
# targets up to 3 into "1-3", up to 7 into "4-7" and up to 10.1 into "8-10".
df_all =pd .read_parquet (PATHS ["data_processed"]/"reviews_processed.parquet")[["row_id","target"]].copy ()
df_all ["rating_bin"]=pd .cut (df_all ["target"],bins =[-0.1 ,3 ,7 ,10.1 ],labels =["1-3","4-7","8-10"],include_lowest =True )
bin_map =df_all .set_index ("row_id")["rating_bin"].astype (str ).to_dict ()

def per_bin_metrics(dfp: pd.DataFrame) -> pd.DataFrame:
    """Compute test-split coverage and error metrics per method and rating bin.

    Args:
        dfp: Prediction table with at least the columns ``row_id``, ``split``,
            ``method``, ``y_true`` and ``y_pred``. An optional ``valid`` column
            flags usable predictions; when it is absent a prediction counts as
            valid if ``y_pred`` parses as a number.

    Returns:
        One row per ``(method, rating_bin)`` with ``n_total``, ``n_valid``,
        ``coverage``, ``mae``, ``rmse`` and ``spearman``. A column-less empty
        frame is returned when ``dfp`` has no usable test rows.
    """
    df = dfp.copy()
    # Drop rows whose row_id is not numeric, then use plain ints as map keys.
    df["row_id"] = pd.to_numeric(df["row_id"], errors="coerce").astype("Int64")
    df = df[df["row_id"].notna()].copy()
    df["row_id"] = df["row_id"].astype(int)

    df = df[df["split"] == "test"].copy()
    if len(df) == 0:
        return pd.DataFrame()

    if "valid" in df.columns:
        # bool(NaN) is True, so a missing flag would silently count as a valid
        # prediction; blank the gaps to False before casting.
        flag = df["valid"]
        df["valid"] = flag.where(flag.notna(), False).astype(bool)
    else:
        df["valid"] = pd.to_numeric(df["y_pred"], errors="coerce").notna()

    df["y_true"] = pd.to_numeric(df["y_true"], errors="coerce")
    df["y_pred"] = pd.to_numeric(df["y_pred"], errors="coerce")
    # row_ids missing from the module-level bin_map land in an "unknown" bin.
    df["rating_bin"] = df["row_id"].map(bin_map).fillna("unknown")

    out = []
    for (method, b), g in df.groupby(["method", "rating_bin"]):
        # Metrics only use rows that are flagged valid and have both numbers.
        valid = g[g["valid"] & g["y_true"].notna() & g["y_pred"].notna()]
        cov = float(len(valid) / max(1, len(g)))
        if len(valid) > 0:
            m = evaluate_regression(valid["y_true"].to_numpy(), valid["y_pred"].to_numpy())
        else:
            m = {"mae": np.nan, "rmse": np.nan, "spearman": np.nan}
        out.append({
            "method": method,
            "rating_bin": b,
            "n_total": int(len(g)),
            "n_valid": int(len(valid)),
            "coverage": cov,
            "mae": float(m["mae"]),
            "rmse": float(m["rmse"]),
            "spearman": float(m["spearman"]),
        })
    return pd.DataFrame(out)

# Collect per-bin metrics from every prediction file that has the required columns.
all_bin_rows = []
for fp in pred_files:
    dfp = pd.read_parquet(fp)
    if not {"row_id", "split", "method", "y_true", "y_pred"}.issubset(dfp.columns):
        continue
    all_bin_rows.append(per_bin_metrics(dfp))

# per_bin_metrics returns a column-less frame for files without test rows.
# Drop those, and fall back to an empty table with the expected columns so
# that neither pd.concat (which rejects an empty list) nor the "rating_bin"
# filter below fails when there is nothing to report.
_bin_columns = ["method", "rating_bin", "n_total", "n_valid", "coverage", "mae", "rmse", "spearman"]
_bin_tables = [t for t in all_bin_rows if len(t)]
if _bin_tables:
    df_bins = pd.concat(_bin_tables, axis=0, ignore_index=True)
else:
    df_bins = pd.DataFrame(columns=_bin_columns)

# Rows that fell into the "unknown" bin are excluded from the report.
df_bins = df_bins[df_bins["rating_bin"].isin(["1-3", "4-7", "8-10"])].copy()

bins_path = PATHS["reports"] / "fairness_by_bin_test.csv"
df_bins.to_csv(bins_path, index=False)

# matplotlib is optional: remember whether it imported so the plotting below
# is skipped instead of failing on an undefined `plt`.
try:
    import matplotlib.pyplot as plt
    HAS_MPL = True
except Exception:
    HAS_MPL = False

# Plot in leaderboard order: MAE, then RMSE, then seconds per 100 samples.
dfp = lb_test.copy()
dfp = dfp.sort_values(["mae", "rmse", "sec_per_100"], ascending=[True, True, True]).reset_index(drop=True)

fig_mae = PATHS["figs"] / "leaderboard_mae_test.png"
fig_cov = PATHS["figs"] / "leaderboard_coverage_test.png"
fig_cov_mae = PATHS["figs"] / "coverage_vs_mae_test.png"
fig_eff_mae = PATHS["figs"] / "efficiency_vs_mae_test.png"

if HAS_MPL:
    # Bar chart: MAE per method.
    plt.figure()
    plt.bar(dfp["method"].astype(str), dfp["mae"].astype(float))
    plt.ylabel("MAE (test)")
    plt.title("All methods — MAE on Test (lower is better)")
    plt.xticks(rotation=90)
    plt.tight_layout()
    savefig_with_runid(fig_mae, dpi=150)
    plt.close()

    # Bar chart: coverage per method, y-axis fixed to [0, 1.05].
    plt.figure()
    plt.bar(dfp["method"].astype(str), dfp["coverage"].astype(float))
    plt.ylabel("Coverage / Valid rate (test)")
    plt.title("All methods — Coverage on Test (higher is better)")
    plt.ylim(0, 1.05)
    plt.xticks(rotation=90)
    plt.tight_layout()
    savefig_with_runid(fig_cov, dpi=150)
    plt.close()

    # Scatter: coverage against MAE, one labelled point per method.
    plt.figure()
    x = dfp["coverage"].astype(float).to_numpy()
    y = dfp["mae"].astype(float).to_numpy()
    plt.scatter(x, y, alpha=0.8)
    for i, m in enumerate(dfp["method"].astype(str).tolist()):
        plt.annotate(m, (x[i], y[i]), fontsize=8)
    plt.xlabel("Coverage (higher better)")
    plt.ylabel("MAE (lower better)")
    plt.title("Test: Coverage vs MAE")
    plt.tight_layout()
    savefig_with_runid(fig_cov_mae, dpi=150)
    plt.close()

    # Scatter: runtime per 100 samples against MAE.
    plt.figure()
    x = dfp["sec_per_100"].astype(float).to_numpy()
    y = dfp["mae"].astype(float).to_numpy()
    plt.scatter(x, y, alpha=0.8)
    for i, m in enumerate(dfp["method"].astype(str).tolist()):
        plt.annotate(m, (x[i], y[i]), fontsize=8)
    plt.xlabel("Seconds per 100 samples (lower better)")
    plt.ylabel("MAE (lower better)")
    plt.title("Test: Efficiency vs MAE")
    plt.tight_layout()
    savefig_with_runid(fig_eff_mae, dpi=150)
    plt.close()

    for p in [fig_mae, fig_cov, fig_cov_mae, fig_eff_mae]:
        print(" -", p)
else:
    print("matplotlib not available; skipping leaderboard plots.")


# Read back the two reports written earlier in this file.
# NOTE(review): `lb` and `bins` are not used again in the visible part of this file.
lb =pd .read_csv (PATHS ["reports"]/"leaderboard_test.csv")

bins =pd .read_csv (PATHS ["reports"]/"fairness_by_bin_test.csv")

# Point the module-level cache variables at the default cache file.
CACHE_PATH =PATHS ["reports"]/"llm_cache.jsonl"

cache_index =load_cache_index (CACHE_PATH )

# Reproducible probe sample: at most 30 rows of subset S, drawn with the global seed.
df_S =pd .read_parquet (PATHS ["data_processed"]/"splits"/"subset_S.parquet").copy ()
df_S =df_S .sample (n =min (30 ,len (df_S )),random_state =SEED ).reset_index (drop =True )

def probe_cache_hits(df_probe: pd.DataFrame, mode: str) -> Dict[str, Any]:
    """Count how many probe rows are already in ``cache_index`` for ``mode``.

    Rows whose ``(row_id, mode)`` key is missing from the module-level
    ``cache_index`` are passed to ``run_one``; its result is discarded. The
    elapsed wall-clock time covers both the lookups and those calls.

    Args:
        df_probe: Rows to probe; must provide ``row_id`` and ``text``.
        mode: Mode name used as the second part of the cache key.

    Returns:
        Dict with ``mode``, ``n_probe``, ``cache_hits``, ``cache_hit_rate``,
        ``probe_seconds`` and ``probe_sec_per_100``.
    """
    started = time.perf_counter()
    n_hits = 0
    for rec in df_probe.itertuples(index=False):
        if (int(rec.row_id), mode) in cache_index:
            n_hits += 1
            continue
        # Cache miss: run the row through run_one and ignore what it returns.
        run_one(int(rec.row_id), rec.text, mode)
    elapsed = time.perf_counter() - started

    n_rows = len(df_probe)
    return {
        "mode": mode,
        "n_probe": int(n_rows),
        "cache_hits": int(n_hits),
        # max(1, ...) keeps the rate defined for an empty probe frame.
        "cache_hit_rate": float(n_hits / max(1, n_rows)),
        "probe_seconds": float(elapsed),
        "probe_sec_per_100": runtime_s_per_100(n_rows, elapsed),
    }

# Probe the cache once per mode and tabulate the results, one row per mode.
rows =[]
for mode in MODES :
    rows .append (probe_cache_hits (df_S ,mode ))

df_probe =pd .DataFrame (rows )


def list_files(root: Path, patterns: List[str]) -> pd.DataFrame:
    """List regular files below ``root`` that match any of the glob patterns.

    Args:
        root: Directory that is searched recursively.
        patterns: Glob patterns passed to ``Path.rglob``. A file matched by
            several patterns is listed once.

    Returns:
        Frame with the columns ``path`` (str), ``size_kb`` (size in KiB,
        rounded to two decimals) and ``modified`` (mtime formatted as UTC
        ``YYYY-MM-DDTHH:MM:SSZ``), ordered by path. The columns are present
        even when nothing matches.
    """
    import stat  # local import: only needed to classify the stat result

    matches = set()
    for pat in patterns:
        matches.update(root.rglob(pat))

    rows = []
    for p in sorted(matches):
        try:
            # One stat() call supplies type, size and mtime consistently.
            info = p.stat()
        except OSError:
            # The entry disappeared or cannot be read; leave it out.
            continue
        if not stat.S_ISREG(info.st_mode):
            continue
        rows.append({
            "path": str(p),
            "size_kb": round(info.st_size / 1024, 2),
            "modified": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(info.st_mtime)),
        })
    # Explicit columns keep the schema stable for an empty result.
    return pd.DataFrame(rows, columns=["path", "size_kb", "modified"])

# Inventory of the files produced so far, one listing per project directory.
idx_models =list_files (PATHS ["models"],["*.joblib","*.bin","*.safetensors","config.json","tokenizer.json","vocab.txt"])
idx_reports =list_files (PATHS ["reports"],["*.csv","*.json","*.jsonl","*.parquet","*.md"])
idx_figs =list_files (PATHS ["figs"],["*.png"])
idx_data =list_files (PATHS ["data_processed"],["*.parquet","*.csv","*.json"])
idx_feat =list_files (PATHS ["data_features"],["*.npz","*.json"])

# Tag each listing with its category and stack them into one table.
artifact_index =pd .concat ([
idx_models .assign (category ="models"),
idx_reports .assign (category ="reports"),
idx_figs .assign (category ="figs"),
idx_data .assign (category ="data_processed"),
idx_feat .assign (category ="data_features"),
],axis =0 ).reset_index (drop =True )

# The index file itself lives in reports/, so a later run will list it as well.
artifact_index_path =PATHS ["reports"]/"artifact_index.csv"
artifact_index .to_csv (artifact_index_path ,index =False )

splits_meta_path = PATHS["reports"] / "splits_meta.json"
subset_meta_path = PATHS["reports"] / "subset_S_meta.json"
schema_path = PATHS["reports"] / "data_schema.json"

# Optional metadata files: a missing file yields an empty dict. read_text()
# closes the file itself, unlike the previous json.load(open(...)) calls,
# which left their handles open.
splits_meta = json.loads(splits_meta_path.read_text(encoding="utf-8")) if splits_meta_path.exists() else {}
subset_meta = json.loads(subset_meta_path.read_text(encoding="utf-8")) if subset_meta_path.exists() else {}
data_schema = json.loads(schema_path.read_text(encoding="utf-8")) if schema_path.exists() else {}

# Artifacts the earlier pipeline steps are expected to have produced.
# NOTE(review): the list is only defined here; nothing in the visible part of
# this file checks that these paths exist.
must_have = [
    PATHS["data_processed"] / "reviews_processed.parquet",
    PATHS["data_processed"] / "splits" / "train.parquet",
    PATHS["data_processed"] / "splits" / "val.parquet",
    PATHS["data_processed"] / "splits" / "test.parquet",
    PATHS["reports"] / "results.csv",
]


# Bootstrap configuration: number of resamples and the RNG seed.
BOOT_B =5000 
BOOT_SEED =SEED 

pred_paths =sorted (glob .glob (str (PATHS ["reports"]/"predictions_*.parquet")))

# Reference table for the test split: ground truth plus the review text length.
df_test =pd .read_parquet (PATHS ["data_processed"]/"splits"/"test.parquet")[["row_id","target","text"]].copy ()
df_test ["row_id"]=df_test ["row_id"].astype (int )
df_test =df_test .rename (columns ={"target":"y_true"})
df_test ["text_len_chars"]=df_test ["text"].astype (str ).str .len ()

# Same cut points and labels as the rating bins built from reviews_processed.parquet above.
df_test ["rating_bin"]=pd .cut (df_test ["y_true"],bins =[-0.1 ,3 ,7 ,10.1 ],labels =["1-3","4-7","8-10"],include_lowest =True ).astype (str )

def load_method_predictions(path: str) -> pd.DataFrame:
    """Load the test-split predictions of one method in a common layout.

    The strict columns (``y_pred_strict``, ``valid_strict``,
    ``error_type_strict``) are preferred when the file has them; otherwise the
    plain ``y_pred``/``valid``/``error_type`` columns are used.

    Args:
        path: Path of a ``predictions_*.parquet`` file.

    Returns:
        Frame with ``row_id``, ``method``, ``y_pred``, ``valid`` and
        ``error_type``, plus ``runtime_ms``, ``repaired`` and ``from_cache``
        when the file provides them.

    Raises:
        ValueError: If the file has neither ``y_pred_strict`` nor ``y_pred``.
    """
    def _as_flag(flag: pd.Series) -> pd.Series:
        # bool(NaN) is True, so fill missing flags with False before casting.
        return flag.where(flag.notna(), False).astype(bool)

    dfp = pd.read_parquet(path)
    dfp = dfp[dfp["split"] == "test"].copy()
    # Discard rows whose row_id is not numeric.
    dfp["row_id"] = pd.to_numeric(dfp["row_id"], errors="coerce").astype("Int64")
    dfp = dfp[dfp["row_id"].notna()].copy()
    dfp["row_id"] = dfp["row_id"].astype(int)

    # The file stem is used as the method name when it carries a "__<tag>"
    # suffix, and as the fallback when the file has no rows or no "method"
    # column (the latter used to raise a KeyError).
    stem = Path(path).stem.replace("predictions_", "")
    if "__" in stem or len(dfp) == 0 or "method" not in dfp.columns:
        method = stem
    else:
        method = str(dfp["method"].iloc[0])
    dfp["method"] = method

    if "y_pred_strict" in dfp.columns:
        y_pred = pd.to_numeric(dfp["y_pred_strict"], errors="coerce")
        valid = _as_flag(dfp["valid_strict"]) if "valid_strict" in dfp.columns else y_pred.notna()
        err_type = dfp["error_type_strict"] if "error_type_strict" in dfp.columns else dfp.get("error_type", None)
    elif "y_pred" in dfp.columns:
        y_pred = pd.to_numeric(dfp["y_pred"], errors="coerce")
        valid = _as_flag(dfp["valid"]) if "valid" in dfp.columns else y_pred.notna()
        err_type = dfp.get("error_type", None)
    else:
        raise ValueError(f"Keine y_pred Spalte in {path}")

    out = dfp[["row_id", "method"]].copy()
    out["y_pred"] = y_pred
    out["valid"] = valid
    out["error_type"] = err_type if err_type is not None else None

    # Optional diagnostics are copied through unchanged when present.
    for c in ["runtime_ms", "repaired", "from_cache"]:
        if c in dfp.columns:
            out[c] = dfp[c]
    return out

# Load every prediction file; files that cannot be loaded are reported and skipped.
method_tables = []
for p in pred_paths:
    try:
        method_tables.append(load_method_predictions(p))
    except Exception as e:
        print("Skip (could not load):", p, "|", repr(e))

df_methods = pd.concat(method_tables, axis=0, ignore_index=True)

methods = sorted(df_methods["method"].unique().tolist())

# Left-join each method onto the full test set so that test rows without a
# prediction stay in the table and count against coverage.
joined = {}
for m in methods:
    d = df_methods[df_methods["method"] == m].copy()
    d = df_test.merge(d, on=["row_id"], how="left")
    # Unmatched rows turn "valid" into an object column holding NaN. Replace
    # the gaps and cast explicitly so the column is a real bool column; the
    # previous fillna(False) relied on pandas' deprecated implicit downcasting.
    d["valid"] = d["valid"].where(d["valid"].notna(), False).astype(bool)
    joined[m] = d

# Quick sanity print for the first few methods.
for m, d in list(joined.items())[:5]:
    print(m, "| n=", len(d), "| coverage=", float(d["valid"].mean()))

def mae_rmse(y_true: np.ndarray, y_pred: np.ndarray) -> tuple[float, float]:
    """Return ``(MAE, RMSE)`` of ``y_pred`` measured against ``y_true``.

    Both arguments are used in element-wise array arithmetic.
    """
    residual = y_pred - y_true
    mean_abs = np.mean(np.abs(residual))
    mean_sq = np.mean(residual ** 2)
    return float(mean_abs), float(np.sqrt(mean_sq))

def bootstrap_ci_metric(y_true: np.ndarray, y_pred: np.ndarray, B: int, seed: int) -> dict:
    """Bootstrap the mean and a 95% percentile interval for MAE and RMSE.

    Draws ``B`` resamples of size ``n`` with replacement (``n`` being the
    number of observations) and evaluates MAE and RMSE on each.

    Args:
        y_true: Ground-truth values (1-D array-like).
        y_pred: Predictions aligned with ``y_true``.
        B: Number of bootstrap resamples.
        seed: Seed for ``numpy.random.default_rng``.

    Returns:
        Dict with ``mae_mean``, ``mae_p025``, ``mae_p975``, ``rmse_mean``,
        ``rmse_p025`` and ``rmse_p975``. All values are NaN when there are no
        observations or ``B`` is not positive.

    Raises:
        ValueError: If ``y_true`` and ``y_pred`` differ in shape.
    """
    keys = ("mae_mean", "mae_p025", "mae_p975", "rmse_mean", "rmse_p025", "rmse_p975")

    # Accept any array-like and compute in floating point.
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred must have the same shape, got {y_true.shape} and {y_pred.shape}"
        )

    n = len(y_true)
    if n == 0 or B <= 0:
        # rng.integers(0, 0, ...) raises; there is nothing to resample anyway.
        return {k: float("nan") for k in keys}

    rng = np.random.default_rng(seed)
    # Row b holds the indices of bootstrap resample b.
    idx = rng.integers(0, n, size=(B, n))
    err = y_pred[idx] - y_true[idx]
    mae_b = np.mean(np.abs(err), axis=1)
    rmse_b = np.sqrt(np.mean(err ** 2, axis=1))
    return {
        "mae_mean": float(np.mean(mae_b)),
        "mae_p025": float(np.quantile(mae_b, 0.025)),
        "mae_p975": float(np.quantile(mae_b, 0.975)),
        "rmse_mean": float(np.mean(rmse_b)),
        "rmse_p025": float(np.quantile(rmse_b, 0.025)),
        "rmse_p975": float(np.quantile(rmse_b, 0.975)),
    }

# Point estimates and bootstrap intervals per method, computed on valid predictions only.
rows =[]
for m ,d in joined .items ():
    valid =d ["valid"].to_numpy (dtype =bool )
    y_true =d .loc [valid ,"y_true"].to_numpy (dtype =float )
    y_pred =d .loc [valid ,"y_pred"].to_numpy (dtype =float )

    # Coverage is the share of rows in the joined table that are flagged valid.
    coverage =float (valid .mean ())
    n_valid =int (valid .sum ())

    # Nothing to evaluate: keep the method in the table with NaN metrics.
    if n_valid ==0 :
        rows .append ({"method":m ,"coverage":coverage ,"n_valid":n_valid ,
        "mae_point":np .nan ,"rmse_point":np .nan ,
        "mae_mean":np .nan ,"mae_p025":np .nan ,"mae_p975":np .nan ,
        "rmse_mean":np .nan ,"rmse_p025":np .nan ,"rmse_p975":np .nan })
        continue 

    # NOTE(review): a row flagged valid whose y_pred is NaN is not filtered out
    # here and would turn these metrics into NaN — confirm that cannot occur.
    mae0 ,rmse0 =mae_rmse (y_true ,y_pred )
    ci =bootstrap_ci_metric (y_true ,y_pred ,B =BOOT_B ,seed =BOOT_SEED )

    rows .append ({
    "method":m ,
    "coverage":coverage ,
    "n_valid":n_valid ,
    "mae_point":mae0 ,
    "rmse_point":rmse0 ,
    **ci 
    })

# Best (lowest) point MAE first.
df_ci =pd .DataFrame (rows ).sort_values (["mae_point","rmse_point"],ascending =[True ,True ])
ci_path =PATHS ["reports"]/"bootstrap_ci_test_strict.csv"
df_ci .to_csv (ci_path ,index =False )

def paired_bootstrap_delta_mae(dA: pd.DataFrame, dB: pd.DataFrame, B: int, seed: int) -> dict:
    """Paired bootstrap of ``MAE(A) - MAE(B)`` on rows both methods predicted.

    The two tables are inner-joined on ``row_id``; only rows flagged valid in
    both are kept. Each resample applies the same indices to both methods.

    Args:
        dA: Table of method A with ``row_id``, ``y_true``, ``y_pred``, ``valid``.
        dB: Table of method B with ``row_id``, ``y_pred``, ``valid``.
        B: Number of bootstrap resamples.
        seed: Seed for ``numpy.random.default_rng``.

    Returns:
        Dict with ``n_overlap``, ``delta_mae_mean``, ``p025`` and ``p975``; the
        last three are NaN when no row is valid in both tables.
    """
    right = dB[["row_id", "y_pred", "valid"]].rename(
        columns={"y_pred": "y_pred_B", "valid": "valid_B"}
    )
    pairs = dA[["row_id", "y_true", "y_pred", "valid"]].merge(right, on="row_id", how="inner")

    both_valid = pairs["valid"].to_numpy(bool) & pairs["valid_B"].to_numpy(bool)
    pairs = pairs.loc[both_valid].copy()

    n = len(pairs)
    if n == 0:
        return {"n_overlap": 0, "delta_mae_mean": np.nan, "p025": np.nan, "p975": np.nan}

    truth = pairs["y_true"].to_numpy(float)
    pred_a = pairs["y_pred"].to_numpy(float)
    pred_b = pairs["y_pred_B"].to_numpy(float)

    # One index matrix shared by both methods keeps the comparison paired.
    draws = np.random.default_rng(seed).integers(0, n, size=(B, n))
    truth_b = truth[draws]
    mae_a = np.mean(np.abs(pred_a[draws] - truth_b), axis=1)
    mae_b = np.mean(np.abs(pred_b[draws] - truth_b), axis=1)
    delta = mae_a - mae_b

    return {
        "n_overlap": int(n),
        "delta_mae_mean": float(np.mean(delta)),
        "p025": float(np.quantile(delta, 0.025)),
        "p975": float(np.quantile(delta, 0.975)),
    }

# Reference methods for the paired comparison: use the two named baselines
# when they are present, otherwise the first method in df_ci (sorted by
# point MAE) that has a non-missing MAE.
refs =[]
if "distilbert_reg"in joined :
    refs .append ("distilbert_reg")
if "tfidf_ridge"in joined :
    refs .append ("tfidf_ridge")
if not refs :
    refs =[df_ci .dropna (subset =["mae_point"]).iloc [0 ]["method"]]

# Compare every other method (A) against each reference (B).
delta_rows =[]
for ref in refs :
    for m in methods :
        if m ==ref :
            continue 
        out =paired_bootstrap_delta_mae (joined [m ],joined [ref ],B =BOOT_B ,seed =BOOT_SEED )
        delta_rows .append ({
        "method_A":m ,
        "method_B_ref":ref ,
        **out 
        })

# Within each reference, the most negative delta (A better than B) comes first.
df_delta =pd .DataFrame (delta_rows ).sort_values (["method_B_ref","delta_mae_mean"])
delta_path =PATHS ["reports"]/"bootstrap_delta_mae_test_strict.csv"
df_delta .to_csv (delta_path ,index =False )

def safe_method_filename(s: str) -> str:
    """Turn a method name into a filename-safe token of at most 120 characters.

    Every run of characters other than ASCII letters, digits, ``_`` and ``-``
    is replaced by a single underscore.
    """
    unsafe_run = re.compile(r"[^a-zA-Z0-9_\-]+")
    cleaned = unsafe_run.sub("_", s)
    return cleaned[:120]

# Per-method error analysis: a summary row plus a CSV with the 50 largest errors.
summary_rows =[]

for m ,d in joined .items ():
    dd =d .copy ()
    dd ["error"]=dd ["y_pred"]-dd ["y_true"]
    dd ["abs_error"]=np .abs (dd ["error"])
    dd ["method"]=m 

    # Rows that are flagged valid and actually carry a prediction.
    v =dd [dd ["valid"]&dd ["y_pred"].notna ()].copy ()

    if len (v )>0 :
        mae0 =float (np .mean (v ["abs_error"]))
        rmse0 =float (np .sqrt (np .mean ((v ["error"])**2 )))
    else :
        mae0 =np .nan 
        rmse0 =np .nan 

    # Coverage and n_valid count every row flagged valid, whereas the error
    # metrics above additionally require a non-missing prediction.
    summary_rows .append ({
    "method":m ,
    "coverage":float (dd ["valid"].mean ()),
    "n_valid":int (dd ["valid"].sum ()),
    "mae_valid":mae0 ,
    "rmse_valid":rmse0 ,
    })

    top =v .sort_values ("abs_error",ascending =False ).head (50 )

    # Base columns plus whichever optional diagnostics this method provides.
    cols =["row_id","y_true","y_pred","error","abs_error","rating_bin","text_len_chars"]
    for extra in ["error_type","repaired","runtime_ms","from_cache"]:
        if extra in dd .columns :
            cols .append (extra )

    out_path =PATHS ["reports"]/f"error_analysis_{safe_method_filename(m)}.csv"
    top [cols ].to_csv (out_path ,index =False )

# Lowest MAE first; ties are ordered by higher coverage.
df_errsum =pd .DataFrame (summary_rows ).sort_values (["mae_valid","coverage"],ascending =[True ,False ])
errsum_path =PATHS ["reports"]/"error_analysis_summary.csv"
df_errsum .to_csv (errsum_path ,index =False )

# Reports written by this section.
# NOTE(review): `must` and `ea_files` are not used again in the visible part of this file.
must =[
PATHS ["reports"]/"bootstrap_ci_test_strict.csv",
PATHS ["reports"]/"bootstrap_delta_mae_test_strict.csv",
PATHS ["reports"]/"error_analysis_summary.csv",
]

# The pattern also matches error_analysis_summary.csv, not only the per-method files.
ea_files =list (PATHS ["reports"].glob ("error_analysis_*.csv"))

def add_efficiency_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with two runtime columns added.

    * ``sec_per_100_total``: the numeric ``sec_per_100`` column when present,
      otherwise derived from ``inference_s`` and ``n_samples``; NaN when
      neither is available.
    * ``sec_per_100_online``: starts as a copy of the total.

    For rows that also have a ``feature_s_est`` value (with ``inference_s``
    and ``n_samples`` present), the online figure is recomputed from
    ``inference_s`` alone and the total from ``inference_s + feature_s_est``.
    The input frame is not modified.
    """
    out = df.copy()
    has_runtime_inputs = {"inference_s", "n_samples"}.issubset(out.columns)

    def _inference_only(r):
        return runtime_s_per_100(int(r["n_samples"]), float(r["inference_s"]))

    def _inference_plus_features(r):
        return runtime_s_per_100(int(r["n_samples"]), float(r["inference_s"]) + float(r["feature_s_est"]))

    def _derived_total(r):
        if pd.notna(r.get("inference_s")) and pd.notna(r.get("n_samples")):
            return runtime_s_per_100(int(r["n_samples"]), float(r["inference_s"]))
        return np.nan

    if "sec_per_100" in out.columns:
        out["sec_per_100_total"] = pd.to_numeric(out["sec_per_100"], errors="coerce")
    elif has_runtime_inputs:
        out["sec_per_100_total"] = out.apply(_derived_total, axis=1)
    else:
        out["sec_per_100_total"] = np.nan

    out["sec_per_100_online"] = out["sec_per_100_total"]

    # Without a feature-time estimate, online and total stay identical.
    if "feature_s_est" not in out.columns or not has_runtime_inputs:
        return out

    feat = pd.to_numeric(out["feature_s_est"], errors="coerce")
    count = pd.to_numeric(out["n_samples"], errors="coerce")
    infer = pd.to_numeric(out["inference_s"], errors="coerce")

    # Only rows where all three inputs are numeric are recomputed.
    usable = feat.notna() & count.notna() & infer.notna()
    if usable.any():
        out.loc[usable, "sec_per_100_online"] = out.loc[usable].apply(_inference_only, axis=1)
        out.loc[usable, "sec_per_100_total"] = out.loc[usable].apply(_inference_plus_features, axis=1)
    return out

# Rewrite results.csv with the efficiency columns added.
results_path = PATHS["reports"] / "results.csv"
if results_path.exists():
    df_res = pd.read_csv(results_path)
    df_res2 = add_efficiency_columns(df_res)
    df_res2.to_csv(results_path, index=False)
    print("Updated results with efficiency columns:", results_path)
else:
    print("results.csv not found at:", results_path)
    df_res2 = None

if df_res2 is not None and len(df_res2):
    # Latest row per (method, split), ordered by the parsed UTC timestamp.
    df_res2["timestamp_utc"] = df_res2["timestamp_utc"].astype(str)
    df_res2["_ts"] = pd.to_datetime(df_res2["timestamp_utc"], errors="coerce", utc=True)
    df_latest = (
        df_res2.sort_values(["method", "split", "_ts"])
        .groupby(["method", "split"], as_index=False)
        .tail(1)
        .drop(columns=["_ts"])
        .reset_index(drop=True)
    )
    latest_path = PATHS["reports"] / "results_latest.csv"
    df_latest.to_csv(latest_path, index=False)
    print("Updated:", latest_path)

    # Run-specific snapshot of the same table.
    snap_path = PATHS["reports"] / f"results_latest__{RUN_ID}.csv"
    df_latest.to_csv(snap_path, index=False)
    print("Saved snapshot:", snap_path)

    import json
    import platform
    from datetime import datetime, timezone
    meta = {
        "run_id": RUN_ID,
        "run_tag": RUN_TAG,
        # datetime.utcnow() is deprecated; take an aware UTC "now" and drop
        # the tzinfo so the stored text keeps its "<naive ISO>Z" format.
        "time_utc": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
        "seed": SEED,
        # Settings defined in other cells are read via globals() so that a
        # missing name yields a default instead of a NameError.
        "use_ollama_format": bool(globals().get("USE_OLLAMA_FORMAT", False)),
        "ollama_model": globals().get("OLLAMA_MODEL", None),
        "ollama_base_url": globals().get("OLLAMA_BASE_URL", None),
        "cache_path": str(globals().get("CACHE_PATH", "")),
        "python_version": platform.python_version(),
        "platform": platform.platform(),
    }
    meta_path = PATHS["reports"] / f"run_meta__{RUN_ID}.json"
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2, ensure_ascii=False)
    print("Saved meta:", meta_path)
else:
    df_latest = None

import matplotlib .pyplot as plt 

# Efficiency-vs-MAE scatter plots (online and total runtime) for the test split.
latest_path =PATHS ["reports"]/"results_latest.csv"
if latest_path .exists ():
    df_latest =pd .read_csv (latest_path )
    df_latest =add_efficiency_columns (df_latest )

    df_test_rows =df_latest [df_latest ["split"]=="test"].copy ()
    if len (df_test_rows ):
        # Coerce the plotted columns to numbers; unparseable entries become NaN.
        df_test_rows ["mae"]=pd .to_numeric (df_test_rows ["mae"],errors ="coerce")
        df_test_rows ["coverage"]=pd .to_numeric (df_test_rows ["coverage"],errors ="coerce")
        df_test_rows ["sec_per_100_online"]=pd .to_numeric (df_test_rows ["sec_per_100_online"],errors ="coerce")
        df_test_rows ["sec_per_100_total"]=pd .to_numeric (df_test_rows ["sec_per_100_total"],errors ="coerce")

        fig1 =PATHS ["figs"]/"efficiency_online_vs_mae_test.png"
        plt .figure ()
        plt .scatter (df_test_rows ["sec_per_100_online"],df_test_rows ["mae"],alpha =0.8 )
        # Label only points whose coordinates are both present.
        for _ ,r in df_test_rows .iterrows ():
            if pd .notna (r ["sec_per_100_online"])and pd .notna (r ["mae"]):
                plt .annotate (str (r ["method"]),(r ["sec_per_100_online"],r ["mae"]),fontsize =8 )
        plt .xlabel ("sec_per_100_online (lower better)")
        plt .ylabel ("MAE (lower better)")
        plt .title ("Test: Online Efficiency vs MAE")
        plt .tight_layout ()
        savefig_with_runid (fig1 ,dpi =150 )
        plt .close ()

        fig2 =PATHS ["figs"]/"efficiency_total_vs_mae_test.png"
        plt .figure ()
        plt .scatter (df_test_rows ["sec_per_100_total"],df_test_rows ["mae"],alpha =0.8 )
        for _ ,r in df_test_rows .iterrows ():
            if pd .notna (r ["sec_per_100_total"])and pd .notna (r ["mae"]):
                plt .annotate (str (r ["method"]),(r ["sec_per_100_total"],r ["mae"]),fontsize =8 )
        plt .xlabel ("sec_per_100_total (lower better)")
        plt .ylabel ("MAE (lower better)")
        plt .title ("Test: Total Efficiency vs MAE")
        plt .tight_layout ()
        savefig_with_runid (fig2 ,dpi =150 )
        plt .close ()

        print ("Saved:",fig1 )
        print ("Saved:",fig2 )
    else :
        print ("No test rows in results_latest.csv; skipping efficiency plots.")
else :
    print ("results_latest.csv not found at:",latest_path )

# Report the largest |online - total| gap among methods whose name starts with "phi3mini_".
latest_path =PATHS ["reports"]/"results_latest.csv"
if latest_path .exists ():
    df_latest =pd .read_csv (latest_path )
    df_latest =add_efficiency_columns (df_latest )

    llm =df_latest [df_latest ["method"].astype (str ).str .startswith ("phi3mini_")].copy ()
    if len (llm ):
        diff =(pd .to_numeric (llm ["sec_per_100_online"],errors ="coerce")-pd .to_numeric (llm ["sec_per_100_total"],errors ="coerce")).abs ().max ()
        print ("LLM max |online-total|:",diff )

    # NOTE(review): this message is printed whenever the file exists; the
    # computed difference is only displayed, never checked.
    print ("VALIDIERUNG OK")
else :
    print ("results_latest.csv not found; cannot validate PATCH 9.")


# Destination of the Markdown report with concrete error examples.
out_md =PATHS ["reports"]/"error_examples.md"

# Rebuild the test reference table (ground truth, text, text length).
# Unlike the earlier version, no rating_bin column is added here.
df_test =pd .read_parquet (PATHS ["data_processed"]/"splits"/"test.parquet")[["row_id","target","text"]].copy ()
df_test ["row_id"]=df_test ["row_id"].astype (int )
df_test =df_test .rename (columns ={"target":"y_true"})
df_test ["text_len_chars"]=df_test ["text"].astype (str ).str .len ()

pred_paths =sorted (glob .glob (str (PATHS ["reports"]/"predictions_*.parquet")))

def load_method_predictions(path: str) -> pd.DataFrame:
    """Load the test-split predictions of one method in a common layout.

    The strict columns (``y_pred_strict``, ``valid_strict``,
    ``error_type_strict``) are preferred when the file has them; otherwise the
    plain ``y_pred``/``valid``/``error_type`` columns are used.

    Args:
        path: Path of a ``predictions_*.parquet`` file.

    Returns:
        Frame with ``row_id``, ``method``, ``y_pred``, ``valid`` and
        ``error_type``. It is empty (same columns) when the file has no
        usable test rows.

    Raises:
        ValueError: If the file has neither ``y_pred_strict`` nor ``y_pred``.
    """
    empty = pd.DataFrame(columns=["row_id", "method", "y_pred", "valid", "error_type"])

    def _as_flag(flag: pd.Series) -> pd.Series:
        # Fill missing flags BEFORE the cast: astype(bool) maps NaN to True,
        # which made the previous fillna(False) after the cast a no-op.
        return flag.where(flag.notna(), False).astype(bool)

    dfp = pd.read_parquet(path)
    dfp = dfp[dfp["split"] == "test"].copy()
    if len(dfp) == 0:
        return empty
    dfp["row_id"] = pd.to_numeric(dfp["row_id"], errors="coerce").astype("Int64")
    dfp = dfp[dfp["row_id"].notna()].copy()
    if len(dfp) == 0:
        # Every test row had an unusable row_id; iloc[0] below would fail.
        return empty
    dfp["row_id"] = dfp["row_id"].astype(int)

    # The file stem is used as the method name when it carries a "__<tag>"
    # suffix or when the file has no "method" column.
    stem = Path(path).stem.replace("predictions_", "")
    if "__" in stem or "method" not in dfp.columns:
        method = stem
    else:
        method = str(dfp["method"].iloc[0])
    dfp["method"] = method

    if "y_pred_strict" in dfp.columns:
        y_pred = pd.to_numeric(dfp["y_pred_strict"], errors="coerce")
        valid = _as_flag(dfp["valid_strict"]) if "valid_strict" in dfp.columns else y_pred.notna()
        err_type = dfp["error_type_strict"] if "error_type_strict" in dfp.columns else dfp.get("error_type", None)
    elif "y_pred" in dfp.columns:
        y_pred = pd.to_numeric(dfp["y_pred"], errors="coerce")
        valid = _as_flag(dfp["valid"]) if "valid" in dfp.columns else y_pred.notna()
        err_type = dfp.get("error_type", None)
    else:
        raise ValueError(f"Keine y_pred Spalte in {path}")

    out = dfp[["row_id", "method"]].copy()
    out["y_pred"] = y_pred
    out["valid"] = valid
    out["error_type"] = err_type if err_type is not None else None
    return out

# Load every prediction file; files that cannot be loaded are reported and skipped.
method_tables = []
for p in pred_paths:
    try:
        method_tables.append(load_method_predictions(p))
    except Exception as e:
        print("Skip (could not load):", p, "|", repr(e))

df_methods = pd.concat(method_tables, axis=0, ignore_index=True)
methods = sorted(df_methods["method"].unique().tolist())

# Left-join each method onto the full test set so that test rows without a
# prediction remain in the table as invalid rows.
joined = {}
for m in methods:
    d = df_methods[df_methods["method"] == m].copy()
    d = df_test.merge(d, on="row_id", how="left")
    # Unmatched rows turn "valid" into an object column holding NaN. Cast it
    # to a real bool column: the report code negates it with `~`, which on an
    # object column of Python bools yields integers instead of a mask.
    d["valid"] = d["valid"].where(d["valid"].notna(), False).astype(bool)
    joined[m] = d

# Reference method: the lowest point MAE from the bootstrap table when that
# file exists, otherwise the alphabetically first loaded method.
best_ref =None 
ci_path =PATHS ["reports"]/"bootstrap_ci_test_strict.csv"
if ci_path .exists ():
    df_ci =pd .read_csv (ci_path )
    if "mae_point"in df_ci .columns :
        best_ref =str (df_ci .sort_values ("mae_point").iloc [0 ]["method"])
if best_ref is None and len (methods ):
    best_ref =methods [0 ]

# Show the reference first, then LLM methods in the preference order below, capped at three.
# NOTE(review): best_ref comes from a CSV on disk and is not checked against
# the methods loaded into `joined` — confirm it is always present there.
llm_methods =[m for m in methods if str (m ).startswith ("phi3mini_")]
show_methods =[]
if best_ref :
    show_methods .append (best_ref )
for pref in ["phi3mini_m3_schema_decoding","phi3mini_m4_repair","phi3mini_m3_schema_prompt","phi3mini_m2_json","phi3mini_m1_free"]:
    if pref in llm_methods and pref not in show_methods :
        show_methods .append (pref )
show_methods =show_methods [:3 ]

def short_text(s: str, n: int = 220) -> str:
    """Collapse whitespace in ``s`` and truncate it to ``n`` characters.

    Args:
        s: Text to shorten. ``None`` and NaN (a missing text cell) are treated
            as an empty string; other non-string values are converted with
            ``str()``.
        n: Maximum number of characters kept before the ellipsis.

    Returns:
        The text on one line; ``"…"`` is appended when it was truncated.
    """
    if not isinstance(s, str):
        # NaN is truthy, so the former `s or ""` let it through and
        # `.replace` failed with an AttributeError.
        is_missing = s is None or (isinstance(s, float) and s != s)
        s = "" if is_missing else str(s)
    t = s.replace("\n", " ").strip()
    t = re.sub(r"\s+", " ", t)
    return (t[:n] + "…") if len(t) > n else t

# Assemble the Markdown report line by line and write it in one go.
lines = []
lines.append("# Error Examples (Step 10)\n\n")
# pd.Timestamp.utcnow().isoformat() already ends in "+00:00", so appending
# "Z" produced a malformed stamp ("...+00:00Z"). Use the same strftime format
# as the other timestamps in this file.
generated_utc = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
lines.append(f"Generated: {generated_utc}\n\n")
lines.append("This file summarizes common failure modes and shows concrete examples (strict evaluation).\n\n")

for m in show_methods:
    d = joined[m].copy()
    d["error"] = d["y_pred"] - d["y_true"]
    d["abs_error"] = np.abs(d["error"])
    # Explicit bool mask: `~` on an object column of Python bools would
    # produce integers rather than a negated mask.
    is_valid = d["valid"].astype(bool)
    coverage = float(is_valid.mean())
    n_valid = int(is_valid.sum())

    lines.append(f"## Method: `{m}`\n\n")
    lines.append(f"- Coverage (strict): **{coverage:.3f}** ({n_valid}/{len(d)})\n\n")

    # Invalid outputs: frequency of each error type, then the first two rows.
    inv = d[~is_valid].copy()
    lines.append("### Failure modes (invalid outputs)\n\n")
    if len(inv) > 0:
        vc = inv["error_type"].value_counts(dropna=False).head(10)
        for kind, count in vc.items():
            lines.append(f"- `{kind}`: {int(count)}\n")
        lines.append("\n### Invalid examples\n\n")
        samp = inv.head(2)
        for _, r in samp.iterrows():
            lines.append(f"- row_id={int(r['row_id'])} | error_type={r.get('error_type')} | text_len={int(r['text_len_chars'])}\n")
            lines.append(f"  - text: \"{short_text(r['text'])}\"\n")
    else:
        lines.append("- (none)\n")

    # Valid predictions: the five largest absolute errors.
    v = d[is_valid & d["y_pred"].notna()].copy()
    lines.append("\n### Largest errors (valid predictions)\n\n")
    if len(v) == 0:
        lines.append("- (no valid predictions)\n")
    else:
        top = v.sort_values("abs_error", ascending=False).head(5)
        for _, r in top.iterrows():
            lines.append(
                f"- row_id={int(r['row_id'])} | y_true={float(r['y_true']):.1f} | y_pred={float(r['y_pred']):.1f} | abs_err={float(r['abs_error']):.1f} | text_len={int(r['text_len_chars'])}\n"
            )
            lines.append(f"  - text: \"{short_text(r['text'])}\"\n")
    lines.append("\n")

out_md.write_text("".join(lines), encoding="utf-8")


In [None]:
# Switch for the configuration sweep defined below.
RUN_SWEEP =True 

# One dict per sweep run. The four entries differ in run_tag, use_format
# (off only for "format_off"), num_predict (8 only for "fast_predict8") and
# prompt_max_chars (4000 only for "prompt_4k").
RUN_SWEEP_CONFIGS =[
dict (run_tag ="prompt_1k8",use_format =True ,num_predict =32 ,prompt_max_chars =1800 ),
dict (run_tag ="format_off",use_format =False ,num_predict =32 ,prompt_max_chars =1800 ),
dict (run_tag ="fast_predict8",use_format =True ,num_predict =8 ,prompt_max_chars =1800 ),
dict (run_tag ="prompt_4k",use_format =True ,num_predict =32 ,prompt_max_chars =4000 ),
]

def _set_prompt_truncation(max_chars: int) -> None:
    """
    Set the central prompt profile (safe to call repeatedly during a sweep).

    Stores the limit in the global PROMPT_MAX_CHARS and forwards it to
    set_prompt_profile() together with a fixed tail length and marker.
    Per the original author's note, make_prompt() and the repair prompts pick
    the profile up automatically via truncate_text() -> PROMPT_PROFILE.

    Args:
        max_chars: Maximum prompt length in characters; coerced with int().
    """
    global PROMPT_MAX_CHARS
    limit = int(max_chars)
    PROMPT_MAX_CHARS = limit
    set_prompt_profile(max_chars=limit, tail_chars=350, marker="\n...\n")
def _reset_llm_cache(run_id: str, run_tag: str) -> Path:
    """
    Switch the global LLM cache to a file that belongs to one run only.

    An isolated cache per run keeps comparisons between runs clean. The
    function rebinds the globals CACHE_PATH and cache_index: the index is
    loaded from the file when it already exists, otherwise it starts empty.

    Args:
        run_id: Identifier of the run; becomes part of the cache file name.
        run_tag: Tag of the run; becomes part of the cache file name.

    Returns:
        The new value of CACHE_PATH.
    """
    global CACHE_PATH, cache_index
    target = PATHS["reports"] / f"llm_cache__{run_id}__{run_tag}.jsonl"
    target.parent.mkdir(parents=True, exist_ok=True)
    CACHE_PATH = target
    if CACHE_PATH.exists():
        cache_index = load_cache_index(CACHE_PATH)
    else:
        cache_index = {}
    print("CACHE_PATH:", CACHE_PATH, "| cached rows:", len(cache_index))
    return CACHE_PATH

def _ensure_run_dir(run_id: str, run_tag: str) -> Path:
    """Create (if necessary) and return the folder reports/runs/<run_id>__<run_tag>."""
    folder_name = f"{run_id}__{run_tag}"
    target = PATHS["reports"] / "runs" / folder_name
    target.mkdir(parents=True, exist_ok=True)
    return target

def _evaluate_llm_modes(mode_frames: Dict[str, pd.DataFrame], *, run_id: str, run_tag: str) -> pd.DataFrame:
    """
    Compute strict and relaxed metrics per mode and store each mode's predictions in the run folder.

    The "test" split is only evaluated when the row ids of df_S and df_test
    are the same set (the original note calls this the "Patch-6" logic).

    Side effects: writes one predictions parquet per mode into the run
    directory, tries to copy it into the reports root (a failed copy only
    prints a warning), and writes results_llm__<run_tag>.csv into the run
    directory.

    Args:
        mode_frames: Mapping from mode name to that mode's record frame; must
            contain an entry for every element of MODES.
        run_id: Identifier written into every result row and used for the run folder.
        run_tag: Tag written into every result row and used in file names.

    Returns:
        A DataFrame with one row per (mode, split).
    """
    # Metrics copied verbatim from the strict result into each output row.
    strict_keys = (
        "mae", "rmse", "spearman", "parse_success_rate", "schema_adherence_rate",
        "out_of_range_rate", "empty_refusal_rate", "coverage", "risk_mae", "risk_rmse",
        "sec_per_100", "n_samples",
    )

    ids_S = set(df_S["row_id"].astype(int).tolist())
    ids_test = set(df_test["row_id"].astype(int).tolist())
    has_full_test = ids_S == ids_test
    print("LLM_HAS_FULL_TEST:", has_full_test, "| nS:", len(ids_S), "| nTest:", len(ids_test))

    records = []
    out_dir = _ensure_run_dir(run_id, run_tag)

    # A single timestamp is shared by all rows of this evaluation.
    stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    for mode in MODES:
        frame = mode_frames[mode].copy()
        method = f"{OLLAMA_MODEL}_{mode.lower()}"

        per_split = [(
            "S",
            compute_llm_metrics(frame, y_true_S, variant="strict"),
            compute_llm_metrics(frame, y_true_S, variant="relaxed"),
        )]
        pred_tables = [predictions_table(frame, "S", y_true_S, method)]

        if has_full_test:
            # Bring the records into df_test's row order before scoring against y_true_test.
            test_order = df_test["row_id"].astype(int).tolist()
            frame_test = frame.set_index("row_id").reindex(test_order).reset_index()

            per_split.append((
                "test",
                compute_llm_metrics(frame_test, y_true_test, variant="strict"),
                compute_llm_metrics(frame_test, y_true_test, variant="relaxed"),
            ))
            pred_tables.append(predictions_table(frame_test, "test", y_true_test, method))

        pred_file_name = f"predictions_{method}__{run_tag}.parquet"
        pred_path = out_dir / pred_file_name
        pd.concat(pred_tables, axis=0).reset_index(drop=True).to_parquet(pred_path, index=False)

        # Best-effort convenience copy into the reports root.
        pred_path_root = PATHS["reports"] / pred_file_name
        try:
            shutil.copy2(pred_path, pred_path_root)
        except Exception as exc:
            print("WARN: could not copy predictions to reports root:", exc)

        for split_name, strict, relaxed in per_split:
            record = {
                "timestamp_utc": stamp,
                "run_id": run_id,
                "run_tag": run_tag,
                "method": method,
                "split": split_name,
                "llm_mode": mode,
                "model_name": OLLAMA_MODEL,
                "temperature": 0.0,
            }
            record.update((key, strict[key]) for key in strict_keys)
            record.update({
                "coverage_relaxed": relaxed["coverage"],
                "parse_success_rate_relaxed": relaxed["parse_success_rate"],
                "schema_adherence_rate_relaxed": relaxed["schema_adherence_rate"],
                "mae_relaxed": relaxed["mae"],
                "rmse_relaxed": relaxed["rmse"],
                "spearman_relaxed": relaxed["spearman"],
                "inference_s": strict["runtime_s_total"],
                "notes": f"cache={CACHE_PATH.name}",
            })
            records.append(record)

    df_run = pd.DataFrame(records)
    results_path = out_dir / f"results_llm__{run_tag}.csv"
    df_run.to_csv(results_path, index=False)
    print("Saved:", results_path)

    return df_run

def _run_one_llm_config(cfg: Dict[str, Any]) -> pd.DataFrame:
    """
    Execute one complete LLM run (M1..M4) with its own cache and its own artifacts.

    The function overwrites the globals RUN_ID, RUN_TAG, USE_OLLAMA_FORMAT and
    OLLAMA_NUM_PREDICT (plus the prompt profile and the cache globals via the
    helpers it calls) and does NOT restore them; the caller is responsible for
    that. The new run id is derived from the value RUN_ID has on entry.

    Before the actual run a warmup request is sent and validated so that a
    broken model/server setup fails fast.

    Args:
        cfg: Run configuration with the keys "run_tag", "use_format",
            "num_predict" and "prompt_max_chars".

    Returns:
        The per-(mode, split) results frame produced by _evaluate_llm_modes().

    Raises:
        RuntimeError: If the warmup request yields no response because of a
            transport error, cannot be parsed, violates the schema, or does
            not return the expected rating 7.
    """
    global RUN_ID, RUN_TAG, USE_OLLAMA_FORMAT, OLLAMA_NUM_PREDICT

    # The run id is built on top of whatever RUN_ID currently holds; a short
    # random suffix keeps repeated runs of the same tag apart.
    base = RUN_ID
    run_tag = str(cfg["run_tag"])
    run_id = f"{base}__{run_tag}__{uuid.uuid4().hex[:6]}"
    RUN_ID = run_id
    RUN_TAG = run_tag

    USE_OLLAMA_FORMAT = bool(cfg["use_format"])
    OLLAMA_NUM_PREDICT = int(cfg["num_predict"])
    _set_prompt_truncation(int(cfg["prompt_max_chars"]))

    print("\n==============================")
    print("RUN:", RUN_ID, "| tag:", RUN_TAG)
    print("USE_OLLAMA_FORMAT:", USE_OLLAMA_FORMAT, "| OLLAMA_NUM_PREDICT:", OLLAMA_NUM_PREDICT, "| PROMPT_MAX_CHARS:", PROMPT_MAX_CHARS)

    _reset_llm_cache(RUN_ID, RUN_TAG)

    warmup_prompt = 'Return ONLY JSON exactly like: {"rating": 7}. No markdown, no code fences, no extra text.'

    warmup_fmt = mode_to_format("M3_SCHEMA_ENFORCED") if USE_OLLAMA_FORMAT else None

    resp, err = ollama_generate_with_transport_retry(
        warmup_prompt,
        model=OLLAMA_MODEL,
        base_url=OLLAMA_BASE_URL,
        temperature=0.0,
        fmt=warmup_fmt,
    )

    # Report a transport failure as such instead of letting it surface as a
    # misleading parse error. Only triggers when no response text came back.
    # NOTE(review): assumes `err` is falsy on success - confirm against
    # ollama_generate_with_transport_retry.
    if err and not resp:
        raise RuntimeError(f"Warmup transport failed: {err}")

    # Without format enforcement the response is parsed in relaxed mode.
    pr, perr, pobj = parse_json_rating(resp, relaxed=(not USE_OLLAMA_FORMAT))
    if perr is not None:
        raise RuntimeError(f"Warmup parse failed: {perr} | resp={repr((resp or '')[:200])}")

    # Schema validation only applies when an actual schema (not plain "json") was requested.
    if USE_OLLAMA_FORMAT and warmup_fmt not in (None, "json"):
        sch_err = validate_schema(pobj)
        if sch_err is not None:
            raise RuntimeError(f"Warmup schema failed: {sch_err} | obj={pobj}")

    if pr != 7:
        raise RuntimeError(f"Warmup rating != 7: {pr} | obj={pobj}")

    print("Warmup OK")

    mode_frames_local = {}
    for mode in MODES:
        mode_frames_local[mode] = run_mode_chunked(mode)

    df_run = _evaluate_llm_modes(mode_frames_local, run_id=RUN_ID, run_tag=RUN_TAG)

    return df_run

if RUN_SWEEP:
    # Snapshot every global that the sweep helpers overwrite so the notebook
    # state can be restored afterwards, even when a run fails.
    _orig_run_id = RUN_ID
    _orig_run_tag = RUN_TAG
    _orig_cache_path = CACHE_PATH
    # _reset_llm_cache() rebinds cache_index together with CACHE_PATH, so both
    # have to be restored together. None means the name did not exist before.
    _orig_cache_index = globals().get("cache_index")
    _orig_use_format = USE_OLLAMA_FORMAT
    _orig_num_predict = OLLAMA_NUM_PREDICT
    _orig_prompt_max = PROMPT_MAX_CHARS

    all_runs = []
    try:
        for cfg in RUN_SWEEP_CONFIGS:
            # _run_one_llm_config() derives the new run id from the global
            # RUN_ID and then overwrites it. Without this reset every run id
            # (and cache/run-folder name) would nest the ids of all previous
            # sweep runs.
            RUN_ID = _orig_run_id
            df_run = _run_one_llm_config(cfg)
            all_runs.append(df_run)
    finally:
        RUN_ID = _orig_run_id
        RUN_TAG = _orig_run_tag
        CACHE_PATH = _orig_cache_path
        if _orig_cache_index is not None:
            cache_index = _orig_cache_index
        USE_OLLAMA_FORMAT = _orig_use_format
        OLLAMA_NUM_PREDICT = _orig_num_predict
        _set_prompt_truncation(_orig_prompt_max)

    # Combined results of all sweep runs.
    df_all = pd.concat(all_runs, axis=0, ignore_index=True)
    out_all = PATHS["reports"] / "run_sweep_llm_results.csv"
    df_all.to_csv(out_all, index=False)
    print("\nSaved sweep summary:", out_all)

    def _pairwise(df: pd.DataFrame, a: str, b: str, split_pref: str = "test") -> pd.DataFrame:
        """
        Compare two sweep runs (identified by run_tag) metric by metric.

        Rows are matched on (method, llm_mode, split). The preferred split is
        used when the frame contains it, otherwise split "S". Each delta is
        computed as value(a) - value(b).
        """
        split_use = split_pref if (df["split"] == split_pref).any() else "S"
        da = df[(df["run_tag"] == a) & (df["split"] == split_use)].copy()
        db = df[(df["run_tag"] == b) & (df["split"] == split_use)].copy()
        m = da.merge(db, on=["method", "llm_mode", "split"], suffixes=(f"__{a}", f"__{b}"))
        for k in ["mae", "coverage", "parse_success_rate", "schema_adherence_rate", "sec_per_100"]:
            m[f"delta_{k}"] = pd.to_numeric(m[f"{k}__{a}"], errors="coerce") - pd.to_numeric(m[f"{k}__{b}"], errors="coerce")
        m["comparison"] = f"{a} vs {b} (split={split_use})"
        keep = [
            "comparison", "method", "llm_mode", "split",
            f"mae__{a}", f"mae__{b}", "delta_mae",
            f"coverage__{a}", f"coverage__{b}", "delta_coverage",
            f"sec_per_100__{a}", f"sec_per_100__{b}", "delta_sec_per_100",
            f"parse_success_rate__{a}", f"parse_success_rate__{b}", "delta_parse_success_rate",
            f"schema_adherence_rate__{a}", f"schema_adherence_rate__{b}", "delta_schema_adherence_rate",
        ]
        return m[keep].sort_values(["llm_mode", "method"])

    # "prompt_1k8" serves as the baseline for every comparison.
    comps = []
    comps.append(_pairwise(df_all, "prompt_1k8", "format_off"))
    comps.append(_pairwise(df_all, "prompt_1k8", "fast_predict8"))
    comps.append(_pairwise(df_all, "prompt_1k8", "prompt_4k"))

    df_comp = pd.concat(comps, axis=0, ignore_index=True)
    comp_path = PATHS["reports"] / "run_sweep_llm_pairwise.csv"
    df_comp.to_csv(comp_path, index=False)
    print("Saved pairwise comparisons:", comp_path)

else:
    print("RUN_SWEEP=False — kein Run-Sweep ausgeführt.")

import matplotlib .pyplot as plt 

# Use the RUN_ID of the live session when there is one; otherwise derive an id
# from the most recently modified results_*.csv in the reports folder.
RUN_ID_LOCAL = globals().get("RUN_ID")
if RUN_ID_LOCAL is None:
    candidates = list(PATHS["reports"].glob("results_*.csv"))
    candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    if candidates:
        RUN_ID_LOCAL = candidates[0].stem.replace("results_", "")
    else:
        RUN_ID_LOCAL = "run_unknown"

# All export artifacts of this cell go into one folder per run id and patch set.
PATCHSET_TAG = "PATCHSET1"
export_dir = PATHS["reports"] / ("paper_exports_" + f"{RUN_ID_LOCAL}_{PATCHSET_TAG}")
export_dir.mkdir(parents=True, exist_ok=True)

def _load_first_existing (paths ):
    for p in paths :
        if p .exists ():
            return p ,pd .read_csv (p )
    return None ,None 

# Candidate locations, most specific first, for the "latest results" table.
results_latest_paths = [
    PATHS["reports"] / f"results_latest__{RUN_ID_LOCAL}.csv",
    PATHS["reports"] / f"results_latest_{RUN_ID_LOCAL}.csv",
    PATHS["reports"] / "results_latest.csv",
]
# Candidate locations for the full results table.
results_paths = [
    PATHS["reports"] / f"results_{RUN_ID_LOCAL}.csv",
    PATHS["reports"] / "results.csv",
]

p_latest, df_latest = _load_first_existing(results_latest_paths)
p_all, df_all = _load_first_existing(results_paths)

if df_all is not None:
    print("Loaded results from:", p_all)

# Everything below needs df_latest. Fail with a clear message instead of an
# AttributeError on None when no results_latest file is present.
if df_latest is None:
    raise FileNotFoundError(
        "No results_latest CSV found; tried: "
        + ", ".join(str(p) for p in results_latest_paths)
    )

# Coerce metric columns to numeric; unparsable cells become NaN.
for c in ["mae", "rmse", "spearman", "coverage", "sec_per_100_online", "sec_per_100_total",
          "parse_success_rate", "schema_adherence_rate", "coverage_relaxed", "mae_relaxed", "rmse_relaxed", "spearman_relaxed"]:
    if c in df_latest.columns:
        df_latest[c] = pd.to_numeric(df_latest[c], errors="coerce")

def build_paper_table(df: pd.DataFrame, split_name: str, *, llm_prefix: str = "phi3mini_") -> pd.DataFrame:
    """
    Build the paper table for one split from a results frame.

    Rows are filtered to *split_name*, reduced to the reporting columns that
    are actually present, and sorted by MAE, then RMSE (ascending). Parse and
    schema rates are blanked (NaN) for every method that is not an LLM
    method, i.e. whose name does not start with *llm_prefix*.

    Missing optional columns are tolerated throughout: sorting uses whichever
    of "mae"/"rmse" exist, and the masking step is skipped when there is no
    "method" column.

    Args:
        df: Results frame; must contain a "split" column.
        split_name: Split to select (compared against str(split)).
        llm_prefix: Method-name prefix that identifies LLM methods.

    Returns:
        The table with a fresh 0..n-1 index. When the split has no rows, the
        empty filtered frame is returned unchanged (all original columns).
    """
    d = df[df["split"].astype(str) == split_name].copy()
    if len(d) == 0:
        return d

    keep = [
        "method",
        "mae", "rmse", "spearman", "coverage",
        "parse_success_rate", "schema_adherence_rate",
        "sec_per_100_online", "sec_per_100_total",
        "coverage_relaxed", "mae_relaxed", "rmse_relaxed", "spearman_relaxed",
        "parse_success_rate_relaxed", "schema_adherence_rate_relaxed",
        "notes",
    ]
    d = d[[c for c in keep if c in d.columns]].copy()

    if "method" in d.columns:
        is_llm = d["method"].astype(str).str.startswith(llm_prefix)
        mask_cols = [
            "parse_success_rate", "schema_adherence_rate",
            "parse_success_rate_relaxed", "schema_adherence_rate_relaxed",
        ]
        for mc in mask_cols:
            if mc in d.columns:
                # where() keeps LLM rows and puts NaN elsewhere, upcasting
                # the column if needed instead of assigning NaN in place.
                d[mc] = d[mc].where(is_llm)

    sort_cols = [c for c in ("mae", "rmse") if c in d.columns]
    if sort_cols:
        d = d.sort_values(sort_cols, ascending=True)
    return d.reset_index(drop=True)

def save_table(df: pd.DataFrame, base_name: str) -> None:
    """
    Write *df* into export_dir as CSV and as a LaTeX table.

    Both files are named "<base_name>_<PATCHSET_TAG>" with the extensions
    .csv and .tex. If the LaTeX export raises for any reason, the .tex file
    receives the CSV text instead so that the file exists either way.
    """
    stem = f"{base_name}_{PATCHSET_TAG}"
    csv_path = export_dir / (stem + ".csv")
    tex_path = export_dir / (stem + ".tex")

    df.to_csv(csv_path, index=False)
    try:
        df.to_latex(tex_path, index=False, float_format="%.3f", na_rep="--")
    except Exception:
        tex_path.write_text(df.to_csv(index=False), encoding="utf-8")

    for written in (csv_path, tex_path):
        print("Saved:", written)

# Splits that actually occur in the latest results.
splits_present = sorted(df_latest["split"].astype(str).unique().tolist())

# Export in a fixed priority order; fall back to whatever is present.
preferred_splits = [s for s in ["test", "S", "test_full", "test_subsetS"] if s in splits_present]
if not preferred_splits:
    preferred_splits = splits_present

# One main table per split; empty tables are neither kept nor written.
tables = {}
for s in preferred_splits:
    t = build_paper_table(df_latest, s)
    if len(t):
        tables[s] = t
        save_table(t, f"table_main_{s}")


def _fmt_with_ci(row, metric: str) -> str:
    """Format "<value> [<p025>,<p975>]" for *metric*, or just the value when no lower bound is available."""
    point = f"{row[metric]:.3f}"
    lower = row[f"{metric}_p025"]
    if pd.notna(lower):
        upper = row[f"{metric}_p975"]
        return f"{point} [{lower:.3f},{upper:.3f}]"
    return point


# Optional: enrich the test table with bootstrap confidence intervals.
ci_path = PATHS["reports"] / "bootstrap_ci_test_strict.csv"
if ci_path.exists() and "test" in tables:
    df_ci = pd.read_csv(ci_path)
    if {"method", "mae_p025", "mae_p975", "rmse_p025", "rmse_p975"}.issubset(df_ci.columns):
        t = tables["test"].merge(df_ci[["method", "mae_p025", "mae_p975", "rmse_p025", "rmse_p975"]], on="method", how="left")
        t["mae_ci95"] = t.apply(lambda r: _fmt_with_ci(r, "mae"), axis=1)
        t["rmse_ci95"] = t.apply(lambda r: _fmt_with_ci(r, "rmse"), axis=1)
        t2 = t.drop(columns=["mae_p025", "mae_p975", "rmse_p025", "rmse_p975"])
        save_table(t2, "table_main_test_with_ci")
        tables["test_ci"] = t2

# Scatter plot coverage vs. MAE on the test split, one labelled point per method.
if "test" in splits_present and "coverage" in df_latest.columns and "mae" in df_latest.columns:
    d = df_latest[df_latest["split"].astype(str) == "test"].copy()
    d = d.dropna(subset=["coverage", "mae"])
    fig_path = export_dir / "fig_coverage_vs_mae_test.png"
    plt.figure()
    plt.scatter(d["coverage"], d["mae"], alpha=0.8)
    for _, r in d.iterrows():
        plt.annotate(str(r["method"]), (float(r["coverage"]), float(r["mae"])), fontsize=8)
    plt.xlabel("Coverage (strict)")
    plt.ylabel("MAE (strict)")
    plt.title("Test: Coverage vs MAE")
    plt.tight_layout()
    plt.savefig(fig_path, dpi=150)
    plt.close()
    print("Saved:", fig_path)

# Copy already rendered efficiency figures into the export folder when they exist.
for fname in ["efficiency_online_vs_mae_test.png", "efficiency_total_vs_mae_test.png"]:
    src = PATHS["figs"] / fname
    if src.exists():
        dst = export_dir / fname
        shutil.copy2(src, dst)
        print("Copied:", dst)

# datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC "now"
# and drop the tzinfo so the text stays "<naive ISO timestamp>Z" as before.
from datetime import timezone

_generated_utc = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

summary_lines = []
summary_lines.append(f"# Paper export summary ({RUN_ID_LOCAL})")
summary_lines.append("")
summary_lines.append(f"- Generated: {_generated_utc}Z")
summary_lines.append(f"- results_latest source: {p_latest}")
summary_lines.append(f"- Export dir: {export_dir}")
summary_lines.append("")

def _best_row (df :pd .DataFrame ):
    if df is None or len (df )==0 or "mae"not in df .columns :
        return None 
    d =df .dropna (subset =["mae"]).sort_values ("mae",ascending =True ).head (1 )
    return d .iloc [0 ].to_dict ()if len (d )else None 

# One "best method" section per exported split; the "*_ci" variants repeat
# the same rows and are skipped.
for s, t in tables.items():
    if s.endswith("_ci"):
        continue
    best = _best_row(t)
    if not best:
        continue

    summary_lines.append(f"## Best method on split={s}")
    summary_lines.append(f"- method: `{best.get('method')}`")

    _mae_value = best.get("mae")
    if pd.notna(_mae_value):
        summary_lines.append(f"- MAE: {_mae_value:.3f}")
    else:
        summary_lines.append("- MAE: n/a")

    # Optional metrics: reported only when present and not NaN.
    for _key, _spec in (("coverage", ".3f"), ("sec_per_100_total", ".2f"), ("sec_per_100_online", ".2f")):
        if _key in best and pd.notna(best[_key]):
            summary_lines.append(f"- {_key}: {best[_key]:{_spec}}")

    summary_lines.append("")

# List the files currently present in the export folder (sorted by path).
summary_lines.append("## Files")
for p in sorted(export_dir.glob("*")):
    summary_lines.append("- " + p.name)

md_path = export_dir / "paper_summary.md"
md_text = "\n".join(summary_lines) + "\n"
md_path.write_text(md_text, encoding="utf-8")
