# CNN using `Keras` library

This tutorial will demonstrate image classification using the `Keras` library. This library allows for GPU acceleration. By utilizing GPUs, Keras significantly speeds up the training process, enabling the parallel processing of matrix operations involved in CNNs, resulting in faster and improved performance when training on large datasets like MNIST.





### Import libraries and initialize random generator

In [1]:
import numpy as np
import keras
np.random.seed(0)

caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io_plugins.so: undefined symbol: _ZN3tsl6StatusC1EN10tensorflow5error4CodeESt17basic_string_viewIcSt11char_traitsIcEENS_14SourceLocationE']
caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so: undefined symbol: _ZTVN10tensorflow13GcsFileSystemE']


A GPU (Graphics Processing Unit) is crucial for training CNN on large dataset due to its ability to perform parallel computations efficiently. CNNs involve lots of matrix operations, such as convolutions and pooling, which can be computationally intensive. GPUs excel at executing these operations simultaneously across multiple cores, dramatically accelerating the training process compared to a CPU. The parallel nature of GPUs allows for faster gradient calculations during backpropagation, leading to quicker weight updates and convergence to an optimal solution. Thus, the GPU's parallel processing capabilities significantly reduce the training time, making it an indispensable tool for effectively training CNNs on large datasets like MNIST.


In [2]:
# Check GPU availability
import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  1


### Read data using Keras

In [3]:
# Load the MNIST dataset
from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [5]:
# Preprocess the data
X_train = X_train.reshape(X_train.shape[0], 28, 28, 1).astype('float32') / 255
X_test = X_test.reshape(X_test.shape[0], 28, 28, 1).astype('float32') / 255

from keras.utils import np_utils
y_train = np_utils.to_categorical(y_train, 10)
y_test = np_utils.to_categorical(y_test, 10)

### Create the CNN model


In [11]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(Flatten())
model.add(Dense(128, activation='sigmoid'))
model.add(Dense(10, activation='softmax'))


In [12]:
# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model using GPU acceleration
with tf.device('/device:GPU:0'):
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=200, verbose=2)

Epoch 1/10


