In [1]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.metrics import f1_score, ConfusionMatrixDisplay
import pandas as pd
import pickle
from keras import layers, models, Sequential
from keras.callbacks import ModelCheckpoint
import os

In [2]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
    print(gpu)

PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')


### Training Data

In [3]:
all_data = []
all_labels = []

In [4]:
def file_open(file_path):
    try:
        with open(file_path, 'rb') as f:
            data_dic = pickle.load(f, encoding="latin1")
            all_data.append(data_dic["data"])
            all_labels.append(data_dic["labels"])
    except FileNotFoundError:
        print(f"File {file_path} not found")
    except Exception as e:
        print(f"Error loading file {file_path}: {e}")

In [5]:
base_path = r"Imagenet32_train"
for i in range (10):
    file_path = os.path.join(base_path, f"train_data_batch_{i+1}")
    file_open(file_path)

In [6]:
flattened_labels = [item for sublist in all_labels for item in sublist]

In [7]:
all_data = np.vstack(all_data)  
all_labels = np.array(flattened_labels) 

print(f"Data shape: {all_data.shape}")
print(f"Labels shape: {all_labels.shape}") 

Data shape: (1281167, 3072)
Labels shape: (1281167,)


In [8]:
data = pd.DataFrame({
    "image_data": list(all_data),
    "label": all_labels
})

In [9]:
data['image_data'] = data['image_data'].apply(
    lambda x: np.array(x).reshape((3, 32, 32)).transpose(1, 2, 0)
)

In [10]:
data['label'].value_counts()

label
993    1300
450    1300
855    1300
381    1300
737    1300
       ... 
99      772
46      755
207     754
31      738
42      732
Name: count, Length: 1000, dtype: int64

In [17]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Perform stratified split
train, valid = train_test_split(data, test_size=0.2, stratify=data['label'], random_state=42)

In [18]:
images_valid = np.array(valid['image_data'].tolist()) 
labels_valid = np.array(valid['label'])  

In [19]:
# Test Data
file_path = r'Imagenet32_val\val_data'

with open(file_path, 'rb') as f:
    test_dict = pickle.load(f, encoding='latin1')


print(test_dict.keys()) 

dict_keys(['labels', 'data'])


In [20]:
test = pd.DataFrame({
    'image_data': list(test_dict['data']),
    'label': test_dict['labels'] 
})

In [21]:
test['image_data'] = test['image_data'].apply(
    lambda x: np.array(x).reshape((3, 32, 32)).transpose(1, 2, 0)
)

In [22]:
test['image_data'] = test['image_data'] / 255.0

In [23]:
test['image_data'] = test['image_data'].map(lambda x: (np.round(x, 2)))

In [24]:
test_images = np.array(test['image_data'].tolist())  
test_labels = np.array(test['label'])  

In [25]:
images = np.array(train['image_data'].tolist()) 
labels = np.array(train['label'])  

In [26]:
def resnet_block(x, filters, kernel_size=3, stride=1, conv_shortcut=True):
    """
    Residual block for ResNet.
    """
    shortcut = x
    if conv_shortcut:
        shortcut = layers.Conv2D(filters, 1, strides=stride, padding='same')(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)

    x = layers.Conv2D(filters, kernel_size, strides=stride, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    x = layers.Conv2D(filters, kernel_size, padding='same')(x)
    x = layers.BatchNormalization()(x)

    x = layers.add([x, shortcut])
    x = layers.Activation('relu')(x)
    return x

def modifyNet(input_shape=(32, 32, 3), num_classes=1000, num_blocks_list=[3, 4, 6, 3]):
    """
    Modified ResNet model for ImageNet.
    """
    inputs = tf.keras.Input(shape=input_shape)

    # Initial Conv Layer with 7x7 kernel and stride 2
    x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.MaxPooling2D(pool_size=3, strides=2, padding='same')(x)

    # Residual blocks
    for i, num_blocks in enumerate(num_blocks_list):
        for j in range(num_blocks):
            strides = 2 if i > 0 and j == 0 else 1
            x = resnet_block(x, 64 * 2**i, stride=strides)

    # Global Average Pooling and Output Layer
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(1024, activation='relu')(x)  # Optional dense layer for intermediate features
    x = layers.Dropout(0.5)(x)  # Optional Dropout for regularization
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs, outputs)
    return model

