In [19]:
# ============================
# 0) Imports & Config
# ============================
import os, math, random, time, json
import numpy as np
from PIL import Image

# Reproducibility
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Paths
BASE_DIR = "dogs_vs_cats"
TRAIN_DIR = os.path.join(BASE_DIR, "train")
TEST_DIR  = os.path.join(BASE_DIR, "test")

# Image + training hyperparams
IMG_SIZE   = 28     # keep small for CPU
IN_CHANNEL = 3
BATCH_SIZE = 16
EPOCHS     = 10
LR         = 0.001
MOMENTUM   = 0.9
WEIGHT_DECAY = 1e-4   # L2 regularization on conv + dense

# To speed up experimentation you can limit images per class (None = use all)
MAX_TRAIN_PER_CLASS = None
MAX_TEST_PER_CLASS  = None


In [20]:
# ============================
# 1) Data loading utilities
# ============================
def list_images_in_class(root, cls, limit=None):
    folder = os.path.join(root, cls)
    files = [os.path.join(folder, f) for f in os.listdir(folder) if f.lower().endswith((".jpg",".jpeg",".png",".bmp",".gif"))]
    files.sort()
    if limit is not None:
        files = files[:limit]
    return files

def load_split(root, img_size, limit_per_class=None):
    cat_files = list_images_in_class(root, "cats", limit_per_class)
    dog_files = list_images_in_class(root, "dogs", limit_per_class)
    X, y = [], []
    for fp in cat_files:
        try:
            img = Image.open(fp).convert("RGB").resize((img_size, img_size))
            X.append(np.asarray(img, dtype=np.float32)/255.0)
            y.append(0)
        except Exception:
            pass
    for fp in dog_files:
        try:
            img = Image.open(fp).convert("RGB").resize((img_size, img_size))
            X.append(np.asarray(img, dtype=np.float32)/255.0)
            y.append(1)
        except Exception:
            pass
    X = np.stack(X, axis=0)   # (N, H, W, C)
    y = np.array(y, dtype=np.float32).reshape(-1,1)
    # Shuffle together
    idx = np.arange(len(y))
    np.random.shuffle(idx)
    return X[idx], y[idx]

X_train, y_train = load_split(TRAIN_DIR, IMG_SIZE, MAX_TRAIN_PER_CLASS)
X_test,  y_test  = load_split(TEST_DIR,  IMG_SIZE, MAX_TEST_PER_CLASS)

print("Train:", X_train.shape, y_train.shape)
print("Test :", X_test.shape,  y_test.shape)


Train: (20000, 28, 28, 3) (20000, 1)
Test : (5000, 28, 28, 3) (5000, 1)


In [18]:
# ============================
# 2) Helper ops: im2col/col2im
# ============================
def im2col(X, ksize, stride=1, pad=0):
    """
    X: (N, C, H, W)
    Return: cols (C*ksize*ksize, N*H_out*W_out)
    """
    N, C, H, W = X.shape
    KH = KW = ksize
    H_out = (H + 2*pad - KH)//stride + 1
    W_out = (W + 2*pad - KW)//stride + 1

    X_padded = np.pad(X, ((0,0),(0,0),(pad,pad),(pad,pad)), mode='constant')
    cols = np.zeros((C*KH*KW, N*H_out*W_out), dtype=X.dtype)

    col_idx = 0
    for i in range(0, H_out):
        for j in range(0, W_out):
            patch = X_padded[:, :, i*stride:i*stride+KH, j*stride:j*stride+KW]
            cols[:, col_idx:col_idx+N] = patch.reshape(N, -1).T
            col_idx += N
    return cols, H_out, W_out

def col2im(cols, X_shape, ksize, stride=1, pad=0, H_out=None, W_out=None):
    """
    cols: (C*ksize*ksize, N*H_out*W_out)
    Return: X_grad (N, C, H, W)
    """
    N, C, H, W = X_shape
    KH = KW = ksize
    X_padded = np.zeros((N, C, H + 2*pad, W + 2*pad), dtype=cols.dtype)

    # reconstruct
    col_idx = 0
    for i in range(H_out):
        for j in range(W_out):
            patch = cols[:, col_idx:col_idx+N].T.reshape(N, C, KH, KW)
            X_padded[:, :, i*stride:i*stride+KH, j*stride:j*stride+KW] += patch
            col_idx += N

    if pad == 0:
        return X_padded
    return X_padded[:, :, pad:-pad, pad:-pad]


