# Important libraries

In [1]:
import pandas as pd
import polars as pl
import numpy as np
from scipy.stats import skew, kurtosis, entropy
import math
from sklearn.feature_selection import mutual_info_classif
from itertools import combinations
from itertools import product
import random
import gc
from collections import Counter
from imblearn.under_sampling import CondensedNearestNeighbour
from imblearn.over_sampling import SMOTE

# Feature selection with MI

In [None]:
def fast_mutual_information(
    df: pl.DataFrame,
    col1: str,
    col2: str,
    num_bins: int = 10
):
    """Score one feature column against a label column with mutual information.

    Rows whose ``col1`` value is NaN or infinite are removed first. If ``col1``
    then has more than ``num_bins`` distinct values it is discretised into
    quantile bins; if quantile binning raises, equal-width bins between the
    column minimum and maximum are used instead. Columns with at most
    ``num_bins`` distinct values are scored as they are.

    Args:
        df: Frame containing both columns.
        col1: Name of the feature column (binned when it has many values).
        col2: Name of the label column.
        num_bins: Number of bins used for discretisation.

    Returns:
        The value returned by ``sklearn.metrics.mutual_info_score`` for the
        (possibly binned) feature and the label.
    """
    # Imported here because the file-level imports only provide
    # ``mutual_info_classif``; without this the final call raises NameError.
    from sklearn.metrics import mutual_info_score

    # Keep only the two columns involved and drop non-finite feature values.
    df_proc = df.select([col1, col2])
    df_proc = df_proc.filter((~pl.col(col1).is_nan()) & (~pl.col(col1).is_infinite()))
    n_unique = df_proc[col1].n_unique()
    try:
        if n_unique > num_bins:
            # Quantile binning; to_physical() turns the categories into integer labels.
            df_proc = df_proc.with_columns(
                pl.col(col1)
                .qcut(num_bins, allow_duplicates=True)
                .to_physical()
                .alias(col1)
            )
    except Exception:
        # Deliberate best-effort fallback: equal-width bins when qcut fails.
        col_min = df_proc[col1].min()
        col_max = df_proc[col1].max()
        edges = np.linspace(col_min, col_max, num_bins + 1)
        df_proc = df_proc.with_columns(
            pl.col(col1)
            .cut(edges)
            .to_physical()
            .alias(col1)
        )

    # The two label series are handed to sklearn directly.
    x = df_proc.get_column(col1)
    y = df_proc.get_column(col2)
    return mutual_info_score(x, y)

def create_mi_table(df_input,label,batch_size=1000):
    """Build a table of mutual-information scores for every non-label column.

    Args:
        df_input: Frame holding the feature columns and the label column.
        label: Name of the label column (excluded from the scored features).
        batch_size: Number of features scored per pass of the outer loop.

    Returns:
        A frame with columns ``Feature`` and ``Mutual_Info_Score``, sorted by
        score in descending order.
    """
    total_features = [c for c in df_input.columns if c != label]
    results = []
    for start in range(0, len(total_features), batch_size):
        batch = total_features[start:start + batch_size]
        results.extend(
            (feat, fast_mutual_information(df_input, feat, label, 10)) for feat in batch
        )

    # The records are (name, score) rows. State the orientation explicitly so
    # polars never has to guess (the guess is ambiguous when the number of
    # rows equals the number of columns), and type the schema so an empty
    # result still has concrete dtypes.
    df_mi = pl.DataFrame(
        results,
        schema={"Feature": str, "Mutual_Info_Score": float},
        orient="row",
    )
    return df_mi.sort("Mutual_Info_Score", descending=True)


# Feature generation function in First iteration

In [2]:
from itertools import combinations, islice

def generate_mono_feature_vectorized(df_pl, list_feature, index_feature):
    """Add five unary transforms of every listed feature in one lazy pass.

    For each feature the columns ``p_<i>`` (square), ``sq_<i>`` (square root),
    ``z_<i>`` (z-score), ``lg_<i>`` (log base 2) and ``s_<i>`` (sigmoid) are
    created; the five columns of one feature share the same index ``<i>``,
    which starts at ``index_feature`` and grows by one per feature.

    Args:
        df_pl: Input frame.
        list_feature: Names of the source columns to transform.
        index_feature: First index used for the generated column names.

    Returns:
        Tuple ``(df_result, df_meta, index)``: the frame with the new columns,
        a metadata frame (``feature``, ``feature_L``, ``operation``) and the
        next unused index.
    """
    # Mean/std are needed as plain scalars for the z-score expression. With no
    # features there is nothing to select (and no row to read), so skip it.
    stat_exprs = (
        [pl.col(f).mean().alias(f"{f}_mean") for f in list_feature] +
        [pl.col(f).std().alias(f"{f}_std") for f in list_feature]
    )
    stats = df_pl.select(stat_exprs).to_dicts()[0] if stat_exprs else {}

    meta_records = []
    new_cols = []
    index = index_feature
    for feature in list_feature:
        mean = stats[f"{feature}_mean"]
        std = stats[f"{feature}_std"]
        source = pl.col(feature)
        col_names_ops = [
            (f"p_{index}",  source ** 2,                "Power"),
            (f"sq_{index}", source.sqrt(),              "Square Root"),
            (f"z_{index}",  (source - mean) / std,      "Z-Score"),
            (f"lg_{index}", source.log(base=2),         "Log base 2"),
            (f"s_{index}",  1 / (1 + (-1 * source).exp()), "Sigmoid"),
        ]
        for name, expr, op in col_names_ops:
            new_cols.append(expr.alias(name))
            meta_records.append((name, feature, op))
        index += 1

    # All generated columns are attached in a single with_columns call.
    df_result = df_pl.lazy().with_columns(new_cols).collect()

    # Records are rows; say so explicitly rather than relying on polars'
    # orientation inference, and type the schema so an empty table is usable.
    df_meta = pl.DataFrame(
        meta_records,
        schema={"feature": str, "feature_L": str, "operation": str},
        orient="row",
    )
    return df_result, df_meta, index

def generate_binary_feature_fast_safe(df_pl,list_feature,index_feature,batch_size=200):
    """Add arithmetic combinations of every unordered pair of listed features.

    Each pair ``(a, b)`` yields six columns sharing one index ``<i>``:
    ``add_<i>`` (a+b), ``mul_<i>`` (a*b), ``subt1_<i>`` (a-b), ``subt2_<i>``
    (b-a), ``div1_<i>`` (a/b) and ``div2_<i>`` (b/a). The index starts at
    ``index_feature`` and grows by one per pair. Pairs are processed in
    batches of ``batch_size`` so each ``with_columns`` call stays bounded.

    Args:
        df_pl: Input frame.
        list_feature: Names of the columns to combine.
        index_feature: First index used for the generated column names.
        batch_size: Number of pairs attached per ``with_columns`` call.

    Returns:
        Tuple ``(df_result, df_meta, index)``: the frame with the new columns,
        a metadata frame (``feature``, ``feature_L``, ``feature_R``,
        ``operation``) and the next unused index.
    """
    df_lazy = df_pl.lazy()
    meta_records = []
    index = index_feature

    def batched_combinations(iterable, r, size):
        # Yield successive lists of at most `size` r-combinations.
        it = combinations(iterable, r)
        while True:
            batch = list(islice(it, size))
            if not batch:
                break
            yield batch

    for comb_batch in batched_combinations(list_feature, 2, batch_size):
        new_cols = []
        for left, right in comb_batch:
            lhs, rhs = pl.col(left), pl.col(right)
            ops = [
                (f"add_{index}",   lhs + rhs, "Add",      left,  right),
                (f"mul_{index}",   lhs * rhs, "Multiply", left,  right),
                (f"subt1_{index}", lhs - rhs, "Subtract", left,  right),
                (f"subt2_{index}", rhs - lhs, "Subtract", right, left),
                (f"div1_{index}",  lhs / rhs, "Divide",   left,  right),
                (f"div2_{index}",  rhs / lhs, "Divide",   right, left),
            ]
            for name, expr, op, operand_l, operand_r in ops:
                new_cols.append(expr.alias(name))
                meta_records.append((name, operand_l, operand_r, op))
            index += 1
        df_lazy = df_lazy.with_columns(new_cols)

    df_result = df_lazy.collect()
    # Records are rows; make the orientation explicit instead of inferred and
    # type the schema so the table is well-formed even when no pair exists.
    df_meta = pl.DataFrame(
        meta_records,
        schema={"feature": str, "feature_L": str, "feature_R": str, "operation": str},
        orient="row",
    )
    return df_result, df_meta, index

