## Imports

In [1]:
import os
import sys
import polars as pl
import pandas as pd
import numpy as np
from polars import StringCache

from sklearn.preprocessing import normalize

### PATHS

In [2]:
# ---------------------------------------------------------------------------
# Inputs: preprocessed datasets produced by the upstream processing notebook
# ---------------------------------------------------------------------------
PROCESSED_PATH = '/kaggle/input/inditex-users-recommender-hackatonnuwe-processed'
TRAIN_PARQUET_PATH = os.path.join(PROCESSED_PATH, 'train.parquet')
TEST_PARQUET_PATH = os.path.join(PROCESSED_PATH, 'test.parquet')
USERS_DATA_PATH = os.path.join(PROCESSED_PATH, 'users_combined.parquet')
PRODUCTS_PARQUET_PATH_IMPUTED = os.path.join(PROCESSED_PATH, 'products_imputed.parquet')

# Input: pre-computed top similar products per partnumber
TOP_SIM_PRODUCTS = '/kaggle/input/hack-inditex-task3-products/prod_sim_top_df.parquet'

# Input: pre-trained LightGBM ranker used for prediction
MODEL_PATH = "/kaggle/input/hack-inditex-task3-trainranker/lgbmranker_FULL_NEGNO_seedNO_.txt"

# ---------------------------------------------------------------------------
# Outputs: engineered feature files (relative paths -> current working dir)
# ---------------------------------------------------------------------------
TRAIN_ENGINEERED_PATH = 'train_fe.parquet'
TEST_ENGINEERED_PATH = 'test_fe.parquet'
PRODUCTS_ENGINEERED_PATH = 'products_fe.parquet'
USERS_ENGINEERED_PATH = 'users_fe.parquet'

In [3]:
class PolarsLoader:
    """Thin wrapper around the polars CSV/Parquet readers.

    Centralises the optional row limit so eager and lazy reads honour the
    same ``sampling`` / ``n_sample`` configuration.
    """

    def __init__(self, sampling=False, n_sample = 1_000_000, file_type='csv'):
        """
        Initializes the PolarsLoader class.

        Parameters:
            sampling (bool): If True, only the first ``n_sample`` rows are read
                (polars ``n_rows``; this is a head, not a random sample).
                Defaults to False.
            n_sample (int): Row limit applied when ``sampling`` is True.
                Defaults to 1,000,000.
            file_type (str): Either 'csv' or 'parquet'. Validated when a file
                is loaded. Defaults to 'csv'.
        """
        self.sampling = sampling
        self.file_type = file_type
        self.n_sample = n_sample

    def _n_rows(self):
        """Return the ``n_rows`` value for the readers (None reads every row)."""
        return self.n_sample if self.sampling else None

    def load_data(self, path, select_cols=None):
        """
        Eagerly loads a CSV or Parquet file.

        Parameters:
            path (str): Path to the file.
            select_cols (list[str] | None): Columns to read; None reads all.

        Returns:
            polars.DataFrame: Loaded DataFrame, restricted to ``select_cols``
            when given.

        Raises:
            ValueError: If ``file_type`` is neither 'csv' nor 'parquet'.
        """
        n_rows = self._n_rows()

        if self.file_type == 'csv':
            return pl.read_csv(path, low_memory=True,
                               batch_size=100_000,
                               n_rows=n_rows,
                               try_parse_dates=True,
                               columns=select_cols,
                               )
        if self.file_type == 'parquet':
            return pl.read_parquet(path, low_memory=True,
                                   n_rows=n_rows,
                                   columns=select_cols,
                                   )
        raise ValueError("Unsupported file type. Supported types are 'csv' and 'parquet'.")

    def load_data_lazy(self, path, select_cols=None):
        """
        Lazily scans a CSV or Parquet file.

        Parameters:
            path (str): Path to the file.
            select_cols (list[str] | None): Optional column projection applied
                to the LazyFrame; None keeps all columns.

        Returns:
            polars.LazyFrame: LazyFrame object for the data.

        Raises:
            ValueError: If ``file_type`` is neither 'csv' nor 'parquet'.
        """
        n_rows = self._n_rows()

        if self.file_type == 'csv':
            lf = pl.scan_csv(path, low_memory=True,
                             n_rows=n_rows,
                             try_parse_dates=True,
                             )
        elif self.file_type == 'parquet':
            lf = pl.scan_parquet(path, low_memory=True,
                                 n_rows=n_rows,
                                 )
        else:
            raise ValueError("Unsupported file type. Supported types are 'csv' and 'parquet'.")

        if select_cols is not None:
            lf = lf.select(select_cols)
        return lf

