# AI ENGINEERING IBM COURSE TEXTBOOK - KERAS

### Libraries

In [None]:
import os
from collections import deque

import gym
import keras
import keras_tuner as kt
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from keras.layers import Dense
from keras.layers import Flatten # to flatten data for fully connected layers
from keras.layers.convolutional import Conv2D # to add convolutional layers
from keras.layers.convolutional import MaxPooling2D # to add pooling layers
from keras.models import Sequential
from keras.models import load_model
from keras.utils import to_categorical
from PIL import Image, ImageDraw
from tensorflow.keras.applications import VGG16
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, UpSampling2D
from tensorflow.keras.layers import Layer
from tensorflow.keras.layers import LayerNormalization
from tensorflow.keras.layers import Softmax
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.utils import array_to_img
from tensorflow.keras.utils import plot_model

### Initializing a network without framework

In [None]:
# Dimensions of the toy network used in the from-scratch example.
n, num_hidden_layers = 2, 2  # input count, hidden-layer count
m = [2] * num_hidden_layers  # node count of each hidden layer
num_nodes_output = 1  # node count of the output layer

In [5]:
import numpy as np # import the Numpy library

def initialize_network(num_inputs, num_hidden_layers, num_nodes_hidden, num_nodes_output):
    """Create a fully connected network with random weights and biases.

    The network is a nested dict: layer name -> node name -> parameters.
    Hidden layers are named 'layer_1', 'layer_2', ... and the last layer is
    named 'output'; nodes are named 'node_1', 'node_2', ...

    Args:
        num_inputs: number of values fed to the first layer.
        num_hidden_layers: how many hidden layers to create.
        num_nodes_hidden: sequence giving the node count of each hidden layer.
        num_nodes_output: node count of the output layer.

    Returns:
        dict whose nodes hold 'weights' (one value per node of the previous
        layer) and 'bias' (array of size 1), drawn uniformly from [0, 1) and
        rounded to two decimals.
    """
    network = {}
    fan_in = num_inputs  # width of the layer feeding the one being built

    # One extra iteration past the hidden layers builds the output layer.
    for index in range(num_hidden_layers + 1):
        is_output = index == num_hidden_layers
        name = 'output' if is_output else 'layer_{}'.format(index + 1)
        width = num_nodes_output if is_output else num_nodes_hidden[index]

        nodes = {}
        for position in range(1, width + 1):
            # Weights are drawn before the bias for every node.
            weights = np.around(np.random.uniform(size=fan_in), decimals=2)
            bias = np.around(np.random.uniform(size=1), decimals=2)
            nodes['node_{}'.format(position)] = {'weights': weights, 'bias': bias}

        network[name] = nodes
        fan_in = width

    return network

def compute_weighted_sum(inputs, weights, bias):
    """Return the sum of the element-wise products of inputs and weights, plus bias.

    `inputs` and `weights` may be any array-like; `bias` is added as given, so
    an array bias yields an array result and a scalar bias a scalar result.
    """
    products = np.multiply(np.array(inputs), np.array(weights))
    return products.sum() + bias

#node activation using sigmoid function
def node_activation(weighted_sum):
    """Apply the sigmoid function 1 / (1 + e^(-z)) to a weighted sum."""
    decay = np.exp(-1 * weighted_sum)
    return 1.0 / (1.0 + decay)

#automate all process with this function
def forward_propagate(network, inputs):
    """Run a forward pass through a network built by `initialize_network`.

    Layers are visited in the dict's insertion order. Every node's output is
    the sigmoid of its weighted sum, rounded to four decimals. The outputs of
    each hidden layer are printed as a side effect.

    Args:
        network: nested dict mapping layer name -> node name ->
            {'weights', 'bias'}; the final layer must be named 'output'.
        inputs: iterable of input values for the first layer.

    Returns:
        list with the rounded outputs of the last layer.

    Raises:
        ValueError: if `network` contains no layers.
    """
    if not network:
        # Previously this case failed with an UnboundLocalError at the return.
        raise ValueError('network must contain at least one layer')

    layer_inputs = list(inputs)  # the input layer feeds the first hidden layer
    layer_outputs = []

    for layer, layer_data in network.items():
        layer_outputs = []
        for node_data in layer_data.values():
            # Weighted sum and activation of the node in one step.
            node_output = node_activation(
                compute_weighted_sum(layer_inputs, node_data['weights'], node_data['bias'])
            )
            # np.ravel handles both a size-1 array (array bias) and a 0-d
            # value (scalar bias); plain [0] indexing fails on the latter.
            layer_outputs.append(np.around(np.ravel(node_output)[0], decimals=4))

        if layer != 'output':
            print('The outputs of the nodes in hidden layer number {} is {}'.format(layer.split('_')[1], layer_outputs))

        layer_inputs = layer_outputs  # this layer's outputs feed the next layer

    return layer_outputs

In [1]:
#example of using compute_weighted_sum function with network dictionary
# small_network and inputs are not defined anywhere earlier in these notes, so
# build a small example network and a matching random input vector here
# (5 inputs, hidden layers of 3, 2 and 3 nodes, 1 output node).
small_network = initialize_network(5, 3, [3, 2, 3], 1)
inputs = np.around(np.random.uniform(size=5), decimals=2)

node_weights = small_network['layer_1']['node_1']['weights']
node_bias = small_network['layer_1']['node_1']['bias']

weighted_sum = compute_weighted_sum(inputs, node_weights, node_bias)
print('The weighted sum at the first node in the hidden layer is {}'.format(np.around(weighted_sum[0], decimals=4)))

#and/or

# reuse the weighted sum computed above instead of computing it a second time
node_output = node_activation(weighted_sum)
print('The output of the first node in the hidden layer is {}'.format(np.around(node_output[0], decimals=4)))

#create random inputs
inputs_01 = np.around(np.random.uniform(size=10), decimals=2)

In [6]:
# Worked example for a single node with two inputs, two weights and a scalar bias.
example_inputs, example_weights, example_bias = [0.5, -0.35], [0.55, 0.45], 0.15
weighted_sum = compute_weighted_sum(example_inputs, example_weights, example_bias)
output = node_activation(weighted_sum)
print(weighted_sum)
print(output)

0.2675
0.5664790559676278


### Regression with Keras

In [None]:
# define regression model
def regression_model():
    """Build and compile a small dense regression network.

    Architecture: two hidden layers of 50 ReLU units followed by a single
    linear output unit, compiled with Adam and mean squared error.
    The input width is read from the module-level `n_cols`, which must be
    defined before this function is called.
    """
    layers = [
        Dense(50, activation='relu', input_shape=(n_cols,)),
        Dense(50, activation='relu'),
        Dense(1),
    ]
    model = Sequential(layers)
    model.compile(loss='mean_squared_error', optimizer='adam')
    return model

In [None]:
# Instantiate a freshly compiled regression network.
model = regression_model()

# Train for 100 epochs, holding out 30% of the data for validation.
model.fit(
    predictors_norm,
    target,
    epochs=100,
    validation_split=0.3,
    verbose=2,
)

### Classification with Keras

