In [None]:
import os
import glob
import json
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

In [None]:
# Folder holding one keypoint JSON per sample. Set the ASL_KEYPOINTS_DIR
# environment variable to point somewhere else without editing the notebook;
# the original hard-coded location remains the default.
folder_path = os.environ.get(
    "ASL_KEYPOINTS_DIR",
    "/home/haggenmueller/asl_detection/machine_learning/datasets/own_dataset/keypoints",
)

# Find every JSON file in the folder. glob returns matches in arbitrary,
# filesystem-dependent order, so sort them: the sample order feeds the seeded
# train/validation split and must be stable for the split to be reproducible.
json_files = sorted(glob.glob(os.path.join(folder_path, "*.json")))
print("Gefundene JSON-Dateien:", json_files)
print("Anzahl geladener JSON-Dateien:", len(json_files))

# Load all JSON files into a list (one parsed document per file).
all_data = []
for file in json_files:
    # JSON is UTF-8 by specification; do not depend on the locale's default encoding.
    with open(file, "r", encoding="utf-8") as f:
        all_data.append(json.load(f))

In [None]:
# Keypoint groups read from every frame, in the order they are concatenated
# into the per-frame feature vector.
parts = [
    "pose",
    "face",
    "left_hand",
    "right_hand",
]

# Helper that determines the maximum flattened length of one part.
def get_max_length(keypoints, part):
    """Return the largest flattened element count of `part` over all frames.

    Args:
        keypoints: iterable of per-frame dicts mapping part names to
            (possibly nested) lists of numbers.
        part: name of the part to measure.

    Returns:
        The maximum number of scalar values found for `part` in any frame,
        or 0 when no frame carries a non-empty entry for it.
    """
    longest = 0
    for frame in keypoints:
        values = frame.get(part)
        # Frames where the part is missing, None or empty do not count.
        if values is None or len(values) == 0:
            continue
        longest = max(longest, np.array(values).size)
    return longest

# For every part, the widest flattened vector seen in any frame of any loaded
# file. Files without a non-empty "keypoints" entry are ignored; a part that
# never appears ends up with width 0.
global_expected = {
    part: max(
        (
            get_max_length(sample["keypoints"], part)
            for sample in all_data
            if sample.get("keypoints")
        ),
        default=0,
    )
    for part in parts
}
print("Global expected lengths:", global_expected)

# Feature extraction from the per-frame keypoints.
def extract_features(keypoints, expected_lengths, part_names=None):
    """Turn a sequence of keypoint frames into a fixed-width 2-D feature array.

    For each frame the parts are flattened and concatenated in order. Every
    part always occupies exactly `expected_lengths[part]` columns: a missing,
    None or empty part becomes all zeros, a part with fewer values than
    expected is zero-padded at the end, and a part with more values than
    expected is truncated. This keeps all rows the same length, so the result
    is a regular numeric array instead of a ragged one.

    Args:
        keypoints: iterable of per-frame dicts mapping part names to
            (possibly nested) lists of numbers.
        expected_lengths: dict mapping each part name to its column width.
        part_names: optional ordered list of parts to extract. Defaults to
            the module-level `parts` list.

    Returns:
        A float numpy array of shape (num_frames, sum of widths), or an empty
        1-D array when `keypoints` is empty.
    """
    if part_names is None:
        part_names = parts

    features = []
    for kp in keypoints:
        frame_chunks = []
        for part in part_names:
            width = expected_lengths[part]
            # Start from zeros so absent or short parts are padded implicitly.
            chunk = np.zeros(width, dtype=float)
            raw = kp.get(part)
            if raw is not None and len(raw) > 0:
                vals = np.asarray(raw, dtype=float).flatten()[:width]
                chunk[:vals.shape[0]] = vals
            frame_chunks.append(chunk)
        features.append(np.concatenate(frame_chunks) if frame_chunks else np.zeros(0))
    return np.array(features)

# Build one (timesteps, features) tensor per JSON file and keep the matching
# label alongside it. Files without usable keypoints are dropped from both lists.
feature_list = []
filtered_labels = []
for data in all_data:
    if not data.get("keypoints"):
        continue  # no frames recorded for this sample
    features = extract_features(data["keypoints"], global_expected)
    if features.size == 0:
        continue
    feature_list.append(torch.tensor(features, dtype=torch.float32))
    filtered_labels.append(data["gloss"])

if not feature_list:
    raise ValueError("Keine gültigen Feature-Tensoren gefunden!")

# Normalisation: one global mean/std over all elements, computed from the
# real frames only. Computing them on the padded tensor would let the padding
# zeros skew the statistics.
# NOTE(review): the statistics still include samples that later end up in the
# validation split; compute them on the training split only if that matters.
all_frames = torch.cat(feature_list, dim=0)
mean = all_frames.mean()
std = all_frames.std() + 1e-5  # avoid division by zero
normalized_sequences = [(seq - mean) / std for seq in feature_list]

