<div>
<img src="https://www.ul.ie/themes/custom/ul/logo.jpg" />
</div>

# **MSc in Artificial Intelligence and Machine Learning**
## CS6271 - Evolutionary Algorithms and Humanoid Robotics 2024
### Kaggle Competition


Module Leader: Conor Ryan

Developer: Allan De Lima

Team Members: Dylan Rodrigues, Tarun Bezawada

In [None]:
!pip install deap

Collecting deap
  Downloading deap-1.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading deap-1.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (135 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/135.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.4/135.4 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: deap
Successfully installed deap-1.4.1


# **Step-1  Import Required Libraries**

In [None]:
import pandas as pd
import numpy as np
import operator
import random
from deap import base, creator, tools, gp, algorithms
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder,LabelEncoder
from multiprocessing import Pool

# **Step-2  Set Random Seed for Reproducibility**

In [None]:
# Put both global random number generators (NumPy's legacy global RNG and
# Python's built-in `random` module) into a fixed starting state.
np.random.seed(42)
random.seed(42)

# **Step-3 Load and Preprocess Dataset**

In [None]:
# Load the training data from a CSV file.
train_data = pd.read_csv('train.csv')

# Separate the dataset into raw (unscaled) features and the 'output' target.
X_raw = train_data.drop(columns=['output']).values
y = train_data['output'].values

# Split into training (80%) and validation (20%) sets BEFORE scaling.
# Stratified splitting keeps the class distribution consistent in both sets.
X_train_raw, X_val_raw, y_train, y_val = train_test_split(
    X_raw, y, test_size=0.2, stratify=y, random_state=42
)

# Fit the scaler on the training rows only, then apply the same transform to
# the validation rows. Fitting on the full dataset would leak the validation
# set's mean/variance into preprocessing and bias the validation accuracy.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_val = scaler.transform(X_val_raw)

# Keep `X` available as the scaled full feature matrix for later cells
# (e.g. they read X.shape[1] to get the number of features).
X = scaler.transform(X_raw)


# **Step-4 Define Custom Operators for Genetic Programming**

In [None]:
def safe_div(x, y):
    """
    Protected division for use as a GP primitive.

    Returns ``x / y``, except when the denominator ``y`` equals zero, in
    which case 0 is returned instead of raising an error.
    """
    if y == 0:
        return 0
    return x / y

def protected_log(x):
    """
    Protected natural logarithm for use as a GP primitive.

    Returns ``np.log(x)`` for strictly positive ``x``; any other input
    (zero, negative, or a value for which ``x > 0`` is false) yields 0.
    """
    if x > 0:
        return np.log(x)
    return 0

def protected_sqrt(x):
    """
    Protected square root for use as a GP primitive.

    Returns ``np.sqrt(x)`` when ``x`` is zero or positive; otherwise
    returns 0 rather than producing an invalid result.
    """
    if x >= 0:
        return np.sqrt(x)
    return 0

def generate_random_constant():
    """
    Draw one value uniformly from the interval [-1, 1].

    Registered with the primitive set as the generator for ephemeral
    constants in the expression trees.
    """
    low, high = -1, 1
    return random.uniform(low, high)

# **Step-5 Setup DEAP for Genetic Programming**

In [None]:
# Single-objective fitness with a positive weight: larger values are better.
creator.create("FitnessMax", base.Fitness, weights=(1.0,))

# An individual is a GP expression tree that carries the fitness type above.
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)

# One tree input per feature column of X.
n_features = X.shape[1]
pset = gp.PrimitiveSet("MAIN", n_features)

# Function set as (callable, arity) pairs. The registration order is kept
# the same as before so the primitive set is built identically.
for primitive, arity in (
    (operator.add, 2),      # Addition
    (operator.sub, 2),      # Subtraction
    (operator.mul, 2),      # Multiplication
    (safe_div, 2),          # Division that returns 0 on a zero denominator
    (operator.neg, 1),      # Negation
    (protected_log, 1),     # Logarithm that returns 0 for non-positive input
    (protected_sqrt, 1),    # Square root that returns 0 for negative input
    (np.sin, 1),            # Sine
    (np.cos, 1),            # Cosine
    (np.tan, 1),            # Tangent
):
    pset.addPrimitive(primitive, arity)

