In [None]:
# import basic packages
import os
import sys
import wget
import tarfile
import argparse
import numpy as np
import pandas as pd

# import plotting and image/video processing packages
from imutils import paths
import matplotlib.pyplot as plt

# import ML packages
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import cv2
# adding global TF installation to PATH 
tensorflow_path = "/usr/lib/python3.10/site-packages"
sys.path.append(tensorflow_path)
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import AveragePooling2D
from tensorflow.keras.applications import Xception
from tensorflow.keras.layers import Dropout,Convolution2D,MaxPooling2D
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras import Sequential
from tensorflow.keras.optimizers import Adam,SGD
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.utils import to_categorical

# Checking GPU acceleration

In [None]:
# Turn on device-placement logging while the probe below runs
tf.debugging.set_log_device_placement(True)

# List every GPU TensorFlow reports
gpus = tf.config.list_physical_devices("GPU")
print("Available GPUs:")
for device in gpus:
    print(device)

# Switch each GPU to memory growth
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

# Smoke test: run one small op pinned to each GPU in turn
for index, _ in enumerate(gpus):
    with tf.device(f"/GPU:{index}"):
        tf.random.uniform((1000, 1000))

# Acceleration counts as available only with a CUDA build AND at least one GPU
gpu_available = len(gpus) > 0 and tf.test.is_built_with_cuda()
print("GPU acceleration available:", gpu_available)

# Quiet TensorFlow down again for the rest of the notebook
tf.debugging.set_log_device_placement(False)
tf.get_logger().setLevel('ERROR')

## Downloading Dataset

In [None]:
# Flip to True on a fresh checkout to fetch and unpack the dataset.
download_needed = False

if download_needed:
    URL = 'http://image.ntua.gr/iva/datasets/flickr_logos/flickr_logos_27_dataset.tar.gz'

    # Everything is stored under this folder, which is where the later cells look.
    data_root = '../data/logo_classification'
    os.makedirs(data_root, exist_ok=True)

    # Save the archive at the exact path it is opened from below
    # (wget would otherwise drop it into the current working directory).
    fname = f'{data_root}/flickr_logos_27_dataset.tar.gz'
    wget.download(URL, out=fname)

    # NOTE(review): extractall() trusts the member paths stored in the archive;
    # only run this against the official dataset URL above.
    # The `with` block closes the archive even if extraction fails.
    with tarfile.open(fname, "r:gz") as tar:
        tar.extractall(path=data_root)

    # The outer archive contains a second archive holding the images themselves.
    fname = f'{data_root}/flickr_logos_27_dataset/flickr_logos_27_dataset_images.tar.gz'
    with tarfile.open(fname, "r:gz") as tar:
        tar.extractall(path=data_root)

else:
    print("Data already downloaded. Skipping...")

## Preprocessing

In [None]:
# Folder holding the original, uncropped dataset images
raw_data_dir = '../data/logo_classification/flickr_logos_27_dataset_images/'

# Root folder of each split
training_dir, validation_dir, test_dir = (
    '../data/logo_classification/train',
    '../data/logo_classification/validation',
    '../data/logo_classification/test',
)

In [None]:
# The annotation file is whitespace-separated and has no header row, so the
# columns get integer names (0, 1, 2, ...) and are addressed by position below.
df = pd.read_csv(
    "../data/logo_classification/flickr_logos_27_dataset/flickr_logos_27_dataset_training_set_annotation.txt",
    sep=r'\s+',  # raw string: '\s' is not a valid escape in a plain string literal
    header=None,
)
df.head(5)

In [None]:
# First cut: keep 80% of the rows for training and hold out 20%,
# stratified on column 1 so both parts keep the same class mix
train_df, holdout_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
    stratify=df.iloc[:, 1],
)

# Second cut: divide the held-out rows evenly into test and validation sets
test_df, valid_df = train_test_split(
    holdout_df,
    test_size=0.5,
    random_state=42,
    stratify=holdout_df.iloc[:, 1],
)

In [None]:
# For every split: column 0 becomes X and column 1 becomes Y
train_X, train_Y = train_df.iloc[:, 0], train_df.iloc[:, 1]
valid_X, valid_Y = valid_df.iloc[:, 0], valid_df.iloc[:, 1]
test_X, test_Y = test_df.iloc[:, 0], test_df.iloc[:, 1]