In [None]:
# define classification model
def classification_model():
    """Build and compile a dense classifier for flattened images.

    Architecture: a ReLU layer as wide as the input (`num_pixels`), a ReLU
    layer of 100 units and a softmax layer of `num_classes` units, compiled
    with Adam, categorical cross-entropy and the accuracy metric.
    `num_pixels` and `num_classes` are read from module scope and must be
    defined before this function is called.
    """
    model = Sequential([
        Dense(num_pixels, activation='relu', input_shape=(num_pixels,)),
        Dense(100, activation='relu'),
        Dense(num_classes, activation='softmax'),
    ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

In [None]:
# import the data
from keras.datasets import mnist

# read the data
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# A bare expression is only displayed when it is the last statement of a cell,
# so print the shape explicitly.
print(X_train.shape)
#(60000, 28, 28)

plt.imshow(X_train[0])
plt.show()

#With conventional neural networks, we cannot feed in the image as input as is. So we need to flatten the images 
#into one-dimensional vectors, each of size 1 x (28 x 28) = 1 x 784.
# flatten images into one-dimensional vector

num_pixels = X_train.shape[1] * X_train.shape[2] # find size of one-dimensional vector

X_train = X_train.reshape(X_train.shape[0], num_pixels).astype('float32') # flatten training images
X_test = X_test.reshape(X_test.shape[0], num_pixels).astype('float32') # flatten test images

#Since pixel values can range from 0 to 255, let's normalize the vectors to be between 0 and 1.
# normalize inputs from 0-255 to 0-1
X_train = X_train / 255
X_test = X_test / 255

#Finally, before we start building our model, remember that for classification we need to divide our target variable 
#into categories. We use the to_categorical function from the Keras Utilities package.
# one hot encode outputs
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

num_classes = y_test.shape[1]
print(num_classes)

# build the model
model = classification_model()

# fit the model (!!!training model can take up to 20 minutes!!!)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, verbose=2)

# evaluate the model and report the result (same format as the CNN example)
scores = model.evaluate(X_test, y_test, verbose=0)
print("Accuracy: {} \n Error: {}".format(scores[1], 100-scores[1]*100))

#save model
model.save('classification_model.h5')

#load saved model
pretrained_model = load_model('classification_model.h5')

### Convolutional Neural Networks with Keras

In [None]:
def convolutional_model():
    """Build and compile a single-convolution CNN for 28x28 one-channel images.

    Architecture: Conv2D (16 filters, 5x5, stride 1, ReLU) -> 2x2 max pooling
    with stride 2 -> Flatten -> Dense(100, ReLU) -> softmax over
    `num_classes`, compiled with Adam, categorical cross-entropy and accuracy.
    `num_classes` is read from module scope.
    """
    feature_extractor = [
        Conv2D(16, (5, 5), strides=(1, 1), activation='relu', input_shape=(28, 28, 1)),
        MaxPooling2D(pool_size=(2, 2), strides=(2, 2)),
    ]
    classifier_head = [
        Flatten(),
        Dense(100, activation='relu'),
        Dense(num_classes, activation='softmax'),
    ]

    model = Sequential(feature_extractor + classifier_head)
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

In [None]:
# build the model
model = convolutional_model()

# The convolutional model takes inputs of shape (28, 28, 1), while the dense
# classification example flattens every image to 784 values. Restore the image
# layout (a no-op if the data is already shaped that way) in separate
# variables so the flattened arrays remain available.
X_train_cnn = X_train.reshape(-1, 28, 28, 1)
X_test_cnn = X_test.reshape(-1, 28, 28, 1)

# fit the model
model.fit(X_train_cnn, y_train, validation_data=(X_test_cnn, y_test), epochs=10, batch_size=200, verbose=2)

# evaluate the model
scores = model.evaluate(X_test_cnn, y_test, verbose=0)
print("Accuracy: {} \n Error: {}".format(scores[1], 100-scores[1]*100))

### Functional API in Keras

In [None]:
#create input layer. Assume that inputs are of vector size 20
input_layer = Input(shape=(20,))
print(input_layer)

#add hidden layers
hidden_layer1 = Dense(64, activation='relu')(input_layer) #creates a dense (fully connected) layer with 64 units and ReLU activation function.
hidden_layer2 = Dense(64, activation='relu')(hidden_layer1)

#define output layer
output_layer = Dense(1, activation='sigmoid')(hidden_layer2) #creates a dense layer with 1 unit and a sigmoid activation function, suitable for binary classification.

#create model and give summary
model = Model(inputs=input_layer, outputs=output_layer)
model.summary()

#Before training the model, you need to compile it. You will specify the loss function, optimizer, and evaluation metrics.
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

#train model (example)
import numpy as np
X_train = np.random.rand(1000, 20)
y_train = np.random.randint(2, size=(1000, 1))
model.fit(X_train, y_train, epochs=10, batch_size=32)

# Example test data (in practice, use real dataset)
# The test set must have the same 20-feature layout as the training data;
# without these two lines X_test / y_test are undefined in this example.
X_test = np.random.rand(200, 20)
y_test = np.random.randint(2, size=(200, 1))
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test loss: {loss}')
print(f'Test accuracy: {accuracy}')


### Dropout and Batch Normalization

Before we proceed with the practice exercise, let's briefly discuss two important techniques often used to improve the performance of neural networks: **Dropout Layers** and **Batch Normalization**.

#### Dropout Layers

Dropout is a regularization technique that helps prevent overfitting in neural networks. During training, Dropout randomly sets a fraction of input units to zero at each update cycle. This prevents the model from becoming overly reliant on any specific neurons, which encourages the network to learn more robust features that generalize better to unseen data.

**Key points:**
- Dropout is only applied during training, not during inference.
- The dropout rate is a hyperparameter that determines the fraction of neurons to drop.


#### Batch Normalization

Batch Normalization is a technique used to improve the training stability and speed of neural networks. It normalizes the output of a previous layer by re-centering and re-scaling the data, which helps in stabilizing the learning process. By reducing the internal covariate shift (the changes in the distribution of layer inputs), batch normalization allows the model to use higher learning rates, which often speeds up convergence.

**Key Points:**

- Batch normalization works by normalizing the inputs to each layer to have a mean of zero and a variance of one.
- It is applied during both training and inference, although its behavior varies slightly between the two phases.
- Batch normalization layers also introduce two learnable parameters that allow the model to scale and shift the normalized output, which helps in restoring the model's representational power.

In [None]:
#example of adding dropout layer in Keras
from tensorflow.keras.layers import Dropout, Dense, Input
from tensorflow.keras.models import Model

# Input: feature vectors of length 20.
input_layer = Input(shape=(20,))

# First hidden layer (64 ReLU units); half of its activations are dropped
# at random during training by the Dropout layer that follows.
hidden_layer = Dense(units=64, activation='relu')(input_layer)
dropout_layer = Dropout(0.5)(hidden_layer)

# Second hidden layer consumes the dropout output.
hidden_layer2 = Dense(units=64, activation='relu')(dropout_layer)

# Single sigmoid unit as the output.
output_layer = Dense(units=1, activation='sigmoid')(hidden_layer2)

# Wire input and output into a model and print its layer table.
model = Model(input_layer, output_layer)
model.summary()

In [None]:
#example of adding batch normalization in Keras
from tensorflow.keras.layers import BatchNormalization, Dense, Input
from tensorflow.keras.models import Model

# Input: feature vectors of length 20.
input_layer = Input(shape=(20,))

# First hidden layer (64 ReLU units) followed by batch normalization of its output.
hidden_layer = Dense(units=64, activation='relu')(input_layer)
batch_norm_layer = BatchNormalization()(hidden_layer)

# Second hidden layer consumes the normalized activations.
hidden_layer2 = Dense(units=64, activation='relu')(batch_norm_layer)

# Single sigmoid unit as the output.
output_layer = Dense(units=1, activation='sigmoid')(hidden_layer2)

# Wire input and output into a model and print its layer table.
model = Model(input_layer, output_layer)
model.summary()

##### Example of full model using Keras

In [None]:
# Input: feature vectors of length 20.
input_layer = Input(shape=(20,))

# Block 1: Dense(64, ReLU) -> BatchNormalization -> Dropout(0.5)
hidden_layer1 = Dense(units=64, activation='relu')(input_layer)
batch_norm_layer1 = BatchNormalization()(hidden_layer1)
dropout_layer1 = Dropout(0.5)(batch_norm_layer1)

# Block 2: the same three-layer pattern applied to the output of block 1
hidden_layer2 = Dense(units=64, activation='relu')(dropout_layer1)
batch_norm_layer2 = BatchNormalization()(hidden_layer2)
dropout_layer2 = Dropout(0.5)(batch_norm_layer2)

# Output: one sigmoid unit
output_layer = Dense(units=1, activation='sigmoid')(dropout_layer2)

# Assemble the model and print its layer table
model = Model(input_layer, output_layer)
model.summary()

# Compile for binary classification, then train on X_train / y_train
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, batch_size=32, epochs=10)

### Creating Custom Layers and Models

In [None]:
class CustomDenseLayer(Layer):
    """Fully connected layer with a built-in ReLU activation.

    Computes relu(inputs @ w + b). The kernel `w` and bias `b` are created
    in `build`, once the size of the last input axis is known.

    Args:
        units: number of output units.
        **kwargs: standard Keras layer arguments (for example `name`),
            forwarded to the base `Layer`.
    """

    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        """Create the kernel (random normal) and the bias (zeros)."""
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='zeros',
                                 trainable=True)

    def call(self, inputs):
        """Apply the affine transform followed by ReLU."""
        return tf.nn.relu(tf.matmul(inputs, self.w) + self.b)

    def get_config(self):
        """Return the constructor arguments so the layer can be saved and re-created."""
        config = super().get_config()
        config.update({'units': self.units})
        return config

In [None]:
from tensorflow.keras.layers import Softmax

# Stack two custom dense layers and finish with a Softmax layer.
model = Sequential()
model.add(CustomDenseLayer(128))  # hidden layer (ReLU is applied inside the custom layer)
model.add(CustomDenseLayer(10))   # 10 units, one per class (also passes through ReLU)
model.add(Softmax())              # turns the 10 values into class probabilities

# Softmax makes the outputs of each sample sum to 1, which is what categorical
# cross-entropy expects for multi-class classification.
model.compile(loss='categorical_crossentropy', optimizer='adam')

# Summary requested before the weights have been created.
print("Model summary before building:")
model.summary()

# Building with a concrete input shape creates the weights, so the second
# summary can report parameter counts.
model.build((1000, 20))
print("\nModel summary after building:")
model.summary()

# Write a diagram of the architecture to model_plot.png.
plot_model(model, show_shapes=True, show_layer_names=True, to_file="model_plot.png")

### Advanced Data Augmentation with Keras

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array

# Create a blank white image
image = Image.new('RGB', (224, 224), color=(255, 255, 255))

# Draw a red square
draw = ImageDraw.Draw(image)
draw.rectangle([(50, 50), (174, 174)], fill=(255, 0, 0))

# Save the image
image.save('sample.jpg')

# Load the sample image once and add a leading batch axis, because
# ImageDataGenerator.flow expects a batch of images.
img_path = 'sample.jpg'
img = load_img(img_path)
x = img_to_array(img)
x = np.expand_dims(x, axis=0)