# Dropping erroneous features after feature generation

In [3]:
def drop_missing_data(df_pl_data,target,threshold_accept):
    """Drop feature columns dominated by NaN, infinite or zero values.

    A column survives only if its count of NaN values, then of infinite
    values, then of zeros is each strictly below
    ``round(height * threshold_accept)``. The target column is never tested
    and is appended last in the returned frame.

    Args:
        df_pl_data: Frame holding the features and the target column.
        target: Name of the target column.
        threshold_accept: Fraction of the row count used as the rejection limit.

    Returns:
        ``df_pl_data`` restricted to the surviving features plus ``target``.
    """
    feature_names = df_pl_data.columns
    feature_names.remove(target)
    df_kept = df_pl_data.select(feature_names)
    limit = round(df_kept.height * threshold_accept)

    # The three checks run in sequence, each on the survivors of the previous one.
    bad_value_checks = (
        lambda cols: cols.is_nan(),
        lambda cols: cols.is_infinite(),
        lambda cols: cols.eq(0),
    )
    for flag_bad in bad_value_checks:
        bad_counts = df_kept.select(flag_bad(pl.all()).sum())
        survivors = [
            name for name in bad_counts.columns
            if bad_counts.get_column(name)[0] < limit
        ]
        df_kept = df_kept.select(survivors)

    return df_pl_data.select(df_kept.columns + [target])

# Feature selection after feature generation (New Extended)

In [5]:
import polars as pl
import numpy as np
from scipy.stats import skew, kurtosis, entropy

def generate_meta_feature_fast(df_pl,list_feature):
    """Compute per-feature summary statistics (meta-features).

    Infinite values in the listed columns are replaced by 0 before any
    statistic is computed. Mean, std and the three ratios are computed by
    polars on the cleaned column; skewness, kurtosis and histogram entropy are
    computed by SciPy/NumPy on the cleaned column with NaN values removed.

    Args:
        df_pl: Frame holding the feature columns.
        list_feature: Names of the columns to summarise.

    Returns:
        A frame with one row per feature and the columns ``feature``, ``mean``,
        ``std``, ``skewness``, ``kurtosis``, ``entropy``, ``missing_ratio``
        (null share), ``zero_ratio`` and ``cardinality_ratio`` (distinct
        values / rows).
    """
    schema = {
        "feature": str,
        "mean": float,
        "std": float,
        "skewness": float,
        "kurtosis": float,
        "entropy": float,
        "missing_ratio": float,
        "zero_ratio": float,
        "cardinality_ratio": float,
    }
    # Nothing to summarise: return an empty, correctly typed table instead of
    # failing on `.row(0)` of an empty selection.
    if not list_feature:
        return pl.DataFrame(schema=schema)

    # 1. Replace inf with 0, but only in the columns that are summarised, so
    #    unrelated columns (label, non-numeric) are never touched.
    df_clean = df_pl.select([
        pl.when(pl.col(c).is_infinite()).then(0).otherwise(pl.col(c)).alias(c)
        for c in list_feature
    ])

    # 2. Column-wise statistics computed by polars.
    means = df_clean.select([pl.col(c).mean().alias(c) for c in list_feature]).row(0)
    stds = df_clean.select([pl.col(c).std().alias(c) for c in list_feature]).row(0)
    missing_ratios = df_clean.select([
        (pl.col(c).null_count() / pl.count()).alias(c) for c in list_feature
    ]).row(0)
    zero_ratios = df_clean.select([
        ((pl.col(c) == 0).sum() / pl.count()).alias(c) for c in list_feature
    ]).row(0)
    cardinality_ratios = df_clean.select([
        (pl.col(c).n_unique() / pl.count()).alias(c) for c in list_feature
    ]).row(0)

    # 3. Shape statistics need NumPy/SciPy and cannot handle NaN.
    meta_records = []
    for i, feat in enumerate(list_feature):
        col_np = df_clean[feat].to_numpy()
        col_np_clean = col_np[~np.isnan(col_np)]
        has_values = len(col_np_clean) > 0

        meta_records.append((
            feat,
            means[i],
            stds[i],
            skew(col_np_clean) if has_values else np.nan,
            kurtosis(col_np_clean) if has_values else np.nan,
            entropy(np.histogram(col_np_clean, bins="auto")[0], base=2) if has_values else np.nan,
            missing_ratios[i],
            zero_ratios[i],
            cardinality_ratios[i],
        ))

    # 4. Records are rows; an explicit orientation avoids a wrong guess when
    #    the number of features happens to equal the number of columns.
    return pl.DataFrame(meta_records, schema=schema, orient="row")


# Meta-learning of operator

In [6]:
########## Build meta-feature ##########
def entropy_histogram_polars(column: pl.Series, bins: int = 10) -> float:
    """Return the base-2 entropy of a histogram of the non-null values.

    The values are bucketed into ``bins`` histogram bins, the bin counts are
    normalised to probabilities and empty bins are discarded before the
    entropy is taken. ``None`` is returned when the column has no non-null
    value.
    """
    samples = column.drop_nulls().to_numpy()
    if len(samples) == 0:
        # Nothing to bin: avoids a division by zero below.
        return None
    bin_counts = np.histogram(samples, bins=bins, density=False)[0]
    bin_probs = bin_counts / bin_counts.sum()
    # Empty bins are removed so no log(0) term is evaluated.
    return entropy(bin_probs[bin_probs > 0], base=2)

def summarize_polars_feature(column: pl.Series):
    """Summarise one column as a list of eight meta-feature values.

    The list order is: mean, std (ddof=1), skewness, kurtosis, histogram
    entropy, missing ratio (nulls / length), zero ratio and cardinality ratio
    (distinct non-null values / length). The statistics are computed on the
    non-null values.

    The same list layout is returned when the column has no non-null value
    (every statistic is ``None`` except the missing ratio), so callers can
    always prepend the feature name and use the result as a table row.

    Args:
        column: Series to summarise.

    Returns:
        A list of eight values in the order given above.
    """
    total = column.len()
    # A zero-length column has no defined ratios; avoid dividing by zero.
    missing_ratio = column.null_count() / total if total else None

    values = column.drop_nulls()
    if values.is_empty():
        return [None, None, None, None, None, missing_ratio, None, None]

    values_np = values.to_numpy()
    is_numeric = np.issubdtype(values_np.dtype, np.number)
    return [
        values_np.mean(),
        values_np.std(ddof=1),
        skew(values_np, bias=False),
        kurtosis(values_np, bias=False),
        entropy_histogram_polars(column),
        missing_ratio,
        (values_np == 0).mean() if is_numeric else None,
        len(np.unique(values_np)) / total,
    ]

def generate_meta_feature(df_pl,list_feature):
    """Build the meta-feature table, one row per listed feature.

    Infinite values are replaced by 0 first, then each listed column is
    summarised with ``summarize_polars_feature``.

    Args:
        df_pl: Frame holding the feature columns.
        list_feature: Names of the columns to summarise.

    Returns:
        A frame with the columns ``feature``, ``mean``, ``std``, ``skewness``,
        ``kurtosis``, ``entropy``, ``missing_ratio``, ``zero_ratio`` and
        ``cardinality_ratio``.
    """
    schema = {
        'feature': str, 'mean': float, 'std': float,
        'skewness': float, 'kurtosis': float, 'entropy': float,
        'missing_ratio': float, 'zero_ratio': float, 'cardinality_ratio': float,
    }
    # Only floating-point columns can hold +/-inf, so only those are rewritten.
    # A vectorised expression replaces the per-value Python lambda, which went
    # through the deprecated ``Expr.apply`` and let polars guess the output
    # dtype from the lambda's first result.
    float_cols = [
        (name, dtype) for name, dtype in df_pl.schema.items()
        if dtype in (pl.Float32, pl.Float64)
    ]
    df_pl = df_pl.with_columns([
        pl.when(pl.col(name).is_infinite())
        .then(pl.lit(0.0, dtype=dtype))
        .otherwise(pl.col(name))
        .alias(name)
        for name, dtype in float_cols
    ])

    # Collect all rows first and build the table once instead of
    # concatenating a one-row frame per feature.
    rows = [[feat] + summarize_polars_feature(df_pl[feat]) for feat in list_feature]
    return pl.DataFrame(rows, schema=schema, orient="row")