In [21]:
# ============================
# 3) Layers
# ============================

class Conv2D:
    def __init__(self, in_channels, out_channels, ksize=3, stride=1, pad=0):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.ksize = ksize
        self.stride = stride
        self.pad = pad
        # He init
        scale = math.sqrt(2.0/(in_channels*ksize*ksize))
        self.W = np.random.randn(out_channels, in_channels, ksize, ksize).astype(np.float32) * scale
        self.b = np.zeros((out_channels,1), dtype=np.float32)
        # momentum buffers
        self.vW = np.zeros_like(self.W)
        self.vb = np.zeros_like(self.b)

    def forward(self, X):
        # Save for backward
        self.X = X  # (N, C, H, W)
        W_col = self.W.reshape(self.out_channels, -1)  # (F, C*KH*KW)
        X_col, self.H_out, self.W_out = im2col(X, self.ksize, self.stride, self.pad)
        out = (W_col @ X_col + self.b).reshape(self.out_channels, self.X.shape[0], self.H_out, self.W_out)
        return np.transpose(out, (1,0,2,3))  # (N, F, H_out, W_out)

    def backward(self, dY):
        # dY: (N,F,H_out,W_out)
        dY = np.transpose(dY, (1,0,2,3)).reshape(self.out_channels, -1)  # (F, N*H_out*W_out)

        X_col, H_out, W_out = im2col(self.X, self.ksize, self.stride, self.pad)
        W_col = self.W.reshape(self.out_channels, -1)

        # grads
        dW_col = dY @ X_col.T
        db = np.sum(dY, axis=1, keepdims=True)

        dX_col = W_col.T @ dY
        dX = col2im(dX_col, self.X.shape, self.ksize, self.stride, self.pad, H_out, W_out)

        self.dW = dW_col.reshape(self.W.shape) + WEIGHT_DECAY * self.W
        self.db = db
        return dX

    def step(self, lr, momentum):
        self.vW = momentum*self.vW - lr*self.dW
        self.vb = momentum*self.vb - lr*self.db
        self.W += self.vW
        self.b += self.vb


class ReLU:
    def forward(self, X):
        self.mask = (X > 0).astype(np.float32)
        return X * self.mask
    def backward(self, dY):
        return dY * self.mask
    def step(self, lr, momentum):  # no params
        pass


class MaxPool2D:
    def __init__(self, ksize=2, stride=2):
        self.ksize = ksize
        self.stride = stride
    def forward(self, X):
        # X: (N,C,H,W)
        N,C,H,W = X.shape
        KH=KW=self.ksize
        H_out = (H - KH)//self.stride + 1
        W_out = (W - KW)//self.stride + 1
        self.X = X
        out = np.zeros((N,C,H_out,W_out), dtype=X.dtype)
        self.max_idx = np.zeros_like(out, dtype=np.int32)

        for n in range(N):
            for c in range(C):
                for i in range(H_out):
                    for j in range(W_out):
                        patch = X[n,c, i*self.stride:i*self.stride+KH, j*self.stride:j*self.stride+KW]
                        idx = np.argmax(patch)
                        out[n,c,i,j] = patch.reshape(-1)[idx]
                        self.max_idx[n,c,i,j] = idx
        return out

    def backward(self, dY):
        N,C,H,W = self.X.shape
        KH=KW=self.ksize
        H_out = (H - KH)//self.stride + 1
        W_out = (W - KW)//self.stride + 1
        dX = np.zeros_like(self.X)

        for n in range(N):
            for c in range(C):
                for i in range(H_out):
                    for j in range(W_out):
                        idx = self.max_idx[n,c,i,j]
                        r = idx // KW
                        s = idx %  KW
                        dX[n,c, i*self.stride+r, j*self.stride+s] += dY[n,c,i,j]
        return dX

    def step(self, lr, momentum):  # no params
        pass


class Flatten:
    def forward(self, X):
        self.X_shape = X.shape
        return X.reshape(X.shape[0], -1)
    def backward(self, dY):
        return dY.reshape(self.X_shape)
    def step(self, lr, momentum):
        pass


