# In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, precision_recall_fscore_support

# Paths
# NOTE: both values are placeholders and must be replaced with real directories
# before the script is run.
dataset_path = "your_path"  # directory passed to load_data() and scanned for motion files
results_path = "your_path"  # directory (created if missing) where metrics, plots and the model are written

# Load the dataset
def load_data(dataset_path):
    """Load motion recordings and their action labels from a directory.

    Files are visited in sorted name order. Only names that start with
    ``"moc_s"`` and do not contain ``"t-pose"`` are read; macOS ``"._"``
    metadata files are skipped. The action ID is parsed from the third
    underscore-separated token of the file name with its first character
    dropped (e.g. ``"a01"`` -> ``1``).

    Args:
        dataset_path: Directory containing whitespace-delimited numeric
            text files readable by ``np.loadtxt``.

    Returns:
        A ``(motion_data, labels)`` tuple. ``motion_data`` is a 1-D object
        array holding one 2-D float array per accepted file; ``labels`` is
        an integer array with the parsed action ID of each file.

    Files whose name or content cannot be parsed are reported with
    ``print`` and skipped.
    """
    motion_data, labels = [], []
    for file_name in sorted(os.listdir(dataset_path)):
        if file_name.startswith("._"):  # Skip macOS metadata files
            continue
        if not file_name.startswith("moc_s") or "t-pose" in file_name:
            continue
        file_path = os.path.join(dataset_path, file_name)
        try:
            # Extract action ID (e.g., "a01" -> 1)
            action_id = int(file_name.split("_")[2][1:])
            # ndmin=2 keeps single-row / single-column files two-dimensional
            # so every recording can be indexed as (frames, values).
            data = np.loadtxt(file_path, encoding="latin1", ndmin=2)
        except (OSError, ValueError, IndexError) as e:
            print(f"Error reading motion file {file_name}: {e}")
            continue
        motion_data.append(data)
        labels.append(action_id)

    # np.array(list, dtype=object) would merge equally-shaped recordings into
    # a single 3-D object array; fill a 1-D container element by element so
    # each entry stays an independent float array.
    sequences = np.empty(len(motion_data), dtype=object)
    for index, data in enumerate(motion_data):
        sequences[index] = data
    return sequences, np.array(labels, dtype=int)

# Preprocess the data with pairwise distances and angular velocities
def preprocess_data(motion_data, labels, sequence_length=100):
    """Turn raw motion sequences into fixed-length, normalized feature arrays.

    Each sequence is truncated or zero-padded to ``sequence_length`` frames.
    The following features are then concatenated column-wise:

    * the (padded) raw columns,
    * the Euclidean distance between every pair of joints, where joint ``k``
      is taken to be columns ``3k .. 3k+2`` (assumed to be x, y, z),
    * the norm of the cross product between the first three columns and
      their frame-to-frame difference.

    Finally every feature column is standardized over the time axis
    (zero mean, unit standard deviation, with a small epsilon).

    Args:
        motion_data: Iterable of 2-D arrays shaped ``(frames, values)`` with
            at least three columns.
        labels: Iterable of 1-based integer labels, one per sequence.
        sequence_length: Number of frames every output sequence has.

    Returns:
        A ``(data, labels)`` tuple of NumPy arrays; the labels are shifted
        to be 0-based.
    """
    processed_data, processed_labels = [], []
    for sequence, label in zip(motion_data, labels):
        # Sequences pulled out of an object array may themselves be
        # object-typed, which np.linalg.norm cannot handle.
        sequence = np.asarray(sequence, dtype=float)

        num_frames = sequence.shape[0]
        if num_frames > sequence_length:
            sequence = sequence[:sequence_length, :]
        elif num_frames < sequence_length:
            padding = np.zeros((sequence_length - num_frames, sequence.shape[1]))
            sequence = np.vstack((sequence, padding))

        # Pairwise joint distances, assuming (x, y, z) for each joint.
        # triu_indices enumerates pairs (i, j) with i < j in row-major order,
        # and yields a (frames, 0) block when there are fewer than two joints
        # so the hstack below still works.
        num_joints = sequence.shape[1] // 3
        joints = sequence[:, :3 * num_joints].reshape(sequence.shape[0], num_joints, 3)
        first, second = np.triu_indices(num_joints, k=1)
        distances = np.linalg.norm(joints[:, first] - joints[:, second], axis=2)

        # Frame-to-frame difference; the first frame is prepended so the
        # result keeps the same number of rows (first row is all zeros).
        velocities = np.diff(sequence, axis=0, prepend=sequence[0:1])
        angular_velocity = np.linalg.norm(
            np.cross(sequence[:, :3], velocities[:, :3]), axis=1, keepdims=True
        )

        # Concatenate features and standardize each column over time
        features = np.hstack((sequence, distances, angular_velocity))
        features = (features - np.mean(features, axis=0)) / (np.std(features, axis=0) + 1e-6)

        processed_data.append(features)
        processed_labels.append(label - 1)  # Convert 1-based labels to 0-based
    return np.array(processed_data), np.array(processed_labels)

