In [1]:
import tensorflow as tf
import numpy as np
from tqdm import tqdm

In [2]:
# ----------------------------
# 1. Load MNIST and preprocess
# ----------------------------
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

# normalize to [0,1] and add channel dimension
X_train = X_train.astype(np.float32) / 255.0
X_test  = X_test.astype(np.float32) / 255.0
X_train = X_train[..., np.newaxis]  # shape (N,28,28,1)
X_test  = X_test[..., np.newaxis]


Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[1m11490434/11490434[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [3]:
# ----------------------------
# 2. Custom Dataset and DataLoader
# ----------------------------
class Data:
    def __init__(self, X, y):
        self.X = X
        self.y = y
        self.len = len(X)

    def __getitem__(self, index):
        return self.X[index], self.y[index]

    def __len__(self):
        return self.len

def DataLoader(dataset, batch_size=64, shuffle=True):
    indices = np.arange(len(dataset))
    if shuffle:
        np.random.shuffle(indices)
    for start_idx in range(0, len(dataset), batch_size):
        batch_indices = indices[start_idx:start_idx+batch_size]
        batch_X = dataset.X[batch_indices]
        batch_y = dataset.y[batch_indices]
        yield batch_X, batch_y

In [4]:
# Create datasets and loaders
batch_size = 64
train_dataset = Data(X_train, y_train)
test_dataset  = Data(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [5]:
# ----------------------------
# 3. Define CNN (like your PyTorch model)
# ----------------------------
class CNN(tf.keras.Model):
    def __init__(self, in_channels=1, num_classes=10):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(
            filters=8, kernel_size=3, strides=1, padding="same", activation="relu",
            input_shape=(28,28,in_channels)
        )
        self.pool = tf.keras.layers.MaxPooling2D(pool_size=2, strides=2)
        self.conv2 = tf.keras.layers.Conv2D(
            filters=16, kernel_size=3, strides=1, padding="same", activation="relu"
        )
        self.flatten = tf.keras.layers.Flatten()
        self.fc1 = tf.keras.layers.Dense(num_classes)  # logits

    def call(self, x):
        x = self.conv1(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = self.pool(x)
        x = self.flatten(x)
        return self.fc1(x)

model = CNN(in_channels=1, num_classes=10)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [6]:
# ----------------------------
# 4. Define loss and optimizer
# ----------------------------
learning_rate = 0.001
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

In [7]:
# ----------------------------
# 5. Training loop (like PyTorch)
# ----------------------------
num_epochs = 10

for epoch in range(num_epochs):
    print(f"Epoch [{epoch+1}/{num_epochs}]")
    for batch_idx, (data, targets) in enumerate(tqdm(train_loader)):
        # Convert to tensors
        data = tf.convert_to_tensor(data)
        targets = tf.convert_to_tensor(targets)

        with tf.GradientTape() as tape:
            scores = model(data, training=True)
            loss = loss_fn(targets, scores)

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

Epoch [1/10]


938it [00:41, 22.41it/s]


Epoch [2/10]


0it [00:00, ?it/s]


Epoch [3/10]


0it [00:00, ?it/s]


Epoch [4/10]


0it [00:00, ?it/s]


Epoch [5/10]


0it [00:00, ?it/s]


Epoch [6/10]


0it [00:00, ?it/s]


Epoch [7/10]


0it [00:00, ?it/s]


Epoch [8/10]


0it [00:00, ?it/s]


Epoch [9/10]


0it [00:00, ?it/s]


Epoch [10/10]


0it [00:00, ?it/s]


In [12]:
def check_accuracy(loader, model):
    num_correct = 0
    num_samples = 0
    for x, y in loader:
        x = tf.convert_to_tensor(x, dtype=tf.float32)
        y = tf.convert_to_tensor(y, dtype=tf.int32)  # <- cast here

        scores = model(x, training=False)
        predictions = tf.argmax(scores, axis=1, output_type=tf.int32)  # make sure same dtype

        num_correct += tf.reduce_sum(tf.cast(predictions == y, tf.int32)).numpy()
        num_samples += x.shape[0]

    accuracy = 100 * num_correct / num_samples
    print(f"Got {num_correct}/{num_samples} with accuracy {accuracy:.2f}%")

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print('Train Accuracy')
check_accuracy(train_loader, model)
print('Test Accuracy')
check_accuracy(test_loader, model)


Train Accuracy
Got 57908/60000 with accuracy 96.51%
Test Accuracy
Got 9684/10000 with accuracy 96.84%
