In [1]:
import numpy as np
import pandas as pd
import datetime

In [2]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

data = pd.read_csv('City_Types.csv')
columns_to_scale = list(data.select_dtypes(include='number').columns)
data['Date'] = pd.to_datetime(data['Date'])
data['month'] = data['Date'].dt.month
data['day'] = data['Date'].dt.day
data['weekday'] = data['Date'].dt.weekday
data = data.drop(columns=['Date'])
y = data['Type']
x = data.drop(columns=['Type'])
x = pd.get_dummies(x)
scaler = StandardScaler()
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)
x_train[columns_to_scale] = scaler.fit_transform(x_train[columns_to_scale])
x_test[columns_to_scale] = scaler.transform(x_test[columns_to_scale])


In [3]:
def sigmoid(x):
    x = np.clip(x, -500, 500)
    return 1 / (1 + np.exp(-x))

def softmax(x):
    e_x = np.exp(x - np.max(x))
    return e_x / np.sum(e_x)

def cross_entropy(pred):
    return -np.log(pred + 1e-9)

In [4]:
class Neuron:

    def __init__(self, nin: int):
        self.weights = np.array([np.random.normal() for _ in range(nin)], dtype=np.float64)
        self.bias = np.random.normal()

        self.grads = np.zeros(nin, dtype=np.float64)
        self.gradb = np.float32(0.0)
        self.inputs = np.array([], dtype=np.float64)

        self.activ = 0
        self.delta = 0

    def __call__(self, x):
        interm = np.dot(self.weights, x) + self.bias
        self.inputs = np.array(x, dtype=np.float64)
        self.res = interm
        res = sigmoid(interm)
        self.activ = res
        return res

    def __repr__(self):
        # return f'Neuron(Weights:{self.weights}, Bias:{self.bias})'
        return f'Neuron'

    def sigmoid_derivative(self):
        return self.activ * (1 - self.activ)


class Layer:

    def __init__(self, nin: int, nout: int):
        self.neurons = [Neuron(nin) for _ in range(nout)]

    def __call__(self, x):
        res = np.array([n(x) for n in self.neurons])
        return res if len(res) == 2 else res

    def __repr__(self):
        return f'Layer: {[x for x in self.neurons]}'

    def parameters(self):
        return [x for x in self.neurons]

class MLP:

    def __init__(self, nin: int, nouts: list):
        sz = [nin] + nouts
        self.nn = [Layer(sz[i], sz[i+1]) for i in range(len(nouts))]

    def __call__(self, x):
        for layer in self.nn:
            x = layer(x)
        return x

    def __repr__(self):
        return f'NN: {[x for x in self.nn]}'

    def parameters(self):
        return [p for x in self.nn for p in x.parameters()]

    def backprop(self, output: np.ndarray, pred: int):
        # Compute gradient for softmax + cross-entropy
        # The gradient is: predicted_probs - one_hot_true_label
        grad = output.copy()
        grad[pred] -= 1  # This is correct IF output contains the softmax probabilities

        # Backprop through output layer
        last_layer = self.nn[-1]
        for i, neuron in enumerate(last_layer.neurons):
            delta = grad[i]
            neuron.gradb = delta
            neuron.grads = delta * neuron.inputs
            neuron.delta = delta

        # Backprop through hidden layers
        for layer_idx in range(len(self.nn) - 2, -1, -1):
            layer = self.nn[layer_idx]
            layer_n = self.nn[layer_idx + 1]
            for i, neuron in enumerate(layer.neurons):
                downstream = sum(n.weights[i] * n.delta for n in layer_n.neurons)
                neuron.delta = downstream * neuron.sigmoid_derivative()
                neuron.grads = neuron.delta * neuron.inputs
                neuron.gradb = neuron.delta

        # Gradient descent
        step = 0.01  # Changed from -0.01
        for neuron in self.parameters():
            neuron.weights -= step * neuron.grads  # Now we subtract (gradient descent)
            neuron.bias -= step * neuron.gradb

        # Reset gradients
        for neuron in self.parameters():
            neuron.grads = np.zeros_like(neuron.grads)
            neuron.gradb = 0.0

