In [1]:
from typing import TypedDict
import csv
import kagglehub
import numpy as np
import os
import random

class Sample(TypedDict):
    path: str
    lat: float
    lon: float
    image: np.ndarray
    target: np.ndarray


_coordinates_dict = {}

def coordinates_from_path(filepath):
    if filepath in _coordinates_dict:
        return _coordinates_dict[filepath]  # (lat, lon)
    return None

def csv_loader(path):
    coords = []
    with open(path) as f:
        lines = f.readlines()
    for line in lines:
        lat, lng = line.strip().split(",")
        coords.append([float(lat), float(lng)])
    return coords

def load_images(path, output_csv="coordinates.csv"):
    global _coordinates_dict
    items = []
    _coordinates_dict = {}
    
    # Load coordinates from dataset
    coords = csv_loader(f"{path}/coords.csv")
    
    # Load all png images
    for image in os.listdir(path):
        if "png" not in image:
            continue
        filepath = f"{path}/{image}"
        items.append(filepath)
        # Image index corresponds to coords index (0.png -> coords[0], etc.)
        index = int(image.replace(".png", ""))
        lat, lon = coords[index]
        _coordinates_dict[filepath] = (lat, lon)
    
    # Output to local coordinates.csv
    with open(output_csv, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["lat", "lon"])
        for filepath in items:
            lat, lon = _coordinates_dict[filepath]
            writer.writerow([lat, lon])

    random.shuffle(items)
    return items

def train_test_split(paths, split=0.75, max=None):
    paths = paths[:max] if max is not None else paths
    split_index = int(len(paths) * split)
    train, test = paths[:split_index], paths[split_index:]
    return train, test


# 10,000 images, zero-indexed.png
path = kagglehub.dataset_download("paulchambaz/google-street-view")
path = f"{path}/dataset"
images = load_images(path)
train, test = train_test_split(images, split=0.8)

print(f"{len(train)} images for train, {len(test)} images for test, {len(images)} total!")

# The coordinates for the output-frame (how high resolution should the probability distribution be?)
OUT_H, OUT_W = 32, 64
EPSILON = 1e-12 # value for ensuring model does not hit 0

  from .autonotebook import tqdm as notebook_tqdm


8000 images for train, 2000 images for test, 10000 total!


In [2]:
from typing import Literal
# I call the inputs (incoming nodes) "x", and outgoing (nodes that I affect) "o"

class Layer():
    compiled = False
    len = None
    x = None
    o = None
    mode = "train"
    
    def compile(self, inputs: int = None):
        if self.len is None:
            if inputs is None:
                raise Exception("The layer has an undefined size")
            self.len = inputs
        self.compiled = True
    
    def loss(self, lr):
        pass

    def set_mode(self, mode: Literal["train", "test"]):
        self.mode = mode

    def __len__(self):
        return self.len
        
# ReLU and LeakyReLU together
class ReLU(Layer):
    def __init__(self, c=0):
        self.c = c

    def forward(self, x):
        self.x = x
        self.o = np.where(x > 0, x, self.c * x)
        return self.o
    
    def backward(self, dl):
        return dl * np.where(self.x > 0, 1, self.c)

    def __repr__(self):
        return f"ReLU layer, with c={self.c}"


class Sigmoid(Layer):
    def forward(self, x):
        self.x = x
        self.o = 1 / (1 + np.exp(-x))
        return self.o
    
    def backward(self, dl):
        return dl * self.o * (1 - self.o)
    
    def __repr__(self):
        return "Sigmoid layer"


class BatchNorm(Layer):
    def __init__(self, momentum=0.9, epsilon=1e-5):
        self.momentum = momentum
        self.epsilon = epsilon
        self.running_mean = None
        self.running_var = None
        
    def compile(self, inputs):
        self.len = inputs
        self.running_mean = np.zeros(inputs)
        self.running_var = np.ones(inputs)
        self.gamma = np.ones(inputs)
        self.beta = np.zeros(inputs)
        self.compiled = True
    
    def forward(self, x):
        self.x = x
        if self.mode == "train":
            self.mean = np.mean(x)
            self.var = np.var(x) + self.epsilon
            self.x_norm = (x - self.mean) / np.sqrt(self.var)
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * self.mean
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * self.var
        else:
            self.x_norm = (x - self.running_mean) / np.sqrt(self.running_var + self.epsilon)
        self.o = self.gamma * self.x_norm + self.beta
        return self.o
    
    def backward(self, dl):
        dx_norm = dl * self.gamma
        return dx_norm / np.sqrt(self.var)
    
    def __repr__(self):
        return f"BatchNorm layer"