# Create an instance of ImageDataGenerator with basic augmentations
datagen = ImageDataGenerator(
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Generate batches of augmented images
# flow() yields batches indefinitely, so stop after four figures.
i = 0
for batch in datagen.flow(x, batch_size=1):
    plt.figure(i)
    imgplot = plt.imshow(batch[0].astype('uint8'))
    i += 1
    if i % 4 == 0:
        break

plt.show()

In [None]:
# Create an instance of ImageDataGenerator with normalization options
datagen = ImageDataGenerator(
    featurewise_center=True,
    featurewise_std_normalization=True,
    samplewise_center=True,
    samplewise_std_normalization=True
)

# Compute the feature-wise statistics from the sample image
# (normally done on the training set)
datagen.fit(x)

# Generate batches of normalized images
# flow() yields batches indefinitely, so stop after four figures.
i = 0
for batch in datagen.flow(x, batch_size=1):
    plt.figure(i)
    # Normalized pixels are centred on zero, so casting them straight to
    # uint8 would wrap the negative values; array_to_img rescales the array
    # to a displayable range instead.
    imgplot = plt.imshow(array_to_img(batch[0]))
    i += 1
    if i % 4 == 0:
        break

plt.show()

In [None]:
# Define a custom data augmentation function
def add_random_noise(image):
    """Return `image` plus Gaussian noise (mean 0, standard deviation 0.5) of the same shape."""
    return image + np.random.normal(loc=0, scale=0.5, size=image.shape)

# Generator whose only augmentation is the custom noise function.
datagen = ImageDataGenerator(preprocessing_function=add_random_noise)

# flow() yields batches indefinitely; show the first four, one figure each.
for i, batch in enumerate(datagen.flow(x, batch_size=1), start=1):
    plt.figure(i - 1)
    imgplot = plt.imshow(batch[0].astype('uint8'))
    if i % 4 == 0:
        break

plt.show()

In [None]:
# Show four augmented versions of the same image in a 2x2 grid.
plt.figure(figsize=(10, 10))
augmented_batches = datagen.flow(x, batch_size=1)
for i, batch in enumerate(augmented_batches):
    if i < 4:
        plt.subplot(2, 2, i + 1)
        plt.imshow(batch[0].astype('uint8'))
    else:
        # the generator never ends on its own
        break
plt.show()

In [None]:
from tensorflow.keras.utils import array_to_img

# Create an instance of ImageDataGenerator with normalization options
datagen = ImageDataGenerator(
    featurewise_center=True,
    featurewise_std_normalization=True,
    samplewise_center=True,
    samplewise_std_normalization=True
)

# Fit the generator on the sample image batch `x` (normally done on the
# training set). `x_train` is never defined in these notes, so the batch
# built from sample.jpg is used.
datagen.fit(x)

# Generate batches of normalized images
# flow() yields batches indefinitely, so stop after four figures.
i = 0
for batch in datagen.flow(x, batch_size=1):
    plt.figure(i)
    # array_to_img rescales the normalized array to a displayable range
    imgplot = plt.imshow(array_to_img(batch[0]))
    plt.title(f'Normalized Image {i + 1}')
    i += 1
    if i % 4 == 0:
        break

plt.show()

### Transfer Learning Implementation

In [None]:
# Load the VGG16 model pre-trained on ImageNet
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze the base model layers
for layer in base_model.layers:
    layer.trainable = False

# Create a new model and add the base model and new layers
model = Sequential([
    base_model,
    Flatten(),
    Dense(256, activation='relu'),
    Dense(1, activation='sigmoid')  # Change to the number of classes you have
])

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Create directories if they don't exist
os.makedirs('sample_data/class_a', exist_ok=True)
os.makedirs('sample_data/class_b', exist_ok=True)

# Create 10 sample images for each class
for i in range(10):
    # Create a blank white image for class_a
    img = Image.fromarray(np.ones((224, 224, 3), dtype=np.uint8) * 255)
    img.save(f'sample_data/class_a/img_{i}.jpg')

    # Create a blank black image for class_b
    img = Image.fromarray(np.zeros((224, 224, 3), dtype=np.uint8))
    img.save(f'sample_data/class_b/img_{i}.jpg')

print("Sample images created in 'sample_data/'")

# Load and preprocess the dataset
train_datagen = ImageDataGenerator(rescale=1./255)
train_generator = train_datagen.flow_from_directory(
    'sample_data',
    target_size=(224, 224),
    batch_size=32,
    class_mode='binary'
)

# Verify if the generator has loaded images correctly
print(f"Found {train_generator.samples} images belonging to {train_generator.num_classes} classes.")

# Both training phases are skipped when no images were found.
has_training_data = train_generator.samples > 0

# Train the model (only the new top layers are trainable at this point)
if has_training_data:
    model.fit(train_generator, epochs=10)

# Unfreeze the top layers of the base model
for layer in base_model.layers[-4:]:
    layer.trainable = True

# Compile the model again so the change of `trainable` takes effect.
# A very low learning rate is used for fine-tuning so the pre-trained
# weights are only adjusted gently.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Train the model again (fine-tuning)
if has_training_data:
    model.fit(train_generator, epochs=10)

Validation generator and loss by epochs graph

In [None]:
# Load and preprocess the dataset
# validation_split reserves 20% of the images; the `subset` argument below
# selects which part each generator reads.
train_datagen = ImageDataGenerator(rescale=1./255, validation_split=0.2)
train_generator = train_datagen.flow_from_directory(
    'sample_data',
    target_size=(224, 224),
    batch_size=32,
    class_mode='binary',
    subset='training'
)

validation_generator = train_datagen.flow_from_directory(
    'sample_data',
    target_size=(224, 224),
    batch_size=32,
    class_mode='binary',
    subset='validation'
)

#train the model with the validation data
history = model.fit(train_generator, epochs=10, validation_data=validation_generator)

# Plot training and validation loss
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Evaluate the fine-tuned model on the test set
# No test generator is defined anywhere in these notes. This toy example has
# no separate held-out directory, so the generator below reads 'sample_data'
# again; in practice, point it at a directory of unseen images.
test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory(
    'sample_data',
    target_size=(224, 224),
    batch_size=32,
    class_mode='binary',
    shuffle=False
)
test_loss, test_accuracy = model.evaluate(test_generator)
print(f'Test Accuracy: {test_accuracy * 100:.2f}%')
print(f'Test Loss: {test_loss:.4f}')

### Hyperparameter tuning

Hyperparameter tuning is a crucial step in enhancing the performance of deep learning models. Below are some best practices for optimizing hyperparameters effectively:
<br>

Use Grid Search for Small Spaces: If the number of hyperparameters and their possible values is small, grid search can be a useful approach. It systematically explores all combinations but can become computationally expensive for larger search spaces.
<br>

Random search for larger spaces: In larger search spaces, random search is more efficient than grid search, as it randomly samples configurations. While it's less exhaustive, it often identifies a good set of hyperparameters faster.
<br>

Leverage Bayesian optimization: Bayesian optimization methods, such as those used in Keras Tuner, build a probabilistic model of the network's performance as a function of the hyperparameters. This approach focuses the search on regions of the space that are more likely to yield better results.
<br>

Use early stopping: To avoid overfitting and reduce computational cost, apply early stopping during hyperparameter tuning. This halts training when performance on a validation set plateaus or declines.
<br>

Tune one parameter at a time: When starting, adjust one hyperparameter at a time (e.g., learning rate, batch size, or number of layers) while keeping the others constant. Once you have a clearer understanding, you can start tuning multiple parameters simultaneously.
<br>

Prioritize learning rate: The learning rate is often the most critical hyperparameter. Begin by optimizing the learning rate before focusing on others, such as the dropout rate or optimizer type.
<br>

Monitor overfitting: Employ techniques such as cross-validation and tracking validation loss to detect overfitting during hyperparameter tuning. Regularization methods such as L2 regularization or dropout can help mitigate overfitting.
<br>

By following these best practices, you can optimize your model's performance and avoid common pitfalls in hyperparameter tuning.

### Practical Application of Transpose Convolution (Increasing quality of images)

In [None]:
# Example data (in practice, use a real image dataset).
# The model maps a 28x28x1 image to a 28x28x1 image and the plot below
# compares each input with its reconstruction, so the targets are the inputs
# themselves.
X_train = np.random.rand(1000, 28, 28, 1)
y_train = X_train
X_test = np.random.rand(200, 28, 28, 1)
y_test = X_test

input_layer = Input(shape=(28, 28, 1))

# 'same' padding keeps the 28x28 spatial size through both layers
conv_layer = Conv2D(filters=32, kernel_size=(3, 3), activation='relu', padding='same')(input_layer)

# the transposed convolution maps the 32 feature maps back to one channel in [0, 1]
transpose_conv_layer = Conv2DTranspose(filters=1, kernel_size=(3, 3), activation='sigmoid', padding='same')(conv_layer)

model = Model(inputs=input_layer, outputs=transpose_conv_layer)

model.compile(optimizer='adam', loss='mean_squared_error', metrics=['accuracy'])

history = model.fit(X_train, y_train, epochs=10, batch_size=32, validation_split=0.2)

# Predict on test data
y_pred = model.predict(X_test)

# Plot some sample images

n = 10 # Number of samples to display

plt.figure(figsize=(20, 4))

for i in range(n):

    # Display original (top row)
    ax = plt.subplot(2, n, i + 1)
    plt.imshow(X_test[i].reshape(28, 28), cmap='gray')
    plt.title("Original")
    plt.axis('off')
    # Display reconstruction (bottom row)
    ax = plt.subplot(2, n, i + 1 + n)
    plt.imshow(y_pred[i].reshape(28, 28), cmap='gray')
    plt.title("Reconstructed")
    plt.axis('off')

plt.show()

### Building Advanced Transformers

In [None]:
# MultiHeadSelfAttention class implements the self-attention mechanism with multiple attention heads.
class MultiHeadSelfAttention(Layer):
    """Self-attention layer that attends over its input with several heads.

    The input is projected to query, key and value tensors of width
    `embed_dim`, each is split into `num_heads` heads of width
    `embed_dim // num_heads`, scaled dot-product attention is applied per
    head, and the heads are concatenated and passed through a final dense
    projection of width `embed_dim`.

    Args:
        embed_dim: width of the projections and of the output.
        num_heads: number of attention heads; must divide `embed_dim`.
        **kwargs: standard Keras layer arguments, forwarded to `Layer`.

    Raises:
        ValueError: if `embed_dim` is not divisible by `num_heads`.
    """

    def __init__(self, embed_dim, num_heads=8, **kwargs):
        super().__init__(**kwargs)

        # Without this check the reshape in split_heads fails later with an
        # unrelated-looking shape error.
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embed_dim ({embed_dim}) must be divisible by num_heads ({num_heads})"
            )

        self.embed_dim = embed_dim
        self.num_heads = num_heads

        # Width handled by each individual head
        self.projection_dim = embed_dim // num_heads

        # Dense projections producing the query, key and value tensors
        self.query_dense = Dense(embed_dim)
        self.key_dense = Dense(embed_dim)
        self.value_dense = Dense(embed_dim)

        # Dense layer that mixes the concatenated heads
        self.combine_heads = Dense(embed_dim)

    def attention(self, query, key, value):
        """Scaled dot-product attention; returns (output, attention weights)."""
        # Similarity of every query with every key
        score = tf.matmul(query, key, transpose_b=True)

        # Divide by sqrt of the key width (last axis of `key`)
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = score / tf.math.sqrt(dim_key)

        # Softmax over the last axis turns scores into weights
        weights = tf.nn.softmax(scaled_score, axis=-1)

        # Weighted sum of the value vectors
        output = tf.matmul(weights, value)
        return output, weights

    def split_heads(self, x, batch_size):
        """Reshape x to (batch_size, num_heads, seq_len, projection_dim)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply multi-head self-attention to `inputs`.

        `inputs` is assumed to be (batch_size, seq_len, features); the result
        has shape (batch_size, seq_len, embed_dim).
        """
        batch_size = tf.shape(inputs)[0]

        # Project the same input to query, key and value tensors
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)

        # Split each projection into heads
        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        # Per-head attention; the weights are not needed here
        attention, _ = self.attention(query, key, value)

        # Back to (batch_size, seq_len, num_heads, projection_dim)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])

        # Concatenate the heads: (batch_size, seq_len, embed_dim)
        concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))

        # Final dense projection over the concatenated heads
        output = self.combine_heads(concat_attention)
        return output

