In [None]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import PolynomialFeatures
import random

In [None]:
class RecursiveFeatureEngineering:
    def __init__(self, model, num_of_features=10, importance_threshold=0):
        """
        Initializes the RecursiveFeatureEngineering class.
        
        Parameters:
        - model: The machine learning model to be used for evaluating feature importance.
        - num_of_features: The number of features to generate during the recursive process.
        - importance_threshold: The threshold for feature importance; features with lower importance will be pruned.
        """
        self.model = model
        self.num_of_features = num_of_features
        self.importance_threshold = importance_threshold
        
        # transformations that apply to a single feature
        self.single_transformations = [
            self.create_log_feature,           
            self.create_polynomial_2_feature,    
            self.create_polynomial_3_feature,    
            self.create_polynomial_4_feature,  
            self.create_exponential_feature,    
            self.create_square_root_feature,     
            self.create_inverse_feature,        
            self.create_absolute_feature,            
            self.create_root_log_feature         
        ]
        
        # transformations that apply to pairs of features
        self.double_transformations = [
            self.create_multiplication_feature, 
            self.create_division_feature, 
            self.create_addition_feature, 
            self.create_subtraction_feature, 
            self.create_polynomial_feature
        ]


    def create_polynomial_2_feature(self, X, feature):
        """ Creates a degree 2 polynomial feature """
        new_X = X.copy()
        poly = PolynomialFeatures(degree=2, include_bias=False)
        clipped_feature = np.clip(X[feature], -1e6, 1e6)  # Prevent overflow
        poly_features = poly.fit_transform(clipped_feature.values.reshape(-1, 1))
        new_X[f'{feature}^2'] = np.clip(poly_features[:, 1], -1e10, 1e10)  
        return new_X

    def create_polynomial_3_feature(self, X, feature):
        """ Creates a degree 3 polynomial feature  """
        new_X = X.copy()
        poly = PolynomialFeatures(degree=3, include_bias=False)
        clipped_feature = np.clip(X[feature], -1e6, 1e6)  # Prevent overflow
        poly_features = poly.fit_transform(clipped_feature.values.reshape(-1, 1))
        new_X[f'{feature}^3'] = np.clip(poly_features[:, 2], -1e10, 1e10) 
        return new_X

    def create_polynomial_4_feature(self, X, feature):
        """ Creates a degree 4 polynomial feature """
        new_X = X.copy()
        poly = PolynomialFeatures(degree=4, include_bias=False)
        clipped_feature = np.clip(X[feature], -1e6, 1e6)  # Prevent overflow
        poly_features = poly.fit_transform(clipped_feature.values.reshape(-1, 1))
        new_X[f'{feature}^4'] = np.clip(poly_features[:, 3], -1e10, 1e10) 
        return new_X


    def create_log_feature(self, X, feature):
        """ Creates a logarithmic feature """
        new_X = X.copy()
        valid_mask = (X[feature] > 0) & X[feature].notna() # ensuring only positive values are transformed.
        new_X.loc[valid_mask, f'log_{feature}'] = np.log1p(X.loc[valid_mask, feature]) 
        new_X.loc[~valid_mask, f'log_{feature}'] = 0  
        return new_X

    def create_exponential_feature(self, X, feature):
        """ Creates an exponential feature """
        new_X = X.copy()
        max_val = np.log(1e10)  # Prevents overflow
        clipped_feature = np.clip(X[feature], -max_val, max_val)
        new_X[f'exp_{feature}'] = np.exp(clipped_feature)
        return new_X

    def create_inverse_feature(self, X, feature):
        """ Creates an inverse feature """
        new_X = X.copy()
        safe_values = np.where(np.abs(X[feature]) < 1e-5, 1e-5, X[feature])  # Replace near zero values
        new_X[f'inverse_{feature}'] = 1 / safe_values
        return new_X

    def create_absolute_feature(self, X, feature):
        """ Creates an absolute value feature. """
        new_X = X.copy()
        new_X[f'abs_{feature}'] = np.abs(X[feature])
        return new_X

    def create_square_root_feature(self, X, feature):
        """ Creates a square root feature """
        new_X = X.copy()
        new_X[f'sqrt_{feature}'] = np.sqrt(np.maximum(X[feature], 0))
        return new_X

    def create_root_log_feature(self, X, feature):
        """ Creates a root log feature """
        new_X = X.copy()
        valid_mask = (X[feature] > 0) & X[feature].notna()
        new_X.loc[valid_mask, f'root_log_{feature}'] = np.sqrt(np.log1p(X.loc[valid_mask, feature]))
        new_X.loc[~valid_mask, f'root_log_{feature}'] = 0  # Assign 0 to invalid values
        return new_X

    def create_multiplication_feature(self, X, feature1, feature2):
        """ Creates a multiplication feature """
        new_X = X.copy()
        new_X[f"{feature1}_times_{feature2}"] = np.clip(X[feature1] * X[feature2], -1e10, 1e10)
        return new_X

    def create_division_feature(self, X, feature1, feature2):
        """ Creates a division feature """
        new_X = X.copy()
        safe_values = np.where(np.abs(X[feature2]) < 1e-5, 1e-5, X[feature2])  # Prevent division by zero
        new_X[f"{feature1}_divided_by_{feature2}"] = X[feature1] / safe_values
        return new_X

    def create_addition_feature(self, X, feature1, feature2):
        """ Creates an addition feature """
        new_X = X.copy()
        new_X[f"{feature1}_plus_{feature2}"] = np.clip(X[feature1] + X[feature2], -1e10, 1e10)
        return new_X

    def create_subtraction_feature(self, X, feature1, feature2):
        """ Creates a subtraction feature """
        new_X = X.copy()
        new_X[f"{feature1}_minus_{feature2}"] = np.clip(X[feature1] - X[feature2], -1e10, 1e10)
        return new_X

    def create_polynomial_feature(self, X, feature1, feature2):
        """ Creates a polynomial feature of two variables """
        new_X = X.copy()
        poly = PolynomialFeatures(degree=2, include_bias=False)
        clipped_features = np.clip(X[[feature1, feature2]], -1e6, 1e6)
        poly_features = poly.fit_transform(clipped_features)
        new_X[f"poly_{feature1}_and_{feature2}"] = np.clip(poly_features.sum(axis=1), -1e10, 1e10)
        return new_X

    def add_single_feature(self, X, y, feature_importance, score):
        """
        Recursively adds a single feature transformation
        
        Parameters:
        - X: The feature matrix.
        - y: The target variable.
        - feature_importance: The importance of each feature
        - score: The current model score
        
        Returns:
        - X (modified feature matrix), new score (model evaluation score).
        """
        if not self.single_transformations: # Stopping Criterion for the recursion
            return X, score

        random_transform = random.choice(self.single_transformations)  # Randomly select a transformation

        feature_importance = feature_importance.sort_values(ascending=False)
        for feature, _ in feature_importance.items():
            new_X = random_transform(X, feature)  # Apply transformation
            if new_X.columns.equals(X.columns):  # Skip if no new feature is created
                continue
            _, new_score = self.evaluate_features(new_X, y) # evalute the performance of the model on the new set
            if new_score < score:
                return new_X, new_score  # Keep new feature if it improves performance
            else:
                continue
        
        self.single_transformations.remove(random_transform)  # Remove ineffective transformation
        return self.add_single_feature(X, y, feature_importance, score) # continue to the next transformation 

    def add_double_feature(self, X, y, score):
        """
        Recursively adds a double feature transformation
        
        Parameters:
        - X: The feature matrix.
        - y: The target variable.
        - score: The current model score.
        
        Returns:
        - X (modified feature matrix), new score (model evaluation score).
        """
        if not self.double_transformations: # Stopping Criterion for the recursion
            return X, score
        
        random_transform = random.choice(self.double_transformations) # Randomly select a transformation

        # compute a Corralation matrix to order the feature cuples
        Corralation_matrix = X.corr().abs()
        Corralation_matrix = Corralation_matrix.unstack().sort_values(ascending=False)
        Corralation_matrix = Corralation_matrix[Corralation_matrix.index.get_level_values(0) != Corralation_matrix.index.get_level_values(1)] 

        for feature1, feature2 in Corralation_matrix.index:
            if feature1 in X.columns and feature2 in X.columns:
                new_X = random_transform(X, feature1, feature2) # Apply transformation 
                if new_X.columns.equals(X.columns): # Skip if no new feature is created
                    continue
                _, new_score = self.evaluate_features(new_X, y) # evalute the performance of the model on the new set
                if new_score < score:
                    return new_X, new_score # Keep new feature if it improves performance
                else:
                    continue

        self.double_transformations.remove(random_transform) # Remove ineffective transformation
        return self.add_double_feature(X, y,score) # continue to the next transformation 

    def generate_feature_combinations(self, X, y):
        """
        Generates combinations of features for recursive feature engineering.
        
        Parameters:
        - X: The feature matrix.
        - y: The target variable.
        
        Returns:
        - X (modified feature matrix), score (model evaluation score).
        """
        feature_importance, score = self.evaluate_features(X, y) # evalute the model for a base score
        num_features = random.randint(1, 2)  # Decide whether to do a one or two feature transformation

        if num_features == 1:
            return self.add_single_feature(X, y, feature_importance, score)
        else:
            return self.add_double_feature(X, y, score)

    def prune_features(self, X, y):
        """
        Prunes features that are below the importance threshold.
        
        Parameters:
        - X: The feature matrix.
        - y: The target variable.
        
        Returns:
        - X (modified feature matrix), score (model evaluation score).
        """
        feature_importance, score = self.evaluate_features(X, y) # evalute the model for a base score and feature importance table
        feature_importance = feature_importance[feature_importance > self.importance_threshold] # remove the feature with low importamce 
        new_X = X[feature_importance.index]
        _, new_score = self.evaluate_features(new_X, y)
        if(new_score < score):
            return new_X, new_score # if the model performed better, keep the change
        else:
            return X, score

    def evaluate_features(self, X, y):
        """
        Evaluates the model performance based on the feature matrix and target variable.
        
        Parameters:
        - X: The feature matrix.
        - y: The target variable.
        
        Returns:
        - feature_importances: The importance of each feature.
        - score: The model evaluation score (RMSE).
        """
        X = X.select_dtypes(include=['number']).copy()
        X.replace([np.inf, -np.inf], np.nan, inplace=True)  # Handle infinite values
        X.fillna(X.mean(numeric_only=True), inplace=True)  # Fill missing values with column mean
        X = X.astype(np.float32)  # Ensure numerical consistency
        y = y.fillna(y.mean()).astype(np.float32)

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) # split the data
        self.model.fit(X_train, y_train)
        y_pred = self.model.predict(X_test)
        score = np.sqrt(mean_squared_error(y_test, y_pred))  # Compute RMSE

        feature_importances = pd.Series(self.model.feature_importances_, index=X.columns)
        return feature_importances, score

    def fit_transform(self, X, y):
        """
        Performs recursive feature engineering and returns the best feature set.
        
        Parameters:
        - X: The feature matrix.
        - y: The target variable.
        
        Returns:
        - X (final feature matrix), best_score (model evaluation score).
        """
        print("Starting recursive feature engineering...")
        X = X.select_dtypes(include=['number']).copy()
        X.replace([np.inf, -np.inf], np.nan, inplace=True)  # Handle infinite values
        X.fillna(X.mean(numeric_only=True), inplace=True)  # Fill missing values with column mean
        X = X.astype(np.float32)  # Ensure numerical consistency
        y = y.fillna(y.mean()).astype(np.float32)

        iteration = 0
        best_score = float('inf')

        while iteration < self.num_of_features: # until the right amount of features generated or no improvment is found
            print(f'Iteration {iteration + 1}')
            X_expanded, score_expanded = self.generate_feature_combinations(X, y) # generate a new feature transformation
            X_expanded, score_expanded = self.prune_features(X_expanded, y) # remove features with low importance

            if score_expanded < best_score: # if we found a cafiguration that improves the performance, keep it
                print(f"New best score found: {score_expanded}")
                best_score = score_expanded
                X = X_expanded.copy()
            else: # if we didnt found a transformation that will improve the perforamne
                print("No improvement detected. Stopping feature engineering.")
                break

            iteration += 1

        print("Feature engineering completed.")
        return X, best_score


