### Imports

In [1]:
import numpy as np
import random # for weighted probability

In [2]:
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

In [None]:
housing = fetch_california_housing(as_frame=True) # Import data set as a data frame

### Genetic Algorithm

In [None]:
class GA:
    
    def __init__(self, fitness, data, target, max_generations, mutation_chance, population_size, n_features, min_exponent=0, max_exponent=1):
        self.fitness = fitness # fitness takes an encoding, data (X) and target (y) which are dataframes
        self.data = data
        self.target = target
        self.max_generations = max_generations
        self.mutation_chance = mutation_chance
        self.min_exponent = min_exponent
        self.max_exponent = max_exponent
        self.population_size = population_size
        self.n_features = n_features

    def run(self):
        max_weight = -np.inf
        best_individual = None
        population = self.get_initial_population()

        for i in range(self.max_generations):
            weights = self.weighted_by(population)

            # save best individual so far. Since entire population is currently overwritten each generation,
            # we might lose the best individual. This is kind of a hacky fix
            cur_max = np.max(weights)
            if cur_max > max_weight:
                max_weight = cur_max
                best_individual = population[np.argmax(weights)]
            
            population = self.get_next_generation(population, weights)
        
        return best_individual, max_weight

    # returns fitness of each individual in the population
    # NOTE weighting is not normalized (it is done implicitly by random.choices)
    def weighted_by(self, population):

        if self.fitness is None:
            weights = np.ones(self.population_size)
        else:
            weights = np.array([self.fitness(individual, self.data, self.target) for individual in population])

        return weights

    # add/subtract 1 to a couple indices? idk what would be best
    # for now, just mutating one index
    def mutate(self, child):
        idx = np.random.randint(self.n_features)

        if child[idx] == self.min_exponent:
            child[idx] += 1
        elif child[idx] == self.max_exponent:
            child[idx] -= 1
        else:
            child[idx] += np.random.choice([1, -1])

        return child

    # given a population and weights, gives the next generation
    # what generation/population model should we use?
    def get_next_generation(self, population, weights):
        next_population = None

        # replace entire population (for now)
        for i in range(self.population_size):

            # random.choices returns list with one element which is the np array (the selected individual)
            # [0] to just get the np array
            parent1 = random.choices(population, weights)[0] # weighted probability (choices should do the normalization for us?)
            parent2 = random.choices(population, weights)[0]
            child = self.reproduce(parent1, parent2)

            if np.random.random() < self.mutation_chance:
                child = self.mutate(child)
            
            if next_population is None:
                next_population = child.reshape((1, self.n_features))
            else:
                # child has to be 2d to append to 2d array (hence reshape)
                next_population = np.append(next_population, child.reshape((1, self.n_features)), axis=0)

        return next_population

    # parents are 1d array w lengths = number of features
    def reproduce(self, parent1, parent2):
        idx = np.random.randint(self.n_features)
        child = np.concatenate((parent1[:idx], parent2[idx:]))
        return child

    # min and max degree are inclusive
    def get_initial_population(self):
        return np.random.randint(self.min_exponent, self.max_exponent + 1, (self.population_size, self.n_features))


### Fitness Function

In [None]:
# I don't have a great motivation for keeping the fitnees_function separate from the GA class other than it
# makes it easier to test different fitness functions should we choose to experiment

# encoding describes the exponent of each feature in our new term
# ex: if our encoding is [1, -1, 0] and our data is [x, y, z] then our new term is x^1 * y^-1 * z^0 = x/y
def fitness_function(encoding, data, target):

    # create new term
    exp_data = np.power(data, encoding)
    term = np.prod(exp_data, axis=1) # multiply across rows
    
    # have to make a copy since we're doing calling this in parallel from a list comprehension
    # (overwriting it each time would create a race condition I think maybe)
    new_data = data.copy()
    new_data["GA_term"] = term

    # evaluate term
    reg = LinearRegression().fit(new_data, target)
    fitness = reg.score(new_data, target)  # R-squared

    # MSE is another option
    # predictions = reg.predict(new_data)
    # fitness = mean_squared_error(target, predictions)

    return fitness

### Testing

In [None]:
# Get testing and training splits
train_set, test_set = train_test_split(housing.frame, test_size=0.2, random_state=42)

train_X = train_set.drop(columns="MedHouseVal")
train_y = train_set["MedHouseVal"]

test_X = test_set.drop(columns="MedHouseVal")
test_y = test_set["MedHouseVal"]

In [7]:
# can play around with the encoding to get fitness of different terms/individuals
encoding = np.array([0, 0, -1, 1, 0, 0, 0, 0]) # bedrooms / room
# encoding = np.array([0, 0, 0, 0, 0, 0, 0, 0]) # original regression with no new terms

fitness_function(encoding, train_X, train_y)

0.6189544423975755

In [16]:
ga = GA(fitness_function,
        train_X,
        train_y,
        max_generations = 100,
        mutation_chance = 0.05,
        population_size = 20,
        n_features = 8,
        min_exponent=-2,
        max_exponent=2)
ga.run()

(array([ 1,  1, -1,  1,  1, -2,  2, -2]), 0.664987464039654)

### TODO:
 - Experiment with different population/generation models
 - Experiment with different mutation functions
 - Add max degree restriction to force simpler terms?
 - Create pipeline to keep finding new terms until $R^2_{adjusted}$ decreases?
 - Compare to other searches?
 - Test on other tabular datasets?
 - Test on other ML algorithms? (different from linear regression)
 - Add termination case for when improvement stops?

### Compare to dummy (selecting terms randomly)

In [15]:
total_individuals = ga.max_generations * ga.population_size

individuals = np.random.randint(ga.min_exponent, ga.max_exponent + 1, (total_individuals, ga.n_features))
weights = np.array([fitness_function(individual, ga.data, ga.target) for individual in individuals])

idx = np.argmax(weights)
print(individuals[idx], weights[idx])

[ 1  0  0  0  0 -2  1 -2] 0.6686303952477901


yikes random search does just as well, if not better...