def calculate_centroid_meta_feature(df_meta_pair):
    """Average the eight similarity meta-features per binary operator.

    One row is produced for each of "Add", "Subtract", "Multiply" and
    "Divide" (in that order), holding the column means of the rows of
    ``df_meta_pair`` whose ``operation`` equals that operator, plus an
    ``Operator`` column naming it.
    """
    sim_columns = ['mean_sim','std_sim','skewness_sim','kurtosis_sim','entropy_sim'
    ,'missing_ratio_sim','zero_ratio_sim','cardinality_ratio_sim']
    df_subset = df_meta_pair[sim_columns+['operation']]

    # Start from an empty, typed frame so the result schema is fixed up front.
    centroid_schema = {name: float for name in sim_columns}
    centroid_schema['Operator'] = str
    frames = [pl.DataFrame(schema=centroid_schema)]

    for oper in ("Add","Subtract","Multiply","Divide"):
        per_operator = df_subset.filter(pl.col('operation')==oper)
        centroid = per_operator.mean()[sim_columns]
        frames.append(centroid.with_columns([pl.lit(oper).alias("Operator")]))

    return pl.concat(frames)

def calculate_centroid_meta_feature_fast(df_meta_pair):
    """Return the mean of each similarity meta-feature per operation.

    The input is grouped by its ``operation`` column and the eight ``*_sim``
    columns are averaged within each group; the grouping column is renamed to
    ``Operator`` in the result.
    """
    sim_columns = (
        "mean_sim", "std_sim", "skewness_sim", "kurtosis_sim",
        "entropy_sim", "missing_ratio_sim", "zero_ratio_sim", "cardinality_ratio_sim",
    )
    mean_exprs = [pl.mean(name).alias(name) for name in sim_columns]
    per_operation = df_meta_pair.group_by("operation").agg(mean_exprs)
    return per_operation.rename({"operation": "Operator"})

# Remove feature pairs that share a common root feature

In [3]:
def update_set_feature(list_feature,df_meta):
    """Split the listed generated features into unary and binary ones.

    Only rows of ``df_meta`` whose ``feature`` is in ``list_feature`` are
    considered; they are classified by their ``operation`` value.

    Returns:
        Tuple ``(feature_unary, feature_binary)`` of feature-name lists.
    """
    binary_ops = ['Add','Multiply','Subtract','Divide']
    unary_ops = ['Power','Square Root','Z-Score','Log base 2','Sigmoid']
    known = df_meta.filter(pl.col('feature').is_in(list_feature))

    def names_with(operations):
        # Feature names whose generating operation is one of `operations`.
        return known.filter(pl.col('operation').is_in(operations))['feature'].to_list()

    feature_binary = names_with(binary_ops)
    feature_unary = names_with(unary_ops)
    return feature_unary,feature_binary

def get_all_unary_feature(list_feature,df_meta):
    """Return the listed features that were generated by a unary operation.

    A feature qualifies when it appears in ``df_meta`` with an ``operation``
    of Power, Square Root, Z-Score, Log base 2 or Sigmoid.
    """
    unary_ops = ['Power','Square Root','Z-Score','Log base 2','Sigmoid']
    is_listed = pl.col('feature').is_in(list_feature)
    is_unary = pl.col('operation').is_in(unary_ops)
    return df_meta.filter(is_listed).filter(is_unary)['feature'].to_list()

def root_of_feature(feature,df_solution):
    """Trace a generated feature back to the level-1 operands it is built from.

    ``df_solution`` is looked up by its ``feature`` column. A name that has no
    row there yields an empty list. For a level-1 feature the result is its
    ``feature_L`` and ``feature_R`` operands, with one ``'None'`` placeholder
    removed if present. For any other level the roots of both operands are
    resolved recursively and concatenated.
    """
    matches = df_solution.filter(pl.col('feature')==feature)
    if matches.height == 0:
        return []

    level = matches['level'][0]
    operand_left = matches.select('feature_L')[0,0]
    operand_right = matches.select('feature_R')[0,0]

    if level != 1:
        # Deeper feature: keep descending through both operands.
        return root_of_feature(operand_left,df_solution)+root_of_feature(operand_right,df_solution)

    roots = [operand_left,operand_right]
    if 'None' in roots:
        roots.remove('None')
    return roots
        
def remove_redudant_root_feature(list_pair,df_solution):
    """Keep only the pairs whose two features share no root feature.

    The roots of each side are obtained with ``root_of_feature``; a pair is
    kept when the two root sets do not intersect.
    """
    kept_pairs = []
    for pair in list_pair:
        roots_first = set(root_of_feature(pair[0],df_solution))
        roots_second = set(root_of_feature(pair[1],df_solution))
        if roots_first.isdisjoint(roots_second):
            kept_pairs.append((pair[0],pair[1]))
    return kept_pairs

# Redundant feature removal

In [None]:
###### Select only the generated features that improve performance
def select_feature_via_meta_fast(df_pl,label,meta_table,df_mi_table,current_level,num_top_feat):
    """Pick the generated features whose MI score beats that of their operands.

    Only features of ``current_level`` that exist in ``df_pl`` are considered.
    A unary feature is kept when its score is greater than that of its single
    operand; a binary feature is kept when its score is greater than that of
    its left or of its right operand. The survivors are ranked by their own
    score and the names of the best ``num_top_feat`` are returned.

    Args:
        df_pl: Frame whose columns (minus ``label``) are the candidates.
        label: Name of the label column.
        meta_table: Metadata with ``feature``, ``feature_L``, ``feature_R``,
            ``operation`` and ``level`` columns.
        df_mi_table: Scores with ``Feature`` and ``Mutual_Info_Score`` columns.
        current_level: Generation level to evaluate.
        num_top_feat: Maximum number of feature names returned.

    Returns:
        List of selected feature names, best score first.
    """
    candidate_cols = [name for name in df_pl.columns if name != label]
    level_meta = meta_table.filter(
        (pl.col('level') == current_level)&(pl.col('feature').is_in(candidate_cols))
    )

    def with_mi(frame, key, alias):
        # Look up the MI score of column `key` and store it under `alias`.
        joined = frame.join(df_mi_table, left_on=key, right_on="Feature", how="left")
        return joined.rename({"Mutual_Info_Score": alias})

    # ==== Unary evaluation ====
    unary_meta = level_meta.filter(
        pl.col('operation').is_in(["Power", "Z-Score", "Square Root", "Log base 2", "Sigmoid"])
    )
    unary_scored = with_mi(with_mi(unary_meta, "feature", "MI_Feature"), "feature_L", "MI_Feature_L")
    unary_improved = (
        unary_scored
        .filter(pl.col("MI_Feature") > pl.col("MI_Feature_L"))
        .select("feature","MI_Feature")
    )

    # ==== Binary evaluation ====
    binary_meta = level_meta.filter(
        pl.col('operation').is_in(["Add", "Subtract", "Multiply", "Divide"])
    )
    binary_scored = with_mi(binary_meta, "feature", "MI_Feature")
    binary_scored = with_mi(binary_scored, "feature_L", "MI_Feature_L")
    binary_scored = with_mi(binary_scored, "feature_R", "MI_Feature_R")
    beats_left = pl.col("MI_Feature") > pl.col("MI_Feature_L")
    beats_right = pl.col("MI_Feature") > pl.col("MI_Feature_R")
    binary_improved = binary_scored.filter(beats_left | beats_right).select("feature","MI_Feature")

    ranked = pl.concat([unary_improved,binary_improved]).sort("MI_Feature",descending=True)
    return ranked.head(num_top_feat)['feature'].to_list()

# Main+sub framework

