<a href="https://colab.research.google.com/github/Jeongmin0658/kentech_EF_CCP/blob/main/codes/particle_based/Image_data/Image_Load.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# import torch
# if torch.backends.mps.is_available():
#     mps_device = torch.device("mps")
#     x = torch.ones(1, device=mps_device)
#     print (x)
# else:
#     print ("MPS device not found.")

# x = torch.rand(5, 3)
# print(x)

# # Check if CUDA is available, and set PyTorch to use GPU or CPU accordingly
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# print(device)
# print(torch.cuda.is_available())

In [None]:
import numpy as np
import umap.umap_ as umap
import hdbscan
import matplotlib.pyplot as plt
import seaborn as sns

class ImageLoad:
    """Load image grids and their per-image characterisation data from disk.

    Two data sets are read from fixed locations relative to the current
    working directory:

    * ``images/smooth/``    with file tag ``p0`` (labelled "EDEN" originally)
    * ``images/dendritic/`` with file tag ``p1`` (labelled "DLA" originally)

    After :meth:`cluster_images` has run, ``all_images[k]`` and row ``k`` of
    ``all_feature_vectors`` describe the same snapshot.
    """

    # (sub-folder of ``images/``, integer tag that follows ``p`` in file names)
    _DATASETS = (("smooth", 0), ("dendritic", 1))

    def __init__(self):
        self.all_images = []  # image grids, in load order
        # 2-D array with one feature row per image; stays None until
        # cluster_images() has been called.
        self.all_feature_vectors = None

    @staticmethod
    def read_image_and_info(image, file, delimiter='\t'):
        """Read one image grid and its companion info table.

        Parameters
        ----------
        image : path of the whitespace-separated integer grid file.
        file : path of the info table (at least four columns).
        delimiter : column separator of the info table.

        Returns
        -------
        tuple
            ``(grid, r, m, dm, ndm)`` where the last four are columns 0-3 of
            the info table: radius, mass, differential mass and normalised
            differential mass.
        """
        # ndmin=2 keeps a single-row table two-dimensional so that the column
        # slicing below does not fail on it.
        loaded_data = np.loadtxt(file, delimiter=delimiter, ndmin=2)
        r, m, dm, ndm = (loaded_data[:, k] for k in range(4))

        grid = np.loadtxt(image, dtype=int)

        return grid, r, m, dm, ndm

    @staticmethod
    def extract_feature_vector(r, m, f1, f2):
        """Concatenate ``m``, ``f1`` and ``f2`` into one 1-D feature vector.

        ``r`` is accepted to keep the call signature stable but is not part
        of the returned vector.
        """
        return np.concatenate([m, f1, f2])

    def cluster_images(self, p, nimage, snap_every):
        """Load every snapshot of both data sets and build the feature matrix.

        Despite its name this method performs no clustering; it only loads.

        Parameters
        ----------
        p : ignored. Kept for backward compatibility; the tag used in the
            file names is fixed per data set (0 for smooth, 1 for dendritic).
        nimage : number of image indices to read, ``0 .. nimage - 1``.
        snap_every : iterable of snapshot labels (last number in file names).

        Each call replaces the previously stored images and features, so the
        two containers always stay aligned even when called repeatedly.
        """
        images = []
        features = []

        for folder, tag in self._DATASETS:
            for n in range(nimage):
                for i in snap_every:
                    stem = 'p%d_#%d_%d.txt' % (tag, n, i)
                    grid, r, m, f1, f2 = self.read_image_and_info(
                        image='images/%s/image_%s' % (folder, stem),
                        file='images/%s/info_%s' % (folder, stem),
                        delimiter='\t')
                    images.append(grid)
                    features.append(self.extract_feature_vector(r, m, f1, f2))

        # Stack first: if this raises, the previously stored state is kept.
        stacked = np.vstack(features)
        self.all_images = images
        self.all_feature_vectors = stacked

    def get_all_images(self):
        """Return the list of loaded image grids."""
        return self.all_images

    def get_all_features(self):
        """Return the feature matrix, or None if nothing has been loaded."""
        return self.all_feature_vectors

In [None]:
# ---------------------------------------------------------------------------
# Load the stored images and their characterisation data.
#
# File naming (the original header notes the folder "images/dendritic"):
#   XXX : sticking probability p
#   YYY : image index
#   ZZZ : number of deposits
#   image grid       -> image_pXXX_#YYY_ZZZ.txt
#   characterisation -> info_pXXX_#YYY_ZZZ.txt
# ---------------------------------------------------------------------------
snap_every = [100, 200, 400, 600, 800, 1000]  # deposit counts of the snapshots
nimage = 512  # number of images
p = 0  # sticking probability
size_image, ndeposit = 200, 1000  # square image size, number of deposits