In [8]:
model = RandomForestRegressor(random_state=42)
rfe = RecursiveFeatureEngineering(model)

In [None]:
df = pd.read_csv('../data/ParisHousing.csv')
X = df.drop(columns=['price'])
y = df['price']
_, base_score = rfe.evaluate_features(X, y)
print(f'base score: {base_score}')
X_transformed, score = rfe.fit_transform(X, y)
print(f'{((base_score - score)/base_score)*100}%')

base score: 4002.8788631292314
Starting recursive feature engineering...
Iteration 1
New best score found: 4000.6772297669586
Iteration 2
New best score found: 3990.7347547289887
Iteration 3
New best score found: 3987.5941654799026
Iteration 4
New best score found: 3957.129540605996
Iteration 5
New best score found: 3953.8311149969277
Iteration 6
New best score found: 3944.9534953092807
Iteration 7
New best score found: 3941.478939378808
Iteration 8
New best score found: 3931.8319554487784
Iteration 9
New best score found: 3920.8278337529205
Iteration 10
New best score found: 3917.2264996448544
Feature engineering completed.
2.139769061545337%


In [None]:
df = pd.read_csv('../data/flood.csv')
X = df.drop(columns=['FloodProbability'])
y = df['FloodProbability']
_, base_score = rfe.evaluate_features(X, y)
print(f'base score: {base_score}')
X_transformed, score = rfe.fit_transform(X, y)
print(f'{((base_score - score)/base_score)*100}%')

