In [4]:
SESSIONS_MAX_LENGTH = 5
MINIMUM_SESSION_LENGTH = 2


In [19]:
import os
import glob
import numpy as np
import gc
import pandas as pd
import cudf
import cupy
import nvtabular as nvt
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags
from datetime import datetime
from merlin.core.dispatch import convert_data
import pyarrow
from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0


#### Define Data Input and Output Paths

In [6]:
DATA_FOLDER = "/cta/users/eboran/Tez/Workspace/end-to-end-session-based-movielens-pipeline/movielens20m"
FILENAME_PATTERN = 'manipulated_rating.csv'
DATA_PATH = os.path.join(DATA_FOLDER, FILENAME_PATTERN)

OUTPUT_FOLDER = "./movielens_transformed"
OVERWRITE = False

## Load and clean raw data

In [7]:
interactions_df = cudf.read_csv(DATA_PATH, sep=',', 
                           #     names=['userId','timestamp', 'movieId', 'rating'], 
                           #     dtype=['int', 'date', 'int', 'float64']
                               )

interactions_df["timestamp"] = cudf.to_datetime(interactions_df["timestamp"])

interactions_df = interactions_df.sort_values(['userId', 'timestamp']).reset_index(drop=True)

interactions_df = interactions_df.rename(columns ={"movieId":"item_id", "userId":"session_id"})

interactions_df.timestamp = interactions_df.timestamp.astype(int)

items_first_ts_df = interactions_df.groupby('item_id').agg({'timestamp': 'min'}).reset_index().rename(columns={'timestamp': 'itemid_ts_first'})
interactions_merged_df = interactions_df.merge(items_first_ts_df, on=['item_id'], how='left')
interactions_merged_df.to_parquet(os.path.join(DATA_FOLDER, 'interactions_merged_df.parquet'))
# free gpu memory
del interactions_df, items_first_ts_df
gc.collect()

0

In [8]:


main_cat_feats = ColumnSelector(['item_id', 'session_id','timestamp',]
                          ) >> nvt.ops.Categorify(start_index=1)


cat_feats = ColumnSelector([
                            '_genres', 'tag','genome_tag']
                          ) >> nvt.ops.Categorify(start_index=1)

con_feats = ColumnSelector(['rating','genome_relevance'])


session_ts = ColumnSelector(['timestamp'])
session_time = (
    session_ts >> 
    nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='ns')) >> 
    nvt.ops.Rename(name = 'event_time_dt')
)

sessiontime_day = (
    session_time >> 
    nvt.ops.LambdaOp(lambda col: col.dt.day) >> 
    nvt.ops.Rename(name ='et_dayofday')
)

sessiontime_month = (
    session_time >> 
    nvt.ops.LambdaOp(lambda col: col.dt.month) >> 
    nvt.ops.Rename(name ='event_time_M')
)



sessiontime_weekday = (
    session_time >> 
    nvt.ops.LambdaOp(lambda col: col.dt.weekday) >> 
    nvt.ops.Rename(name ='et_dayofweek')
)

sessiontime_year = (
    session_time >> 
    nvt.ops.LambdaOp(lambda col: col.dt.year) >> 
    nvt.ops.Rename(name ='et_year')
)

