In [None]:
# Importing necessary libraries
import time
import numpy as np
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from torch.autograd import Variable
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import math
from scipy.stats import norm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from torch.optim.lr_scheduler import ExponentialLR

if torch.cuda.is_available():
    torch.backends.cudnn.deterministic = True

In [None]:
# Device configuration
device = torch.device("cuda:9" if torch.cuda.is_available() else "cpu")
print('Device:', device)

# Hyperparameters
random_seed = 0
learning_rate = 0.001
num_epochs = 500
batch_size = 512

# Model architecture parameters
num_classes = 1
num_features = 90 * 90
num_latent = 4

Load datasets

In [None]:
def load_data_and_create_condition(mag_file_path, labels_linear_path, labels_nonlinear_path):
    """
    Load magnetic data and create conditions based on linear and nonlinear labels.

    Args:
    - mag_file_path (str): Path to the magnetic data file.
    - labels_linear_path (str): Path to the linear labels file.
    - labels_nonlinear_path (str): Path to the nonlinear labels file.

    Returns:
    - filtered_mag_data (np.ndarray): Filtered magnetic data.
    - filtered_condition (np.ndarray): Condition labels for the filtered data.
    - nlinear (int): Number of linear samples.
    - nnonlinear (int): Number of nonlinear samples.
    - nentire (int): Total number of filtered samples.
    """
    # Load magnetic data
    mag_data = np.loadtxt(mag_file_path)
    
    # Load labels of linear and nonlinear images
    labels_linear = np.loadtxt(labels_linear_path)
    labels_nonlinear = np.loadtxt(labels_nonlinear_path)
    
    # Initialize condition labels as 2 (indicating unclassified)
    condition = np.full(len(mag_data), 2)
    
    # Set linear labels to 1
    tag_linear = [int(x - 1) for x in labels_linear]
    condition[tag_linear] = 1
    
    # Set nonlinear labels to 0
    tag_nonlinear = [int(x - 1) for x in labels_nonlinear]
    condition[tag_nonlinear] = 0
    
    # Filter out unclassified data (condition == 2)
    mask = condition != 2
    filtered_mag_data = mag_data[mask]
    filtered_condition = condition[mask]
    
    # Reshape and transpose the filtered magnetic data
    nsample = filtered_mag_data.shape[0]
    filtered_mag_data = filtered_mag_data.reshape((nsample, 90, 90))
    filtered_mag_data = filtered_mag_data.transpose(0, 2, 1)
    filtered_mag_data = filtered_mag_data[:, np.newaxis, :, :]
    
    # Number of linear and nonlinear samples
    nlinear = len(labels_linear)
    nnonlinear = len(labels_nonlinear)
    nentire = nlinear + nnonlinear
    
    return filtered_mag_data, filtered_condition, nlinear, nnonlinear, nentire

In [None]:
# Load magnetic data, condition labels, and orientation data
mag_data, cond, nlinear, nnonlinear, nentire = load_data_and_create_condition(
    'TXTData/TXTDiff.txt', 'Labels/DIFF_linear.txt', 'Labels/DIFF_nonlinear.txt'
)

# Load angle (orientation) data
angle = np.loadtxt('Labels/Orientations.txt')

# Set angle to 0 for nonlinear conditions
angle[cond == 0] = 0


Data Standardization

In [None]:
# Calculate and standardize the magnetic data
mean_value = np.mean(mag_data, axis=0)
std_dev = np.std(mag_data, axis=0)
mag_std = ((mag_data - mean_value) / std_dev)

In [None]:
class MagSet(Dataset):
    def __init__(self, data, labels, angles):
        """
        Args:
        - data (np.ndarray): The magnetic data.
        - labels (np.ndarray): The condition labels.
        - angles (np.ndarray): The angle data.
        """
        self.data = data
        self.labels = labels
        self.angles = angles

    def __len__(self):
        """
        Returns:
        - int: The number of samples in the dataset.
        """
        return len(self.data)

    def __getitem__(self, index):
        """
        Args:
        - index (int): Index of the sample to retrieve.

        Returns:
        - dict: A dictionary containing the data, label, and angle of the sample.
        """
        return {
            'data': self.data[index],
            'label': self.labels[index],
            'angle': self.angles[index]
        }


In [None]:
# Split the dataset into training and testing sets
test_size = 0.1  # Proportion of the dataset to include in the test split
random_seed = 42  # Seed for random number generator for reproducibility

# Perform the train-test split
train_data, test_data, train_labels, test_labels, train_angles, test_angles = train_test_split(
    mag_std, cond, angle, test_size=test_size, random_state=random_seed
)

# Create dataset instances for training and testing sets
train_dataset = MagSet(train_data, train_labels, train_angles)
test_dataset = MagSet(test_data, test_labels, test_angles)

# Create data loaders for training set
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

# Convert test data, labels, and angles from NumPy arrays to PyTorch tensors
test_data_t = torch.from_numpy(test_data).float()
test_labels_t = torch.from_numpy(test_labels).long()
test_angles_t = torch.from_numpy(test_angles).float()