ValueError: in user code:

    File "/opt/conda/lib/python3.10/site-packages/keras/engine/training.py", line 1284, in train_function  *
        return step_function(self, iterator)
    File "/opt/conda/lib/python3.10/site-packages/keras/engine/training.py", line 1268, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/opt/conda/lib/python3.10/site-packages/keras/engine/training.py", line 1249, in run_step  **
        outputs = model.train_step(data)
    File "/opt/conda/lib/python3.10/site-packages/keras/engine/training.py", line 1051, in train_step
        loss = self.compute_loss(x, y, y_pred, sample_weight)
    File "/opt/conda/lib/python3.10/site-packages/keras/engine/training.py", line 1109, in compute_loss
        return self.compiled_loss(
    File "/opt/conda/lib/python3.10/site-packages/keras/engine/compile_utils.py", line 265, in __call__
        loss_value = loss_obj(y_t, y_p, sample_weight=sw)
    File "/opt/conda/lib/python3.10/site-packages/keras/losses.py", line 142, in __call__
        losses = call_fn(y_true, y_pred)
    File "/opt/conda/lib/python3.10/site-packages/keras/losses.py", line 268, in call  **
        return ag_fn(y_true, y_pred, **self._fn_kwargs)
    File "/opt/conda/lib/python3.10/site-packages/keras/losses.py", line 1984, in categorical_crossentropy
        return backend.categorical_crossentropy(
    File "/opt/conda/lib/python3.10/site-packages/keras/backend.py", line 5559, in categorical_crossentropy
        target.shape.assert_is_compatible_with(output.shape)

    ValueError: Shapes (200, 10, 10) and (200, 10) are incompatible


In [None]:
# Evaluate the model using GPU acceleration
with tf.device('/device:GPU:0'):
    score = model.evaluate(X_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

# Experimentation with models with different layers

In [None]:
# Create the CNN model. No dropout
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dense(10, activation='softmax'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model using GPU acceleration
with tf.device('/device:GPU:0'):
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=200, verbose=2)

# Evaluate the model using GPU acceleration
with tf.device('/device:GPU:0'):
    score = model.evaluate(X_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

In [None]:
# Create the CNN model. No dropout. no pooling
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dense(10, activation='softmax'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model using GPU acceleration
with tf.device('/device:GPU:0'):
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=200, verbose=2)

# Evaluate the model using GPU acceleration
with tf.device('/device:GPU:0'):
    score = model.evaluate(X_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

In [None]:
# Create the CNN model. No dropout. no pooling
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(Flatten())
model.add(Dense(128, activation='sigmoid'))
model.add(Dense(10, activation='softmax'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model using GPU acceleration
with tf.device('/device:GPU:0'):
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=200, verbose=2)

# Evaluate the model using GPU acceleration
with tf.device('/device:GPU:0'):
    score = model.evaluate(X_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

In [None]:
# Create the CNN model. No dropout. no pooling
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(Flatten())
model.add(Dense(128, activation='sigmoid'))
model.add(Dense(10, activation='sigmoid'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model using GPU acceleration
with tf.device('/device:GPU:0'):
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=200, verbose=2)

# Evaluate the model using GPU acceleration
with tf.device('/device:GPU:0'):
    score = model.evaluate(X_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

In [None]:
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), input_shape=(28, 28, 1)))
model.add(Activation("tanh"))
model.add(Flatten())
model.add(Dense(128))
model.add(Activation("tanh"))
model.add(Dense(10))
model.add(Activation("softmax"))
# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model using GPU acceleration
with tf.device('/device:GPU:0'):
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=200, verbose=2)

# Evaluate the model using GPU acceleration
with tf.device('/device:GPU:0'):
    score = model.evaluate(X_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

In [None]:
# Create the CNN model
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(256, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model using GPU acceleration
with tf.device('/device:GPU:0'):
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=200, verbose=2)

# Evaluate the model using GPU acceleration
with tf.device('/device:GPU:0'):
    score = model.evaluate(X_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

# CNN CPU

In [None]:
import numpy as np
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.utils import np_utils
import tensorflow as tf

# Load the MNIST dataset
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Preprocess the data
X_train = X_train.reshape(X_train.shape[0], 28, 28, 1).astype('float32') / 255
X_test = X_test.reshape(X_test.shape[0], 28, 28, 1).astype('float32') / 255

y_train = np_utils.to_categorical(y_train, 10)
y_test = np_utils.to_categorical(y_test, 10)

# Build the CNN model
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model
batch_size = 128
epochs = 10
model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs, validation_split=0.1)

# Evaluate the model
score = model.evaluate(X_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])


In [None]:
import numpy as np
from keras.datasets import mnist

# Load MNIST dataset using Keras
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Preprocess the data
X_train = X_train.reshape(-1, 28, 28, 1) / 255.0
X_test = X_test.reshape(-1, 28, 28, 1) / 255.0
y_train = y_train.astype(np.int32)
y_test = y_test.astype(np.int32)

# Define activation function and its derivative
def relu(x):
    return np.maximum(0, x)

def relu_derivative(x):
    return np.where(x > 0, 1, 0)

# Implement Convolutional Neural Network (CNN) from scratch
class CNN:
    def __init__(self):
        self.conv_filters = 8
        self.kernel_size = 3
        self.pool_size = 2
        self.fc_units = 128
        self.learning_rate = 0.001

        # Initialize weights and biases for the convolutional and fully connected layers
        self.conv_weights = np.random.randn(self.conv_filters, self.kernel_size, self.kernel_size)
        self.conv_biases = np.zeros(self.conv_filters)
        self.fc_weights = np.random.randn(12 * 12 * self.conv_filters, self.fc_units)
        self.fc_biases = np.zeros(self.fc_units)
        self.output_weights = np.random.randn(self.fc_units, 10)
        self.output_biases = np.zeros(10)

    def forward(self, x):
        # Convolution layer
        conv_out = np.zeros((x.shape[0], 28 - self.kernel_size + 1, 28 - self.kernel_size + 1, self.conv_filters))
        for i in range(self.conv_filters):
            for j in range(28 - self.kernel_size + 1):
                for k in range(28 - self.kernel_size + 1):
                    conv_out[:, j, k, i] = np.sum(x[:, j:j+self.kernel_size, k:k+self.kernel_size] * self.conv_weights[i, :, :],
                                                  axis=(1, 2)) + self.conv_biases[i]

        conv_out = relu(conv_out)

        # Max pooling layer
        pool_out = np.zeros((x.shape[0], 14, 14, self.conv_filters))
        for i in range(self.conv_filters):
            for j in range(14):
                for k in range(14):
                    pool_out[:, j, k, i] = np.max(conv_out[:, j*2:j*2+self.pool_size, k*2:k*2+self.pool_size, i], axis=(1, 2))

        pool_out = pool_out.reshape(pool_out.shape[0], -1)

        # Fully connected layer
        fc_out = relu(np.dot(pool_out, self.fc_weights) + self.fc_biases)

        # Output layer
        output = np.dot(fc_out, self.output_weights) + self.output_biases

        # Apply softmax activation for the final output
        exp_scores = np.exp(output - np.max(output, axis=1, keepdims=True))
        probabilities = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

        return probabilities

    def train(self, X, y, epochs=10, batch_size=32):
        num_samples = X.shape[0]

        for epoch in range(epochs):
            total_loss = 0
            for i in range(0, num_samples, batch_size):
                X_batch = X[i:i+batch_size]
                y_batch = y[i:i+batch_size]

                # Forward pass
                output = self.forward(X_batch)

                # Compute loss using cross-entropy
                loss = -np.log(output[np.arange(batch_size), y_batch]).mean()
                total_loss += loss

                # Backpropagation
                one_hot_y = np.zeros_like(output)
                one_hot_y[np.arange(batch_size), y_batch] = 1
                delta3 = output - one_hot_y

                delta2 = np.dot(delta3, self.output_weights.T) * relu_derivative(np.dot(X_batch, self.fc_weights) + self.fc_biases)
                delta2 = delta2.reshape(delta2.shape[0], 12, 12, self.conv_filters)

                dconv_weights = np.zeros_like(self.conv_weights)
                dconv_biases = np.zeros_like(self.conv_biases)
                for j in range(self.conv_filters):
                    for k in range(12):
                        for l in range(12):
                            dconv_weights[j, :, :] += np.sum(delta2[:, k, l, j][:, np.newaxis, np.newaxis] *
                                                             X_batch[:, k:k+self.kernel_size, l:l+self.kernel_size],
                                                             axis=0)
                dconv_biases = np.sum(delta2, axis=(0, 1, 2))

                dfc_weights = np.dot(pool_out.T, delta2.reshape(delta2.shape[0], -1)) / batch_size
                dfc_biases = np.sum(delta2, axis=0).sum(axis=1) / batch_size

                dout_weights = np.dot(fc_out.T, delta3) / batch_size
                dout_biases = np.sum(delta3, axis=0) / batch_size

                # Update weights and biases using gradients
                self.conv_weights -= self.learning_rate * dconv_weights
                self.conv_biases -= self.learning_rate * dconv_biases
                self.fc_weights -= self.learning_rate * dfc_weights
                self.fc_biases -= self.learning_rate * dfc_biases
                self.output_weights -= self.learning_rate * dout_weights
                self.output_biases -= self.learning_rate * dout_biases

            avg_loss = total_loss / (num_samples // batch_size)
            print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}")

    def predict(self, X):
        return np.argmax(self.forward(X), axis=1)

# Create the CNN model and train it
model = CNN()
model.train(X_train, y_train, epochs=10, batch_size=32)

# Evaluate the model on the test set
predictions = model.predict(X_test)
accuracy = np.mean(predictions == y_test)
print("Test accuracy:", accuracy)