class PatchLinear(Layer):
    def __init__(self, outputs, wpn, decay):
        self.len = outputs
        self.wpn = wpn
        self.decay = decay

    def compile(self, _=None):
        if _ is not None:
            raise Exception("This layer must be the first layer")
            
        self.weights = np.random.randn(self.len, self.wpn) * (1 / np.sqrt(self.wpn)) # N x Inputs
        self.bias = np.zeros(self.len)  # N x 1
        self.compiled = True

    def forward(self, x):
        self.x = x
        self.o = []
        for patch, weights in zip(x, self.weights):
            self.o.append(patch.T @ weights)  # dot product, returns float
        return np.array(self.o) + self.bias

    def backward(self, dl):
        self.dW = np.zeros_like(self.weights)
        self.db = np.zeros_like(self.bias)
        dx = np.zeros_like(self.x) 

        for i, (patch, w) in enumerate(zip(self.x, self.weights)):
            self.dW[i] = dl[i] * patch
            self.db[i] = dl[i]
            dx[i] = dl[i] * w
        return dx

    def loss(self, lr):
        self.bias -= lr * self.db
        self.weights -= lr * (self.dW + self.decay * self.weights)

    def __repr__(self):
        return f"Patch Linear layer, with {self.wpn} weights per neuron and {self.len} outputs"


class Dense(Layer):
    def __init__(self, outputs, decay):
        self.len = outputs
        self.decay = decay

    def compile(self, inputs):
        if inputs is None and self.inputs is None:
            raise Exception("This layer has an undefined size")
        self.inputs = inputs
        self.weights = np.random.randn(self.len, inputs) / np.sqrt(inputs) # N x Inputs
        self.bias = np.zeros(self.len) # N x 1
        self.compiled = True

    def forward(self, x):
        self.x = x
        self.o = self.weights @ x + self.bias
        return self.o
            
    def backward(self, dl):
        self.dW = np.outer(dl, self.x) # (N) x (Inputs)
        self.db = dl # x 1
        return self.weights.T @ dl
    
    def loss(self, lr):
        self.bias -= lr * self.db
        self.weights -= lr * (self.dW + self.decay * self.weights)
        
    def __repr__(self):
        return f"Dense layer, with {self.inputs} inputs and {self.len} outputs"

class Softmax(Layer):
    def forward(self, x):
        self.x = x
        x_shift = x - np.max(x)
        exp = np.exp(x_shift)
        self.o = exp / np.sum(exp)
        return self.o

    def backward(self, dl):
        return self.o * (dl - np.dot(dl, self.o))

    def __repr__(self):
        return "Softmax layer"

class Dropout(Layer):
    def __init__(self, p=0.5):
        self.p = p
        self.mask = None

    def forward(self, x):
        if self.mode == "test":
            return x
        self.mask = np.random.binomial(n=1, p=1-self.p, size=x.shape)
        self.mask = self.mask * (1 / (1 - self.p))
        self.o = x * self.mask
        return self.o

    def backward(self, dl):
        return dl * self.mask

    def __repr__(self):
        return f"Dropout layer, with p={self.p}"

In [None]:
from typing import List
from tqdm import tqdm
import matplotlib.pyplot as plt
from PIL import Image
from datetime import datetime

