In [2]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional, BatchNormalization, Attention
from tensorflow.keras.utils import to_categorical
from imblearn.over_sampling import SMOTE
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import AdamW
from tensorflow.keras.optimizers.schedules import ExponentialDecay


# Step 1: Load Data
data = pd.read_csv(r'.\keypoints_output_with_filename.csv')

# Step 2: Preprocess Data
# Remove 'fall' from 'image_fall_num' and convert to integer
data['image_fall_num'] = data['image_fall_num'].str.replace('fall', '').astype(int)

# Sort and group by 'image_fall_num'
data = data.sort_values(by=['image_fall_num', 'image_num'])  # Assume 'frame_index' indicates temporal order
groups = data.groupby('image_fall_num')

# Extract sequences and labels
sequences = []
labels = []
for _, group in groups:
    # Select only numeric columns and handle non-numeric values
    numeric_data = group.iloc[:, 2:-1].apply(pd.to_numeric, errors='coerce')  # Convert to numeric, NaN if not possible
    numeric_data = numeric_data.fillna(0)  # Replace NaN with 0 or another strategy
    sequences.append(numeric_data.values)
    
    # Store all labels in the group instead of just one
    labels.extend(group['label'].values)  # Append all labels from the group

# Padding sequences to the same length
max_seq_length = max(len(seq) for seq in sequences)
num_features = sequences[0].shape[1]

padded_sequences = np.zeros((len(labels), max_seq_length, num_features))
label_array = np.array(labels, dtype=int)

idx = 0
for i, seq in enumerate(sequences):
    for _ in range(len(seq)):
        padded_sequences[idx, :len(seq), :] = seq
        idx += 1

# Ensure at least two classes before applying SMOTE
if len(np.unique(label_array)) < 2:
    minority_class = 1 if 1 in label_array else 2
    label_array = np.concatenate([label_array, np.full(5, minority_class)])  # Add missing class

# Check class distribution before SMOTE
print("Original class distribution:", np.unique(label_array, return_counts=True))

# Normalize the sequences
scaler = MinMaxScaler()
num_samples, seq_length, num_features = padded_sequences.shape
reshaped_sequences = padded_sequences.reshape(-1, num_features)
norm_sequences = scaler.fit_transform(reshaped_sequences)
norm_sequences = norm_sequences.reshape(num_samples, seq_length, num_features)

# Apply SMOTE BEFORE train-test split
unique_classes = np.unique(label_array)
if len(unique_classes) > 1:
    smote = SMOTE(sampling_strategy={1: 4000}, random_state=42)
    norm_sequences_reshaped = norm_sequences.reshape(num_samples, -1)  # 2D 변환
    norm_sequences_resampled, labels_resampled = smote.fit_resample(norm_sequences_reshaped, label_array)
    # Reshape back to 3D
    num_samples_resampled = norm_sequences_resampled.shape[0]
    norm_sequences_resampled = norm_sequences_resampled.reshape(num_samples_resampled, max_seq_length, num_features)
    print("SMOTE applied.")
else:
    print("SMOTE not applied as only one class is present in dataset")
    norm_sequences_resampled, labels_resampled = norm_sequences, label_array

# Train-test split AFTER SMOTE
X_train, X_test, y_train, y_test = train_test_split(
    norm_sequences_resampled, labels_resampled, test_size=0.2, random_state=42, stratify=labels_resampled
)

# Check class distribution in training data
print("Training class distribution:", np.unique(y_train, return_counts=True))

# Data Augmentation: Add Time Noise

def add_time_noise(data, noise_level=0.05):
    noise = np.random.normal(loc=0, scale=noise_level, size=data.shape)
    return data + noise

X_train = np.array([add_time_noise(seq) for seq in X_train])
print("Time noise added for data augmentation.")

# Convert labels back to one-hot encoding
y_train = to_categorical(y_train, num_classes=3)
y_test = to_categorical(y_test, num_classes=3)

