In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import os

# 저장 폴더
os.makedirs("result", exist_ok=True)


# ========================
# 0. 상대좌표 및 velocity 변환 함수 정의
# ========================
def to_relative_coords(df, ref_joint='Hip'):
    ref_x = df[f'{ref_joint}_x']
    ref_y = df[f'{ref_joint}_y']
    for col in df.columns:
        if '_x' in col:
            df[col] = df[col] - ref_x
        elif '_y' in col:
            df[col] = df[col] - ref_y
    return df

# ========================
# 1. 시퀀스 데이터 생성
# ========================
def make_sliding_window_sequences(df, window_size=90, stride=30):
    sequences = []
    labels = []
    for sid in df['session_id'].unique():
        session = df[df['session_id'] == sid].reset_index(drop=True)
        features = session.drop(columns=['crossfit_label', 'session_id']).values
        targets = session['crossfit_label'].values

        for i in range(0, len(session) - window_size + 1, stride):
            seq = features[i:i+window_size]
            vel = np.diff(seq, axis=0)
            vel = np.vstack([np.zeros((1, seq.shape[1])), vel])
            full_seq = np.concatenate([seq, vel], axis=1)
            label = targets[i + window_size - 1]
            sequences.append(full_seq)
            labels.append(label)
    return np.array(sequences), np.array(labels)

# ========================
# 2. 모델 구조 정의
# ========================
class HybridClassifier(nn.Module):
    def __init__(self, input_dim, seq_len, num_classes, d_model=128, nhead=4, num_layers=2):
        super().__init__()
        self.conv1 = nn.Conv1d(input_dim, d_model, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.norm = nn.LayerNorm(d_model)
        self.cls_token = nn.Parameter(torch.randn(1, 1, d_model))
        self.pos_embedding = nn.Parameter(torch.randn(1, seq_len + 1, d_model))
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, x):
        x = x.transpose(1, 2)
        x = self.conv1(x)
        x = self.relu(x).transpose(1, 2)
        x = self.norm(x)
        cls_tokens = self.cls_token.expand(x.size(0), 1, -1)
        x = torch.cat([cls_tokens, x], dim=1)
        x = x + self.pos_embedding[:, :x.size(1), :]
        x = self.transformer(x)
        return self.classifier(x[:, 0])

# ========================
# 3. 프레스 테스트 데이터 불러오기
# ========================
csv_paths = [
    "./crossfit-7_frame.csv",  # 팔 이상
    "./crossfit-8_frame.csv",  # 정상
    "./crossfit-9_frame.csv"   # 어깨 이상
]

session_label_map = {
    7: 2,  # 팔 이상
    8: 0,  # 정상
    9: 3   # 어깨 이상
}

dfs = []
for path in csv_paths:
    df = pd.read_csv(path)
    sid = int(path.split("-")[1].split("_")[0])
    df["session_id"] = sid
    df["crossfit_label"] = session_label_map[sid]
    dfs.append(df)

df_all = pd.concat(dfs, ignore_index=True)
df_all = to_relative_coords(df_all)

# ========================
# 4. 데이터 전처리
# ========================
X, y = make_sliding_window_sequences(df_all, window_size=90, stride=30)
X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.long)

test_loader = DataLoader(TensorDataset(X_tensor, y_tensor), batch_size=16)

# ========================
# 5. 모델 로드 및 평가
# ========================
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = HybridClassifier(input_dim=X.shape[2], seq_len=X.shape[1], num_classes=4).to(device)
model.load_state_dict(torch.load("./press_transformer_best.pth", map_location=device))
model.eval()

y_true, y_pred = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        outputs = model(xb)  # shape: (B, 4)
        preds = torch.argmax(outputs, dim=1).cpu()
        y_true.extend(yb.numpy())
        y_pred.extend(preds.numpy())

# ========================
# 6. 평가 지표 출력
# ========================
label_names = ["Normal", "Overhead Abnormal", "Arm Abnormal", "Shoulder Abnormal"]

print("\n[Classification Report]")
print(classification_report(y_true, y_pred, target_names=label_names))

# 혼동 행렬도 함께 시각화
import matplotlib.pyplot as plt
import seaborn as sns

conf_matrix = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(6, 5))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
            xticklabels=label_names, yticklabels=label_names)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Hybrid Transformer Confusion Matrix")
plt.tight_layout()
plt.savefig("result/confusion_matrix_test.png")
plt.show()


binary_y_true = [0 if label == 0 else 1 for label in y_true]
binary_y_pred = [0 if label == 0 else 1 for label in y_pred]

print("\n[Binary Classification Report] (Normal vs Abnormal)")
print(classification_report(binary_y_true, binary_y_pred, target_names=["Normal", "Abnormal"]))

# 이진 혼동 행렬 시각화
binary_conf_matrix = confusion_matrix(binary_y_true, binary_y_pred)
plt.figure(figsize=(4, 4))
sns.heatmap(binary_conf_matrix, annot=True, fmt='d', cmap='Blues',
            xticklabels=["Normal", "Abnormal"], yticklabels=["Normal", "Abnormal"])
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Binary Confusion Matrix (Normal vs Abnormal)")
plt.tight_layout()
plt.savefig("result/confusion_matrix_binary_test.png")
plt.show()



FileNotFoundError: [Errno 2] No such file or directory: '/content/gdrive/MyDrive/빅데프/프레스/press_transformer_best.pth'