#### Lib Imports

In [2]:
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
import numpy as np
from typing import List
from datetime import datetime
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.preprocessing import StandardScaler

#### Data Import

In [None]:
# Load the training data from the local parquet file and preview the first rows.
treino = pd.read_parquet('local/treino.parquet')
treino.head()

#### Pipeline Functions

In [4]:
def split_multivalued_df(df: pd.DataFrame, split_columns: list) -> pd.DataFrame:
    """Expand comma-separated columns into one row per value.

    Every column in ``split_columns`` is split on ``','`` and the resulting
    lists are exploded together, so the i-th value of each listed column lands
    on the same output row. Columns that are not listed are repeated on every
    generated row.

    Args:
        df: Input frame. It is left untouched.
        split_columns: Names of the string columns holding comma-separated
            values. Within a row, all of them must split into the same number
            of values (requirement of multi-column ``DataFrame.explode``).

    Returns:
        A new DataFrame with a fresh ``0..n-1`` index.
    """
    # Build the list-valued columns on a new frame (``assign`` returns a copy)
    # so the caller's DataFrame keeps its original strings and the step can be
    # re-run on the same input.
    split_values = {column: df[column].str.split(',') for column in split_columns}
    return df.assign(**split_values).explode(split_columns, ignore_index=True)


def drop_columns(df: pd.DataFrame, columns_to_drop: list) -> pd.DataFrame:
    """Return a new frame without the columns named in ``columns_to_drop``.

    Args:
        df: Input frame; it is not modified.
        columns_to_drop: Column labels to remove. A label that is not present
            in ``df`` raises ``KeyError`` (pandas default).

    Returns:
        A DataFrame holding the remaining columns.
    """
    return df.drop(columns=columns_to_drop)


def set_time_base_features(df: pd.DataFrame, decay_rate: float = 0.0001) -> pd.DataFrame:
    """Derive time-based features from the click timestamp and time on page.

    Reads ``timestampHistory`` (epoch milliseconds) and ``timeOnPageHistory``
    (milliseconds) and adds:

    * ``timestamp``: the click time as a datetime.
    * ``days_since_click``: whole days between the click and the most recent
      click in ``df``.
    * ``day_of_week`` / ``hour_of_day``: calendar parts of ``timestamp``.
    * ``time_normalized``: ``days_since_click`` scaled to ``[0, 1]``.
    * ``time_decay_weight``: ``exp(-decay_rate * time_normalized)``.
    * ``time_on_page_minutes``: ``timeOnPageHistory`` converted to minutes.

    ``timeOnPageHistory`` is also converted to a numeric column.

    Args:
        df: Input frame, one row per click. It is not modified.
        decay_rate: Exponent multiplier of the recency weight. With the
            default, the weight stays within ``[exp(-0.0001), 1]``.

    Returns:
        A new DataFrame with the columns listed above.
    """
    result = df.copy()

    # Cast explicitly before using ``unit='ms'``: the values arrive as strings
    # after the comma split, and pandas deprecates string input with ``unit``.
    epoch_ms = pd.to_numeric(result['timestampHistory'])
    result['timestamp'] = pd.to_datetime(epoch_ms, unit='ms')
    result['timeOnPageHistory'] = pd.to_numeric(result['timeOnPageHistory'])

    latest_click = result['timestamp'].max()
    result['days_since_click'] = (latest_click - result['timestamp']).dt.days
    result['day_of_week'] = result['timestamp'].dt.day_name()
    result['hour_of_day'] = result['timestamp'].dt.hour

    # When every click falls on the most recent day the maximum age is 0 and a
    # plain division would yield NaN (0 / 0); treat all rows as "most recent".
    oldest_age_days = result['days_since_click'].max()
    if oldest_age_days > 0:
        result['time_normalized'] = result['days_since_click'] / oldest_age_days
    else:
        result['time_normalized'] = 0.0

    result['time_decay_weight'] = np.exp(-decay_rate * result['time_normalized'])
    result['time_on_page_minutes'] = result['timeOnPageHistory'] / 60000
    return result


