#Parameters

In [122]:
IMGSIZE = 512
MNIST_SIZE = 28

MNIST_PATH = ''
MNIST_DATASET_PATH = 'dataset/mnist/'
TRAIN_DATASET_PATH = 'dataset/circle/' 

#Heat Diffusion Algorithm

In [151]:
import cv2 as cv
import numpy as np
from scipy.interpolate import RegularGridInterpolator
from scipy import ndimage
import imageio
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.animation as animationt
mpl.rc('image', cmap='gray')
import math
import time

def imshow(img):
    plt.imshow(img)
    plt.colorbar()
    # plt.axis('off')
    plt.show()

def heat_kernel(size,delta):
    data = np.zeros((size, size))
    center = size // 2
    for i in range(size):
        for j in range(size):
            x = i - center
            y = j - center
            data[i, j] = np.exp(-(x**2 + y**2)/4)/(4*np.pi*delta**2)
    data /= np.sum(data)
    return data
    
def heat_kernel_convolution(image, kernel):
    """Compute heat kernel convolution on input image"""
    # Apply kernel convolution to image
    heat_convoluted = ndimage.convolve(image, kernel, mode='reflect')
    heat_convoluted = heat_convoluted.astype(float)

    # Set values above 0.5 to 1 and below to 0
    heat_convoluted[heat_convoluted > 0.5] = 1
    heat_convoluted[heat_convoluted <= 0.5] = 0

    return heat_convoluted

def heat_diffusion(image, kernel, lapse, filepath):
  """Apply heat diffusion on image over a period"""
  images = [image]
  for i in range(lapse):
    conv_img =  heat_kernel_convolution(images[i], kernel.data)
    #append the new convoluted image 
    images.append(conv_img)
  index = 0
  for img in images:
    img *= 255
  now = str(int(time.time()))
  filename = filepath+'.gif'
  imageio.mimsave(filename, images, duration = 50)
  return images


#Generate Heat Diffusion GIF Images
Method 1: based on random generated circle

In [148]:
# generate heat kernel
KERNEL_SIZE = 5
time_step = 51200
DELTA = IMGSIZE / time_step
KERNEL = heat_kernel(KERNEL_SIZE, DELTA)
print(KERNEL)

[[0.01247764 0.02641517 0.03391775 0.02641517 0.01247764]
 [0.02641517 0.05592091 0.07180387 0.05592091 0.02641517]
 [0.03391775 0.07180387 0.09219799 0.07180387 0.03391775]
 [0.02641517 0.05592091 0.07180387 0.05592091 0.02641517]
 [0.01247764 0.02641517 0.03391775 0.02641517 0.01247764]]


In [24]:
import os
import random

# generate random circles
def generate_circle_img():
    img = np.zeros((512,512))
    num_circles = random.randint(1,4)
    for n in range(num_circles):
      cc = (random.randint(100, 400),random.randint(100, 400))
      radius = random.randint(1, 100)
      cv.circle(img,cc,radius,(1,0,255),-1)
    # imshow(img)
    return img
    
def circle_gen():
    count = 30
    path = TRAIN_DATASET_PATH    
    if not os.path.exists(path):
        os.makedirs(path)
    for i in range(count):
      img = generate_circle_img()
      heat_diffusion(img, KERNEL,100,path+str(KERNEL_SIZE)+'_'+str(DELTA))

In [None]:
circle_gen()

#Generate Heat Diffusion GIF Images
Method 2: based on MNIST dataset

In [149]:
import os
import torch
import struct
import numpy as np
from torch.utils.data import DataLoader,Dataset

