Restarted llmfeml (Python 3.12.11)

In [None]:
from __future__ import annotations
import ast
import json
import os
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import kagglehub
import numpy as np
import ollama 
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, make_scorer, roc_auc_score
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Load the German Credit dataset from the locally cached Kaggle download.
# To (re)download the latest version instead:
# path = kagglehub.dataset_download("uciml/german-credit")
# print(os.listdir(path))

path = "/Users/rantao/.cache/kagglehub/datasets/uciml/german-credit/versions/1"
df_kaggle = pd.read_csv(f"{path}/german_credit_data.csv")
df_kaggle.head()

# Data cleaning
# The Kaggle version has no target column, so the class labels are taken
# from the UCI repository (dataset id 144).
from ucimlrepo import fetch_ucirepo
statlog_german_credit_data = fetch_ucirepo(id=144)
# Work on an explicit copy: assigning into `data.targets` directly modifies a
# slice of the fetched frame and triggers pandas' SettingWithCopyWarning.
y = statlog_german_credit_data.data.targets.copy()

# Map the original labels to 0 (good) and 1 (bad).
mapping = {
    1: 0,
    2: 1,
}

y["class"] = y["class"].replace(mapping)

# Add the target to the features and drop the first (id) column.
# NOTE(review): concat aligns on the index, so this assumes the Kaggle rows are
# in the same order as the UCI rows — confirm.
df = pd.concat([df_kaggle.iloc[:, 1:], y], axis=1)
display(df.head())

# Data overview
# Numeric features
display(df.describe())

# Categorical features: list the distinct values of every non-numeric column.
categorical_df = df.select_dtypes(include=['object', 'category'])

for col in categorical_df.columns:
    unique_vals = df[col].unique()
    print(f"\nFeature: '{col}'")
    print(f"    Unique Values: {unique_vals}")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  y['class'] = y['class'].replace(mapping)


Unnamed: 0,Age,Sex,Job,Housing,Saving accounts,Checking account,Credit amount,Duration,Purpose,class
0,67,male,2,own,,little,1169,6,radio/TV,0
1,22,female,2,own,little,moderate,5951,48,radio/TV,1
2,49,male,1,own,little,,2096,12,education,0
3,45,male,2,free,little,little,7882,42,furniture/equipment,0
4,53,male,2,free,little,little,4870,24,car,1


Unnamed: 0,Age,Job,Credit amount,Duration,class
count,1000.0,1000.0,1000.0,1000.0,1000.0
mean,35.546,1.904,3271.258,20.903,0.3
std,11.375469,0.653614,2822.736876,12.058814,0.458487
min,19.0,0.0,250.0,4.0,0.0
25%,27.0,2.0,1365.5,12.0,0.0
50%,33.0,2.0,2319.5,18.0,0.0
75%,42.0,2.0,3972.25,24.0,1.0
max,75.0,3.0,18424.0,72.0,1.0



Feature: 'Sex'
    Unique Values: ['male' 'female']

Feature: 'Housing'
    Unique Values: ['own' 'free' 'rent']

Feature: 'Saving accounts'
    Unique Values: [nan 'little' 'quite rich' 'rich' 'moderate']

Feature: 'Checking account'
    Unique Values: ['little' 'moderate' nan 'rich']

Feature: 'Purpose'
    Unique Values: ['radio/TV' 'education' 'furniture/equipment' 'car' 'business'
 'domestic appliances' 'repairs' 'vacation/others']


In [None]:
# Automated LLM feature engineering
@dataclass
class Config:
    """Default settings shared by the feature-engineering helpers in this file."""
    # Random seed passed to the default RandomForestClassifier and to StratifiedKFold.
    seed: int = 42
    # Number of stratified folds used when cross-validating AUC.
    cv_splits: int = 5
    # Default number of prompt/evaluate rounds in automated_llm_fe_fit.
    max_iterations: int = 2
    # Number of new columns requested in the prompt; also the default cap on
    # how many features are kept per round.
    n_per_round: int = 2
    # Model name passed to ollama.chat.
    llm_model: str = "deepseek-r1:14b"

# Module-level configuration instance read by the functions below.
CONFIG = Config()