In [13]:
def generate_validate_pair(df_input,list_original,label,df_meta):
    """Enumerate candidate feature pairs for the next binary generation step.

    Five groups of pairs are built from the columns of ``df_input`` (label
    excluded): unary x unary and binary x binary among the features of the
    deepest level in ``df_meta``, original x unary, original x binary, and
    unary-of-any-level x binary. Every non-empty group is filtered with
    ``remove_redudant_root_feature`` and the survivors are returned as one
    flat list, in that group order.
    """
    feature_cols = df_input.columns
    feature_cols.remove(label)

    deepest_meta = df_meta.filter(pl.col('level')==df_meta['level'].max())
    feat_unary,feat_binary = update_set_feature(feature_cols,deepest_meta)
    feat_origi = list(set(feature_cols)&set(list_original))
    all_depth_unary = get_all_unary_feature(feature_cols,df_meta)

    candidate_groups = (
        list(combinations(feat_unary,2)),
        list(combinations(feat_binary,2)),
        list(product(feat_origi,feat_unary)),
        list(product(feat_origi,feat_binary)),
        list(product(all_depth_unary,feat_binary)),
    )

    merged_pairs = []
    for group in candidate_groups:
        if not group:
            continue
        merged_pairs.extend(remove_redudant_root_feature(group,df_meta))
    return merged_pairs

In [15]:
def calculate_score_operator(df_meta_pair,df_represent):
    """Score every feature pair against the four operator centroids.

    For each row of ``df_meta_pair`` the vector of its eight ``*_sim`` columns
    is compared, by cosine similarity, with the centroid vector of each
    operator taken from ``df_represent``.

    Args:
        df_meta_pair: Pair table; its first two columns identify the pair and
            it contains the eight ``*_sim`` columns.
        df_represent: Centroid table with an ``Operator`` column and the same
            eight ``*_sim`` columns; a row must exist for Add, Subtract,
            Multiply and Divide.

    Returns:
        A frame with columns ``feature_1``, ``feature_2``, ``cos_sim_add``,
        ``cos_sim_sub``, ``cos_sim_mul`` and ``cos_sim_div``.
    """
    schema = {'feature_1':str,'feature_2':str
    ,'cos_sim_add':float,'cos_sim_sub':float,'cos_sim_mul':float,'cos_sim_div':float}
    all_meta_feat = ['mean_sim','std_sim','skewness_sim','kurtosis_sim','entropy_sim','missing_ratio_sim','zero_ratio_sim','cardinality_ratio_sim']

    # Centroid vectors and their norms do not depend on the pair: compute once.
    centroids = []
    for operator_name in ('Add','Subtract','Multiply','Divide'):
        vec = np.array(df_represent.filter(pl.col('Operator')==operator_name)[all_meta_feat].row(0))
        centroids.append((vec, np.linalg.norm(vec)))

    pair_ids = df_meta_pair.select(df_meta_pair.columns[:2]).iter_rows()
    pair_vectors = df_meta_pair.select(all_meta_feat).iter_rows()

    # Accumulate plain rows and build the frame once; the previous
    # concat-per-row made the loop quadratic in the number of pairs.
    rows = []
    for (feat1, feat2), values in zip(pair_ids, pair_vectors):
        vector = np.array(values)
        vector_norm = np.linalg.norm(vector)
        sims = [
            float(np.dot(vector, centroid) / (vector_norm * centroid_norm))
            for centroid, centroid_norm in centroids
        ]
        rows.append([feat1, feat2, *sims])

    return pl.DataFrame(rows, schema=schema, orient="row")

def norm_cos_sim(x):
    """Rescale ``x`` linearly so that -1 maps to 0 and 1 maps to 1."""
    shifted = x + 1
    return shifted / 2

def roulette_wheel_operator(struct, n_pick=1):
    """Draw distinct operators by roulette-wheel (weight-proportional) selection.

    Args:
        struct: Mapping with the keys ``prob_add``, ``prob_sub``, ``prob_mul``
            and ``prob_div`` holding the weight of each operator.
        n_pick: Number of distinct operators to draw (default 1, the value that
            used to be hard-coded inside the function).

    Returns:
        List of operator names ("Add", "Subtract", "Multiply", "Divide") in the
        order they were drawn. At most as many names are returned as there are
        operators with a positive weight.

    Raises:
        ZeroDivisionError: If the four weights sum to zero.
    """
    sum_prob = struct['prob_add']+struct['prob_sub']+struct['prob_mul']+struct['prob_div']
    weight_operator = {'Add':struct['prob_add'],'Subtract':struct['prob_sub']
    ,'Multiply':struct['prob_mul'],'Divide':struct['prob_div']}
    keys = list(weight_operator.keys())
    probs = [w/sum_prob for w in weight_operator.values()]

    # An operator with zero weight can never be drawn, so asking for more
    # distinct operators than there are drawable ones would loop forever.
    drawable = sum(1 for p in probs if p > 0)
    if drawable:
        n_pick = min(n_pick, drawable)

    list_rec_operator = []
    while len(list_rec_operator) < n_pick:
        rec_operator = random.choices(keys,weights=probs,k=1)[0]
        if rec_operator not in list_rec_operator:
            list_rec_operator.append(rec_operator)
    return list_rec_operator

In [16]:
def build_meta_record(feat1,feat2,operator,index_feat):
    """Build the metadata rows describing one binary operation on a pair.

    Add and Multiply produce one row (``add_<i>`` / ``mul_<i>``). Subtract and
    Divide produce two rows, one per operand order (``subt1_<i>``/``subt2_<i>``
    and ``div1_<i>``/``div2_<i>``); the second row has the operands swapped.

    Args:
        feat1: Name of the first operand feature.
        feat2: Name of the second operand feature.
        operator: One of "Add", "Subtract", "Multiply", "Divide".
        index_feat: Index appended to the generated feature names.

    Returns:
        A frame with the columns ``feature``, ``feature_L``, ``feature_R`` and
        ``operation``.

    Raises:
        ValueError: If ``operator`` is not one of the four supported names
            (previously this surfaced as an UnboundLocalError).
    """
    schema = {'feature':str,'feature_L':str,'feature_R':str,'operation':str}
    if operator=='Add':
        rows = [['add_'+str(index_feat),feat1,feat2,'Add']]
    elif operator=='Subtract':
        rows = [
            ['subt1_'+str(index_feat),feat1,feat2,'Subtract'],
            ['subt2_'+str(index_feat),feat2,feat1,'Subtract'],
        ]
    elif operator=='Multiply':
        rows = [['mul_'+str(index_feat),feat1,feat2,'Multiply']]
    elif operator=='Divide':
        rows = [
            ['div1_'+str(index_feat),feat1,feat2,'Divide'],
            ['div2_'+str(index_feat),feat2,feat1,'Divide'],
        ]
    else:
        raise ValueError(f"Unsupported operator: {operator!r}")
    # Rows are given explicitly as rows; never let polars guess the orientation.
    return pl.DataFrame(rows,schema=schema,orient="row")

def rec_operator_to_meta(df_rec_operator, index_feature):
    """Turn recommended operators into a metadata table of features to build.

    For every row of ``df_rec_operator`` and every operator in its
    ``best_operator`` list, the rows produced by ``build_meta_record`` are
    collected; the index is increased by one per (pair, operator).

    Args:
        df_rec_operator: Frame with ``feature_1``, ``feature_2`` and
            ``best_operator`` (an iterable of operator names) columns.
        index_feature: First index used for the generated feature names.

    Returns:
        Tuple ``(df_meta_table, index_feature)``: the metadata frame
        (``feature``, ``feature_L``, ``feature_R``, ``operation``) and the next
        unused index.
    """
    records = []
    df_temp = df_rec_operator.select(["feature_1", "feature_2", "best_operator"])
    for feature_L, feature_R, list_operator in df_temp.iter_rows():
        for oper in list_operator:
            df_meta_record = build_meta_record(feature_L, feature_R, oper, index_feature)
            index_feature += 1
            # Keep every row of the record: Subtract and Divide yield two rows
            # (both operand orders), and taking only row 0 dropped the second.
            records.extend(df_meta_record.rows())

    # Build the frame once. Orientation is explicit because with exactly four
    # records the 4x4 shape would make an inferred orientation ambiguous.
    df_meta_table = pl.DataFrame(
        records,
        schema={"feature": str, "feature_L": str, "feature_R": str, "operation": str},
        orient="row",
    )
    return df_meta_table, index_feature