def calculate_engagement_score(df:pd.DataFrame) -> pd.DataFrame:
    """Add ``engagement_score`` as a fixed weighted sum of interaction columns.

    The five input columns are first converted to numeric in place on ``df``,
    then combined as::

        0.4 * numberOfClicksHistory
      + 0.2 * scrollPercentageHistory
      + 0.2 * pageVisitsCountHistory
      + 0.1 * time_on_page_minutes
      + 0.1 * time_decay_weight

    Args:
        df: Frame containing the five columns above; it is modified in place.

    Returns:
        The same ``df`` object with the ``engagement_score`` column added.
    """
    # Insertion order matters: it is both the conversion and the summation order.
    weights = {
        'numberOfClicksHistory': 0.4,
        'scrollPercentageHistory': 0.2,
        'pageVisitsCountHistory': 0.2,
        'time_on_page_minutes': 0.1,
        'time_decay_weight': 0.1,
    }

    for feature in weights:
        df[feature] = pd.to_numeric(df[feature])

    weighted_terms = [df[feature] * weight for feature, weight in weights.items()]
    score = weighted_terms[0]
    for term in weighted_terms[1:]:
        score = score + term

    df['engagement_score'] = score
    return df


def get_engagement_score_with_PCA(df:pd.DataFrame, interaction_features: list) -> pd.DataFrame:
    """Add ``engagement_score_pca``: the first principal component of the
    standardized interaction features.

    ``days_since_click`` and ``time_decay_weight`` are sign-flipped before
    scaling, but only inside the matrix handed to the scaler/PCA. Previously
    the flip was written back into ``df``, so any later step reading those
    columns (e.g. the weighted-formula score) received negated values.

    Args:
        df: Input frame containing every column in ``interaction_features``.
            It is not modified.
        interaction_features: Columns fed to ``StandardScaler`` + ``PCA``.

    Returns:
        A new DataFrame equal to ``df`` plus the ``engagement_score_pca`` column.

    NOTE(review): the orientation (sign) of a principal component is not tied
    to "more engagement" by construction — confirm how consumers interpret
    this score.
    """
    # Work on a numeric copy of just the feature columns.
    features = df[interaction_features].apply(pd.to_numeric)
    for column in ('days_since_click', 'time_decay_weight'):
        if column in features.columns:
            features[column] = -features[column]

    scaled_features = StandardScaler().fit_transform(features)
    first_component = PCA(n_components=1).fit_transform(scaled_features)

    # fit_transform returns shape (n_rows, 1); take the single column.
    return df.assign(engagement_score_pca=first_component[:, 0])


def initial_adjusts(df: pd.DataFrame) -> pd.DataFrame:
    """Remove every whitespace character from the id/history string columns.

    Cleans ``userId``, ``history`` and ``timestampHistory`` with the regex
    ``\\s+`` so that the later comma split and joins work on exact values.

    Args:
        df: Raw input frame. It is not modified, so the same source frame can
            be passed through the pipeline more than once.

    Returns:
        A new DataFrame with the three columns cleaned and every other column
        unchanged.
    """
    cleaned = {
        column: df[column].str.replace(r"\s+", "", regex=True)
        for column in ('userId', 'history', 'timestampHistory')
    }
    # ``assign`` returns a new frame instead of writing into the caller's one.
    return df.assign(**cleaned)

# Suggestion: write the adjusted values back to the original column name ('history')
# instead of creating a second, similarly named column, because the same pipeline
# could then also produce the users.parquet file.
# The same observation applies to the 'timestampHistory' column.
def initial_validation_adjusts(df:pd.DataFrame)->pd.DataFrame:
    """Normalize the validation ``history`` / ``timestampHistory`` strings.

    For each of the two columns the source column is stripped in place, and a
    new ``*_adjusted`` column is added in which the value separator is replaced
    by a comma and all remaining whitespace is removed.

    Args:
        df: Validation frame; it is modified in place.

    Returns:
        The same ``df`` object with ``history_adjusted`` and
        ``timestampHistory_adjusted`` added.
    """
    # (source column, new column, separator that becomes a comma)
    column_specs = (
        ('history', 'history_adjusted', ' \n '),
        ('timestampHistory', 'timestampHistory_adjusted', ' '),
    )
    for source, target, separator in column_specs:
        trimmed = df[source].str.strip()
        df[source] = trimmed
        comma_separated = trimmed.str.replace(separator, ',', regex=False)
        df[target] = comma_separated.str.replace(r"\s+", "", regex=True)
    return df