def ask_llm(prompt: str, model: str = CONFIG.llm_model, options: Optional[dict] = None) -> str:
    """Send a single-turn user prompt to an Ollama model and return its text reply.

    Args:
        prompt: Text sent as the only user message.
        model: Ollama model name; defaults to ``CONFIG.llm_model`` as it was
            when this function was defined.
        options: Options forwarded to ``ollama.chat``. When omitted (or
            empty), ``{"temperature": 0}`` is used.

    Returns:
        The reply content with any ``<think>...</think>`` spans removed and
        surrounding whitespace stripped. An empty string is returned when the
        response carries no message content.
    """
    resp = ollama.chat(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        options=options or {"temperature": 0},
    )
    # Either the message or its content may be absent/None; normalise to ""
    # so the regex below never receives a non-string.
    message = resp.get("message") or {}
    content = message.get("content") or ""
    # Drop the reasoning section that some models wrap in <think> tags.
    return re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()


def make_pipeline(X: pd.DataFrame, model: Optional[Any] = None) -> Pipeline:
    """Build a preprocessing + estimator pipeline tailored to the columns of X.

    Numeric columns are median-imputed and standardised; all remaining columns
    are imputed with the most frequent value and one-hot encoded (unknown
    categories are ignored). When no estimator is supplied, a seeded
    RandomForestClassifier is used.
    """
    estimator = model
    if estimator is None:
        estimator = RandomForestClassifier(
            n_estimators=300,
            min_samples_leaf=2,
            n_jobs=-1,
            random_state=CONFIG.seed,
        )

    numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = [name for name in X.columns if name not in numeric_cols]

    numeric_steps = Pipeline([
        ("imp", SimpleImputer(strategy="median")),
        ("sc", StandardScaler()),
    ])
    categorical_steps = Pipeline([
        ("imp", SimpleImputer(strategy="most_frequent")),
        ("oh", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ])
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_steps, numeric_cols),
            ("cat", categorical_steps, categorical_cols),
        ]
    )
    return Pipeline([("pre", preprocessor), ("model", estimator)])

def auc_scorer_binary():
    """Return a scorer that computes ROC AUC from positive-class probabilities.

    The scorer asks the estimator for ``predict_proba`` output. It uses the
    ``response_method`` argument of ``make_scorer`` when that parameter exists
    and falls back to the legacy ``needs_proba=True`` spelling otherwise.

    Passing ``needs_proba`` to a ``make_scorer`` that does not declare it would
    forward it to the score function as an ordinary keyword argument, and the
    scorer would then score hard ``predict`` labels instead of probabilities.
    """
    import inspect

    def _auc(y_true, y_score):
        # Accept either a 1-D vector of positive-class scores or a 2-D
        # (n_samples, n_classes) probability matrix.
        positive = y_score if y_score.ndim == 1 else y_score[:, 1]
        return roc_auc_score(y_true, positive)

    if "response_method" in inspect.signature(make_scorer).parameters:
        return make_scorer(_auc, response_method="predict_proba")
    return make_scorer(_auc, needs_proba=True)

def cv_auc(X: pd.DataFrame, y: pd.Series, model: Optional[Any] = None) -> float:
    """Return the mean cross-validated ROC AUC of the pipeline built for X.

    Uses a shuffled, seeded StratifiedKFold with ``CONFIG.cv_splits`` folds.

    Raises:
        ValueError: if ``y`` does not contain exactly two distinct values.
    """
    n_classes = len(pd.unique(y))
    if n_classes != 2:
        raise ValueError("Binary target required.")

    estimator = make_pipeline(X, model)
    folds = StratifiedKFold(
        n_splits=CONFIG.cv_splits,
        shuffle=True,
        random_state=CONFIG.seed,
    )
    fold_scores = cross_val_score(estimator, X, y, scoring=auc_scorer_binary(), cv=folds)
    return float(np.mean(fold_scores))


def summarize_dataframe(X: pd.DataFrame):
    """Summarise the columns of X for inclusion in an LLM prompt.

    Returns a 4-tuple ``(num, cat, num_sum, cat_sum)``:
        num: names of the numeric columns.
        cat: names of all other columns.
        num_sum: per numeric column, a dict with ``min``, ``max`` and ``mean``
            as floats (``None`` where the statistic is null).
        cat_sum: per non-numeric column, up to five distinct non-null values
            rendered as strings, in order of first appearance.
    """
    def _float_or_none(value):
        return float(value) if pd.notnull(value) else None

    numeric_cols = X.select_dtypes(include=["number"]).columns.tolist()
    other_cols = X.select_dtypes(exclude=["number"]).columns.tolist()

    numeric_stats = {}
    for name in numeric_cols:
        series = X[name]
        numeric_stats[name] = {
            "min": _float_or_none(series.min()),
            "max": _float_or_none(series.max()),
            "mean": _float_or_none(series.mean()),
        }

    sample_values = {}
    for name in other_cols:
        distinct = X[name].dropna().astype(str).unique()
        sample_values[name] = distinct[:5].tolist()

    return numeric_cols, other_cols, numeric_stats, sample_values