MMD calculation

In [None]:
def gaussian_kernel(source,target,kernel_mul = 2, kernel_num = 5, fix_sigma = None):
    n_samples = int(source.size()[0])+int(target.size()[0])
    total = torch.cat([source,target],dim = 0)
    total0 = total.unsqueeze(0).expand(int(total.size(0)),\
                                      int(total.size(0)),\
                                      int(total.size(1)))
    total1 = total.unsqueeze(1).expand(int(total.size(0)),\
                                      int(total.size(0)),\
                                      int(total.size(1)))
    L2_distance = ((total0-total1) **2).sum(2)
    if fix_sigma:
        bandwidth = fix_sigma
    else:
        bandwidth = torch.sum(L2_distance.data)/(n_samples**2 - n_samples)
    bandwidth /= kernel_mul ** (kernel_num//2)
    bandwidth_list = [bandwidth * (kernel_mul **i) for i in range(kernel_num)]
    kernel_val = [torch.exp(-L2_distance/bandwidth_temp) for\
                 bandwidth_temp in bandwidth_list]
    return sum(kernel_val)

def mmd(source,target,kernel_mul = 2, kernel_num = 5, fix_sigma = None):
    batch_size = int(source.size()[0])
    kernels = gaussian_kernel(source,target,
                             kernel_mul = kernel_mul,
                             kernel_num = kernel_num,
                             fix_sigma = fix_sigma)
    XX = kernels[:batch_size, :batch_size]
    YY = kernels[batch_size:, batch_size:]
    XY = kernels[:batch_size, batch_size:]
    YX = kernels[batch_size:, :batch_size]
    loss = torch.mean(XX+YY-XY-YX)
    return loss


CVAE 

In [None]:
class ConditionalVariationalAutoencoder(torch.nn.Module):
    def __init__(self, num_features, num_latent):
        super(ConditionalVariationalAutoencoder, self).__init__()

        # Encoder layers
        self.enc_conv_1 = torch.nn.Conv2d(in_channels=3, out_channels=16, kernel_size=(6, 6), stride=(2, 2), padding=0)
        self.enc_conv_2 = torch.nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(4, 4), stride=(2, 2), padding=0)
        self.enc_conv_3 = torch.nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(2, 2), stride=(2, 2), padding=0)
        self.enc_conv_4 = torch.nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(2, 2), stride=(2, 2), padding=0)
        self.enc_conv_5 = torch.nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(2, 2), stride=(2, 2), padding=0)
        self.z = torch.nn.Linear(256 * 2 * 2, num_latent)

        # Decoder layers
        self.dec_linear_1 = torch.nn.Linear(num_latent + 2, 256 * 2 * 2)
        self.dec_conv_1 = torch.nn.ConvTranspose2d(in_channels=256, out_channels=128, kernel_size=(2, 2), stride=(2, 2), padding=0)
        self.dec_conv_2 = torch.nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=(3, 3), stride=(2, 2), padding=0)
        self.dec_conv_3 = torch.nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=(4, 4), stride=(2, 2), padding=2)
        self.dec_conv_4 = torch.nn.ConvTranspose2d(in_channels=32, out_channels=16, kernel_size=(4, 4), stride=(2, 2), padding=2)
        self.dec_conv_5 = torch.nn.ConvTranspose2d(in_channels=16, out_channels=1, kernel_size=(5, 5), stride=(3, 3), padding=1)

    def encoder(self, features, targets, angles):
        """
        Encode the input features with conditions.

        Args:
        - features (torch.Tensor): Input feature tensor.
        - targets (torch.Tensor): Target tensor.
        - angles (torch.Tensor): Angle tensor.

        Returns:
        - torch.Tensor: Encoded latent representation.
        """
        ones = torch.ones(features.size(0), 1, features.size(2), features.size(3), dtype=features.dtype).to(features.device)
        target_matrix = ones * targets.view(-1, 1, 1, 1)
        angle_matrix = ones * angles.view(-1, 1, 1, 1)
        
        # Concatenate features with target and angle matrices
        x = torch.cat((features, target_matrix, angle_matrix), dim=1).float()
        
        # Pass through convolutional layers with LeakyReLU activations
        x = F.leaky_relu(self.enc_conv_1(x))
        x = F.leaky_relu(self.enc_conv_2(x))
        x = F.leaky_relu(self.enc_conv_3(x))
        x = F.leaky_relu(self.enc_conv_4(x))
        x = F.leaky_relu(self.enc_conv_5(x))
        
        # Flatten and pass through the linear layer to get the latent representation
        encoded = self.z(x.view(-1, 256 * 2 * 2))
        return encoded

    def decoder(self, encoded, targets, angles):
        """
        Decode the latent representation back to the input space.

        Args:
        - encoded (torch.Tensor): Encoded latent representation.
        - targets (torch.Tensor): Target tensor.
        - angles (torch.Tensor): Angle tensor.

        Returns:
        - torch.Tensor: Decoded tensor.
        """
        # Concatenate encoded representation with targets and angles
        encoded = torch.cat((encoded, targets.view(-1, 1), angles.view(-1, 1)), dim=1).float()
        
        # Pass through linear layer and reshape
        x = F.leaky_relu(self.dec_linear_1(encoded))
        x = x.view(-1, 256, 2, 2)
        
        # Pass through transposed convolutional layers with LeakyReLU activations
        x = F.leaky_relu(self.dec_conv_1(x))
        x = F.leaky_relu(self.dec_conv_2(x))
        x = F.leaky_relu(self.dec_conv_3(x))
        x = F.leaky_relu(self.dec_conv_4(x))
        decoded = self.dec_conv_5(x)

        return decoded

    def forward(self, features, targets, angles):
        """
        Forward pass through the encoder and decoder.

        Args:
        - features (torch.Tensor): Input feature tensor.
        - targets (torch.Tensor): Target tensor.
        - angles (torch.Tensor): Angle tensor.

        Returns:
        - (torch.Tensor, torch.Tensor): Encoded latent representation and decoded tensor.
        """
        encoded = self.encoder(features, targets, angles)
        decoded = self.decoder(encoded, targets, angles)
        return encoded, decoded


