In [1]:
import warnings
warnings.filterwarnings('ignore') # We can suppress the warnings

In [2]:
spark

In [3]:
import pandas as pd
import numpy as np
from PIL import Image
from io import BytesIO
import re
# import tensorflow as tf
# import cv2

I will use the functionality of Image Datasource of spark to collect and process images in bytes, and then start processing the images for the neural network

In [4]:
images_folders = ['/CA1/Images/BakedPotato/','/CA1/Images/Pizza/','/CA1/Images/Taco/']

In [5]:
images_rdd = spark.sparkContext.binaryFiles(','.join(images_folders))
# images_rdd = spark.sparkContext.binaryFiles('hdfs://172.24.144.178:9000/CA1/Images/Ireland/Ireland_001.jpeg')

In [6]:
images_rdd

org.apache.spark.api.java.JavaPairRDD@14b4341c

In [7]:
def extract_data(data):
    file_path, image_data = data
#     image = Image.open(BytesIO(image_data))
#     image_array = np.array(image)
    
    file_name = file_path.split('/')[-1]
    file_name_without_ext = file_name.split(".")[0]
    label,name = file_name_without_ext.split('_')[0], file_name_without_ext.split('_')[1]
    
    return name, label, image_data

In [8]:
imageDf = images_rdd.map(lambda x: extract_data(x)).toDF(["Name","Label","Data"])

                                                                                

In [9]:
pandasImagesDF = imageDf.toPandas()

                                                                                

In [10]:
IMG_SHAPE=225

In [11]:
def processImage(data, target_size=(IMG_SHAPE,IMG_SHAPE)):
    imgbytes = BytesIO(data)
    image = Image.open(imgbytes)
    resized_img = image.resize(target_size, Image.ANTIALIAS)
    with BytesIO() as output:
        resized_img.save(output, format="PNG")
        new_image = Image.open(output)

        array = np.asarray(new_image).reshape([target_size[0],target_size[1],3])
        return array

In [12]:
pandasImagesDF["Data"] = pandasImagesDF["Data"].apply(lambda x: processImage(x))

In [13]:
# pandasImagesDF

In [14]:
pandasImagesDF = pandasImagesDF.sort_values(by=["Name"])

In [15]:
pandasImagesDF.info()

<class 'pandas.core.frame.DataFrame'>
Index: 875 entries, 0 to 874
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Name    875 non-null    object
 1   Label   875 non-null    object
 2   Data    875 non-null    object
dtypes: object(3)
memory usage: 27.3+ KB


In [16]:
NCATEGORIES = len(pandasImagesDF["Label"].unique())

In [17]:
NCATEGORIES

3

Importing from Keras functionality necessary to implement CNN

In [18]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv2D, MaxPool2D, Flatten, Input
from tensorflow.keras import utils
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

2024-03-23 10:16:57.780680: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Now we are to split the data using train_test_split function to use Kfold and improve the perfomance of the model

In [19]:
X_train, X_test, y_train, y_test = train_test_split(pandasImagesDF["Data"],pandasImagesDF["Label"],test_size=0.3, random_state=42)

In [20]:
encoder = OneHotEncoder(sparse_output=False)
y_train = np.array(y_train).reshape(-1, 1)
y_train = encoder.fit_transform(y_train)

y_test = np.array(y_test).reshape(-1,1)
y_test = encoder.fit_transform(y_test)

In [21]:
X_train = np.stack(X_train)
X_test = np.stack(X_test)

In [22]:
X_train.shape

(612, 225, 225, 3)

In [23]:
X_train = np.reshape(X_train,(X_train.shape[0],IMG_SHAPE,IMG_SHAPE,3)).astype(np.float32)
X_test = np.reshape(X_test,(X_test.shape[0],IMG_SHAPE,IMG_SHAPE,3)).astype(np.float32)

Because the pixels are from 0 to 255, We have to normalize the pixels

In [24]:
X_train/=255
X_test/=255

