In [None]:
import math

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from imblearn.under_sampling import RandomUnderSampler
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader

In [None]:
# Load the preprocessed per-row features (path is relative to the working directory).
df = pd.read_csv('attention-analysis/preprocessed_data_extra.csv')

# Model input columns, in the order they are fed to the network.
features = [
    'face_movement', 'body_movement', 'eye_openness_rate',
    'eye_direction_x', 'eye_direction_y', 'mouth_openness_rate',
    'yaw_angle', 'pitch_angle', 'roll_angle'
]
INPUT_SIZE = len(features)
SEQ_LEN = 30  # number of consecutive rows that make up one input window

# Min-max scale every feature column.
# NOTE(review): the scaler is fitted on the whole dataset before the train/test
# split below, so held-out rows influence the scaling statistics.
scaler = MinMaxScaler()
df[features] = scaler.fit_transform(df[features])

# Build stride-1 sliding windows separately for each user so that no window
# spans two users. Each window is labelled with the 'isAttentive' value of its
# last row. sort=False keeps users in order of first appearance.
X_seq, y_seq = [], []
for _, user_df in df.groupby('id', sort=False):
    user_features = user_df[features].to_numpy()
    user_labels = user_df['isAttentive'].to_numpy()
    # "+ 1" so the window that ends on the user's final row is included; the
    # label index is start + SEQ_LEN - 1, which is still in range for it.
    for start in range(len(user_df) - SEQ_LEN + 1):
        end = start + SEQ_LEN
        X_seq.append(user_features[start:end])
        y_seq.append(int(user_labels[end - 1]))

if not X_seq:
    raise ValueError(
        f"No user has at least {SEQ_LEN} rows; cannot build any sequence window."
    )

X_seq, y_seq = np.array(X_seq), np.array(y_seq)

# Balance the classes by random undersampling. Each window is flattened to a
# single row for the sampler and restored to (SEQ_LEN, INPUT_SIZE) afterwards.
X_flat = X_seq.reshape((X_seq.shape[0], -1))
rus = RandomUnderSampler(random_state=42)
X_resampled, y_resampled = rus.fit_resample(X_flat, y_seq)
X_seq_balanced = X_resampled.reshape((-1, SEQ_LEN, INPUT_SIZE))
y_seq_balanced = y_resampled

# Stratified 80/20 split into (train+val)/test, then 80/20 again into train/val.
# NOTE(review): windows of one user overlap in all but one row, and this split
# is random over windows, so near-identical windows can land on both sides of
# the split. Splitting by user id would give a more honest test estimate.
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X_seq_balanced, y_seq_balanced, test_size=0.2, stratify=y_seq_balanced, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val, test_size=0.2, stratify=y_train_val, random_state=42
)

