In [3]:
from zipfile import ZipFile
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.optim as optim
import numpy as np
from torch.utils.data import Subset
import matplotlib.pyplot as plt
import os
import cv2
import shutil

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.layers import Dense, Flatten, Dropout, GlobalAveragePooling1D
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical


# ViT specific imports
from tensorflow.keras.layers import LayerNormalization, MultiHeadAttention, Add


In [None]:
#Testing with basic transformer (From lecture)

In [2]:
#Encoder
class TransformerEncoder(nn.Module):
  def __init__(self, input_size, hidden_size):
    super(TransformerEncoder, self).__init__()
    self.linear_q = nn.Linear(input_size, hidden_size)
    self.linear_k = nn.Linear(input_size, hidden_size)
    self.linear_v = nn.Linear(input_size, hidden_size)
    self.linear_x = nn.Linear(input_size, hidden_size)
    self.attention = nn.MultiheadAttention(hidden_size, num_heads=4, batch_first=True)
    self.fc = nn.Sequential(
    nn.Linear(hidden_size, hidden_size),
    nn.ReLU(),
    nn.Linear(hidden_size, hidden_size))
    self.norm = nn.LayerNorm(hidden_size)
  def forward(self, x):
    q, k, v = self.linear_q(x), self.linear_k(x), self.linear_v(x)
    x = self.norm(self.linear_x(x) + self.attention(q, k, v))
    x = self.norm(x + self.fc(x))
    return x

In [3]:
#Classifier
class TweetTransformer(nn.Module):
  def __init__(self, input_size, hidden_size, num_class):
    super(TweetTransformer, self).__init__()
    self.emb = nn.Embedding.from_pretrained(glove.vectors)
    self.encoder = TransformerEncoder(input_size, hidden_size)
    self.fc = nn.Linear(hidden_size, num_class)
  def forward(self, x, pos):
    # Add GloVe vectors to positional encoding
    x = self.emb(x) + pos
    x = self.encoder(x)
    # Add embeddings from transformer encoding to get tweet embedding
    x = torch.sum(x, -1)
    # Classify
    return self.fc(x)


In [None]:
#Real Transformer

In [4]:
class PatchEmbedding(layers.Layer):
    def __init__(self, image_size, patch_size, projection_dim):
        super(PatchEmbedding, self).__init__()
        self.image_size = image_size
        self.patch_size = patch_size
        self.projection_dim = projection_dim
        self.num_patches = (image_size // patch_size) ** 2
        self.projection = tf.keras.layers.Dense(projection_dim)
        self.position = self.add_weight(
            name="position_embeddings",
            shape=(1, self.num_patches, projection_dim),
            initializer='random_normal'
        )

    def call(self, patches):
        batch_size = tf.shape(patches)[0]
        # Extract patches
        patches = tf.image.extract_patches(
            images=patches,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding='VALID'
        )
        # Flatten the patches
        patch_dim = patches.shape[-1]
        patches = tf.reshape(patches, (batch_size, self.num_patches, patch_dim))
        embeddings = self.projection(patches)
        embeddings = tf.cast(embeddings, dtype=tf.float32)
        position = tf.cast(self.position, dtype=tf.float32)
        return embeddings + position


In [None]:
#Patch embedding layer
'''class PatchEmbedding(layers.Layer):
  def __init__(self, num_patches, projection_dim):
    super(PatchEmbedding, self).__init__()
    self.num_patches = num_patches
    self.projection = tf.keras.layers.Dense(projection_dim)
    self.position = self.add_weight(name="position_embeddings",
            shape=(1, num_patches, projection_dim),
            initializer='random_normal')
    #self.position = self.add_weight("position_embeddings", )

  def call(self, patches):
    embeddings = self.projection(patches)
      # Ensure that position embeddings are the correct dtype
    embeddings = tf.cast(embeddings, dtype=tf.float32)
    position = tf.cast(self.position, dtype=tf.float32)
    return embeddings + position
'''

  ''' def __init__(self, num_patches, projection_dim):
    super(PatchEmbedding, self).__init__()
    self.num_patches = num_patches
    self.projection = layers.Dense(units=projection_dim)
    self.position = tf.Variable(initial_value = tf.random.normal([1, num_patches, projection_dim]), trainable=True)

  def call(self, patches):
    embeddings = self.projection(patches)
    # Reshape embeddings to match position embeddings
    #embeddings = tf.reshape(embeddings, [-1, self.num_patches, self.projection.units])
    return embeddings + self.position # Add position embeddings after reshaping
'''


In [5]:
#Encoder Layer
class TransformerEncoderLayer(layers.Layer):
  def __init__(self, num_heads, embedding_dim, mlp_dim, dropout_rate=0.1):
    super(TransformerEncoderLayer, self).__init__()
    self.layer_norm1 = LayerNormalization()
    self.multi_head_attention = MultiHeadAttention(num_heads, embedding_dim)
    self.add1 = Add()
    self.layer_norm2 = layers.LayerNormalization()
    self.mlp = models.Sequential([
      layers.Dense(mlp_dim, activation='relu'),
      layers.Dense(embedding_dim)])
    self.add2 = Add()
    self.dropout = Dropout(dropout_rate)

  def call(self, x):
    #multi head attention
    x1 = self.layer_norm1(x)
    attention_output = self.multi_head_attention(x1,x1)
    x2 = self.add1([x, attention_output])

    #FF network
    x3 = self.layer_norm2(x2)
    x3 = self.mlp(x3)
    x4 = self.add2([x2, x3])
    return self.dropout(x4)

In [6]:
#Vision Transformer Model
def vit_model(image_size, patch_size, num_layers, num_heads, projection_dim, mlp_dim, dropout_rate, num_classes):
  num_patches = (image_size // patch_size) ** 2
  inputs = layers.Input(shape=(image_size, image_size, 3))
  #projection_dim = (patch_size * patch_size * 3)

  #patches = PatchEmbedding(num_patches, projection_dim)(inputs)

  #patches = PatchEmbedding((num_patches, (patch_size * patch_size * 3)))(inputs)

  #Patch embedding
  x = PatchEmbedding(image_size, patch_size, projection_dim)(inputs)#(patches)

  #Transformer encoding
  for _ in range(num_layers):
    x = TransformerEncoderLayer(num_heads, projection_dim, mlp_dim, dropout_rate)(x)

  #Classification head
  x = LayerNormalization()(x)
  #x = Flatten()(x)
  x = GlobalAveragePooling1D()(x)
  x = Dense(mlp_dim, activation='relu')(x)
  x = Dropout(dropout_rate)(x)
  outputs = Dense(num_classes, activation='softmax')(x)

  #Model
  model = models.Model(inputs=inputs, outputs=outputs)
  return model


In [7]:
#hyperparameters
image_size = 224 #Make sure
patch_size = 32
num_layers = 8
num_heads = 16
projection_dim = 256
mlp_dim = 256
dropout_rate = 0.1
num_classes = 7

model = vit_model(image_size, patch_size, num_layers, num_heads, projection_dim, mlp_dim, dropout_rate, num_classes)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [8]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [9]:
dataset_path = '/content/gdrive/MyDrive/APS360 Project/Data/test'

batch_size = 8

# Load datasets with optimized pipeline
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_path,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(224,224),
    batch_size=batch_size,
    label_mode = 'categorical'
).cache().prefetch(tf.data.experimental.AUTOTUNE)

val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_path,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(224,224),
    batch_size=batch_size,
    label_mode = 'categorical'
).cache().prefetch(tf.data.experimental.AUTOTUNE)