In [25]:
X_train.shape,X_test.shape

((612, 225, 225, 3), (263, 225, 225, 3))

In [55]:
np.argmax(y_train, axis=1)

array([1, 0, 2, 0, 1, 2, 2, 1, 1, 2, 0, 0, 2, 1, 0, 0, 0, 1, 2, 0, 1, 1,
       1, 1, 0, 2, 2, 0, 0, 2, 0, 0, 2, 1, 1, 0, 2, 0, 0, 2, 0, 2, 0, 0,
       1, 0, 1, 2, 2, 2, 1, 2, 2, 1, 2, 2, 0, 0, 1, 1, 2, 0, 1, 1, 2, 1,
       1, 1, 1, 1, 2, 0, 1, 2, 2, 1, 2, 0, 2, 0, 0, 2, 2, 1, 1, 0, 2, 0,
       2, 1, 1, 1, 1, 0, 1, 2, 2, 2, 0, 2, 2, 2, 0, 1, 0, 2, 0, 1, 0, 2,
       2, 2, 2, 1, 0, 2, 2, 1, 0, 1, 0, 0, 1, 0, 0, 2, 2, 2, 2, 0, 0, 0,
       0, 0, 1, 1, 0, 0, 2, 2, 1, 1, 2, 1, 1, 0, 0, 1, 0, 1, 0, 2, 1, 1,
       0, 0, 1, 1, 0, 0, 2, 2, 0, 0, 1, 2, 1, 2, 0, 0, 0, 0, 2, 2, 2, 1,
       2, 0, 0, 1, 0, 1, 2, 1, 1, 0, 0, 2, 0, 0, 1, 1, 1, 2, 0, 1, 1, 0,
       0, 0, 1, 1, 2, 2, 2, 2, 2, 2, 2, 0, 1, 0, 0, 0, 1, 1, 2, 2, 1, 1,
       1, 1, 2, 2, 0, 1, 0, 1, 0, 2, 2, 0, 2, 0, 0, 0, 2, 0, 0, 2, 1, 1,
       2, 2, 0, 1, 0, 0, 2, 1, 2, 0, 2, 0, 0, 2, 0, 0, 1, 1, 2, 2, 2, 0,
       1, 1, 2, 2, 2, 2, 2, 1, 2, 0, 0, 1, 0, 0, 0, 1, 0, 0, 2, 0, 1, 1,
       2, 1, 0, 2, 1, 2, 1, 1, 1, 1, 1, 1, 0, 2, 2,

In [26]:
y_train.shape, y_test.shape

((612, 3), (263, 3))

In [27]:
model = Sequential()
model.add(Input(shape=(IMG_SHAPE, IMG_SHAPE, 3)))  # 225x225 RGB images
model.add(Conv2D(32,kernel_size=(3,3),strides=(1,1),padding="valid", activation="relu"))
model.add(Conv2D(32,kernel_size=(3,3),activation="relu"))
model.add(MaxPool2D(3))
model.add(Conv2D(32, (3,3), activation="relu"))
model.add(Conv2D(32, (3,3), activation="relu"))
model.add(MaxPool2D(2))
model.add(Conv2D(32, (3,3), activation="relu"))
model.add(Conv2D(32, (3,3), activation="relu"))
model.add(MaxPool2D(2))
model.add(Conv2D(32, (3,3), activation="relu"))
model.add(Conv2D(32, (3,3), activation="relu"))
model.add(MaxPool2D(2))

model.add(Flatten())


model.add(Dense(128, activation="relu"))
model.add(Dense(NCATEGORIES,activation="softmax"))

model.compile(loss="categorical_crossentropy", metrics=["accuracy"], optimizer="adam")

2024-03-23 10:17:00.480563: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:984] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-03-23 10:17:00.564548: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:984] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-03-23 10:17:00.564600: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:984] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-03-23 10:17:00.567769: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:984] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-03-23 10:17:00.567822: I external/local_xla/xla/stream_executor