In [None]:
# TransformerBlock class implements a transformer block layer, combining multi-head self-attention and a feed-forward network.
class TransformerBlock(Layer):
    """Transformer block: multi-head self-attention followed by a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalization.

    Args:
        embed_dim: width of the input and output embeddings.
        num_heads: number of attention heads.
        ff_dim: width of the hidden layer of the feed-forward network.
        rate: dropout rate applied after each sub-layer.
        **kwargs: standard Keras layer arguments, forwarded to `Layer`.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Multi-head self-attention sub-layer
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)

        # Feed-forward sub-layer: expand to ff_dim with ReLU, then project
        # back to embed_dim without an activation
        self.ffn = tf.keras.Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ])

        # One layer normalization per sub-layer
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        # One dropout per sub-layer
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=None):
        """Apply the block to `inputs`.

        Args:
            inputs: tensor whose last axis has width `embed_dim` (needed for
                the residual additions).
            training: forwarded to the dropout layers; the default `None`
                lets the block be called without the argument.
        """
        # Self-attention sub-layer with dropout
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)

        # Residual connection + normalization
        out1 = self.layernorm1(inputs + attn_output)

        # Feed-forward sub-layer with dropout
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)

        # Residual connection + normalization
        return self.layernorm2(out1 + ffn_output)

In [None]:
# EncoderLayer class implements an encoder layer for a transformer model,
# combining multi-head self-attention and a feed-forward network with residual connections.
class EncoderLayer(Layer):
    """Single encoder layer: self-attention + feed-forward, each wrapped in
    dropout, a residual connection and layer normalization."""

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        """Create the sub-layers of the encoder layer.

        Args:
            embed_dim: Passed to `MultiHeadSelfAttention` and used as the
                output width of the feed-forward network.
            num_heads: Number of heads passed to `MultiHeadSelfAttention`.
            ff_dim: Width of the hidden (ReLU) layer of the feed-forward network.
            rate: Dropout rate shared by both dropout layers.
        """
        super().__init__()

        # Multi-head self-attention layer
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)

        # Feed-forward network (FFN) with two dense layers:
        # the first expands to `ff_dim` with ReLU, the second projects back
        # to `embed_dim` without activation
        self.ffn = tf.keras.Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ])

        # Layer normalization applied after the attention and feed-forward sub-layers
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        # Dropout layers to reduce overfitting during training
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=False):
        """Run the encoder layer on `inputs`.

        Args:
            inputs: Tensor handed to the self-attention layer. It is added to
                the attention output, so both must have compatible shapes.
            training: Forwarded to both dropout layers. Defaults to False so the
                method can be invoked without the flag, matching the other
                `call` signatures in this file.

        Returns:
            The output of the second layer normalization.
        """
        # Self-attention sub-layer, followed by dropout (active only when training)
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)

        # Residual connection around attention, then normalize
        out1 = self.layernorm1(inputs + attn_output)

        # Feed-forward sub-layer, followed by dropout (active only when training)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)

        # Residual connection around the feed-forward network, then normalize
        return self.layernorm2(out1 + ffn_output)

In [None]:
# Example usage: build an encoder stack and push one random batch through it.
# `TransformerEncoder` is not defined in this cell; it must already be in scope.
embed_dim = 128 
num_heads = 8 
ff_dim = 512 
num_layers = 4 

transformer_encoder = TransformerEncoder(num_layers, embed_dim, num_heads, ff_dim) 
# Random input of shape (1, 100, embed_dim)
inputs = tf.random.uniform((1, 100, embed_dim)) 
outputs = transformer_encoder(inputs, training=False)  # Use keyword argument for 'training' 
print(outputs.shape)  # Should print (1, 100, 128)

In [None]:
# Define the necessary parameters 
# (`X`, `Y`, `scaler`, `data` and `time_step` used below are not defined in this
# cell; they must already be in scope.)
embed_dim = 128
num_heads = 8
ff_dim = 512
num_layers = 4

# Define the Transformer Encoder 
transformer_encoder = TransformerEncoder(num_layers, embed_dim, num_heads, ff_dim) 

# Build the model 
# NOTE(review): indexing shape[1] and shape[2] requires `X` to be at least 3-D —
# presumably (samples, time steps, features); confirm against the cell that builds it.
input_shape = (X.shape[1], X.shape[2]) 
inputs = tf.keras.Input(shape=input_shape) 

# Project the inputs to the embed_dim 
x = tf.keras.layers.Dense(embed_dim)(inputs) 
encoder_outputs = transformer_encoder(x) 
# Flatten the encoder output and regress a single value per sample
flatten = tf.keras.layers.Flatten()(encoder_outputs) 
outputs = tf.keras.layers.Dense(1)(flatten) 
model = tf.keras.Model(inputs, outputs) 

# Compile the model 
model.compile(optimizer='adam', loss='mse') 

# Summary of the model 
model.summary()

# Train the model
model.fit(X, Y, epochs=20, batch_size=32)

# Make predictions 
# Note: predictions are made on the same `X` the model was trained on.
predictions = model.predict(X) 
# NOTE(review): presumably maps predictions back to the original value scale — confirm
# how `scaler` was fitted.
predictions = scaler.inverse_transform(predictions) 
 
# Plot the predictions
import matplotlib.pyplot as plt

plt.plot(data, label='True Data')
# Predictions are drawn starting at x = time_step
plt.plot(np.arange(time_step, time_step + len(predictions)), predictions, label='Predictions')
plt.xlabel('Time')
plt.ylabel('Stock Price')
plt.legend()
plt.show()

### Implementing Transformers for Text Generation

In [None]:
# Load the dataset
path_to_file = get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
# Read through a context manager so the file handle is always closed
with open(path_to_file, 'rb') as shakespeare_file:
    text = shakespeare_file.read().decode(encoding='utf-8')

# Preview the dataset
print(text[:1000])

# Preprocess the dataset
vocab_size = 10000  # Upper bound on the vocabulary kept by the vectorizer
seq_length = 100    # Number of tokens per training sequence

# Adapt TextVectorization to the full text so it learns the vocabulary
vectorizer = TextVectorization(max_tokens=vocab_size, output_mode='int')
text_ds = tf.data.Dataset.from_tensor_slices([text]).batch(1)
vectorizer.adapt(text_ds)

# Vectorize the text: the whole corpus is passed as a single string, and [0]
# drops the batch dimension to leave one 1-D vector of integer token ids
vectorized_text = vectorizer([text])[0]
print("Vectorized text shape:", vectorized_text.shape)
print("First 10 vectorized tokens:", vectorized_text.numpy()[:10])

In [None]:
# Slice one long token sequence into overlapping (input, target) training windows.
# Every window is `seq_length` items long; the target is the input shifted right by one item.
def create_sequences(text, seq_length):
    """Build sliding-window input/target pairs from a sequence.

    Args:
        text: Sliceable sequence of items (here: the vectorized token ids).
        seq_length: Number of items in every input and target window.

    Returns:
        A tuple `(inputs, targets)` of numpy arrays. Row `k` of `inputs` is
        `text[k:k + seq_length]` and row `k` of `targets` is the same window
        shifted one position to the right. There are `len(text) - seq_length`
        rows; both arrays are empty when that count is not positive.
    """
    # One window per valid start index; the last start still leaves room
    # for the one-step-shifted target window
    window_starts = range(len(text) - seq_length)

    inputs = [text[start:start + seq_length] for start in window_starts]
    targets = [text[start + 1:start + 1 + seq_length] for start in window_starts]

    # Stack the windows into arrays for the ML framework
    return np.array(inputs), np.array(targets)

# Generate sequences using the function, from vectorized text data.
# `X` holds input sequences; `Y` holds the target sequences the model should learn to predict.
# (`vectorized_text` and `seq_length` come from the dataset-loading cell.)
X, Y = create_sequences(vectorized_text.numpy(), seq_length)

# Check how many sequences were created, so we know the function worked
print("Number of sequences generated:", len(X))

# Print a sample sequence to understand what the data looks like
print("Sample input sequence:", X[0] if len(X) > 0 else "No sequences generated")

# Ensure that both `X` and `Y` contain data. If they are empty, there was an issue in sequence generation
# (for example, the text has no more than `seq_length` tokens).
assert X.size > 0, "Input data X is empty" 
assert Y.size > 0, "Target data Y is empty" 

# Convert the numpy arrays to TensorFlow tensors, as required for model training in TensorFlow
X = tf.convert_to_tensor(X) 
Y = tf.convert_to_tensor(Y) 

# Print the shapes of `X` and `Y` to confirm they are in the expected dimensions
print("Shape of X:", X.shape) 
print("Shape of Y:", Y.shape)

In [None]:
# Import necessary layers and Model from TensorFlow Keras for building a Transformer model
from tensorflow.keras.layers import Embedding, MultiHeadAttention, Dense, LayerNormalization, Dropout
from tensorflow.keras.models import Model

# TransformerBlock class defines a single Transformer block layer
# Each block consists of multi-head attention and feed-forward layers with normalization and dropout
class TransformerBlock(tf.keras.layers.Layer):
    """Self-attention + feed-forward block with residuals and layer normalization."""

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        """Create the sub-layers of the block.

        Args:
            embed_dim: Used as `key_dim` of the attention layer and as the
                output width of the feed-forward network.
            num_heads: Number of attention heads.
            ff_dim: Width of the hidden (ReLU) layer of the feed-forward network.
            rate: Dropout rate shared by both dropout layers.
        """
        super().__init__()

        # Multi-head self-attention layer, allowing the model to focus on different parts of input simultaneously
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)

        # Feed-forward network with two dense layers
        # The first layer expands to ff_dim with ReLU activation, and the second layer compresses back to embed_dim
        self.ffn = tf.keras.Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ])

        # Layer normalization to stabilize and improve training
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        # Dropout layers to reduce overfitting during training
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=False, use_causal_mask=False):
        """Run the block on `inputs`.

        Args:
            inputs: Tensor used as both query and value of the attention layer.
            training: Forwarded to both dropout layers.
            use_causal_mask: When True, each position may attend only to itself
                and earlier positions. Defaults to False, which keeps the
                previous unmasked behavior for existing callers.
                Note: requires a Keras version whose `MultiHeadAttention`
                accepts `use_causal_mask`.

        Returns:
            The output of the second layer normalization.
        """
        # Calculate attention output and apply dropout
        attn_output = self.att(inputs, inputs, use_causal_mask=use_causal_mask)
        attn_output = self.dropout1(attn_output, training=training)

        # Add residual connection and normalize the result
        out1 = self.layernorm1(inputs + attn_output)

        # Apply feed-forward network and dropout
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)

        # Add residual connection and normalize the final output
        return self.layernorm2(out1 + ffn_output)

# TransformerModel class builds the entire Transformer model
class TransformerModel(Model):  # Model is now properly imported
    """Next-token prediction model: embedding + positional encoding, a stack of
    causally masked `TransformerBlock`s, and a Dense head that emits logits."""

    def __init__(self, vocab_size, embed_dim, num_heads, ff_dim, num_layers, seq_length):
        """Create the embedding, positional encoding, block stack and output head.

        Args:
            vocab_size: Size of the embedding table and of the output layer.
            embed_dim: Width of the token embeddings.
            num_heads: Number of attention heads per block.
            ff_dim: Hidden width of each block's feed-forward network.
            num_layers: Number of stacked `TransformerBlock`s.
            seq_length: Number of positions for which the positional encoding
                is precomputed.
        """
        super().__init__()

        # Embedding layer converts token indices into dense vectors of size embed_dim
        self.embedding = Embedding(vocab_size, embed_dim)

        # Positional encoding to give the model information about the position of each token in the sequence
        self.pos_encoding = self.positional_encoding(seq_length, embed_dim)

        # Stack of Transformer blocks (num_layers indicates how many Transformer blocks to use)
        self.transformer_blocks = [TransformerBlock(embed_dim, num_heads, ff_dim) for _ in range(num_layers)]

        # Final dense layer that maps to vocab_size; it has no activation, so it outputs logits
        self.dense = Dense(vocab_size)

    def positional_encoding(self, seq_length, embed_dim):
        """Return a fixed sinusoidal encoding of shape (1, seq_length, embed_dim)."""
        # Why? Unlike RNNs, the Transformer has no notion of order, so positional encoding adds this information.
        angle_rads = self.get_angles(np.arange(seq_length)[:, np.newaxis], np.arange(embed_dim)[np.newaxis, :], embed_dim)

        # Apply sin to even indices in the angle array and cos to odd indices
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

        # Add a new axis for batch compatibility and cast to float32
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def get_angles(self, pos, i, embed_dim):
        """Return `pos / 10000^(2 * (i // 2) / embed_dim)` for every (pos, i) pair."""
        # `i // 2` makes each even/odd index pair share one frequency (sin/cos use)
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(embed_dim))
        return pos * angle_rates

    def call(self, inputs, training=False):
        """Map a batch of token ids to per-position logits over the vocabulary.

        Args:
            inputs: Integer token ids; axis 1 is the sequence axis.
            training: Forwarded to every Transformer block.

        Returns:
            Unnormalized scores (logits) with `vocab_size` entries per position.
        """
        seq_len = tf.shape(inputs)[1]  # Get the actual sequence length of the input

        # Embed the inputs and add positional encoding to preserve token order
        x = self.embedding(inputs)
        x += self.pos_encoding[:, :seq_len, :]

        # Pass input through each Transformer block in the stack.
        # The causal mask is essential here: training targets are the inputs
        # shifted by one token, so without it every position could attend
        # to the very token it is asked to predict.
        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, training=training, use_causal_mask=True)

        # Final dense layer outputs logits (not probabilities) over the vocabulary
        output = self.dense(x)
        return output

In [None]:
# Hyperparameters
embed_dim = 256
num_heads = 4
ff_dim = 512
num_layers = 4

# Build the Transformer model
model = TransformerModel(vocab_size, embed_dim, num_heads, ff_dim, num_layers, seq_length)

# Provide input shape to build the model by passing a dummy input with maxval specified
# (one batch of random token ids in [0, vocab_size), so the weights get created)
_ = model(tf.random.uniform((1, seq_length), maxval=vocab_size, dtype=tf.int32))

# Compile the model
# The model's final Dense layer has no activation, so it outputs raw logits.
# The string alias 'sparse_categorical_crossentropy' assumes probabilities,
# so the loss must be told explicitly that it receives logits.
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
)

# Summary of the model
model.summary()

In [None]:
# Import necessary libraries for training visualization
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping

# Early stopping callback to stop training if the loss doesn't improve
# It monitors the training loss (no validation data is passed to fit below), waits
# 2 epochs without improvement, then restores the best weights seen.
early_stopping = EarlyStopping(monitor='loss', patience=2, restore_best_weights=True)

# Train the transformer model on the full input and target sequences
history = model.fit(X, Y, epochs=20, batch_size=32, callbacks=[early_stopping])

# Plot training loss to monitor model performance over epochs
plt.plot(history.history['loss'])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.show()

In [None]:
def generate_text(model, start_string, num_generate=100, temperature=1.0):
    """Generate text by repeatedly sampling the next token from `model`.

    Relies on the module-level `vectorizer` (to tokenize and to map ids back
    to tokens) and `seq_length` (the context window length).

    Args:
        model: Callable mapping a (1, seq_length) batch of token ids to logits
            with one row of vocabulary scores per position.
        start_string: Prompt the generation starts from.
        num_generate: Number of tokens to generate.
        temperature: Positive scaling applied to the logits; lower values make
            sampling more deterministic, higher values more random.

    Returns:
        `start_string` followed by the generated tokens, separated by spaces.

    Raises:
        ValueError: If `temperature` is not positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    # Convert the starting text (start_string) into token ids the model can understand
    input_eval = vectorizer([start_string]).numpy()

    # Ensure input is the right length for the model
    if input_eval.shape[1] < seq_length:
        # If input is too short, left-pad it with zeros. The padding uses the
        # same integer dtype as the ids so the batch does not silently turn
        # into floats.
        padding = np.zeros((1, seq_length - input_eval.shape[1]), dtype=input_eval.dtype)
        input_eval = np.concatenate((padding, input_eval), axis=1)
    elif input_eval.shape[1] > seq_length:
        # If input is too long, keep only the most recent `seq_length` tokens
        input_eval = input_eval[:, -seq_length:]

    # The vocabulary does not change while generating, so look it up once
    vocabulary = vectorizer.get_vocabulary()

    # Prepare an empty list to store the generated tokens
    text_generated = []

    # Loop for the number of tokens we want to generate
    for _ in range(num_generate):
        # Get predictions from the model based on the current input
        predictions = model(tf.convert_to_tensor(input_eval), training=False)

        # Only the last position predicts the token that follows the input.
        # Slicing keeps the batch axis, giving the 2-D (batch, vocab) logits
        # that tf.random.categorical requires.
        next_token_logits = predictions[:, -1, :] / temperature

        # Choose the next token based on predictions with randomness (temperature) applied
        predicted_id = int(tf.random.categorical(next_token_logits, num_samples=1)[0, 0].numpy())

        # Update the input for the next prediction by appending the new token
        # and keeping only the last `seq_length` tokens
        input_eval = np.append(input_eval, [[predicted_id]], axis=1)[:, -seq_length:]

        # Add the predicted token to our list of generated tokens
        text_generated.append(vocabulary[predicted_id])

    # Return the starting text along with all the new tokens
    return start_string + ' ' + ' '.join(text_generated)