class Model:
    def __init__(self, decay=0.0):
        self.layers: List[Layer] = []
        self.decay = decay

    def sigmoid(self):
        self.layers.append(Sigmoid())
        return self
    
    def relu(self):
        self.layers.append(ReLU())
        return self

    def leaky_relu(self, c: float):
        self.layers.append(ReLU(c=c))
        return self
    
    def batchnorm(self):
        self.layers.append(BatchNorm())
        return self
    
    def patch_linear(self, outputs, wpn):
        self.layers.append(PatchLinear(outputs, wpn, self.decay))
        return self

    def dense(self, outputs):
        self.layers.append(Dense(outputs, self.decay))
        return self

    def softmax(self):
        self.layers.append(Softmax())
        return self
    
    def dropout(self, p):
        self.layers.append(Dropout(p))
        return self
    
    def compile(self):
        inputs = None
        for layer in self.layers:
            layer.compile(inputs)
            inputs = len(layer)

    def set_mode(self, mode: Literal["train", "test"]):
        for layer in self.layers:
            layer.set_mode(mode)

    def evaluate(self, test_gen, test_count, alpha=0.5):
        self.set_mode("test")
        loss_list = []
        
        for sample in tqdm(test_gen(), total=test_count, desc="Evaluating"):
            inputs = sample["image"]
            target = sample["target"]
            for layer in self.layers:
                inputs = layer.forward(inputs)
            forward_kl = -np.sum(target * np.log(inputs + EPSILON))
            mse_penalty = np.sum((inputs - target) ** 2)
            loss = forward_kl + alpha * mse_penalty
            loss_list.append(loss)
        
        return np.mean(loss_list)

    def infer(self, test_gen, test_count, num_samples=5, alpha=0.5):
        self.set_mode("test")
        random_indices = set(np.random.choice(test_count, size=min(num_samples, test_count), replace=False))
        loss_list = []
        show_samples = []
        
        for index, sample in enumerate(tqdm(test_gen(), total=test_count, desc="Testing on novel images:")):
            inputs = sample["image"]
            target = sample["target"]
            for layer in self.layers:
                inputs = layer.forward(inputs)

            forward_kl = -np.sum(target * np.log(inputs + EPSILON))
            mse_penalty = np.sum((inputs - target) ** 2)
            loss = forward_kl + alpha * mse_penalty
            loss_list.append(loss)

            if index in random_indices:
                show_samples.append({**sample, "output": inputs, "loss": loss})

        avg_loss = np.mean(loss_list)
        print(f"VALIDATION: avg_loss={avg_loss}, min={np.min(loss_list)}, max={np.max(loss_list)}")

        for show_sample in show_samples:
            flat_index = show_sample["output"].argmax()
            lat_index = flat_index // OUT_W
            lon_index = flat_index % OUT_W 
            guess_lat = ((lat_index + 0.5) / OUT_H) * 180 - 90
            guess_lng = ((lon_index + 0.5) / OUT_W) * 360 - 180
            
            actual_lat, actual_lon = show_sample["lat"], show_sample["lon"]
            
            fig, axes = plt.subplots(1, 3, figsize=(15, 4))
            axes[0].imshow(show_sample["target"].reshape(OUT_H, OUT_W), cmap="hot", origin="lower")
            axes[0].set_title(f"TARGET (lat:{actual_lat}, lon:{actual_lon})")
            axes[0].axis("off")
            axes[1].imshow(show_sample["output"].reshape(OUT_H, OUT_W), cmap="hot", origin="lower")
            axes[1].set_title(f"PREDICTION (lat:{guess_lat}, lon:{guess_lng})")
            axes[1].axis("off")
            axes[2].imshow(Image.open(show_sample["path"]))
            axes[2].set_title(f"REAL IMAGE (loss: {show_sample["loss"]})")
            axes[2].axis("off")
            plt.tight_layout()
            plt.show()

        return avg_loss

                
    def train_epoch(self, train_gen, lr=1e-3, decay=0.0, log_interval=2000, show_images=False, alpha=0.5):
        self.set_mode("train")
        self.decay = decay
        
        loss_list = []
        running_loss = 0.0
        count = 0
        
        for sample in train_gen:
            inputs = sample["image"]
            target = sample["target"]

            if len(self.layers) == 0:
                raise Exception("No layers to train")

            if inputs.shape[0] != len(self.layers[0]):
                raise Exception("Data/input-layer shape mismatch")

            # forward pass
            for layer in self.layers:
                if not layer.compiled:
                    raise Exception("Please compile your layers!")
                inputs = layer.forward(inputs)

            if inputs.shape != target.shape:
                raise Exception("Output-layer/target shape mismatch")
                
            forward_kl = -np.sum(target * np.log(inputs + EPSILON))
            mse_penalty = np.sum((inputs - target) ** 2)
            
            loss = forward_kl + alpha * mse_penalty
            loss_list.append(loss)
            running_loss += loss

            if count > 0 and count % log_interval == 0:
                avg_running = running_loss / log_interval
                print(f"[Step {count}]: avg loss: {avg_running}")
                running_loss = 0.0
                
                if show_images:
                    fig, axes = plt.subplots(1, 3, figsize=(12, 4))
                    axes[0].imshow(target.reshape(OUT_H, OUT_W), cmap="hot", origin="lower")
                    axes[0].set_title("Target")
                    axes[0].axis("off")
                    axes[1].imshow(inputs.reshape(OUT_H, OUT_W), cmap="hot", origin="lower")
                    axes[1].set_title("Model Output")
                    axes[1].axis("off")
                    axes[2].imshow(Image.open(sample["path"]))
                    axes[2].set_title("Original Image")
                    axes[2].axis("off")
                    plt.tight_layout()
                    plt.show()
            count += 1
            
            # backwards pass
            dl = -target / (inputs + EPSILON) + alpha * 2 * (inputs - target)
            for index in range(len(self.layers) - 1, -1, -1):
                layer = self.layers[index]
                dl = layer.backward(dl)

            # update loss on weights
            for layer in self.layers:
                layer.loss(lr)

        return loss_list


    def log(self):
        total_params = 0
        for layer in self.layers:
            if hasattr(layer, "weights"):
                params = layer.weights.size + (layer.bias.size if hasattr(layer, "bias") else 0)
                total_params += params
                print(f"{layer} ({params:,} params)")
            else:
                print(layer)
        print(f"Total trainable parameters: {total_params:,}")
    
    def save(self, filepath):
        state = {}
        for i, layer in enumerate(self.layers):
            layer_state = {}
            if hasattr(layer, "weights"):
                layer_state["weights"] = layer.weights
            if hasattr(layer, "bias"):
                layer_state["bias"] = layer.bias
            if hasattr(layer, "running_mean"):
                layer_state["running_mean"] = layer.running_mean
                layer_state["running_var"] = layer.running_var
                layer_state["gamma"] = layer.gamma
                layer_state["beta"] = layer.beta
            if layer_state:
                state[i] = layer_state
        np.savez(filepath, state=state)
    
    def load(self, filepath):
        data = np.load(filepath, allow_pickle=True)
        state = data["state"].item()
        for i, layer in enumerate(self.layers):
            if i in state:
                layer_state = state[i]
                if "weights" in layer_state:
                    layer.weights = layer_state["weights"]
                if "bias" in layer_state:
                    layer.bias = layer_state["bias"]
                if "running_mean" in layer_state:
                    layer.running_mean = layer_state["running_mean"]
                    layer.running_var = layer_state["running_var"]
                    layer.gamma = layer_state["gamma"]
                    layer.beta = layer_state["beta"]