In [28]:
model.summary()

In [None]:
model.fit(X_train, y_train, batch_size=50, epochs=50, verbose=1, validation_data=(X_test, y_test))

2024-03-23 11:38:58.202014: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 371790000 exceeds 10% of free system memory.


Epoch 1/50
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 711ms/step - accuracy: 1.0000 - loss: 0.0014 - val_accuracy: 0.5856 - val_loss: 2.6408
Epoch 2/50
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 591ms/step - accuracy: 1.0000 - loss: 7.3541e-04 - val_accuracy: 0.6122 - val_loss: 2.7943
Epoch 3/50
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 585ms/step - accuracy: 1.0000 - loss: 3.8616e-04 - val_accuracy: 0.6122 - val_loss: 2.8893
Epoch 4/50
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 598ms/step - accuracy: 1.0000 - loss: 3.2635e-04 - val_accuracy: 0.6198 - val_loss: 2.8917
Epoch 5/50
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 596ms/step - accuracy: 1.0000 - loss: 2.5013e-04 - val_accuracy: 0.6122 - val_loss: 2.9108
Epoch 6/50
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 585ms/step - accuracy: 1.0000 - loss: 2.1165e-04 - val_accuracy: 0.6160 - val_loss: 2.9425
Epoch 7

In [30]:
pred = model.predict(X_test[:25])

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step


In [52]:
def comparePredictions(predictionsValues, expectedValues):
    correct = 0
    for x in range(len(predictionsValues)):
        if predictionsValues[x] == expectedValues[x]:
            correct+=1
    
    print(f'Correct predictions {correct}/{len(predictionsValues)}. Percent {((correct/len(predictionsValues)*100)):.2f}%')

In [53]:
comparePredictions(np.argmax(pred, axis=1),np.argmax(y_test[:25], axis=1))

Correct predictions 7/25. Percent 28.00%


### Trying Data Augmentation 

In [33]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [34]:
X_train, X_test, y_train, y_test = train_test_split(pandasImagesDF["Data"],pandasImagesDF["Label"],test_size=0.3, random_state=42)

In [35]:
encoder = OneHotEncoder(sparse_output=False)
y_train = np.array(y_train).reshape(-1, 1)
y_train = encoder.fit_transform(y_train)

y_test = np.array(y_test).reshape(-1,1)
y_test = encoder.fit_transform(y_test)

X_train = np.stack(X_train)
X_train = np.reshape(X_train,(X_train.shape[0],IMG_SHAPE,IMG_SHAPE,3)).astype(np.float32)
X_train/=255

X_test = np.stack(X_test)
X_test = np.reshape(X_test,(X_test.shape[0],IMG_SHAPE,IMG_SHAPE,3)).astype(np.float32)
X_test/=255

In [36]:
datagen = ImageDataGenerator(
    rotation_range=20,  # Random rotation between 0 and 20 degrees
    width_shift_range=0.1,  # Randomly shift the width by up to 10%
    height_shift_range=0.1,  # Randomly shift the height by up to 10%
    shear_range=0.2,  # Shear intensity (shear angle in counter-clockwise direction in radians)
    zoom_range=0.2,  # Randomly zoom in by up to 20%
    horizontal_flip=True,  # Randomly flip images horizontally
    fill_mode='nearest'  # Fill mode for points outside the input boundaries
)

In [37]:
print("Shape of X_train:", X_train.shape)
print("Type of X_train:", type(X_train))
print("Shape of y_train:", y_train.shape)
print("Type of y_train:", type(y_train))

Shape of X_train: (612, 225, 225, 3)
Type of X_train: <class 'numpy.ndarray'>
Shape of y_train: (612, 3)
Type of y_train: <class 'numpy.ndarray'>


In [38]:
#augmented_generator = datagen.flow(np.array(X_train), np.array(y_train), batch_size=32)

In [39]:
#augmented_generator

In [40]:
# # Get a batch of augmented data
# images, labels = next(augmented_generator)