In [4]:
class PRODUCTS:
    """Feature-engineering step for the product catalogue.

    Reads the processed products parquet, L2-normalises the ``embedding``
    vectors, adds family/section aggregate features and writes the result
    to ``engineered_path``.
    """

    def __init__(self, processed_path, engineered_path):
        # Input parquet path and output parquet path, respectively.
        self.processed_path = processed_path
        self.engineered_path = engineered_path

    def load_data(self):
        """Read the processed products parquet into an eager DataFrame."""
        return pl.read_parquet(self.processed_path, low_memory=True)

    def normalize_embedding(self, products_df: pl.DataFrame) -> pl.DataFrame:
        """Replace ``embedding`` with its row-wise L2-normalised version.

        The per-row vectors are stacked into a 2-D numpy matrix and passed to
        sklearn's ``normalize`` (default: L2 norm over each row).
        """
        matrix = np.stack(products_df['embedding'].to_numpy())
        unit_rows = normalize(matrix)
        return products_df.with_columns([pl.Series('embedding', unit_rows)])

    def create_initial_extra_features(self, df: pl.DataFrame) -> pl.DataFrame:
        """Normalise embeddings and add family/section aggregate features."""
        normalized_df = self.normalize_embedding(df)

        # Number of non-null color_id values in the product's family.
        family_color_count = (pl.col('color_id').count()
                              .over('family')
                              .alias('color_family_frequency'))
        # Mean discount per family and per section (proportions).
        family_discount = (pl.col('discount').mean()
                           .over('family')
                           .alias('family_discount_rate')
                           .cast(pl.Float32))
        section_discount = (pl.col('discount').mean()
                            .over('cod_section')
                            .alias('section_discount_rate')
                            .cast(pl.Float32))
        # Number of distinct colours in the product's family.
        family_colors = (pl.col('color_id').n_unique()
                         .over('family')
                         .alias('family_unique_colors'))

        return normalized_df.with_columns([
            family_color_count,
            family_discount,
            section_discount,
            family_colors,
        ])

    def run(self):
        """Load, engineer, persist to ``engineered_path`` and return products."""
        engineered = self.create_initial_extra_features(self.load_data())
        engineered.write_parquet(self.engineered_path)
        return engineered

In [5]:
class USERS:
    """Feature-engineering step for the user table (RFM-based features).

    Reads the combined users parquet, keeps one row per ``user_id``, derives
    ratio, country-relative and above-average flag features plus per-country
    RFM segments and ranks, and writes the result to ``engineered_path``.
    """

    def __init__(self, processed_path, engineered_path):
        # Input parquet path and output parquet path, respectively.
        self.processed_path = processed_path
        self.engineered_path = engineered_path

    def load_and_process_users(self):
        """Load users and keep a single row per ``user_id``.

        ``country`` is renamed to ``user_country`` so it does not collide with
        the session-level ``country`` column of the interaction tables. When a
        user has several rows, the one with the highest ``F`` (ties broken by
        the highest ``R``) is kept.
        """
        users = (
            pl.read_parquet(self.processed_path, low_memory=True)
            .rename({'country': 'user_country'})
        )

        # group_by preserves row order inside each group, so first() returns
        # the row sorted first (highest F, then highest R). maintain_order=True
        # additionally makes the output row order deterministic.
        users = (
            users.sort(['user_id', 'F', 'R'], descending=[False, True, True])
            .group_by('user_id', maintain_order=True)
            .agg(pl.all().first())
        )
        return users

    @staticmethod
    def create_initial_extra_features(df: pl.DataFrame) -> pl.DataFrame:
        """Add ratio, country-relative and above-average flag features.

        Ratio features whose denominator may be zero (``avg_value_per_purchase``,
        ``purchase_rate``, ``spend_rate_per_day``, ``avg_days_between_purchases``)
        have NaN and infinite results replaced by 0.
        """
        df = df.with_columns([
            # Average value per purchase
            (pl.col('M') / pl.col('F')).alias('avg_value_per_purchase'),
            # Purchase frequency rate (F normalized by R)
            (pl.col('F') / pl.col('R')).alias('purchase_rate').cast(pl.Float32),
            # Value density (M normalized by R)
            (pl.col('M') / pl.col('R')).alias('spend_rate_per_day'),
        ])

        df = df.with_columns([
            # Value-frequency relationship
            (pl.col('M') * pl.col('F')).alias('total_value_frequency').cast(pl.Float32),
            # Recency-frequency relationship
            (pl.col('R') / pl.col('F')).alias('avg_days_between_purchases').cast(pl.Float32),
        ])

        # Per-country averages and each user's values relative to their country.
        country_stats = df.group_by('user_country').agg([
            pl.col('M').mean().alias('country_avg_monetary'),
            pl.col('F').mean().alias('country_avg_frequency').cast(pl.Float32),
            pl.col('R').mean().alias('country_avg_recency').cast(pl.Float32),
        ])
        # Left join: with an inner join, users whose user_country is null are
        # silently dropped (null keys never match) and vanish from users_df.
        df = df.join(country_stats, on='user_country', how='left').with_columns([
            (pl.col('M') / pl.col('country_avg_monetary')).alias('relative_monetary_value'),
            (pl.col('F') / pl.col('country_avg_frequency')).alias('relative_frequency').cast(pl.Float32),
            (pl.col('R') / pl.col('country_avg_recency')).alias('relative_recency').cast(pl.Float32),
        ])

        df = df.with_columns([
            # Above the country average M / F / R
            (pl.col('M') > pl.col('M').mean().over('user_country')).cast(pl.Int8).alias('is_high_value_incountry'),
            (pl.col('F') > pl.col('F').mean().over('user_country')).cast(pl.Int8).alias('is_frequent_buyer_incountry'),
            (pl.col('R') > pl.col('R').mean().over('user_country')).cast(pl.Int8).alias('is_recent_customer_incountry'),
            # Above the global average M / F / R
            (pl.col('M') > pl.col('M').mean()).cast(pl.Int8).alias('is_high_value'),
            (pl.col('F') > pl.col('F').mean()).cast(pl.Int8).alias('is_frequent_buyer'),
            (pl.col('R') > pl.col('R').mean()).cast(pl.Int8).alias('is_recent_customer'),
        ])

        # Replace NaN and +/-inf values (division by zero) with 0.
        ratio_cols = ['avg_value_per_purchase', 'purchase_rate',
                      'spend_rate_per_day', 'avg_days_between_purchases']
        df = df.with_columns([
            pl.when(pl.col(col).is_nan() | pl.col(col).is_infinite())
            .then(0)
            .otherwise(pl.col(col))
            .alias(col)
            for col in ratio_cols if col in df.columns
        ])

        return df

    @staticmethod
    def create_rfm_segments_and_ranks(users_df):
        """Add quintile segments (1-5) and within-country ranks for R, F and M.

        Ranks use ``descending=True``, so rank 1 / segment 1 correspond to the
        highest value of each metric. Ranks use polars' default (average) tie
        method, and the ``Int32`` cast truncates fractional tie ranks.

        NOTE(review): ``map_batches`` is chained *after* ``.over('user_country')``,
        so ``pd.qcut`` appears to bin the whole column of within-country ranks
        (all countries pooled) rather than computing per-country quintiles.
        Kept as-is because downstream models consume these features; confirm
        this is intended.
        """
        def quintile(ranks):
            # pd.qcut labels the bins 0..4; shift to 1..5.
            return pd.qcut(ranks, q=5, labels=False) + 1

        def country_rank(col):
            return pl.col(col).rank(descending=True).over('user_country')

        return users_df.with_columns([
            country_rank('R').map_batches(quintile).alias('r_segment').cast(pl.UInt8),
            country_rank('F').map_batches(quintile).alias('f_segment').cast(pl.UInt8),
            country_rank('M').map_batches(quintile).alias('m_segment').cast(pl.UInt8),
            country_rank('R').alias('r_rank_in_country').cast(pl.Int32),
            country_rank('F').alias('f_rank_in_country').cast(pl.Int32),
            country_rank('M').alias('m_rank_in_country').cast(pl.Int32),
        ])

    def run(self):
        """Load, engineer, persist to ``engineered_path`` and return users."""
        df = self.load_and_process_users()
        df = self.create_initial_extra_features(df)
        df = self.create_rfm_segments_and_ranks(df)
        df.write_parquet(self.engineered_path)
        return df

