#Defects Project

In [1]:
import numpy as np
import pandas as pd
import h5py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from torchsummary import summary
import torchvision.models
from tqdm import tqdm
from skimage.measure import label as separate_labels
fp_data = "CdSe\Dataset\CdSe_defect_dataset.h5"
fp_metadata ="CdSe\Metadata\CdSe_metadata.pkl"

In [2]:
def make_multi_labels(orig_mask, df, idx):
    N_particles = df.iloc[idx]["N_particles"]
    multi_mask = separate_labels(orig_mask)

    if np.max(multi_mask) != N_particles:
        print("Image %i does not have enough particles" % i)
        return False, []
    else:
        return True, [multi_mask == i+1 for i in range(N_particles)]

def print_positions(df, idx):

    N_particles = df.iloc[idx]["N_particles"]

    for i in range(N_particles):
        print(df.iloc[idx]["particle_%02i" % i]["position"])

def get_positions(df, idx):

    N_particles = df.iloc[idx]["N_particles"]

    return [df.iloc[idx]["particle_%02i" % i]["position"] for i in range(N_particles)]

def get_centroid(particle_label, cal_px_size, xx, yy):
    """Takes in a particle label, pixel size, and premade meshgrid to calculate centroid of a region."""

    centroid_x = np.sum(xx*particle_label)/np.sum(particle_label)
    centroid_y = np.sum(yy*particle_label)/np.sum(particle_label)

    return [centroid_x*cal_px_size, centroid_y*cal_px_size]

def get_centroids(multi_labels, cal_px_size, xx, yy):
    """Wrapper function to get all the centroids from a set of labels"""
    return np.array([get_centroid(multi_labels[i], cal_px_size, xx, yy) for i in range(len(multi_labels))])

def pair_labels_to_particles(centroids, pos):
    """Pairs segmentation labels to particle indices, per image.

    Centroids are the centroids of the individual particle segmentation masks, scaled to atomic coordinates.
    Pos are the centers of the particles in x and y in atomic coordinates.
    
    Returns a list of length N_particles with tuples (i,j), matching the i-th particle to the j-th segmentation mask.
    """

    # shift centroid array and position array to have common origin
    centroids = centroids - np.min(centroids, axis=0)
    pos = pos - np.min(pos, axis=0)

    # pair particle to mask by taking index of closest mask centroid
    pairs =[]
    for i, p in enumerate(pos):
        dists = np.linalg.norm(centroids-p, axis=1)
        pairs.append((i, np.argmin(dists)))
    return pairs

In [3]:
# Load data and metadata
cdse_data = h5py.File(fp_data,"r")

X_all = np.array(cdse_data["data"])
Y_all = np.array(cdse_data["masks"])

cdse_data.close()

# reduce metadata data frame to just the columns we need
metadata = pd.read_pickle(fp_metadata)
metadata = metadata[["N_particles", "particle_00", "particle_01", "particle_02", "particle_03", "particle_04"]]

NotImplementedError: cannot instantiate 'PosixPath' on your system

In [None]:
N_particles_total = metadata["N_particles"].sum()

# initialize data for particle-mask pairing
px_size = 0.2 # angstroms
yy, xx = np.meshgrid(np.arange(512),np.arange(512))

# preallocate new arrays
X_new = np.zeros(shape=(N_particles_total,512,512), dtype='float32')
mask_new = np.zeros(shape=(N_particles_total, 512, 512), dtype='float32')
Y_new = np.zeros(shape=(N_particles_total,2), dtype='float32')

# for each image
count = 0 # running index
for i in tqdm(range(X_all.shape[0])):
    # get the multilabels
    success, multi_labels = make_multi_labels(Y_all[i], metadata, i)

    if not success:
        continue

    # pair labels to positions
    pos = np.array(get_positions(metadata, i))
    centroids = get_centroids(multi_labels, px_size, xx,yy)
    pairs = pair_labels_to_particles(centroids, pos)

    # for each particle
    for j in range(len(multi_labels)):
        # add data into array
        X_new[count,:,:] = X_all[i,:,:]
        mask_new[count,:,:] = multi_labels[pairs[j][1]]

        # get defect label from metadata
        if metadata.iloc[i]["particle_%02i" % j]["N_defects"] > 0:
            Y_new[count, 1] = 1
        else:
            Y_new[count, 0] = 1

        count += 1