def build_feature_code_prompt(X: pd.DataFrame, target: str) -> str:
    """Compose the prompt asking the LLM to write a ``create_features(df)`` function.

    The prompt embeds, as JSON, the column names, the numeric/categorical
    split, the summaries from ``summarize_dataframe`` and the first four rows
    of X, followed by the rules the generated code must obey. The number of
    requested columns comes from ``CONFIG.n_per_round``.
    """
    numeric_cols, categorical_cols, numeric_stats, categorical_values = summarize_dataframe(X)
    first_rows = X.head(4).to_dict(orient="records")

    all_cols_json = json.dumps(list(X.columns))
    numeric_json = json.dumps(numeric_cols)
    categorical_json = json.dumps(categorical_cols)
    numeric_stats_json = json.dumps(numeric_stats)
    categorical_values_json = json.dumps(categorical_values)
    first_rows_json = json.dumps(first_rows)
    quota = CONFIG.n_per_round

    prompt = f"""
You are an expert feature engineer. Write Python that adds new numeric columns to a pandas DataFrame for a BINARY CLASSIFICATION task and returns the DataFrame.

CONTEXT
- Target (not in df below): "{target}"
- All columns: {all_cols_json}
- Numeric: {numeric_json}
- Categorical: {categorical_json}
- Numeric summary: {numeric_stats_json}
- Categorical sample values: {categorical_values_json}
- Sample rows: {first_rows_json}

RULES
1) Output ONLY code (no prose/markdown).
2) Define exactly:
       def create_features(df):
           ...
           return df
3) Create up to {quota} NEW numeric columns (aim for {quota} if sensible).
4) Use only +, -, *, /, parentheses on numeric columns; you may use 1e-9 to avoid division by zero.
5) Refer to columns as df["col_name"]. No imports, no function calls, no loops, no conditionals, no attribute access.
6) Do not drop/overwrite existing columns. Use short, unique, snake_case names.
"""
    return prompt.strip()


class CodeSafetyError(Exception):
    """Raised when LLM-generated feature code is rejected or fails while running."""