class SequenceDataset(Dataset):
    """Holds feature windows and their labels as tensors for use with a DataLoader."""

    def __init__(self, X, y):
        """Convert ``X`` to a float32 tensor and ``y`` to an int64 tensor once, up front."""
        features_tensor = torch.tensor(X, dtype=torch.float32)
        labels_tensor = torch.tensor(y, dtype=torch.long)
        self.X, self.y = features_tensor, labels_tensor

    def __len__(self):
        """Return the number of samples, i.e. the length of the feature tensor."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        sample, target = self.X[idx], self.y[idx]
        return sample, target

class CausalTransformerClassifier(nn.Module):
    """Transformer encoder with a causal attention mask that classifies a
    sequence from the representation of its last time step.

    Args:
        input_size: number of features per time step of the input.
        d_model: width of the projected input, the positional embedding and
            the encoder layers.
        n_head: number of attention heads per encoder layer.
        n_layers: number of stacked encoder layers.
        dim_feedforward: hidden size of each encoder layer's feed-forward part.
        dropout: dropout probability used inside the encoder layers.
        num_classes: number of output logits.
        max_len: longest sequence the learned positional embedding supports.
            When omitted, the module-level ``SEQ_LEN`` is used, which matches
            the previous behaviour and keeps saved state dicts loadable.
    """

    def __init__(self, input_size, d_model, n_head, n_layers, dim_feedforward, dropout,
                 num_classes=2, max_len=None):
        super().__init__()
        if max_len is None:
            # Resolved at construction time so the class does not need the
            # global to exist when an explicit max_len is supplied.
            max_len = SEQ_LEN
        self.max_len = max_len

        self.input_projection = nn.Linear(input_size, d_model)
        # Learned positional embedding, one vector per position up to max_len.
        self.pos_encoder = nn.Parameter(torch.randn(1, max_len, d_model))

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_head, dim_feedforward=dim_feedforward,
            dropout=dropout, batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)

        self.classifier = nn.Linear(d_model, num_classes)
        self.d_model = d_model

    def _generate_square_subsequent_mask(self, sz: int, device=None) -> torch.Tensor:
        """Build a mask that prevents attention to future time steps.

        Returns a ``(sz, sz)`` float tensor with ``0.0`` on and below the
        diagonal and ``-inf`` above it, created directly on ``device``.
        """
        return torch.triu(torch.full((sz, sz), float('-inf'), device=device), diagonal=1)

    def forward(self, src: torch.Tensor) -> torch.Tensor:
        """Return class logits of shape ``(batch, num_classes)``.

        ``src`` is indexed as ``(batch, seq_len, input_size)`` (the encoder is
        built with ``batch_first=True``).

        Raises:
            ValueError: if ``seq_len`` exceeds ``max_len``.
        """
        seq_len = src.size(1)
        if seq_len > self.max_len:
            raise ValueError(
                f"Sequence length {seq_len} exceeds the positional embedding length {self.max_len}."
            )

        src = self.input_projection(src) * math.sqrt(self.d_model)
        # Slice the embedding to the actual length. Adding it unsliced would
        # broadcast a length-1 input up to max_len steps and fail for any
        # other shorter length.
        src = src + self.pos_encoder[:, :seq_len]

        mask = self._generate_square_subsequent_mask(seq_len, device=src.device)
        output = self.transformer_encoder(src, mask=mask)

        # Only the final time step is passed to the classifier.
        return self.classifier(output[:, -1, :])

# Best hyperparameters found by Optuna, written here directly.
best_params = {
    'learning_rate': 0.000165359106482521,
    'd_model': 48,
    'n_heads': 2,
    'n_layers': 3,
    'dropout': 0.1642622418497776,
    'batch_size': 64
}
print("\n--- En İyi Parametrelerle Final Model Eğitiliyor ---")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed torch so weight initialisation and DataLoader shuffling repeat across
# runs (GPU kernels may still introduce small nondeterminism).
torch.manual_seed(42)

# Train on the training split only. Merging X_val into the training data while
# also evaluating on X_val would make the "validation" loss a loss on seen
# samples, which misleads both the plot and the ReduceLROnPlateau scheduler.
X_train_final, y_train_final = X_train, y_train

final_train_dataset = SequenceDataset(X_train_final, y_train_final)
val_dataset = SequenceDataset(X_val, y_val)  # held out; used only to track the loss
test_dataset = SequenceDataset(X_test, y_test)

final_train_loader = DataLoader(final_train_dataset, batch_size=best_params['batch_size'], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=best_params['batch_size'])
test_loader = DataLoader(test_dataset, batch_size=best_params['batch_size'])

final_model = CausalTransformerClassifier(
    input_size=INPUT_SIZE,
    d_model=best_params['d_model'],
    n_head=best_params['n_heads'],
    n_layers=best_params['n_layers'],
    dim_feedforward=best_params['d_model'] * 4,
    dropout=best_params['dropout'],
    num_classes=2
).to(device)

optimizer = torch.optim.Adam(final_model.parameters(), lr=best_params['learning_rate'])
criterion = nn.CrossEntropyLoss()
# The `verbose` argument is deprecated in recent PyTorch releases, so learning
# rate reductions are logged explicitly inside the training loop instead.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=10)


def _run_epoch(model, loader, loss_fn, device, optimizer=None):
    """Run one pass over ``loader`` and return the mean loss per sample.

    With an ``optimizer`` the model is put in train mode and its weights are
    updated after every batch. Without one the model is put in eval mode and
    gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)
    total_loss, n_samples = 0.0, 0
    with torch.set_grad_enabled(training):
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            loss = loss_fn(model(xb), yb)
            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            # Weight by batch size so a smaller final batch is not over-counted.
            total_loss += loss.item() * xb.size(0)
            n_samples += xb.size(0)
    return total_loss / max(n_samples, 1)


train_losses = []
val_losses = []
FINAL_EPOCHS = 120

for epoch in range(FINAL_EPOCHS):
    # Training phase
    avg_train_loss = _run_epoch(final_model, final_train_loader, criterion, device, optimizer)
    train_losses.append(avg_train_loss)

    # Validation phase
    avg_val_loss = _run_epoch(final_model, val_loader, criterion, device)
    val_losses.append(avg_val_loss)

    # Step the scheduler on the validation loss and report any LR reduction.
    lr_before = optimizer.param_groups[0]['lr']
    scheduler.step(avg_val_loss)
    lr_after = optimizer.param_groups[0]['lr']
    if lr_after < lr_before:
        print(f"Epoch {epoch+1:03d}: learning rate reduced {lr_before:.2e} -> {lr_after:.2e}")

    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1:03d}/{FINAL_EPOCHS} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

# Loss curves for both splits over the epochs.
plt.figure(figsize=(12, 6))
plt.plot(train_losses, label='Eğitim Kaybı (Training Loss)')
plt.plot(val_losses, label='Doğrulama Kaybı (Validation Loss)')
plt.title('Eğitim ve Doğrulama Kaybı Grafiği (Causal Transformer)')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

# Evaluate once on the held-out test split.
print("\n--- Final Test Raporu ---")
final_model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        outputs = final_model(xb)
        preds = torch.argmax(outputs, dim=1).cpu().numpy()
        y_true.extend(yb.numpy())
        y_pred.extend(preds)

print(classification_report(y_true, y_pred, target_names=['Dikkatsiz (0)', 'Dikkatli (1)']))
print("\nConfusion Matrix:")
print(confusion_matrix(y_true, y_pred))

# Persist the trained weights together with the fitted scaler, since inference
# needs the same feature scaling that was applied before training.
torch.save(final_model.state_dict(), "causal_transformer_model.pt")
joblib.dump(scaler, "minmax_scaler_causal.pkl")
print("Model 'causal_transformer_model.pt' olarak kaydedildi.")
print("Scaler 'minmax_scaler_causal.pkl' olarak kaydedildi.")

--- Final Test Raporu ---
               precision    recall  f1-score   support

Dikkatsiz (0)       0.81      0.80      0.80       144
 Dikkatli (1)       0.80      0.81      0.81       143

     accuracy                           0.80       287
    macro avg       0.80      0.80      0.80       287
 weighted avg       0.80      0.80      0.80       287


Confusion Matrix:
[[115  29]
 [ 27 116]]