# Example usage to generate text with more coherent output by setting temperature to 0.7
# (a temperature below 1.0 sharpens the distribution the next token is sampled from)
start_string = "To be, or not to be"
generated_text = generate_text(model, start_string, temperature=0.7)
print(generated_text)

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Layer, Dense, LayerNormalization, Dropout, Input
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt

# Learning-rate schedule: halve the rate every 10th epoch
def scheduler(epoch, lr):
    """Return the learning rate to use for `epoch`.

    Args:
        epoch: Zero-based epoch index.
        lr: Learning rate currently in use.

    Returns:
        `lr * 0.5` when `epoch` is a non-zero multiple of 10, otherwise `lr`
        unchanged.
    """
    # Epoch 0 is excluded so training starts at the configured rate
    is_decay_epoch = epoch % 10 == 0 and epoch != 0
    return lr * 0.5 if is_decay_epoch else lr

# Create a callback to use the learning rate scheduler
# (Keras calls `scheduler(epoch, lr)` to obtain the learning rate for each epoch)
callback = tf.keras.callbacks.LearningRateScheduler(scheduler)  

# Train the model with the learning rate scheduler
# Note: Replace `X` and `Y` with your input (X) and target (Y) data, and ensure `model` is defined
# (this cell defines none of them; it continues training whatever `model` is currently in scope)
history = model.fit(X, Y, epochs=20, batch_size=64, callbacks=[callback])  

# Plot the training loss to visualize model performance over epochs
plt.plot(history.history['loss'])  
plt.xlabel('Epoch')  
plt.ylabel('Loss')  
plt.title('Training Loss with Learning Rate Scheduler')  
plt.show()

### Building Autoencoders

In [None]:
import numpy as np 
from tensorflow.keras.datasets import mnist 

# Fetch MNIST; the labels are not needed for an autoencoder and are discarded
(x_train, _), (x_test, _) = mnist.load_data()

# Scale pixels to [0, 1] as float32, then flatten every image into one vector
# (-1 lets reshape infer the per-image size from the remaining dimensions)
x_train = (x_train.astype('float32') / 255.).reshape(len(x_train), -1)
x_test = (x_test.astype('float32') / 255.).reshape(len(x_test), -1)

In [None]:
from tensorflow.keras.models import Model 
from tensorflow.keras.layers import Input, Dense 

# Encoder 
# 784 inputs = one flattened 28x28 image
input_layer = Input(shape=(784,)) 
encoded = Dense(64, activation='relu')(input_layer) 

# Bottleneck 
# 32-unit layer: the narrowest point of the network
bottleneck = Dense(32, activation='relu')(encoded) 

# Decoder 
# Mirrors the encoder; the sigmoid keeps outputs in [0, 1] like the normalized pixels
decoded = Dense(64, activation='relu')(bottleneck) 
output_layer = Dense(784, activation='sigmoid')(decoded) 

# Autoencoder model 
autoencoder = Model(input_layer, output_layer) 

# Compile the model 
autoencoder.compile(optimizer='adam', loss='binary_crossentropy') 

# Summary of the model 
autoencoder.summary()

# Fit the model
# Input and target are the same array: the network learns to reproduce its input
autoencoder.fit(
    x_train, x_train,  
    epochs=25,  
    batch_size=256,  
    shuffle=True,  
    validation_data=(x_test, x_test)
)

In [None]:
import matplotlib.pyplot as plt 

# Reconstruct the test images with the trained autoencoder
reconstructed = autoencoder.predict(x_test)

# Plot originals (top row) above their reconstructions (bottom row)
n = 10  # Number of digits to display
plt.figure(figsize=(20, 4))

for i in range(n):
    # Row 0 shows the original, row 1 the reconstruction, both in column i
    for panel_row, panel_images in enumerate((x_test, reconstructed)):
        ax = plt.subplot(2, n, i + 1 + panel_row * n)
        plt.imshow(panel_images[i].reshape(28, 28))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

plt.show()

In [None]:
# Mark the last four layers of the autoencoder as trainable
# NOTE(review): for the autoencoder built in this section, `layers[-4:]` are its four
# Dense layers (encoder and decoder), and no visible code froze them beforehand, so
# this loop appears to be a no-op — confirm whether a freezing step was intended.
for layer in autoencoder.layers[-4:]: 
    layer.trainable = True 

# Compile the model again
# (recompiling is what makes changes to `trainable` take effect)
autoencoder.compile(optimizer='adam', loss='binary_crossentropy') 

# Train the model again
# Continues from the current weights for 10 more epochs
autoencoder.fit(x_train, x_train,  
                epochs=10,  
                batch_size=256,  
                shuffle=True,  
                validation_data=(x_test, x_test))

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Corrupt the images with Gaussian noise, then clip back into the valid [0, 1] range
noise_factor = 0.5
x_train_noisy = np.clip(
    x_train + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=x_train.shape),
    0., 1.)
x_test_noisy = np.clip(
    x_test + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=x_test.shape),
    0., 1.)

# Train the autoencoder to map noisy inputs to their clean originals
autoencoder.fit(
    x_train_noisy, x_train,
    epochs=20,
    batch_size=512,
    shuffle=True,
    validation_data=(x_test_noisy, x_test)
)

# Denoise the test images
reconstructed_noisy = autoencoder.predict(x_test_noisy)

# Show three rows per digit: noisy input, denoised output, clean original
n = 10  # Number of digits to display
plt.figure(figsize=(20, 6))
for i in range(n):
    for panel_row, panel_images in enumerate((x_test_noisy, reconstructed_noisy, x_test)):
        ax = plt.subplot(3, n, i + 1 + panel_row * n)
        plt.imshow(panel_images[i].reshape(28, 28))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

plt.show()

In [None]:
# Visualize encoder features
import matplotlib.pyplot as plt 

# Extract the encoder part of the autoencoder 
# The new model shares the autoencoder's layers from the input up to the bottleneck
encoder_model = Model(input_layer, bottleneck) 

# Encode the test data 
encoded_imgs = encoder_model.predict(x_test) 

# Visualize the first two dimensions of the encoded features 
# (only columns 0 and 1 of the bottleneck output are plotted)
plt.figure(figsize=(10, 8)) 
plt.scatter(encoded_imgs[:, 0], encoded_imgs[:, 1], c='blue', alpha=0.5) 
plt.title('Encoded Features - First Two Dimensions') 
plt.xlabel('Encoded Feature 1') 
plt.ylabel('Encoded Feature 2') 
plt.show()

### Implementing Diffusion Models

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input, Conv2D, Flatten, Dense, Reshape, Conv2DTranspose
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping

# Load the data set  
# (labels are discarded; only the images are used)
(x_train, _), (x_test, _) = mnist.load_data()

# Normalize the pixel values  
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

# Expand dimensions to match the input shape (28, 28, 1)  
# (adds a trailing channel axis for the convolutional layers)
x_train = np.expand_dims(x_train, axis=-1)
x_test = np.expand_dims(x_test, axis=-1)

# Add noise to the data
# Gaussian noise with standard deviation 1.0, scaled by noise_factor
noise_factor = 0.5
x_train_noisy = x_train + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=x_train.shape)
x_test_noisy = x_test + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=x_test.shape)

# Clip the values to be within the range [0, 1]
x_train_noisy = np.clip(x_train_noisy, 0., 1.)
x_test_noisy = np.clip(x_test_noisy, 0., 1.)

