In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, BatchNormalization, Dropout, Flatten, Dense
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

In [None]:
# Step 1: Data Preparation
# Fetch the MNIST train/test splits as (images, labels) pairs.
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Give every image an explicit single-channel axis, then convert to float32
# and divide the pixel values by 255.
train_images = train_images.reshape((60000, 28, 28, 1))
train_images = train_images.astype('float32') / 255

test_images = test_images.reshape((10000, 28, 28, 1))
test_images = test_images.astype('float32') / 255

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [None]:
# Load and preprocess the MNIST dataset
# Reloading here keeps the cell idempotent: the one-hot encoding below always
# starts from the raw integer labels, even when the cell is re-run.
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Add the single-channel axis and divide the pixel values by 255.
train_images = train_images.reshape((60000, 28, 28, 1)).astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1)).astype('float32') / 255

# One-hot encode the labels, as required by the categorical cross-entropy loss.
train_labels, test_labels = to_categorical(train_labels), to_categorical(test_labels)

# Define the model architecture
model = Sequential([
    # First convolutional stage: two 3x3 convolutions, then a strided 5x5
    # convolution that halves the spatial resolution.
    Conv2D(32, kernel_size=3, activation='relu', input_shape=(28, 28, 1)),
    BatchNormalization(),
    Conv2D(32, kernel_size=3, activation='relu'),
    BatchNormalization(),
    Conv2D(32, kernel_size=5, strides=2, padding='same', activation='relu'),
    BatchNormalization(),
    Dropout(0.4),

    # Second convolutional stage: same layout with 64 filters.
    Conv2D(64, kernel_size=3, activation='relu'),
    BatchNormalization(),
    Conv2D(64, kernel_size=3, activation='relu'),
    BatchNormalization(),
    Conv2D(64, kernel_size=5, strides=2, padding='same', activation='relu'),
    BatchNormalization(),
    Dropout(0.4),

    # Classifier head: one hidden dense layer, then a 10-way softmax.
    Flatten(),
    Dense(128, activation='relu'),
    BatchNormalization(),
    Dropout(0.4),
    Dense(10, activation='softmax'),
])

# Compile the model
model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)

In [None]:
# Step 3: Training and Validation
# Train for 10 epochs, holding out 20% of the training data for validation;
# the returned History object keeps the per-epoch metrics.
history = model.fit(
    train_images,
    train_labels,
    epochs=10,
    validation_split=0.2,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Save the model that was trained with a validation split to an HDF5 (.h5) file.
model.save("validation_cnn.h5")

  saving_api.save_model(


In [None]:
# Print the layer-by-layer summary (output shapes and parameter counts) of the validation model.
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 26, 26, 32)        320       
                                                                 
 batch_normalization (Batch  (None, 26, 26, 32)        128       
 Normalization)                                                  
                                                                 
 conv2d_1 (Conv2D)           (None, 24, 24, 32)        9248      
                                                                 
 batch_normalization_1 (Bat  (None, 24, 24, 32)        128       
 chNormalization)                                                
                                                                 
 conv2d_2 (Conv2D)           (None, 12, 12, 32)        25632     
                                                                 
 batch_normalization_2 (Bat  (None, 12, 12, 32)        1

In [None]:
# Step 4: Model Evaluation
# Validation accuracy recorded at the end of the last training epoch.
val_acc = history.history['val_accuracy'][-1]

# Report only when the 99% target has been exceeded.
reached_target = val_acc > 0.99
if reached_target:
    print("Validation accuracy greater than 99% achieved.")

Validation accuracy greater than 99% achieved.


In [None]:
# Show the exact final-epoch validation accuracy.
print(val_acc)

0.9929999709129333


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, BatchNormalization, Dropout, Flatten, Dense
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical

# Load and preprocess the MNIST dataset
# Start again from the raw data so the one-hot encoding below is applied to
# integer labels no matter which cells ran before this one.
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Add the single-channel axis and divide the pixel values by 255.
train_images = train_images.reshape((60000, 28, 28, 1)).astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1)).astype('float32') / 255

# One-hot encode the labels for the categorical cross-entropy loss.
train_labels, test_labels = to_categorical(train_labels), to_categorical(test_labels)