def generate_unary_from_meta(df_data,df_meta_unary):
    """Materialise the unary features described by a metadata table.

    Each row of ``df_meta_unary`` is read as (feature name, left operand,
    right operand, operation); the right operand is ignored. Rows whose
    operation is not Power, Square Root, Z-Score, Log base 2 or Sigmoid are
    skipped. All new columns are added in one ``with_columns`` call.
    """
    def z_score(source):
        # Mean and std are taken from the data as it is before the new columns.
        mean = df_data[source].mean()
        std = df_data[source].std()
        return (pl.col(source) - mean)/std

    builders = {
        'Power': lambda source: pl.col(source)* pl.col(source),
        'Square Root': lambda source: pl.col(source).sqrt(),
        'Z-Score': z_score,
        'Log base 2': lambda source: pl.col(source).log(base=2),
        'Sigmoid': lambda source: 1 / (1 + (-1 * pl.col(source)).exp()),
    }

    new_columns = []
    for feature_name, feature_L, _feature_R, operator in df_meta_unary.iter_rows():
        build = builders.get(operator)
        if build is None:
            continue  # unknown operators are skipped
        new_columns.append(build(feature_L).alias(feature_name))

    return df_data.with_columns(new_columns)

def generate_binary_from_meta(df_data,df_meta_bin):
    """Materialise the binary features described by a metadata table.

    Each row of ``df_meta_bin`` is read as (feature name, left operand, right
    operand, operation). Rows whose operation is not Add, Subtract, Multiply
    or Divide are skipped. All new columns are added in one ``with_columns``
    call.
    """
    combine = {
        'Add': lambda lhs, rhs: lhs + rhs,
        'Subtract': lambda lhs, rhs: lhs - rhs,
        'Multiply': lambda lhs, rhs: lhs * rhs,
        'Divide': lambda lhs, rhs: lhs / rhs,
    }

    new_columns = []
    for feature_name, feature_L, feature_R, operator in df_meta_bin.iter_rows():
        apply_op = combine.get(operator)
        if apply_op is None:
            continue  # unknown operators are skipped
        new_columns.append(apply_op(pl.col(feature_L), pl.col(feature_R)).alias(feature_name))

    return df_data.with_columns(new_columns)

In [17]:
def gen_initial_centroid(list_operator):
    """Create the starting centroid table: every meta-feature set to 0.5.

    Args:
        list_operator: Operator names; one centroid row is created per name.

    Returns:
        A frame with the eight ``*_sim`` columns (all 0.5) and an ``Operator``
        column, one row per entry of ``list_operator`` in the given order.
    """
    schema = {'mean_sim':float,'std_sim':float,'skewness_sim':float
    ,'kurtosis_sim':float,'entropy_sim':float
    ,'missing_ratio_sim':float,'zero_ratio_sim':float,'cardinality_ratio_sim':float,'Operator':str}
    # Build all rows first and construct the frame once; the per-operator
    # concat and forced garbage collection were pure overhead for tiny rows.
    rows = [[0.5]*8+[oper] for oper in list_operator]
    return pl.DataFrame(rows,schema=schema,orient="row")

def update_centroid_fast(df_centroid_old,df_centroid_new,learning_rate):
    """
    Update centroids with an exponential moving average.

    For an operator present in both tables each meta-feature becomes
    ``old + learning_rate * (new - old)``. An operator missing from the new
    table keeps its old values. An operator missing from the old table (or
    whose old value is null) takes the new values as they are.

    Args:
        df_centroid_old: Current centroids (eight ``*_sim`` columns + ``Operator``).
        df_centroid_new: Freshly computed centroids with the same columns.
        learning_rate: Weight given to the new centroid in the update.

    Returns:
        A frame with the eight ``*_sim`` columns followed by ``Operator``.
    """
    meta_features = [
        "mean_sim", "std_sim", "skewness_sim", "kurtosis_sim",
        "entropy_sim", "missing_ratio_sim", "zero_ratio_sim", "cardinality_ratio_sim"
    ]
    # Outer join ensures operators from either table are kept.
    joined = df_centroid_old.join(df_centroid_new, on="Operator", how="outer", suffix="_new")

    # Depending on the polars version the join either merges the key columns
    # or keeps the right key as "Operator_new". In the latter case rows that
    # exist only in the new table have a null "Operator" and need the fallback.
    if "Operator_new" in joined.columns:
        operator_expr = pl.coalesce([pl.col("Operator"), pl.col("Operator_new")]).alias("Operator")
    else:
        operator_expr = pl.col("Operator")

    updates = []
    for feat in meta_features:
        old_value = pl.col(feat)
        new_value = pl.col(f"{feat}_new")
        updates.append(
            pl.when(new_value.is_null()).then(old_value)      # nothing new: keep old
            .when(old_value.is_null()).then(new_value)        # nothing old: adopt new
            .otherwise(old_value + learning_rate * (new_value - old_value))
            .alias(feat)
        )

    return joined.with_columns(updates + [operator_expr]).select(meta_features + ["Operator"])

In [18]:
def built_meta_feature_centroid(
    df_input: pl.DataFrame,
    list_candidate_feat: list[str],
    df_meta_binary: pl.DataFrame,
    df_mi_for_map: pl.DataFrame,
    limit_per_operation: int
):
    """Compute operand meta-features and per-operator centroids from scratch.

    The binary metadata is restricted to the candidate features and joined
    with their MI scores; the ``limit_per_operation`` best-scoring rows of each
    operation define the operand pairs. Meta-features are computed for the
    operands involved, pair meta-features are derived from them, joined back
    to the scored metadata, cut to the top rows per operation again and
    averaged per operation.

    Args:
        df_input: Data frame holding the operand columns.
        list_candidate_feat: Generated binary features to consider.
        df_meta_binary: Metadata of binary features (``feature``,
            ``feature_L``, ``feature_R``, ``operation``).
        df_mi_for_map: MI table (``Feature``, ``Mutual_Info_Score``).
        limit_per_operation: Number of rows kept per operation.

    Returns:
        Tuple ``(df_meta_feature, df_centroid)``.
    """
    # Candidate binary features with their MI score (kept lazy).
    candidates_lazy = df_meta_binary.lazy().filter(
        pl.col("feature").is_in(list_candidate_feat)
    )
    scored_lazy = candidates_lazy.join(
        df_mi_for_map.lazy(),
        left_on="feature", right_on="Feature", how="left"
    )

    # Best rows per operation, materialised.
    top_rows = (
        scored_lazy
        .sort("Mutual_Info_Score", descending=True)
        .group_by("operation")
        .head(limit_per_operation)
        .collect(streaming=True)
    )
    operand_cols = top_rows.select(["feature_L", "feature_R"])

    # Distinct operand pairs and distinct single operands.
    pair_feature = list(operand_cols.unique().iter_rows())
    unique_sub_feat = operand_cols.melt().select("value").unique().to_series().to_list()

    # generate_pair_meta_feature_fast is defined outside this view.
    df_meta_feature = generate_meta_feature_fast(df_input, unique_sub_feat)
    df_meta_pair_feature = generate_pair_meta_feature_fast(df_meta_feature, pair_feature)

    # Attach operation and score to every pair, then cut per operation again.
    pairs_scored = df_meta_pair_feature.lazy().join(
        scored_lazy,
        left_on=["feature_1", "feature_2"],
        right_on=["feature_L", "feature_R"],
        how="left"
    )
    top_pairs = (
        pairs_scored
        .sort("Mutual_Info_Score", descending=True)
        .group_by("operation")
        .head(limit_per_operation)
        .collect(streaming=True)
    )
    return df_meta_feature, calculate_centroid_meta_feature_fast(top_pairs)