In [16]:
x_train = np.array(x_train)
y_train = np.array(y_train)
x_test = np.array(x_test)
y_test = np.array(y_test)

nin = len(x_train[0])
mlp = MLP(nin, [8, 2])

In [17]:
epochs = 50
losses = []
prediction = ['Industrial', 'Residential']

# Before training, reinitialize with smaller weights
for neuron in mlp.parameters():
    nin = len(neuron.weights)
    limit = np.sqrt(1.0 / nin)
    neuron.weights = np.random.uniform(-limit, limit, nin).astype(np.float64)
    neuron.bias = 0.0

for epoch in range(epochs):
    total_loss = 0
    correct = 0  # Track accuracy

    for x, y in zip(x_train, y_train):
        out = mlp(x)
        out = softmax(out)
        loss = cross_entropy(out[prediction.index(y)])
        total_loss += loss

        # Check if prediction is correct
        if np.argmax(out) == prediction.index(y):
            correct += 1

        mlp.backprop(out, prediction.index(y))

    avg_loss = total_loss / len(x_train)
    accuracy = correct / len(x_train)
    losses.append(avg_loss)

    if epoch % 10 == 0:  # Print every 10 epochs
        print(f"Epoch {epoch} | Loss: {avg_loss:.4f} | Accuracy: {accuracy:.4f}")

Epoch 0 | Loss: 0.3637 | Accuracy: 0.9492
Epoch 10 | Loss: 0.3135 | Accuracy: 0.9998
Epoch 20 | Loss: 0.3136 | Accuracy: 0.9996
Epoch 30 | Loss: 0.3143 | Accuracy: 0.9990
Epoch 40 | Loss: 0.3140 | Accuracy: 0.9993
Epoch 50 | Loss: 0.3269 | Accuracy: 0.9864


KeyboardInterrupt: 

In [None]:
# First, check your dataset for data leakage
print("Dataset info:")
print(f"Training samples: {len(x_train)}")
print(f"Test samples: {len(x_test)}")
print(f"Number of features: {len(x_train[0])}")
print(f"\nClass distribution in training:")
print(pd.Series(y_train).value_counts())

# Check a few predictions
print("\n" + "="*50)
print("SAMPLE PREDICTIONS (first 10):")
print("="*50)

for i in range(min(10, len(x_train))):
    x, y = x_train[i], y_train[i]
    out = mlp(x)
    out = softmax(out)
    pred_label = prediction[np.argmax(out)]
    true_idx = prediction.index(y)

    print(f"Sample {i}:")
    print(f"  True: {y}, Predicted: {pred_label}")
    print(f"  Probabilities: Industrial={out[0]:.4f}, Residential={out[1]:.4f}")
    print(f"  Loss: {cross_entropy(out[true_idx]):.4f}")
    print()

Dataset info:
Training samples: 42163
Test samples: 10541
Number of features: 15

Class distribution in training:
Residential    21131
Industrial     21032
Name: count, dtype: int64

SAMPLE PREDICTIONS (first 10):
Sample 0:
  True: Residential, Predicted: Residential
  Probabilities: Industrial=0.2689, Residential=0.7311
  Loss: 0.3133

Sample 1:
  True: Industrial, Predicted: Industrial
  Probabilities: Industrial=0.7311, Residential=0.2689
  Loss: 0.3133

Sample 2:
  True: Industrial, Predicted: Industrial
  Probabilities: Industrial=0.7311, Residential=0.2689
  Loss: 0.3133

Sample 3:
  True: Residential, Predicted: Residential
  Probabilities: Industrial=0.2689, Residential=0.7311
  Loss: 0.3133

Sample 4:
  True: Residential, Predicted: Residential
  Probabilities: Industrial=0.2689, Residential=0.7311
  Loss: 0.3133

Sample 5:
  True: Industrial, Predicted: Industrial
  Probabilities: Industrial=0.7311, Residential=0.2689
  Loss: 0.3133

Sample 6:
  True: Residential, Predicted: 

In [14]:
# Load the original data and check
data = pd.read_csv('City_Types.csv')
print("\nOriginal data columns:")
print(data.columns.tolist())
print("\nFirst few rows:")
print(data.head())

