In [2]:
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

A = 1
omega = 2 * np.pi * 50
T = 1 / 50
t = np.linspace(0, 1, 1000)
phases = [0, np.pi/6, np.pi/4, np.pi/3, np.pi/2, 2*np.pi/3, 3*np.pi/4, 5*np.pi/6, np.pi,
          7*np.pi/6, 5*np.pi/4, 4*np.pi/3, 3*np.pi/2, 2*np.pi]
np.random.seed(0)  # For reproducibility

def u(t):
    return np.heaviside(t, 1)

def pure_sine(t, phi, noise=False):
    signal = A * np.sin(omega * t + phi)
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def sag(t, alpha, t1, t2, phi, noise=False):
    signal = (1 - alpha * (u(t - t1) - u(t - t2))) * np.sin(omega * t + phi)
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def swell(t, alpha, t1, t2, phi, noise=False):
    signal = (1 + alpha * (u(t - t1) - u(t - t2))) * np.sin(omega * t + phi)
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def interruption(t, alpha, t1, t2, phi, noise=False):
    signal = alpha * (u(t - t1) - u(t - t2)) * np.sin(omega * t + phi)
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def harmonics(t, alpha1, alpha3, alpha5, alpha7, phi, noise=False):
    signal = alpha1 * np.sin(omega * t + phi) + alpha3 * np.sin(3 * omega * t + phi) + alpha5 * np.sin(5 * omega * t + phi) + alpha7 * np.sin(7 * omega * t + phi)
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def oscillatory_transient(t, alpha, t1, tau, omega_n, phi, noise=False):
    signal = np.sin(omega * t + phi) + alpha * np.exp(-(t - t1) / tau) * np.sin(omega_n * (t - t1) + phi) * (u(t - t1) - u(t - t1 - 3 * tau))
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def flicker(t, alpha_f, beta, phi, noise=False):
    signal = (1 + alpha_f * np.sin(beta * t)) * np.sin(omega * t + phi)
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def impulse_transient(t, K, t1, t2, phi, noise=False):
    signal = np.sin(omega * t + phi) + np.sign(np.sin(omega * t + phi)) * K * (u(t - t1) - u(t - t2))
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def notch(t, K, t1, t2, phi, noise=False):
    signal = np.sin(omega * t + phi) - np.sign(np.sin(omega * t + phi)) * K * (u(t - t1) - u(t - t2))
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def sag_with_harmonics(t, alpha, t1, t2, alpha1, alpha3, alpha5, alpha7, phi, noise=False):
    signal = sag(t, alpha, t1, t2, phi) + harmonics(t, alpha1, alpha3, alpha5, alpha7, phi)
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def swell_with_harmonics(t, alpha, t1, t2, alpha1, alpha3, alpha5, alpha7, phi, noise=False):
    signal = swell(t, alpha, t1, t2, phi) + harmonics(t, alpha1, alpha3, alpha5, alpha7, phi)
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def sag_with_oscillation(t, alpha, phi, beta, t3, omega_n, noise=False):
    signal = A * np.sin(omega * t - phi) * (1 - alpha * (u(t - t3) - u(t - t3 - 3 * T))) + beta * np.exp(-(t - t3) / T) * np.sin(omega_n * (t - t3) + phi) * (u(t - t3) - u(t - t3 - 3 * T))
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def swell_with_oscillation(t, alpha, phi, beta, t3, omega_n, noise=False):
    signal = A * np.sin(omega * t - phi) * (1 + alpha * (u(t - t3) - u(t - t3 - 3 * T))) + beta * np.exp(-(t - t3) / T) * np.sin(omega_n * (t - t3) + phi) * (u(t - t3) - u(t - t3 - 3 * T))
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def swell_with_harmonics_oscillations(t, alpha, t1, t2, alpha1, alpha3, alpha5, alpha7, beta, phi, t3, omega_n, noise=False):
    signal = swell_with_harmonics(t, alpha, t1, t2, alpha1, alpha3, alpha5, alpha7, phi) + beta * np.exp(-(t - t3) / T) * np.sin(omega_n * (t - t3) + phi) * (u(t - t3) - u(t - t3 - 3 * T))
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def sag_with_harmonics_oscillations(t, alpha, t1, t2, alpha1, alpha3, alpha5, alpha7, beta, phi, t3, omega_n, noise=False):
    signal = sag_with_harmonics(t, alpha, t1, t2, alpha1, alpha3, alpha5, alpha7, phi) + beta * np.exp(-(t - t3) / T) * np.sin(omega_n * (t - t3) + phi) * (u(t - t3) - u(t - t3 - 3 * T))
    if noise:
        signal += np.random.normal(0, 0.1, len(t))
    return signal

def plot_pq_event(event_func, t, save_path, *args):
    plt.figure(figsize=(10, 4))
    plt.plot(t, event_func(t, *args))
    plt.axis('off')
    plt.savefig(save_path, bbox_inches='tight', pad_inches=0)
    plt.close()