# Mixed precision
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

#train_ds = train_ds.map(lambda x, y: (x, tf.one_hot(y, num_classes, axis=-1))) # Add axis=-1
#val_ds = val_ds.map(lambda x, y: (x, tf.one_hot(y, num_classes, axis=-1))) # Add axis=-1

# Train the model
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3),
        tf.keras.callbacks.ModelCheckpoint('vit_model.keras', save_best_only=True),
        tf.keras.callbacks.TensorBoard(log_dir='./logs', histogram_freq=1)
    ]
)

# Evaluate the model
test_loss, test_accuracy = model.evaluate(val_ds)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

# Save the model
model.save('vit_model.keras')

Found 7178 files belonging to 7 classes.
Using 5743 files for training.
Found 7178 files belonging to 7 classes.
Using 1435 files for validation.
Epoch 1/10
[1m718/718[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m242s[0m 283ms/step - accuracy: 0.2142 - loss: 1.8940 - val_accuracy: 0.1861 - val_loss: 1.7982
Epoch 2/10
[1m718/718[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m169s[0m 236ms/step - accuracy: 0.2366 - loss: 1.8233 - val_accuracy: 0.2314 - val_loss: 1.7794
Epoch 3/10
[1m718/718[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m152s[0m 212ms/step - accuracy: 0.2476 - loss: 1.8154 - val_accuracy: 0.2530 - val_loss: 1.7656
Epoch 4/10
[1m718/718[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 67ms/step - accuracy: 0.2547 - loss: 1.8097 - val_accuracy: 0.2523 - val_loss: 1.7709
Epoch 5/10
[1m718/718[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 67ms/step - accuracy: 0.2518 - loss: 1.8096 - val_accuracy: 0.2564 - val_loss: 1.7762
Epoch 6/10
[1m718/718[0m 

In [11]:
import gc
def reset_ram():
    gc.collect()  # Garbage collection to free up RAM
    torch.cuda.empty_cache()  # Clear GPU cache

# Call this function in between training sessions
reset_ram()

from tensorflow.keras.backend import clear_session
clear_session()

In [None]:
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3),
        tf.keras.callbacks.ModelCheckpoint('vit_model.keras', save_best_only=True),
        tf.keras.callbacks.TensorBoard(log_dir='./logs', histogram_freq=1)
    ]
)

Epoch 1/10


AttributeError: 'NoneType' object has no attribute 'items'

In [None]:
%pip install split-folders
import splitfolders

Collecting split-folders
  Downloading split_folders-0.5.1-py3-none-any.whl.metadata (6.2 kB)
Downloading split_folders-0.5.1-py3-none-any.whl (8.4 kB)
Installing collected packages: split-folders
Successfully installed split-folders-0.5.1


In [None]:
splitfolders.ratio('/content/gdrive/MyDrive/APS360 Project/Data/test', output="split_data",
    seed=999, ratio=(0.8, 0.1, 0.1), group_prefix=None, move=False)


Copying files: 7178 files [03:27, 34.56 files/s] 


In [None]:
transform = transforms.Compose(
        [transforms.ToTensor(),transforms.Resize((48,48))])

#transform = transforms.Compose([transforms.ToTensor(), transforms.Resize((224,224))])

#Create train, validation, and testing datasets
#Apply the transformations
train_data = ImageFolder("/content/split_data/train", transform)
val_data = ImageFolder("/content/split_data/val", transform)
test_data = ImageFolder("/content/split_data/test", transform)

#Load all of the datasets into their respective loaders
train_loader = torch.utils.data.DataLoader(train_data, batch_size=10,
                                           shuffle=True)
val_loader = torch.utils.data.DataLoader(val_data, batch_size=10,
                                         shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=10,
                                          shuffle=True)

In [None]:
#model = TransformerModel(input_dim=..., hidden_dim=..., output_dim=..., num_heads=..., num_layers=...)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for batch in train_loader:
        src, tgt = batch  # Assuming your DataLoader returns a tuple (src, tgt)
        optimizer.zero_grad()
        output = model(src, tgt)
        loss = criterion(output.view(-1, output.shape[-1]), tgt.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}')

AttributeError: 'Functional' object has no attribute 'parameters'

In [None]:
'''
#Load data for transformer (different format then for CNN???)
train_dir = '/content/gdrive/MyDrive/APS360 Project/Data/train'
test_dir = '/content/gdrive/MyDrive/APS360 Project/Data/test'

train_dataset = tf.keras.preprocessing.image_dataset_from_directory(
    train_dir,
    image_size=(48, 48),  # Match with your model input size
    batch_size=32,
    label_mode='int',  # Adjust based on your labels
    shuffle=True
)

# Load test data
test_dataset = tf.keras.preprocessing.image_dataset_from_directory(
    test_dir,
    image_size=(48, 48),
    batch_size=32,
    label_mode='int'
)'''

Found 28274 files belonging to 6 classes.
Found 7178 files belonging to 7 classes.


In [None]:
'''normalization_layer = tf.keras.layers.Rescaling(1./255)

subset_size = 10  # Adjust this value to the desired subset size
batch_size = 32  # Use smaller batches to manage memory

# Create subsets and preprocess the data
train_data = train_dataset.take(subset_size).map(lambda x, y: (normalization_layer(x), to_categorical(y, num_classes=num_classes)))
test_data = test_dataset.take(subset_size).map(lambda x, y: (normalization_layer(x), to_categorical(y, num_classes=num_classes)))

# Optimize the dataset pipeline
train_data = train_data.cache().prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
test_data = test_data.cache().prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

# Clear memory
import gc
gc.collect()
'''
'''
subset_size = 100  # Adjust this value to the desired subset size
train_data = train_dataset.take(subset_size).map(lambda x, y: (normalization_layer(x), to_categorical(y, num_classes=num_classes)))
test_data = test_dataset.take(subset_size).map(lambda x, y: (normalization_layer(x), to_categorical(y, num_classes=num_classes)))

train_data = train_data.cache().prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
test_data = test_dataset.cache().prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
#train_data = train_data.take(100)
#test_data = test_data.take(100)'''

'\nsubset_size = 100  # Adjust this value to the desired subset size\ntrain_data = train_dataset.take(subset_size).map(lambda x, y: (normalization_layer(x), to_categorical(y, num_classes=num_classes)))\ntest_data = test_dataset.take(subset_size).map(lambda x, y: (normalization_layer(x), to_categorical(y, num_classes=num_classes)))\n\ntrain_data = train_data.cache().prefetch(buffer_size=tf.data.experimental.AUTOTUNE)\ntest_data = test_dataset.cache().prefetch(buffer_size=tf.data.experimental.AUTOTUNE)\n#train_data = train_data.take(100)\n#test_data = test_data.take(100)'

In [None]:
# Enable eager execution to create variables on subsequent calls
tf.config.run_functions_eagerly(True)

model.fit(
    train_loader,
    val_loader,
    epochs=20
)

ValueError: When providing `x` as a torch DataLoader, `y` should not be passed. Instead, the targets should be included as part of the torch DataLoader.