base score: 0.025892922451380078
Starting recursive feature engineering...
Iteration 1
New best score found: 0.02587251599830059
Iteration 2
New best score found: 0.02561289463888072
Iteration 3
New best score found: 0.025131045871464204
Iteration 4
New best score found: 0.025114505317206343
Iteration 5
New best score found: 0.025107274993190876
Iteration 6
New best score found: 0.025098672269700036
Iteration 7
New best score found: 0.02509601367360587
Iteration 8
New best score found: 0.025050208779318257
Iteration 9
New best score found: 0.025047243235558105
Iteration 10
New best score found: 0.025039544469216076
Feature engineering completed.
3.295796307915475%


In [None]:
df = pd.read_csv('../data/weatherHistory.csv')
X = df.drop(columns=['Apparent Temperature (C)'])
y = df['Apparent Temperature (C)']
_, base_score = rfe.evaluate_features(X, y)
print(f'base score: {base_score}')
X_transformed, score = rfe.fit_transform(X, y)
print(f'{((base_score - score)/base_score)*100}%')

base score: 0.05492278234859448
Starting recursive feature engineering...
Iteration 1
New best score found: 0.05476747409223273
Iteration 2
New best score found: 0.05447228539661956
Iteration 3
New best score found: 0.05443666743477979
Iteration 4
New best score found: 0.05429413713100989
Iteration 5
New best score found: 0.05427390411843533
Iteration 6
New best score found: 0.05423340327022739
Iteration 7
New best score found: 0.05415875100248315
Iteration 8
New best score found: 0.05393790168982996
Iteration 9
New best score found: 0.053822974722991995
Iteration 10
New best score found: 0.05376990244621762
Feature engineering completed.
2.099092313021477%