def generate_dataset():
    dataset = []
    labels = []
    
    for phi in phases:
        dataset.append((pure_sine(t, phi), 'PureSine', phi, False))
        dataset.append((pure_sine(t, phi, noise=True), 'PureSine', phi, True))

    for alpha in np.linspace(0.1, 0.8, 5):
        for phi in phases:
            dataset.append((sag(t, alpha, 0.2, 0.5, phi), 'Sag', phi, False))
            dataset.append((sag(t, alpha, 0.2, 0.5, phi, noise=True), 'Sag', phi, True))
        
    for alpha in np.linspace(0.1, 0.8, 5):
        for phi in phases:
            dataset.append((swell(t, alpha, 0.2, 0.5, phi), 'Swell', phi, False))
            dataset.append((swell(t, alpha, 0.2, 0.5, phi, noise=True), 'Swell', phi, True))

    for alpha in np.linspace(0.9, 1.0, 2):
        for phi in phases:
            dataset.append((interruption(t, alpha, 0.2, 0.5, phi), 'Interruption', phi, False))
            dataset.append((interruption(t, alpha, 0.2, 0.5, phi, noise=True), 'Interruption', phi, True))
        
    for alpha1 in np.linspace(0.9, 1.0, 2):
        for phi in phases:
            alpha3 = 0.3
            alpha5 = 0.2
            alpha7 = 0.1
            dataset.append((harmonics(t, alpha1, alpha3, alpha5, alpha7, phi), 'Harmonics', phi, False))
            dataset.append((harmonics(t, alpha1, alpha3, alpha5, alpha7, phi, noise=True), 'Harmonics', phi, True))

    for alpha in np.linspace(0.1, 0.3, 3):
        for phi in phases:
            dataset.append((oscillatory_transient(t, alpha, 0.2, 0.01, 300, phi), 'OscillatoryTransient', phi, False))
            dataset.append((oscillatory_transient(t, alpha, 0.2, 0.01, 300, phi, noise=True), 'OscillatoryTransient', phi, True))

    for alpha_f in np.linspace(0.1, 0.2, 2):
        for phi in phases:
            dataset.append((flicker(t, alpha_f, 5 * omega, phi), 'Flicker', phi, False))
            dataset.append((flicker(t, alpha_f, 5 * omega, phi, noise=True), 'Flicker', phi, True))
        
    for K in np.linspace(0.1, 0.4, 4):
        for phi in phases:
            dataset.append((impulse_transient(t, K, 0.2, 0.5, phi), 'ImpulseTransient', phi, False))
            dataset.append((impulse_transient(t, K, 0.2, 0.5, phi, noise=True), 'ImpulseTransient', phi, True))
        
    for K in np.linspace(0.1, 0.4, 4):
        for phi in phases:
            dataset.append((notch(t, K, 0.2, 0.5, phi), 'Notch', phi, False))
            dataset.append((notch(t, K, 0.2, 0.5, phi, noise=True), 'Notch', phi, True))
        
    for alpha in np.linspace(0.1, 0.9, 3):
        for phi in phases:
            alpha1, alpha3, alpha5, alpha7 = 0.9, 0.1, 0.1, 0.1
            dataset.append((sag_with_harmonics(t, alpha, 0.2, 0.5, alpha1, alpha3, alpha5, alpha7, phi), 'SagWithHarmonics', phi, False))
            dataset.append((sag_with_harmonics(t, alpha, 0.2, 0.5, alpha1, alpha3, alpha5, alpha7, phi, noise=True), 'SagWithHarmonics', phi, True))
        
    for alpha in np.linspace(1.1, 1.8, 3):
        for phi in phases:
            alpha1, alpha3, alpha5, alpha7 = 0.9, 0.1, 0.1, 0.1
            dataset.append((swell_with_harmonics(t, alpha, 0.2, 0.5, alpha1, alpha3, alpha5, alpha7, phi), 'SwellWithHarmonics', phi, False))
            dataset.append((swell_with_harmonics(t, alpha, 0.2, 0.5, alpha1, alpha3, alpha5, alpha7, phi, noise=True), 'SwellWithHarmonics', phi, True))

    for alpha in np.linspace(0.9, 1.0, 2):
        for phi in phases:
            dataset.append((sag_with_oscillation(t, alpha, phi, 0.1, 0.2, 300), 'SagWithOscillation', phi, False))
            dataset.append((sag_with_oscillation(t, alpha, phi, 0.1, 0.2, 300, noise=True), 'SagWithOscillation', phi, True))
        
    for alpha in np.linspace(1.1, 1.8, 3):
        for phi in phases:
            dataset.append((swell_with_oscillation(t, alpha, phi, 0.1, 0.2, 300), 'SwellWithOscillation', phi, False))
            dataset.append((swell_with_oscillation(t, alpha, phi, 0.1, 0.2, 300, noise=True), 'SwellWithOscillation', phi, True))
        
    for alpha in np.linspace(0.9, 1.8, 3):
        for phi in phases:
            alpha1, alpha3, alpha5, alpha7 = 0.9, 0.1, 0.1, 0.1
            dataset.append((swell_with_harmonics_oscillations(t, alpha, 0.2, 0.5, alpha1, alpha3, alpha5, alpha7, 0.1, phi, 0.2, 300), 'SwellWithHarmonicsOscillations', phi, False))
            dataset.append((swell_with_harmonics_oscillations(t, alpha, 0.2, 0.5, alpha1, alpha3, alpha5, alpha7, 0.1, phi, 0.2, 300, noise=True), 'SwellWithHarmonicsOscillations', phi, True))
        
    for alpha in np.linspace(0.9, 1.8, 3):
        for phi in phases:
            alpha1, alpha3, alpha5, alpha7 = 0.9, 0.1, 0.1, 0.1
            dataset.append((sag_with_harmonics_oscillations(t, alpha, 0.2, 0.5, alpha1, alpha3, alpha5, alpha7, 0.1, phi, 0.2, 300), 'SagWithHarmonicsOscillations', phi, False))
            dataset.append((sag_with_harmonics_oscillations(t, alpha, 0.2, 0.5, alpha1, alpha3, alpha5, alpha7, 0.1, phi, 0.2, 300, noise=True), 'SagWithHarmonicsOscillations', phi, True))

    return dataset