CVAE initalization

In [None]:
# Set random seed for reproducibility
torch.manual_seed(random_seed)

# Initialize the Conditional Variational Autoencoder model
model = ConditionalVariationalAutoencoder(num_features, num_latent)

# Move the model to the specified device (e.g., GPU)
model = model.to(device)

In [None]:
# Initialize the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Set the learning rate decay factor
gamma = 0.995 

# Initialize the learning rate scheduler
scheduler = ExponentialLR(optimizer, gamma=gamma)

CVAE training

In [None]:
start_time = time.time()  # Record start time
losses = []
losses_mmd = []
losses_pred = []

losses_t = []
losses_mmd_t = []
losses_pred_t = []

# Move model to device once
model.to(device)

# Training loop
for epoch in range(num_epochs):
    
    epoch_loss = 0.0
    epoch_mmd = 0.0
    epoch_pred = 0.0
    total_batch = 0
    
    model.train()  # Set the model to training mode
    for batch_idx, batch in enumerate(train_loader):
        features = batch['data'].to(device)
        targets = batch['label'].to(device)
        angles = batch['angle'].to(device)

        # Forward pass
        encoded, decoded = model(features, targets, angles)
        decoded = decoded.to(torch.float64)
        
        sam_num = decoded.shape[0]
        true_samples = Variable(torch.randn(sam_num, num_latent), requires_grad=False).to(device)
        
        # Calculate losses
        loss_pred = F.mse_loss(decoded, features)
        loss_mmd = mmd(encoded, true_samples)
        loss = loss_mmd + loss_pred
        
        epoch_loss += loss.item()
        epoch_mmd += loss_mmd.item()
        epoch_pred += loss_pred.item()
        total_batch += 1
        
        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # Calculate average losses
    average_loss = epoch_loss / total_batch
    average_mmd = epoch_mmd / total_batch
    average_pred = epoch_pred / total_batch
    
    losses.append(average_loss)
    losses_mmd.append(average_mmd)
    losses_pred.append(average_pred)
    
    # Update the learning rate scheduler
    scheduler.step()
    
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # Disable gradient calculation for evaluation
        encoded, decoded = model(test_data_t.to(device), test_labels_t.to(device), test_angles_t.to(device))
        sam_num = decoded.shape[0]
        true_samples = Variable(torch.randn(sam_num, num_latent), requires_grad=False).to(device)
        
        loss_pred_t = F.mse_loss(decoded, test_data_t.to(device))
        loss_mmd_t = mmd(encoded, true_samples)
        loss_t = loss_mmd_t + loss_pred_t
        
        losses_t.append(loss_t.item())
        losses_mmd_t.append(loss_mmd_t.item())
        losses_pred_t.append(loss_pred_t.item())
    
    # Print results for each epoch
    print(f'Epoch: {epoch+1}/{num_epochs} | Loss: {average_loss:.4f} | '
          f'Pred Loss: {average_pred:.4f} | MMD Loss: {average_mmd:.4f} | '
          f'Test Loss: {loss_t.item():.4f} | Test MMD: {loss_mmd_t.item():.4f} | '
          f'Test Pred Loss: {loss_pred_t.item():.4f}')

# Training time
print(f'Training time: {time.time() - start_time:.2f}s')

In [None]:
# Save the trained model
torch.save(model.state_dict(), 'Generator_CVAE/CVAE.pth')

Loss curves

In [None]:
# Plot training losses
plt.figure()
plt.plot(losses, label='Loss')
plt.plot(losses_mmd, label='Loss_mmd')
plt.plot(losses_pred, label='Loss_pred')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training Losses')
plt.show()

# Plot testing losses
plt.figure()
plt.plot(losses_t, label='Loss')
plt.plot(losses_mmd_t, label='Loss_mmd')
plt.plot(losses_pred_t, label='Loss_pred')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Testing Losses')
plt.show()