In [6]:
class TRAIN_TEST:
    """Build the engineered train/test interaction tables for the ranker.

    ``run`` chains every step: load -> impute -> ratio features computed on
    train+test -> session/date features -> test candidate generation (global,
    country, known-user and similar-product candidates) -> fill candidate
    nulls -> candidate-dependent features -> merge with product and user
    features -> write parquet.
    """

    def __init__(self, sampling=False, train_path=None, test_path=None,
                        save_train_path=None, save_test_path=None,
                        topN_global=10,topN_countries=10, topN_known_users=10,
                        products_df=None, users_df=None,
                        prod_sim_top_df_path=None, filter_similar=False, filter_similarN=5):
        """
        Parameters:
            sampling (bool): Read only the first rows of train/test (PolarsLoader).
            train_path, test_path (str): Processed train/test parquet inputs.
            save_train_path, save_test_path (str): Output parquet paths. With
                ``run(process_train=False)``, ``save_train_path`` is read back
                as the already-final train table.
            topN_global (int | None): Globally popular candidates added to every
                test session; falsy (0/None) disables them.
            topN_countries (int): Popular candidates per country.
            topN_known_users (int): Candidates per known user.
            products_df, users_df (pl.DataFrame): Engineered product/user tables
                merged onto both splits.
            prod_sim_top_df_path (str): Parquet with ``partnumber`` and a
                ``top_10_cos_partnumber`` list column of similar products.
            filter_similar (bool): Keep only the first ``filter_similarN``
                similar products per product.
            filter_similarN (int): See ``filter_similar``.
        """
        self.sampling = sampling
        self.train_path = train_path
        self.test_path = test_path
        self.save_train_path = save_train_path
        self.save_test_path = save_test_path
        self.topN_global = topN_global
        self.topN_countries = topN_countries
        self.topN_known_users = topN_known_users
        self.products_df = products_df
        self.users_df = users_df
        self.prod_sim_top_df_path = prod_sim_top_df_path
        self.filter_similar = filter_similar
        self.filter_similarN = filter_similarN

    def load_data(self):
        """Eagerly load the train and test parquet files."""
        loader = PolarsLoader(sampling=self.sampling, file_type='parquet')
        train = loader.load_data(path=self.train_path)
        test = loader.load_data(path=self.test_path)
        return train, test

    def load_data_lazy(self):
        """Lazily scan the train and test parquet files."""
        loader = PolarsLoader(sampling=self.sampling, file_type='parquet')
        train = loader.load_data_lazy(path=self.train_path)
        test = loader.load_data_lazy(path=self.test_path)
        return train, test

    def impute_train_test(self, df):
        """Drop ``date``, sort by time, tag known users and impute nulls.

        ``known_user`` is 1 when ``user_id`` was present and -1 otherwise; then
        null ``user_id`` becomes -1 and null ``pagetype`` is filled with the
        most frequent non-null ``pagetype``.
        """
        df = df.drop("date").sort("timestamp_local")

        # Tag known users before user_id nulls are overwritten below.
        df = df.with_columns([
            pl.when(pl.col("user_id").is_null()).then(pl.lit(-1)).otherwise(pl.lit(1)).alias("known_user").cast(pl.Int8),
        ])

        # mode() returns every value tied for most frequent; fill_null needs a
        # single value, so take the smallest tied non-null mode.
        pagetype_mode = pl.col("pagetype").drop_nulls().mode().sort().first()
        return df.with_columns([
            pl.col("user_id").fill_null(-1).cast(pl.Int32),
            pl.col("pagetype").fill_null(pagetype_mode),
        ])

    def create_concatenated_features(self, train: pl.DataFrame,
                               test: pl.DataFrame) -> tuple[pl.DataFrame,
                                                            pl.DataFrame]:
        """
        Create cart ratio features by concatenating train and test data, computing ratios,
        then returning the separated dataframes with new features.

        Args:
            train (pl.DataFrame): Training dataframe containing 'add_to_cart' column
            test (pl.DataFrame): Test dataframe without 'add_to_cart' column

        Returns:
            tuple[pl.DataFrame, pl.DataFrame]: Tuple containing (train, test) dataframes with new features
        """
        # Test gets a null add_to_cart typed like train's so the vertical
        # concat sees matching schemas; flag 0 = test, 1 = train.
        dummy_test = (test
                    .with_columns([pl.lit(None, dtype=train.schema['add_to_cart']).alias('add_to_cart')])
                    .select(sorted(train.columns, reverse=True))
                    .with_columns([pl.lit(0).alias('flag')])
        )
        dummy_train = (train
                    .select(sorted(train.columns, reverse=True))
                    .with_columns([pl.lit(1).alias('flag')])
        )

        traintest_concat = pl.concat([dummy_train, dummy_test])

        # Mean add_to_cart per page type / device type / country. Test rows
        # contribute nulls, which mean() ignores.
        page_cart_ratio = (traintest_concat.group_by("pagetype")
                        .agg(pl.col("add_to_cart").mean().alias("page_cart_ratio").cast(pl.Float32))
                        .fill_null(pl.lit(-1))
                        .with_columns(pl.col("pagetype").cast(pl.UInt8))
        )
        traintest_concat = traintest_concat.join(page_cart_ratio, on="pagetype", how="left")
        device_cart_ratio = (traintest_concat.group_by("device_type")
            .agg(pl.col("add_to_cart").mean().alias("device_cart_ratio").cast(pl.Float32))
        )
        traintest_concat = traintest_concat.join(device_cart_ratio, on="device_type", how="left")
        country_cart_ratio = (traintest_concat.group_by("country")
            .agg(pl.col("add_to_cart").mean().alias("country_cart_ratio").cast(pl.Float32))
        )
        traintest_concat = traintest_concat.join(country_cart_ratio, on="country", how="left")

        # NOTE(review): cum_sum includes the current row, so on train rows
        # user_previous_cart_additions contains that row's own add_to_cart
        # target (label leakage); all unknown users (user_id -1) also share one
        # running total. Kept unchanged because the ranker loaded from
        # MODEL_PATH was presumably trained on these features; consider
        # shifting by one before cum_sum when retraining.
        traintest_concat = traintest_concat.with_columns([
                pl.col("add_to_cart").cum_sum().over("user_id").alias("user_previous_cart_additions"),
                pl.col("user_id").cum_count().over("user_id").alias("user_previous_interactions")
            ])

        train_processed = traintest_concat.filter(pl.col("flag") == 1).drop("flag")
        test_processed = traintest_concat.filter(pl.col("flag") == 0).drop(["flag", "add_to_cart"])

        return train_processed, test_processed

    def feature_engineering_no_candidate_dependent(self, df: pl.DataFrame) -> pl.DataFrame:
        """Add session timing, calendar and session-size features.

        NOTE(review): ``fill_null(strategy="zero")`` is applied to the whole
        frame, so nulls in every column (not only the new ones) become 0.
        """
        # Durations are cast to float and divided by 1e6, which assumes
        # microsecond timestamps (polars' default Datetime unit) — confirm.
        df_ = df.with_columns([
            # Seconds since the previous interaction in the same session
            (pl.col("timestamp_local").diff().over("session_id").cast(pl.Float32) / 1_000_000)
            .round(1).alias("seconds_since_last_interaction"),
            # Total session duration in seconds
            ((pl.col("timestamp_local").max() - pl.col("timestamp_local").min()).over("session_id").cast(pl.Float32) / 1_000_000)
            .round(1).alias("total_session_time"),
        ]).fill_null(strategy="zero")

        # Time until the next interaction in the session (0 for the last one).
        df_ = df_.with_columns([
                    pl.col("seconds_since_last_interaction").shift(-1).over("session_id").alias("interaction_length"),
        ]).fill_null(strategy="zero")

        # Date features
        df_ = df_.with_columns([
            pl.col("timestamp_local").dt.day().alias("day_number"),
            pl.col("timestamp_local").dt.weekday().alias("weekday_number"),
            pl.col("timestamp_local").dt.hour().alias("hour")
        ])

        # 0 = morning [6,12), 1 = afternoon [12,18), 2 = evening [18,24), 3 = night
        df_ = df_.with_columns([
                        pl.when((pl.col("hour") >= 6) & (pl.col("hour") < 12)).then(pl.lit(0))
                        .when((pl.col("hour") >= 12) & (pl.col("hour") < 18)).then(pl.lit(1))
                        .when((pl.col("hour") >= 18) & (pl.col("hour") < 24)).then(pl.lit(2))
                        .otherwise(pl.lit(3))
                        .cast(pl.UInt8)
                        .alias("day_frame")
                    ])
        df_ = df_.with_columns(pl.col('timestamp_local').count().over('session_id').alias('total_session_interactions'))

        return df_

    def select_global_candidates(self, train_df):
        """Return the ``topN_global`` partnumbers with the most cart additions.

        Popularity is the summed ``add_to_cart`` (as in
        ``select_country_candidates``), with ``partnumber`` as a deterministic
        tie-break. Returns an empty list when ``topN_global`` is falsy.
        """
        if not self.topN_global:
            return []

        cart_counts = (train_df
            .group_by('partnumber')
            .agg(pl.col('add_to_cart').sum())
            .sort(['add_to_cart', 'partnumber'], descending=[True, False])
        )
        return cart_counts['partnumber'].head(self.topN_global).to_list()

    def select_country_candidates(self, train_df):
        """Return the top ``topN_countries`` products by cart additions per country.

        Output has one row per (country, partnumber) with ``synthetic_candidate`` = 1.
        """
        # group_by preserves row order within each group, so head() after the
        # sort takes the most-added products of each country.
        country_top_products = (train_df
                              .group_by(["country", "partnumber"])
                              .agg([
                                  pl.col("add_to_cart").sum().alias("total_add_to_cart")
                              ])
                              .sort(by=["country", "total_add_to_cart"], descending=True)
                              .group_by("country")
                              .agg([
                                  pl.col("partnumber").head(self.topN_countries).alias("top_partnumbers")
                              ])
                         )

        exploded_country_products = (country_top_products.explode("top_partnumbers").rename({"top_partnumbers": "partnumber"})
                .with_columns(pl.lit(1).alias("synthetic_candidate").cast(pl.Int64)))

        return exploded_country_products

    def select_kwown_user_candidates(self, train_df):
        """Return up to ``topN_known_users`` products per known user.

        Only train rows of known users (user_id != -1) that were NOT added to
        cart are considered, ranked by ``product_interaction_count`` then
        ``interaction_length`` (both descending).
        """
        kwown_users_top = (train_df
                            .filter((pl.col("user_id") != -1) & (pl.col("add_to_cart") == 0))
                            .select(['user_id', 'partnumber', 'product_interaction_count', 'interaction_length'])
                            .sort(by=['product_interaction_count', 'interaction_length'], descending=[True, True])
                            .group_by("user_id")
                            .agg([
                                pl.col("partnumber").head(self.topN_known_users).alias("top_partnumbers")
                            ])
        )

        kwown_user_candidates = (kwown_users_top.explode("top_partnumbers")
                                 .rename({"top_partnumbers": "partnumber"})
                                 .with_columns(pl.lit(1).alias("synthetic_candidate").cast(pl.Int64)))

        return kwown_user_candidates

    def select_topsimilar_candidates(self):
        """Load the product-similarity table as (partnumber, similar product) rows."""
        top10_sim_df = pl.read_parquet(self.prod_sim_top_df_path)

        if self.filter_similar:
            top10_sim_df = top10_sim_df.with_columns(pl.col('top_10_cos_partnumber').list.head(self.filter_similarN))

        top10_sim_df = (top10_sim_df.explode("top_10_cos_partnumber")
                    .with_columns(pl.col('partnumber').cast(pl.UInt16))
                    # Some products list themselves as similar; drop those.
                    .filter(~(pl.col('partnumber')==pl.col('top_10_cos_partnumber')))
                   )

        return top10_sim_df.with_columns(pl.lit(1).alias("synthetic_candidate").cast(pl.Int64))

    def add_candidates_to_test(self, test_df: pl.DataFrame, global_candidate_products: list,
                           exploded_country_products: pl.DataFrame,
                          kwown_user_candidates: pl.DataFrame,
                          top10_sim_df: pl.DataFrame) -> pl.DataFrame:
        """Adds candidate products as new rows for each session in test data.

        Candidate rows carry only ``session_id``, ``partnumber``,
        ``synthetic_candidate`` and the join keys; other columns stay null for
        ``fill_test_nulls_candidates``. Duplicate rows are removed.
        """
        ## GLOBAL
        print(f"Adding {len(global_candidate_products)} global propular products to test data...")
        if self.topN_global:
            # Every test session x every global candidate, built with a cross
            # join instead of a Python list of dicts.
            sessions = test_df.select(pl.col("session_id").unique().cast(pl.UInt32))
            products = (pl.DataFrame({"partnumber": global_candidate_products})
                        .with_columns(pl.col("partnumber").cast(pl.UInt16)))
            candidates_df = (sessions.join(products, how="cross")
                             .with_columns(pl.lit(1).cast(pl.Int64).alias("synthetic_candidate")))
            test_extended = pl.concat([test_df, candidates_df], how="diagonal").unique()
        else:
            test_extended = test_df.clone()

        ## COUNTRIES
        print(f"Adding {self.topN_countries} country popular products to test data...")
        session_country_mapping = test_df.select(["session_id", "country"]).unique()
        country_session_combinations = (
            session_country_mapping.join(exploded_country_products, on="country", how="inner")
        )
        test_extended = pl.concat([test_extended, country_session_combinations], how="diagonal").unique()

        ## KNOWN USERS
        print(f"Adding {self.topN_known_users} candidate products for known users to test data...")
        test_users = test_df.select(["session_id", "user_id"]).unique()
        test_user_candidates_combination = test_users.join(kwown_user_candidates, on="user_id", how="inner")
        test_extended = pl.concat([test_extended, test_user_candidates_combination], how="diagonal").unique()

        ## TOP SIMILAR TO TEST
        print(f"Adding {10 if not self.filter_similar else self.filter_similarN} candidate products to test data...")
        test_in_products = test_df.select(["session_id", "partnumber"]).unique()
        # Products similar to those seen in each session become candidates.
        sessions_partnumber_combinations = (
            test_in_products.join(top10_sim_df, on="partnumber", how="inner")
            .drop('partnumber')
            .rename({'top_10_cos_partnumber': 'partnumber'})
        )
        test_extended = pl.concat([test_extended, sessions_partnumber_combinations], how="diagonal")

        return test_extended.unique()  # Drops candidates added more than once

    def fill_test_nulls_candidates(self, test_extended: pl.DataFrame) -> pl.DataFrame:
        """Fill the null context columns of candidate rows within each session.

        Rows are sorted by session and descending timestamp; polars sorts nulls
        first by default, so candidate rows (null timestamp) come first and the
        backward fill copies the session's most recent interaction. The timing
        columns in ``statsfilled_cols`` get the session mean instead.
        """
        backfilled_cols = ['timestamp_local',
                            'user_id',
                            'country',
                            'device_type',
                            'pagetype',
                            'total_session_time',
                            'day_number',
                            'weekday_number',
                            'hour',
                            'day_frame']

        statsfilled_cols = ['seconds_since_last_interaction',
                            'interaction_length']

        test_extended = (test_extended
                        .sort(["session_id", "timestamp_local"], descending=[False, True])
                        .with_columns([
                            pl.col(col).fill_null(strategy='backward').over("session_id")
                            for col in backfilled_cols
                        ])
                        .with_columns([
                            pl.col(col).fill_null(strategy='mean').over("session_id")
                            for col in statsfilled_cols
                        ])
                    )
        return test_extended

    def feature_engineering_candidate_dependent(self, df: pl.DataFrame) -> pl.DataFrame:
        """Add ``product_interaction_count`` and finalise user-history columns.

        NOTE(review): the running count follows the current row order, which
        is chronological for train (sorted in ``impute_train_test``) but
        descending-time for test (sorted in ``fill_test_nulls_candidates``), so
        the feature's meaning differs between splits — confirm intended.
        """
        df = df.with_columns([
            # Running count of each partnumber within a session
            pl.col("partnumber").cum_count().over(["session_id", "partnumber"]).alias("product_interaction_count")
        ])

        # Remaining nulls (candidate rows) take the session maximum.
        remain_nulls = [
            'known_user',
            'page_cart_ratio',
            'device_cart_ratio',
            'country_cart_ratio',
            'user_previous_cart_additions',
            'user_previous_interactions',
            'total_session_interactions'
        ]
        df = df.with_columns([
            pl.col(col).fill_null(strategy='max').over("session_id")
            for col in remain_nulls
        ])

        # Unknown users get -1 for the user-history features.
        df = df.with_columns([
            pl.when(pl.col("known_user") == -1)
                .then(pl.lit(-1))
                .otherwise(pl.col("user_previous_cart_additions"))
                .alias("user_previous_cart_additions").cast(pl.Int16),
            pl.when(pl.col("known_user") == -1)
                .then(pl.lit(-1))
                .otherwise(pl.col("user_previous_interactions"))
                .alias("user_previous_interactions").cast(pl.Int16),
        ])

        return df

    def merge_datasets(self, df, products_df, users_df):
        """Drop ``timestamp_local`` and left-join product and user features."""
        df = (df
              .drop('timestamp_local')
              .join(products_df.drop('embedding'), on='partnumber', how="left"))
        df = df.join(users_df, on='user_id', how="left")
        return df

    def run(self, process_train=True):
        """
        Processes train and test data, with options to skip training data processing
        and dynamically update candidates for the test dataset.

        Args:
            process_train (bool): Whether to process the training data. When
                False, the final train table previously written to
                ``save_train_path`` is loaded and returned as-is.

        Returns:
            final_train (pl.DataFrame): Final training data.
            final_test (pl.DataFrame): Processed and candidate-augmented test data.
        """
        print("Loading train and test data...")
        train, test = self.load_data()
        print("Imputing missing values...")
        train = self.impute_train_test(train)
        test = self.impute_train_test(test)
        print("Creating concatenated features...")
        # Test-side ratio features need train, so this runs even when train
        # processing is skipped.
        train, test = self.create_concatenated_features(train, test)

        if process_train:
            print("Processing train data...")
            train_eng = self.feature_engineering_no_candidate_dependent(train)
            train_eng = self.feature_engineering_candidate_dependent(train_eng)
        else:
            print("Skipping train processing. Loading preprocessed data...")
            # This file is the final train from a previous run: already merged
            # with products/users and without timestamp_local, so it must not
            # go through merge_datasets again.
            train_eng = pl.read_parquet(self.save_train_path)

        print("Processing test data...")
        test_eng = self.feature_engineering_no_candidate_dependent(test)

        print("Selecting candidates...")
        global_candidates = self.select_global_candidates(train_eng)
        exploded_country_products = self.select_country_candidates(train_eng)
        kwown_user_candidates = self.select_kwown_user_candidates(train_eng)
        top10_sim_df = self.select_topsimilar_candidates()

        test_extended = self.add_candidates_to_test(test_df = test_eng,
                                                    global_candidate_products = global_candidates,
                                                    exploded_country_products = exploded_country_products,
                                                    kwown_user_candidates = kwown_user_candidates,
                                                    top10_sim_df = top10_sim_df)
        print("Test set extended.")
        print("Filling candidate nulls and generating last features...")
        test_extended = self.fill_test_nulls_candidates(test_extended)
        test_extended = self.feature_engineering_candidate_dependent(test_extended)

        print("Merging with users and products data...")
        if process_train:
            final_train = self.merge_datasets(train_eng, self.products_df, self.users_df)
        else:
            final_train = train_eng
        final_test = self.merge_datasets(test_extended, self.products_df, self.users_df)

        print("Saving final data to parquet...")
        print("Final size of test data:", len(final_test))
        final_test.write_parquet(self.save_test_path)
        if process_train:
            print("Saving parquet file...")
            final_train.write_parquet(self.save_train_path)

        print("Data processing completed.")

        return final_train, final_test

