In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
from datetime import timedelta

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os

# Print the full path of every file found below the Kaggle input directory
for dirname, _, filenames in os.walk("/kaggle/input"):
    for full_path in (os.path.join(dirname, filename) for filename in filenames):
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"

# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Locations of the training and test CSV files read by the next cell
file_train_data = "data/train.csv"
file_test_data = "data/test.csv"

In [2]:
import csv


def _read_csv_rows(path):
    """Return every data row of the CSV file at ``path`` as a dict keyed by column header."""
    # newline="" hands newline handling to the csv module, which is what the
    # csv documentation asks for; without it, newlines embedded in quoted
    # fields can be misinterpreted.
    with open(path, "r", newline="") as f:
        return list(csv.DictReader(f))


# Raw rows of the training file (all values are still strings at this point)
train_csv = _read_csv_rows(file_train_data)
print(train_csv[:3])

# Raw rows of the test file
test_csv = _read_csv_rows(file_test_data)
print(test_csv[:3])

[{'PassengerId': '1', 'Survived': '0', 'Pclass': '3', 'Name': 'Braund, Mr. Owen Harris', 'Sex': 'male', 'Age': '22', 'SibSp': '1', 'Parch': '0', 'Ticket': 'A/5 21171', 'Fare': '7.25', 'Cabin': '', 'Embarked': 'S'}, {'PassengerId': '2', 'Survived': '1', 'Pclass': '1', 'Name': 'Cumings, Mrs. John Bradley (Florence Briggs Thayer)', 'Sex': 'female', 'Age': '38', 'SibSp': '1', 'Parch': '0', 'Ticket': 'PC 17599', 'Fare': '71.2833', 'Cabin': 'C85', 'Embarked': 'C'}, {'PassengerId': '3', 'Survived': '1', 'Pclass': '3', 'Name': 'Heikkinen, Miss. Laina', 'Sex': 'female', 'Age': '26', 'SibSp': '0', 'Parch': '0', 'Ticket': 'STON/O2. 3101282', 'Fare': '7.925', 'Cabin': '', 'Embarked': 'S'}]
[{'PassengerId': '892', 'Pclass': '3', 'Name': 'Kelly, Mr. James', 'Sex': 'male', 'Age': '34.5', 'SibSp': '0', 'Parch': '0', 'Ticket': '330911', 'Fare': '7.8292', 'Cabin': '', 'Embarked': 'Q'}, {'PassengerId': '893', 'Pclass': '3', 'Name': 'Wilkes, Mrs. James (Ellen Needs)', 'Sex': 'female', 'Age': '47', 'SibSp'

In [3]:
import copy

def parse_data(lines):
    """Convert raw Titanic CSV rows into min-max scaled feature/label records.

    Args:
        lines: Iterable of dict rows as produced by ``csv.DictReader``. Every
            row must provide ``PassengerId``, ``Pclass``, ``Sex``, ``Age``,
            ``SibSp``, ``Parch`` and ``Embarked``. ``Survived`` and ``Fare``
            are optional and default to 0. An empty ``Age`` or ``Fare`` is
            treated as 0.

    Returns:
        A list with one dict per input row::

            {"id": <PassengerId as read>,
             "input": [class, sex, age, sibsp, parch, fare, embarked],
             "output": [survived]}

        All numbers are floats scaled to [0, 1] with the min/max of that
        column *within ``lines``*. A column whose values are all equal maps
        to 0.0. Empty input yields an empty list.

    Raises:
        KeyError: If a required column is missing, or if ``Sex``/``Embarked``
            holds an unknown category (the id and value are printed first).
        ValueError: If a numeric column holds text ``float`` cannot parse.
    """
    # Categorical columns are mapped to small integers before scaling
    norm_sex = {"male": 0, "female": 1}
    norm_embarked = {"S": 0, "C": 1, "Q": 2, "": 3}

    # Every numeric column is converted to float right here. Taking min/max
    # over the raw CSV strings would compare them lexicographically
    # ("10" < "9") and produce wrong scaling ranges.
    records = []
    for l in lines:
        sex = l["Sex"]
        embarked = l["Embarked"]

        # Report unknown categories before the lookups below raise KeyError
        if sex not in norm_sex:
            print(l["PassengerId"], sex)
        if embarked not in norm_embarked:
            print(l["PassengerId"], embarked)

        age = l["Age"]
        fare = l.get("Fare", 0)
        records.append(
            {
                "id": l["PassengerId"],
                "survived": float(l.get("Survived", 0)),
                "class": float(l["Pclass"]),
                "sex": float(norm_sex[sex]),
                "age": float(age) if age != "" else 0.0,
                "sibsp": float(l["SibSp"]),
                "parch": float(l["Parch"]),
                "fare": float(fare) if fare != "" else 0.0,
                "embarked": float(norm_embarked[embarked]),
            }
        )

    if not records:
        return []

    # "id" is an identifier, not a feature: it is neither ranged nor scaled
    numeric_keys = [k for k in records[0] if k != "id"]
    feature_keys = [k for k in numeric_keys if k != "survived"]

    # NOTE(review): the ranges come from the rows passed in, so separate calls
    # for the train and test sets scale each set with its own min/max.
    ranges = {
        k: (min(r[k] for r in records), max(r[k] for r in records))
        for k in numeric_keys
    }

    parsed = []
    for r in records:
        scaled = {}
        for k in numeric_keys:
            low, high = ranges[k]
            span = high - low
            # A constant column has no spread; map it to 0.0 instead of dividing by zero
            scaled[k] = (r[k] - low) / span if span != 0 else 0.0

        parsed.append(
            {
                "id": r["id"],
                "input": [scaled[k] for k in feature_keys],
                "output": [scaled["survived"]],
            }
        )

    return parsed

# Turn the raw training rows into scaled feature/label records and preview the first three
train_data = parse_data(train_csv)
print(train_data[:3])

# Same conversion for the test rows; they carry no "Survived" column, so
# parse_data falls back to 0 for every "output" value
test_data = parse_data(test_csv)
print(test_data[:3])

[{'id': '1', 'input': [1.0, 0.0, 0.275, 0.125, 0.0, 0.014151057562208049, 0.0], 'output': [0.0]}, {'id': '2', 'input': [0.0, 1.0, 0.475, 0.125, 0.0, 0.13913573538264068, 0.3333333333333333], 'output': [1.0]}, {'id': '3', 'input': [1.0, 1.0, 0.325, 0.0, 0.0, 0.015468569817999833, 0.0], 'output': [1.0]}]
[{'id': '892', 'input': [1.0, 0.0, 0.45394736842105265, 0.0, 0.0, 0.015281580671177828, 1.0], 'output': [0.0]}, {'id': '893', 'input': [1.0, 1.0, 0.618421052631579, 0.125, 0.0, 0.013663090060062943, 0.0], 'output': [0.0]}, {'id': '894', 'input': [0.5, 0.0, 0.8157894736842105, 0.0, 0.0, 0.018908740708122825, 1.0], 'output': [0.0]}]


In [None]:
import torch.nn as nn
import torch
import numpy as np
from typing import List

# Determine the best available device
def get_device():
    """Return the preferred torch device: CUDA if available, then MPS, else CPU.

    Returns:
        torch.device: ``cuda``, ``mps`` or ``cpu``.
    """
    if torch.cuda.is_available():
        return torch.device("cuda")

    # torch.backends.mps does not exist on every torch build; look it up
    # defensively so such builds fall through to the CPU instead of raising
    # AttributeError.
    mps_backend = getattr(torch.backends, "mps", None)
    if mps_backend is not None and mps_backend.is_available():
        return torch.device("mps")

    return torch.device("cpu")


# DEVICE = torch.device("cpu")
DEVICE = get_device()


class SimpleNeuralNetwork(nn.Module):
    """Feed-forward network: a stack of Linear layers with ReLU in between.

    The output layer is a plain ``nn.Linear`` without activation. Besides the
    forward pass the class offers ``mutate`` for gradient-free search.

    Args:
        input_size: Number of input features.
        hidden_layers: Width of each hidden layer, in order.
        output_size: Number of output units.
        empty: When True only ``nn.Module`` is initialised; no layers or size
            attributes are created and the instance is not moved to ``DEVICE``.
    """

    def __init__(
        self,
        input_size: int = 16,
        hidden_layers: List[int] = [256],
        output_size: int = 4,
        empty: bool = False,
    ):
        super().__init__()

        if empty:
            return

        self.input_size = input_size
        # Store a copy: the default list is shared between all calls, and a
        # caller's list may change later. Neither should alias this attribute.
        self.hidden_layers = list(hidden_layers)
        self.output_size = output_size

        # Build layers using PyTorch modules
        layers = []
        prev_size = input_size

        # Hidden layers: Linear followed by ReLU
        for hidden_size in self.hidden_layers:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.ReLU())
            prev_size = hidden_size

        # Output layer (no activation)
        layers.append(nn.Linear(prev_size, output_size))

        self.network = nn.Sequential(*layers)

        # Initialize weights using He initialization
        self._initialize_weights()

        # Move to device
        self.to(DEVICE)

    def _initialize_weights(self):
        """Apply He (Kaiming) normal initialisation to every Linear layer; biases are zeroed."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                # The gain must match the activation actually used (ReLU)
                nn.init.kaiming_normal_(module.weight, nonlinearity="relu")
                nn.init.zeros_(module.bias)

    def forward(self, x):
        """Run the network on ``x`` and return the raw output tensor.

        ``x`` may be a numpy array, a torch tensor or a (nested) list of
        numbers; it is converted to a float32 tensor where needed and moved to
        ``DEVICE``. Any other type is passed to the layers unchanged.
        """
        if isinstance(x, np.ndarray):
            x = torch.from_numpy(x).float().to(DEVICE)
        elif isinstance(x, torch.Tensor):
            x = x.to(DEVICE)
        elif isinstance(x, list):
            x = torch.tensor(x, dtype=torch.float32).to(DEVICE)

        return self.network(x)

    def mutate(self, mutation_rate: float = 0.1, mutation_strength: float = 0.5):
        """Randomly perturb the network's parameters in place.

        Each parameter tensor (a weight matrix or a bias vector) is selected
        independently with probability ``mutation_rate``. A selected tensor
        gets zero-mean Gaussian noise with standard deviation
        ``mutation_strength`` added to every element.
        """
        with torch.no_grad():
            for param in self.parameters():
                if torch.rand(1).item() < mutation_rate:
                    mutation = torch.randn_like(param) * mutation_strength
                    param.add_(mutation)

In [5]:
from joblib import Parallel, delayed
import joblib
from typing import List
from typing import Tuple
import numpy as np
import random


class EvolutionaryOptimizer:
    """Gradient-free trainer that evolves a population of SimpleNeuralNetwork.

    Each generation scores every network on ``train_data``, keeps the best
    ``elite_size`` networks and refills the population with mutated copies of
    elite members plus ``new_members`` freshly initialised networks.

    Args:
        population_size: Number of networks kept per generation.
        elite_size: Number of top-scoring networks carried over unchanged.
        new_members: Number of randomly initialised networks added per
            generation (they take slots from the mutated offspring).
        mutation_rate: Forwarded to ``SimpleNeuralNetwork.mutate``.
        mutation_strength: Forwarded to ``SimpleNeuralNetwork.mutate``.
        input_size: Input width of every network.
        hidden_layers: Hidden layer widths of every network.
        output_size: Output width of every network.
        train_data: Records with an ``"input"`` feature list and an
            ``"output"`` list whose first element is the label (1.0 = positive).
    """

    def __init__(
        self,
        population_size: int = 50,
        elite_size: int = 10,
        new_members: int = 0,
        mutation_rate: float = 0.1,
        mutation_strength: float = 0.5,
        input_size: int = 8,
        hidden_layers: List[int] = [32],
        output_size: int = 1,
        train_data: List = [],
    ):
        self.population_size = population_size
        self.elite_size = elite_size
        self.new_members = new_members
        self.mutation_rate = mutation_rate
        self.mutation_strength = mutation_strength
        self.input_size = input_size
        self.output_size = output_size
        # Copy the list arguments so the shared default lists are never aliased
        self.hidden_layers = list(hidden_layers)
        self.train_data = list(train_data)

        # Create initial population
        self.population = [
            SimpleNeuralNetwork(
                input_size=input_size,
                output_size=output_size,
                hidden_layers=self.hidden_layers,
            )
            for _ in range(population_size)
        ]

    def evaluate(
        self,
    ) -> List[Tuple[SimpleNeuralNetwork, float]]:
        """Score every network of the population on the training data.

        Returns:
            One ``(network, score)`` pair per population member, where
            ``score`` is the fraction of training records whose thresholded
            prediction (first output > 0.5) matches the label.

        Raises:
            ValueError: If ``train_data`` is empty.
        """
        if not self.train_data:
            raise ValueError("train_data is empty; nothing to evaluate against")

        # Plain lists captured by the worker closure. Referencing `self` inside
        # the closure would make joblib serialise the whole optimizer, and
        # with it the entire population, for every single task.
        inputs = [sample["input"] for sample in self.train_data]
        targets = [sample["output"][0] == 1.0 for sample in self.train_data]

        def eval_network(network: SimpleNeuralNetwork):
            # Set network in eval mode
            network.eval()
            with torch.no_grad():
                # One batched forward pass over all records instead of one
                # call per record; each row of the result is one prediction.
                outputs = network.forward(inputs)
                predicted = (outputs[:, 0] > 0.5).cpu().tolist()

            hits = sum(1 for guess, truth in zip(predicted, targets) if guess == truth)
            return (network, hits / len(targets))

        results = Parallel(n_jobs=joblib.cpu_count())(
            delayed(eval_network)(net) for net in self.population
        )

        return results

    def select_and_breed(
        self, evaluated: List[Tuple[SimpleNeuralNetwork, float]]
    ) -> None:
        """Replace the population with elite, mutated offspring and new members.

        The new population starts with the elite in descending score order, so
        ``self.population[0]`` is the best network of ``evaluated``.

        Args:
            evaluated: ``(network, score)`` pairs as returned by ``evaluate``.
        """
        # Sort by score descending without reordering the caller's list
        ranked = sorted(evaluated, key=lambda pair: pair[1], reverse=True)
        elite = ranked[: self.elite_size]

        # Keep elite networks
        new_population = [net for net, _ in elite]

        # Reserve slots for the random newcomers so the population stays at
        # population_size instead of growing by new_members every generation.
        free_slots = max(0, self.population_size - len(new_population))
        n_random = min(self.new_members, free_slots)
        breed_until = self.population_size - n_random

        # Create offspring by mutating copies of randomly chosen elite networks
        while len(new_population) < breed_until:
            parent = random.choice(elite)[0]
            child = copy.deepcopy(parent)
            child.mutate(self.mutation_rate, self.mutation_strength)
            new_population.append(child)

        # Add random new members
        for _ in range(n_random):
            new_population.append(
                SimpleNeuralNetwork(
                    hidden_layers=self.hidden_layers,
                    input_size=self.input_size,
                    output_size=self.output_size,
                )
            )

        self.population = new_population

    def run_generation(
        self,
    ) -> Tuple[List[SimpleNeuralNetwork], float]:
        """Evaluate the current population, then breed the next one.

        Returns:
            ``(population, best_score)``: the *new* population and the best
            score achieved by the generation that was just evaluated.
        """
        evaluated = self.evaluate()
        best_precision = max(precision for _, precision in evaluated)

        self.select_and_breed(evaluated)

        return self.population, best_precision

In [6]:
import pickle

def save_network(network: SimpleNeuralNetwork, filename: str):
    """Write the network's ``state_dict`` (parameters only) to ``filename`` via ``torch.save``."""
    weights = network.state_dict()
    torch.save(weights, filename)


def load_network(filename: str, hidden_layers: List[int]) -> SimpleNeuralNetwork:
    """Rebuild a network from a ``state_dict`` file written by ``save_network``.

    Args:
        filename: Path of the saved ``state_dict``.
        hidden_layers: Hidden layer widths the network was created with.

    Returns:
        A SimpleNeuralNetwork on ``DEVICE`` holding the stored parameters.
    """
    state = torch.load(filename, map_location=DEVICE)

    # Read the input/output widths from the checkpoint instead of relying on
    # the class defaults (16 / 4), which do not match networks built with
    # other sizes and make load_state_dict fail with a shape mismatch.
    # nn.Linear stores its weight as (out_features, in_features).
    weights = [tensor for name, tensor in state.items() if name.endswith(".weight")]
    size_kwargs = {}
    if weights:
        size_kwargs = {
            "input_size": weights[0].shape[1],
            "output_size": weights[-1].shape[0],
        }

    network = SimpleNeuralNetwork(hidden_layers=hidden_layers, **size_kwargs)
    network.load_state_dict(state)
    network.to(DEVICE)
    return network


def save_population(population: List[SimpleNeuralNetwork], filename: str):
    """Pickle the complete list of networks into the file ``filename``."""
    with open(filename, "wb") as sink:
        pickle.dump(population, sink)


def load_population(filename: str) -> List[SimpleNeuralNetwork]:
    """Unpickle and return the object stored in ``filename``.

    NOTE(review): ``pickle.load`` can execute arbitrary code contained in the
    file; only use this on population files you created yourself.
    """
    with open(filename, "rb") as source:
        return pickle.load(source)

In [None]:
import time

# Search hyper-parameters
generations = 10
hidden_layers = [16, 16, 16]
mutation_rate = 0.5
mutation_strength = 0.25

# Populations are saved in a folder named after the hidden layer widths
folder = "networks/" + "_".join(map(str, hidden_layers))
os.makedirs(folder, exist_ok=True)
print(f"Saving networks to folder: {folder}")

scores_best = []

# Population of 100 networks; the 20 best survive each generation
optimizer = EvolutionaryOptimizer(
    train_data=train_data,
    input_size=7,
    hidden_layers=hidden_layers,
    output_size=1,
    population_size=100,
    elite_size=20,
    new_members=0,
    mutation_rate=mutation_rate,
    mutation_strength=mutation_strength,
)

def mutation_decay(x, mu0=0.25, k=3.0):
    """Exponential decay schedule: ``mu0 * exp(-k * x)``.

    Args:
        x: Progress value; ``x = 0`` yields ``mu0``.
        mu0: Starting value of the schedule.
        k: Decay constant; larger values decay faster.
    """
    exponent = -k * x
    return mu0 * np.exp(exponent)

# Upper bound on the number of epochs. The loop used to run forever, which
# kept "Restart & Run All" from ever reaching the cells below. Raise this
# value to train longer; interrupting the kernel stops training early and
# keeps the current population.
max_epochs = 5

start_time = time.time()

epoch = 0
try:
    while epoch < max_epochs:
        print(f"Epoch {epoch+1} starting...")
        epoch += 1
        for gen in range(generations):
            # Mutation rate and strength restart at their configured values
            # each epoch and decay exponentially over its generations.
            progress = gen / generations
            optimizer.mutation_strength = mutation_decay(
                progress, k=4, mu0=mutation_strength
            )
            optimizer.mutation_rate = mutation_decay(progress, k=4, mu0=mutation_rate)

            (population, best_precision) = optimizer.run_generation()
            # Keep the score history so it can be inspected after training
            scores_best.append(best_precision)
            print(f"Epoch: {epoch} {gen+1}/{generations} - Best Precision: {best_precision:.4f}")

        elapsed_time = time.time() - start_time
        duration = str(timedelta(seconds=(elapsed_time)))
        print(f"Epoch {epoch} completed after {duration}")
        # One checkpoint of the whole population per finished epoch
        save_population(optimizer.population, f"{folder}/population_epoch_{epoch}.pkl")
except KeyboardInterrupt:
    # Manual stop: end the cell normally so later cells can still use `optimizer`
    print(f"Training interrupted during epoch {epoch}")

Saving networks to folder: networks/16_16_16
Epoch 1 starting...
Epoch: 1 1/10 - Best Precision: 0.7868
Epoch: 1 2/10 - Best Precision: 0.7879
Epoch: 1 3/10 - Best Precision: 0.7969
Epoch: 1 4/10 - Best Precision: 0.7969
Epoch: 1 5/10 - Best Precision: 0.8002
Epoch: 1 6/10 - Best Precision: 0.8025
Epoch: 1 7/10 - Best Precision: 0.8036
Epoch: 1 8/10 - Best Precision: 0.8047
Epoch: 1 9/10 - Best Precision: 0.8058
Epoch: 1 10/10 - Best Precision: 0.8058
Epoch 1 completed after 0:01:00.361474
Epoch 2 starting...


In [None]:
# Generate prediction
# select_and_breed puts the elite first, sorted by score, so index 0 is the
# best network of the last evaluated generation.
network = optimizer.population[0]
network.eval()

predictions = []
# Inference only: no autograd graph is needed
with torch.no_grad():
    for test in test_data:
        score = network.forward(test["input"]).cpu()
        # Same 0.5 threshold on the first output as used during evaluation
        predictions.append([test["id"], 1 if score[0] > 0.5 else 0])

# Save predictions to CSV
with open("submission.csv", "w", newline="") as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=["PassengerId", "Survived"])

    writer.writeheader()
    writer.writerows(
        {"PassengerId": pid, "Survived": survived} for pid, survived in predictions
    )