In [None]:
# Define the diffusion model architecture with reduced complexity
# Structurally this is a convolutional encoder/decoder that maps a 28x28x1 image
# to a 28x28x1 image: conv layers -> dense bottleneck -> transposed conv layers.
input_layer = Input(shape=(28, 28, 1))
x = Conv2D(16, (3, 3), activation='relu', padding='same')(input_layer)  # Reduced filters
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)  # Reduced filters
x = Flatten()(x)
x = Dense(64, activation='relu')(x)  # Reduced size
x = Dense(28*28*32, activation='relu')(x)  # Reduced size
x = Reshape((28, 28, 32))(x)  # Back to a spatial feature map for the decoder
x = Conv2DTranspose(32, (3, 3), activation='relu', padding='same')(x)  # Reduced filters
x = Conv2DTranspose(16, (3, 3), activation='relu', padding='same')(x)  # Reduced filters
# Single-channel sigmoid output keeps pixel values in [0, 1]
output_layer = Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)
diffusion_model = Model(input_layer, output_layer)

# Compile the model with the Adam optimizer and mean squared error loss
# (no mixed-precision policy is configured in this cell)
diffusion_model.compile(optimizer='adam', loss='mean_squared_error')  # Using MSE for regression tasks

# Summary of the optimized model
diffusion_model.summary()

In [None]:
# Cache and prefetch the data using TensorFlow data pipelines for faster loading
# Each element pairs a noisy image (input) with its clean original (target).
# Note: neither pipeline contains a shuffle step, so batches are served in array order.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train_noisy, x_train))
train_dataset = train_dataset.cache().batch(64).prefetch(tf.data.AUTOTUNE)  # Reduced batch size

val_dataset = tf.data.Dataset.from_tensor_slices((x_test_noisy, x_test))
val_dataset = val_dataset.cache().batch(64).prefetch(tf.data.AUTOTUNE)  # Reduced batch size

In [None]:
# Implement early stopping based on validation loss
# Stops after 2 epochs without improvement in val_loss and restores the best weights.
# With epochs=3 below, the patience of 2 leaves little room for the callback to trigger.
early_stopping = EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True)

# Train the model with early stopping; the batch size is already fixed by the datasets
# NOTE(review): Keras documents `shuffle` as ignored when the input is a
# tf.data.Dataset — confirm; shuffling would have to happen in the dataset pipeline.
diffusion_model.fit(
    train_dataset,
    epochs=3,
    shuffle=True,
    validation_data=val_dataset,
    callbacks=[early_stopping]
)

In [None]:
import matplotlib.pyplot as plt

# Run the noisy test images through the trained model
denoised_images = diffusion_model.predict(x_test_noisy)

# Show three rows per digit: clean original, noisy input, denoised output
n = 10  # Number of digits to display
plt.figure(figsize=(20, 6))
for i in range(n):
    for panel_row, panel_images in enumerate((x_test, x_test_noisy, denoised_images)):
        ax = plt.subplot(3, n, i + 1 + panel_row * n)
        plt.imshow(panel_images[i].reshape(28, 28), cmap='gray')
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
plt.show()

In [None]:
# Mark the last four layers of the model as trainable
# NOTE(review): for the model built in this section, `layers[-4:]` are the Reshape
# layer, the two Conv2DTranspose layers and the final Conv2D; no visible code froze
# them beforehand, so this loop appears to be a no-op — confirm the intent.
for layer in diffusion_model.layers[-4:]:
    layer.trainable = True

# Compile the model again
# Note: this switches the loss from mean squared error to binary cross-entropy
diffusion_model.compile(optimizer='adam', loss='binary_crossentropy')

# Train the model again
# Uses the numpy arrays directly (not the tf.data pipelines), so batch_size and
# shuffle are passed here
diffusion_model.fit(x_train_noisy, x_train,
                    epochs=10,
                    batch_size=64,
                    shuffle=True,
                    validation_data=(x_test_noisy, x_test))

### Develop GANs Using Keras

In [None]:
# Load the MNIST dataset
# Only the training images are kept; labels and the test split are discarded
(x_train, _), (_, _) = mnist.load_data()

# Normalize the pixel values to the range [-1, 1]
# (this matches the tanh output range of the generator defined below)
x_train = x_train.astype('float32') / 127.5 - 1.
# Add a trailing channel axis
x_train = np.expand_dims(x_train, axis=-1)

# Print the shape of the data
print(x_train.shape)

In [None]:
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Dense, LeakyReLU, BatchNormalization, Reshape 

# Generator: maps a 100-dimensional noise vector to a 28x28x1 image
def build_generator():
    """Build the (uncompiled) generator network.

    Three Dense stages of growing width (256, 512, 1024), each followed by
    LeakyReLU and batch normalization, then a tanh Dense layer reshaped to
    (28, 28, 1).

    Returns:
        The Sequential generator model.
    """
    model = Sequential()

    for stage, units in enumerate((256, 512, 1024)):
        # Only the first layer declares the size of the noise input
        dense = Dense(units, input_dim=100) if stage == 0 else Dense(units)
        model.add(dense)
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))

    # tanh keeps the generated pixels in [-1, 1]
    model.add(Dense(28 * 28 * 1, activation='tanh'))
    model.add(Reshape((28, 28, 1)))
    return model

# Build the generator 
# (it is not compiled here; no loss or optimizer is attached in this cell)
generator = build_generator() 
generator.summary()

In [None]:
from tensorflow.keras.layers import Flatten 
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Dense, LeakyReLU

# Discriminator: classifies a 28x28x1 image with a single sigmoid output
def build_discriminator():
    """Build the (uncompiled) discriminator network.

    Flattens the image, applies two Dense + LeakyReLU stages (512, 256) and
    ends in a single sigmoid unit.

    Returns:
        The Sequential discriminator model.
    """
    model = Sequential()
    model.add(Flatten(input_shape=(28, 28, 1)))

    for units in (512, 256):
        model.add(Dense(units))
        model.add(LeakyReLU(alpha=0.2))

    # One sigmoid unit, matching the binary cross-entropy loss used at compile time
    model.add(Dense(1, activation='sigmoid'))
    return model

# Build and compile the discriminator 
# The accuracy metric makes train_on_batch/evaluate return [loss, accuracy]
discriminator = build_discriminator() 
discriminator.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) 
discriminator.summary()

This step involves combining the generator and discriminator models to create the GAN. The GAN takes a noise vector as an input, generates a synthetic image using the generator, and classifies the image using the discriminator. The discriminator is set to non-trainable when compiling the GAN to ensure that only the generator is updated during the adversarial training.

In [None]:
from tensorflow.keras.layers import Input 
from tensorflow.keras.models import Model 

# Combined model: noise -> generator -> discriminator
def build_gan(generator, discriminator):
    """Stack the generator and the discriminator into one compiled model.

    Args:
        generator: Model mapping a 100-dimensional noise vector to an image.
        discriminator: Model scoring an image. Its `trainable` flag is set to
            False on the passed-in object itself (not on a copy).

    Returns:
        The compiled combined model (binary cross-entropy, Adam).
    """
    # Set before compiling the combined model so that training it is meant
    # to update only the generator
    discriminator.trainable = False

    noise_input = Input(shape=(100,))
    validity = discriminator(generator(noise_input))

    combined = Model(noise_input, validity)
    combined.compile(loss='binary_crossentropy', optimizer='adam')
    return combined

# Build the GAN 
# Note: build_gan sets `discriminator.trainable = False` on the shared discriminator object
gan = build_gan(generator, discriminator) 
gan.summary()

In [None]:
# Training parameters 
# Note: each "epoch" below is a single update step on one random batch,
# not a full pass over the dataset.

batch_size = 64 
epochs = 50
sample_interval = 10

# Adversarial ground truths 
# Label 1 for real images, label 0 for generated images
real = np.ones((batch_size, 1)) 
fake = np.zeros((batch_size, 1)) 

# Training loop 
for epoch in range(epochs): 
    # Train the discriminator 
    # Draw a random batch of real images (indices sampled with replacement)
    idx = np.random.randint(0, x_train.shape[0], batch_size) 
    real_images = x_train[idx] 
    # Generate an equally sized batch of fake images from Gaussian noise
    noise = np.random.normal(0, 1, (batch_size, 100)) 
    generated_images = generator.predict(noise) 
    # One update on the real batch, one on the fake batch; each call returns [loss, accuracy]
    d_loss_real = discriminator.train_on_batch(real_images, real) 
    d_loss_fake = discriminator.train_on_batch(generated_images, fake) 
    # Average loss and accuracy over the two updates
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake) 

    # Train the generator 
    # Fresh noise is labelled as "real": the combined model is pushed to make
    # the discriminator output 1 for generated images
    noise = np.random.normal(0, 1, (batch_size, 100)) 
    g_loss = gan.train_on_batch(noise, real) 

    # Print the progress 
    # d_loss[0] is the averaged loss, d_loss[1] the averaged accuracy
    if epoch % sample_interval == 0: 
        print(f"{epoch} [D loss: {d_loss[0]}] [D accuracy: {100 * d_loss[1]}%] [G loss: {g_loss}]")

After training the GAN, we need to assess the quality of the synthetic images generated by the generator. There are two main ways to evaluate the performance of GANs: qualitative assessment and quantitative assessment. 

### 1. Qualitative Assessment: Visual Inspection

Visual inspection is a straightforward method to assess the quality of images generated by a GAN. You can use the `sample_images` function provided in the lab to visualize a grid of generated images. During visual inspection, look for the following qualities:

- **Clarity**: The images should be sharp and not blurry. Blurry images indicate that the generator is struggling to learn the patterns in the data.
- **Coherence**: The generated images should have a coherent structure that resembles the original images in the dataset. For example, in the case of MNIST, the generated images should resemble handwritten digits with the correct number of strokes and shapes.
- **Diversity**: There should be a variety of images generated by the GAN. If all images look similar, it might indicate that the generator is overfitting or has collapsed to a single mode.

### 2. Quantitative Assessment: Metrics

While visual inspection provides an intuitive understanding of the GAN’s performance, it can be subjective. To objectively evaluate GAN performance, you can use quantitative metrics such as:

- **Inception Score (IS)**: This score measures both the quality and diversity of generated images by using a pre-trained classifier (such as Inception-v3) to predict the class of each image. A higher score indicates that the images are both high-quality and diverse. However, IS is not very effective for simple datasets like MNIST; it’s more suitable for complex datasets.

- **Fréchet Inception Distance (FID)**: This metric calculates the distance between the distributions of generated images and real images. A lower FID score indicates that the generated images are more similar to real images. FID is commonly used and considered a reliable metric for evaluating GAN performance.

- **Discriminator Accuracy**: During training, if the discriminator's accuracy is around 50%, it suggests that the generator is producing realistic images that are hard to distinguish from real ones. This metric is easy to implement and provides quick feedback on the training progress.

In [None]:
import matplotlib.pyplot as plt 