In [7]:
# Configurable options
PROCESS_PRODS = True   # False: reuse PRODUCTS_ENGINEERED_PATH from a previous run
PROCESS_TRAIN = True   # False: reuse TRAIN_ENGINEERED_PATH from a previous run
SAMPLE_TRAIN = False ################## SAMPLING
PROCESS_USERS = True   # False: reuse USERS_ENGINEERED_PATH from a previous run

TOP_N_GLOBAL = None  # Number of global most popular candidates (None/0 disables)
TOP_N_COUNTRIES = 5
TOP_N_KNOWN_USERS = 5
REDUCE_SIMILAR_N = True
TOP_N_SIMILAR = 5

def main():
    """Run the product, user and train/test feature pipelines.

    Products and users are recomputed when PROCESS_PRODS / PROCESS_USERS are
    True; otherwise the engineered parquet files of a previous run are
    loaded, so TRAIN_TEST always receives both tables.

    Returns:
        tuple: Final engineered ``(train, test)`` DataFrames.
    """
    if PROCESS_PRODS:
        print("Processing product data...")
        products_df = PRODUCTS(PRODUCTS_PARQUET_PATH_IMPUTED,
                               PRODUCTS_ENGINEERED_PATH).run()
    else:
        products_df = pl.read_parquet(PRODUCTS_ENGINEERED_PATH)

    if PROCESS_USERS:
        print("Processing user data...")
        users_df = USERS(USERS_DATA_PATH, USERS_ENGINEERED_PATH).run()
    else:
        users_df = pl.read_parquet(USERS_ENGINEERED_PATH)

    # A global string cache keeps any Categorical encodings consistent across
    # the frames that TRAIN_TEST concatenates and joins.
    with StringCache():
        traintest_processor = TRAIN_TEST(
            sampling=SAMPLE_TRAIN,
            train_path=TRAIN_PARQUET_PATH,
            test_path=TEST_PARQUET_PATH,
            save_train_path=TRAIN_ENGINEERED_PATH,
            save_test_path=TEST_ENGINEERED_PATH,
            topN_global=TOP_N_GLOBAL, topN_countries=TOP_N_COUNTRIES, topN_known_users=TOP_N_KNOWN_USERS,
            users_df=users_df,
            products_df=products_df,
            prod_sim_top_df_path=TOP_SIM_PRODUCTS, filter_similar=REDUCE_SIMILAR_N, filter_similarN=TOP_N_SIMILAR
        )

        train, test = traintest_processor.run(
            process_train=PROCESS_TRAIN,
        )
    return train, test