In [4]:
model = (
    Model(decay=1e-5)
    .patch_linear(400, wpn=3072) # 400 patches, 3072 weights per patch
    .batchnorm()
    .leaky_relu(0.01)
    .dropout(0.2)
    .dense(1024)
    .batchnorm()
    .leaky_relu(0.01)
    .dropout(0.2)
    .dense(512)
    .leaky_relu(0.01)
    .dense(OUT_H * OUT_W)  # broadcast back up to heatmap coordinates
    .softmax()
)

model.compile()
model.log()

Patch Linear layer, with 3072 weights per neuron and 400 outputs (1,229,200 params)
BatchNorm layer
ReLU layer, with c=0.01
Dropout layer, with p=0.2
Dense layer, with 400 inputs and 1024 outputs (410,624 params)
BatchNorm layer
ReLU layer, with c=0.01
Dropout layer, with p=0.2
Dense layer, with 1024 inputs and 512 outputs (524,800 params)
ReLU layer, with c=0.01
Dense layer, with 512 inputs and 2048 outputs (1,050,624 params)
Softmax layer
Total trainable parameters: 3,215,248


In [None]:
def create_target(lat, lon, height, width, sigma=1.0):
    array = np.zeros((height, width))
    lat_index = int((lat + 90) / 180 * height)
    lat_index = min(height - 1, lat_index)
    lon_index = int((lon + 180) / 360 * width)
    lon_index = min(width - 1, lon_index)
    for y in range(height):
        for x in range(width):
            # gaussian distribution for heatmap
            dist = (x - lon_index)**2 + (y - lat_index)**2
            array[y, x] = np.exp(-dist / (2 * sigma**2))
    array = array.flatten()
    normalized = array / array.sum()
    normalized = np.clip(normalized, 1e-12, None)
    normalized = normalized / normalized.sum()
    return normalized