def get_cycled_feature_value_sin(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_sin = np.sin(2*np.pi*value_scaled)
    return value_sin

weekday_sin = sessiontime_weekday >> (lambda col: get_cycled_feature_value_sin(col+1, 7)) >> nvt.ops.Rename(name = 'et_dayofweek_sin')
dayofday_sin = sessiontime_day >> (lambda col: get_cycled_feature_value_sin(col+1, 30)) >> nvt.ops.Rename(name = 'et_dayofday_sin')


class ItemRecency(nvt.ops.Operator):
    def transform(self, columns, gdf):
        for column in columns.names:
            col = gdf[column]
            item_first_timestamp = gdf['itemid_ts_first']
            delta_days = (col - item_first_timestamp) / (60*60*24)
            gdf[column + "_age_days"] = delta_days * (delta_days >=0)
        return gdf

    def compute_selector(
        self,
        input_schema: Schema,
        selector: ColumnSelector,
        parents_selector: ColumnSelector,
        dependencies_selector: ColumnSelector,
    ) -> ColumnSelector:
        self._validate_matching_cols(input_schema, parents_selector, "computing input selector")
        return parents_selector

    def column_mapping(self, col_selector):
        column_mapping = {}
        for col_name in col_selector.names:
            column_mapping[col_name + "_age_days"] = [col_name]
        return column_mapping

    @property
    def dependencies(self):
        return ["itemid_ts_first"]

    @property
    def output_dtype(self):
        return np.float64
    
recency_features = session_ts >> ItemRecency() 

recency_features_norm = recency_features >> nvt.ops.LogOp() >> nvt.ops.Normalize() >> nvt.ops.Rename(name='product_recency_days_log_norm')

time_features = (
    session_time +
    sessiontime_day + 
    sessiontime_month + 
    sessiontime_weekday +
    sessiontime_year +    
    weekday_sin + 
    dayofday_sin + 
    recency_features_norm 
)



In [9]:

features = main_cat_feats + cat_feats + con_feats + time_features


In [10]:

groupby_features = features >> nvt.ops.Groupby(
    groupby_cols=["session_id"], 
    aggs={
        'item_id': ["list", "count"],
        '_genres': ["list"], 
        'tag': ["list"],
        'genome_tag': ["list"],        
        'genome_relevance' : ['list'],
        
        'rating': ["list"],
        'timestamp': ["first"],
        'event_time_dt': ["first"],
        'event_time_M':['first','list'],
        'et_dayofweek_sin': ["list"],
        'et_dayofday_sin': ["list"],
        
        'et_dayofweek':['list','first'],
        'et_dayofday': ['list'],
        'product_recency_days_log_norm': ["list"],
        'et_year': ["list",'first','last'],
    },
    name_sep="-") >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])



In [11]:

groupby_features_list = groupby_features['item_id-list', 
                                         
                                         '_genres-list',
                                         'tag-list',
                                         'genome_tag-list',
                                         'genome_relevance-list',
                                         
                                         'rating-list', 

                                         'et_dayofweek_sin-list',                        
                                         'et_dayofday_sin-list', 
                                         'product_recency_days_log_norm-list',
                                         'et_year-list',
                                         'et_dayofweek-list',
                                         'et_dayofday-list',
                                         'event_time_M-list',
                                                                                                                                                                                                                                                                                                                         ]
groupby_features_list

<Node [('item_id-list', '_genres-list', 'tag-list', 'genome_tag-list', 'genome_relevance-list', 'rating-list', 'et_dayofweek_sin-list', 'et_dayofday_sin-list', 'product_recency_days_log_norm-list', 'et_year-list', 'et_dayofweek-list', 'et_dayofday-list', 'event_time_M-list')] output>

In [12]:

day_index_month = ((groupby_features['event_time_M-first'])  >> 
    nvt.ops.LambdaOp(
                     lambda col: col - col.min()
                    ) >> 
    nvt.ops.Rename(f = lambda col: "day_index_M")
)



In [13]:

day_index_year = ((groupby_features['et_year-first'])  >> 
    nvt.ops.LambdaOp(
                     lambda col: col - col.min()
                    ) >> 
    nvt.ops.Rename(f = lambda col: "day_index_year")
)



In [14]:
def list_files(startpath):
    """
    Util function to print the nested structure of a directory
    """
    for root, dirs, files in os.walk(startpath):
        level = root.replace(startpath, "").count(os.sep)
        indent = " " * 4 * (level)
        print("{}{}/".format(indent, os.path.basename(root)))
        subindent = " " * 4 * (level + 1)
        for f in files:
            print("{}{}".format(subindent, f))

**run all**

In [15]:

