In [1]:
# Write python code to specify, train, evaluate and deploy a neural network with three layers using pytorch
!pip3 install tensorflow-model-optimization




In [2]:
import torch
from torch import nn
from torch.utils.data import DataLoader
import numpy as np


In [43]:
# Generate the training and test data
x_train=np.random.normal(0,5,size=(300000,200)).astype('float32')
x_test=np.random.normal(0,5,size=(10000,200)).astype('float32')
print(type(x_train))

def get_targets(x:np.ndarray)->int:
    x_sum=np.sum(x)
    target=(0 if x_sum<=-100 else 1 if -100<x_sum<=-75 else 2 if -75<x_sum<=-50
            else 3 if -50<x_sum<=-25 else 4 if -25<x_sum<=0 else 5 if 0<x_sum<=25
            else 6 if 25<x_sum<=50 else 7 if 50<x_sum<=75 else 8 if 75<x_sum<=100
            else 9)
    return target

y_train=np.array([get_targets(row) for row in x_train])
y_test=np.array([get_targets(row) for row in x_test])

<class 'numpy.ndarray'>


In [5]:
# Load the data into dataloader
train_data=[(torch.tensor(row1), torch.tensor(row2)) for row1, row2 in zip(x_train, y_train)]
test_data=[(torch.tensor(row1), torch.tensor(row2)) for row1, row2 in zip(x_test, y_test)]

batch_size=64
train_dataloader=DataLoader(train_data, batch_size=batch_size)
test_dataloader=DataLoader(test_data, batch_size=batch_size)

In [6]:
for X,y in train_dataloader:
    print(f"X shape:{X.shape}")
    print(f"y shape:{y.shape}")
    break

print(f"train dataset size:{len(train_dataloader.dataset)}")
print(f"number of batches:{len(train_dataloader)}")

X shape:torch.Size([64, 200])
y shape:torch.Size([64])
train dataset size:100000
number of batches:1563


In [7]:
# Write the model

device=("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")

class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.normalizer=nn.BatchNorm1d(200)
        self.linear_relu_stack=nn.Sequential(
            nn.Linear(200, 128),
            nn.ReLU(),
            nn.Linear(128,128),
            nn.ReLU(),
            nn.Linear(128,128),
            nn.ReLU(),
            nn.Linear(128,10)
        )
    def forward(self, x):
        x=self.normalizer(x)
        logits=self.linear_relu_stack(x)
        return logits

model=NeuralNetwork().to(device)
print(model)


NeuralNetwork(
  (normalizer): BatchNorm1d(200, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=200, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=128, bias=True)
    (3): ReLU()
    (4): Linear(in_features=128, out_features=128, bias=True)
    (5): ReLU()
    (6): Linear(in_features=128, out_features=10, bias=True)
  )
)


In [8]:
# Define the loss and the optimizer
loss_fn=nn.CrossEntropyLoss()
optimizer=torch.optim.Adam(model.parameters(), lr=1e-3)

In [9]:
# Define the train and test process
def train(model, dataloader, loss_fn, optimizer):
    size=len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X,y=X.to(device), y.to(device)
        pred=model(X)
        loss=loss_fn(pred, y)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch %100==0:
            loss,current=loss.item(),(batch+1)*len(X)
            print(f"loss:{loss:>7f}  [{current:>5d}/{size:>5d}]")

def test(model, dataloader, loss_fn):
    size=len(dataloader.dataset)
    num_batches=len(dataloader)
    model.eval()
    test_loss, true_values=0.00, 0
    with torch.no_grad():
        for X,y in dataloader:
            X,y=X.to(device), y.to(device)
            pred=model(X)
            loss=loss_fn(pred, y)
            test_loss+=loss
            true_values+=(pred.argmax(1)==y).type(torch.float).sum().item()
        test_loss/=num_batches
        true_values/=size
        print(f"Test\n Avg Loss:{test_loss:>7f}  Accuracy:{100*true_values:>0.1f}")

In [10]:
# Train the model
epochs=100

for t in range(epochs):
    print(f"Epoch{t+1}----------------\n")
    train(model, train_dataloader, loss_fn, optimizer)
    test(model, test_dataloader, loss_fn)