# Read everything through the loader, then expose the results as notebook
# variables for the cells that follow.
image_loader = ImageLoad()
image_loader.cluster_images(p, nimage, snap_every)
all_feature_vectors = image_loader.get_all_features()
all_images = image_loader.get_all_images()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Define Generator
class Generator(nn.Module):
    """Conditional generator: (noise, feature vector) -> flattened image.

    Parameters
    ----------
    noise_dim : int, optional
        Width of the noise input. Defaults to the module-level ``z_dim``.
    cond_dim : int, optional
        Width of the conditioning feature vector. Defaults to the
        module-level ``feature_dim``.
    out_dim : int, optional
        Number of output values per sample. Defaults to the module-level
        ``image_dim``.

    The output passes through ``Tanh`` and therefore lies in ``[-1, 1]``.
    """

    def __init__(self, noise_dim=None, cond_dim=None, out_dim=None):
        super().__init__()
        # Fall back to the notebook-level hyperparameters so that a bare
        # ``Generator()`` keeps working exactly as before.
        noise_dim = z_dim if noise_dim is None else noise_dim
        cond_dim = feature_dim if cond_dim is None else cond_dim
        out_dim = image_dim if out_dim is None else out_dim

        self.model = nn.Sequential(
            nn.Linear(noise_dim + cond_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, out_dim),
            nn.Tanh(),
        )

    def forward(self, z, features):
        """Return generated samples for noise ``z`` conditioned on ``features``.

        Both inputs are concatenated along dimension 1, so they must share
        the same size in dimension 0.
        """
        x = torch.cat([z, features], dim=1)
        return self.model(x)

# Define Discriminator
class Discriminator(nn.Module):
    """Conditional discriminator: (image, feature vector) -> probability.

    Parameters
    ----------
    image_size : int, optional
        Number of values in one flattened image. Defaults to the
        module-level ``image_dim``.
    cond_dim : int, optional
        Width of the conditioning feature vector. Defaults to the
        module-level ``feature_dim``.

    The output passes through ``Sigmoid`` and has one column per sample.
    """

    def __init__(self, image_size=None, cond_dim=None):
        super().__init__()
        # Fall back to the notebook-level hyperparameters so that a bare
        # ``Discriminator()`` keeps working.
        image_size = image_dim if image_size is None else image_size
        cond_dim = feature_dim if cond_dim is None else cond_dim

        self.model = nn.Sequential(
            # forward() flattens each image to ``image_size`` values, so the
            # first layer takes image_size + cond_dim inputs. The earlier
            # ``image_dim * image_dim`` squared an already flattened size.
            nn.Linear(image_size + cond_dim, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 1),
            nn.Sigmoid(),
        )

    def forward(self, img, features):
        """Score ``img`` given ``features``.

        ``img`` may have any trailing shape; every sample is flattened before
        being concatenated with ``features`` along dimension 1.
        """
        # reshape (not view) so that non-contiguous inputs are accepted too.
        img = img.reshape(img.size(0), -1)
        x = torch.cat([img, features], dim=1)
        return self.model(x)


In [None]:
# Conditional GAN training script.
#
# Expects `all_feature_vectors` (2-D array) and `all_images` (sequence of
# equally shaped arrays) from the loading cell. The names `z_dim`,
# `feature_dim` and `image_dim` are read by Generator/Discriminator when they
# are constructed, so they must be assigned before the models are built.

# Check if CUDA is available, and set PyTorch to use GPU or CPU accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
print(torch.cuda.is_available())


# FEED YOUR INPUTS
features_tensor = torch.as_tensor(all_feature_vectors, dtype=torch.float32)
all_images_array = np.array(all_images)
images_tensor = torch.as_tensor(all_images_array, dtype=torch.float32)

# Hyperparameters
z_dim = 100  # Size of the noise vector
feature_dim = all_feature_vectors.shape[1]
print ("Dimension of feature vectors = %d"%feature_dim)
image_dim = all_images[0].size  # Flattened pixel count; assumes equal shapes
print ("Size of images = %d"%image_dim)
learning_rate = 0.0002
batch_size = 32
epochs = 1000

# Create DataLoader
dataset = TensorDataset(features_tensor, images_tensor)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Generator/Discriminator
generator = Generator().to(device)
discriminator = Discriminator().to(device)

# Loss and optimizers
criterion = nn.BCELoss()
optimizer_g = optim.Adam(generator.parameters(), lr=learning_rate)
optimizer_d = optim.Adam(discriminator.parameters(), lr=learning_rate)

# Training loop
print ("Now enter into training")
for epoch in range(epochs):
    for batch_features, batch_images in dataloader:
        batch_features = batch_features.to(device)
        batch_images = batch_images.to(device)

        # The last batch of an epoch can be shorter than `batch_size`, so
        # size the noise and the labels from the batch actually received.
        n_batch = batch_features.size(0)

        # Train Discriminator
        optimizer_d.zero_grad()

        z = torch.randn(n_batch, z_dim, device=device)
        fake_images = generator(z, batch_features)
        labels_real = torch.ones(n_batch, 1, device=device)
        labels_fake = torch.zeros(n_batch, 1, device=device)

        logits_real = discriminator(batch_images, batch_features)
        # detach(): the discriminator update must not back-propagate into
        # the generator.
        logits_fake = discriminator(fake_images.detach(), batch_features)

        loss_real = criterion(logits_real, labels_real)
        loss_fake = criterion(logits_fake, labels_fake)

        loss_d = loss_real + loss_fake
        loss_d.backward()
        optimizer_d.step()

        # Train Generator
        optimizer_g.zero_grad()

        # The generator is rewarded when its fakes are scored as real.
        logits_fake = discriminator(fake_images, batch_features)
        loss_g = criterion(logits_fake, labels_real)
        loss_g.backward()
        optimizer_g.step()

    # Print losses (values of the last batch of the epoch)
    if epoch % 10 == 0:
        print(f"Epoch [{epoch}/{epochs}] | Loss D: {loss_d.item():.4f} | Loss G: {loss_g.item():.4f}")



cpu
False


NameError: name 'all_feature_vectors' is not defined