#import MNIST dataset
class mnist_dataset(Dataset):
    def __init__(self,path,filekind='train'):
        self.data_path = path
        
        if filekind=='train' or filekind=='t10k':
            imgs_path = os.path.join(path,'%s-images-idx3-ubyte'%filekind)
            labels_path = os.path.join(path, '%s-labels-idx1-ubyte' % filekind)
        else:
            print("Error:filekind is only a string with 'train' or 't10k' ")
		
        with open(labels_path, 'rb') as lbpath:
            magic, n = struct.unpack('>II',lbpath.read(8))
            labels = np.fromfile(lbpath,dtype=np.uint8)

        with open(imgs_path, 'rb') as imgpath:
            magic, num, rows, cols = struct.unpack('>IIII',imgpath.read(16))
            images = np.fromfile(imgpath,dtype=np.uint8).reshape(len(labels), 784)

        self.labels = labels
        self.images = images

    def __getitem__(self, index):
        temp_label = self.labels[index]
        temp_image = self.images[index]
        temp_image = torch.tensor(temp_image.reshape(28,28),dtype=torch.float32)[None,...]

        return temp_label,temp_image

    def __len__(self):
        return len(self.labels)
        
def get_MNIST_data():
    my_mnist = mnist_dataset(MNIST_PATH,'train')
    return DataLoader(dataset=my_mnist, batch_size=20, shuffle=True)

def mnist_gen():
  m = get_MNIST_data()
  image_path = MNIST_DATASET_PATH+'image/'
  gif_path = MNIST_DATASET_PATH+'gif/'
  if not os.path.exists(image_path):
    os.makedirs(image_path)
  if not os.path.exists(gif_path):
    os.makedirs(gif_path)
  for label, image in m:
    image = np.array(image[0][0],dtype=np.float32)
    now = str(int(time.time()))
    label = str(label[0].item())+'_'+now
    cv.imwrite(image_path + label +'.png', image)
    image = image/255
    heat_diffusion(image, KERNEL, 10, gif_path+label)

In [152]:
mnist_gen()

#Loading Data

In [160]:
import os
import torch
from torch.utils.data import DataLoader,Dataset
import torchvision.transforms as transforms
import cv2 as cv
from PIL import Image
# Define your dataset class
class gifdataset(Dataset):
    def __init__(self, folder, transform=None):
        self.folder = folder
        gif_folder = self.folder +'gif/'
        image_folder = self.folder +'image/'
        self.train_gif_file_paths = [os.path.join(gif_folder, gif_file) for gif_file in os.listdir(gif_folder)]
        self.train_image_file_paths = [os.path.join(image_folder, image_file) for image_file in os.listdir(image_folder)]
        self.transform = transform
    
    def __len__(self):
        return len(self.train_gif_file_paths)
    
    def __getitem__(self, idx):
        gif_root = self.train_gif_file_paths[idx]
        gif_name = gif_root.split(os.path.sep)[-1]
        
        image_root = self.train_image_file_paths[idx]
       
        image = Image.open(image_root)
        image = transforms.ToTensor()(image)
           
        gif = cv.VideoCapture(gif_root)
        frames = []
        gif_data = []
        while True:
            ret, frame = gif.read()
            if not ret:
                break
            
            frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
            ret, frame = cv.threshold(frame, 0, 255,cv.THRESH_BINARY) 
            
            frames.append(frame)
        gif.release()
       
        if self.transform is not None:
            for i in range(frames.__len__()):       
                gif_data.append(self.transform(frames[i])[0])
        if gif_data is not None:
            gif_data =  torch.stack(gif_data)
        
        return gif_data,image[0]

# Set up transformations for your input images 
transform = transforms.Compose([
    transforms.ToTensor()
])

def get_train_data_loader():
    dataset = gifdataset(TRAIN_DATASET_PATH, transform=transform)
    return DataLoader(dataset, batch_size=1, shuffle=True)

def get_MNIST_data_loader():
    dataset = gifdataset(MNIST_DATASET_PATH, transform=transform)
    return DataLoader(dataset, batch_size=1,shuffle=True)



In [154]:
data = get_MNIST_data_loader()

# a = 0
# for d,t in data:
#     for i in range(d.shape[1]):
#         cv.imwrite(str(i) +'.png', d[0][i])
        