In [None]:
# Columns 3 onward are kept as plain nested lists; they are used to crop
# each image to its ROI.
size_train = train_df.iloc[:, 3:].values.tolist()
size_valid = valid_df.iloc[:, 3:].values.tolist()
size_test = test_df.iloc[:, 3:].values.tolist()

# Report how the rows are distributed over the three splits.
total_rows = len(size_train) + len(size_valid) + len(size_test)
for split_name, split_rows in (
    ("Training", size_train),
    ("Validation", size_valid),
    ("Test", size_test),
):
    print(f'{split_name} set size: {len(split_rows)} ({100 * len(split_rows)/total_rows:.1f}%)')

In [None]:
# Create the root folder of every split unless it is already there
for split_root in (training_dir, validation_dir, test_dir):
    if os.path.exists(split_root):
        continue
    os.makedirs(split_root)


In [None]:
# Sorted, de-duplicated class labels found in each split
y_train = sorted(set(train_Y))
y_valid = sorted(set(valid_Y))
y_test = sorted(set(test_Y))

# Create <split>/<class>/cropped for every class of every split.
# exist_ok=True makes the cell safe to re-run and also creates a missing
# 'cropped' folder when the class folder itself already exists; any other
# failure (e.g. permissions) now surfaces instead of being reported as
# "already exists".
for split_root, class_names in (
    (training_dir, y_train),
    (validation_dir, y_valid),
    (test_dir, y_test),
):
    for class_name in class_names:
        os.makedirs(os.path.join(split_root, class_name, 'cropped'), exist_ok=True)

### Removing extra logos that I don't need

In [None]:
# Destructive step: deletes files from the raw image folder. Off by default.
removing_extra_logos_needed = False

if removing_extra_logos_needed:
    directory_path = "../data/logo_classification/flickr_logos_27_dataset_images"

    # NOTE(review): the original compared against an undefined name `X`.
    # Assumed here to mean the file names listed in column 0 of the annotation
    # table -- confirm before enabling, since unmatched files are deleted.
    # A set keeps each membership test O(1).
    annotated_files = set(df.iloc[:, 0])

    for filename in os.listdir(directory_path):
        if filename not in annotated_files:
            os.remove(os.path.join(directory_path, filename))
    print("Done removing extra logos...")
else:
    print("Skipping removing extra logos...")

## Storing cropped images and skipping corrupt ones

In [None]:
# Edge lengths, in pixels, that every cropped logo is resized to
HEIGHT, WIDTH = 224, 224

In [None]:
def _next_free_path(directory, filename):
    """Return a path inside ``directory`` for ``filename`` that does not exist yet.

    The same image name can occur more than once (two or more logos in one
    image), so clashes are resolved by appending ``_1``, ``_2``, ... to the stem.
    """
    stem, ext = os.path.splitext(filename)
    candidate = os.path.join(directory, filename)
    counter = 1
    while os.path.exists(candidate):
        candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
        counter += 1
    return candidate


def _crop_split(filenames, labels, boxes, split_dir):
    """Crop every annotated logo of one split and store it in its class folder.

    Args:
        filenames: pandas Series of image file names inside ``raw_data_dir``.
        labels: pandas Series of class labels, aligned with ``filenames``.
        boxes: list of rows whose first four entries are read as x1, y1, x2, y2.
        split_dir: root folder of the split; crops are written to
            ``<split_dir>/<label>/cropped/``.

    Rows that cannot be processed are reported and skipped; nothing is raised.
    """
    for img_name, label, box in zip(filenames.values, labels.values, boxes):
        img = os.path.join(raw_data_dir, img_name)
        try:
            image = cv2.imread(img)
            if image is None:  # imread reports an unreadable file by returning None
                raise ValueError("image could not be read")

            x1, y1, x2, y2 = box[:4]
            image = image[y1:y2, x1:x2]  # crop to ROI
            if image.size == 0:
                raise ValueError(f"empty ROI {box[:4]}")

            image = cv2.resize(image, (WIDTH, HEIGHT))  # cv2 takes (width, height)

            savepath = _next_free_path(os.path.join(split_dir, label, 'cropped/'), img_name)
            if not cv2.imwrite(savepath, image):  # imwrite returns False on failure
                raise OSError(f"could not write {savepath}")

        except Exception as e:
            # Deliberately best-effort: report the row and carry on.
            print('Error:', type(e), e)
            print(img, label)
            print("No worries if you don't see too many of these. Moving on…")