print("Done!")

Epoch1----------------

loss:2.290251  [   64/100000]
loss:1.230391  [ 6464/100000]
loss:0.978134  [12864/100000]
loss:0.762810  [19264/100000]
loss:0.937361  [25664/100000]
loss:1.256884  [32064/100000]
loss:0.819451  [38464/100000]
loss:0.806127  [44864/100000]
loss:1.472703  [51264/100000]
loss:0.745594  [57664/100000]
loss:0.761159  [64064/100000]
loss:0.685097  [70464/100000]
loss:0.748183  [76864/100000]
loss:0.774111  [83264/100000]
loss:0.752388  [89664/100000]
loss:0.848142  [96064/100000]
Test
 Avg Loss:0.848421  Accuracy:62.5
Epoch2----------------

loss:1.099652  [   64/100000]
loss:0.707739  [ 6464/100000]
loss:1.050079  [12864/100000]
loss:0.592082  [19264/100000]
loss:0.763296  [25664/100000]
loss:1.312450  [32064/100000]
loss:0.701744  [38464/100000]
loss:0.682152  [44864/100000]
loss:1.444980  [51264/100000]
loss:0.633581  [57664/100000]
loss:0.655443  [64064/100000]
loss:0.597528  [70464/100000]
loss:0.690203  [76864/100000]
loss:0.704943  [83264/100000]
loss:0.784809

In [12]:
# Write a tensorflow keras model for classification
import tensorflow as tf
from tensorflow.data import Dataset
from tensorflow.keras import Input
from tensorflow.keras import layers
from tensorflow.keras import regularizers
from tensorflow.keras import Model
from tensorflow.keras import optimizers
from tensorflow.keras import losses
from tensorflow.keras import metrics

In [19]:
# Implement regularization

data_normalizer=layers.Normalization()

def setup_l2_model():
    inputs=Input(shape=(200,))
    x=data_normalizer(inputs)
    x=layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01))(x)
    x=layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01))(x)
    x=layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01))(x)
    outputs=layers.Dense(10, activation='softmax', kernel_regularizer=regularizers.L2(0.01))(x)
    model=Model(inputs, outputs)
    model.summary()
    return model

def compile_model(model):
    loss_fn=losses.SparseCategoricalCrossentropy()
    optimizer=optimizers.legacy.Adam(learning_rate=1e-3)
    metric=[metrics.SparseCategoricalAccuracy()]
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=metric)
    return model

In [21]:
data_normalizer.adapt(x_train)
l2_model=setup_l2_model()
l2model=compile_model(l2_model)

Model: "model_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 200)]             0         
                                                                 
 normalization_2 (Normalizat  (None, 200)              401       
 ion)                                                            
                                                                 
 dense_12 (Dense)            (None, 128)               25728     
                                                                 
 dense_13 (Dense)            (None, 128)               16512     
                                                                 
 dense_14 (Dense)            (None, 128)               16512     
                                                                 
 dense_15 (Dense)            (None, 10)                1290      
                                                           

In [22]:

history=l2_model.fit(x_train, y_train, batch_size=64, epochs=10, validation_data=(x_test, y_test))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [23]:
y_hat=l2_model.predict(x_test)
y_hat_classes=y_hat.argmax(1)
num_obs=20
print(f"Real targets:{y_hat_classes[0:num_obs]}\n")
print(f"Pred targets:{y_test[0:num_obs]}\n")

Real targets:[4 1 1 8 5 0 5 3 8 9 8 5 1 4 6 2 0 6 9 5]

Pred targets:[3 1 1 8 5 0 5 3 8 9 8 5 1 4 6 2 0 7 9 5]



In [24]:
# Implement a regulatization using a dropout approach

data_normalizer=layers.Normalization()

def setup_dropout_model():
    inputs=Input(shape=(200,))
    x=data_normalizer(inputs)
    x=layers.Dense(128, activation='relu')(x)
    x=layers.Dropout(0.2)(x)
    x=layers.Dense(128, activation='relu')(x)
    x=layers.Dropout(0.2)(x)
    x=layers.Dense(128, activation='relu')(x)
    x=layers.Dropout(0.2)(x)
    outputs=layers.Dense(10, activation='softmax')(x)
    model=Model(inputs, outputs)
    model.summary()
    return model