# print(a)

#CNN Model for MNIST

In [173]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class RNN(nn.Module):
    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super(RNN, self).__init__()
        # define dimensions
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.output_size = output_size
        # RNN Layer
        self.rnn = nn.RNN(input_size, hidden_dim, n_layers)
        
        self.fc = nn.Linear(self.hidden_dim, self.output_size)

    def forward(self, x):
        
        # out = x.reshape(1,-1)
        
        batch_size = x.size(1)
        # Initializing hidden state for first input using method defined below
        hidden = self.init_hidden(batch_size)
        
        # Passing in the input and hidden state into the model and obtaining outputs
        out, hidden  = self.rnn(x, hidden)
        
        # out = np.heaviside(out,1)
        # # Reshaping the outputs such that it can be fit into the fully connected layer
        # out = out.contiguous().view(-1, self.hidden_dim)
        out = out.detach().numpy()
        out = np.heaviside(out,1)
        out = torch.tensor(out)
        return out
        
    def init_hidden(self, batch_size):
        # This method generates the first hidden state of zeros which we'll use in the forward pass
        # We'll send the tensor holding the hidden state to the device we specified earlier as well
        hidden = torch.zeros(self.n_layers, batch_size, self.hidden_dim)
        return hidden

In [177]:
import torch
import torch.nn as nn
from torch.autograd import Variable

# Initialize the model, loss function, and optimizer    

# Initialize the networks
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
criterion = nn.MSELoss()
model = RNN(input_size=28*28, output_size=6, 
            hidden_dim=28*28, n_layers=1).to(device)  
# networks = [model for _ in range(11)]   
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Training loop
num_epochs = 10
outputs = []
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    train_loader = get_MNIST_data_loader()
    
    for gif, time in train_loader:
        gif = gif[0]
        gif = gif.reshape(gif.shape[0],gif.shape[1]*gif.shape[2])
        
        # gif shape: 11*28*28
        # RNN input:(SeqLen * batchsize * inputsize)
        # RNN output:(SeqLen * batchsize * hiddensize)
        # seqLen = ,batch size =1, input size = 28*28=784
        inputs = gif.unsqueeze(1)
           
        output = model(inputs)
        # print(output.shape)
        outputs.append(output)
        loss = criterion(output,inputs)
        
        loss.requires_grad = True
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")

# Save the trained model
torch.save(model.state_dict(), "kernel_prediction_model.pth")


Epoch 1/10, Loss: 0.4932776475325227
Epoch 2/10, Loss: 0.4932776475325227
Epoch 3/10, Loss: 0.4932776475325227
Epoch 4/10, Loss: 0.4932776475325227
Epoch 5/10, Loss: 0.4932776475325227
Epoch 6/10, Loss: 0.4932776475325227
Epoch 7/10, Loss: 0.4932776475325227
Epoch 8/10, Loss: 0.4932776475325227
Epoch 9/10, Loss: 0.4932776475325227
Epoch 10/10, Loss: 0.4932776475325227


#CNN Model for Circle

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.layer1 = nn.Conv2d(1, 10, kernel_size=3, padding=1)
        self.pool = nn.MaxPool3d(2,2)
        self.fc1 = nn.Linear(32 * 128 * 128, 64) 
        self.fc2 = nn.Linear(64,KERNEL_SIZE)

    def forward(self, x):
        x = self.layer1(x)
        x = torch.where(x > 0.5, torch.ones_like(x), torch.zeros_like(x))
        x = self.pool(x)
        x = x.view(-1, 32 * 128 * 128)  # Flatten the output for fully connected layers
        x = self.fc1(x)
        x = self.fc2(x)
        return x

In [None]:
import torch
import torch.nn as nn
from torch.autograd import Variable

# Initialize the model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    train_loader = get_train_data_loader()
    for inputs, kernel in train_loader:
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, kernel)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")

# Save the trained model
torch.save(model.state_dict(), "kernel_prediction_model.pth")