### Requires an existing meta-feature table (passed in as df_meta_feature)
def built_meta_feature_centroid_second(
    df_input: pl.DataFrame,
    list_candidate_feat: list[str],
    df_meta_feature: pl.DataFrame,
    df_meta_binary: pl.DataFrame,
    df_mi_for_map: pl.DataFrame,
    limit_per_operation: int
):
    """Extend an existing meta-feature table and recompute operator centroids.

    Works like ``built_meta_feature_centroid`` but appends the newly computed
    operand meta-features to the ``df_meta_feature`` table passed in (exact
    duplicate rows are removed, first occurrence kept) before the pair
    meta-features are derived.

    Args:
        df_input: Data frame holding the operand columns.
        list_candidate_feat: Generated binary features to consider.
        df_meta_feature: Meta-feature table accumulated so far.
        df_meta_binary: Metadata of binary features (``feature``,
            ``feature_L``, ``feature_R``, ``operation``).
        df_mi_for_map: MI table (``Feature``, ``Mutual_Info_Score``).
        limit_per_operation: Number of rows kept per operation.

    Returns:
        Tuple ``(df_meta_feature, df_centroid)`` with the extended
        meta-feature table and the per-operator centroids.
    """
    # Step 1: Filter only candidate features
    df_meta_bin_filter = (
        df_meta_binary.lazy()
        .filter(pl.col("feature").is_in(list_candidate_feat))
    )

    # Step 2: Join with MI scores
    df_map_1 = df_meta_bin_filter.join(
        df_mi_for_map.lazy(),
        left_on="feature", right_on="Feature",
        how="left"
    )

    # Step 3: Get top-N per operation
    df_top_sorted = (
        df_map_1
        .sort("Mutual_Info_Score", descending=True)
        .group_by("operation")
        .head(limit_per_operation)
        .collect(streaming=True)
    )
    operand_cols = df_top_sorted.select(["feature_L", "feature_R"])

    # Step 4: Distinct single operands
    unique_sub_feat = (
        operand_cols
        .melt()
        .select("value")
        .unique()
        .to_series()
        .to_list()
    )

    # Step 5: Distinct operand pairs. Materialised as a list (as the sibling
    # function does): a bare iter_rows() generator is exhausted after a single
    # pass and has no length, which breaks any consumer that needs either.
    pair_feature = list(operand_cols.unique().iter_rows())

    # Step 6: Meta-features of the operands, appended to the existing table
    df_meta_feature_new = generate_meta_feature_fast(df_input, unique_sub_feat)
    df_meta_feature = pl.concat(
        [df_meta_feature, df_meta_feature_new],
        how="vertical_relaxed"
    ).unique(maintain_order=True)

    # Step 7: Pair meta-features (generate_pair_meta_feature_fast is defined
    # outside this view)
    df_meta_pair_feature = generate_pair_meta_feature_fast(df_meta_feature, pair_feature)

    # Step 8: Second join + top-N per operation
    df_map_2 = (
        df_meta_pair_feature.lazy().join(
            df_map_1,
            left_on=["feature_1", "feature_2"],
            right_on=["feature_L", "feature_R"],
            how="left"
        )
    )

    df_top_sorted2 = (
        df_map_2
        .sort("Mutual_Info_Score", descending=True)
        .group_by("operation")
        .head(limit_per_operation)
        .collect(streaming=True)
    )

    return df_meta_feature, calculate_centroid_meta_feature_fast(df_top_sorted2)

# Sampling function

In [19]:
def stratified_sample_binary(df: pl.DataFrame, label: str, n_sub: int, seed: int = 42) -> pl.DataFrame:
    """Draw a stratified sample of ``n_sub`` rows without replacement.

    Each class of ``label`` receives a share of ``n_sub`` proportional to its
    frequency (rounded). If rounding leaves the sample short, the missing rows
    are drawn from the rows not yet selected; if rounding overshoots, the
    sample is randomly trimmed back to ``n_sub``.

    Args:
        df: Frame to sample from.
        label: Name of the class column.
        n_sub: Requested number of rows.
        seed: Seed used for the per-class draws (``seed + 1`` for adjustments).

    Returns:
        The sampled rows, with the same columns as ``df``.
    """
    # A temporary row id lets the top-up step exclude rows already drawn.
    row_id = "__strat_row_id__"
    while row_id in df.columns:
        row_id += "_"
    indexed = df.with_row_index(row_id)

    # Per-class allocation proportional to class frequency.
    counts = df.group_by(label).len().sort(label)
    N = counts["len"].sum()
    alloc = counts.with_columns(
        (pl.col("len") / N * n_sub).round(0).cast(pl.Int64).alias("take")
    )

    parts = [
        indexed.filter(pl.col(label) == cls).sample(
            n=int(max(0, take)), with_replacement=False, shuffle=True, seed=seed
        )
        for cls, take in zip(alloc[label].to_list(), alloc["take"].to_list())
    ]
    if not parts:
        # No class at all (empty input): nothing can be sampled.
        return df.clear()

    out = pl.concat(parts)
    deficit = n_sub - out.height
    if deficit > 0:
        # Top up only from rows that were not selected yet, so no row can
        # appear twice in the result.
        remaining = indexed.join(out.select(row_id), on=row_id, how="anti")
        extra = min(deficit, remaining.height)
        if extra > 0:
            out = pl.concat([
                out,
                remaining.sample(n=extra, with_replacement=False, shuffle=True, seed=seed + 1),
            ])
    elif deficit < 0:
        # Rounding gave more rows than requested: trim at random.
        out = out.sample(n=max(n_sub, 0), with_replacement=False, shuffle=True, seed=seed + 1)
    return out.drop(row_id)

# Main sub-algorithm of Robust-CRAFG framework

In [20]:
class Result_generation:
    """Plain container bundling the outputs of one feature-generation round."""

    def __init__(self,df_result,df_meta,df_meta_feat,df_mi,df_centroid,lasted_index,_iterate):
        # Bookkeeping values: last feature index used and the iteration counter.
        self.last_index = lasted_index
        self.iterate = _iterate
        # Generated data and the metadata describing how it was built.
        self.df_generated = df_result
        self.df_meta_data = df_meta
        # Supporting tables: meta-features, MI scores and operator centroids.
        self.df_meta_feature = df_meta_feat
        self.df_mi_table = df_mi
        self.df_meta_centroid = df_centroid

def first_feature_generation(df_input,list_filter_feat,num_select_feat,label,mi_table):
    """Run the first (level 1) round of feature generation and selection.

    The round generates unary and then binary features, discards features
    removed by ``drop_missing_data``, scores the survivors with
    ``create_mi_table``, selects features with ``select_feature_via_meta_fast``
    and finally builds the meta-feature table and the operator centroid.

    Parameters
    ----------
    df_input : polars.DataFrame
        Data containing the features and the ``label`` column.
    list_filter_feat : list of str
        Feature names handed to the generators; they are also kept in the
        returned data.
    num_select_feat : int
        Forwarded to ``select_feature_via_meta_fast``.
    label : str
        Name of the target column.
    mi_table : polars.DataFrame
        Existing mutual-information table; the scores of the new features
        are appended to it.

    Returns
    -------
    Result_generation
        Result object with ``iterate`` set to 1.
    """
    #### Generate both unary and binary features ####
    df_gen_unary, df_meta_unary, last_index = generate_mono_feature_vectorized(
        df_input, list_filter_feat, 1
    )
    df_gen_binary, df_meta_binary, last_index = generate_binary_feature_fast_safe(
        df_gen_unary, list_filter_feat, last_index + 1, 200
    )
    del df_gen_unary; gc.collect()

    #### Find the features that survive the error/missing-value filter ####
    # NOTE(review): 0.3 is presumably the tolerated error fraction - confirm
    # against drop_missing_data.
    df_drop_error = drop_missing_data(df_gen_binary, label, 0.3)
    surviving = {f for f in df_drop_error.columns if f != label}
    del df_drop_error; gc.collect()

    #### Keep only the unary/binary features that survived ####
    # Order-preserving de-duplication: intersecting two sets of strings
    # yields a hash-seed dependent order, which made the column order of the
    # output differ from run to run.
    unary_feat = [
        f for f in dict.fromkeys(df_meta_unary['feature'].to_list()) if f in surviving
    ]
    binary_feat = [
        f for f in dict.fromkeys(df_meta_binary['feature'].to_list()) if f in surviving
    ]

    #### Create MI table ####
    df_mi_unary = create_mi_table(df_gen_binary[unary_feat + [label]], label, 1000)
    df_mi_binary = create_mi_table(df_gen_binary[binary_feat + [label]], label, 1000)
    df_mi_table_new = pl.concat([mi_table, df_mi_unary, df_mi_binary])
    del df_mi_unary, df_mi_binary; gc.collect()

    #### Create meta table ####
    # Unary rows get a placeholder right operand so that both meta tables
    # share the same columns before they are stacked.
    df_meta_unary2 = (
        df_meta_unary
        .with_columns(pl.lit('None').alias('feature_R'))
        .select(['feature', 'feature_L', 'feature_R', 'operation'])
    )
    df_meta_combine = pl.concat([df_meta_unary2, df_meta_binary])
    del df_meta_unary2; gc.collect()
    df_meta_combine = df_meta_combine.with_columns(pl.lit(1).alias('level'))

    #### Select the unary/binary features that give better classification performance ####
    df_temp = df_gen_binary.select(unary_feat + binary_feat + [label])
    improved_feat = select_feature_via_meta_fast(
        df_temp, label, df_meta_combine, df_mi_table_new, 1, num_select_feat
    )
    del df_temp; gc.collect()

    #### Build meta-features of the binary features and the operator centroid ####
    limit_per_operation = len(df_input.columns) - 1
    df_meta_feature, df_centroid_meta = built_meta_feature_centroid(
        df_gen_binary, improved_feat, df_meta_binary, df_mi_table_new, limit_per_operation
    )
    list_bin_oper = ["Add", "Multiply", "Subtract", "Divide"]
    df_initial_centroid = gen_initial_centroid(list_bin_oper)
    df_update_centroid = update_centroid_fast(df_initial_centroid, df_centroid_meta, 1)

    final_cols = list_filter_feat + improved_feat + [label]
    return Result_generation(
        df_gen_binary.select(final_cols),
        df_meta_combine,
        df_meta_feature,
        df_mi_table_new,
        df_update_centroid,
        last_index,
        1,
    )