# Ephemeral random constants drawn from [-1, 1].
pset.addEphemeralConstant("rand101", generate_random_constant)

# Give the tree inputs readable names (ARG0 -> Feature_0, ARG1 -> Feature_1, ...).
pset.renameArguments(**{f'ARG{idx}': f'Feature_{idx}' for idx in range(n_features)})


# **Step-6 Define Toolbox for Genetic Operations**

In [None]:
# The toolbox holds the named building blocks used by the evolutionary run.
toolbox = base.Toolbox()

# "expr": random expression generator (ramped half-and-half, depth 1 to 3).
toolbox.register(
    "expr",
    gp.genHalfAndHalf,
    pset=pset,
    min_=1,
    max_=3,
)

# "individual": wrap one generated expression in an Individual tree.
toolbox.register(
    "individual",
    tools.initIterate,
    creator.Individual,
    toolbox.expr,
)

# "population": a plain list of individuals; the size is given at call time.
toolbox.register(
    "population",
    tools.initRepeat,
    list,
    toolbox.individual,
)

# "compile": turn an expression tree into a Python callable over the inputs.
toolbox.register(
    "compile",
    gp.compile,
    pset=pset,
)

# **Step-7 Define Fitness Function Using Cross-Validation**

In [None]:
def eval_individual_cv(individual):
    """
    Evaluate an individual's fitness as its mean accuracy over 5 stratified folds.

    The expression tree is compiled once and then scored on the held-out
    part of each fold of the training data. A sample is predicted as class 1
    when the expression's output is greater than 0, otherwise as class 0.

    Args:
        individual: the GP expression tree to evaluate.

    Returns:
        A one-element tuple ``(mean_accuracy,)``, as DEAP expects fitness
        values to be tuples.
    """
    # Compile the individual's expression tree into a callable function.
    func = toolbox.compile(expr=individual)
    skf = StratifiedKFold(n_splits=5)
    accuracies = []

    # Score each fold on its held-out indices. The previous version scored on
    # `train_idx`, which evaluated every sample four times and never used the
    # held-out split, so the folds were not a real cross-validation.
    for _, val_idx in skf.split(X_train, y_train):
        try:
            # Generate predictions using the individual's function.
            preds = [int(func(*X_train[i]) > 0) for i in val_idx]

            # Calculate accuracy on the held-out fold and store it.
            accuracies.append(accuracy_score(y_train[val_idx], preds))
        except (OverflowError, ZeroDivisionError, TypeError):
            # An expression that fails numerically gets a score of 0 for this fold.
            accuracies.append(0)

    # Return the mean accuracy over all folds (trailing comma makes it a tuple).
    return np.mean(accuracies),

# Fitness evaluation and parent selection (tournaments of size 3).
toolbox.register("evaluate", eval_individual_cv)
toolbox.register("select", tools.selTournament, tournsize=3)

# Variation operators: one-point subtree crossover, and uniform mutation
# that inserts a freshly generated full subtree of depth 1 to 2.
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", gp.genFull, min_=1, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

# Cap tree height at 10 for offspring of both operators to keep expression
# trees from growing without bound.
_height_limit = gp.staticLimit(key=operator.attrgetter("height"), max_value=10)
toolbox.decorate("mate", _height_limit)
toolbox.decorate("mutate", _height_limit)

In [None]:
# Parallelize evaluation.
# Create a multiprocessing worker pool (Pool() is called without arguments, so
# the number of workers is left to multiprocessing's default) and register its
# `map` on the toolbox under the name "map".
# NOTE(review): the pool is only closed later, in the cell that runs the
# evolution; re-running this cell on its own creates an additional pool.
pool = Pool()
toolbox.register("map", pool.map)

# **Step-8 Set Evolutionary Algorithm Parameters**

In [None]:
# Create an initial population of individuals.
population = toolbox.population(n=750)