# Data Augmentation
def augment_data(data):
    """Return a randomly perturbed copy of every sequence in ``data``.

    Each sequence is multiplied element-wise by factors drawn uniformly from
    [0.9, 1.1), offset by Gaussian noise (standard deviation 0.01), and then
    rolled along the time axis by 1 to 4 frames (frames pushed past the end
    wrap around to the start). Randomness comes from the global ``np.random``
    state.

    Args:
        data: Iterable of arrays whose first axis is time.

    Returns:
        ``np.ndarray`` stacking the perturbed sequences in input order.
    """
    def _perturb(original):
        dims = original.shape
        # Draw order (scale, noise, shift) matters for reproducibility
        # under a fixed seed.
        factors = 1 + np.random.uniform(-0.1, 0.1, dims)
        jitter = np.random.normal(0, 0.01, size=dims)
        offset = np.random.randint(1, 5)
        return np.roll(original * factors + jitter, offset, axis=0)

    return np.array([_perturb(sequence) for sequence in data])

# Advanced ST-GCN Model
def build_advanced_stgcn(input_shape, num_classes):
    """Assemble the (uncompiled) Keras classifier for motion sequences.

    Despite the name, the network contains no graph-convolution layer: it is
    a stack of three 1-D convolutions over the time axis (with one max-pool
    after the first), a residual multi-head self-attention block, global
    average pooling over time, and a dense head with dropout.

    Args:
        input_shape: Shape of a single sample, excluding the batch axis.
        num_classes: Number of units in the softmax output layer.

    Returns:
        A ``models.Model`` mapping input sequences to class probabilities.
    """
    sequence_input = layers.Input(shape=input_shape)

    # Temporal convolution stack: 64 -> pool -> 128 -> 256 filters
    conv_options = {"kernel_size": 3, "activation": "relu", "padding": "same"}
    features = layers.Conv1D(64, **conv_options)(sequence_input)
    features = layers.MaxPooling1D(pool_size=2)(features)
    for filters in (128, 256):
        features = layers.Conv1D(filters, **conv_options)(features)

    # Self-attention with a residual connection, then collapse the time axis
    attended = layers.MultiHeadAttention(num_heads=4, key_dim=256)(features, features)
    features = layers.Add()([features, attended])
    pooled = layers.GlobalAveragePooling1D()(features)

    # Classification head
    hidden = layers.Dense(256, activation="relu")(pooled)
    hidden = layers.Dropout(0.5)(hidden)
    class_probabilities = layers.Dense(num_classes, activation="softmax")(hidden)

    return models.Model(inputs=sequence_input, outputs=class_probabilities)

# Load and preprocess dataset
motion_data, labels = load_data(dataset_path)
X, y = preprocess_data(motion_data, labels)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Carve a validation split out of the training data. Early stopping and the
# learning-rate schedule select the model based on validation loss, so they
# must not see the test set, otherwise the reported test metrics are biased.
# The split happens before augmentation so that no perturbed copy of a
# validation sample ends up in the training set.
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.2, random_state=42
)

# Augment training data
X_train_augmented = augment_data(X_train)
X_train_combined = np.concatenate([X_train, X_train_augmented])
y_train_combined = np.concatenate([y_train, y_train])

# The softmax layer needs one unit per label value in 0..max(y); counting the
# unique labels would undersize it whenever the action IDs are not contiguous.
num_classes = int(y.max()) + 1