class FeatureCodeValidator(ast.NodeVisitor):
    """Whitelist validator for LLM-generated feature code.

    Accepted source is exactly one function of the form::

        def create_features(df):
            df["new"] = <arithmetic on df["..."] and numeric constants>
            ...
            return df

    Anything else raises ``CodeSafetyError``. Because the code is later passed
    to ``exec``, every sub-expression that would be evaluated must be checked:
    subscript keys, argument defaults and annotations included.
    """

    ALLOWED_BINOPS = (ast.Add, ast.Sub, ast.Mult, ast.Div)

    def visit_Module(self, node: ast.Module):
        """Require the module to consist of a single function definition."""
        if len(node.body) != 1 or not isinstance(node.body[0], ast.FunctionDef):
            raise CodeSafetyError("Define exactly one function.")
        self.visit(node.body[0])

    def visit_FunctionDef(self, node: ast.FunctionDef):
        """Check the signature and that the body is assignments plus a final return."""
        if node.name != "create_features":
            raise CodeSafetyError("Function must be create_features.")
        args = node.args
        if len(args.args) != 1 or args.args[0].arg != "df":
            raise CodeSafetyError("Function must take one arg: df.")
        # Extra parameter kinds and default values are evaluated when the
        # `def` statement runs, so they must not be present at all.
        if (
            getattr(args, "posonlyargs", None)
            or args.kwonlyargs
            or args.vararg is not None
            or args.kwarg is not None
            or args.defaults
            or args.kw_defaults
        ):
            raise CodeSafetyError("Function must take one arg: df.")
        # Annotations are arbitrary expressions evaluated at definition time.
        if args.args[0].annotation is not None or node.returns is not None:
            raise CodeSafetyError("No annotations.")
        if getattr(node, "type_params", None):
            raise CodeSafetyError("No type parameters.")
        if node.decorator_list:
            raise CodeSafetyError("No decorators.")
        if not node.body or not isinstance(node.body[-1], ast.Return):
            raise CodeSafetyError("Must end with 'return df'.")
        *assignments, final_return = node.body
        for stmt in assignments:
            if not isinstance(stmt, ast.Assign):
                raise CodeSafetyError("Only assignments and final return allowed.")
            self.visit_Assign(stmt)
        self.visit_Return(final_return)

    @staticmethod
    def _require_string_key(slice_node: ast.AST) -> None:
        """Reject any subscript key that is not a plain string literal."""
        # Python < 3.9 wraps the key in an ast.Index node; unwrap it by name
        # so the deprecated class never has to be referenced.
        if type(slice_node).__name__ == "Index":
            slice_node = slice_node.value
        if not (isinstance(slice_node, ast.Constant) and isinstance(slice_node.value, str)):
            raise CodeSafetyError("Subscript key must be a string literal.")

    def visit_Assign(self, node: ast.Assign):
        """Allow only ``df["name"] = <validated expression>``."""
        if len(node.targets) != 1:
            raise CodeSafetyError("One assignment target per line.")
        tgt = node.targets[0]
        if not (isinstance(tgt, ast.Subscript) and isinstance(tgt.value, ast.Name) and tgt.value.id == "df"):
            raise CodeSafetyError("Targets must be df[\"new_col\"].")
        self._require_string_key(tgt.slice)
        self._validate_expr(node.value)

    def _validate_expr(self, node: ast.AST):
        """Recursively accept +, -, *, /, unary minus, df["..."] and numeric constants."""
        if isinstance(node, ast.BinOp) and isinstance(node.op, self.ALLOWED_BINOPS):
            self._validate_expr(node.left)
            self._validate_expr(node.right)
        elif isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            self._validate_expr(node.operand)
        elif isinstance(node, ast.Subscript):
            if not (isinstance(node.value, ast.Name) and node.value.id == "df"):
                raise CodeSafetyError("Only df[...] may be subscripted.")
            self._require_string_key(node.slice)
        elif isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            pass
        else:
            raise CodeSafetyError("Only arithmetic on df[\"...\"] and numeric constants is allowed.")

    def visit_Return(self, node: ast.Return):
        """Require the return statement to be exactly ``return df``."""
        if not (isinstance(node.value, ast.Name) and node.value.id == "df"):
            raise CodeSafetyError("Return must be 'return df'.")


def validate_feature_code(code: str) -> None:
    """Parse ``code`` and validate it with ``FeatureCodeValidator``.

    Raises:
        CodeSafetyError: if the text is not valid Python or breaks any of the
            validator's rules.
    """
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError) as err:
        # Unparseable model output is just another kind of rejected code.
        raise CodeSafetyError(f"Code does not parse: {err}") from err
    FeatureCodeValidator().visit(tree)


def run_feature_code(code: str, X: pd.DataFrame) -> Tuple[pd.DataFrame, List[str]]:
    """Validate and execute generated feature code on a copy of X.

    Args:
        code: Source text expected to define ``create_features(df)``.
        X: Input features; never modified (the function receives a copy).

    Returns:
        ``(out, new_cols)`` where ``out`` is the DataFrame returned by the
        generated function and ``new_cols`` lists the numeric columns of
        ``out`` that are not in ``X``. Infinite values in those columns are
        replaced by NaN so downstream imputers can handle them.

    Raises:
        CodeSafetyError: if validation fails, ``create_features`` is missing,
            it raises while running, or it does not return a DataFrame.
    """
    validate_feature_code(code)
    # SECURITY: this exec()s model-generated code. It is only reached after the
    # whitelist validation above and runs without builtins; do not relax the
    # validator without re-assessing this call.
    local_env: Dict[str, Any] = {}
    global_env: Dict[str, Any] = {"__builtins__": {}}
    exec(code, global_env, local_env)
    fn = local_env.get("create_features")
    if not callable(fn):
        raise CodeSafetyError("create_features not found/callable.")
    before = set(X.columns)
    try:
        out = fn(X.copy())
    except Exception as err:
        # E.g. a column that does not exist or arithmetic on text columns:
        # report it through the one exception type callers already handle.
        raise CodeSafetyError(f"create_features failed at runtime: {err!r}") from err
    if not isinstance(out, pd.DataFrame):
        raise CodeSafetyError("create_features must return a DataFrame.")
    new_cols = [c for c in out.columns if c not in before and pd.api.types.is_numeric_dtype(out[c])]
    if new_cols:
        # Division by zero yields +/-inf, which the imputers reject.
        out[new_cols] = out[new_cols].replace([np.inf, -np.inf], np.nan)
    return out, new_cols