def load_samples(images, height, width, sigma=1.0):
    # Count valid samples without loading images
    count = sum(1 for img_path in images if coordinates_from_path(img_path) is not None)
    
    def gen():
        for img_path in images:
            coords = coordinates_from_path(img_path)
            if not coords:
                continue
            lat, lon = coords
            image = Image.open(img_path)
            target = create_target(lat, lon, height, width, sigma)
            yield Sample(
                path=img_path,
                lat=lat,
                lon=lon,
                image=format_frame(np.asarray(image)),
                target=target
            )
    
    return gen, count

# Need data in the right shape
# Incoming: [640, 640, 3] 
# Returns: -> [3 x 640 x 640]
def format_frame(frame, patch_size=32):
    H, W, C  = frame.shape
    num_patches_h = H // patch_size
    num_patches_w = W // patch_size
    # this took SO much experimentation, but you need to do this to ensure the patches are preserved
    frame = frame.reshape(num_patches_h, patch_size, num_patches_w, patch_size, 3)
    frame = frame.transpose(0, 2, 1, 3, 4)
    frame = frame.reshape(-1, patch_size, C)
    frame = frame.transpose(2, 0, 1)
    frame = frame.reshape(num_patches_h * num_patches_w, -1)
    return frame / 255.0  # normalize to 0-1


SIGMA = 2.5
train_gen, train_count = load_samples(train, OUT_H, OUT_W, sigma=SIGMA)
test_gen, test_count = load_samples(test, OUT_H, OUT_W, sigma=SIGMA)

epochs = 12
base_lr = 5e-4
decay = 1e-5

all_train_losses = []
all_val_losses = []
best_val_loss = float("inf")

print(f"Starting training: {epochs} epochs, base_lr={base_lr}, decay={decay}")
print(f"Train samples: {train_count}, Val samples: {test_count}")
print("-" * 60)

for epoch in range(epochs):
    lr = base_lr * (0.8 ** epoch) # exponential decay
    
    print(f"Epoch {epoch + 1}/{epochs}, rate={lr}")
    
    # Train
    loss_list = model.train_epoch(
        tqdm(train_gen(), total=train_count, desc=f"Training"),
        lr=lr, 
        decay=decay,
        log_interval=2000,
        show_images=True
    )
    train_loss = np.mean(loss_list)
    all_train_losses.append(train_loss)
    
    # Validate
    val_loss = model.infer(test_gen, test_count, num_samples=5)
    all_val_losses.append(val_loss)
    
    print(f"Train {train_loss}, val {val_loss}")
    
    # Save best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        model.save(f"models/model_4_{OUT_H}x{OUT_W}_best.npz")
        print(f"NEW BEST MODEL!!!!! (val_loss: {val_loss})")
    
    model.save(f"models/model_4_{OUT_H}x{OUT_W}_e{epoch + 1}_{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.npz")

print("\n" + "=" * 60)
print("Training Complete!")
print(f"Best validation loss: {best_val_loss}")
print("=" * 60)

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].plot(range(1, epochs + 1), all_train_losses, "b-o", label="Train Loss", markersize=6)
axes[0].plot(range(1, epochs + 1), all_val_losses, "r-o", label="Val Loss", markersize=6)
axes[0].set_xlabel("Epoch")
axes[0].set_ylabel("Loss")
axes[0].set_title("LOSS CURVE")
axes[0].legend()
axes[0].grid(True, alpha=0.3)

lrs_used = [base_lr * (0.8 ** e) for e in range(epochs)]
axes[1].plot(range(1, epochs + 1), lrs_used, "g-o", markersize=6)
axes[1].set_xlabel("Epoch")
axes[1].set_ylabel("Learning Rate")
axes[1].set_title("LEARNING RATE CURVE")
axes[1].set_yscale("log")
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig("loss_graph.png", dpi=150)
plt.show()

print("Loading best model for final evaluation...")
model.load(f"models/model_4_{OUT_H}x{OUT_W}_best.npz")
model.infer(test_gen, test_count, num_samples=5)

In [None]:
# train_gen, train_count = load_samples(train, OUT_H, OUT_W, sigma=2)
# model.load("models/model_4_32x64_e1_2025-11-21_23-50-11.npz")
# model.infer(train_gen, train_count)