In [4]:
import matplotlib.pyplot as plt
import seaborn as sns
import keras
from keras import layers
from keras.models import Sequential
from keras.layers import Dense, Conv2D , MaxPool2D , Flatten , Dropout , BatchNormalization
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report,confusion_matrix
from keras.callbacks import ReduceLROnPlateau
import cv2
import os
import tensorflow as tf
import pandas as pd
from PIL import Image
import numpy as np
import time 
import PIL
from IPython import display

def get_training_data(image_path):
    """Loads an image from the given path into a numpy array."""
    with Image.open(image_path) as img:
        return np.array(img)

In [5]:
df = pd.read_csv('./mTBI/eye_motion_trace/00029_U_4_19_2018_9_18_9_V001.csv')

# Actual project

In [6]:
def extract_frame(video_path, time_in_seconds):
    # Open the video file
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print("Error: Could not open video file.")
        return None

    # Get the frame rate of the video
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Calculate the frame number to be captured
    frame_number = int(time_in_seconds * fps)

    # Set the frame position
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)

    # Read the frame
    ret, frame = cap.read()

    # Release the video capture object
    cap.release()

    if ret:
        return frame  # Return the frame if successfully captured
    else:
        print("Error: Could not retrieve frame at given time.")
        return None

### Extracting data

In [7]:
def convert_combine(arrays):
    grayscaled = [np.mean(array, axis=2, keepdims=True) for array in arrays]
    combined = np.stack(grayscaled, axis=0)
    
    # list of all the frames
    
    return combined
    # extract_frame('./mTBI/video_sequence/00029_U_4_19_2018_9_18_9_V001.avi', 1).shape

def avi_parser(avi_file="./mTBI/video_sequence/00029_U_4_19_2018_9_18_9_V001.avi"):
    temp = []
    
    # avi_file = "00029_U_4_19_2018_9_18_9_V001.avi"
    # Open the video file
    cap = cv2.VideoCapture(avi_file)
    
    # Check if the video file opened successfully
    if not cap.isOpened():
        print("Error: Could not open video file.")
    else:
        # Loop through each frame
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # Break the loop if there are no frames left
    
            # Process the frame here
            # For example, you can display the frame using cv2.imshow("Frame", frame)
            # print(type(frame))
            temp.append(frame)
    
    # Release the video capture object
    cap.release()
    cv2.destroyAllWindows()
    return temp

In [8]:
avi_obj = avi_parser("./mTBI/video_sequence/00029_U_4_19_2018_9_18_9_V001.avi")
avi_obj[0].shape

(512, 512, 3)

In [9]:
avi_train = convert_combine(avi_obj)
type(avi_train)

numpy.ndarray

In [10]:
def shape_finder(csv_directory= './mTBI/eye_motion_trace'):
    # Directory containing CSV files
    csv_directory = './mTBI/eye_motion_trace'
    
    # Loop through each file in the directory
    for filename in os.listdir(csv_directory):
        if filename.endswith('.csv'):
            # Construct the full file path
            file_path = os.path.join(csv_directory, filename)
    
            # Read the CSV file into a DataFrame
            temp = pd.read_csv(file_path)
    
            # Print the filename and shape of the DataFrame
            print(f"File: {filename}, Shape: {temp.shape[0]}")

In [11]:
avi_train = avi_train.reshape(avi_train.shape[0], 512, 512, 1).astype('float32')
avi_train_images = (avi_train - 127.5) / 127.5  # Normalize the images to [-1, 1]
avi_train_images.shape

(300, 512, 512, 1)

### Getting noise from vectors

In [12]:
df_array = df.drop(columns=['time[s]']).to_numpy()

In [13]:
df_array

array([[-9.97400e-01,  2.69573e+01, -1.22000e-02, ...,  2.70000e+01,
        -1.22000e-02,  2.47200e-01],
       [-9.97600e-01,  2.43766e+01, -1.22000e-02, ..., -1.00000e+00,
        -1.22000e-02, -9.20000e-03],
       [-9.97900e-01,  2.18332e+01, -1.22000e-02, ..., -1.00000e+00,
        -1.22000e-02, -9.20000e-03],
       ...,
       [-1.84842e+01, -2.28521e+01, -2.25600e-01, ..., -8.00000e+00,
        -2.31900e-01, -7.32000e-02],
       [-1.85297e+01, -2.39695e+01, -2.26200e-01, ..., -8.00000e+00,
        -2.31900e-01, -7.32000e-02],
       [-1.85716e+01, -2.50233e+01, -2.26700e-01, ..., -3.50000e+01,
        -2.31900e-01, -3.20400e-01]])

In [14]:
# Number of rows in the original array
num_rows = df_array.shape[0]

# Generate noise (random numbers, for example)
# Adjust the parameters of np.random.rand() as needed
noise = np.random.rand(num_rows, 92)

# Concatenate the original array and the noise array
combined_array = np.concatenate([df_array, noise], axis=1)
combined_array

