In [1]:
from sklearn.datasets import load_diabetes
from sklearn.ensemble import RandomForestRegressor
import random
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score


random.seed(0)
np.random.seed(0)

# Load dataset
X, y = load_diabetes(return_X_y=True)
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)
# Train Random Forest
r = RandomForestRegressor()
r.fit(x_train, y_train)
print(r2_score(y_test, r.predict(x_test)))

0.26865181564422547


In [86]:
def safe_add(x, y):
    return np.clip(x + y, -1e6, 1e6)

def safe_sub(x, y):
    return np.clip(x - y, -1e6, 1e6)

def safe_mul(x, y):
    return np.clip(x * y, -1e6, 1e6)

def safe_div(x, y):
    with np.errstate(divide='ignore', invalid='ignore'):
        result = np.true_divide(x, y)
        result[~np.isfinite(result)] = 0
    return np.clip(result, -1e6, 1e6)

FUNCTIONS = [safe_add, safe_sub, safe_mul, safe_div]
FUNCTION_NAMES = ['+', '-', '*', '/']

class Node:
    def __init__(self, function=None, left=None, right=None, value=None):
        self.function = function
        self.left = left
        self.right = right 
        self.value = value

    def evaluate(self, X):
        if self.value is not None:
            return X[:, self.value]
        left_val = self.left.evaluate(X)
        right_val = self.right.evaluate(X)
        try:
            return self.function(left_val, right_val)
        except ZeroDivisionError:
            return np.zeros_like(left_val) 

    def __str__(self):
        if self.value is not None:
            return f"x[{self.value}]"
        return f"({self.left} {FUNCTION_NAMES[FUNCTIONS.index(self.function)]} {self.right})"

def generate_random_tree(depth, num_features):
    if depth == 0 or (depth > 1 and random.random() < 0.3):
        return Node(value=random.randint(0, num_features - 1))
    function = random.choice(FUNCTIONS)
    return Node(function=function,
                left=generate_random_tree(depth-1, num_features),
                right=generate_random_tree(depth-1, num_features))

def fitness(tree, X, y):
    y_pred = tree.evaluate(X)
    r2 = r2_score(y, y_pred)
    return 1 - r2

def crossover(parent1, parent2, num_features):
    if parent1 is None or parent2 is None:
        return parent1 if parent2 is None else parent2
    
    if parent1.value is not None and parent2.value is not None:
        return Node(value=random.choice([parent1.value, parent2.value]))

    new_node = Node(function=random.choice([
        parent1.function if parent1.function else random.choice(FUNCTIONS),
        parent2.function if parent2.function else random.choice(FUNCTIONS)
    ]))

    new_node.left = crossover(parent1.left if parent1.left else Node(value=random.randint(0, num_features - 1)),
                            parent2.left if parent2.left else Node(value=random.randint(0, num_features - 1)), num_features)
    new_node.right = crossover(parent1.right if parent1.right else Node(value=random.randint(0, num_features - 1)),
                            parent2.right if parent2.right else Node(value=random.randint(0, num_features - 1)), num_features)

    return new_node

def mutate(tree: Node, num_features, max_depth):
    if random.random() < 0.4 and tree.function is not None:
        tree.function = random.choice(FUNCTIONS)
    elif random.random() < 0.4 and tree.value is not None:
        tree.value = random.randint(0, num_features - 1)
    else:
        if random.random() < 0.4 and tree.left is not None:
            tree.left = generate_random_tree(max_depth, num_features)
        elif random.random() < 0.4 and tree.right is not None:
            tree.right = generate_random_tree(max_depth, num_features)
        else:
            tree = generate_random_tree(max_depth, num_features)
    return tree

def genetic_programming(X, y, num_generations=10000, population_size=100, max_depth=3):
    population = [generate_random_tree(max_depth, X.shape[1]) for _ in range(population_size)]
    for generation in range(num_generations):
        fitness_scores = np.array([fitness(tree, X, y) for tree in population])
        sorted_indices = np.argsort(fitness_scores)
        sorted_population = [population[i] for i in sorted_indices]

        #print(f"Generacja {generation}, Najlepsze MSE: {min(fitness_scores)}")

        elite_size = 10
        elite_population = sorted_population[:elite_size]
        population = sorted_population[:population_size // 2]
        population = elite_population + population[:population_size - elite_size]
        while len(population) < population_size:
            parent1, parent2 = random.choices(sorted_population[:population_size // 2], k=2)
            child = crossover(parent1, parent2, X.shape[1])
            if random.random() < 0.5:
                child = mutate(child, X.shape[1], max_depth)
            population.append(child)
    return sorted_population[0]

Generowanie cech


In [87]:
best_trees = []
num_features = X.shape[1]
population_size = 1000
num_generations = 200
max_depth = 2

for _ in range(2):
    best_tree = genetic_programming(
        X=x_train,
        y=y_train,
        num_generations=num_generations,
        population_size=population_size,
        max_depth=max_depth
    )
    print(best_tree)
    best_trees.append(best_tree)

new_features_train = np.hstack([tree.evaluate(x_train).reshape(-1, 1) for tree in best_trees])
new_features_test = np.hstack([tree.evaluate(x_test).reshape(-1, 1) for tree in best_trees])

x_train_extended = np.hstack((x_train, new_features_train))
x_test_extended = np.hstack((x_test, new_features_test))

rf = RandomForestRegressor()
rf.fit(x_train_extended, y_train)

r2 = r2_score(y_test, rf.predict(x_test_extended))
print(f"Poprawione R2: {r2}")

(((x[8] + x[2]) / (x[1] * x[1])) - ((x[6] / x[8]) - (x[7] / x[4])))
(((x[7] + x[2]) / (x[1] * x[1])) + ((x[3] / x[3]) + (x[9] / x[8])))
Poprawione R2: 0.32536159472661474


In [6]:
def feature_11(x):
    return (((x[8] + x[2]) / (x[1] * x[1])) - ((x[6] / x[8]) - (x[7] / x[4])))

def feature_12(x):
    return (((x[7] + x[2]) / (x[1] * x[1])) + ((x[3] / x[3]) + (x[9] / x[8])))

In [8]:
new_features = np.array([
    [feature_11(row), feature_12(row)]
    for row in X
])

In [9]:
X_extended = np.hstack((X, new_features))
x_train, x_test, y_train, y_test = train_test_split(X_extended, y, test_size=0.2, random_state=0)
rf = RandomForestRegressor()
rf.fit(x_train, y_train)

r2 = r2_score(y_test, rf.predict(x_test))
print(f"Poprawione R2: {r2}")

Poprawione R2: 0.32536159472661474
