<a href="https://colab.research.google.com/github/JoohoSon24/tire_competition/blob/main/%EC%86%90%EC%A3%BC%ED%98%B8_main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/drive')

!cp drive/MyDrive/25F/Tire/5-ai-and-datascience-competition.zip .
!mkdir -d data
!unzip 5-ai-and-datascience-competition.zip -d data/

ValueError: mount failed

In [None]:
!pip install catboost

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from scipy.stats import norm

from sklearn.preprocessing import StandardScaler, LabelBinarizer, OneHotEncoder
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import cross_val_score, train_test_split

from imblearn.over_sampling import SMOTE

tab10_kwargs = {'cmap': 'tab10', 'vmin': 0, 'vmax': 9}

In [None]:
def split_data(df, train=True):
    cols_sim = [f'{ch}{i}' for i in range(256) for ch in ['x', 'y', 'p']]
    X_sum = df.drop(columns=cols_sim)
    X_fem = df[cols_sim]
    if train:
      X_sum = X_sum.drop(columns='Class')
      y = df['Class']
      return X_sum, X_fem, y
    else:
      X_sum = X_sum.drop(columns='ID')
      ids = df['ID']
      return X_sum, X_fem, ids

def numerize(X_sum, oe=None):
  cat_cols = X_sum.select_dtypes(include=['object', 'category', 'bool']).columns.tolist()
  num_cols = X_sum.select_dtypes(exclude=['object', 'category', 'bool']).columns.tolist()

  train = oe is None

  Xn = X_sum[num_cols].values
  if train:
    oe = OneHotEncoder(sparse_output=False)
    Xc = oe.fit_transform(X_sum[cat_cols])
  else:
    Xc = oe.transform(X_sum[cat_cols])

  ohe_cols = oe.get_feature_names_out(cat_cols).tolist()
  X = np.concatenate([Xc, Xn], axis=1)
  X = pd.DataFrame(data=X, columns=ohe_cols+num_cols, index=X_sum.index)
  if train:
    return X, oe
  else:
    return X

def apply_smote(X, y, random_state=42):
    smote = SMOTE(random_state=random_state)
    X_res, y_res = smote.fit_resample(X, y)
    return X_res, y_res

In [None]:
def curve_length(x, y, p):
    P = np.stack([x, y], axis=-1)
    return np.sqrt((np.diff(P, axis=-2) ** 2).sum(axis=-1)).sum(axis=-1)

def stress_length(x, y, p, p_thr=2.5):
    P = np.stack([x, y], axis=-1)  # (N, l, 2)
    mask = (p > p_thr)  # (N, l)

    ls = []
    for i in range(p.shape[0]):
        P_stress = P[i, mask[i], :]  # (l', 2)
        l = np.sqrt((np.diff(P_stress, axis=0) ** 2).sum(axis=-1)).sum(axis=-1)
        ls.append(l)
    ls = np.array(ls)

    return ls

def bend_extent(x, y, p):
    return (y.max(axis=-1) - y.min(axis=-1)) / (x.max(axis=-1) - x.min(axis=-1))

def max_curvature(x, y, p):
    dx  = np.gradient(x, axis=1)
    dy  = np.gradient(y, axis=1)
    ddx = np.gradient(dx, axis=1)
    ddy = np.gradient(dy, axis=1)

    num = np.abs(dx * ddy - dy * ddx)
    den = (dx**2 + dy**2)**1.5 + 1e-9

    curvature = num / den
    max_kappa = np.max(curvature, axis=1)
    return max_kappa