# Define a Hall of Fame to store the best individuals found during the evolution.
hof = tools.HallOfFame(5)

# Set up statistics to track during the evolution.
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean)
stats.register("std", np.std)
stats.register("min", np.min)
stats.register("max", np.max)

# Run the (mu + lambda) evolutionary algorithm for a set number of generations.
# The try/finally guarantees the worker pool is shut down even if the run is
# interrupted or an evaluation raises, so no worker processes are leaked.
try:
    pop, log = algorithms.eaMuPlusLambda(
        population, toolbox,
        mu=700, lambda_=1000,
        cxpb=0.6, mutpb=0.4,
        ngen=50,
        stats=stats, halloffame=hof, verbose=True
    )
finally:
    # Close the multiprocessing pool once the evolution is over.
    pool.close()
    pool.join()
    # Point the toolbox back at the built-in map so it no longer references
    # a closed pool (calling a closed pool's map raises an error).
    toolbox.register("map", map)

# Compile and evaluate the best individual on the validation set.
# A sample is predicted as class 1 when the expression's output is > 0.
best_ind = hof[0]
best_func = toolbox.compile(expr=best_ind)
val_predictions = [int(best_func(*row) > 0) for row in X_val]
accuracy = accuracy_score(y_val, val_predictions)

# Print the best individual and its accuracy on the validation set.
print("\nBest Individual:", best_ind)
print("Validation Accuracy:", accuracy)


gen	nevals	avg     	std     	min     	max     
0  	750   	0.505116	0.120598	0.316886	0.698488
1  	1000  	0.608797	0.0719696	0.356062	0.698488
2  	1000  	0.64632 	0.0361601	0.382841	0.707166
3  	1000  	0.66012 	0.0230611	0.447062	0.707166
4  	1000  	0.670927	0.0183544	0.54922 	0.707166
5  	1000  	0.677591	0.0219972	0.380361	0.707166
6  	1000  	0.682494	0.0192937	0.380361	0.707166
7  	1000  	0.687226	0.0156343	0.397719	0.712869
8  	1000  	0.68931 	0.0176949	0.386809	0.722539
9  	1000  	0.691597	0.0156887	0.484007	0.722539
10 	1000  	0.694604	0.0121504	0.614679	0.722539
11 	1000  	0.696369	0.0143706	0.540293	0.722539
12 	1000  	0.698058	0.0133184	0.558145	0.722539
13 	1000  	0.701787	0.0117628	0.63873 	0.729234
14 	1000  	0.704116	0.0118206	0.63873 	0.729234
15 	1000  	0.7073  	0.0116384	0.625589	0.729234
16 	1000  	0.70939 	0.0179469	0.381602	0.733201
17 	1000  	0.713915	0.012325 	0.542276	0.737664
18 	1000  	0.715773	0.0122147	0.612448	0.737664
19 	1000  	0.717099	0.0122491	0.643938	0.7

# **To download the output as a CSV**

In [None]:

# Load the test data
test_data = pd.read_csv('test.csv')  # Replace 'test.csv' with the actual path

# Preprocess the test data (same as the training data)
# NOTE(review): this assumes test.csv contains exactly the feature columns the
# scaler was fitted on, in the same order — confirm against the dataset.
X_test = test_data.values
X_test = scaler.transform(X_test)  # Use the same scaler fitted on the training data

# Apply the best individual to the test data.
# A sample is predicted as class 1 when the expression's output is > 0.
best_ind = hof[0]
best_func = toolbox.compile(expr=best_ind)
test_predictions = [int(best_func(*row) > 0) for row in X_test]

# Create a submission DataFrame
submission_df = pd.DataFrame({'index': test_data.index, 'output': test_predictions})

# Save to a CSV file
submission_df.to_csv('submission.csv', index=False)

# Download the file when running in Google Colab. The import is guarded so the
# cell still completes elsewhere, where the CSV is simply left on disk.
try:
    from google.colab import files
except ImportError:
    print("google.colab is not available; submission.csv was saved locally.")
else:
    files.download('submission.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>