In [25]:
data_normalizer.adapt(x_train)
dropout_model=setup_dropout_model()
dropout_model=compile_model(dropout_model)

Model: "model_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_5 (InputLayer)        [(None, 200)]             0         
                                                                 
 normalization_3 (Normalizat  (None, 200)              401       
 ion)                                                            
                                                                 
 dense_16 (Dense)            (None, 128)               25728     
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_17 (Dense)            (None, 128)               16512     
                                                                 
 dropout_1 (Dropout)         (None, 128)               0         
                                                           

In [26]:
history=dropout_model.fit(x_train, y_train, batch_size=64, epochs=10, validation_data=(x_test, y_test))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [27]:
y_hat=dropout_model.predict(x_test)
y_hat_classes=y_hat.argmax(1)
num_obs=20
print(f"Real targets:{y_hat_classes[0:num_obs]}\n")
print(f"Pred targets:{y_test[0:num_obs]}\n")

Real targets:[3 1 1 9 5 0 5 2 8 9 9 5 1 4 6 2 0 6 9 5]

Pred targets:[3 1 1 8 5 0 5 3 8 9 8 5 1 4 6 2 0 7 9 5]



In [28]:
# Pruning a neural network
import tensorflow_model_optimization as tfmot
from tensorflow.keras import Sequential
import sys
import tempfile
import os

%load_ext tensorboard


In [48]:
data_normalizer=layers.Normalization()

def functional_model():
    inputs=Input(shape=(200,))
    x=data_normalizer(inputs)
    x=layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01))(x)
    x=layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01))(x)
    x=layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01))(x)
    x=layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01))(x)
    outputs=layers.Dense(10, activation='softmax', kernel_regularizer=regularizers.L2(0.01))(x)
    model=Model(inputs, outputs)
    model.summary()
    return model

def sequential_model():
    model = Sequential([
    layers.InputLayer(input_shape=(200,)),
    data_normalizer,
    layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01)),
    layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01)),
    layers.Dense(128, activation='relu', kernel_regularizer=regularizers.L2(0.01)),
    layers.Dense(10, activation='softmax', kernel_regularizer=regularizers.L2(0.01))
    ])
    model.summary()
    return model


def setup_model(model_type:str="sequential"):
    if model_type=="sequential":
        model=sequential_model()
    elif model_type=="functional":
        model=functional_model()
    else:
        print("Error: the model type is not recognized. Enter model_type='sequential' or 'functional'")
        sys.exit()
    return model

In [49]:
data_normalizer.adapt(x_train)
model=compile_model(setup_model("functional"))
model.fit(x_train, y_train, batch_size=64, epochs=10, validation_data=(x_test, y_test))

Model: "model_11"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_12 (InputLayer)       [(None, 200)]             0         
                                                                 
 normalization_8 (Normalizat  (None, 200)              401       
 ion)                                                            
                                                                 
 dense_48 (Dense)            (None, 128)               25728     
                                                                 
 dense_49 (Dense)            (None, 128)               16512     
                                                                 
 dense_50 (Dense)            (None, 128)               16512     
                                                                 
 dense_51 (Dense)            (None, 128)               16512     
                                                          

<keras.callbacks.History at 0x17ba83580>

In [86]:
# save pretained weights in the output
pretrained_weights="output/pretrained_weights.tf"

base_model_file="output/base_model.h5"

#_, pretrained_weights = tempfile.mkstemp('.tf')
model.save_weights(pretrained_weights)
model.save(base_model_file, include_optimizer=True)

In [87]:
import os
print("Size of pruned Keras model: %.2f Mbytes" % (os.path.getsize(base_model_file)/1e6))

Size of pruned Keras model: 0.97 Mbytes


In [80]:
# Fine tune model with pruning

base_model = setup_model("functional")
base_model.load_weights(pretrained_weights)