def FEM_feat(rows, fn=lambda x, y, p: p.max(axis=-1)):
    if isinstance(rows, (pd.Series, pd.DataFrame)):
        rows = rows.values
    x, y, p = np.transpose(rows.reshape(-1, rows.shape[1] // 3, 3), axes=(2, 0, 1))
    return fn(x, y, p)

def extract_fem(X_fem, fns=lambda x, y, p: p.max(axis=-1), as_df=False):
  if isinstance(X_fem, (pd.Series, pd.DataFrame)):
    rows = X_fem.values
  else:
    rows = X_fem

  if not isinstance(fns, list):
    fns = [fns]

  X_fem_new = []
  for fn in fns:
    feat = FEM_feat(X_fem, fn=fn)
    X_fem_new.append(feat)
  X_fem_new = np.stack(X_fem_new, axis=1)

  if as_df:
    X_fem_new = pd.DataFrame(data = X_fem_new, columns=[f'FEM_feat_{i}' for i in range(len(fns))])

  return X_fem_new

In [None]:
def conf_table(y_pred, y_true):
    if y_true.ndim == 2:
      y_true = y_true.squeeze(1)
    fp = ((y_pred == 1) & (y_true == 0)).sum()
    fn = ((y_pred == 0) & (y_true == 1)).sum()
    tp = ((y_pred == 1) & (y_true == 1)).sum()
    tn = ((y_pred == 0) & (y_true == 0)).sum()
    return tp, tn, fp, fn

def plot_curves(p_hat, y_true, ax, curve_type='auroc', n=100):
    if y_true.ndim == 2:
      y_true = y_true.squeeze(1)

    x, y = [], []
    for thr in np.linspace(0, 1, n + 1):
      y_pred = (p_hat > thr).astype(int)

      tp, tn, fp, fn = conf_table(y_pred, y_true)
      if curve_type == 'auroc':
        xx = fp / np.clip((tn + fp), 1, np.inf) # FPR
        yy = tp / np.clip((tp + fn), 1, np.inf) # Recall, TPR
      elif curve_type == 'auprc':
        xx = tp / np.clip((tp + fn), 1, np.inf) # Recall, TPR
        yy = tp / (tp + fp) if tp + fp > 0 else 1 # Precision
      elif curve_type == 'NPV':
        xx = thr
        yy = tn / (tn + fn) if tn + fn > 0 else 1
      elif curve_type == 'profit':
        xx = thr
        yy = tn * 100 - fn * 2000 - 99999 * int(fn + tn > 200 * len(y_true) / 466)
      x.append(xx)
      y.append(yy)

    x = np.array(x)
    y = np.array(y)

    if curve_type in ['auroc', 'auprc']:
      score = -((y[1:] + y[:-1]) / 2 * np.diff(x)).sum()
      ax.plot([0, 1], [0, 1] if curve_type == 'auroc' else [1, 0], 'k--', alpha=0.3)
      xlabel = 'FPR' if curve_type == 'auroc' else 'Recall'
      ylabel = 'TPR' if curve_type == 'auroc' else 'Precision'
      print(f"{curve_type.upper()} score: {score:.4f}")
    elif curve_type == 'NPV':
      score = y[len(y)//2]
      print(f"NPV at thr=0.5: {score:.4f}")
      xlabel = 'Threshold'
      ylabel = 'NPV'
    elif curve_type == 'profit':
      thr_opt = x[np.argmax(y)]
      score = np.max(y)
      print(f"Actual Optimal Profit: {score:.1f} at thr={thr_opt:.4f}")
      xlabel = 'Threshold'
      ylabel = 'Profit'

    ax.plot(x, y, c='red', lw=1)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_title(f'{curve_type} curve)')

    return score

def plot_profit(p_hat, y_true, ax, quantile=[0.25, 0.75], n=100, xlim=None, ylim=None):
    x, y_expected_mean, y_expected_std = [], [], []
    if y_true is None:
      y_true = np.zeros_like(p_hat)
    for thr in np.linspace(0, 1, n + 1):
      y_pred = (p_hat > thr).astype(int)
      p_selected = p_hat[p_hat < thr]

      y_bar = (100 * (1 - p_selected) - 2000 * p_selected).sum() - 99999 * int(len(p_selected) > 200 * len(p_hat) / 466)
      y_std = 2100 * np.sqrt((p_selected * (1 - p_selected)).sum())

      x.append(thr)
      y_expected_mean.append(y_bar)
      y_expected_std.append(y_std)

    x = np.array(x)
    y_expected_mean = np.array(y_expected_mean)
    y_expected_std = np.array(y_expected_std)

    #thr_opt_expected = x[np.argmax(y_expected_mean)]
    thr_opt_expected = 0.03
    p_max_expected = np.max(y_expected_mean)
    p_std_expected = y_expected_std[np.argmax(y_expected_mean)]

    y_pred = (p_hat > thr_opt_expected).astype(int)
    tp, tn, fp, fn = conf_table(y_pred, y_true)
    p_decision = tn * 100 - fn * 2000 - 99999 * int(fn + tn > 200 * len(y_true) / 466)

    print(f"Expected Optimal Profit: {p_max_expected:.1f}(±{p_std_expected:.1f}) at thr={thr_opt_expected:.4f}")
    print(f"Decision Profit: {p_decision:.1f} at thr={thr_opt_expected:.4f}")

    ax.plot(x, y_expected_mean, c='black', lw=1)
    y_q0 = y_q = y_expected_mean + norm.ppf(quantile[0]) * y_expected_std
    for q in quantile:
      y_q = y_expected_mean + norm.ppf(q) * y_expected_std
      ax.plot(x, y_q, c='grey', label=f"{q*100}%", lw=1)
    ax.fill_between(x, y_q0, y_q, fc='gray', alpha=0.3)

    if xlim is not None:
      ax.set_xlim(xlim)
    if ylim is not None:
      ax.set_ylim(ylim)

    ax.set_xlabel('Decision Threshold')
    ax.set_ylabel('Profit')
    ax.set_title(f'Profit curve')
    plt.legend()

    return thr_opt_expected, p_max_expected, p_std_expected, p_decision

def print_result(model, X, y_true):
    y_pred = model.predict(X)
    tp, tn, fp, fn = conf_table(y_pred, y_true)

    accuracy = (tp + tn) / (tp + fp + tn + fn)
    recall = (tp) / (tp + fn)
    precision = (tp) / (tp + fp)
    npv = (tn) / (tn + fn)
    f1 = 2 * (recall * precision) / (recall + precision)

    print(f"Ground truth positive rate: {y_true.mean():.4f}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"NPV: {npv:.4f}")
    print(f"F1 score: {f1:.4f}")
    print()

def final_score(auroc, profit_mean, profit_std):
  return np.sqrt(max(auroc - 0.5, 0) / 0.5 * max(profit_mean, 0) / 20000).item()

In [None]:
def plot_FEM(row, ax):
    if isinstance(row, pd.Series):
        row = row.values
    x, y, p = row.reshape(-1, 3).T
    im = ax.scatter(x, y, c=p, cmap='jet', s=0.5, vmin=1.2, vmax=3)
    plt.colorbar(im)

In [None]:
import os
import shutil
from google.colab import files

# 1. 'data' 폴더 생성 (이미 있으면 무시)
# 주의: '/data'는 시스템 루트 경로이므로 권한 문제가 생길 수 있습니다. 현재 위치 기준인 'data'를 추천합니다.
if not os.path.exists('data'):
    os.makedirs('data')

print("아래 [파일 선택] 버튼을 눌러 train.csv와 test.csv를 모두 선택해 주세요.")

# 2. 로컬 파일 업로드 (train.csv, test.csv 선택)
uploaded = files.upload()

# 3. 업로드된 파일을 'data' 폴더로 이동
for filename in uploaded.keys():
    # 파일이 이미 data 폴더에 있다면 덮어쓰기 위해 이동 전 확인
    source = filename
    destination = os.path.join('data', filename)

    # 이동 (shutil.move는 파일 이동 명령어)
    shutil.move(source, destination)
    print(f"✅ {filename} 파일이 {destination} 위치로 이동되었습니다.")

In [None]:
df_pub = pd.read_csv('data/train.csv')
df_exam = pd.read_csv('data/test.csv')

In [None]:
X_sum, X_fem, cl_ = split_data(df_pub)
X_sum_exam, X_fem_exam, ids = split_data(df_exam, train=False)

le = LabelBinarizer()
cl = le.fit_transform(cl_)

In [None]:
def cross_validate(model, X, y, cv=5, seed=None):
  if isinstance(y, pd.Series):
    y = y.values
  if y.ndim == 2:
    y = y.squeeze(1)

  N = X.shape[0]
  index = np.random.permutation(np.arange(N))
  X, y = X.iloc[index], y[index]

  block_size = N // cv
  ps, ys = [], []
  for i in range(cv):
    s, e = block_size * i, min(block_size * (i + 1), N)
    mask = ((np.arange(N) >= s) & (np.arange(N) < e))
    X_train, X_val = X.iloc[~mask], X.iloc[mask]
    y_train, y_val = y[~mask], y[mask]

    model.fit(X_train, y_train)
    p = model.predict_proba(X_val)[:, 1]
    ps.append(p)
    ys.append(y_val)
  ps = np.stack(ps)
  ys = np.stack(ys)

  return ps, ys

# 4. FEM Info. Extraction

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam

from tqdm.notebook import tqdm

def collate_fn(batch):
  keys = batch[0][-1].keys()
  xs = torch.stack([x for x, y, p, l in batch])
  ys = torch.stack([y for x, y, p, l in batch])
  ps = torch.stack([p for x, y, p, l in batch])
  labels = {k: torch.tensor([l[k] for x, y, p, l in batch]).to(torch.float32) for k in keys}
  return xs, ys, ps, labels

class FEMDataset(Dataset):
  def __init__(self, df, labels):
    self.df = df
    labels = (labels - labels.mean()) / labels.std()
    self.labels = {col: labels[col].values for col in labels.columns}

  def __len__(self):
    return len(self.df)

  def __getitem__(self, i):
    row = self.df.iloc[i].values
    x, y, p = row.reshape(-1, 3).T
    x = torch.from_numpy(x).to(torch.float32)
    y = torch.from_numpy(y).to(torch.float32)
    p = torch.from_numpy(p).to(torch.float32)

    x = (x - x[0]) / 100
    y = (y - y[0]) / 20

    l = {k: v[i] for k, v in self.labels.items()}
    return x, y, p, l

  def get_labels(self):
    return self.labels

class FEMEncoder(nn.Module):
  def __init__(self, in_dim=256*3, hid_dim=256, rep_dim=16, p=0.1, targets: list = ['G1', 'G2', 'G3']):
    super().__init__()
    self.body = nn.Sequential(
        nn.Linear(in_dim, hid_dim),
        nn.Dropout(p=p),
        nn.ReLU(),
        nn.Linear(hid_dim, rep_dim),
        nn.Dropout(p=p),
        nn.ReLU()
    )

    heads = {}
    for k in targets:
      heads[k] = nn.Linear(rep_dim, 1)
    self.heads = nn.ModuleDict(heads)
    self.targets = list(self.heads.keys())

  def get_repv(self, x, y, p):
    h = torch.cat([x, y, p], dim=-1)
    return self.body(h)

  def forward(self, x, y, p):
    h = self.get_repv(x, y, p)
    ys = {}
    for k in self.targets:
      ys[k] = self.heads[k](h).squeeze(-1)
    return ys

class CNNEncoder(FEMEncoder):
    def __init__(self, in_dim=3, hid_dim=64, rep_dim=16, n_blocks=4, p=.1, targets: list = ['G1', 'G2', 'G3']):
        """
        channels: 입력 채널 수 (x, y, p → 3)
        hid_dim: Conv hidden size
        layers: dilated residual block 개수
        """
        super().__init__(in_dim, hid_dim, rep_dim, p, targets)
        del self.body

        self.convs = nn.ModuleList()

        in_channels = in_dim
        out_channels = hid_dim
        dilation = 1
        for _ in range(n_blocks):
            padding = dilation
            block = nn.Sequential(
                nn.Conv1d(in_channels, out_channels, kernel_size=3,
                          dilation=dilation, padding=padding),
                nn.Dropout(p=p),
                nn.ReLU()
            )
            self.convs.append(block)
            in_channels = out_channels
            out_channels = hid_dim
            dilation *= 2

        self.mlp = nn.Sequential(
            nn.Linear(hid_dim, hid_dim),
            nn.ReLU(),
            nn.Linear(hid_dim, rep_dim)
        )

    def get_repv(self, x, y, p):
        """
        x, y, p: 각각 shape (batch, T)
        return: z (batch, hidden_dim)
        """
        # (B, T) → (B, 1, T)
        x = x.unsqueeze(1)
        y = y.unsqueeze(1)
        p = p.unsqueeze(1)

        # concat → (B, 3, T)
        h = torch.cat([x, y, p], dim=1)

        # CNN forward
        for conv in self.convs:
            h = conv(h)

        z = h.mean(dim=-1)

        # MLP → final representation
        z = self.mlp(z)
        return z

In [None]:
def loss_medusa(preds, trues, weight=None):
  assert preds.keys() == trues.keys()

  loss = torch.zeros(1, device=list(preds.values())[0].device)
  for k in trues.keys():
    pred, true = preds[k], trues[k]
    assert pred.device == true.device, f"pred: {pred.device} / true: {true.device}"
    loss += nn.functional.mse_loss(pred, true)
  return loss

In [None]:
X_fem_all = pd.concat([X_fem, X_fem_exam], axis=0).reset_index(drop=True)
X_sum_all = pd.concat([X_sum, X_sum_exam], axis=0).reset_index(drop=True)

N = len(X_fem_all)
r = .8
max_epoch = 100

device = 'cuda' if torch.cuda.is_available() else 'cpu'

target_cols = ['Y1', 'G1', 'X1', 'Y5', 'X5', 'Y2', 'G2', 'G4', 'G3', 'Y3']#'Proc_Param8'] # Y1, G1, X1, Y5, X5
loss_fn = loss_medusa

indices = torch.randperm(N)

train_set = FEMDataset(X_fem_all.iloc[indices[:int(N*r)]], labels=X_sum_all[target_cols].iloc[indices[:int(N*r)]])
val_set = FEMDataset(X_fem_all.iloc[indices[int(N*r):]], labels=X_sum_all[target_cols].iloc[indices[int(N*r):]])

train_loader = DataLoader(train_set, batch_size=16, collate_fn=collate_fn, shuffle=True)
val_loader = DataLoader(val_set, batch_size=16, collate_fn=collate_fn, shuffle=False)

In [None]:
model_fem = CNNEncoder(hid_dim=256, targets=target_cols)
for batch in val_loader:
  x, y, p, ls = batch
  ls_pred = model_fem(x, y, p)
  print({k: v.shape for k, v in ls_pred.items()}); break

In [None]:
model_fem = CNNEncoder(hid_dim=128, rep_dim=12, p=0.1, targets=target_cols)
optimizer = Adam(params=model_fem.parameters(), lr=1e-3, weight_decay=2e-4)

def run_epoch(train, epoch, device, verbose=True):
  loader = train_loader if train else val_loader
  epoch_loss = []
  bar = tqdm(loader) if verbose else loader
  for batch in bar:
    x, y, p, labels = batch
    x = x.to(device)
    y = y.to(device)
    p = p.to(device)
    for k in labels.keys():
      labels[k] = labels[k].to(device)

    labels_pred = model_fem(x, y, p)
    loss = loss_fn(labels_pred, labels)
    if train:
      loss.backward()
      optimizer.step()
      optimizer.zero_grad()
    epoch_loss.append(loss.item())
  epoch_loss = sum(epoch_loss) / len(epoch_loss)
  return epoch_loss

model_size = sum([param.numel() for param in model_fem.parameters()])
print(f"Model size: {model_size}")

loss_dict = {'train': [], 'val': []}
model_fem.to(device)
for i in range(max_epoch):
  verbose = (i % 10 == 0)
  model_fem.train()
  train_loss = run_epoch(train=True, epoch=i, device=device, verbose=verbose)
  loss_dict['train'].append(train_loss)
  print(f"Epoch {i} training loss: {train_loss:.6f}") if verbose else None

  model_fem.eval()
  with torch.no_grad():
    val_loss = run_epoch(train=False, epoch=i, device=device, verbose=verbose)
  loss_dict['val'].append(val_loss)
  print(f"Epoch {i} validation loss: {val_loss:.6f}") if verbose else None

In [None]:
labels = val_set.get_labels()
labels_pred = {k: [] for k in target_cols}
model_fem.eval()
for batch in val_loader:
  x, y, p, _ = batch
  x = x.to(device)
  y = y.to(device)
  p = p.to(device)
  with torch.no_grad():
    out = model_fem(x, y, p)
    for k in target_cols:
      labels_pred[k].append(out[k])

for k in target_cols:
  labels_pred[k] = torch.cat(labels_pred[k]).detach().cpu().numpy()

r, c = 6, 2
fig, axes = plt.subplots(r, c, figsize=(6 * c, 4 * r))

axes[0, 0].plot(loss_dict['train'], label='train')
axes[0, 0].plot(loss_dict['val'], label='val')
axes[0, 0].set_title("Loss plot")
axes[0, 0].legend()

for n, k in enumerate(target_cols):
  i, j = (n + 1) // c, (n + 1) % c
  ax = axes[i, j]
  label, label_pred = labels[k], labels_pred[k]
  m, M = np.sort(np.concatenate([label, label_pred]))[[0, -1]]

  ax.scatter(label, label_pred, s=5)
  ax.plot([m, M], [m, M], lw=0.5, c='black')
  ax.set_xlim(m - (M - m) * 0.05, M + (M - m) * 0.05)
  ax.set_ylim(m - (M - m) * 0.05, M + (M - m) * 0.05)

  SST = ((label - label.mean()) ** 2).sum()
  SSE = ((label_pred - label) ** 2).sum()
  R_sq =  1 - SSE / SST
  ax.set_title(f"Feature {k} - R sq.: {R_sq:4f}")

In [None]:
pub_set = FEMDataset(X_fem, labels=X_sum[target_cols])
pub_loader = DataLoader(pub_set, batch_size=16, collate_fn=collate_fn, shuffle=False)

rep_vs = []
model_fem.eval()
for batch in pub_loader:
  x, y, p, _ = batch
  x = x.to(device)
  y = y.to(device)
  p = p.to(device)
  with torch.no_grad():
    rep_vs.append(model_fem.get_repv(x, y, p))
rep_vs = torch.cat(rep_vs, dim=0).detach().cpu().numpy()

In [None]:
df_fem_feat = pd.DataFrame(
    data = rep_vs,
    columns = [f'FEM_feat_{i}' for i in range(rep_vs.shape[1])]
)
df_fem_feat.to_csv('data/fem_ext.csv')
df_fem_feat

# 1. Model

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.calibration import CalibratedClassifierCV

from catboost import CatBoostClassifier
from xgboost import XGBClassifier, XGBRFClassifier

import shap

model_type = 'rf' # 'catboost' 'xgb' 'rf'
include_fem = True
augment = False

if model_type == 'catboost':
  model = CatBoostClassifier(
      cat_features = ['Plant', 'Proc_Param6'],
      verbose = 0,
      )
  X = X_sum

elif model_type == 'xgb':
  model = XGBRFClassifier(
      n_estimators = 1000,
      scale_pos_weight=1,
      )
  X, oe = numerize(X_sum)

elif model_type == 'rf':
  model = RandomForestClassifier(
      n_estimators = 1000,
      ##########################################class_weight={0: 1, 1: 20,
      max_features = "sqrt",
      #max_depth = 25,
      #min_samples_split = 50,
      #min_samples_leaf = 16,
      bootstrap = True,
      criterion = "log_loss"
      )
  X, oe = numerize(X_sum)

In [None]:

def engineer_specs(X):
    """
    1. 타이어 제원(Spec) 기반 물리량 계산
    - 단순 Width, Aspect 등이 아닌 실제 물리적 높이, 지름, 부피 등을 유도
    """
    df = X.copy()

    # Mass_Pilot: Boolean -> Int (1/0)
    if 'Mass_Pilot' in df.columns:
        df['Mass_Pilot'] = df['Mass_Pilot'].astype(int)

    # Plant: One-Hot Encoding (공장이 여러 곳일 경우를 대비)
    if 'Plant' in df.columns:
        df = pd.get_dummies(df, columns=['Plant'], prefix='Plant')

    # 물리 공식 적용
    # 1. Sidewall Height (단면 높이 mm) = 단면폭 * 편평비 / 100
    df['Feat_Sidewall_H'] = df['Width'] * (df['Aspect'] / 100)

    # 2. Total Diameter (타이어 전체 지름 mm) = (단면 높이 * 2) + (휠 인치 * 25.4)
    df['Feat_Diameter'] = (df['Feat_Sidewall_H'] * 2) + (df['Inch'] * 25.4)

    # 3. Tire Volume Proxy (타이어 부피 근사치) = 지름 * 단면폭 (적재/물류 효율성 관련)
    df['Feat_Volume_Proxy'] = df['Feat_Diameter'] * df['Width']

    # 4. Aspect Ratio Interaction (인치 대비 폭 비율)
    df['Feat_Width_to_Inch'] = df['Width'] / df['Inch']

    return df

def engineer_geometry(X):
    """
    2. 형상 벡터(X1~X5, Y1~Y5) 기하학적 처리
    - 단순 좌표가 아닌 다각형의 면적, 둘레, 왜도 등을 계산
    """
    df = X.copy()

    # 좌표 컬럼 존재 여부 확인
    x_cols = [f'X{i}' for i in range(1, 6)]
    y_cols = [f'Y{i}' for i in range(1, 6)]

    if not all(col in df.columns for col in x_cols + y_cols):
        return df

    # 1. 원점(0,0) 혹은 중심으로부터의 거리 평균 (Shape Magnitude)
    # 각 점들이 얼마나 퍼져있는가?
    dists = []
    for xc, yc in zip(x_cols, y_cols):
        dists.append(np.sqrt(df[xc]**2 + df[yc]**2))

    df['Feat_Geo_Mean_Dist'] = np.mean(dists, axis=0)
    df['Feat_Geo_Max_Dist'] = np.max(dists, axis=0)
    df['Feat_Geo_Std_Dist'] = np.std(dists, axis=0) # 형상의 불규칙성

    # 2. 형상 비대칭성 (X축, Y축 범위 비율)
    x_range = df[x_cols].max(axis=1) - df[x_cols].min(axis=1)
    y_range = df[y_cols].max(axis=1) - df[y_cols].min(axis=1)
    df['Feat_Geo_Aspect_Ratio'] = x_range / (y_range + 1e-6)

    # 3. 다각형 면적 (Shoelace Formula: 신발끈 공식)
    # 5개의 점이 이루는 내부 면적 계산
    area = 0
    for i in range(5):
        j = (i + 1) % 5
        # x_i * y_j - x_j * y_i
        term = df[f'X{i+1}'] * df[f'Y{j+1}'] - df[f'X{j+1}'] * df[f'Y{i+1}']
        area += term
    df['Feat_Geo_Area'] = np.abs(area) / 2.0

    return df

def engineer_process_params(X):
    """
    3. 공정 파라미터(Proc_Param) 통계 처리
    - 파라미터 간의 편차(Instability) 확인
    """
    df = X.copy()
    proc_cols = [c for c in df.columns if 'Proc_Param' in c]

    if proc_cols:
        # 공정 파라미터들을 정규화했다고 가정하고, 행(Row)별 통계를 냄
        # 의미: "이 타이어 생산 시 공정 세팅이 얼마나 극단적이었나?"
        df['Feat_Proc_Mean'] = df[proc_cols].mean(axis=1)
        df['Feat_Proc_Std'] = df[proc_cols].std(axis=1)  # 세팅의 변동성
        df['Feat_Proc_Max'] = df[proc_cols].max(axis=1)  # 가장 강했던 공정 부하

        # PCA 등을 쓰지 않고 간단한 상호작용 (선택적)
        # 예: 파라미터1 * 파라미터2 (도메인 지식 필요하므로 여기선 제외)

    return df

def drop_raw_simulation(X):
    """
    4. DNN 압축 완료된 원본 고차원 시뮬레이션 데이터 제거
    """
    df = X.copy()
    # x0~x255, y0~y255, p0~p255 패턴 제거
    drop_cols = [c for c in df.columns if c.startswith(('x', 'y', 'p')) and c[1:].isdigit()]

    # 대소문자 주의 (X1~X5는 대문자이므로 유지됨. x0~x255 소문자 제거)
    # 혹시 모를 G1~G4(결과값)도 Feature로 쓰면 안되므로(Data Leakage) 제외해야 함 (Train에선 제외)
    # 여기서는 x, y, p만 명시적으로 제거
    if drop_cols:
        df = df.drop(columns=drop_cols)

    return df

def encode_target(X):
    """
    5. Class 라벨 인코딩 (NG=1, Good=0)
    """
    df = X.copy()
    if 'Class' in df.columns:
        # NG가 불량(Positive case)인 경우가 많음
        df['Class'] = df['Class'].apply(lambda x: 1 if 'NG' in str(x) else 0)
    return df

# -----------------------------------------------------------
# 실행 파이프라인
# -----------------------------------------------------------

# 예시용 더미 데이터 생성 (실제 사용 시엔 생략)
# X = pd.read_csv('your_data.csv')

print("--- 1. Spec Engineering 진행 중 (타이어 물리량 계산)...")
X = engineer_specs(X)

print("--- 2. Geometry Engineering 진행 중 (형상 벡터 면적/거리 계산)...")
X = engineer_geometry(X)

print("--- 3. Process Param Engineering 진행 중 (공정 변수 통계 추출)...")
X = engineer_process_params(X)

print("--- 4. Raw Simulation Data Cleaning 진행 중 (DNN 압축 대체된 원본 제거)...")
X = drop_raw_simulation(X)

print("--- 5. Target Encoding 진행 중 (NG/Good -> 1/0)...")
X = encode_target(X)

print("\n✅ 모든 Feature Engineering 완료!")
print(f"최종 데이터 크기: {X.shape}")
print("생성된 파생 변수 예시:")
new_feats = [c for c in X.columns if c.startswith('Feat')]
print(X[new_feats].head(2).T)

In [None]:
if include_fem:
  X_fem_ext = df_fem_feat
  X = pd.concat([X, X_fem_ext], axis=1)

X_train, X_test, cl_train, cl_test = train_test_split(X, cl, train_size=0.8)
if augment:
  X_train, cl_train = apply_smote(X_train, cl_train)

In [None]:
model.fit(X_train, cl_train)

In [None]:
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test)

In [None]:
shap_value_pos = shap_values[:, :, 0]
shap.summary_plot(shap_value_pos, X_test, plot_type="dot", show=False)

In [None]:
ps, ys = cross_validate(model, X, cl, cv=15)

In [None]:
p = ps.flatten()
y = ys.flatten()
mask = (y == 0)

fig, axes = plt.subplots(1, 2, figsize=(15, 7))

upper = 0.1
bins = np.linspace(0, upper, 11)
freq_n, _, im1 = axes[0].hist(p[mask], bins=bins, alpha=0.7)
freq_p, _, im2 = axes[0].hist(p[~mask], bins=bins, alpha=0.7)

acc_n = np.cumsum(freq_n)
acc_p = np.cumsum(freq_p)

p_pred = (bins[1:] + bins[:-1]) / 2

ratio = freq_p / (freq_n + freq_p)

bin_indices = np.digitize(p, bins)
bin_indices = np.clip(bin_indices, 1, len(bins) - 1) - 1
p_bin = ratio[bin_indices]

axes[1].plot(p_pred, ratio)
axes[1].scatter(p, p_bin, s=3, alpha=0.5, c=y, **tab10_kwargs)
axes[1].set_xlim(-upper * 0.1, upper * 1.1)
axes[1].set_ylim(-upper * 0.1, upper * 1.1)
axes[1].plot([0, upper], [0, upper], c='grey', lw=0.7, ls='--')

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(12, 8))

auroc = plot_curves(p, y, axes[0, 0], curve_type='auroc', n=2000)
auprc = plot_curves(p, y, axes[0, 1], curve_type='auprc', n=2000)
npv = plot_curves(p, y, axes[1, 0], curve_type='NPV', n=2000)
p_actual = plot_curves(p, y, axes[1, 1], curve_type='profit', n=2000)
thr_opt, p_mean, p_std, p_dec = plot_profit(p, y, axes[1, 1], quantile=[0.05, 0.25, 0.75, 0.95], n=1000, xlim=(0, 0.1), ylim=(-10000, 21000))
print(f"final score (predicted): {final_score(auroc, p_mean, p_std):.4f}")
print(f"final score (actual upper bound): {final_score(auroc, p_actual, p_std):.4f}")
print(f"final score (decision): {final_score(auroc, p_dec, p_std):.4f}")
plt.tight_layout()

In [None]:
X_fem_exam
pub_set_exam = FEMDataset(X_fem_exam, labels=X_sum_exam[target_cols])
pub_loader_exam = DataLoader(pub_set_exam, batch_size=16, collate_fn=collate_fn, shuffle=False)

rep_vs_exam = []
model_fem.eval()
for batch in pub_loader_exam:
  x, y, p, _ = batch
  x = x.to(device)
  y = y.to(device)
  p = p.to(device)
  with torch.no_grad():
    rep_vs_exam.append(model_fem.get_repv(x, y, p))
rep_vs_exam = torch.cat(rep_vs_exam, dim=0).detach().cpu().numpy()

df_fem_feat_exam = pd.DataFrame(
    data = rep_vs_exam,
    columns = [f'FEM_feat_{i}' for i in range(rep_vs.shape[1])]
)
df_fem_feat_exam.to_csv('data/fem_ext.csv')
df_fem_feat_exam

In [None]:
model.fit(X, y)

if model_type in ['rf', 'xgb']:
  X_sum_exam = numerize(X_sum_exam, oe=oe)

print("--- 1. Spec Engineering 진행 중 (타이어 물리량 계산)...")
X_sum_exam = engineer_specs(X_sum_exam)

print("--- 2. Geometry Engineering 진행 중 (형상 벡터 면적/거리 계산)...")
X_sum_exam = engineer_geometry(X_sum_exam)

print("--- 3. Process Param Engineering 진행 중 (공정 변수 통계 추출)...")
X_sum_exam = engineer_process_params(X_sum_exam)

print("--- 4. Raw Simulation Data Cleaning 진행 중 (DNN 압축 대체된 원본 제거)...")
X_sum_exam = drop_raw_simulation(X_sum_exam)

print("--- 5. Target Encoding 진행 중 (NG/Good -> 1/0)...")
X_sum_exam = encode_target(X_sum_exam)

if include_fem:
  X_fem_ext_exam = df_fem_feat_exam
  X_exam = pd.concat([X_sum_exam, X_fem_ext_exam], axis=1)

# 예측 수행 (X 대신 X_sum_exam 사용)
p_exam = model.predict_proba(X_exam)[:, 1]

ax = plt.subplot()
thr, p_mean, p_std, _ = plot_profit(p_hat=p_exam, y_true=None, ax=ax, n=1000, xlim=(0, 0.1), ylim=(-10000, 10000))

In [None]:
import datetime

thr = 0.03
decision = (p_exam < thr)
print(f"# of selected products: {decision.sum()}")

submission = pd.read_csv('data/sample_submission.csv')

submission['probability'] = np.concatenate([p_exam, p_exam])
submission['decision'] = np.concatenate([decision, decision])

now = datetime.datetime.now(tz=datetime.timezone(datetime.timedelta(hours=9))).strftime("%m-%d-%H-%M")
submission.to_csv(f"submission_HAIYONG_{now}.csv")