# NOTE: existing files count as name clashes, so re-running this cell stores a
# second, suffixed copy of every crop. Clear the split folders before re-running.
_crop_split(train_X, train_Y, size_train, training_dir)
_crop_split(valid_X, valid_Y, size_valid, validation_dir)
_crop_split(test_X, test_Y, size_test, test_dir)
    


## Image Augmentation

In [None]:
# Training generator: scale pixels to [0, 1] and apply random augmentation.
# The featurewise_center / featurewise_std_normalization flags were dropped:
# they only take effect after calling .fit() on sample data, which this
# notebook never does, so they produced warnings and no normalization.
train = ImageDataGenerator(
    rescale=1/255,
    horizontal_flip=True,
    vertical_flip=True,
    shear_range=0.2,
    zoom_range=0.2,
    rotation_range=40,  # Degree range for random rotations
    width_shift_range=0.2,
    height_shift_range=0.2,
    fill_mode='nearest',
)

# Validation generator: rescale only. Random augmentation here would make the
# validation metrics (and the classification report) change from run to run.
validation = ImageDataGenerator(rescale=1/255)

## Preparing data generators for the training and validation datasets

In [None]:
# Input resolution of the images fed to the network
HEIGHT, WIDTH = 224, 224

# Training settings: initial learning rate, number of epochs, batch size
INIT_LR = 1e-4
EPOCHS = 100
BS = 256

In [None]:
# Training batches read from <training_dir>/<class>/...
# shuffle=True: without it the files are served in class order, so every batch
# would hold only one or two classes. The seed keeps the order reproducible.
trainset = train.flow_from_directory(
    training_dir,
    target_size=(HEIGHT, WIDTH),
    batch_size=BS,
    shuffle=True,
    seed=42,
    color_mode='rgb',
    class_mode='categorical',
)

In [None]:
# Validation batches read from <validation_dir>/<class>/...
# Order is left unshuffled so predictions line up with validset.classes.
validset = validation.flow_from_directory(
    validation_dir,
    class_mode='categorical',
    color_mode='rgb',
    target_size=(HEIGHT, WIDTH),
    batch_size=BS,
    seed=42,
    shuffle=False,
)

##### Preparing paths to all images

In [None]:
# Collect the path of every image found under each split folder
train_image_paths = list(paths.list_images(training_dir))
validation_image_paths = list(paths.list_images(validation_dir))
test_image_paths = list(paths.list_images(test_dir))

# Report the share of each split; the total is computed once and an empty
# listing prints 0.0% instead of raising ZeroDivisionError.
total_images = len(train_image_paths) + len(validation_image_paths) + len(test_image_paths)
for split_name, split_paths in (
    ("Training", train_image_paths),
    ("Validation", validation_image_paths),
    ("Test", test_image_paths),
):
    share = 100 * len(split_paths) / total_images if total_images else 0.0
    print(f'{split_name} set size: {len(split_paths)} ({share:.1f}%)')


## Model Architecture

In [None]:
# ImageNet-pretrained Xception without its classification top
baseModel = Xception(
    weights="imagenet",
    include_top=False,
    input_tensor=Input(shape=(HEIGHT, WIDTH, 3)),  # (height, width, channels)
)

# New classification head stacked on the backbone output
headModel = baseModel.output
headModel = AveragePooling2D(pool_size=(5, 5))(headModel)
headModel = Flatten(name="flatten")(headModel)
headModel = Dense(256, activation="relu")(headModel)
headModel = Dropout(0.7)(headModel)
# One softmax unit per class discovered by the training generator
headModel = Dense(trainset.num_classes, activation="softmax")(headModel)

# the actual model we will train
model = Model(inputs=baseModel.input, outputs=headModel)

# Freeze the backbone so only the new head is updated during training
for layer in baseModel.layers:
    layer.trainable = False

# compile model
print("[INFO] compiling model...")
# Use the configured INIT_LR; the string 'Adam' would silently fall back to
# the optimizer's default learning rate and leave INIT_LR unused.
model.compile(
    loss="categorical_crossentropy",
    optimizer=Adam(learning_rate=INIT_LR),
    metrics=["accuracy"],
)