for SESSIONS_MAX_LENGTH in [SESSIONS_MAX_LENGTH]:

    print("SESSIONS_MAX_LENGTH:", SESSIONS_MAX_LENGTH)
    
    groupby_features_truncated = groupby_features_list >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH, pad=True) >> nvt.ops.Rename(postfix = '_seq')
    selected_features = groupby_features['session_id', 'item_id-count']   + day_index_year + day_index_month + groupby_features_truncated
    filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH) 
    
    convert_data(interactions_merged_df)

    dataset = nvt.Dataset(interactions_merged_df)

    workflow = nvt.Workflow(filtered_sessions)



    workflow.fit(dataset)
    
    sessions_gdf = workflow.transform(dataset).compute()
    workflow.save('workflow_etl')

    sessions_gdf["day_index"] = sessions_gdf["day_index_year"] * 12 + sessions_gdf["day_index_M"]  + 1


    output_dir = f"./preproc_sessions_by_day{SESSIONS_MAX_LENGTH}"


    from transformers4rec.data.preprocessing import save_time_based_splits
    
    save_time_based_splits(data=nvt.Dataset(sessions_gdf),
                           output_dir= output_dir,
                           partition_col='day_index',
                           timestamp_col='session_id', 
                          )


    dir_list = os.listdir(output_dir)

    dir_list_int = list()

    for _dir in dir_list:
        dir_list_int.append(int(_dir))

    dir_list_int = sorted(dir_list_int)

    for i, _dir in enumerate(dir_list_int):
        os.rename(f"{output_dir}/{_dir}",f"{output_dir}/{i+1}")


SESSIONS_MAX_LENGTH: 5
item_id-list
_genres-list
tag-list
genome_tag-list
genome_relevance-list
rating-list
et_dayofweek_sin-list
et_dayofday_sin-list
product_recency_days_log_norm-list
et_year-list
et_dayofweek-list
et_dayofday-list
event_time_M-list
item_id-list
_genres-list
tag-list
genome_tag-list
genome_relevance-list
rating-list
et_dayofweek_sin-list
et_dayofday_sin-list
product_recency_days_log_norm-list
et_year-list
et_dayofweek-list
et_dayofday-list
event_time_M-list
t4r core called from local package


Creating time-based splits: 100%|██████████| 231/231 [00:29<00:00,  7.73it/s]


Let's print the head of our preprocessed dataset. You can notice that now each example (row) is a session and the sequential features with respect to user interactions were converted to lists with matching length.

In [16]:
sessions_gdf