def keep_up_to_n_improving(
    X_base: pd.DataFrame,
    y: pd.Series,
    candidate_cols: List[str],
    df_with_candidates: pd.DataFrame,
    base_auc: float,
    n_to_keep: int,
    model: Optional[Any] = None,
) -> Tuple[List[str], float]:
    """Greedily select up to ``n_to_keep`` candidate columns that raise CV AUC.

    In each pass every remaining candidate is added on its own to the current
    working frame and scored with ``cv_auc``; the candidate with the largest
    gain over the current best AUC is kept. Selection stops when no candidate
    improves the score or the quota is reached.

    Args:
        X_base: Features before any candidate is added (not modified).
        y: Binary target.
        candidate_cols: Names of candidate columns in ``df_with_candidates``.
        df_with_candidates: Frame holding the candidate column values.
        base_auc: CV AUC of ``X_base``, used as the starting score.
        n_to_keep: Maximum number of columns to keep.
        model: Optional estimator forwarded to ``cv_auc``.

    Returns:
        ``(kept, best_auc)``: kept column names in selection order and the CV
        AUC after adding them (``base_auc`` if none were kept).
    """
    kept: List[str] = []
    best_auc = base_auc
    Xw = X_base.copy()
    # De-duplicate while preserving order. Iterating a set of strings would
    # make tie-breaking depend on hash randomization and differ between runs.
    remaining = list(dict.fromkeys(candidate_cols))
    while remaining and len(kept) < n_to_keep:
        best_col, best_gain, best_col_auc = None, 0.0, best_auc
        for col in remaining:
            X_try = Xw.copy()
            X_try[col] = df_with_candidates[col]
            new_auc = cv_auc(X_try, y, model=model)
            gain = new_auc - best_auc
            # The small tolerance ignores floating-point noise; a NaN score
            # never compares greater and is therefore never selected.
            if gain > best_gain + 1e-12:
                best_gain, best_col, best_col_auc = gain, col, new_auc
        if best_col is None:
            break
        kept.append(best_col)
        Xw[best_col] = df_with_candidates[best_col]
        best_auc = best_col_auc
        remaining.remove(best_col)
    return kept, best_auc


@dataclass
class AutoFEResult:
    """Outcome of one automated_llm_fe_fit run."""
    # Cross-validated AUC on the original features.
    base_auc: float
    # Cross-validated AUC after the last accepted round of features.
    final_auc: float
    # Names of the generated columns that were kept, in the order they were added.
    added_features: List[str]
    # Pipeline fitted on X_final and y.
    final_model: Pipeline
    # Original features plus the kept generated columns.
    X_final: pd.DataFrame
    # Target column taken from the input DataFrame.
    y: pd.Series