train, test = main()

Processing product data...
Processing user data...
Loading train and test data...
Imputing missing values...
Creating concatenated features...
Processing train data...
Processing test data...
Selecting candidates...
Adding 0 global propular products to test data...
Adding 5 country popular products to test data...
Adding 5 candidate products for known users to test data...
Adding 5 candidate products to test data...
Test set extended.
Filling candidate nulls and generating last features...
Merging with users and products data...
Saving final data to parquet...
Final size of test data: 187613
Saving parquet file...
Data processing completed.


### Predict and export predictions


In [None]:
import lightgbm as lgb

# Deserialize the trained LightGBM ranker from its saved text model file.
model = lgb.Booster(
    model_file=MODEL_PATH,
)

In [9]:
# Columns fed to the ranker, in order. The 'partnumber' identifier is
# deliberately left out. Grouping below is by column name only; the
# concatenation order is the column order.
# NOTE(review): the order presumably must match the feature order the model
# was trained with — keep it stable.
feature_cols = (
    # interaction / session-context columns
    [
        'pagetype', 'known_user', 'device_type', 'country',
        'page_cart_ratio', 'device_cart_ratio', 'country_cart_ratio',
        'user_previous_cart_additions', 'user_previous_interactions',
        'seconds_since_last_interaction', 'total_session_time',
        'interaction_length', 'day_number', 'weekday_number', 'hour',
        'day_frame', 'total_session_interactions',
        'product_interaction_count',
    ]
    # product-attribute columns
    + [
        'discount', 'color_id', 'cod_section', 'family',
        'color_family_frequency', 'family_discount_rate',
        'section_discount_rate', 'family_unique_colors',
    ]
    # user / RFM columns
    + [
        'user_country', 'R', 'F', 'M',
        'avg_value_per_purchase', 'purchase_rate', 'spend_rate_per_day',
        'total_value_frequency', 'avg_days_between_purchases',
        'country_avg_monetary', 'country_avg_frequency', 'country_avg_recency',
        'relative_monetary_value', 'relative_frequency', 'relative_recency',
        'is_high_value_incountry', 'is_frequent_buyer_incountry',
        'is_recent_customer_incountry',
        'is_high_value', 'is_frequent_buyer', 'is_recent_customer',
        'r_segment', 'f_segment', 'm_segment',
        'r_rank_in_country', 'f_rank_in_country', 'm_rank_in_country',
    ]
)