# Step 3: Define the Model
model = Sequential([
    Bidirectional(LSTM(64, return_sequences=True, input_shape=(max_seq_length, num_features))),
    Dropout(0.3),
    BatchNormalization(),
    Bidirectional(LSTM(32, return_sequences=True)),
    Dropout(0.3),
    BatchNormalization(),
    LSTM(32, return_sequences=False),
    Dropout(0.2),
    Dense(16, activation='relu'),
    Dense(3, activation='softmax')  # Multi-class classification
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Step 4: Train the Model


early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

history = model.fit(
    X_train, y_train,
    epochs=100,
    batch_size=32,
    validation_split=0.2,
    verbose=1,
    callbacks=[early_stopping]
)

# Step 5: Evaluate the Model
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Test Loss: {loss}, Test Accuracy: {accuracy}")

# Save the model
#model.save('fall_detection_model.keras')


Original class distribution: (array([0, 1, 2]), array([8073,  837, 4894], dtype=int64))
SMOTE applied.
Training class distribution: (array([0, 1, 2]), array([6458, 3200, 3915], dtype=int64))
Time noise added for data augmentation.


  super().__init__(**kwargs)


Epoch 1/100
[1m340/340[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 67ms/step - accuracy: 0.4621 - loss: 1.0596 - val_accuracy: 0.4843 - val_loss: 1.0224
Epoch 2/100
[1m340/340[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 67ms/step - accuracy: 0.4822 - loss: 1.0223 - val_accuracy: 0.4530 - val_loss: 1.1250
Epoch 3/100
[1m340/340[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 72ms/step - accuracy: 0.4606 - loss: 1.0358 - val_accuracy: 0.4869 - val_loss: 1.0060
Epoch 4/100
[1m340/340[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m24s[0m 69ms/step - accuracy: 0.5008 - loss: 1.0060 - val_accuracy: 0.5274 - val_loss: 0.9790
Epoch 5/100
[1m340/340[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 73ms/step - accuracy: 0.5160 - loss: 0.9811 - val_accuracy: 0.4961 - val_loss: 1.0286
Epoch 6/100
[1m340/340[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 74ms/step - accuracy: 0.5365 - loss: 0.9686 - val_accuracy: 0.5422 - val_loss: 0.9737
Epoch 7/10

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from imblearn.over_sampling import SMOTE
from tensorflow.keras.utils import to_categorical

# Step 1: Load Data
data = pd.read_csv(r'.\keypoints_output_with_filename.csv')

# Step 2: Preprocess Data
data['image_fall_num'] = data['image_fall_num'].str.replace('fall', '').astype(int)
data = data.sort_values(by=['image_fall_num', 'image_num'])  
groups = data.groupby('image_fall_num')

# Extract sequences and labels
sequences = []
labels = []
for _, group in groups:
    numeric_data = group.iloc[:, 2:-1].apply(pd.to_numeric, errors='coerce')
    numeric_data = numeric_data.fillna(0)
    sequences.append(numeric_data.values)
    labels.extend(group['label'].values)  

# Padding sequences to the same length
max_seq_length = max(len(seq) for seq in sequences)
num_features = sequences[0].shape[1]

padded_sequences = np.zeros((len(labels), max_seq_length, num_features))
label_array = np.array(labels, dtype=int)

idx = 0
for i, seq in enumerate(sequences):
    for _ in range(len(seq)):
        padded_sequences[idx, :len(seq), :] = seq
        idx += 1

# Normalize the sequences
scaler = MinMaxScaler()
num_samples, seq_length, num_features = padded_sequences.shape
reshaped_sequences = padded_sequences.reshape(-1, num_features)
norm_sequences = scaler.fit_transform(reshaped_sequences)
norm_sequences = norm_sequences.reshape(num_samples, seq_length, num_features)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    norm_sequences, label_array, test_size=0.2, random_state=42, stratify=label_array
)

# Convert labels to one-hot encoding
y_train = to_categorical(y_train, num_classes=3)
y_test = to_categorical(y_test, num_classes=3)

# PyTorch Model Definition
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.pe = nn.Parameter(torch.randn(1, max_len, d_model))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

class TransformerEncoderModel(nn.Module):
    def __init__(self, input_dim, embed_dim, num_heads, ff_dim, num_layers, output_dim):
        super(TransformerEncoderModel, self).__init__()
        self.embedding = nn.Linear(input_dim, embed_dim)
        self.pos_encoder = PositionalEncoding(embed_dim)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim, 
            nhead=num_heads, 
            dim_feedforward=ff_dim, 
            dropout=0.1,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(embed_dim, output_dim)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.embedding(x)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = self.fc(x[:, -1, :])  
        return self.softmax(x)

# Define Model Parameters
input_dim = num_features
embed_dim = 64
num_heads = 8
ff_dim = 1024
num_layers = 6
output_dim = 3  # Multi-class classification

# Instantiate Model
model = TransformerEncoderModel(input_dim, embed_dim, num_heads, ff_dim, num_layers, output_dim)

# Loss Function & Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10)

# Convert Data to PyTorch Tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(np.argmax(y_train, axis=1), dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(np.argmax(y_test, axis=1), dtype=torch.long)

# PyTorch DataLoader
from torch.utils.data import TensorDataset, DataLoader

batch_size = 64
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Training Loop
num_epochs = 100
best_val_loss = float("inf")
early_stopping_counter = 0
patience = 10

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == batch_y).sum().item()
        total += batch_y.size(0)


    train_accuracy = correct / total
    avg_loss = total_loss / len(train_loader)

    # Validation
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        
        for batch_x, batch_y in test_loader:
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            val_correct += (predicted == batch_y).sum().item()
            val_total += batch_y.size(0)
    val_accuracy = val_correct / val_total
    avg_val_loss = val_loss / len(test_loader)

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_loss:.4f}, Train Acc: {train_accuracy:.4f}, "
          f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

    scheduler.step(avg_val_loss)

    # Early Stopping
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        early_stopping_counter = 0
        torch.save(model.state_dict(), "best_transformer_model.pth")
    else:
        early_stopping_counter += 1

    if early_stopping_counter >= patience:
        print("Early stopping triggered.")
        break

print("Training Complete. Best model saved.")

# Load Best Model
model.load_state_dict(torch.load("best_transformer_model.pth"))

# Test Model Performance
model.eval()
test_correct = 0
test_total = 0

with torch.no_grad():
    for batch_x, batch_y in test_loader:
        outputs = model(batch_x)
        _, predicted = torch.max(outputs, 1)
        test_correct += (predicted == batch_y).sum().item()
        test_total += batch_y.size(0)

test_accuracy = test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.4f}")
