In [8]:
# Importe
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from utils import load_dat_file


# Die Klasse macht aus Numpy-Daten (X_train_seq, y_train) ein PyTorch kompatibles Dataset
class GasSensorDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
            return len(self.X)

    def __getitem__(self, idx):
            return self.X[idx], self.y[idx]

In [9]:
# LSTM-Modell Klasse
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, lr=0.001):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.parameters(), lr=lr)

    def forward(self, x):
        # Erwartet: (Batch, SeqLen, Features)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))
        out = out[:, -1, :]  # letzter Zeitschritt
        out = self.fc(out)
        return out

    def train_step(self, batch):
        self.optimizer.zero_grad()
        X_batch, y_batch = batch

        device = next(self.parameters()).device
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        outputs = self(X_batch)
        loss = self.criterion(outputs, y_batch)
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def train_model(self, train_loader, epochs):
        for epoch in range(epochs):
            total_loss = 0
            for batch in train_loader:
                loss = self.train_step(batch)
                total_loss += loss
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.4f}")

    def evaluate(self, test_loader):
        self.eval()
        correct, total = 0, 0
        device = next(self.parameters()).device
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                outputs = self(X_batch)
                _, predicted = torch.max(outputs.data, 1)
                total += y_batch.size(0)
                correct += (predicted == y_batch).sum().item()
        accuracy = correct / total
        print(f"Test Accuracy: {accuracy:.4f}")
        self.train()
        return accuracy

    def save_model(self,path):
        torch.save(self.state_dict(), path)

    def load_model(self, path, map_location=None):
        self.load_state_dict(torch.load(path, map_location=map_location))
        

In [10]:
# Datenvorbereitung

file = r"gas+sensor+array+drift+dataset/Dataset/batch1.dat"
df = load_dat_file(file)

# Features & Labels
X = df.iloc[:, 0:128].values
y = df.iloc[:, 128].values

# Normalisieren
scaler = MinMaxScaler(feature_range=(-1, 1))
X = scaler.fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Sequenzlänge 
SEQ_LEN = 50

# Funktion zum Erstellen von Sequenzen aus den Daten
def create_sequences(X, y, seq_len):
    xs, ys = [], []
    for i in range(len(X) - seq_len + 1):
        xs.append(X[i:i+seq_len])
        ys.append(y[i+seq_len-1])
    return np.array(xs), np.array(ys)

# Sequenzen erstellen
X_train_seq, y_train_seq = create_sequences(X_train, y_train - 1, SEQ_LEN) # Labels 0-basiert
X_test_seq, y_test_seq = create_sequences(X_test, y_test - 1, SEQ_LEN)

# PyTorch Dataset + Loader
train_dataset = GasSensorDataset(X_train_seq, y_train_seq)
test_dataset = GasSensorDataset(X_test_seq, y_test_seq)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [12]:
# Training starten

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

model = LSTM(input_size=128, hidden_size=64, num_layers=1, num_classes=6, lr=0.0005)
model.to(device)

# Trainieren
model.train_model(train_loader, epochs=30)

# Evaluieren
model.evaluate(test_loader)

# speichern
model.save_model("lstm_batch1.pth")

Using device: cpu
Epoch [1/30], Loss: 1.7812
Epoch [2/30], Loss: 1.7275
Epoch [3/30], Loss: 1.6871
Epoch [4/30], Loss: 1.6461
Epoch [5/30], Loss: 1.6107
Epoch [6/30], Loss: 1.5692
Epoch [7/30], Loss: 1.5344
Epoch [8/30], Loss: 1.4910
Epoch [9/30], Loss: 1.4501
Epoch [10/30], Loss: 1.4050
Epoch [11/30], Loss: 1.3599
Epoch [12/30], Loss: 1.3194
Epoch [13/30], Loss: 1.2809
Epoch [14/30], Loss: 1.2360
Epoch [15/30], Loss: 1.1988
Epoch [16/30], Loss: 1.1549
Epoch [17/30], Loss: 1.1217
Epoch [18/30], Loss: 1.0866
Epoch [19/30], Loss: 1.0460
Epoch [20/30], Loss: 1.0202
Epoch [21/30], Loss: 0.9827
Epoch [22/30], Loss: 0.9533
Epoch [23/30], Loss: 0.9196
Epoch [24/30], Loss: 0.8924
Epoch [25/30], Loss: 0.8698
Epoch [26/30], Loss: 0.8394
Epoch [27/30], Loss: 0.8219
Epoch [28/30], Loss: 0.7920
Epoch [29/30], Loss: 0.7682
Epoch [30/30], Loss: 0.7502
Test Accuracy: 0.8500


In [56]:
# Für Batch 1 scheint SeqLen von 1 bis 10 optimal zu sein, da Messungen relativ stabil und unabhängig genug sind