# Example usage
model = modifyNet()
model.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 32, 32, 3)]  0           []                               
                                                                                                  
 conv2d (Conv2D)                (None, 16, 16, 64)   9472        ['input_1[0][0]']                
                                                                                                  
 batch_normalization (BatchNorm  (None, 16, 16, 64)  256         ['conv2d[0][0]']                 
 alization)                                                                                       
                                                                                                  
 activation (Activation)        (None, 16, 16, 64)   0           ['batch_normalization[0][0]']

In [27]:
input_shape = images.shape[1:]
model = modifyNet(input_shape, 1000)

In [28]:
model.compile(loss=tf.keras.losses.sparse_categorical_crossentropy,
              optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              metrics=[tf.keras.metrics.sparse_categorical_accuracy])

In [29]:
model_checkpoint = ModelCheckpoint('ResNet.h5', save_best_only=True, monitor="val_loss")

In [30]:
training = model.fit(images, labels, batch_size = 32,epochs= 10, validation_data=(images_valid, labels_valid), callbacks=[model_checkpoint])

Epoch 1/10
Epoch 2/10
Epoch 3/10

KeyboardInterrupt: 

In [None]:
model.compile(loss=tf.keras.losses.sparse_categorical_crossentropy,
              optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
              metrics=[tf.keras.metrics.sparse_categorical_accuracy])

In [None]:
training = model.fit(images, labels, batch_size = 32,epochs= 20, validation_data=(images_valid, labels_valid), callbacks=[model_checkpoint])

In [35]:
from keras.preprocessing.image import ImageDataGenerator

datagen = ImageDataGenerator(
    rotation_range=15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    horizontal_flip=True,
)
datagen.fit(images)

In [None]:
model.compile(loss=tf.keras.losses.sparse_categorical_crossentropy,
              optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
              metrics=[tf.keras.metrics.sparse_categorical_accuracy])
training2 = model.fit(datagen.flow(images, labels, batch_size=32),epochs= 20, validation_data=(images_valid, labels_valid), callbacks=[model_checkpoint])

In [47]:
datagen = ImageDataGenerator(
    rotation_range=15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    horizontal_flip=True,
)
datagen.fit(images)

In [None]:
model.compile(loss=tf.keras.losses.sparse_categorical_crossentropy,
              optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
              metrics=[tf.keras.metrics.sparse_categorical_accuracy])
training3 = model.fit(datagen.flow(images, labels, batch_size=32),epochs= 5, validation_data=(images_valid, labels_valid), callbacks=[model_checkpoint])

In [None]:
fig = plt.figure()
plt.plot(training2.history['sparse_categorical_accuracy'], color='blue', label='accuracy')
plt.plot(training2.history['val_sparse_categorical_accuracy'], color='orange', label='val_accuracy')
fig.suptitle('Accuracy', fontsize=20)
plt.legend(loc="upper left")
plt.show()

In [None]:
fig = plt.figure()
plt.plot(training2.history['loss'], color='blue', label='loss')
plt.plot(training2.history['val_loss'], color='orange', label='val_loss')
fig.suptitle('Loss', fontsize=20)
plt.legend(loc="upper left")
plt.show()

### Test Data

In [None]:
model.evaluate(test_images, test_labels)

In [59]:
test_pred = model.predict(test_images)

In [60]:
test_pred = test_pred.argmax(axis=-1)

In [None]:
f1_class = f1_score(test_labels, test_pred,average=None)
print(f1_class)

In [62]:
import os

In [63]:
model.save(os.path.join('models','ResNet.h5'))