# Define the model architecture
final_model = Sequential([
    # First convolutional stage (32 filters); the strided 5x5 convolution
    # halves the spatial resolution.
    Conv2D(32, kernel_size=3, activation='relu', input_shape=(28, 28, 1)),
    BatchNormalization(),
    Conv2D(32, kernel_size=3, activation='relu'),
    BatchNormalization(),
    Conv2D(32, kernel_size=5, strides=2, padding='same', activation='relu'),
    BatchNormalization(),
    Dropout(0.4),

    # Second convolutional stage (64 filters), same layout.
    Conv2D(64, kernel_size=3, activation='relu'),
    BatchNormalization(),
    Conv2D(64, kernel_size=3, activation='relu'),
    BatchNormalization(),
    Conv2D(64, kernel_size=5, strides=2, padding='same', activation='relu'),
    BatchNormalization(),
    Dropout(0.4),

    # Classifier head: hidden dense layer followed by a 10-way softmax.
    Flatten(),
    Dense(128, activation='relu'),
    BatchNormalization(),
    Dropout(0.4),
    Dense(10, activation='softmax'),
])

# Compile the model
final_model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)

# Train the final_model on the entire training set
# (no validation split here, so every training sample is used for fitting).
history = final_model.fit(
    train_images,
    train_labels,
    epochs=10,
    batch_size=128,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Save the model trained on the full training set to an HDF5 (.h5) file.
final_model.save("final_cnn.h5")

In [None]:
# Print the layer-by-layer summary (output shapes and parameter counts) of the final model.
final_model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_6 (Conv2D)           (None, 26, 26, 32)        320       
                                                                 
 batch_normalization_7 (Bat  (None, 26, 26, 32)        128       
 chNormalization)                                                
                                                                 
 conv2d_7 (Conv2D)           (None, 24, 24, 32)        9248      
                                                                 
 batch_normalization_8 (Bat  (None, 24, 24, 32)        128       
 chNormalization)                                                
                                                                 
 conv2d_8 (Conv2D)           (None, 12, 12, 32)        25632     
                                                                 
 batch_normalization_9 (Bat  (None, 12, 12, 32)       

In [None]:
# Evaluate the final_model on the test set
# NOTE(review): the output recorded for this cell is a NameError for `final_model`,
# i.e. it was last executed in a kernel where the training cell had not been run.
# Re-run the cell that builds and trains `final_model` (or Restart & Run All) first.
test_loss, test_acc = final_model.evaluate(test_images, test_labels)
print("Test accuracy:", test_acc)

NameError: name 'final_model' is not defined

In [None]:
# Step 7: Misclassification Analysis
# Class probabilities predicted for every test image.
predictions = final_model.predict(test_images)

# Collapse probabilities / one-hot labels to integer class ids.
predicted_classes = predictions.argmax(axis=1)
true_classes = test_labels.argmax(axis=1)

# Positions in the test set where the prediction disagrees with the label.
is_wrong = predicted_classes != true_classes
misclassified_indices = np.where(is_wrong)[0]
print("Number of misclassified samples:", len(misclassified_indices))


In [None]:
# Plot some misclassified samples
# Show up to the first five misclassified test images side by side,
# each titled with its predicted and true digit.
plt.figure(figsize=(10, 5))
for position, sample_idx in enumerate(misclassified_indices[:5], start=1):
    plt.subplot(1, 5, position)
    image = test_images[sample_idx].reshape(28, 28)  # drop the channel axis for imshow
    plt.imshow(image, cmap='gray')
    predicted_label = predictions[sample_idx].argmax()
    true_label = test_labels[sample_idx].argmax()
    plt.title(f"Predicted: {predicted_label}\nTrue: {true_label}", fontsize=10, pad=5)
    plt.axis('off')
plt.tight_layout()
plt.show()


# Transformer

In [None]:
import numpy as np
import tensorflow as tf

In [None]:
# This is written as a TensorFlow "layer". It is just one vector the same size as the
# output of the previous layer. The vector is initialized randomly, and gradient
# descent updates its values like any other weight.
#
# Its purpose is to be placed at the beginning of the sequence of vectors fed into
# the transformer. After the transformer has run over the whole sequence, we just grab
# the resulting zero-th vector (the class token) and classify the image from it.
class ClassToken(tf.keras.layers.Layer):
    """Learnable class token, broadcast across the batch.

    The layer owns a single trainable vector of shape ``(1, 1, hidden_dim)``,
    where ``hidden_dim`` is the last dimension of the input it is built on.
    Calling the layer returns that vector repeated once per batch element;
    it does NOT concatenate it with the input, the caller does that.
    """

    def __init__(self, **kwargs):
        # Forward the standard Layer arguments (name, dtype, trainable, ...) so the
        # layer can be named and rebuilt from its config like any built-in layer.
        super().__init__(**kwargs)

    def build(self, input_shape):
        """Create the token weight, sized from the last axis of ``input_shape``."""
        # add_weight registers the variable with the layer, so it is reported in
        # `weights` / `trainable_weights` and saved with the model.
        self.w = self.add_weight(
            name="cls_token",
            shape=(1, 1, input_shape[-1]),
            dtype="float32",
            initializer=tf.random_normal_initializer(),
            trainable=True,
        )

    def call(self, inputs):
        """Return the class token with shape ``(batch_size, 1, hidden_dim)``.

        ``inputs`` is only used for its (dynamic) batch size and its dtype.
        """
        batch_size = tf.shape(inputs)[0]
        hidden_dim = self.w.shape[-1]

        # One copy of the token per sample in the batch.
        cls = tf.broadcast_to(self.w, [batch_size, 1, hidden_dim])
        # Match the dtype of the sequence it will be concatenated with.
        cls = tf.cast(cls, dtype=inputs.dtype)
        return cls

In [None]:
def build_ViT(n,m,block_size,hidden_dim,num_layers,num_heads,key_dim,mlp_dim,dropout_rate,num_classes):
    """Build and compile a small Vision Transformer classifier.

    Args:
        n: number of rows of blocks (patches) per image.
        m: number of columns of blocks per image.
        block_size: number of values in one flattened block
            (pixels, times colour channels if any).
        hidden_dim: width of the token vectors inside the transformer.
        num_layers: number of transformer encoder blocks.
        num_heads: number of attention heads per block.
        key_dim: per-head size of the attention keys (also used for values).
        mlp_dim: width of the hidden layer in each block's feed-forward part.
        dropout_rate: dropout rate used inside the feed-forward part.
        num_classes: number of output classes.

    Returns:
        A compiled ``tf.keras`` model taking two inputs, the flattened blocks
        with shape ``(batch, n*m, block_size)`` and the integer position of
        each block with shape ``(batch, n*m)``, and returning softmax class
        probabilities of shape ``(batch, num_classes)``. It is compiled with
        Adam, sparse categorical cross-entropy and an accuracy metric.
    """
    num_patches = n * m

    inp = tf.keras.layers.Input(shape=(num_patches, block_size))
    # `shape` must be a tuple: `(n*m)` is a plain int, `(n*m,)` is the 1-tuple Keras expects.
    inp2 = tf.keras.layers.Input(shape=(num_patches,))
    mid = tf.keras.layers.Dense(hidden_dim)(inp) # transform to vectors with different dimension
    # The positional embeddings: the position indices are fed in through `inp2`,
    # and one vector is learned for each of the n*m possible positions.
    emb = tf.keras.layers.Embedding(input_dim=num_patches, output_dim=hidden_dim)(inp2)
    mid = mid + emb # for some reason, tf.keras.layers.Add causes an error, but + doesn't?
    # create the class token and put it at the beginning of the sequence
    token = ClassToken()(mid)
    mid = tf.keras.layers.Concatenate(axis=1)([token, mid])

    for _ in range(num_layers): # one pass per transformer encoder block
        ln  = tf.keras.layers.LayerNormalization()(mid) # normalize
        mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads,key_dim=key_dim,value_dim=key_dim)(ln,ln,ln) # self attention!
        add = tf.keras.layers.Add()([mid,mha]) # residual connection around the attention
        ln  = tf.keras.layers.LayerNormalization()(add)
        den = tf.keras.layers.Dense(mlp_dim,activation='gelu')(ln) # maybe should be relu...who knows...
        den = tf.keras.layers.Dropout(dropout_rate)(den) # regularization
        den = tf.keras.layers.Dense(hidden_dim)(den) # back to the right dimensional space
        den = tf.keras.layers.Dropout(dropout_rate)(den)
        mid = tf.keras.layers.Add()([den,add]) # residual connection around the feed-forward part
    ln = tf.keras.layers.LayerNormalization()(mid)
    fl = ln[:,0,:] # just grab the class token for each image in batch
    clas = tf.keras.layers.Dense(num_classes,activation='softmax')(fl) # probability that the image is in each category
    mod = tf.keras.models.Model([inp,inp2],clas)
    mod.compile(optimizer='adam',loss='sparse_categorical_crossentropy',metrics=['accuracy'])
    return mod

In [None]:
# Hyperparameters of the Vision Transformer and of the image-to-block split.
n = 4  # rows of blocks per image
m = 4  # columns of blocks per image
block_size = 49  # values per flattened block; the block side used later is int(sqrt(49)) = 7 pixels
hidden_dim = 128  # width of the token vectors inside the transformer
num_layers = 8  # number of transformer encoder blocks
num_heads = 8  # attention heads per block
key_dim = hidden_dim//num_heads # usually good practice for key_dim to be hidden_dim//num_heads...this is why we do Multi-Head attention
mlp_dim = hidden_dim  # hidden width of the feed-forward part of each block
dropout_rate = 0.13
num_classes = 10  # one class per digit



# Build the (already compiled) model and print its layer summary.
trans = build_ViT(n,m,block_size,hidden_dim,num_layers,num_heads,key_dim,mlp_dim,dropout_rate,num_classes)
trans.summary()

In [None]:
mnist = tf.keras.datasets.mnist

# Raw digit images with their integer labels (labels stay as integers here).
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Divide the pixel values by 255.
x_train = x_train / 255.0
x_test = x_test / 255.0

# Number of images in each split.
ndata_train, ndata_test = x_train.shape[0], x_test.shape[0]

In [None]:
# Sanity check: display the shape of the training image array before splitting it into blocks.
x_train.shape

In [None]:
# Cut every training image into an n x m grid of square blocks and flatten each block,
# giving an array of shape (ndata_train, n * m, block_size). Blocks are ordered row by
# row (index = row * m + col) and the pixels inside a block are flattened row-major.
#
# This is a single reshape/transpose instead of a Python loop over every image and
# block, which produces the same array far faster.
patch = int(np.sqrt(block_size))  # side length of one square block, in pixels
x_train_ravel = (
    x_train[:ndata_train, :n * patch, :m * patch]
    .reshape(ndata_train, n, patch, m, patch)   # (image, block row, y, block col, x)
    .transpose(0, 1, 3, 2, 4)                   # (image, block row, block col, y, x)
    .reshape(ndata_train, n * m, block_size)    # flatten the grid and each block
    .astype(np.float64, copy=False)             # same dtype as the np.zeros buffer used before
)


In [None]:
# Cut every test image into an n x m grid of square blocks and flatten each block,
# giving an array of shape (ndata_test, n * m, block_size). Blocks are ordered row by
# row (index = row * m + col) and the pixels inside a block are flattened row-major.
#
# This is a single reshape/transpose instead of a Python loop over every image and
# block, which produces the same array far faster.
patch = int(np.sqrt(block_size))  # side length of one square block, in pixels
x_test_ravel = (
    x_test[:ndata_test, :n * patch, :m * patch]
    .reshape(ndata_test, n, patch, m, patch)    # (image, block row, y, block col, x)
    .transpose(0, 1, 3, 2, 4)                   # (image, block row, block col, y, x)
    .reshape(ndata_test, n * m, block_size)     # flatten the grid and each block
    .astype(np.float64, copy=False)             # same dtype as the np.zeros buffer used before
)


In [None]:
# Position indices 0 .. n*m-1 for the blocks of one image, repeated for every image.
# These feed the positional-embedding input of the transformer.
# np.tile builds the (ndata, n*m) array directly instead of going through a
# Python list of lists, and keeps the 2-D shape even for an empty split.
positions = np.arange(n * m)
pos_feed_train = np.tile(positions, (ndata_train, 1))
pos_feed_test = np.tile(positions, (ndata_test, 1))

In [None]:
# Define your learning rate scheduler function
def lr_scheduler(epoch, lr):
    """Learning-rate schedule: constant at first, then exponential decay.

    Args:
        epoch: index of the epoch about to start (0-based).
        lr: learning rate currently in use.

    Returns:
        ``lr`` unchanged for the first 10 epochs; afterwards ``lr * exp(-0.1)``
        as a plain Python float, so the rate shrinks by that factor every epoch.
    """
    if epoch < 10:
        return lr  # Keep the initial learning rate for the first 10 epochs
    # The LearningRateScheduler callback expects a float, not a tensor, so compute
    # the decay with NumPy and convert the result explicitly.
    return float(lr * np.exp(-0.1))  # Exponentially decay the learning rate after 10 epochs

# Create the learning rate scheduler callback
lr_callback = tf.keras.callbacks.LearningRateScheduler(lr_scheduler)

# Define the model checkpoint callback to save the best model weights:
# only the weights are written, and only when validation accuracy improves.
checkpoint_filepath = 'model_checkpoint_v2.h5'
checkpoint_options = dict(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
)
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(**checkpoint_options)

# Call trans.fit() with the learning rate scheduler and model checkpoint callbacks.
# The model takes two inputs: the flattened blocks and their position indices.
trans.fit(
    x=[x_train_ravel, pos_feed_train],
    y=y_train,
    epochs=50,
    batch_size=160,
    validation_split=0.20,
    callbacks=[lr_callback, model_checkpoint_callback],
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.src.callbacks.History at 0x7d271a7c6200>

In [None]:
# Create a new instance of your model with the same hyperparameters, so its
# layers line up with the weights stored in the checkpoint.
trans = build_ViT(
    n=n,
    m=m,
    block_size=block_size,
    hidden_dim=hidden_dim,
    num_layers=num_layers,
    num_heads=num_heads,
    key_dim=key_dim,
    mlp_dim=mlp_dim,
    dropout_rate=dropout_rate,
    num_classes=num_classes,
)

# Load the weights from the checkpoint file
checkpoint_filepath = 'model_checkpoint_v2.h5'
trans.load_weights(checkpoint_filepath)

# No explicit compile step is needed here: build_ViT already returns a compiled model.



In [None]:
# Evaluate the restored transformer on the test split (blocks + position indices as inputs).
# `out` holds the values returned by evaluate() for the loss and the compiled accuracy metric.
out = trans.evaluate([x_test_ravel,pos_feed_test],y_test)



# Anvil Uplink

In [None]:
#pip install anvil-uplink

Collecting anvil-uplink
  Downloading anvil_uplink-0.4.2-py2.py3-none-any.whl (90 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/90.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.1/90.1 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting argparse (from anvil-uplink)
  Downloading argparse-1.4.0-py2.py3-none-any.whl (23 kB)
Collecting ws4py (from anvil-uplink)
  Downloading ws4py-0.5.1.tar.gz (51 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/51.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.4/51.4 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: ws4py
  Building wheel for ws4py (setup.py) ... [?25l[?25hdone
  Created wheel for ws4py: filename=ws4py-0.5.1-py3-none-any.whl size=45228 sha256=e2c0f8ff6c39899ca

In [None]:
import os
from getpass import getpass

import anvil.server

# The uplink key is a secret credential, so it must not be stored in the notebook.
# Read it from the ANVIL_UPLINK_KEY environment variable, or prompt for it when the
# variable is not set.
# NOTE: a key was previously committed in this cell; it should be treated as
# compromised and regenerated in the Anvil app settings.
uplink_key = os.environ.get("ANVIL_UPLINK_KEY") or getpass("Anvil uplink key: ")
anvil.server.connect(uplink_key)

Connecting to wss://anvil.works/uplink
Anvil websocket open
Connected to "Development" as SERVER
