# Model 3

In [1]:
import os
import torch
import random
import numpy as np
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import defaultdict
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data import DataLoader, SubsetRandomSampler
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import DataLoader, Subset, random_split, Dataset

## initiation:

In [2]:
class NumpyFolderDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.samples = []
        for class_idx, class_name in enumerate(sorted(os.listdir(root_dir))):
            class_dir = os.path.join(root_dir, class_name)
            if not os.path.isdir(class_dir):
                continue
            for file_name in os.listdir(class_dir):
                if file_name.endswith('.npy'):
                    file_path = os.path.join(class_dir, file_name)
                    self.samples.append((file_path, class_idx))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        np_array = np.load(file_path).astype(np.float32)
        if self.transform:
            np_array = self.transform(np_array)
        return np_array, label


# Define transformations
transform = transforms.Compose([
    transforms.Lambda(lambda x: torch.tensor(x)),
    transforms.Lambda(lambda x: x.unsqueeze(0)),  # Add channel dimension
    transforms.Normalize(mean=(0.5,), std=(0.5,)),  # Normalize
])

# Dataset path
dataset_path = r'C:\Users\Admin\PycharmProjects\Ramin_Thesis\DataSet\12_class'
dataset = NumpyFolderDataset(root_dir=dataset_path, transform=transform)

# Train-test split
train_size = int(0.9 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, pin_memory=True)

# Verify DataLoader
for batch_data, batch_labels in train_loader:
    print(f"Batch data shape: {batch_data.shape}, Batch labels: {batch_labels.shape}")
    break


Batch data shape: torch.Size([32, 1, 320, 320]), Batch labels: torch.Size([32])


### 1_from 3   ozmnenei

In [3]:
import torch
import torch.nn as nn

class RecurrencePlotClassifierScratch(nn.Module):
    def __init__(self, num_classes, transformer_layers=2, transformer_heads=4, transformer_dim=128, dropout=0.1):
        super(RecurrencePlotClassifierScratch, self).__init__()

        # CNN Feature Extractor (from scratch)
        self.cnn_layers = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),  # Input channels: 3
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1) # Global Average Pooling
        )

        # Transformer Encoder
        self.transformer_input_dim = 256
        self.transformer_embedding = nn.Linear(self.transformer_input_dim, transformer_dim)
        self.positional_encoding = PositionalEncoding(transformer_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=transformer_dim, nhead=transformer_heads, dropout=dropout),
            num_layers=transformer_layers
        )

        # Classification Head
        self.fc = nn.Linear(transformer_dim, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        batch_size = x.size(0)

        # CNN Feature Extraction
        features = self.cnn_layers(x)
        features = features.view(batch_size, -1)

        # Prepare for Transformer
        features = features.unsqueeze(1)  # Add sequence dimension
        features = self.transformer_embedding(features)
        features = self.positional_encoding(features)

        # Transformer
        transformer_output = self.transformer_encoder(features)

        # Classification
        transformer_output = transformer_output.squeeze(1)
        output = self.dropout(transformer_output)
        output = self.fc(output)
        return output

# Positional Encoding (same as before)
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

# Example usage:
num_classes = 2
model = RecurrencePlotClassifierScratch(num_classes)
input_tensor = torch.randn(32, 1, 320, 320)
output = model(input_tensor)
print(output.shape)



torch.Size([32, 2])


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import time
import matplotlib.pyplot as plt

# ... (Your model definition and data loading code from previous responses) ...

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-2)
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5, verbose=True) # Learning rate scheduler
num_epochs = 100
patience = 10  # Patience for early stopping
best_test_accuracy = 0.0
best_model_path = "best_model.pth"
early_stopping_counter = 0

# Lists to store metrics for plotting
train_losses = []
train_accuracies = []
test_accuracies = []

model.to(device)

start_time = time.time()
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == labels).sum().item()
        total_samples += labels.size(0)

    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = correct_predictions / total_samples
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_accuracy)

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, "
          f"Accuracy: {epoch_accuracy:.2%}, "
          f"LR: {optimizer.param_groups[0]['lr']:.6f}")

    if epoch >= 3:  # Start testing after a few epochs
        model.eval()
        correct_predictions = 0
        total_samples = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                correct_predictions += (predicted == labels).sum().item()
                total_samples += labels.size(0)

        test_accuracy = correct_predictions / total_samples
        test_accuracies.append(test_accuracy)
        print(f"Test Accuracy after Epoch {epoch+1}: {test_accuracy:.2%}")

        scheduler.step(test_accuracy)  # Update learning rate based on test accuracy

        if test_accuracy > best_test_accuracy:
            best_test_accuracy = test_accuracy
            torch.save(model.state_dict(), best_model_path)
            print(f"###################### Best model saved with accuracy ######################: {best_test_accuracy:.2%}")
            early_stopping_counter = 0  # Reset early stopping counter
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= patience:
                print(f"Early stopping triggered after {patience} epochs without improvement.")
                break  # Exit training loop

end_time = time.time()
training_time = end_time - start_time
print(f"Total training time: {training_time:.2f} seconds")

# Plotting
plt.figure(figsize=(12, 4))

# Plot training loss
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

# Plot training and test accuracy
plt.subplot(1, 2, 2)
plt.plot(train_accuracies, label='Training Accuracy')
plt.plot(test_accuracies, label='Test Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

print(f"Training complete. Best test accuracy: {best_test_accuracy:.2%}")
model.load_state_dict(torch.load(best_model_path))