class Dense:
    def __init__(self, in_features, out_features):
        # He-like for ReLU layers; small for last layer
        scale = math.sqrt(2.0/in_features)
        self.W = np.random.randn(in_features, out_features).astype(np.float32) * scale
        self.b = np.zeros((1, out_features), dtype=np.float32)
        self.vW = np.zeros_like(self.W)
        self.vb = np.zeros_like(self.b)
    def forward(self, X):
        self.X = X
        return X @ self.W + self.b
    def backward(self, dY):
        self.dW = self.X.T @ dY + WEIGHT_DECAY * self.W
        self.db = np.sum(dY, axis=0, keepdims=True)
        return dY @ self.W.T
    def step(self, lr, momentum):
        self.vW = momentum*self.vW - lr*self.dW
        self.vb = momentum*self.vb - lr*self.db
        self.W += self.vW
        self.b += self.vb


# Loss + activation
def sigmoid(X):
    return 1. / (1. + np.exp(-X))

class SigmoidWithBCE:
    def forward(self, logits, targets):
        # logits: (N,1), targets: (N,1)
        self.targets = targets
        self.p = sigmoid(logits)
        # numerically stable BCE
        eps = 1e-12
        loss = -np.mean(targets*np.log(self.p+eps) + (1-targets)*np.log(1-self.p+eps))
        return loss
    def backward(self):
        # dL/dlogits = (p - y)/N
        N = self.targets.shape[0]
        return (self.p - self.targets) / N