# Check if there's any column that perfectly predicts the Type
print("\nChecking for perfect correlations with Type...")
for col in data.select_dtypes(include='number').columns:
    if col != 'Type':
        correlation = data.groupby('Type')[col].mean()
        print(f"{col}:")
        print(correlation)
        print()


Original data columns:
['Date', 'City', 'CO', 'NO2', 'SO2', 'O3', 'PM2.5', 'PM10', 'Type']

First few rows:
                        Date    City     CO   NO2   SO2    O3  PM2.5  PM10  \
0  2024-01-01 00:00:00+00:00  Moscow  208.0  15.9  13.2  44.0    8.6   9.4   
1  2024-01-01 01:00:00+00:00  Moscow  207.0  17.4  13.7  44.0    8.6  10.5   
2  2024-01-01 02:00:00+00:00  Moscow  217.0  19.0  15.5  43.0   10.4  12.9   
3  2024-01-01 03:00:00+00:00  Moscow  231.0  21.0  20.7  36.0   12.3  15.3   
4  2024-01-01 04:00:00+00:00  Moscow  263.0  34.5  27.2  27.0   13.6  20.0   

         Type  
0  Industrial  
1  Industrial  
2  Industrial  
3  Industrial  
4  Industrial  

Checking for perfect correlations with Type...
CO:
Type
Industrial     795.402474
Residential    220.658470
Name: CO, dtype: float64

NO2:
Type
Industrial     42.450565
Residential    16.782419
Name: NO2, dtype: float64

SO2:
Type
Industrial     42.139595
Residential     2.634904
Name: SO2, dtype: float64

O3:
Type
Industri

In [8]:
def evaluate_model(mlp, x_test, y_test, prediction_classes):
    """
    Evaluate the model on test data

    Parameters:
    - mlp: trained MLP model
    - x_test: test features
    - y_test: test labels
    - prediction_classes: list of class names (e.g., ['Industrial', 'Residential'])

    Returns:
    - accuracy: test accuracy
    - predictions: list of predicted labels
    """
    correct = 0
    predictions = []
    test_loss = 0

    for x, y in zip(x_test, y_test):
        # Forward pass
        out = mlp(x)
        out = softmax(out)

        # Get prediction
        pred_idx = np.argmax(out)
        pred_label = prediction_classes[pred_idx]
        predictions.append(pred_label)

        # Calculate accuracy
        if pred_label == y:
            correct += 1

        # Calculate loss
        true_idx = prediction_classes.index(y)
        test_loss += cross_entropy(out[true_idx])

    accuracy = correct / len(x_test)
    avg_loss = test_loss / len(x_test)

    print(f"\n{'='*50}")
    print(f"TEST SET RESULTS")
    print(f"{'='*50}")
    print(f"Test Accuracy: {accuracy:.4f} ({correct}/{len(x_test)})")
    print(f"Test Loss: {avg_loss:.4f}")
    print(f"{'='*50}\n")

    # Confusion matrix
    from collections import Counter
    true_counts = Counter(y_test)
    pred_counts = Counter(predictions)

    print("True distribution:")
    for label, count in true_counts.items():
        print(f"  {label}: {count}")

    print("\nPredicted distribution:")
    for label, count in pred_counts.items():
        print(f"  {label}: {count}")

    # Calculate per-class accuracy
    print("\nPer-class accuracy:")
    for class_name in prediction_classes:
        class_correct = sum(1 for true, pred in zip(y_test, predictions)
                           if true == class_name and pred == class_name)
        class_total = sum(1 for true in y_test if true == class_name)
        if class_total > 0:
            class_acc = class_correct / class_total
            print(f"  {class_name}: {class_acc:.4f} ({class_correct}/{class_total})")

    return accuracy, predictions

# Use it like this:
accuracy, predictions = evaluate_model(mlp, x_test, y_test, prediction)


TEST SET RESULTS
Test Accuracy: 1.0000 (10541/10541)
Test Loss: 0.3133

True distribution:
  Residential: 5221
  Industrial: 5320

Predicted distribution:
  Residential: 5221
  Industrial: 5320

Per-class accuracy:
  Industrial: 1.0000 (5320/5320)
  Residential: 1.0000 (5221/5221)