# Padding: bring all sequences to the same number of timesteps. Normalising
# before padding keeps the padded timesteps at exactly zero.
X_tensor = pad_sequence(normalized_sequences, batch_first=True)
print("Shape der gepaddeten Features:", X_tensor.shape)
print("Features normalisiert.")

In [None]:
# Label encoding: map every gloss string to an integer class id.
le = LabelEncoder()
le.fit(filtered_labels)
labels_encoded = le.transform(filtered_labels)
y_tensor = torch.tensor(labels_encoded, dtype=torch.long)
print("Enkodierte Labels:", labels_encoded)
print("Anzahl Klassen:", len(le.classes_))

# Stratified split: pick the validation share so that every class is
# represented at least once. With e.g. 15 samples and 5 classes, 33%
# (about 5 samples) go to validation.
X_train, X_val, y_train, y_val = train_test_split(
    X_tensor,
    y_tensor,
    stratify=y_tensor,
    test_size=0.33,
    random_state=42,
)
print("Train Samples:", X_train.shape[0], "Validation Samples:", X_val.shape[0])

In [None]:
class SimpleLSTMClassifier(nn.Module):
    """Single-layer LSTM followed by dropout and a linear classification head.

    Args:
        input_size: number of features per timestep.
        hidden_size: size of the LSTM hidden state.
        num_classes: number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super(SimpleLSTMClassifier, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=1, batch_first=True)
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x, lengths=None):
        """Classify a batch of sequences.

        Args:
            x: tensor of shape (batch, timesteps, features).
            lengths: optional per-sequence count of valid (unpadded)
                timesteps, as a tensor or sequence of ints. When given, the
                LSTM output at each sequence's last valid timestep is used, so
                trailing padding does not influence the prediction. When
                omitted, the final timestep of the padded batch is used.

        Returns:
            Logits of shape (batch, num_classes).
        """
        out, _ = self.lstm(x)  # out: (batch, timesteps, hidden_size)
        if lengths is None:
            last = out[:, -1, :]  # last timestep of the (padded) batch
        else:
            lengths = torch.as_tensor(lengths, dtype=torch.long, device=out.device)
            # The LSTM is unidirectional, so the output at index len-1 depends
            # only on the real frames of that sequence.
            last_idx = (lengths - 1).clamp(min=0)
            batch_idx = torch.arange(out.size(0), device=out.device)
            last = out[batch_idx, last_idx]
        last = self.dropout(last)
        return self.fc(last)

# Seed torch before the weights are created so that weight initialisation and
# the dropout masks drawn during training are repeatable from run to run
# (the data split is already seeded with the same value).
torch.manual_seed(42)

input_size = X_tensor.shape[2]  # features per timestep
hidden_size = 32  # deliberately small hidden size
num_classes = len(le.classes_)  # expected to be 5 per the original note
model = SimpleLSTMClassifier(input_size, hidden_size, num_classes)
print(model)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 200
patience = 10  # early stop once the validation loss has not improved for 10 epochs
best_val_loss = float('inf')
patience_counter = 0
# state_dict() returns references to the live parameter tensors, so a
# snapshot has to be cloned; otherwise later optimizer steps silently
# overwrite the "best" weights. Start from the initial weights so the
# snapshot is always defined, even if the validation loss never improves.
best_model_state = {
    name: tensor.detach().clone() for name, tensor in model.state_dict().items()
}

for epoch in range(num_epochs):
    # One full-batch optimisation step per epoch.
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train)
    loss = criterion(outputs, y_train)
    loss.backward()
    optimizer.step()

    # Metrics on both splits in eval mode (dropout disabled, no gradients).
    model.eval()
    with torch.no_grad():
        train_pred = model(X_train)
        train_loss = criterion(train_pred, y_train)
        train_acc = (torch.argmax(train_pred, dim=1) == y_train).float().mean().item()

        val_pred = model(X_val)
        val_loss = criterion(val_pred, y_val)
        val_acc = (torch.argmax(val_pred, dim=1) == y_val).float().mean().item()

    print(f"Epoch {epoch+1:03d}: Train Loss {train_loss.item():.4f}, Train Acc {train_acc*100:.2f}%, "
          f"Val Loss {val_loss.item():.4f}, Val Acc {val_acc*100:.2f}%")

    # Early stopping on the validation loss.
    if val_loss.item() < best_val_loss:
        best_val_loss = val_loss.item()
        patience_counter = 0
        # Independent copy of the weights at the best epoch so far.
        best_model_state = {
            name: tensor.detach().clone() for name, tensor in model.state_dict().items()
        }
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

# Restore the weights from the best epoch.
model.load_state_dict(best_model_state)

In [None]:
# Predict on the validation split with dropout disabled and no gradient tracking.
model.eval()
with torch.no_grad():
    pred = model(X_val)

# Most likely class per sample, mapped back to the original gloss strings.
predicted_classes = pred.argmax(dim=1).numpy()
predicted_labels = le.inverse_transform(predicted_classes)
print("Vorhersage auf dem Validierungssatz:", predicted_labels)

# Persist the model weights.
torch.save(model.state_dict(), "lstm_model.pth")
print("Modell gespeichert.")