train_dir = 'output_new/train'
test_dir = 'output_new/test'

def ensure_min_samples_per_class(dataset, min_samples=2):
    unique_labels = set([item[1] for item in dataset])  # Label is always the second element
    for label in unique_labels:
        count = sum(1 for item in dataset if item[1] == label)
        if count < min_samples:
            idx = next(i for i, item in enumerate(dataset) if item[1] == label)
            for _ in range(min_samples - count):
                dataset.append(dataset[idx])
    return dataset

dataset = generate_dataset()

dataset = ensure_min_samples_per_class(dataset)
X = [item[0] for item in dataset]
y = [item[1] for item in dataset]
phis = [item[2] for item in dataset]
noise = [item[3] if len(item) > 3 else False for item in dataset]

X_train, X_test, y_train, y_test, phi_train, phi_test, noise_train, noise_test = train_test_split(
    X, y, phis, noise, test_size=0.3, random_state=42, stratify=y
)

# Create directories
train_dir = 'output_new/train'
test_dir = 'output_new/test'
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# Function to save both noisy and non-noisy versions
def save_pq_event(data, label, phi, is_noisy, save_dir, index):
    base_path = os.path.join(save_dir, label)
    os.makedirs(base_path, exist_ok=True)
    
    # Save non-noisy version
    save_path = os.path.join(base_path, f"{label}_WithoutNoise_{index}.png")
    print(save_path)
    plot_pq_event(lambda t, p: data, t, save_path, phi)
    
    # Save noisy version
    if is_noisy:
        noisy_data = data + np.random.normal(0, 0.1, len(data))
        save_path = os.path.join(base_path, f"{label}_WithNoise_{index}.png")
        plot_pq_event(lambda t, p: noisy_data, t, save_path, phi)

for i, (data, label, phi, is_noisy) in enumerate(zip(X_train, y_train, phi_train, noise_train)):
    save_pq_event(data, label, phi, is_noisy, train_dir, i+1)

for i, (data, label, phi, is_noisy) in enumerate(zip(X_test, y_test, phi_test, noise_test)):
    save_pq_event(data, label, phi, is_noisy, test_dir, i+1)

print("Dataset generation and saving completed.")


output_new/train\SwellWithHarmonics\SwellWithHarmonics_WithoutNoise_1.png
output_new/train\Sag\Sag_WithoutNoise_2.png
output_new/train\Sag\Sag_WithoutNoise_3.png
output_new/train\ImpulseTransient\ImpulseTransient_WithoutNoise_4.png
output_new/train\SagWithHarmonics\SagWithHarmonics_WithoutNoise_5.png
output_new/train\PureSine\PureSine_WithoutNoise_6.png
output_new/train\Interruption\Interruption_WithoutNoise_7.png
output_new/train\OscillatoryTransient\OscillatoryTransient_WithoutNoise_8.png
output_new/train\SagWithHarmonicsOscillations\SagWithHarmonicsOscillations_WithoutNoise_9.png
output_new/train\Swell\Swell_WithoutNoise_10.png
output_new/train\Swell\Swell_WithoutNoise_11.png
output_new/train\Swell\Swell_WithoutNoise_12.png
output_new/train\ImpulseTransient\ImpulseTransient_WithoutNoise_13.png
output_new/train\Notch\Notch_WithoutNoise_14.png
output_new/train\SwellWithHarmonics\SwellWithHarmonics_WithoutNoise_15.png
output_new/train\ImpulseTransient\ImpulseTransient_WithoutNoise_16.p

In [29]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
import os
from efficientnet_pytorch import EfficientNet

model = EfficientNet.from_pretrained('efficientnet-b7')
model._fc = torch.nn.Identity()
model.eval()

transform = transforms.Compose([
    transforms.RandomResizedCrop(600, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.2, scale=(0.02, 0.33), ratio=(0.3, 3.3), value='random')
])