In [21]:
def sub_sequent_feature_generation(obj_result,list_filter_origi_feat,num_select_feat,label):
    """Run one follow-up round of feature generation on a previous result.

    Unary features are generated exhaustively from the features of the last
    level. Binary features are generated only for the pairs returned by
    ``generate_validate_pair``, with the operator chosen per pair by
    ``roulette_wheel_operator`` from the normalised cosine similarities to
    the stored centroid. Surviving features are scored, selected, and the
    centroid is updated.

    Parameters
    ----------
    obj_result : Result_generation
        Result of the previous round.
    list_filter_origi_feat : list of str
        Original feature names; they are kept in the returned data.
    num_select_feat : int
        Forwarded to ``select_feature_via_meta_fast``.
    label : str
        Name of the target column.

    Returns
    -------
    Result_generation
        Result object whose ``iterate`` is ``obj_result.iterate + 1``.
    """
    all_feature = [f for f in obj_result.df_generated.columns if f != label]
    df_last_depth = obj_result.df_meta_data.filter(pl.col('level') == (obj_result.iterate))
    feat_unary, feat_binary = update_set_feature(all_feature, df_last_depth)

    #### Generate unary features via the exhaustive method ####
    df_gen_unary, df_meta_unary, last_index = generate_mono_feature_vectorized(
        obj_result.df_generated, feat_unary + feat_binary, obj_result.last_index + 1
    )
    # Unary rows get a placeholder right operand so they can be stacked with
    # the binary meta rows later on.
    df_meta_unary2 = (
        df_meta_unary
        .with_columns(pl.lit('None').alias('feature_R'))
        .select(['feature', 'feature_L', 'feature_R', 'operation'])
    )
    validate_list = generate_validate_pair(
        obj_result.df_generated, list_filter_origi_feat, label, obj_result.df_meta_data
    )

    #### Collect the distinct features that occur in the candidate pairs ####
    # First-seen order instead of set order, so the result does not depend on
    # the interpreter's string-hash seed.
    unique_feat = list(dict.fromkeys(f for pair in validate_list for f in pair))

    #### Compute meta-features only for features that do not have them yet ####
    feat_in_meta = set(obj_result.df_meta_feature.get_column('feature').to_list())
    other_feat_in_meta = [f for f in unique_feat if f not in feat_in_meta]
    df_meta_feature_new = generate_meta_feature_fast(obj_result.df_generated, other_feat_in_meta)
    meta_feature_merged = pl.concat([obj_result.df_meta_feature, df_meta_feature_new])
    df_meta_pair_feature = generate_pair_meta_feature_fast(meta_feature_merged, validate_list)

    #### Choose an operator for each pair of features ####
    df_choose_operator = calculate_score_operator(df_meta_pair_feature, obj_result.df_meta_centroid)
    df_choose_operator = (
        df_choose_operator
        .with_columns(pl.col('cos_sim_add').map_elements(norm_cos_sim).alias('prob_add'))
        .with_columns(pl.col('cos_sim_sub').map_elements(norm_cos_sim).alias('prob_sub'))
        .with_columns(pl.col('cos_sim_mul').map_elements(norm_cos_sim).alias('prob_mul'))
        .with_columns(pl.col('cos_sim_div').map_elements(norm_cos_sim).alias('prob_div'))
        .with_columns(
            pl.struct(['prob_add', 'prob_sub', 'prob_mul', 'prob_div'])
            .map_elements(roulette_wheel_operator)
            .alias('best_operator')
        )
    )

    #### Build the meta table from the recommended operators ####
    df_meta_binary, last_index = rec_operator_to_meta(df_choose_operator, last_index + 1)

    #### Generate binary features from that meta table ####
    df_gen_binary = generate_binary_from_meta(df_gen_unary, df_meta_binary)
    del df_gen_unary; gc.collect()
    df_meta_update = pl.concat([df_meta_unary2, df_meta_binary])
    df_meta_update = df_meta_update.with_columns(pl.lit(obj_result.iterate + 1).alias('level'))
    df_meta_table = pl.concat([obj_result.df_meta_data, df_meta_update])

    #### Find the features that survive the error/missing-value filter ####
    df_drop_error = drop_missing_data(df_gen_binary, label, 0.3)
    surviving = {f for f in df_drop_error.columns if f != label}
    del df_drop_error; gc.collect()

    #### Keep only the unary/binary features that survived (stable order) ####
    unary_feat = [
        f for f in dict.fromkeys(df_meta_unary['feature'].to_list()) if f in surviving
    ]
    binary_feat = [
        f for f in dict.fromkeys(df_meta_binary['feature'].to_list()) if f in surviving
    ]

    #### Create MI table ####
    df_mi_unary = create_mi_table(df_gen_binary[unary_feat + [label]], label, 1000)
    df_mi_binary = create_mi_table(df_gen_binary[binary_feat + [label]], label, 1000)
    df_mi_table_new = pl.concat([obj_result.df_mi_table, df_mi_unary, df_mi_binary])
    del df_mi_unary, df_mi_binary; gc.collect()

    #### Select the unary/binary features that give better classification performance ####
    df_temp = df_gen_binary.select(unary_feat + binary_feat + [label])
    improved_feat = select_feature_via_meta_fast(
        df_temp, label, df_meta_table, df_mi_table_new, obj_result.iterate + 1, num_select_feat
    )
    del df_temp; gc.collect()

    #### Calculate the new centroid of the meta-features ####
    limit_per_operation = len(obj_result.df_generated.columns) - 1
    df_meta_feature, df_centroid_meta_new = built_meta_feature_centroid_second(
        df_gen_binary, improved_feat, meta_feature_merged, df_meta_binary,
        df_mi_table_new, limit_per_operation
    )

    #### Update the stored centroid ####
    df_centroid_meta = update_centroid_fast(
        obj_result.df_meta_centroid, df_centroid_meta_new, 1 / (obj_result.iterate + 1)
    )

    final_cols = list_filter_origi_feat + improved_feat + [label]
    return Result_generation(
        df_gen_binary.select(final_cols),
        df_meta_table,
        df_meta_feature,
        df_mi_table_new,
        df_centroid_meta,
        last_index,
        obj_result.iterate + 1,
    )