array([[-9.97400000e-01,  2.69573000e+01, -1.22000000e-02, ...,
         5.63321910e-01,  7.99001740e-01,  3.94848256e-02],
       [-9.97600000e-01,  2.43766000e+01, -1.22000000e-02, ...,
         1.86606109e-01,  2.58139274e-01,  2.51103863e-02],
       [-9.97900000e-01,  2.18332000e+01, -1.22000000e-02, ...,
         3.24500055e-01,  4.81204305e-01,  8.70232442e-01],
       ...,
       [-1.84842000e+01, -2.28521000e+01, -2.25600000e-01, ...,
         4.22212620e-02,  7.96284929e-01,  1.78540979e-01],
       [-1.85297000e+01, -2.39695000e+01, -2.26200000e-01, ...,
         8.84062961e-02,  9.85273430e-01,  5.26902718e-01],
       [-1.85716000e+01, -2.50233000e+01, -2.26700000e-01, ...,
         6.97287123e-01,  4.61467631e-01,  5.15684150e-01]])

# Side quest

In [15]:
BUFFER_SIZE = 60000
BATCH_SIZE = 256

In [16]:
# Batch and shuffle the data
data_set = tf.data.Dataset.from_tensor_slices(avi_train_images).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

In [17]:
def make_generator_model():
    model = tf.keras.Sequential()
    # Start with a dense layer that reshapes into a 8x8x1024 tensor
    model.add(layers.Dense(8*8*1024, use_bias=False, input_shape=(100,)))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Reshape((8, 8, 1024)))
    # Size becomes 8x8x1024 here

    # Upsample to 16x16
    model.add(layers.Conv2DTranspose(512, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    # Size becomes 16x16x512 here

    # Upsample to 32x32
    model.add(layers.Conv2DTranspose(256, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    # Size becomes 32x32x256 here

    # Upsample to 64x64
    model.add(layers.Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    # Size becomes 64x64x128 here

    # Upsample to 128x128
    model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    # Size becomes 128x128x64 here

    # Upsample to 256x256
    model.add(layers.Conv2DTranspose(32, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    # Size becomes 256x256x32 here

    # Upsample to 512x512
    model.add(layers.Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))
    # Final size becomes 512x512x1

    return model

In [18]:
generator = make_generator_model()

In [19]:
def make_discriminator_model():
    model = tf.keras.Sequential()
    model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same',
                                     input_shape=[512, 512, 1]))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Flatten())
    model.add(layers.Dense(1))

    return model

In [20]:
discriminator = make_discriminator_model()

In [21]:
# This method returns a helper function to compute cross entropy loss
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [22]:
# descrim loss function
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

In [23]:
# generator loss
def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

In [24]:
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

In [25]:
# checkpoint
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

In [26]:
EPOCHS = 10
noise_dim = 100
num_examples_to_generate = 1

# You will reuse this seed overtime (so it's easier)
# to visualize progress in the animated GIF)
seed = combined_array[0]


In [27]:
# Notice the use of `tf.function`
# This annotation causes the function to be "compiled".
@tf.function
def train_step(images):
    noise = tf.random.normal([BATCH_SIZE, noise_dim])
    
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
      generated_images = generator(noise, training=True)
      print(images.shape)
      real_output = discriminator(images, training=True)
      print('DEBUB NOISE:', noise)
      fake_output = discriminator(generated_images, training=True)

      gen_loss = generator_loss(fake_output)
      disc_loss = discriminator_loss(real_output, fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

In [28]:
def train(dataset, epochs):
  for epoch in range(epochs):
    start = time.time()
    for image_batch in dataset:
      train_step(image_batch)
    # Produce images for the GIF as you go
    display.clear_output(wait=True)
    generate_and_save_images(generator,
                             epoch + 1,
                             seed)
    # Save the model every 15 epochs
    if (epoch + 1) % 15 == 0:
      checkpoint.save(file_prefix = checkpoint_prefix)

    print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))

  # Generate after the final epoch
  display.clear_output(wait=True)
  generate_and_save_images(generator,
                           epochs,
                           seed)

In [29]:
def generate_and_save_images(model, epoch, test_input):
  # Notice `training` is set to False.
  # This is so all layers run in inference mode (batchnorm).
  predictions = model(test_input, training=False)

  fig = plt.figure(figsize=(4, 4))

  for i in range(predictions.shape[0]):
      plt.subplot(4, 4, i+1)
      plt.imshow(predictions[i, :, :, 0] * 127.5 + 127.5, cmap='gray')
      plt.axis('off')

  plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
  plt.show()

In [None]:
# Train the model
train(data_set, EPOCHS)

(256, 512, 512, 1)
DEBUB NOISE: Tensor("random_normal:0", shape=(256, 100), dtype=float32)
(256, 512, 512, 1)
DEBUB NOISE: Tensor("random_normal:0", shape=(256, 100), dtype=float32)


In [None]:
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

In [None]:
# Display a single image using the epoch number
def display_image(epoch_no):
  return PIL.Image.open('image_at_epoch_{:04d}.png'.format(epoch_no))

In [None]:
display_image(EPOCHS)