def convert_to_timestamp(df: pd.DataFrame, field: str) -> pd.DataFrame:
    """Convert an epoch-milliseconds column to datetimes.

    Args:
        df: Input frame. It is not modified.
        field: Name of the column holding epoch milliseconds, either as
            numbers or as numeric strings. A column that already has a
            datetime dtype is returned unchanged.

    Returns:
        A new DataFrame in which ``field`` has a datetime dtype.
    """
    values = df[field]
    if not pd.api.types.is_datetime64_any_dtype(values):
        # Cast first: combining string input with ``unit`` is deprecated in
        # pandas, and the values are strings after the comma split.
        values = pd.to_datetime(pd.to_numeric(values), unit='ms')
    return df.assign(**{field: values})

#### Pipeline

In [5]:
## User Pipeline Variables

# Comma-separated columns that are split and exploded together (one row per click).
split_columns = [
    'history',
    'timestampHistory',
    'numberOfClicksHistory',
    'timeOnPageHistory',
    'scrollPercentageHistory',
    'pageVisitsCountHistory',
    'timestampHistory_new',
]

# Raw and intermediate columns removed at the end of the users pipeline.
# Each label appears once ('timeOnPageHistory' used to be listed twice).
columns_to_drop = [
    'userType',
    'historySize',
    'timestampHistory_new',
    'timestampHistory',
    'timeOnPageHistory',
    'numberOfClicksHistory',
    'scrollPercentageHistory',
    'pageVisitsCountHistory',
    'timestamp',
    'days_since_click',
    'day_of_week',
    'hour_of_day',
    'time_normalized',
    'time_decay_weight',
    'time_on_page_minutes',
]

# Columns fed to the scaler + PCA engagement score.
# NOTE(review): 'time_on_page_minutes' is 'timeOnPageHistory' / 60000, so the
# two are perfectly correlated and time on page is effectively counted twice
# after standardization — confirm this is intended.
interaction_features = [
    'numberOfClicksHistory',
    'timeOnPageHistory',
    'scrollPercentageHistory',
    'pageVisitsCountHistory',
    'time_on_page_minutes',
    'time_decay_weight',
    'days_since_click',
]

## Validation Pipeline Variables

# Adjusted columns that are split and exploded in the validation pipeline.
validation_split_columns = [
    'history_adjusted',
    'timestampHistory_adjusted',
]

# Original columns dropped once their adjusted versions exist.
validation_drop_columns = [
    'timestampHistory',
    'history',
]

# Column converted from epoch milliseconds to datetime in the validation pipeline.
validacao_time_field = 'timestampHistory_adjusted'

In [6]:
# Users (training) pipeline: clean ids, explode histories to one row per click,
# derive time features, compute both engagement scores, drop helper columns.
users_pipeline = Pipeline([
    ('initial_adjustments_in_train', FunctionTransformer(func=initial_adjusts)),
    (
        'split_multivalued_df',
        FunctionTransformer(
            func=split_multivalued_df,
            kw_args=dict(split_columns=split_columns),
        ),
    ),
    ('create_time_features', FunctionTransformer(func=set_time_base_features)),
    (
        'set_engagement_score_with_pca',
        FunctionTransformer(
            func=get_engagement_score_with_PCA,
            kw_args=dict(interaction_features=interaction_features),
        ),
    ),
    ('set_engagement_score_with_formula', FunctionTransformer(func=calculate_engagement_score)),
    (
        'drop_columns',
        FunctionTransformer(
            func=drop_columns,
            kw_args=dict(columns_to_drop=columns_to_drop),
        ),
    ),
])