test_transform = transforms.Compose([
    transforms.Resize((600, 600)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def extract_features(image_path, model, transform):
    image = Image.open(image_path).convert('RGB')
    image = transform(image).unsqueeze(0)
    with torch.no_grad():
        features = model(image)
    return features.squeeze().numpy()

train_dir = 'output_new/train'
test_dir = 'output_new/test'

all_train_features = []
all_train_features_labels = []

all_test_features = []
all_test_features_labels = []

for label in os.listdir(train_dir):
    label_dir = os.path.join(train_dir, label)
    if os.path.isdir(label_dir):
        for image_name in os.listdir(label_dir):
            image_path = os.path.join(label_dir, image_name)
            features = extract_features(image_path, model, transform)
            all_train_features.append(features)
            all_train_features_labels.append(os.listdir(test_dir).index(label))

for label in os.listdir(test_dir):
    label_dir = os.path.join(test_dir, label)
    if os.path.isdir(label_dir):
        for image_name in os.listdir(label_dir):
            image_path = os.path.join(label_dir, image_name)
            features = extract_features(image_path, model, test_transform)
            all_test_features.append(features)
            all_test_features_labels.append(os.listdir(test_dir).index(label))

all_train_features = np.array(all_train_features)
all_train_features_labels = np.array(all_train_features_labels)

all_test_features = np.array(all_test_features)
all_test_features_labels = np.array(all_test_features_labels)

print("Feature extraction completed.")
print("Train features shape:", all_train_features.shape)
print("Train labels shape:", all_train_features_labels.shape)
print("Test features shape:", all_test_features.shape)
print("Test labels shape:", all_test_features_labels.shape)

Loaded pretrained weights for efficientnet-b0
Feature extraction completed.
Train features shape: (1328, 1280)
Train labels shape: (1328,)
Test features shape: (562, 1280)
Test labels shape: (562,)


In [30]:
import numpy as np
np.save("all_train_features.npy", all_train_features)
np.save("all_train_features_labels.npy", all_train_features_labels)
np.save("all_test_features.npy", all_test_features)
np.save("all_test_features_labels.npy", all_test_features_labels)

In [31]:
import numpy as np

In [38]:
all_train_features = np.load("all_train_features.npy")
all_train_features_labels = np.load("all_train_features_labels.npy")

all_test_features = np.load("all_test_features.npy")
all_test_features_labels = np.load("all_test_features_labels.npy")

In [39]:
train_dir = 'output_new/train'
test_dir = 'output_new/test'

In [40]:
def evaluate(classifier, features, labels):
    classifier.eval()
    with torch.no_grad():
        outputs = classifier(features)
        _, predicted = torch.max(outputs.data, 1)
        accuracy = (predicted == labels).sum().item() / len(labels)
    return accuracy

In [41]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
all_train_features = scaler.fit_transform(all_train_features)
all_test_features = scaler.transform(all_test_features)

In [42]:
import torch.nn as nn
import torch
import os
import torch.optim as optim
from torch.optim.lr_scheduler import CyclicLR
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

class CrossAttention(nn.Module):
    def __init__(self, feature_dim, num_heads):
        super(CrossAttention, self).__init__()
        self.query = nn.Linear(feature_dim, feature_dim)
        self.key = nn.Linear(feature_dim, feature_dim)
        self.value = nn.Linear(feature_dim, feature_dim)
        self.num_heads = num_heads
        self.attention = nn.MultiheadAttention(embed_dim=feature_dim, num_heads=num_heads)

    def forward(self, features):
        query = self.query(features)
        key = self.key(features)
        value = self.value(features)
        
        query = query.unsqueeze(1).transpose(0, 1)
        key = key.unsqueeze(1).transpose(0, 1)
        value = value.unsqueeze(1).transpose(0, 1)
        
        attended_features, _ = self.attention(query, key, value)
        return attended_features.squeeze(0)

class ImageClassifier(nn.Module):
    def __init__(self, feature_dim, num_classes, num_heads, hidden_dim, num_layers):
        super(ImageClassifier, self).__init__()
        self.bi_lstm = nn.LSTM(input_size=feature_dim, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=True)
        self.cross_attention = CrossAttention(hidden_dim * 2, num_heads) 
        self.fc = nn.Linear(hidden_dim * 2, num_classes)
        self.bn = nn.BatchNorm1d(hidden_dim * 2)
        self.dropout = nn.Dropout(0.55)

    def forward(self, features):
        lstm_out1, _1 = self.bi_lstm(features.unsqueeze(1))
        lstm_out1 = lstm_out1.squeeze(1)   
        attended_features = self.cross_attention(lstm_out1)
        attended_features = self.bn(attended_features)
        attended_features = self.dropout(attended_features)
        logits = self.fc(attended_features)
        return logits

feature_dim = all_train_features.shape[1]
num_classes = len(os.listdir(train_dir))
num_heads = 4
hidden_dim = 64
num_layers = 1  

classifier_bi = ImageClassifier(feature_dim, num_classes, num_heads, hidden_dim, num_layers)

features_tensor = torch.tensor(all_train_features, dtype=torch.float32)
labels_tensor = torch.tensor(all_train_features_labels, dtype=torch.long)

features_tensor_test = torch.tensor(all_test_features, dtype=torch.float32)
labels_tensor_test = torch.tensor(all_test_features_labels, dtype=torch.long)

class_weights = compute_class_weight('balanced', classes=np.unique(all_train_features_labels), y=all_train_features_labels)
class_weights = torch.FloatTensor(class_weights)

criterion_bi = nn.CrossEntropyLoss()
optimizer_bi = optim.AdamW(classifier_bi.parameters(), lr=0.0001, weight_decay=1e-5)

num_epochs = 600
best_accuracy = 0.0
best_model_path = "best_model_trainxa.pth"

base_lr = 1e-5
max_lr = 1e-2
step_size_up = 2000  
scheduler = CyclicLR(optimizer_bi, base_lr, max_lr, step_size_up, mode='triangular2')

for epoch in range(num_epochs):
    classifier_bi.train()
    optimizer_bi.zero_grad()
    train_outputs = classifier_bi(features_tensor)
    train_loss = criterion_bi(train_outputs, labels_tensor)
    train_loss.backward()
    optimizer_bi.step()
    scheduler.step()
    classifier_bi.eval()

    with torch.no_grad():
        test_outputs = classifier_bi(features_tensor_test)
        test_loss = criterion_bi(test_outputs, labels_tensor_test)
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Test Loss: {test_loss.item():.4f}')

    train_accuracy = evaluate(classifier_bi, features_tensor, labels_tensor)
    print(f'Train Accuracy: {train_accuracy * 100:.2f}%')

    test_accuracy = evaluate(classifier_bi, features_tensor_test, labels_tensor_test)
    print(f'Test Accuracy: {test_accuracy * 100:.2f}%')

Epoch [1/600], Train Loss: 3.0195, Test Loss: 2.7033
Train Accuracy: 13.83%
Test Accuracy: 14.29%
Epoch [2/600], Train Loss: 3.0165, Test Loss: 2.7013
Train Accuracy: 14.04%
Test Accuracy: 15.27%
Epoch [3/600], Train Loss: 3.0061, Test Loss: 2.6984
Train Accuracy: 14.26%
Test Accuracy: 16.26%
Epoch [4/600], Train Loss: 2.9329, Test Loss: 2.6946
Train Accuracy: 15.11%
Test Accuracy: 16.75%
Epoch [5/600], Train Loss: 2.8076, Test Loss: 2.6897
Train Accuracy: 15.11%
Test Accuracy: 16.75%
Epoch [6/600], Train Loss: 2.8744, Test Loss: 2.6836
Train Accuracy: 16.38%
Test Accuracy: 18.23%
Epoch [7/600], Train Loss: 2.8470, Test Loss: 2.6761
Train Accuracy: 19.79%
Test Accuracy: 21.67%
Epoch [8/600], Train Loss: 2.7195, Test Loss: 2.6671
Train Accuracy: 22.13%
Test Accuracy: 23.15%
Epoch [9/600], Train Loss: 2.6431, Test Loss: 2.6565
Train Accuracy: 25.96%
Test Accuracy: 25.62%
Epoch [10/600], Train Loss: 2.5245, Test Loss: 2.6442
Train Accuracy: 28.94%
Test Accuracy: 30.05%
Epoch [11/600], Tra

In [10]:
classifier_bi.load_state_dict(torch.load("best_model_trainxa.pth"))

<All keys matched successfully>

In [11]:
accuracy = evaluate(classifier_bi, features_tensor, labels_tensor)
print(f'Accuracy: {accuracy * 100:.2f}%')

Accuracy: 92.77%


In [20]:
print("Shape of all_train_features:", all_train_features.shape)
print("Shape of all_test_features:", all_test_features.shape)


Shape of all_train_features: (1328, 2304)
Shape of all_test_features: (562, 2304)


In [16]:
import torch.nn as nn
import torch
import os
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

class CrossAttention(nn.Module):
    def __init__(self, feature_dim, num_heads):
        super(CrossAttention, self).__init__()
        self.query = nn.Linear(feature_dim, feature_dim)
        self.key = nn.Linear(feature_dim, feature_dim)
        self.value = nn.Linear(feature_dim, feature_dim)
        self.num_heads = num_heads
        self.attention = nn.MultiheadAttention(embed_dim=feature_dim, num_heads=num_heads)
        self.dropout = nn.Dropout(0.3)  # Add dropout to attention

    def forward(self, features):
        query = self.query(features)
        key = self.key(features)
        value = self.value(features)
        
        query = query.unsqueeze(1).transpose(0, 1)
        key = key.unsqueeze(1).transpose(0, 1)
        value = value.unsqueeze(1).transpose(0, 1)
        
        attended_features, _ = self.attention(query, key, value)
        return self.dropout(attended_features.squeeze(0))  # Apply dropout

class ImageClassifier(nn.Module):
    def __init__(self, feature_dim, num_classes, num_heads, hidden_dim, num_layers):
        super(ImageClassifier, self).__init__()
        self.bi_lstm = nn.LSTM(input_size=feature_dim, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=True)
        self.cross_attention = CrossAttention(hidden_dim * 2, num_heads)
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        self.bn1 = nn.BatchNorm1d(hidden_dim * 2)
        self.bn2 = nn.BatchNorm1d(hidden_dim)
        self.dropout1 = nn.Dropout(0.5)
        self.dropout2 = nn.Dropout(0.3)
        self.relu = nn.ReLU()

    def forward(self, features):
        lstm_out1, _1 = self.bi_lstm(features.unsqueeze(1))
        lstm_out1 = lstm_out1.squeeze(1)   
        attended_features = self.cross_attention(lstm_out1)
        x = self.bn1(attended_features)
        x = self.dropout1(x)
        x = self.fc1(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.dropout2(x)
        logits = self.fc2(x)
        return logits

# Hyperparameters
feature_dim = all_train_features.shape[1]
num_classes = len(os.listdir(train_dir))
num_heads = 4  # Reduced from 8
hidden_dim = 64  # Reduced from 64
num_layers = 1

classifier_bi = ImageClassifier(feature_dim, num_classes, num_heads, hidden_dim, num_layers)

features_tensor = torch.tensor(all_train_features, dtype=torch.float32)
labels_tensor = torch.tensor(all_train_features_labels, dtype=torch.long)

features_tensor_test = torch.tensor(all_test_features, dtype=torch.float32)
labels_tensor_test = torch.tensor(all_test_features_labels, dtype=torch.long)

class_weights = compute_class_weight('balanced', classes=np.unique(all_train_features_labels), y=all_train_features_labels)
class_weights = torch.FloatTensor(class_weights)

criterion_bi = nn.CrossEntropyLoss(weight=class_weights)
optimizer_bi = optim.AdamW(classifier_bi.parameters(), lr=0.001, weight_decay=1e-5)
scheduler = ReduceLROnPlateau(optimizer_bi, mode='max', factor=0.1, patience=10, verbose=True)

num_epochs = 500
best_accuracy = 0.0
best_model_path = "best_model_trainxa.pth"
patience = 20
no_improve = 0

for epoch in range(num_epochs):
    classifier_bi.train()
    optimizer_bi.zero_grad()
    train_outputs = classifier_bi(features_tensor)
    train_loss = criterion_bi(train_outputs, labels_tensor)
    train_loss.backward()
    optimizer_bi.step()
    
    classifier_bi.eval()
    with torch.no_grad():
        test_outputs = classifier_bi(features_tensor_test)
        test_loss = criterion_bi(test_outputs, labels_tensor_test)
    
    train_accuracy = evaluate(classifier_bi, features_tensor, labels_tensor)
    test_accuracy = evaluate(classifier_bi, features_tensor_test, labels_tensor_test)
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Test Loss: {test_loss.item():.4f}')
    print(f'Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy * 100:.2f}%')
    
    scheduler.step(test_accuracy)
    
    if test_accuracy > best_accuracy:
        best_accuracy = test_accuracy
        torch.save(classifier_bi.state_dict(), best_model_path)
        no_improve = 0
    else:
        no_improve += 1
    
    # if no_improve >= patience:
    #     print("Early stopping")
    #     break

Epoch [1/500], Train Loss: 2.7992, Test Loss: 2.7105
Train Accuracy: 11.14%, Test Accuracy: 11.03%
Epoch [2/500], Train Loss: 2.7040, Test Loss: 2.7061
Train Accuracy: 11.14%, Test Accuracy: 11.03%
Epoch [3/500], Train Loss: 2.6793, Test Loss: 2.7011
Train Accuracy: 11.37%, Test Accuracy: 11.03%
Epoch [4/500], Train Loss: 2.6231, Test Loss: 2.6957
Train Accuracy: 11.60%, Test Accuracy: 11.21%
Epoch [5/500], Train Loss: 2.5693, Test Loss: 2.6895
Train Accuracy: 13.10%, Test Accuracy: 11.39%
Epoch [6/500], Train Loss: 2.5274, Test Loss: 2.6820
Train Accuracy: 15.44%, Test Accuracy: 14.59%
Epoch [7/500], Train Loss: 2.4942, Test Loss: 2.6734
Train Accuracy: 18.15%, Test Accuracy: 17.26%
Epoch [8/500], Train Loss: 2.4532, Test Loss: 2.6634
Train Accuracy: 20.41%, Test Accuracy: 19.22%
Epoch [9/500], Train Loss: 2.4324, Test Loss: 2.6521
Train Accuracy: 21.76%, Test Accuracy: 19.93%
Epoch [10/500], Train Loss: 2.3963, Test Loss: 2.6397
Train Accuracy: 23.95%, Test Accuracy: 21.17%
Epoch [11

In [18]:
import torch.nn as nn
import torch
import os
import torch.optim as optim
from torch.optim.lr_scheduler import OneCycleLR
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts

class CrossAttention(nn.Module):
    def __init__(self, feature_dim, num_heads):
        super(CrossAttention, self).__init__()
        self.query = nn.Linear(feature_dim, feature_dim)
        self.key = nn.Linear(feature_dim, feature_dim)
        self.value = nn.Linear(feature_dim, feature_dim)
        self.num_heads = num_heads
        self.attention = nn.MultiheadAttention(embed_dim=feature_dim, num_heads=num_heads)
        self.dropout = nn.Dropout(0.3)

    def forward(self, features):
        query = self.query(features)
        key = self.key(features)
        value = self.value(features)
        
        query = query.unsqueeze(1).transpose(0, 1)
        key = key.unsqueeze(1).transpose(0, 1)
        value = value.unsqueeze(1).transpose(0, 1)
        
        attended_features, _ = self.attention(query, key, value)
        return self.dropout(attended_features.squeeze(0))

class ImageClassifier(nn.Module):
    def __init__(self, feature_dim, num_classes, num_heads, hidden_dim, num_layers):
        super(ImageClassifier, self).__init__()
        self.bi_lstm = nn.LSTM(input_size=feature_dim, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=True)
        self.cross_attention = CrossAttention(hidden_dim * 2, num_heads)
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        self.bn1 = nn.BatchNorm1d(hidden_dim * 2)
        self.bn2 = nn.BatchNorm1d(hidden_dim)
        self.dropout1 = nn.Dropout(0.5)
        self.dropout2 = nn.Dropout(0.3)
        self.relu = nn.ReLU()

    def forward(self, features):
        lstm_out1, _1 = self.bi_lstm(features.unsqueeze(1))
        lstm_out1 = lstm_out1.squeeze(1)   
        attended_features = self.cross_attention(lstm_out1)
        x = self.bn1(attended_features)
        x = self.dropout1(x)
        x = self.fc1(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.dropout2(x)
        logits = self.fc2(x)
        return logits

# Hyperparameters
feature_dim = all_train_features.shape[1]
num_classes = len(os.listdir(train_dir))
num_heads = 4
hidden_dim = 64
num_layers = 1

classifier = ImageClassifier(feature_dim, num_classes, num_heads, hidden_dim, num_layers)

features_tensor = torch.tensor(all_train_features, dtype=torch.float32)
labels_tensor = torch.tensor(all_train_features_labels, dtype=torch.long)

features_tensor_test = torch.tensor(all_test_features, dtype=torch.float32)
labels_tensor_test = torch.tensor(all_test_features_labels, dtype=torch.long)

class_weights = compute_class_weight('balanced', classes=np.unique(all_train_features_labels), y=all_train_features_labels)
class_weights = torch.FloatTensor(class_weights)

criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.AdamW(classifier.parameters(), lr=0.001, weight_decay=1e-4)

def mixup_data(x, y, alpha=0.2):
    '''Returns mixed inputs, pairs of targets, and lambda'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    index = torch.randperm(batch_size)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

# Hyperparameters
feature_dim = all_train_features.shape[1]
num_classes = len(os.listdir(train_dir))
num_heads = 4
hidden_dim = 64
num_layers = 1

classifier = ImageClassifier(feature_dim, num_classes, num_heads, hidden_dim, num_layers)

features_tensor = torch.tensor(all_train_features, dtype=torch.float32)
labels_tensor = torch.tensor(all_train_features_labels, dtype=torch.long)

features_tensor_test = torch.tensor(all_test_features, dtype=torch.float32)
labels_tensor_test = torch.tensor(all_test_features_labels, dtype=torch.long)

class_weights = compute_class_weight('balanced', classes=np.unique(all_train_features_labels), y=all_train_features_labels)
class_weights = torch.FloatTensor(class_weights)

criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.AdamW(classifier.parameters(), lr=0.001, weight_decay=1e-3)

num_epochs = 300
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2, eta_min=1e-6)

best_accuracy = 0.0
best_model_path = "best_model_trainxa.pth"
patience = 30
no_improve = 0

# L1 regularization
l1_lambda = 1e-4

for epoch in range(num_epochs):
    classifier.train()
    optimizer.zero_grad()
    
    # Apply mixup
    mixed_features, y_a, y_b, lam = mixup_data(features_tensor, labels_tensor)
    train_outputs = classifier(mixed_features)
    train_loss = mixup_criterion(criterion, train_outputs, y_a, y_b, lam)
    
    # Add L1 regularization
    l1_norm = sum(p.abs().sum() for p in classifier.parameters())
    train_loss += l1_lambda * l1_norm
    
    train_loss.backward()
    
    # Gradient clipping
    torch.nn.utils.clip_grad_norm_(classifier.parameters(), max_norm=0.5)
    
    optimizer.step()
    scheduler.step(epoch + 1)
    
    classifier.eval()
    with torch.no_grad():
        test_outputs = classifier(features_tensor_test)
        test_loss = criterion(test_outputs, labels_tensor_test)
    
    train_accuracy = evaluate(classifier, features_tensor, labels_tensor)
    test_accuracy = evaluate(classifier, features_tensor_test, labels_tensor_test)
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Test Loss: {test_loss.item():.4f}')
    print(f'Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy * 100:.2f}%')
    print(f'Learning Rate: {scheduler.get_last_lr()[0]:.6f}')
    
    if test_accuracy > best_accuracy:
        best_accuracy = test_accuracy
        torch.save(classifier.state_dict(), best_model_path)
        no_improve = 0
    else:
        no_improve += 1

Epoch [1/300], Train Loss: 11.0215, Test Loss: 2.6955
Train Accuracy: 6.55%, Test Accuracy: 7.83%
Learning Rate: 0.000976
Epoch [2/300], Train Loss: 10.8544, Test Loss: 2.6849
Train Accuracy: 9.79%, Test Accuracy: 11.39%
Learning Rate: 0.000905
Epoch [3/300], Train Loss: 10.6332, Test Loss: 2.6717
Train Accuracy: 17.55%, Test Accuracy: 18.15%
Learning Rate: 0.000794
Epoch [4/300], Train Loss: 10.6121, Test Loss: 2.6580
Train Accuracy: 22.21%, Test Accuracy: 22.95%
Learning Rate: 0.000655
Epoch [5/300], Train Loss: 10.3235, Test Loss: 2.6437
Train Accuracy: 27.26%, Test Accuracy: 27.76%
Learning Rate: 0.000501
Epoch [6/300], Train Loss: 10.2357, Test Loss: 2.6296
Train Accuracy: 29.52%, Test Accuracy: 30.78%
Learning Rate: 0.000346
Epoch [7/300], Train Loss: 10.1811, Test Loss: 2.6175
Train Accuracy: 30.27%, Test Accuracy: 31.32%
Learning Rate: 0.000207
Epoch [8/300], Train Loss: 10.1799, Test Loss: 2.6074
Train Accuracy: 30.80%, Test Accuracy: 32.03%
Learning Rate: 0.000096
Epoch [9/30

In [19]:
import torch.nn as nn
import torch
import os
import torch.optim as optim
from torch.optim.lr_scheduler import OneCycleLR
from sklearn.model_selection import KFold
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

class SimpleClassifier(nn.Module):
    def __init__(self, feature_dim, num_classes, hidden_dim):
        super(SimpleClassifier, self).__init__()
        self.fc1 = nn.Linear(feature_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim // 2)
        self.fc3 = nn.Linear(hidden_dim // 2, num_classes)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.bn2 = nn.BatchNorm1d(hidden_dim // 2)
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()

    def forward(self, features):
        x = self.fc1(features)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.dropout(x)
        return self.fc3(x)

# Hyperparameters
feature_dim = all_train_features.shape[1]
num_classes = len(os.listdir(train_dir))
hidden_dim = 128

# Combine train and test data for k-fold CV
all_features = np.concatenate((all_train_features, all_test_features), axis=0)
all_labels = np.concatenate((all_train_features_labels, all_test_features_labels), axis=0)

# K-fold Cross Validation
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

for fold, (train_index, val_index) in enumerate(kf.split(all_features)):
    print(f"Fold {fold + 1}/{k_folds}")
    
    X_train, X_val = all_features[train_index], all_features[val_index]
    y_train, y_val = all_labels[train_index], all_labels[val_index]
    
    features_tensor = torch.tensor(X_train, dtype=torch.float32)
    labels_tensor = torch.tensor(y_train, dtype=torch.long)
    
    features_tensor_val = torch.tensor(X_val, dtype=torch.float32)
    labels_tensor_val = torch.tensor(y_val, dtype=torch.long)
    
    classifier = SimpleClassifier(feature_dim, num_classes, hidden_dim)
    
    class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
    class_weights = torch.FloatTensor(class_weights)
    
    criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)
    optimizer = optim.AdamW(classifier.parameters(), lr=0.001, weight_decay=1e-2)
    
    num_epochs = 100
    steps_per_epoch = 1
    scheduler = OneCycleLR(optimizer, max_lr=0.01, epochs=num_epochs, steps_per_epoch=steps_per_epoch)
    
    best_accuracy = 0.0
    best_model_path = f"best_model_fold{fold+1}.pth"
    patience = 10
    no_improve = 0
    
    for epoch in range(num_epochs):
        classifier.train()
        optimizer.zero_grad()
        train_outputs = classifier(features_tensor)
        train_loss = criterion(train_outputs, labels_tensor)
        
        train_loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(classifier.parameters(), max_norm=1.0)
        
        optimizer.step()
        scheduler.step()
        
        classifier.eval()
        with torch.no_grad():
            val_outputs = classifier(features_tensor_val)
            val_loss = criterion(val_outputs, labels_tensor_val)
        
        train_accuracy = evaluate(classifier, features_tensor, labels_tensor)
        val_accuracy = evaluate(classifier, features_tensor_val, labels_tensor_val)
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Val Loss: {val_loss.item():.4f}')
        print(f'Train Accuracy: {train_accuracy * 100:.2f}%, Val Accuracy: {val_accuracy * 100:.2f}%')
        
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            torch.save(classifier.state_dict(), best_model_path)
            no_improve = 0
        else:
            no_improve += 1
        
        if no_improve >= patience:
            print("Early stopping")
            break
    
    print(f"Best validation accuracy for fold {fold+1}: {best_accuracy * 100:.2f}%")

# After k-fold CV, you can train on all data and test on the original test set
# ... (code for final training and testing)


Fold 1/5
Epoch [1/100], Train Loss: 2.9056, Val Loss: 2.6944
Train Accuracy: 16.20%, Val Accuracy: 16.14%
Epoch [2/100], Train Loss: 2.6906, Val Loss: 2.6336
Train Accuracy: 25.73%, Val Accuracy: 24.34%
Epoch [3/100], Train Loss: 2.5749, Val Loss: 2.5646
Train Accuracy: 29.63%, Val Accuracy: 28.04%
Epoch [4/100], Train Loss: 2.5060, Val Loss: 2.4789
Train Accuracy: 35.85%, Val Accuracy: 34.39%
Epoch [5/100], Train Loss: 2.4327, Val Loss: 2.3787
Train Accuracy: 38.56%, Val Accuracy: 37.57%
Epoch [6/100], Train Loss: 2.3861, Val Loss: 2.2729
Train Accuracy: 41.07%, Val Accuracy: 39.68%
Epoch [7/100], Train Loss: 2.3145, Val Loss: 2.1678
Train Accuracy: 43.39%, Val Accuracy: 41.80%
Epoch [8/100], Train Loss: 2.2678, Val Loss: 2.0755
Train Accuracy: 43.65%, Val Accuracy: 40.48%
Epoch [9/100], Train Loss: 2.2161, Val Loss: 2.0008
Train Accuracy: 42.86%, Val Accuracy: 39.15%
Epoch [10/100], Train Loss: 2.1805, Val Loss: 1.9401
Train Accuracy: 42.72%, Val Accuracy: 38.10%
Epoch [11/100], Trai