# Data generation

## image data

In [None]:
import numpy as np
from PIL import Image
import scipy.ndimage as ndi

def create_number_one_image(size, position, figure_width=6):
    """
    Creates an image with a figure of the number '1' at a specified position.
    The intensity is highest at the center and decays towards the boundary.
    """
    img = np.zeros(size, dtype=np.float32)
    cx, cy = position

    # Create a vertical bar in the middle of the image
    for x in range(cx - figure_width // 2, cx + figure_width // 2):
        for y in range(cy - figure_width // 2, cy + figure_width // 2):
            if 0 <= x < size[0] and 0 <= y < size[1]:
                img[x, y] = 1.0
    
    # Apply Gaussian decay for smooth intensity
    gaussian =1.3* ndi.gaussian_filter(img, sigma=figure_width / 3.0, mode='constant', cval=0.0)
    
    # Normalize to range [0, 1]
    img = np.clip(gaussian, 0, 1)
    
    return Image.fromarray((img * 255).astype(np.uint8))

def generate_dataset(num_samples, size=(64, 64), figure_width=6):
    """
    Generate a dataset of images with the figure of number '1' at random locations.
    """
    images = []
    for _ in range(num_samples):
        x = size[0] // 2+ int(np.clip(np.random.randn()*size[0] // 4,figure_width // 2-size[0]//2  , size[0]//2 - figure_width // 2))
        y = size[0] // 2+ int(np.clip(np.random.randn()*size[0] // 4,figure_width // 2-size[0]//2  , size[0]//2 - figure_width // 2))

        img = create_number_one_image(size, (x, y), figure_width)
        images.append(np.array(img))
    return np.array(images)

def generate_video_data(num_frames, size=(64, 64), figure_width=10):
    """
    Generate a video dataset with the figure of number '1' moving around.
    """
    frames = []
    for _ in range(num_frames):
        x = np.random.randint(figure_width // 2, size[0] - figure_width // 2)
        y = np.random.randint(0, size[1])
        img = create_number_one_image(size, (x, y), figure_width)
        frames.append(img)
    return np.array(frames)


In [None]:
# Parameters
train_samples = 20000
test_samples = 20000
image_size = (64, 64)

# Generate and save dataset
train_data = generate_dataset(train_samples, size=image_size,figure_width=10)
test_data = generate_dataset(test_samples, size=image_size,figure_width=10)

# Save datasets as .npy files
train_data=train_data.astype('float32').reshape(-1,1,64,64)/255
test_data=test_data.astype('float32').reshape(-1,1,64,64)/255
np.save('./image_data/vae_train.npy', train_data)
np.save('./image_data/vae_test.npy', test_data)

print("Datasets created and saved.")

## Video data

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable
from torchvision.utils import save_image
from torch.utils.data import Dataset, DataLoader
from numpy import square

import numpy as np
import torch
import matplotlib.pyplot as plt
import imageio

import numpy as np
from PIL import Image
import scipy.ndimage as ndi

from tqdm import tqdm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
from VAE import VAE
num_frames=240
num_frame=240


       

def generate_brownian_motion_trajectory(num_frames, size):
    """
    Generate a Brownian motion trajectory for the number '1'.
    """
    # Define Brownian motion parameters
    dt = 1  # Time step
    sigma = 3  # Volatility (standard deviation of the increments)
    
    # Generate Brownian motion
    increments = np.random.normal(loc=0, scale=sigma * np.sqrt(dt), size=(num_frames, 2))
    trajectory = np.cumsum(increments, axis=0)
    
    # Normalize the trajectory to fit within the image bounds
    x_trajectory = np.clip(trajectory[:, 0], -size[0]//2, size[0]//2).astype(int)
    y_trajectory = np.clip(trajectory[:, 1],  -size[1]//2, size[1]//2).astype(int)
    
    return x_trajectory, y_trajectory

def generate_brownian_motion_trajectory_batch(num_frames, size,batch_size=1):
    """
    Generate a Brownian motion trajectory for the number '1'.
    """
    # Define Brownian motion parameters
    dt = 1  # Time step
    sigma = 3  # Volatility (standard deviation of the increments)
    
    # Generate Brownian motion
    increments = np.random.normal(loc=0, scale=sigma * np.sqrt(dt), size=(batch_size, num_frames, 2))
    trajectory = np.cumsum(increments, axis=1)
    
    # Normalize the trajectory to fit within the image bounds
    x_trajectory = np.clip(trajectory[:,:, 0], -size[0]//2, size[0]//2-1).astype(int)+size[0]//2
    y_trajectory = np.clip(trajectory[:,:, 1],  -size[1]//2, size[1]//2-1).astype(int)+size[1]//2
    
    return x_trajectory, y_trajectory

def generate_bouncing_motion_trajectory(num_frames, size,figure_width=10, speed=2):
    """
    Generate a trajectory where the figure starts at the center and moves towards a random direction,
    bouncing back upon reaching the boundary.
    """
    # Start at the center
    x, y = 0,0
    
    # Generate a random direction
    angle = np.random.uniform(0, 2 * np.pi)
    dx = speed * np.cos(angle)
    dy = speed * np.sin(angle)
    
    x_trajectory = []
    y_trajectory = []
    
    for _ in range(num_frames):
        x += dx
        y += dy
        
        # Check for boundary collisions and bounce back if necessary
        if x < figure_width//2-size[1]//2 or x >= size[0]//2-figure_width//2:
            dx = -dx
            x = np.clip(x, figure_width//2-size[0]//2, size[0]//2-figure_width//2 - 1)
        
        if y < figure_width//2 -size[1]//2 or y >= size[1]//2-figure_width//2:
            dy = -dy
            y = np.clip(y, figure_width//2-size[1]//2, size[1]//2-figure_width//2 - 1)
        
        x_trajectory.append(int(x))
        y_trajectory.append(int(y))
    
    return np.array(x_trajectory), np.array(y_trajectory)

def generate_figure_8_trajectory(num_frames, size,figure_width, speed=4):
    """
    Generate a figure 8 trajectory with a random starting direction.
    """
    t = np.linspace(0, 6 * np.pi, num_frames)
    x = np.sin(t)
    y = np.sin(2 * t)

    # Scale the path to fit within the image bounds
    #x = (x - np.min(x)) / (np.max(x) - np.min(x)) * (size[0]/1.2 - 1)
    #y = (y - np.min(y)) / (np.max(y) - np.min(y)) * (size[1]/1.2 - 1)

    # Rotate the path by a random angle
    angle = np.random.uniform(0, 2 * np.pi)
    cos_angle = np.cos(angle)
    sin_angle = np.sin(angle)

    x_rotated = cos_angle * x - sin_angle * y
    y_rotated = sin_angle * x + cos_angle * y


    # Ensure the trajectory is within bounds
    x_trajectory = np.clip(x_rotated*size[0]//np.max(np.abs(x)+np.abs(y))/2,figure_width//2-size[0]//2, size[0]//2-figure_width//2 - 1).astype(int)
    y_trajectory = np.clip(y_rotated*size[1]//np.max(np.abs(x)+np.abs(y))/2 ,figure_width//2-size[1]//2, size[1]//2-figure_width//2 - 1).astype(int)

    return x_trajectory, y_trajectory




def generate_video_data_batch(num_frames, size=(64, 64), figure_width=3,batch_size=1):
    """
    Generate a video dataset with the figure of number '1' following a Gaussian process trajectory.
    """
    x_trajectory, y_trajectory = generate_brownian_motion_trajectory_batch(num_frames, size,batch_size)

    frames=np.zeros((batch_size,num_frames,size[0],size[1]))
    for j in tqdm(range(batch_size)):
        for i in range(num_frames):
            x = x_trajectory[j,i] 
            y = y_trajectory[j,i]
            frames[j,i] = create_number_one_image(size, (x, y), figure_width)
    
    return frames


def generate_video_data(num_frames, size=(64, 64), figure_width=10,motion_type="brownian"):
    """
    Generate a video dataset with the figure of number '1' following a Gaussian process trajectory.
    """
    frames = []
    if motion_type=="brownian":
        x_trajectory, y_trajectory = generate_brownian_motion_trajectory(num_frames, size)
    elif motion_type=="bounce":
        x_trajectory, y_trajectory = generate_bouncing_motion_trajectory(num_frames, size, figure_width, speed=4)
        # plt.plot(x_trajectory, y_trajectory)
        # plt.xlim(0,size[0])
        # plt.ylim(0,size[1])
    elif motion_type=="8":
        x_trajectory, y_trajectory = generate_figure_8_trajectory(num_frames, size,figure_width, speed=4)
    for i in range(num_frames):
        x = x_trajectory[i] +size[0]//2
        y = y_trajectory[i]+size[1]//2
        img = create_number_one_image(size, (x, y), figure_width)
        frames.append(img)
    
    return np.array(frames)

def create_number_one_image(size, position, figure_width=6):
    """
    Creates an image with a figure of the number '1' at a specified position.
    The intensity is highest at the center and decays towards the boundary.
    """
    img = np.zeros(size, dtype=np.float32)
    cx, cy = position

    # Create a vertical bar in the middle of the image
    for x in range(cx - figure_width // 2, cx + figure_width // 2):
        for y in range(cy - figure_width // 2, cy + figure_width // 2):
            if 0 <= x < size[0] and 0 <= y < size[1]:
                img[x, y] = 1.0
    
    # Apply Gaussian decay for smooth intensity
    gaussian =1.3* ndi.gaussian_filter(img, sigma=figure_width / 3.0, mode='constant', cval=0.0)
    
    # Normalize to range [0, 1]
    img = np.clip(gaussian, 0, 1)
    
    return Image.fromarray((img * 255).astype(np.uint8))

# Function to encode video frames and create a GIF
def create_latent_gif(video, vae, gif_filename):
    video_tensor = torch.tensor(video, dtype=torch.float32) # Normalize and add channel dimension
    
    latent_representations = []
    with torch.no_grad():
        latent,_ = vae.encoder(video_tensor.reshape(num_frames,-1).cuda()) 


    latent_representations = np.array(latent.cpu())

    frames = []
    for i in range(num_frames):
        frame=video[i]
        latent=latent_representations[i]
        fig, axs = plt.subplots(1, 3, figsize=(14, 5))

        # Plot true image
        axs[0].imshow(frame.reshape(64,64))
        axs[0].set_title(f'Frame {i}')
        axs[0].axis('off')

        # Plot latent representation
        axs[1].plot(latent_representations[:i, 0], latent_representations[:i, 1], marker='o', linestyle='-', color='b', alpha=0.3)
        #axs[1].scatter(latent[0], latent[1], color='r')
        axs[1].set_title('Latent Space')
        axs[1].set_xlabel('Latent Dimension 1')
        axs[1].set_ylabel('Latent Dimension 2')
        axs[1].set_xlim(-3,3)
        axs[1].set_ylim(-3,3)
        axs[1].grid(True)

        axs[2].imshow(vae.decoder(torch.tensor(latent.reshape(1,-1)).cuda()).reshape(64,64).detach().cpu())
        axs[2].set_title(f'Reconstructed Frame {i}')
        axs[2].axis('off')

        

        # Save the plot as a frame
        fig.canvas.draw()
        image = np.frombuffer(fig.canvas.tostring_rgb(), dtype='uint8')
        image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
        frames.append(image)

        plt.close(fig)

    # Create a GIF
    imageio.mimsave(gif_filename, frames, format='GIF', duration=0.1)

    print(f"GIF saved as {gif_filename}")



In [14]:
# parameters
batch_size=10000
num_frames=240
def video_to_latents(video_batch,vae_filename="vae_big_ball.pt"):
    vae = VAE(x_dim=64*64, h_dim1= 256, h_dim2=64, z_dim=2)
    if torch.cuda.is_available():
        vae.cuda()
    vae=torch.load("./vae_big_ball.pt")
    latents=[]
    split=20
    for i in tqdm(range(split)):
        video_small_batch=np.array(video_batch)[i*batch_size//split:(i+1)*batch_size//split]
        
        video_tensor = torch.tensor(video_small_batch.astype("float32"))/255 # Normalize and add channel dimension
        with torch.no_grad():
            latent,_ = vae.encoder(video_tensor.reshape(batch_size//split,num_frames,-1).cuda()) 
        latents.append(latent.detach().cpu())
    return torch.tensor(np.array(latents).reshape(batch_size,num_frames,2))
    

### Brownian motion

In [16]:
video_batch=generate_video_data_batch(num_frames, size=(64, 64), figure_width=10,batch_size=1000)
video_batch.shape
latents=video_to_latents(video_batch,vae_filename="vae_big_ball.pt")
torch.save(latents,"./video_data/latent_data_ball_10_240_brownian.pt")

  0%|          | 0/1000 [00:00<?, ?it/s]

100%|██████████| 1000/1000 [00:20<00:00, 47.75it/s]


(1000, 240, 64, 64)

Let's check out the covariance matrix of the latents

In [None]:
latent_x=latents[:,:,0]
latent_y=latents[:,:,1]
plt.title("Covariance of latent distribution")
plt.imshow( torch.cov(latent_x.T).detach().cpu())
plt.xlabel("latent axis 1")
plt.xlabel("latent axis 2")


### Boundary bouncing motion

In [18]:
video_batch=[]
for i in tqdm(range(batch_size)):
    video_batch.append(generate_video_data(num_frames, size=(64, 64), figure_width=10,motion_type="bounce"))
video_batch=np.array(video_batch)
latents=video_to_latents(video_batch,vae_filename="vae_big_ball.pt")
torch.save(latents,"./video_data/latent_data_ball_10_240_bounce.pt")

 81%|████████▏ | 8147/10000 [02:42<00:36, 50.26it/s]


KeyboardInterrupt: 

In [None]:
latent_x=latents[:,:,0]
latent_y=latents[:,:,1]
plt.title("Covariance of latent distribution")
plt.imshow( torch.cov(latent_x.T).detach().cpu())
plt.xlabel("latent axis 1")
plt.xlabel("latent axis 2")