In [22]:
def mapping_info_full_data(df_original,label,dict_sub_generated,meta_table):
    """Re-create the generated features on the full dataset.

    For every iteration in ``dict_sub_generated`` the meta-table rows of that
    level whose feature is present in the iteration's frame are replayed on a
    copy of ``df_original`` (unary operations first, then binary ones).

    Parameters
    ----------
    df_original : polars.DataFrame
        Full dataset containing the original features and ``label``.
    label : str
        Name of the target column.
    dict_sub_generated : dict
        Maps an iteration number to the frame generated for that iteration;
        only the column names of each frame are used.
    meta_table : polars.DataFrame
        Meta table with the columns ``feature``, ``feature_L``, ``feature_R``,
        ``operation`` and ``level``.

    Returns
    -------
    tuple
        ``(dict_data_generated, df_mi_table)``: a dict that maps each
        iteration to the full-data frame restricted to that iteration's
        features plus ``label``, and the table returned by ``create_mi_table``
        for all columns of the accumulated full frame.
    """
    unary_ops = ["Power", "Z-Score", "Square Root", "Log base 2", "Sigmoid"]
    binary_ops = ["Add", "Subtract", "Multiply", "Divide"]
    df_full = df_original.clone()
    dict_data_generated = {}

    ##### Generate the features listed in the meta table on the full dataset
    for iterate, df_sub in dict_sub_generated.items():
        features = [f for f in df_sub.columns if f != label]
        df_filter_meta = (
            meta_table
            .filter((pl.col('level') == iterate) & (pl.col('feature').is_in(features)))
            .select(['feature', 'feature_L', 'feature_R', 'operation'])
        )
        meta_unary = df_filter_meta.filter(pl.col('operation').is_in(unary_ops))
        meta_binary = df_filter_meta.filter(pl.col('operation').is_in(binary_ops))
        # df_full accumulates columns across iterations, so features from
        # earlier levels stay available to later ones.
        if meta_unary.height > 0:
            df_full = generate_unary_from_meta(df_full, meta_unary)
        if meta_binary.height > 0:
            df_full = generate_binary_from_meta(df_full, meta_binary)
        dict_data_generated[iterate] = df_full.select(features + [label])

    ##### Calculate the MI table of all features on the entire full dataset
    df_mi_table = create_mi_table(df_full, label, batch_size=1000)
    return dict_data_generated, df_mi_table

In [1]:
from collections import Counter
from imblearn.under_sampling import CondensedNearestNeighbour
from imblearn.over_sampling import SMOTE
import polars as pl

def rebalance_data(df_input,label:str):
    """Rebalance the classes of ``df_input`` with CNN undersampling and SMOTE.

    Condensed-nearest-neighbour undersampling is applied first. If the
    smallest class then makes up between 45% and 55% of the rows of the
    smallest and largest class combined, that result is returned; otherwise
    SMOTE oversampling is applied on top of it.

    Parameters
    ----------
    df_input : polars.DataFrame
        Data containing the feature columns and the ``label`` column.
    label : str
        Name of the target column.

    Returns
    -------
    polars.DataFrame
        Resampled data with the feature columns followed by ``label``.

    Raises
    ------
    ValueError
        If SMOTE is required but the smallest class has fewer than two rows
        after undersampling, so there is no neighbour to interpolate with.
    """
    cols_name = [c for c in df_input.columns if c != label]
    # imblearn works on pandas/numpy inputs.
    X = df_input.drop(label).to_pandas()
    # get_column always yields a Series; select(...).squeeze() would collapse
    # a one-row frame into a scalar.
    y = df_input.get_column(label).to_pandas()

    # CNN undersampling
    cnn = CondensedNearestNeighbour(n_neighbors=1, random_state=42)
    X_res, y_res = cnn.fit_resample(X, y)

    # Share of the smallest class relative to smallest + largest class.
    class_counts = Counter(y_res)
    minority_count = min(class_counts.values())
    majority_count = max(class_counts.values())
    ratio_minority = minority_count / (minority_count + majority_count)

    if not 0.45 <= ratio_minority <= 0.55:
        # SMOTE needs more samples in a class than k_neighbors; CNN can leave
        # very few rows, so cap k at what the smallest class allows.
        k_neighbors = min(5, minority_count - 1)
        if k_neighbors < 1:
            raise ValueError(
                "SMOTE needs at least 2 samples in the smallest class after "
                f"undersampling, got {minority_count}."
            )
        smote = SMOTE(sampling_strategy="auto", random_state=42, k_neighbors=k_neighbors)
        X_res, y_res = smote.fit_resample(X_res, y_res)

    # Convert back to Polars
    X_pl = pl.DataFrame(X_res, schema=cols_name)
    y_pl = pl.Series(label, y_res)
    return pl.concat([X_pl, y_pl.to_frame()], how="horizontal")

In [4]:
def robust_crafg(
    data_pl: pl.DataFrame,
    label: str,
    per_filter: float,
    skip_first_select: str = "N",
    iteration: int = 1,
    skip_sub_samp: str = "Y",
    ratio_sub_samp: float = 0.25,
    handle_imbalance: str = "N",
):
    """Run the Robust-CRAFG feature generation and selection pipeline.

    The input is optionally subsampled and/or rebalanced, features are
    generated and selected for ``iteration`` rounds on that working data and,
    if the working data differs from ``data_pl``, the selected features are
    re-created on the full dataset.

    Parameters
    ----------
    data_pl : polars.DataFrame
        Input dataset containing both features and the target column.
    label : str
        Name of the target column.
    per_filter : float
        Fraction (0 < per_filter ≤ 1) of features to retain after each MI-based
        filtering step.
    skip_first_select : {"Y", "N"}, default="N"
        If "Y", skip the initial MI filtering step. If "N", perform filtering
        in the first iteration.
    iteration : int, default=1
        Number of feature generation and selection iterations to perform.
    skip_sub_samp : {"Y", "N"}, default="Y"
        If "Y", skip subsampling. If "N", draw one stratified subsample of
        ``data_pl`` and run every iteration on it.
    ratio_sub_samp : float, default=0.25
        Fraction (0 < ratio_sub_samp ≤ 1) of the dataset to use during subsampling.
        Ignored if `skip_sub_samp="Y"`.
    handle_imbalance : {"Y", "N"}, default="N"
        If "Y", apply imbalance handling (undersampling/oversampling) to the
        working data before the first iteration. If "N", do not rebalance.

    Returns
    -------
    tuple
        ``(generated, result)``. ``generated`` maps each iteration number to
        the data generated in that iteration and ``result`` is the
        ``Result_generation`` of the last iteration. When subsampling or
        rebalancing was used, both refer to features re-created on the full
        ``data_pl`` and ``result.df_mi_table`` is computed on the full data.
    """
    dict_generated = {}

    #### Step 0: Build the working dataset ####
    if skip_sub_samp == 'N':
        # The sampler needs a whole number of rows; ratio * height is a float.
        n_sub = int(round(ratio_sub_samp * data_pl.height))
        df_input = stratified_sample_binary(data_pl, label, n_sub, 41)
        if handle_imbalance == 'Y':
            df_input = rebalance_data(df_input, label)
    elif handle_imbalance == 'Y':
        df_input = rebalance_data(data_pl, label)
    else:
        df_input = data_pl.clone()

    # Safe way to exclude target column
    feature_origi = [c for c in data_pl.columns if c != label]
    total_num_feature = len(feature_origi)
    num_filter_feature = round(per_filter * total_num_feature)

    #### Step 1: First feature filtering (optional) ####
    df_mi_table = create_mi_table(df_input, label, 1000)
    if skip_first_select == "N":
        # NOTE(review): head() assumes create_mi_table returns the features
        # ranked best-first - confirm against create_mi_table.
        filter_feature = df_mi_table.head(num_filter_feature).get_column("Feature").to_list()
    else:
        filter_feature = feature_origi

    #### Step 2: First iteration ####
    obj_generated = first_feature_generation(
        df_input, filter_feature, num_filter_feature, label, df_mi_table
    )
    dict_generated[1] = obj_generated.df_generated

    #### Step 3: Next iterations (the range is empty when iteration == 1) ####
    for iterate in range(2, iteration + 1):
        obj_generated = sub_sequent_feature_generation(
            obj_generated, filter_feature, num_filter_feature, label
        )
        dict_generated[iterate] = obj_generated.df_generated

    # The working data is the full data: nothing has to be mapped back.
    if skip_sub_samp == 'Y' and handle_imbalance == 'N':
        return dict_generated, obj_generated

    #### Step 4: Re-create the selected features on the full dataset ####
    meta_table = obj_generated.df_meta_data
    dict_full_generated, df_mi_table_full = mapping_info_full_data(
        data_pl, label, dict_generated, meta_table
    )
    obj_generated_full = Result_generation(
        dict_full_generated[iteration],
        obj_generated.df_meta_data,
        obj_generated.df_meta_feature,
        df_mi_table_full,
        obj_generated.df_meta_centroid,
        obj_generated.last_index,
        iteration,
    )
    return dict_full_generated, obj_generated_full