def sample_images(generator, epoch, num_images=25, latent_dim=100):
    """Generate images from random noise and show them in a grid.

    Args:
        generator: Model whose ``predict`` maps a ``(num_images, latent_dim)``
            noise array to a batch of images indexed as ``[i, :, :, 0]``.
        epoch: Accepted for interface compatibility; not used by the plot.
        num_images: How many images to generate and display. The grid is
            sized to fit this count (25 gives the original 5x5 layout).
        latent_dim: Length of each noise vector; must match the generator's
            input size.

    Raises:
        ValueError: If ``num_images`` is smaller than 1.
    """
    if num_images < 1:
        raise ValueError("num_images must be at least 1")

    noise = np.random.normal(0, 1, (num_images, latent_dim))
    generated_images = generator.predict(noise)
    # Rescale to [0, 1]; assumes the generator outputs values in [-1, 1]
    # (e.g. a tanh output layer) — TODO confirm against the generator definition.
    generated_images = 0.5 * generated_images + 0.5

    # Near-square grid that always has room for every requested image.
    cols = int(np.ceil(np.sqrt(num_images)))
    rows = int(np.ceil(num_images / cols))
    # squeeze=False keeps `axs` two-dimensional even for a 1x1 grid.
    fig, axs = plt.subplots(rows, cols, figsize=(2 * cols, 2 * rows), squeeze=False)

    for index, ax in enumerate(axs.flat):
        ax.axis('off')
        # Cells beyond num_images stay blank instead of raising IndexError.
        if index < num_images:
            ax.imshow(generated_images[index, :, :, 0], cmap='gray')
    plt.show()

# Sample images at the end of training.
# `generator` and `epochs` are expected to come from earlier cells; `epochs`
# is passed as the epoch argument for this final sample.
sample_images(generator, epochs)

In [None]:
# Calculate and print the discriminator accuracy on real vs. fake images.
# `generator`, `discriminator`, `x_train` and `batch_size` are expected to come
# from earlier cells. The noise width of 100 must match the generator's input size.
noise = np.random.normal(0, 1, (batch_size, 100))
generated_images = generator.predict(noise)

# Evaluate the discriminator on real images: a random batch (indices drawn with
# replacement) labelled 1
real_images = x_train[np.random.randint(0, x_train.shape[0], batch_size)]
d_loss_real = discriminator.evaluate(real_images, np.ones((batch_size, 1)), verbose=0)

# Evaluate the discriminator on fake images: the generated batch labelled 0
d_loss_fake = discriminator.evaluate(generated_images, np.zeros((batch_size, 1)), verbose=0)

# Index 1 of the evaluate() result is reported as accuracy.
# NOTE(review): this assumes the discriminator was compiled with
# metrics=['accuracy'] so that evaluate() returns [loss, accuracy] — confirm.
print(f"Discriminator Accuracy on Real Images: {d_loss_real[1] * 100:.2f}%")
print(f"Discriminator Accuracy on Fake Images: {d_loss_fake[1] * 100:.2f}%")

### Custom Training Loops in Keras

In [None]:
# Imported here so this cell runs on its own: neither module is imported by
# the notebook's library cell.
import os
import warnings

# Suppress all Python warnings
warnings.filterwarnings('ignore')  # Stops Python from printing non-critical warning messages

# Set TensorFlow log level to suppress warnings and info messages
# NOTE(review): TensorFlow usually reads this variable when it is first imported,
# so it may have no effect if TensorFlow is already loaded — confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Limits TensorFlow logging to only show errors (ignoring info and warnings)

# Step 1: Set Up the Environment
# Load the MNIST dataset - a popular dataset of hand-written digits
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Normalize the data by dividing by 255 to bring pixel values between 0 and 1 for faster training
x_train, x_test = x_train / 255.0, x_test / 255.0

# Convert the training data into a TensorFlow dataset object, batched into groups of 32 examples
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)

# Step 2: Define the Model
# Built layer by layer: 28x28 image -> 784-vector -> 128 ReLU units -> 10 raw scores
model = Sequential()
model.add(Flatten(input_shape=(28, 28)))
model.add(Dense(128, activation='relu'))
model.add(Dense(10))  # No activation: the outputs are logits, one per digit class

# Step 3: Define Loss Function and Optimizer + Accuracy metric
# Running accuracy over integer labels vs. predicted scores
accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy()

# Adam optimizer with its default settings
optimizer = tf.keras.optimizers.Adam()

# Cross-entropy over integer labels; from_logits=True because the model's
# last layer applies no softmax
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Step 4: Implement the Custom Training Loop with Custom Callback
epochs = 2  # Number of times to loop through the full dataset during training
# NOTE(review): CustomCallback is not defined in this cell; it must be provided by an earlier cell.
custom_callback = CustomCallback()

# Running mean of the per-batch loss, so the value logged at the end of an
# epoch describes the whole epoch rather than only its last batch.
epoch_loss_metric = tf.keras.metrics.Mean()

# Start looping over the specified number of epochs
for epoch in range(epochs):
    print(f'Start of epoch {epoch + 1}')  # Informative print statement at the beginning of each epoch

    # Loop over each batch of training data in this epoch
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        with tf.GradientTape() as tape:  # Record the forward pass for automatic differentiation
            # Forward pass: compute logits for this batch
            logits = model(x_batch_train, training=True)
            # Loss between the true labels and the predicted logits
            loss_value = loss_fn(y_batch_train, logits)

        # Gradients of the loss with respect to the trainable weights
        grads = tape.gradient(loss_value, model.trainable_weights)

        # Let the optimizer update the weights from those gradients
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Fold this batch into the epoch-level accuracy and loss
        accuracy_metric.update_state(y_batch_train, logits)
        epoch_loss_metric.update_state(loss_value)

        # Print the current batch loss and the running accuracy every 200 steps
        if step % 200 == 0:
            print(f'Epoch {epoch + 1} Step {step}: Loss = {loss_value.numpy()} Accuracy = {accuracy_metric.result().numpy()}')

    # At the end of each epoch, hand the epoch-level stats to the custom callback
    custom_callback.on_epoch_end(epoch, logs={'loss': epoch_loss_metric.result().numpy(), 'accuracy': accuracy_metric.result().numpy()})

    # Reset both metrics so the next epoch starts from a clean state
    accuracy_metric.reset_state()
    epoch_loss_metric.reset_state()

### Hyperparameter Tuning with Keras Tuner

In [None]:
# Model-building function handed to Keras Tuner
def build_model(hp):
    """Build and compile a one-hidden-layer classifier from tunable settings.

    Args:
        hp: Hyperparameter container supplied by the tuner. Two values are
            requested from it: 'units' (integer, 32 to 512 in steps of 32)
            and 'learning_rate' (float, 1e-4 to 1e-2, 'LOG' sampling).

    Returns:
        The compiled model.
    """
    # Ask the tuner for this trial's values up front, in the same order
    # ('units' first, then 'learning_rate').
    hidden_units = hp.Int('units', min_value=32, max_value=512, step=32)
    learning_rate = hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='LOG')

    model = Sequential()
    model.add(Flatten(input_shape=(28, 28)))  # 28x28 image -> flat vector
    model.add(Dense(units=hidden_units, activation='relu'))  # Tuned hidden width
    model.add(Dense(10, activation='softmax'))  # Probabilities over the 10 classes

    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

# Create a RandomSearch Tuner
# NOTE(review): Keras Tuner may resume from results already stored under
# directory/project_name instead of starting fresh — confirm whether that is wanted.

tuner = kt.RandomSearch(
    build_model,  # The model-building function to tune
    objective='val_accuracy',  # The goal is to maximize validation accuracy
    max_trials=10,  # Run up to 10 different sets of hyperparameters
    executions_per_trial=2,  # Train each model twice for more reliable results
    directory='my_dir',  # Folder to save tuning logs
    project_name='intro_to_kt'  # Naming the project for organization
)

# Display a summary of the search space
tuner.search_space_summary()  # Shows details of all hyperparameters being tuned

# Run the hyperparameter search
# NOTE(review): x_val / y_val are not defined in this cell; they must be
# provided by an earlier cell.
tuner.search(
    x_train, y_train,  # Use training data for tuning
    epochs=5,  # Train each model for 5 epochs
    validation_data=(x_val, y_val)  # Use validation data to assess performance
)

# Display a summary of the results
tuner.results_summary()  # Shows the top-performing models and their hyperparameters

# Step 1: Retrieve the best hyperparameters

# Get the best set of hyperparameters found during tuning
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]  # Ask for one result and take it

# Print the optimal hyperparameters ('units' and 'learning_rate' are the names
# registered inside build_model)
print(f""" 

The optimal number of units in the first dense layer is {best_hps.get('units')}. 

The optimal learning rate for the optimizer is {best_hps.get('learning_rate')}. 

""")

# Step 2: Build and Train the Model with Best Hyperparameters

# Build a fresh model using the best hyperparameters
model = tuner.hypermodel.build(best_hps)

# Train the model with the best hyperparameters on the training data
model.fit(
    x_train, y_train,
    epochs=10,  # Train for a longer duration with the best hyperparameters
    validation_split=0.2  # Use 20% of training data as validation set
)

# Evaluate the model on the test set.
# (x_val, y_val) was used to choose the hyperparameters, so scoring on it would
# give an optimistic figure. x_test / y_test are the normalized MNIST test
# split loaded in the custom-training-loop section.
test_loss, test_acc = model.evaluate(x_test, y_test)

# Print the test accuracy
print(f'Test accuracy: {test_acc}')

### Implementing Q-Learning in Keras

In [None]:
# Create the CartPole-v1 environment
env = gym.make('CartPole-v1') 

# Set random seed for reproducibility: seeds NumPy's global generator and this
# environment instance's action and observation spaces
np.random.seed(42) 
env.action_space.seed(42) 
env.observation_space.seed(42)

# Suppress warnings for a cleaner notebook or console experience
import warnings
warnings.filterwarnings('ignore')

# Override the default warning function with a no-op, so code that calls
# warnings.warn(...) after this point emits nothing
def warn(*args, **kwargs):
    """Accept any arguments and do nothing (replacement for warnings.warn)."""
    pass
warnings.warn = warn

# Import necessary libraries for the Q-Learning model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input  # Import Input layer
from tensorflow.keras.optimizers import Adam
import gym  # Ensure the environment library is available

# Define the model building function
def build_model(state_size, action_size):
    """Build and compile the Q-network.

    Args:
        state_size: Length of the state vector fed to the network.
        action_size: Number of outputs, one Q-value per action.

    Returns:
        The compiled model (MSE loss, Adam with learning rate 0.001).
    """
    network = Sequential([
        Input(shape=(state_size,)),  # Explicit input layer fixes the input shape
        Dense(24, activation='relu'),
        Dense(24, activation='relu'),
        Dense(action_size, activation='linear'),  # Unbounded Q-value estimates
    ])
    network.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
    return network

# Set up the model for the environment created and seeded at the top of this cell.
# The environment is not re-created here: a second gym.make() would replace
# the instance whose action and observation spaces were just seeded.
state_size = env.observation_space.shape[0]  # Length of one observation vector
action_size = env.action_space.n  # Number of discrete actions
model = build_model(state_size, action_size)

import random
import numpy as np
from collections import deque
import tensorflow as tf

# Exploration schedule for the epsilon-greedy policy used by act()
epsilon = 1.0  # Starting exploration rate: act() picks a random action while this is 1.0
epsilon_min = 0.01  # replay() stops shrinking epsilon once it is no longer above this value
epsilon_decay = 0.99  # Factor replay() multiplies into epsilon after it trains the model