def automated_llm_fe_fit(
    df: pd.DataFrame,
    target: str,
    model: Optional[Any] = None,
    n_per_round: Optional[int] = None,
    max_iterations: Optional[int] = None
) -> AutoFEResult:
    """Iteratively ask an LLM for new features and keep those that improve CV AUC.

    Each round builds a prompt from the current working features, obtains
    code from the LLM, runs it through ``run_feature_code`` and greedily keeps
    up to ``n_per_round`` of the new columns that raise cross-validated AUC.
    A round whose generated code is rejected or fails is reported and skipped.
    Finally a pipeline is fitted on the resulting feature set.

    Args:
        df: Data containing the features and the target column.
        target: Name of the binary target column in ``df``.
        model: Optional estimator used in place of the default one.
        n_per_round: Maximum features kept per round; defaults to
            ``CONFIG.n_per_round``.
        max_iterations: Number of LLM rounds; defaults to
            ``CONFIG.max_iterations``.

    Returns:
        An ``AutoFEResult`` with baseline/final AUC, the kept feature names,
        the fitted pipeline, the final feature frame and the target.

    Raises:
        ValueError: if ``target`` is not a column of ``df`` or is not binary.
    """
    if target not in df.columns:
        raise ValueError(f"Target '{target}' not found in DataFrame.")
    y = df[target]
    if len(pd.unique(y)) != 2:
        raise ValueError("Binary target required.")
    X = df.drop(columns=[target]).copy()

    n_per_round = CONFIG.n_per_round if n_per_round is None else int(n_per_round)
    max_iterations = CONFIG.max_iterations if max_iterations is None else int(max_iterations)

    base_auc = cv_auc(X, y, model=model)
    X_work = X.copy()
    current_auc = base_auc
    kept_all: List[str] = []

    for iteration in range(1, max_iterations + 1):
        print(f"\n=== LLM AutoFE Iteration {iteration} ===")
        print("\n--- LLM Feature Engineering Prompt ---")
        prompt = build_feature_code_prompt(X_work, target)
        print(prompt)

        code = ask_llm(prompt, model=CONFIG.llm_model, options={"temperature": 0.5})
        # Models often wrap code in a Markdown fence despite the instructions;
        # accept fences with or without a language tag.
        match = re.search(r"```(?:python|py)?\s*(.*?)\s*```", code, re.DOTALL)
        clean_code = match.group(1).strip() if match else code.strip()

        print("\n--- LLM Feature Engineering Code ---")
        print(clean_code)
        try:
            df_with_candidates, new_cols = run_feature_code(clean_code, X_work)
        except Exception as err:
            # Generated code is untrusted: besides CodeSafetyError it can fail
            # to parse or raise at runtime. One bad round must not abort the
            # whole search, but the reason should be visible.
            print(f"\nSkipping this round, generated code was rejected: {err!r}")
            continue
        if not new_cols:
            print("\nGenerated code produced no new numeric columns.")
            continue
        kept, improved_auc = keep_up_to_n_improving(
            X_work, y, new_cols, df_with_candidates, current_auc, n_per_round, model=model
        )
        if kept:
            for c in kept:
                X_work[c] = df_with_candidates[c]
            kept_all.extend(kept)
            print(f"\nKept features this round: {kept}")
            print(f"Old AUC: {current_auc:.4f}")

            current_auc = improved_auc
            print(f"New AUC: {current_auc:.4f}")

        else:
            print("\nNo features improved AUC this round.")

    final_model = make_pipeline(X_work, model)
    final_model.fit(X_work, y)
    print("\n=== LLM AutoFE Completed ===")
    print("Final Dataframe Head:")
    print(X_work.head())

    return AutoFEResult(
        base_auc=base_auc,
        final_auc=current_auc,
        added_features=kept_all,
        final_model=final_model,
        X_final=X_work,
        y=y,
    )


def print_report(res: AutoFEResult):
    """Print baseline and final CV AUC, the relative change, and the kept features."""
    relative_change = (res.final_auc - res.base_auc) / res.base_auc
    # Negative values already carry their own sign when formatted.
    sign = "+" if relative_change >= 0 else ""
    percent = relative_change * 100

    print("\n=== LLM AutoFE Result ===")
    print(f"Baseline CV AUC : {res.base_auc:.4f}")
    print(f"Final CV AUC    : {res.final_auc:.4f}  ({sign}{percent:.2f}%)")
    print(f"Features created: {len(res.added_features)}")
    for feature_name in res.added_features:
        print(f"  • {feature_name}")

In [None]:
# Run automated LLM feature engineering
# NOTE(review): rng is not used anywhere in the visible part of this file —
# confirm whether a later cell needs it.
rng = np.random.RandomState(CONFIG.seed)
# Only the target is specified, so the estimator, n_per_round and
# max_iterations all fall back to their defaults.
res = automated_llm_fe_fit(df, target="class")
# Print baseline vs. final CV AUC and the kept features.
print_report(res)
# preds = res.final_model.predict(res.X_final)
# acc = accuracy_score(res.y, preds)


=== LLM AutoFE Iteration 1 ===

--- LLM Feature Engineering Prompt ---
You are an expert feature engineer. Write Python that adds new numeric columns to a pandas DataFrame for a BINARY CLASSIFICATION task and returns the DataFrame.

CONTEXT
- Target (not in df below): "class"
- All columns: ["Age", "Sex", "Job", "Housing", "Saving accounts", "Checking account", "Credit amount", "Duration", "Purpose"]
- Numeric: ["Age", "Job", "Credit amount", "Duration"]
- Categorical: ["Sex", "Housing", "Saving accounts", "Checking account", "Purpose"]
- Numeric summary: {"Age": {"min": 19.0, "max": 75.0, "mean": 35.546}, "Job": {"min": 0.0, "max": 3.0, "mean": 1.904}, "Credit amount": {"min": 250.0, "max": 18424.0, "mean": 3271.258}, "Duration": {"min": 4.0, "max": 72.0, "mean": 20.903}}
- Categorical sample values: {"Sex": ["male", "female"], "Housing": ["own", "free", "rent"], "Saving accounts": ["little", "quite rich", "rich", "moderate"], "Checking account": ["little", "moderate", "rich"], "Purp