In [None]:
# ============================
# 4) Model definition
# ============================
class SimpleCNN:
    def __init__(self, img_size=64, in_ch=3):
        # (N,H,W,C) -> (N,C,H,W)
        self.layers = [
            # Block 1
            Conv2D(in_ch, 16, ksize=3, stride=1, pad=1),
            ReLU(),
            MaxPool2D(ksize=2, stride=2),  # 64->32

            # Block 2
            Conv2D(16, 32, ksize=3, stride=1, pad=1),
            ReLU(),
            MaxPool2D(ksize=2, stride=2),  # 32->16

            # Block 3
            Conv2D(32, 64, ksize=3, stride=1, pad=1),
            ReLU(),
            MaxPool2D(ksize=2, stride=2),  # 16->8

            Flatten(),
            Dense(64 * (img_size//8) * (img_size//8), 64),
            ReLU(),
            Dense(64, 1)  # logits
            self.layers[-1].W *= 5.0   # boost final weights so sigmoid != stuck at 0.5

        ]
        self.criterion = SigmoidWithBCE()

    def forward(self, X):
        # X: (N,H,W,C) -> to channels-first for our conv
        x = np.transpose(X, (0,3,1,2))
        for layer in self.layers:
            x = layer.forward(x) if hasattr(layer, "forward") else x
        return x  # logits

    def backward(self, dL):
        # go backwards through layers
        x = dL
        for layer in reversed(self.layers):
            if hasattr(layer, "backward"):
                x = layer.backward(x)
        return x

    def step(self, lr=1e-2, momentum=0.9):
        for layer in self.layers:
            if hasattr(layer, "step"):
                layer.step(lr, momentum)

    def save(self, path="cnn_weights.npz"):
        weights = {}
        idx = 0
        for layer in self.layers:
            if isinstance(layer, (Conv2D, Dense)):
                weights[f"W{idx}"] = layer.W
                weights[f"b{idx}"] = layer.b
            idx += 1
        np.savez(path, **weights)

    def load(self, path="cnn_weights.npz"):
        data = np.load(path)
        idx = 0
        for layer in self.layers:
            if isinstance(layer, (Conv2D, Dense)):
                layer.W = data[f"W{idx}"]
                layer.b = data[f"b{idx}"]
            idx += 1


In [6]:
# ============================
# 5) Batching & metrics
# ============================
def iterate_minibatches(X, y, batch_size, shuffle=True):
    idx = np.arange(len(y))
    if shuffle:
        np.random.shuffle(idx)
    for start in range(0, len(y), batch_size):
        end = start + batch_size
        batch_idx = idx[start:end]
        yield X[batch_idx], y[batch_idx]

def accuracy_from_logits(logits, y_true):
    # logits shape: (N,1)
    probs = sigmoid(logits)
    preds = (probs >= 0.5).astype(np.float32)
    return float(np.mean(preds == y_true))


In [8]:
# ============================
# 6) Train
# ============================
model = SimpleCNN(img_size=IMG_SIZE, in_ch=IN_CHANNEL)

def train(model, X_train, y_train, X_val, y_val, epochs, lr, momentum):
    history = {"loss": [], "val_loss": [], "acc": [], "val_acc": []}
    for ep in range(1, epochs+1):
        t0 = time.time()
        losses = []
        accs = []
        for xb, yb in iterate_minibatches(X_train, y_train, BATCH_SIZE, shuffle=True):
            # forward
            logits = model.forward(xb)
            # loss
            loss = model.criterion.forward(logits, yb)
            # backward
            dlogits = model.criterion.backward()
            model.backward(dlogits)
            # step
            model.step(lr, momentum)
            # metrics
            losses.append(loss)
            accs.append(accuracy_from_logits(logits, yb))

        # epoch metrics
        train_loss = float(np.mean(losses))
        train_acc  = float(np.mean(accs))

        # validation
        val_logits = model.forward(X_val)
        val_loss = model.criterion.forward(val_logits, y_val)
        val_acc  = accuracy_from_logits(val_logits, y_val)

        dt = time.time()-t0
        history["loss"].append(train_loss)
        history["val_loss"].append(float(val_loss))
        history["acc"].append(train_acc)
        history["val_acc"].append(float(val_acc))

        print(f"Epoch {ep:02d}/{epochs} | "
              f"loss {train_loss:.4f} acc {train_acc:.4f} | "
              f"val_loss {val_loss:.4f} val_acc {val_acc:.4f} | "
              f"{dt:.1f}s")
    return history

history = train(model, X_train, y_train, X_test, y_test, EPOCHS, LR, MOMENTUM)
model.save("dog_vs_cat_numpy_cnn.npz")


Epoch 01/10 | loss 0.6948 acc 0.4995 | val_loss 0.6932 val_acc 0.5058 | 686.9s
Epoch 02/10 | loss 0.6936 acc 0.5056 | val_loss 0.6931 val_acc 0.4980 | 697.8s
Epoch 03/10 | loss 0.6936 acc 0.5006 | val_loss 0.6933 val_acc 0.5002 | 690.7s
Epoch 04/10 | loss 0.6937 acc 0.4995 | val_loss 0.6936 val_acc 0.5000 | 689.0s
Epoch 05/10 | loss 0.6937 acc 0.4966 | val_loss 0.6932 val_acc 0.4834 | 691.6s
Epoch 06/10 | loss 0.6936 acc 0.5007 | val_loss 0.6935 val_acc 0.5000 | 689.9s
Epoch 07/10 | loss 0.6935 acc 0.4999 | val_loss 0.6933 val_acc 0.5000 | 677.2s
Epoch 08/10 | loss 0.6936 acc 0.4933 | val_loss 0.6932 val_acc 0.5000 | 1333.0s
Epoch 09/10 | loss 0.6936 acc 0.4958 | val_loss 0.6931 val_acc 0.5000 | 679.6s
Epoch 10/10 | loss 0.6937 acc 0.5015 | val_loss 0.6932 val_acc 0.5000 | 8194.9s


In [2]:
# ============================
# 7) Prediction
# ============================
def predict_image(model, image_path, img_size=IMG_SIZE):
    img = Image.open(image_path).convert("RGB").resize((img_size,img_size))
    x = np.asarray(img, dtype=np.float32)/255.0
    x = x[None, ...]  # (1,H,W,C)
    logits = model.forward(x)
    p = float(sigmoid(logits)[0,0])
    return {"prob_dog": p, "pred": "dog" if p>=0.5 else "cat"}


In [17]:
model = SimpleCNN(img_size=IMG_SIZE, in_ch=IN_CHANNEL)

# Step 2: Load the saved weights
model.load("dog_vs_cat_numpy_cnn.npz")

# change name in path to check different images
print(predict_image(model, "dogs_vs_cats/test/cats/cat.22.jpg"))


{'prob_dog': 0.49582991003990173, 'pred': 'cat'}