In [None]:
def normalize(data):
    # Reshape the data into a 2D array
    nsamples, nx, ny = data.shape
    data = data.reshape((nsamples, nx*ny))
    
    # Normalize the data
    scaler = StandardScaler()
    scaler.fit(data)
    data = scaler.transform(data)
    
    return data, scaler

def split_and_transform(data, labels, scaler, split):
    # Normalize the data using the scaler
    #data = scaler.transform(data)
    
    # Split the data into training, validation, and testing sets
    N_data = data.shape[0]
    N_train = int(N_data * split[0])
    N_val = int(N_data * split[1])
    N_test = int((N_data *split[2]))
    
    train_data = data[:N_train, :]
    train_labels = labels[:N_train, :]
    val_data = data[N_train:N_train+N_val, :]
    val_labels = labels[N_train:N_train+N_val, :]
    test_data = data[N_train+N_val:N_train+N_val+N_test, :]
    test_labels = labels[N_train+N_val:N_train+N_val+N_test, :]
    
    # Transform the data into PyTorch tensors and reshape
    train_data = torch.from_numpy(train_data).to(torch.float32).reshape(-1, 512, 512)[:, None, :, :]
    val_data = torch.from_numpy(val_data).to(torch.float32).reshape(-1, 512, 512)[:, None, :, :]
    test_data =  torch.from_numpy(test_data).to(torch.float32).reshape(-1, 512, 512)[:, None, :, :]
    
    return train_data, train_labels, val_data, val_labels, test_data, test_labels

# Normalize the defect data
defect_data = X_new*mask_new
defect_data, scaler = normalize(defect_data)
#X_new , scaler = normalize(X_new)
# Splitting the data into training, validation, and testing sets
train_split, val_split, test_split = 0.7, 0.2, 0.1

# Split and transform the defect data

train_data, train_labels, val_data, val_labels, test_data, test_labels = split_and_transform(defect_data, Y_new, scaler, (train_split, val_split, test_split))

In [None]:
class DefectCountDataset(Dataset):
    def __init__(self, images, labels, transform=None, target_transform=None):
        self.img = images
        self.labels = labels
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        image = self.img[idx,:,:]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

train_dataset = DefectCountDataset(train_data, train_labels)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)


val_dataset = DefectCountDataset(val_data, val_labels)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=True)


test_dataset = DefectCountDataset(test_data, test_labels)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=True)


In [None]:
class Flatten(torch.nn.Module):
    def forward(self, input):
        if len(input.shape) < 4:
            input = input[None,:,:,:]
           
        else:
            input = input
        #input = input.reshape(input.size(0), -1)
        return input.reshape(-1, input.size(1)*input.size(2)*input.size(3))

In [None]:
import torch
import torch.nn as nn