In [10]:
def predict(test_df, feature_cols, model, top_n=5):
    """Score candidate rows and keep the ``top_n`` best products per session.

    Parameters
    ----------
    test_df : polars.DataFrame or pandas.DataFrame
        Candidate rows. Must contain ``session_id``, ``partnumber`` and every
        column named in ``feature_cols``. It is not modified.
    feature_cols : list of str
        Columns passed, in this order, to ``model.predict``.
    model : object
        Anything exposing ``predict(features) -> array-like`` of one score per
        row (for example a LightGBM ``Booster``).
    top_n : int, default 5
        Maximum number of products kept per session.

    Returns
    -------
    pandas.DataFrame
        Columns ``session_id``, ``partnumber``, ``predicted_score``. Sessions
        appear in order of first appearance in ``test_df``. Within a session,
        rows are sorted by descending score, with ties broken by original row
        order. Original row index labels are kept. An empty input yields an
        empty frame with those columns.
    """
    out_cols = ['session_id', 'partnumber', 'predicted_score']

    # Polars frames expose ``to_pandas`` (a fresh frame). A pandas frame is
    # copied so the score column added below never leaks into the caller's data.
    if hasattr(test_df, 'to_pandas'):
        scored = test_df.to_pandas()
    else:
        scored = test_df.copy()

    if scored.empty:
        return pd.DataFrame(columns=out_cols)

    scored['predicted_score'] = model.predict(scored[feature_cols])

    # One global sort instead of re-filtering the frame once per session
    # (O(rows * sessions)). factorize numbers sessions by first appearance
    # (-1 for missing ids); lexsort's LAST key is the primary one, so rows are
    # ordered by session, then score descending, then original position.
    session_codes, _ = pd.factorize(scored['session_id'])
    scores = scored['predicted_score'].to_numpy(dtype=float)
    order = np.lexsort((np.arange(len(scored)), -scores, session_codes))
    ranked = scored.iloc[order]

    # Rows without a session id cannot belong to any session, and NaN scores
    # cannot be ranked, so both are excluded.
    keep = (session_codes[order] >= 0) & ~np.isnan(scores[order])
    ranked = ranked[keep]

    # head() keeps the already-sorted row order, so the result stays grouped
    # by session with each session's best products first.
    top = ranked.groupby('session_id', sort=False).head(top_n)
    return top[out_cols]