# Compute class weights, keyed by the actual class label. Classes that do not
# occur in the training split get a neutral weight of 1.0 so the dict always
# covers 0..num_classes-1.
present_classes = np.unique(y_train)
balanced_weights = compute_class_weight("balanced", classes=present_classes, y=y_train)
class_weights = {class_id: 1.0 for class_id in range(num_classes)}
class_weights.update(zip(present_classes.tolist(), balanced_weights.tolist()))

# Build and compile the model
input_shape = X_train_combined.shape[1:]  # (sequence_length, num_features)
model = build_advanced_stgcn(input_shape, num_classes)

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.summary()

# Train the model
history = model.fit(
    X_train_combined, y_train_combined,
    validation_data=(X_val, y_val),
    epochs=100,
    batch_size=16,
    class_weight=class_weights,
    callbacks=[
        # Both callbacks monitor val_loss by default
        callbacks.EarlyStopping(patience=10, restore_best_weights=True),
        callbacks.ReduceLROnPlateau(factor=0.5, patience=5)
    ]
)

# Score the trained network on the test split
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Test Accuracy: {test_accuracy:.2f}")

# Persist the headline metrics as "name: value" lines
os.makedirs(results_path, exist_ok=True)
results = {"Test Accuracy": test_accuracy, "Test Loss": test_loss}
results_file = os.path.join(results_path, "results.txt")
with open(results_file, "w") as f:
    f.writelines(f"{key}: {value}\n" for key, value in results.items())

# Store the trained model next to the metrics
model_file = os.path.join(results_path, "action_recognition_advanced_stgcn_model.h5")
model.save(model_file)

# Plot training history: one figure per tracked metric, each showing the
# training and validation curves, saved to results_path and then displayed.
for metric, axis_label, file_name in (
    ("accuracy", "Accuracy", "training_history.png"),
    ("loss", "Loss", "loss_history.png"),
):
    plt.figure()
    plt.plot(history.history[metric], label=f"Train {axis_label}")
    plt.plot(history.history[f"val_{metric}"], label=f"Validation {axis_label}")
    plt.xlabel("Epochs")
    plt.ylabel(axis_label)
    plt.legend()
    plt.savefig(os.path.join(results_path, file_name))
    plt.show()

# Additional evaluation metrics
# Every metric below is computed over the full, fixed list of class IDs. A
# random test split may miss some classes; without explicit labels the report
# would reject the target names, and the confusion-matrix labels and bar
# heights would no longer match the number of classes.
class_ids = np.arange(num_classes)
class_names = [f"Class {i}" for i in class_ids]

y_pred = np.argmax(model.predict(X_test), axis=1)
report = classification_report(
    y_test, y_pred, labels=class_ids, target_names=class_names, zero_division=0
)
print(report)

# Save classification report
with open(os.path.join(results_path, "classification_report.txt"), "w") as f:
    f.write(report)

# Plot confusion matrix
conf_matrix = confusion_matrix(y_test, y_pred, labels=class_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_names)
# Draw into an explicitly created axes; calling plt.figure() first would leave
# an empty extra figure because disp.plot() opens its own when no axes is given.
fig, ax = plt.subplots()
disp.plot(ax=ax, cmap="viridis", xticks_rotation=45)
plt.tight_layout()
plt.savefig(os.path.join(results_path, "confusion_matrix.png"))
plt.show()

# Precision, Recall, F1-Score bar chart
precisions, recalls, f1_scores, _ = precision_recall_fscore_support(
    y_test, y_pred, labels=class_ids, average=None, zero_division=0
)
plt.figure()
x = np.arange(num_classes)
plt.bar(x - 0.2, precisions, width=0.2, label="Precision")
plt.bar(x, recalls, width=0.2, label="Recall")
plt.bar(x + 0.2, f1_scores, width=0.2, label="F1-Score")
plt.xticks(x, class_names, rotation=45)
plt.xlabel("Classes")
plt.ylabel("Scores")
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(results_path, "precision_recall_f1.png"))
plt.show()

# Predicted vs True label distribution
# Half-integer bin edges give every class ID its own bin centred on the ID,
# independent of which labels happen to occur in the test split.
bin_edges = np.arange(num_classes + 1) - 0.5
plt.figure()
plt.hist([y_test, y_pred], bins=bin_edges, label=["True Labels", "Predicted Labels"], color=["blue", "orange"], alpha=0.7)
plt.xlabel("Classes")
plt.ylabel("Frequency")
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(results_path, "label_distribution.png"))
plt.show()