# Replay memory: bounded buffer; once 2000 experiences are stored, the oldest are discarded
memory = deque(maxlen=2000)

def remember(state, action, reward, next_state, done):
    """Append one (state, action, reward, next_state, done) transition to the replay memory."""
    experience = (state, action, reward, next_state, done)
    memory.append(experience)

def replay(batch_size=64, gamma=0.95):
    """Train the model on a random minibatch of stored experiences.

    Does nothing when the replay memory holds fewer than ``batch_size``
    experiences. Otherwise it samples ``batch_size`` transitions, builds
    Q-learning targets for the actions that were taken, fits the model on them
    for one epoch, and then decays the global ``epsilon`` while it is above
    ``epsilon_min``.

    Args:
        batch_size: Number of experiences sampled from memory.
        gamma: Discount factor applied to the best predicted next-state Q-value.
    """
    global epsilon

    if len(memory) < batch_size:
        return  # Skip replay if there's not enough experience

    minibatch = random.sample(memory, batch_size)  # Sample without replacement
    states, actions, rewards, next_states, dones = zip(*minibatch)

    # Stack the per-step (1, state_size) arrays into (batch_size, state_size)
    states = np.vstack(states)
    next_states = np.vstack(next_states)
    actions = np.asarray(actions)
    rewards = np.asarray(rewards, dtype=np.float32)
    not_done = np.logical_not(np.asarray(dones, dtype=bool))

    # One batched forward pass each for next and current states;
    # verbose=0 keeps Keras from printing a progress bar on every call
    q_next = model.predict(next_states, verbose=0)
    q_target = model.predict(states, verbose=0)

    # Target is the reward, plus the discounted best next-state value unless
    # the transition ended the episode (not_done zeroes that term)
    targets = rewards + gamma * np.amax(q_next, axis=1) * not_done
    # Overwrite only the Q-value of the action actually taken in each row, so
    # the other actions contribute no error
    q_target[np.arange(batch_size), actions] = targets

    # Train the model with the updated targets in one batch
    model.fit(states, q_target, epochs=1, verbose=0)

    # Reduce exploration rate (epsilon) after each training step
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

def act(state):
    """Pick an action for ``state`` with an epsilon-greedy rule.

    A random action is returned when a uniform draw is at most ``epsilon``;
    otherwise the action with the highest predicted Q-value is returned.
    """
    explore = np.random.rand() <= epsilon
    if explore:
        return random.randrange(action_size)  # Explore: uniformly random action

    q_values = model.predict(state)  # Exploit: ask the model for Q-values
    best_action = np.argmax(q_values[0])
    return best_action

# Define the number of episodes you want to train the model for
episodes = 10  # You can set this to any number you prefer
train_frequency = 5  # replay() is attempted on steps whose index is a multiple of 5

for e in range(episodes):
    state, _ = env.reset()  # Unpack the tuple returned by env.reset()
    state = np.reshape(state, [1, state_size])  # Row vector, as expected by model.predict
    for time in range(200):  # Limit to 200 time steps per episode
        action = act(state)  # Epsilon-greedy action choice
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated  # Either flag ends the episode
        reward = reward if not done else -10  # Replace the reward with a penalty on the final step
        next_state = np.reshape(next_state, [1, state_size])
        remember(state, action, reward, next_state, done)  # Store experience
        state = next_state
        
        # The score printed is the index of the step on which the episode ended.
        # Nothing is printed for an episode that reaches the 200-step limit.
        if done:
            print(f"episode: {e+1}/{episodes}, score: {time}, e: {epsilon:.2}")
            break
        
        # Train the model every 'train_frequency' steps (including step 0)
        if time % train_frequency == 0:
            replay(batch_size=64)  # Sample 64 stored experiences for one training pass

env.close()

# Evaluate the performance: 10 episodes of purely greedy play (no exploration,
# no learning)
for e in range(10):  

    state, _ = env.reset()  # Unpack the state from the tuple 
    state = np.reshape(state, [1, state_size])  # Reshape the state correctly 
    for time in range(500):  # At most 500 steps per episode
        env.render()  
        action = np.argmax(model.predict(state)[0])  # Always take the highest-valued action
        next_state, reward, terminated, truncated, _ = env.step(action)  # Unpack the five return values 
        done = terminated or truncated  # Check if the episode is done 
        next_state = np.reshape(next_state, [1, state_size])  
        state = next_state  
        # The score printed is the index of the step on which the episode ended
        if done:  
            print(f"episode: {e+1}/10, score: {time}")  
            break  

env.close()

### Building a Deep Q-Network (DQN) with Keras

In [None]:
# Create the CartPole-v1 environment
env = gym.make('CartPole-v1')

# Set random seed for reproducibility: seeds NumPy's global generator and
# resets the environment with a fixed seed
np.random.seed(42)
env.reset(seed=42)

# Suppress warnings for a cleaner notebook or console experience
import warnings
warnings.filterwarnings('ignore')

# Disable warnings for a cleaner notebook or console experience by replacing
# warnings.warn with a no-op
def warn(*args, **kwargs):
    """Accept any arguments and do nothing (replacement for warnings.warn)."""
    pass
warnings.warn = warn

# Import necessary libraries
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam


def build_model(state_size, action_size):
    """Build and compile the DQN's Q-network.

    Args:
        state_size: Length of the state vector fed to the network.
        action_size: Number of outputs, one Q-value per action.

    Returns:
        The compiled model (MSE loss, Adam with learning rate 0.001).
    """
    model = Sequential()
    # Explicit Input layer, matching the Q-learning build_model in this file,
    # instead of passing input_dim to the first Dense layer
    model.add(Input(shape=(state_size,)))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(action_size, activation='linear'))  # Unbounded Q-value estimates
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
    return model

# Size the network from the environment: one input per observation component,
# one output per discrete action
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
model = build_model(state_size, action_size)

from collections import deque
import random

# Replay buffer: bounded, so once 2000 experiences are stored the oldest are discarded
memory = deque(maxlen=2000)
def remember(state, action, reward, next_state, done):
    """Store one transition at the end of the replay buffer."""
    transition = (state, action, reward, next_state, done)
    memory.append(transition)

# The epsilon-greedy policy balances exploration and exploitation by choosing random actions with probability epsilon.
epsilon = 1.0  # Starting exploration rate: act() picks a random action while this is 1.0
epsilon_min = 0.01  # replay() stops shrinking epsilon once it is no longer above this value
epsilon_decay = 0.995  # Factor replay() multiplies into epsilon after it trains the model
 
def act(state):
    """Return an epsilon-greedy action for ``state``.

    The greedy (highest predicted Q-value) action is used when a uniform draw
    exceeds ``epsilon``; otherwise a uniformly random action is returned.
    """
    if np.random.rand() > epsilon:
        # Exploit: best action according to the current model
        return np.argmax(model.predict(state)[0])
    # Explore
    return random.randrange(action_size)

# Implement the Q-learning update to train the DQN using experiences stored in the replay buffer.
def replay(batch_size, gamma=0.95):
    """Train the DQN on a random minibatch from the replay buffer.

    Does nothing when the buffer holds fewer than ``batch_size`` experiences.
    Otherwise it samples ``batch_size`` transitions, builds Q-learning targets
    for the actions that were taken, fits the model on the whole minibatch for
    one epoch, and then decays the global ``epsilon`` while it is above
    ``epsilon_min``.

    Args:
        batch_size: Number of experiences sampled from the buffer.
        gamma: Discount factor applied to the best predicted next-state
            Q-value. It is a parameter with a default because no module-level
            ``gamma`` is defined alongside this function.
    """
    global epsilon

    # random.sample raises ValueError when asked for more items than are stored
    if len(memory) < batch_size:
        return

    minibatch = random.sample(memory, batch_size)

    # Stack the stored (1, state_size) arrays into (batch_size, state_size)
    states = np.vstack([transition[0] for transition in minibatch])
    actions = np.asarray([transition[1] for transition in minibatch])
    rewards = np.asarray([transition[2] for transition in minibatch], dtype=np.float32)
    next_states = np.vstack([transition[3] for transition in minibatch])
    not_done = np.logical_not(np.asarray([transition[4] for transition in minibatch], dtype=bool))

    # Two batched forward passes instead of two predict calls per sample
    q_next = model.predict(next_states, verbose=0)
    q_target = model.predict(states, verbose=0)

    # Target is the reward, plus the discounted best next-state value unless
    # the transition ended the episode (not_done zeroes that term)
    targets = rewards + gamma * np.amax(q_next, axis=1) * not_done
    # Change only the Q-value of the action actually taken in each row
    q_target[np.arange(batch_size), actions] = targets

    # One gradient pass over the whole minibatch
    model.fit(states, q_target, epochs=1, verbose=0)

    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

# Train the DQN agent by interacting with the environment and updating the Q-values using the replay buffer.
gamma = 0.95  # Discount factor for future rewards, available to replay()
replay_batch_size = 32  # Number of stored experiences replayed per training step

for e in range(10):
    state = env.reset()

    # If state is a tuple, take the first element
    if isinstance(state, tuple):
        state = state[0]

    state = np.reshape(state, [1, state_size])

    for time in range(100):
        # Epsilon-greedy choice, so the agent explores while it learns
        action = act(state)

        # Handle both 4-value and 5-value step results
        result = env.step(action)
        if isinstance(result, tuple) and len(result) == 4:
            next_state, reward, done, _ = result
        else:
            next_state, reward, terminated, truncated, _ = result
            done = terminated or truncated  # Either flag ends the episode

        # If next_state is a tuple, take the first element
        if isinstance(next_state, tuple):
            next_state = next_state[0]

        next_state = np.reshape(next_state, [1, state_size])

        # Record the transition so replay() has experience to learn from
        remember(state, action, reward, next_state, done)
        state = next_state

        if done:
            print(f"episode: {e+1}/10, score: {time}")
            break

        # Learn from a random minibatch once enough experience is stored
        if len(memory) > replay_batch_size:
            replay(replay_batch_size)

env.close()

# Evaluate the performance of the trained DQN agent: 10 episodes of purely
# greedy play (no exploration, nothing stored or replayed).
for e in range(10):
    state = env.reset()

    # Check if state is a tuple and extract the first element if it is
    if isinstance(state, tuple):
        state = state[0]

    state = np.reshape(state, [1, state_size])  # Row vector, as expected by model.predict

    for time in range(100):  # At most 100 steps per episode
        env.render()
        action = np.argmax(model.predict(state)[0])  # Always take the highest-valued action

        # Handle environments that return more than 4 values
        result = env.step(action)
        if len(result) == 4:
            next_state, reward, done, _ = result
        else:
            # Only the third value is used as the done flag; the fourth and
            # fifth values are discarded.
            next_state, reward, done, _, _ = result  # Adjust this based on the number of values returned

        # Check if next_state is a tuple and extract the first element if it is
        if isinstance(next_state, tuple):
            next_state = next_state[0]

        next_state = np.reshape(next_state, [1, state_size])
        state = next_state

        # The score printed is the index of the step on which the episode ended.
        # Nothing is printed for an episode that reaches the 100-step limit.
        if done:
            print(f"episode: {e+1}/10, score: {time}")
            break

env.close()