# Validation pipeline: normalize the history strings, explode them, convert the
# timestamp column to datetime and drop the original (unadjusted) columns.
validation_pipeline = Pipeline([
    ('initial_adjustments_in_validation', FunctionTransformer(func=initial_validation_adjusts)),
    (
        'split_multivalued_columns',
        FunctionTransformer(
            func=split_multivalued_df,
            kw_args=dict(split_columns=validation_split_columns),
        ),
    ),
    (
        'convert_timestamp_field',
        FunctionTransformer(
            func=convert_to_timestamp,
            kw_args=dict(field=validacao_time_field),
        ),
    ),
    (
        'drop_columns',
        FunctionTransformer(
            func=drop_columns,
            kw_args=dict(columns_to_drop=validation_drop_columns),
        ),
    ),
])

#### Pipeline Run

##### Test Run

In [None]:
# Smoke test: run the users pipeline on the first rows of the training data only.
df_test_pipeline = treino.head()
users_df = users_pipeline.transform(df_test_pipeline)
users_df.head()

##### PROD Run

In [None]:
# Full run: apply the users pipeline to the whole training frame
# (this rebinds users_df, replacing the result of the test run).
users_df = users_pipeline.transform(treino)
users_df.head()

In [8]:
# Persist the pipeline output as parquet, without writing the index.
out_filepath = 'local/user_colab_filter.parquet'
users_df.to_parquet(out_filepath, index=False)

##### Users Validation

In [None]:
# Load the validation data from the local parquet file and preview it.
validacao = pd.read_parquet('local/validacao.parquet')
validacao.head()

In [None]:
# Run the validation pipeline on the validation frame and preview the result.
validacao_df = validation_pipeline.transform(validacao)
validacao_df.head()

In [None]:
# Inspect dtypes and non-null counts of the transformed validation frame.
validacao_df.info()

In [None]:
# Load the items table from the local parquet file and preview it.
itens = pd.read_parquet('local/itens_text_db_scan.parquet')
itens.head()

In [None]:
# Remove all whitespace from the item page ids. Strictly necessary before the
# merge below, which joins on 'page' against whitespace-free history ids.
itens['page'] = itens['page'].str.replace(r"\s+", "", regex=True)

In [None]:
# Left-join the validation rows to the items on the page id; validation rows
# without a matching item are kept with missing item columns.
merged_df = pd.merge(validacao_df, itens, how='left', left_on='history_adjusted', right_on='page')
merged_df.head()

In [None]:
# Inspect dtypes and non-null counts after the merge.
merged_df.info()

In [None]:
# NOTE(review): hard-coded figures, presumably copied from the merged_df.info()
# output above (total rows minus rows with a matched item) — confirm, and
# consider computing the difference from merged_df instead.
print(178868 - 112184)

In [None]:
# Preview the last rows of the merged validation frame.
merged_df.tail()

In [None]:
# Print the userId stored at index label 178863 (hard-coded).
# NOTE(review): presumably one of the rows shown by tail() above — confirm.
print(merged_df['userId'][178863])

In [None]:
# Trim leading/trailing whitespace from the history ids before joining on them.
users_df['history'] = users_df['history'].str.strip()
users_df.head()

In [None]:
# Left-join the training rows to the items on the page id.
merged_train_df = pd.merge(users_df, itens, how='left', left_on='history', right_on='page')
merged_train_df.head()

In [None]:
# Mean formula-based engagement score per (userId, classes) pair.
# NOTE(review): groupby drops rows whose key is missing by default, so rows
# without a matched item ('classes' is NaN) do not appear here — confirm
# that this is intended.
grouped_train = merged_train_df.groupby(['userId', 'classes'])['engagement_score'].mean().reset_index()
grouped_train.head()

In [None]:
# Spot check: per-class mean engagement of a single (hard-coded) training user.
filtered_df = grouped_train[grouped_train['userId'] == 'aacb28d7d2a4ea745e12ceba1f9ffa0c7b92aae9304ce51f7f66044a927bdbaa']
filtered_df.head()


In [None]:
# Spot check: pages and classes of the same (hard-coded) user in the validation
# data, selected with a single .loc call (row mask + column list).
filtered_validation_df = merged_df.loc[
    merged_df['userId'] == 'aacb28d7d2a4ea745e12ceba1f9ffa0c7b92aae9304ce51f7f66044a927bdbaa',
    ['userId', 'page', 'classes'],
]
filtered_validation_df