Unnamed: 0,session_id,item_id-count,day_index_year,day_index_M,item_id-list_seq,_genres-list_seq,tag-list_seq,genome_tag-list_seq,genome_relevance-list_seq,rating-list_seq,et_dayofweek_sin-list_seq,et_dayofday_sin-list_seq,product_recency_days_log_norm-list_seq,et_year-list_seq,et_dayofweek-list_seq,et_dayofday-list_seq,event_time_M-list_seq,day_index
0,2,9254,6,7,"[9084, 18166, 23971, 21863, 20211]","[3, 18, 2, 5, 4]","[2, 2, 2, 2, 2]","[300, 13, 13, 13, 13]","[0.99275, 0.0, 0.0, 0.0, 0.0]","[4.0, 3.5, 2.5, 3.0, 3.0]","[0.7818321, 0.7818321, 0.7818321, 0.7818321, -...","[0.9510564, 0.9510564, 0.9510564, 0.9510564, -...","[-0.9110680818557739, -15.817054748535156, -15...","[2013, 2013, 2013, 2013, 2013]","[0, 0, 0, 0, 5]","[8, 8, 8, 8, 15]","[7, 7, 7, 7, 6]",80
1,3,7515,11,9,"[217, 256, 210, 1881, 1959]","[3, 2, 2, 4, 12]","[2, 2, 2, 2, 2]","[46, 10, 31, 417, 115]","[0.99925, 0.9217500000000001, 0.998, 0.97975, ...","[3.0, 1.5, 3.5, 2.5, 1.5]","[0.7818321, 0.7818321, 0.7818321, 1.1285199e-0...","[0.20791137, 0.20791137, 0.20791137, 0.4067363...","[0.8084907531738281, 0.8047841787338257, 0.808...","[2006, 2006, 2006, 2006, 2006]","[0, 0, 0, 6, 6]","[13, 13, 13, 12, 12]","[11, 11, 11, 11, 11]",142
2,4,5646,6,8,"[1577, 3801, 8171, 8691, 5408]","[3, 10, 4, 3, 3]","[2, 2, 2, 2, 2]","[398, 260, 658, 353, 526]","[0.9844999999999999, 0.9510000000000001, 0.974...","[4.5, 4.5, 4.0, 4.0, 4.0]","[-0.9749281, -0.9749281, 0.7818321, 0.9749277,...","[-5.6425995e-07, -5.6425995e-07, -0.5877857, -...","[-1.5035436153411865, -2.422315835952759, 0.28...","[2005, 2005, 2005, 2005, 2005]","[4, 4, 0, 1, 5]","[14, 14, 17, 18, 15]","[10, 10, 10, 10, 10]",81
3,5,5520,7,6,"[7019, 10295, 17328, 12110, 8844]","[5, 4, 12, 4, 4]","[2, 2, 2, 2, 2]","[200, 179, 13, 13, 23]","[1.0, 0.88425, 0.0, 0.0, 0.99925]","[2.5, 3.0, 2.0, 2.0, 3.0]","[0.43388295, 0.43388295, -0.781831, -0.781831,...","[-0.8660257, -0.8660257, -0.99452186, -0.99452...","[-1.2354615926742554, -0.6409911513328552, -15...","[2012, 2012, 2012, 2012, 2012]","[2, 2, 5, 5, 3]","[19, 19, 22, 22, 30]","[9, 9, 9, 9, 8]",91
4,6,5491,5,7,"[4859, 11802, 7194, 8972, 5477]","[2, 3, 2, 10, 4]","[2, 2, 2, 2, 2]","[172, 13, 199, 13, 431]","[0.9804999999999999, 0.0, 0.9697500000000001, ...","[2.5, 2.5, 3.5, 3.0, 2.5]","[1.1285199e-06, 0.43388295, 0.43388295, 1.1285...","[0.587785, -5.6425995e-07, -5.6425995e-07, 0.5...","[-1.156063199043274, -3.8918070793151855, -3.0...","[2009, 2009, 2009, 2009, 2009]","[6, 2, 2, 6, 6]","[11, 14, 14, 11, 11]","[1, 1, 1, 1, 1]",68
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
138488,138490,20,2,3,"[17, 1964, 1798, 468, 1049]","[2, 5, 5, 5, 3]","[2, 2, 2, 2, 2]","[11, 227, 128, 88, 179]","[0.9944999999999999, 0.9035, 0.983, 0.97875, 0...","[5.0, 4.0, 5.0, 5.0, 4.0]","[-0.781831, -0.781831, -0.781831, -0.781831, -...","[0.40673634, 0.40673634, 0.40673634, 0.4067363...","[-0.9887669682502747, -0.5876731276512146, -0....","[1997, 1997, 1997, 1997, 1997]","[5, 5, 5, 5, 5]","[12, 12, 12, 12, 12]","[4, 4, 4, 4, 4]",28
138489,138491,20,1,8,"[5, 94, 86, 39, 111]","[6, 2, 5, 8, 4]","[2, 2, 2, 2, 2]","[15, 10, 55, 19, 74]","[0.99375, 0.99425, 0.98425, 0.988, 0.996499999...","[3.0, 1.0, 3.0, 3.0, 3.0]","[-0.9749281, -0.9749281, -0.9749281, -0.974928...","[0.20791137, 0.20791137, 0.20791137, 0.2079113...","[-0.9056805372238159, -0.9056783318519592, -0....","[1996, 1996, 1996, 1996, 1996]","[4, 4, 4, 4, 4]","[13, 13, 13, 13, 13]","[9, 9, 9, 9, 9]",21
138490,138492,20,13,11,"[347, 317, 357, 387, 345]","[3, 5, 2, 3, 2]","[2, 2, 2, 2, 2]","[2, 47, 4, 352, 37]","[0.9657500000000001, 0.99025, 0.99749999999999...","[3.5, 5.0, 4.0, 4.5, 3.0]","[0.9749277, 0.9749277, 0.9749277, 0.9749277, 0...","[0.5877854, 0.5877854, 0.5877854, 0.5877854, 0...","[0.8994638919830322, 0.8749988675117493, 0.909...","[2008, 2008, 2008, 2008, 2008]","[1, 1, 1, 1, 1]","[2, 2, 2, 2, 2]","[12, 12, 12, 12, 12]",168
138491,138493,20,1,11,"[73, 202, 7620, 2243, 12]","[9, 2, 2, 3, 5]","[2, 2, 2, 2, 2]","[156, 57, 754, 539, 63]","[0.98925, 0.981, 0.81125, 0.97325, 0.99925]","[4.0, 5.0, 5.0, 1.0, 3.0]","[0.7818321, 0.7818321, 0.7818321, 0.7818321, 0...","[-0.40673694, -0.40673694, -0.40673694, -0.406...","[-1.4216985702514648, -2.588221788406372, -2.5...","[1996, 1996, 1996, 1996, 1996]","[0, 0, 0, 0, 0]","[16, 16, 16, 16, 16]","[12, 12, 12, 12, 12]",24