class ClassificationNN(nn.Module):
    def __init__(self, input_channel, num_classes, num_conv_layers, conv_output_channels, kernel_size, stride, padding, 
                 activations,dropout_probs, use_batchnorm, use_maxpool,Maxpool_size, Height, Width):
        super(ClassificationNN, self).__init__()
        
        # Initialize list of layers
        layers = []
        
        # Create convolutional layers
        for i in range(num_conv_layers):
            # Create convolutional layer
            conv = nn.Conv2d(in_channels=input_channel,out_channels=conv_output_channels[i],kernel_size=kernel_size,stride =stride,padding=padding)
            # Initialize weights using Kaiming initialization
            nn.init.kaiming_normal_(conv.weight, nonlinearity='relu')
            input_channel = conv_output_channels[i]
            
            # Add convolutional layer to model
            layers.append(conv)
            
            # Add activation function
            layers.append(activations[i])

            # Calulate what the final dimensions will be (w/o Maxpool, see below)
            New_Height = ((Height - kernel_size + 2*padding)//stride + 1)

            New_Width = ((Width - kernel_size + 2*padding)//stride + 1)

            # Add dropout layer if specified
            if dropout_probs[i] > 0:
                dropout_layer = nn.Dropout2d(p=dropout_probs[i])
                if not self.training:
                    dropout_layer.p = 0
                layers.append(dropout_layer)

            # Add batch normalization layer if specified
            if use_batchnorm[i]:
                layers.append(nn.BatchNorm2d(conv_output_channels[i]))
            
            # Add max pooling layer if specified
            if use_maxpool[i]:
                layers.append(nn.AvgPool2d(Maxpool_size))
                # Calulate what the final dimensions will be (if there's a maxpool)
                New_Height = New_Height  // Maxpool_size
                New_Width = New_Width // Maxpool_size

            # Update the final dimensions values
            Height = New_Height
            Width = New_Width
            #print(Width)
                    
        # Flatten layer
        layers.append(Flatten())


        # Add fully connected layer
        Dims = conv_output_channels[i] * (New_Height*New_Width)
        layers.append(nn.Linear(Dims, num_classes ))
        
        # Add softmax activation function
        layers.append(nn.Softmax(dim=1))
        
        # Create Sequential model
        self.Conv_layers = nn.Sequential(*layers)
        
    def forward(self, x):
        return self.Conv_layers(x)

# Define model
model = ClassificationNN(input_channel=1, num_classes=2, num_conv_layers=3, conv_output_channels = [8, 16,16], kernel_size = 7, stride = 2,
                         padding = 2,activations=[nn.ReLU(),nn.ReLU(),nn.ReLU()],dropout_probs = [0.05, 0.15, 0.1], use_batchnorm=[False, True, False], 
                         use_maxpool=[True, True, False], Maxpool_size = 2, Height = 512, Width = 512)
print(model)
#nn.Tanh(),nn.Tanh(),nn.Tanh(),nn.Tanh(),nn.Tanh(),nn.Tanh()

In [None]:
import torch.optim as optim
criterion = torch.nn.MSELoss()
optimizer = optim.RMSprop(model.parameters(), lr=1, weight_decay =0.01)

In [None]:
from torch.optim.lr_scheduler import StepLR
# Initialize empty lists to store training and validation losses
train_loss = []
val_loss = []
learning_rates = []
# Set the number of epochs to train the model
n_epochs = 30

# Define the learning rate scheduler
scheduler = StepLR(optimizer, step_size= 5, gamma=0.5)

# Loop over the number of epochs
for epoch in range(1, n_epochs+1): 
    # Initialize running training and validation losses to 0
    running_loss_train = 0.0
    running_loss_val = 0.0
    
    # Set the model to train mode
    model.train()
    
    # Loop over the training data
    for train_data in train_dataloader:
        # Clear the gradients of all optimized variables
        optimizer.zero_grad()
        # Retrieve images and image labels from the dataloader
        train_images, train_labels = train_data
        # Forward pass: compute predicted outputs by passing inputs to the model
        train_outputs = model(train_images)
        # Calculate the loss
        loss = criterion(train_outputs, train_labels)
        # Backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # Perform a single optimization step (parameter update)
        optimizer.step()
        # Update the running training loss
        running_loss_train += loss.item() 
        
    # Set the model to evaluation mode
    model.eval()  
    
    # Loop over the validation data with torch.no_grad() to disable gradient calculations
    with torch.no_grad():
        for val_data in val_dataloader:
            # Retrieve images and image labels from the dataloader
            val_images, val_labels = val_data
            # Forward pass: compute predicted outputs by passing inputs to the model
            val_outputs = model(val_images)
            # Calculate the loss
            loss = criterion(val_outputs, val_labels)
            # Update the running validation loss
            running_loss_val += loss.item()
        
    # Calculate and store the average training loss for the epoch
    running_loss_train /= len(train_dataloader)
    train_loss.append(running_loss_train)
    
    # Calculate and store the average validation loss for the epoch
    running_loss_val /= len(val_dataloader)
    val_loss.append(running_loss_val)

    # retrieve the learning rate at the current epoch
    lr = optimizer.param_groups[0]['lr']
    # store the learning rate in a list
    learning_rates.append(lr)

    # Print the average training and validation losses for the epoch
    print('Epoch: {}/{} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f} \tlr: {:.10f}'.format(
        epoch, 
        n_epochs,
        running_loss_train,
        running_loss_val,
        lr
        ))
    

    # Step the learning rate scheduler
    scheduler.step()
    
# Plot the training and validation losses
plt.plot(range(1,n_epochs+1), train_loss, 'g', label='Training loss')
plt.plot(range(1,n_epochs+1), val_loss, 'r', label='Validation loss')
plt.title('Training and Validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
 # set the model to evaluation mode
model.eval()

# initialize variables to store the total number of correct predictions and total number of predictions
correct = 0
total = 0
running_loss_test = 0
N_images = len(test_data)*2
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for data in test_dataloader:
        images, labels = data
        # calculate outputs by running images through the network
        outputs = model(images)       
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs.data,1)
        _, label = torch.max(labels.data,1)
        total += labels.size(0)
        correct += (predicted == label).sum().item()

print(f'Accuracy of the network on the {N_images} test images: {100 * correct // total} %')