In [15]:
import os
import numpy as np
import cv2
import boto3
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import tempfile
import time

Note: the AWS credentials below are hard-coded for testing only. Future versions should load them from environment variables or an AWS credentials profile rather than embedding them in the notebook.

In [None]:
# Credentials come from the standard AWS environment variables when they are set;
# the literal fallbacks are placeholders for local testing only - never commit real keys.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "ACCESS")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "SECRET")
s3_bucket_name = "ca1-simpsons"
dataset = "simpsons_dataset"  # top-level S3 prefix; presumably one sub-folder per character

percentage_to_use = 0.1  # fraction of each character's images to load (1.0 = 100%); small values speed up testing

img_size = 64  # images are resized to img_size x img_size pixels
batch_size = 128
epochs = 30
input_shape = (img_size, img_size, 3)  # (height, width, colour channels)

In [None]:
# Class index -> character folder name in the S3 dataset. The index is the integer label
# attached to each image, so the list order defines the model's output classes.
map_characters = dict(enumerate([
    'abraham_grampa_simpson', 'apu_nahasapeemapetilon', 'bart_simpson',
    'charles_montgomery_burns', 'chief_wiggum', 'comic_book_guy', 'edna_krabappel',
    'homer_simpson', 'kent_brockman', 'krusty_the_clown', 'lisa_simpson',
    'marge_simpson', 'milhouse_van_houten', 'moe_szyslak',
    'ned_flanders', 'nelson_muntz', 'principal_skinner', 'sideshow_bob',
]))

num_classes = len(map_characters)

Connect to S3:

In [None]:
# Driver-side S3 client. load_data builds its own client inside each Spark task,
# so this one is only for interactive use on the driver.
s3 = boto3.client(
    "s3",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

Create the Spark session, setting both executor and driver memory to 28 GB (sized for t2.2xlarge EC2 instances, which have 32 GiB of RAM).

In [None]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-aws:3.3.1 pyspark-shell'

# NOTE(review): spark.driver.memory only takes effect if no Spark JVM is running yet in this
# kernel; getOrCreate() returns an existing session unchanged.
conf = (
    SparkConf()
    .setAppName("TrainCNN")
    .set("spark.executor.memory", "28g")
    .set("spark.driver.memory", "28g")
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

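The function below loads one character's images. It lists that character's folder in the S3 bucket, keeps the oldest `percentage` fraction of the files (ordered by LastModified), and downloads each one.

Each image is decoded as a 3-channel colour image and resized to 64x64 pixels. OpenCV returns the channels in BGR order, not RGB.

To normalise the pixel values to the range [0, 1], it divides them by 255.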

In [None]:
def _decode_image(raw_bytes):
    """Decode encoded image bytes to a 3-channel BGR uint8 array, or return None if undecodable."""
    if not raw_bytes:
        return None
    # cv2.imdecode returns None (rather than raising) for bytes it cannot decode.
    return cv2.imdecode(np.frombuffer(raw_bytes, np.uint8), cv2.IMREAD_COLOR)


def load_data(character_id, character_name, percentage=0.1):
    """Download, decode and preprocess a fraction of one character's images from S3.

    Runs inside Spark tasks (via flatMap), so it creates its own boto3 client instead of
    relying on a driver-side one.

    Args:
        character_id: integer class label attached to every image returned.
        character_name: the character's folder name under the `dataset` prefix of
            `s3_bucket_name`.
        percentage: fraction of the character's objects to use, taken in LastModified order.

    Returns:
        List of (image, character_id) tuples. Each image is a float32 array of shape
        (img_size, img_size, 3) with values in [0, 1], channels in OpenCV's BGR order.
        Folder markers, empty objects and objects that do not decode as images are skipped.
    """
    # Trailing slash so e.g. "bart_simpson" does not also match a "bart_simpson_x/" folder.
    prefix = f"{dataset}/{character_name}/"
    print(f"Loading data from: s3://{s3_bucket_name}/{prefix}")

    s3 = boto3.client("s3", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
    paginator = s3.get_paginator("list_objects_v2")

    all_objects = [
        obj
        for page in paginator.paginate(Bucket=s3_bucket_name, Prefix=prefix)
        for obj in page.get("Contents", [])
        if not obj["Key"].endswith("/") and obj["Size"] > 0  # skip folder markers / empty keys
    ]
    # Deterministic order (stable sort over the listing) so recomputation picks the same subset.
    all_objects.sort(key=lambda obj: obj["LastModified"])

    # Select based on the defined percentage
    subset_objects = all_objects[:int(len(all_objects) * percentage)]

    data = []
    for obj in subset_objects:
        with tempfile.TemporaryFile() as fp:
            s3.download_fileobj(s3_bucket_name, obj["Key"], fp)
            fp.seek(0)
            img = _decode_image(fp.read())

        if img is None:
            print(f"Skipping non-image object: {obj['Key']}")
            continue

        img = cv2.resize(img, (img_size, img_size)).astype('float32') / 255
        data.append((img, character_id))

    print(f"Loaded {len(data)} images for {character_name}")
    return data

Distribute the (id, name) pairs of the characters across the Spark workers as an RDD.

`flatMap` then calls `load_data` for each character and flattens the returned lists into a single RDD of (image, character_id) records.

In [None]:
print(f'Map characters: {map_characters.items()}')

# One record per (character_id, character_name); each Spark task loads its characters' images.
character_rdd = sc.parallelize(map_characters.items())

# flatMap expands each character into its (image, character_id) records. cache() keeps the
# decoded images in executor memory. Without it, every later action (take, count, collect,
# mapPartitions) re-runs load_data and re-downloads every image from S3.
data_rdd = character_rdd.flatMap(lambda x: load_data(x[0], x[1], percentage=percentage_to_use)).cache()


In [None]:
print("First few items in data_rdd:")
# Summarise each record instead of printing whole 64x64x3 float arrays.
for img, label in data_rdd.take(5):
    print(f"  image shape={img.shape} dtype={img.dtype} "
          f"range=[{img.min():.2f}, {img.max():.2f}] label={label} ({map_characters[label]})")

Split the data into training and test sets

In [None]:
# 80/20 train/test split; the fixed seed makes the split reproducible across runs.
train_data_rdd, test_data_rdd = data_rdd.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Debugging: print the number of items in the train and test RDDs - takes ~20 minutes!
# Each count() is a Spark action that re-runs load_data (S3 download + decode) for
# data_rdd unless data_rdd is cached, which is why these are left commented out.
#print(f"Number of items in train_data_rdd: {train_data_rdd.count()}")
#print(f"Number of items in test_data_rdd: {test_data_rdd.count()}")

Apply a lambda to each (image, label) tuple in the train/test RDDs to separate the image data from the labels.

In [None]:
# Every record is an (image, character_id) pair; fan each split out into parallel
# image and label RDDs (both are lazy until collected).
train_data, train_labels = train_data_rdd.map(lambda pair: pair[0]), train_data_rdd.map(lambda pair: pair[1])
test_data, test_labels = test_data_rdd.map(lambda pair: pair[0]), test_data_rdd.map(lambda pair: pair[1])

Convert the training and test character IDs into one-hot encoded vectors.

In [None]:
num_classes = len(map_characters)
# Read the labels straight from the (image, label) split RDDs instead of the
# `train_labels`/`test_labels` RDDs. This cell rebinds those names to NumPy arrays, so
# calling .collect() on them would fail if the cell were run a second time.
train_labels = to_categorical(train_data_rdd.map(lambda pair: pair[1]).collect(), num_classes)
test_labels = to_categorical(test_data_rdd.map(lambda pair: pair[1]).collect(), num_classes)

Learning Schedule:

In [None]:
total_samples = train_data.count()
# datagen.flow yields ceil(total_samples / batch_size) batches per epoch, so use ceil (not
# round). max(1, ...) keeps decay_steps positive, because ExponentialDecay divides by it,
# even when a tiny test subset has fewer samples than one batch.
iterations_per_epoch = max(1, int(np.ceil(total_samples / batch_size)))
decay_steps = iterations_per_epoch * epochs
initial_learning_rate = 1e-3

# lr = initial_learning_rate * 0.9 ** (step / decay_steps): a gentle decay to 90% of the
# initial rate over the whole training run.
lr_schedule = ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=decay_steps,
    decay_rate=0.9)

Image Generator:

In [None]:
# Random augmentation applied on the fly to every training batch drawn through datagen.flow(...).
datagen = ImageDataGenerator(
    horizontal_flip=True,    # mirror images left/right at random
    rotation_range=30,       # rotate by up to +/-30 degrees
    width_shift_range=0.1,   # shift horizontally by up to 10% of the width
    height_shift_range=0.1,  # shift vertically by up to 10% of the height
    shear_range=0.2,         # shear intensity
    zoom_range=0.2,          # zoom in/out by up to 20%
    fill_mode='nearest',     # fill pixels exposed by a transform with the nearest edge value
)

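CNN model that will receive the averaged weights from distributed training. The model trained in local mode, `local_model`, is defined in the next cell.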

In [None]:
def build_cnn(shape=None, n_classes=None):
    """Build the uncompiled CNN architecture used in this notebook.

    Keeping the architecture in one builder means a change cannot silently diverge between
    models whose weights must line up (model.set_weights is later given averaged weights).

    Args:
        shape: input image shape; defaults to the notebook's `input_shape`.
        n_classes: number of output classes; defaults to len(map_characters).

    Returns:
        An uncompiled Sequential model ending in a softmax over n_classes.
    """
    shape = input_shape if shape is None else shape
    n_classes = len(map_characters) if n_classes is None else n_classes
    return Sequential([
        Conv2D(32, (3, 3), activation='relu', input_shape=shape),
        MaxPooling2D((2, 2)),
        BatchNormalization(),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        BatchNormalization(),
        Conv2D(128, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(n_classes, activation='softmax'),
    ])


# Receives the averaged partition weights after distributed training.
model = build_cnn()

In [None]:
# Same architecture as `model`, trained on the driver in local mode for comparison.
local_model = Sequential([
    # Three conv blocks: 32 -> 64 -> 128 filters, each followed by 2x2 max-pooling.
    Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
    MaxPooling2D((2, 2)),
    BatchNormalization(),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    BatchNormalization(),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    # Classifier head.
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(len(map_characters), activation='softmax'),
])

This function lets each worker train its own model on one partition of the data:

In [None]:
def train_cnn(partition, initial_weights=None):
    """Train a CNN on one Spark partition and return its weights.

    Designed for RDD.mapPartitions: it receives an iterator of records and returns an
    iterable, and it builds the model inside the task so nothing Keras-side is shipped.

    Args:
        partition: iterable of (image, character_id) tuples.
        initial_weights: optional list of weight arrays (as returned by Model.get_weights())
            to start training from. Averaging models is generally only meaningful when they
            share a starting point. None keeps Keras' random initialisation.

    Returns:
        A one-element list containing the trained model's weight list, or an empty list if
        the partition had no records (an empty array cannot be fed to datagen.flow).
    """
    records = list(partition)
    if not records:
        return []

    train_data_partition = np.array([img for img, _ in records])
    # One vectorised call gives the same (n, num_classes) array as per-label encoding.
    train_labels_partition = to_categorical([label for _, label in records], num_classes)

    partition_model = Sequential()
    partition_model.add(Conv2D(32, (3, 3), activation='relu', input_shape=input_shape))
    partition_model.add(MaxPooling2D((2, 2)))
    partition_model.add(BatchNormalization())
    partition_model.add(Conv2D(64, (3, 3), activation='relu'))
    partition_model.add(MaxPooling2D((2, 2)))
    partition_model.add(BatchNormalization())
    partition_model.add(Conv2D(128, (3, 3), activation='relu'))
    partition_model.add(MaxPooling2D((2, 2)))
    partition_model.add(Flatten())
    partition_model.add(Dense(128, activation='relu'))
    partition_model.add(Dropout(0.5))
    partition_model.add(Dense(len(map_characters), activation='softmax'))

    if initial_weights is not None:
        partition_model.set_weights(initial_weights)

    # Compile the model with the Adam optimizer
    partition_optimizer = Adam(learning_rate=lr_schedule)
    partition_model.compile(optimizer=partition_optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    # datagen.flow yields ceil(n / batch_size) batches per epoch; Keras expects an integer step count.
    steps = int(np.ceil(len(train_data_partition) / batch_size))
    partition_model.fit(datagen.flow(train_data_partition, train_labels_partition, batch_size=batch_size),
                        steps_per_epoch=steps, epochs=epochs)

    return [partition_model.get_weights()]

Train one model per data partition, combine the models by averaging their weights, and evaluate the averaged model on the test set.

In [None]:
start_distributed_time = time.time()

# Train on the training split only. data_rdd also contains the test images, so training on
# it would leak the test set into the model that is evaluated below.
# repartition() shuffles records so every partition model sees a mix of characters.
# Otherwise each partition would hold only the few characters parallelize() assigned to it.
num_partitions = train_data_rdd.getNumPartitions()
weights_rdd = train_data_rdd.repartition(num_partitions).mapPartitions(train_cnn)

# NOTE(review): here every partition model starts from its own random initialisation.
# Averaging models with no shared starting point is generally unreliable; consider starting
# every partition from the same weights (e.g. model.get_weights()).
all_weights = weights_rdd.collect()
if not all_weights:
    raise RuntimeError("No partition returned model weights - is train_data_rdd empty?")

num_workers = len(all_weights)  # number of partition models being averaged

# all_weights[k] is partition k's list of per-layer arrays; zip(*...) groups the same layer
# across partitions so each layer can be averaged element-wise.
average_weights = [np.mean(layer_arrays, axis=0) for layer_arrays in zip(*all_weights)]

model.set_weights(average_weights)

optimizer = Adam(learning_rate=lr_schedule)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

end_distributed_time = time.time()
print(f"Time taken for distributed training: {end_distributed_time - start_distributed_time:.2f} seconds")

test_data_array = np.array(test_data.collect())

test_loss, test_accuracy = model.evaluate(test_data_array, test_labels)

print(f'Test accuracy for distributed model: {test_accuracy * 100:.2f}%')

In [None]:
start_local_time = time.time()

# The timing includes collecting the training images to the driver.
train_data_array = np.array(train_data.collect())
local_optimizer = Adam(learning_rate=lr_schedule)
# NOTE(review): compile() does not reset weights, so re-running this cell continues
# training local_model from where the previous run stopped.
local_model.compile(optimizer=local_optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

# datagen.flow yields ceil(n / batch_size) batches per epoch; Keras expects an integer step count.
local_model.fit(datagen.flow(train_data_array, train_labels, batch_size=batch_size),
                steps_per_epoch=int(np.ceil(len(train_data_array) / batch_size)),
                epochs=epochs)

end_local_time = time.time()
print(f"Time taken for local training: {end_local_time - start_local_time:.2f} seconds")


local_test_loss, local_test_accuracy = local_model.evaluate(np.array(test_data.collect()), test_labels)

print(f'Test accuracy for local model: {local_test_accuracy * 100:.2f}%')

Save model to file:

In [None]:
# Persist both trained models; the .h5 suffix selects Keras' HDF5 file format.
for keras_model, filename in ((model, "model-distributed-training.h5"), (local_model, "local-model.h5")):
    keras_model.save(filename)

Stop Spark:

In [None]:
# Shut Spark down: stop the SparkContext first, then the SparkSession that wraps it.
for spark_handle in (sc, spark):
    spark_handle.stop()