**create the schema**

In [21]:
def get_min_max_of_the_feture(feature):
    min_ = 9999999999999999
    max_ = -9999999999999999
    
    for values in sessions_gdf[feature].to_arrow():
        if isinstance(values, pyarrow.lib.ListScalar):
            for element in values:
                e = element.as_py()
                if e > max_:
                    max_ = e
                if e <= min_:
                    min_ = e
        else:
            e = values.as_py()
            if e > max_:
                max_ = e
            if e <= min_:
                min_ = e
            
    return min_, max_
    
# get_min_max_of_the_feture("et_year-list_seq")

In [24]:
sessions_gdf.info()

<class 'cudf.core.dataframe.DataFrame'>
RangeIndex: 138493 entries, 0 to 138492
Data columns (total 18 columns):
 #   Column                                  Non-Null Count   Dtype
---  ------                                  --------------   -----
 0   session_id                              138493 non-null  int64
 1   item_id-count                           138493 non-null  int32
 2   day_index_year                          138493 non-null  int32
 3   day_index_M                             138493 non-null  int16
 4   item_id-list_seq                        138493 non-null  list
 5   _genres-list_seq                        138493 non-null  list
 6   tag-list_seq                            138493 non-null  list
 7   genome_tag-list_seq                     138493 non-null  list
 8   genome_relevance-list_seq               138493 non-null  list
 9   rating-list_seq                         138493 non-null  list
 10  et_dayofweek_sin-list_seq               138493 non-null  list
 11  et_da

In [22]:
for column in sessions_gdf.columns:
    #if column not in ["session_id","item_id-count" , "day_index",
    #                 "day_index_year","day_index_M","day_index"]:
    min_, max_ = get_min_max_of_the_feture(column)
    print(f"{column}: min: {min_} max: {max_}")

session_id: min: 2 max: 138494
item_id-count: min: 20 max: 9254
day_index_year: min: 0 max: 20
day_index_M: min: 0 max: 11
item_id-list_seq: min: 2 max: 26744
_genres-list_seq: min: 2 max: 20
tag-list_seq: min: 2 max: 17911
genome_tag-list_seq: min: 2 max: 812
genome_relevance-list_seq: min: 0.0 max: 1.0
rating-list_seq: min: 0.5 max: 5.0
et_dayofweek_sin-list_seq: min: -0.974928081035614 max: 0.9749277234077454
et_dayofday_sin-list_seq: min: -0.994521975517273 max: 0.9945219159126282
product_recency_days_log_norm-list_seq: min: -15.817054748535156 max: 1.1735230684280396
et_year-list_seq: min: 1996 max: 2015
et_dayofweek-list_seq: min: 0 max: 6
et_dayofday-list_seq: min: 1 max: 31
event_time_M-list_seq: min: 1 max: 12
day_index: min: 1 max: 243


In [25]:
# feature types
print("categoricals", cat_feats)
print("con_feats", con_feats)
print("time_features", time_features)


categoricals <Node Categorify>
con_feats <merlin.dag.selector.ColumnSelector object at 0x14ae713d4880>
time_features <Node + output>