In [None]:
df = pd.read_csv('../data/house_price_regression_dataset.csv')
X = df.drop(columns=['House_Price'])
y = df['House_Price']
_, base_score = rfe.evaluate_features(X, y)
print(f'base score: {base_score}')
X_transformed, score = rfe.fit_transform(X, y)
print(f'{((base_score - score)/base_score)*100}%')

base score: 19827.827277120257
Starting recursive feature engineering...
Iteration 1
New best score found: 19790.713653412746
Iteration 2
New best score found: 19774.610265146643
Iteration 3
New best score found: 19751.151642948502
Iteration 4
New best score found: 19730.430981122565
Iteration 5
New best score found: 19648.214240693567
Iteration 6
New best score found: 19549.95451033402
Iteration 7
New best score found: 18596.268127014726
Iteration 8
New best score found: 18545.033981619705
Iteration 9
New best score found: 18538.952691351114
Iteration 10
New best score found: 18510.230689902644
Feature engineering completed.
6.645188949865501%


In [None]:
df = pd.read_csv('../data/test_energy_data.csv')
X = df.drop(columns=['Energy Consumption'])
y = df['Energy Consumption']
_, base_score = rfe.evaluate_features(X, y)
print(f'base score: {base_score}')
X_transformed, score = rfe.fit_transform(X, y)
print(f'{((base_score - score)/base_score)*100}%')

base score: 514.0469090426051
Starting recursive feature engineering...
Iteration 1
New best score found: 512.2237188407865
Iteration 2
New best score found: 510.4883649130336
Iteration 3
New best score found: 507.49239968102927
Iteration 4
New best score found: 496.8286689097764
Iteration 5
New best score found: 496.0888666736249
Iteration 6
New best score found: 495.3012683073736
Iteration 7
New best score found: 491.19909673059993
Iteration 8
New best score found: 487.2643523754238
Iteration 9
New best score found: 481.542050062204
Iteration 10
New best score found: 481.23128263062296
Feature engineering completed.
6.383780513941834%