## Train Results

In [None]:
# Train the model; H keeps the per-epoch metrics history
H = model.fit(
    trainset,
    epochs=EPOCHS,
    validation_data=validset,
)

## Results Plot

In [None]:
# plot the training loss against the validation loss
# Use the number of epochs actually recorded, so the x-axis always matches
# the history length (e.g. if training was interrupted).
N = len(H.history["loss"])
plt.style.use("ggplot")
plt.figure()
plt.plot(np.arange(0, N), H.history["loss"], label="train_loss")
plt.plot(np.arange(0, N), H.history["val_loss"], label="val_loss")
plt.title("Training Loss VS Validation Loss")
plt.xlabel("Epoch #")
plt.ylabel("Loss")
plt.legend(loc="lower left")
# Save BEFORE show(): once the figure has been shown, savefig writes an empty canvas.
os.makedirs('../results/logo_classification', exist_ok=True)
plt.savefig('../results/logo_classification/loss.png')
plt.show()

In [None]:
# plot the training accuracy against the validation accuracy
# Use the number of epochs actually recorded, so the x-axis always matches
# the history length (e.g. if training was interrupted).
N = len(H.history["accuracy"])
plt.style.use("ggplot")
plt.figure()
plt.plot(np.arange(0, N), H.history["accuracy"], label="train_acc")
plt.plot(np.arange(0, N), H.history["val_accuracy"], label="val_acc")
plt.title(" Accuracy")
plt.xlabel("Epoch #")
plt.ylabel("Accuracy")
plt.legend(loc="lower left")
# Save BEFORE show(): once the figure has been shown, savefig writes an empty canvas.
os.makedirs('../results/logo_classification', exist_ok=True)
plt.savefig('../results/logo_classification/accuracy.png')
plt.show()

## F1-score, Precision, and Recall

In [None]:
print("[INFO] evaluating after fine-tuning network...")
# Rewind the iterator so predictions start at the first validation sample
validset.reset()
predIdxs = model.predict(x=validset)
# Collapse each row of class probabilities to the index of the most likely class
predIdxs = np.argmax(predIdxs, axis=1)

# Class names ordered by their integer index. Passing `labels` explicitly keeps
# the report aligned with target_names even if some class has no samples or
# is never predicted.
class_names = sorted(validset.class_indices, key=validset.class_indices.get)
print(classification_report(
    validset.classes,
    predIdxs,
    labels=list(range(len(class_names))),
    target_names=class_names,
))

# serialize the model to disk
print("[INFO] serializing network...")
os.makedirs('../results/logo_classification', exist_ok=True)
model.save('../results/logo_classification/logo_classification.model', save_format="h5")

## Some Predictions from Test Set

In [None]:
# Call the method; the bare attribute only displayed the bound-method repr
model.get_config()

In [None]:
from PIL import Image
def predimage(path):
    """Show the image at ``path`` and print the model's prediction for it.

    Prints the per-class scores as percentages (rounded to one decimal)
    followed by the name of the highest-scoring class.

    Args:
        path: location of an image file.

    Returns:
        The predicted class name.
    """
    # Display the original image; the context manager closes the file handle
    with Image.open(path) as image:
        plt.imshow(image)

    # Resize and scale pixels to [0, 1], matching the generators' 1/255 rescale
    test = load_img(path, target_size=(HEIGHT, WIDTH))  # target_size is (height, width)
    test = img_to_array(test)
    test = np.expand_dims(test, axis=0)  # add the batch dimension
    test /= 255
    result = model.predict(test, batch_size=BS)
    y_class = result.argmax(axis=-1)

    result = list(np.around(np.array(result * 100), 1))
    print(result)

    # Translate the index through the mapping of the generator the model was
    # trained on, rather than through a separately built label list.
    index_to_label = {index: label for label, index in trainset.class_indices.items()}
    predicted_label = index_to_label[int(y_class[0])]
    print(predicted_label)
    return predicted_label

In [None]:
# Gather the images in the new_images folder and classify the one at index 7
new_images_dir = '../data/logo_classification/new_images/'
testimage = [*paths.list_images(new_images_dir)]
predimage(testimage[7])