# # Visualize the augmented images
# plt.figure(figsize=(10, 10))
# for i in range(9):
#     plt.subplot(3, 3, i + 1)
#     plt.imshow(images[i])
#     plt.title(f'Label: {labels[i]}')
#     plt.axis('off')
# plt.show()

In [41]:
from tensorflow.keras.layers import RandomContrast, RandomBrightness, RandomRotation, RandomFlip

In [42]:
dataAugmented = Sequential()
dataAugmented.add(RandomContrast(0.7))
dataAugmented.add(RandomBrightness(0.7))
dataAugmented.add(RandomRotation(0.4))
dataAugmented.add(RandomFlip())


In [43]:
modelAugmented = Sequential()
modelAugmented.add(Input(shape=(IMG_SHAPE, IMG_SHAPE, 3)))  # 225x225 RGB images
modelAugmented.add(dataAugmented)
modelAugmented.add(Conv2D(32,kernel_size=(3,3),strides=(1,1),padding="valid", activation="relu"))
modelAugmented.add(Conv2D(32,kernel_size=(3,3),activation="relu"))
modelAugmented.add(MaxPool2D(3))
modelAugmented.add(Conv2D(32, (3,3), activation="relu"))
modelAugmented.add(Conv2D(32, (3,3), activation="relu"))
modelAugmented.add(MaxPool2D(2))
modelAugmented.add(Conv2D(32, (3,3), activation="relu"))
modelAugmented.add(Conv2D(32, (3,3), activation="relu"))
modelAugmented.add(MaxPool2D(2))
modelAugmented.add(Conv2D(32, (3,3), activation="relu"))
modelAugmented.add(Conv2D(32, (3,3), activation="relu"))
modelAugmented.add(MaxPool2D(2))

modelAugmented.add(Flatten())


modelAugmented.add(Dense(128, activation="relu"))
modelAugmented.add(Dense(NCATEGORIES,activation="sigmoid"))

modelAugmented.compile(loss="binary_crossentropy", metrics=["accuracy"], optimizer="adam")

In [44]:
X_train.shape,X_test.shape, y_train.shape, y_test.shape

((612, 225, 225, 3), (263, 225, 225, 3), (612, 3), (263, 3))

In [45]:
modelAugmented.fit(X_train, y_train, batch_size=32, epochs=30, verbose=1, validation_data=(X_test, y_test))

Epoch 1/30


2024-03-23 10:18:20.165321: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 371790000 exceeds 10% of free system memory.
2024-03-23 10:18:20.392816: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 371790000 exceeds 10% of free system memory.


[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 207ms/step - accuracy: 0.3577 - loss: 0.7201 - val_accuracy: 0.3460 - val_loss: 0.6597
Epoch 2/30
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 107ms/step - accuracy: 0.3672 - loss: 0.6480 - val_accuracy: 0.3042 - val_loss: 0.6481
Epoch 3/30
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 106ms/step - accuracy: 0.3572 - loss: 0.6415 - val_accuracy: 0.3574 - val_loss: 0.6423
Epoch 4/30
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 104ms/step - accuracy: 0.3501 - loss: 0.6397 - val_accuracy: 0.3498 - val_loss: 0.6362
Epoch 5/30
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 106ms/step - accuracy: 0.3523 - loss: 0.6368 - val_accuracy: 0.3460 - val_loss: 0.6356
Epoch 6/30
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 110ms/step - accuracy: 0.3219 - loss: 0.6382 - val_accuracy: 0.3460 - val_loss: 0.6367
Epoch 7/30
[1m20/20[0m [32m━━━━━━━━

<keras.src.callbacks.history.History at 0x7fad0b9b9720>

In [46]:
modelAugmented.summary()

In [47]:
pred = modelAugmented.predict(X_test[:25])

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step


In [51]:
comparePredictions(np.argmax(pred, axis=1),np.argmax(y_test[:25], axis=1))

Correct predictions 7/25. Percent 28.000000%