end_step=int(len(x_train)/64)

pruning_params = {
    'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
        initial_sparsity=0.2,
        final_sparsity=0.8,
        begin_step=0,
        end_step=end_step
    ),
}

def apply_pruning_to_dense(layer):
  if isinstance(layer, tf.keras.layers.Dense):
    return tfmot.sparsity.keras.prune_low_magnitude(layer, **pruning_params) 
  return layer

model_for_pruning = tf.keras.models.clone_model(
    base_model,
    clone_function=apply_pruning_to_dense,
)

model_for_pruning.summary()


Model: "model_16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_17 (InputLayer)       [(None, 200)]             0         
                                                                 
 normalization_8 (Normalizat  (None, 200)              401       
 ion)                                                            
                                                                 
 dense_73 (Dense)            (None, 128)               25728     
                                                                 
 dense_74 (Dense)            (None, 128)               16512     
                                                                 
 dense_75 (Dense)            (None, 128)               16512     
                                                                 
 dense_76 (Dense)            (None, 128)               16512     
                                                          

In [81]:
pruned_model=compile_model(model_for_pruning)

In [82]:
log_dir = tempfile.mkdtemp()
callbacks = [
    tfmot.sparsity.keras.UpdatePruningStep(),
    # Log sparsity and other metrics in Tensorboard.
    tfmot.sparsity.keras.PruningSummaries(log_dir=log_dir)
]

pruned_model.fit(x_train, y_train, batch_size=64, epochs=10, validation_data=(x_test, y_test), callbacks=callbacks)

#docs_infra: no_execute
#%tensorboard --logdir={log_dir}

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x28cab2da0>

In [62]:
_, keras_model_file = tempfile.mkstemp('.h5')

# Checkpoint: saving the optimizer is necessary (include_optimizer=True is the default).
model_for_pruning.save(keras_model_file, include_optimizer=True)

In [94]:
model_for_export = tfmot.sparsity.keras.strip_pruning(model_for_pruning)

strip_model_file="output/strip_pruned_model.h5"
model_for_export.save(strip_model_file, include_optimizer=True)



In [96]:
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
pruned_tflite_model = converter.convert()

pruned_tflite_file = 'output/final_model.tflite'

with open(pruned_tflite_file, 'wb') as f:
  f.write(pruned_tflite_model)

print('Saved pruned TFLite model to:', pruned_tflite_file)

INFO:tensorflow:Assets written to: /var/folders/4j/fv30hb2s7zl7_5dgpz7hqdb80000gn/T/tmpg19fm6em/assets
Saved pruned TFLite model to: output/final_model.tflite


2023-08-12 01:40:40.636376: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:364] Ignored output_format.
2023-08-12 01:40:40.636655: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:367] Ignored drop_control_dependency.
2023-08-12 01:40:40.641851: I tensorflow/cc/saved_model/reader.cc:45] Reading SavedModel from: /var/folders/4j/fv30hb2s7zl7_5dgpz7hqdb80000gn/T/tmpg19fm6em
2023-08-12 01:40:40.643132: I tensorflow/cc/saved_model/reader.cc:89] Reading meta graph with tags { serve }
2023-08-12 01:40:40.643141: I tensorflow/cc/saved_model/reader.cc:130] Reading SavedModel debug info (if present) from: /var/folders/4j/fv30hb2s7zl7_5dgpz7hqdb80000gn/T/tmpg19fm6em
2023-08-12 01:40:40.651239: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:353] MLIR V1 optimization pass is not enabled
2023-08-12 01:40:40.664468: I tensorflow/cc/saved_model/loader.cc:231] Restoring SavedModel bundle.
2023-08-12 01:40:40.737729: I tensorflow/cc/saved_model/loader.

In [99]:
#  quantization

converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_and_pruned_tflite_model = converter.convert()

quantized_and_pruned_tflite_file = 'output/quantized_model.tflite'

with open(quantized_and_pruned_tflite_file, 'wb') as f:
  f.write(quantized_and_pruned_tflite_model)

INFO:tensorflow:Assets written to: /var/folders/4j/fv30hb2s7zl7_5dgpz7hqdb80000gn/T/tmp0ike5md2/assets