recommendations = predict(test_df=test, feature_cols=feature_cols, model=model)
print(recommendations.head(5))

    session_id  partnumber  predicted_score
40         746       13295         2.952565
11         746       26738         2.825188
10         746       25688         2.762497
14         746        9145         2.741379
8          746        3061         2.735885


In [11]:
import json
from datetime import datetime

# Spanish month abbreviations. The timestamp keeps the "ene" (enero) style
# but now follows the real current month instead of always saying January.
_MONTH_ABBR_ES = ("ene", "feb", "mar", "abr", "may", "jun",
                  "jul", "ago", "sep", "oct", "nov", "dic")

_run_time = datetime.now()
now = f"{_MONTH_ABBR_ES[_run_time.month - 1]}{_run_time:%d-%H%M}hs"
file_tag = f"{TOP_N_GLOBAL}-{TOP_N_COUNTRIES}-{TOP_N_KNOWN_USERS}_{REDUCE_SIMILAR_N}"

# One list of partnumbers per session. groupby preserves the row order inside
# each group, so each list keeps the ranking order of `recommendations`.
grouped = recommendations.groupby('session_id')['partnumber'].apply(list).to_dict()

# JSON object keys must be strings, so session ids are stringified.
final_json = {
    "target": {
        str(sess_id): parts for sess_id, parts in grouped.items()
    }
}

with open(f'predictions_3_{now}_{file_tag}.json', 'w', encoding='utf-8') as f:
    json.dump(final_json, f, indent=4)