INFO:tensorflow:Assets written to: /var/folders/4j/fv30hb2s7zl7_5dgpz7hqdb80000gn/T/tmp0ike5md2/assets
2023-08-12 01:58:54.092976: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:364] Ignored output_format.
2023-08-12 01:58:54.092992: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:367] Ignored drop_control_dependency.
2023-08-12 01:58:54.093268: I tensorflow/cc/saved_model/reader.cc:45] Reading SavedModel from: /var/folders/4j/fv30hb2s7zl7_5dgpz7hqdb80000gn/T/tmp0ike5md2
2023-08-12 01:58:54.095837: I tensorflow/cc/saved_model/reader.cc:89] Reading meta graph with tags { serve }
2023-08-12 01:58:54.095843: I tensorflow/cc/saved_model/reader.cc:130] Reading SavedModel debug info (if present) from: /var/folders/4j/fv30hb2s7zl7_5dgpz7hqdb80000gn/T/tmp0ike5md2
2023-08-12 01:58:54.123050: I tensorflow/cc/saved_model/loader.cc:231] Restoring SavedModel bundle.
2023-08-12 01:58:54.150518: I tensorflow/cc/saved_model/loader.cc:215] Running initialization

In [88]:
#A helper function to measure the size of the model

def get_gzipped_model_size(file):
  # Returns size of gzipped model, in bytes.
  import os
  import zipfile

  _, zipped_file = tempfile.mkstemp('.zip')
  with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(file)

  return os.path.getsize(zipped_file)

In [102]:
print(" File size of the base model : %.2f Mbytes"%(get_gzipped_model_size("output/base_model.h5")/1e6))
print(" File size of the pruned model : %.2f Mbytes"%(get_gzipped_model_size("output/keras_model.h5")/1e6))
print(" File size of the strip pruned model : %.2f Mbytes"%(get_gzipped_model_size("output/strip_pruned_model.h5")/1e6))
print(" File size of the tflite pruned model : %.2f Mbytes"%(get_gzipped_model_size("output/final_model.tflite")/1e6))
print(" File size of the quantized pruned model : %.2f Mbytes"%(get_gzipped_model_size("output/quantized_model.tflite")/1e6))

 File size of the base model : 0.88 Mbytes
 File size of the pruned model : 0.45 Mbytes
 File size of the strip pruned model : 0.08 Mbytes
 File size of the tflite pruned model : 0.08 Mbytes
 File size of the quantized pruned model : 0.01 Mbytes


In [75]:
import os
model_for_pruning.save(keras_model_file, include_optimizer=True)

print("Size of pruned Keras model: %.2f Mbytes" % (os.path.getsize(keras_model_file)/1e6))

Size of pruned Keras model: 1.29 Mbytes


In [67]:
# Deserialize model.
with tfmot.sparsity.keras.prune_scope():
  loaded_model = tf.keras.models.load_model(keras_model_file)

loaded_model.summary()

Model: "model_15"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_16 (InputLayer)       [(None, 200)]             0         
                                                                 
 normalization_8 (Normalizat  (None, 200)              401       
 ion)                                                            
                                                                 
 prune_low_magnitude_dense_6  (None, 128)              51330     
 8 (PruneLowMagnitude)                                           
                                                                 
 prune_low_magnitude_dense_6  (None, 128)              32898     
 9 (PruneLowMagnitude)                                           
                                                                 
 prune_low_magnitude_dense_7  (None, 128)              32898     
 0 (PruneLowMagnitude)                                    

In [68]:
y_hat=loaded_model.predict(x_test)
y_hat_classes=y_hat.argmax(1)
num_obs=20
print(f"Real targets:{y_hat_classes[0:num_obs]}\n")
print(f"Pred targets:{y_test[0:num_obs]}\n")

Real targets:[9 7 4 4 4 5 6 6 0 7 9 0 3 1 3 0 9 9 6 1]

Pred targets:[9 7 4 3 4 6 6 6 0 7 9 0 3 1 3 0 9 9 6 1]



In [None]:
# Implement the quantization of the neural network