In [1]:
import gc
import cupy as cp
import numpy as np 
from pathlib import Path

import cudf
import nvtabular as nvt
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags

import dask.dataframe as dd
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")


In [2]:
cluster = LocalCUDACluster(
    n_workers=1,                       
    device_memory_limit="18GB",        
    local_directory="/nvme/scratch/",
)
client = Client(cluster)

2024-12-29 19:07:47,365 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2024-12-29 19:07:47,365 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize


In [3]:
SESSIONS_MAX_LENGTH = 20 
MINIMUM_SESSION_LENGTH = 2

In [4]:
item_id = ['aid'] >> nvt.ops.TagAsItemID()
event_type =  ['type'] >> nvt.ops.AddMetadata(tags=[Tags.ITEM])
cat_feats = item_id + ['type'] >> nvt.ops.Categorify()

In [5]:
session_ts = ['ts']

session_time = (
    session_ts >>
    nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='ms')) >>
    nvt.ops.Rename(name='ts_dt')
)

sessiontime_weekday = (
    session_time >>
    nvt.ops.LambdaOp(lambda col: col.dt.weekday) >>
    nvt.ops.Rename(name='ts_dow')
)

In [6]:
def get_cycled_feature_value_sin(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_sin = np.sin(2*np.pi*value_scaled)
    return value_sin

def get_cycled_feature_value_cos(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_cos = np.cos(2*np.pi*value_scaled)
    return value_cos

In [7]:
weekday_sin = (
    sessiontime_weekday >>
    (lambda col: get_cycled_feature_value_sin(col + 1, 7)) >>
    nvt.ops.Rename(name='t_dow_sin') >>
    nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
)
weekday_cos = (
    sessiontime_weekday >>
    (lambda col: get_cycled_feature_value_cos(col + 1, 7)) >>
    nvt.ops.Rename(name='t_dow_cos') >>
    nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
)

In [8]:
class ItemRecency(nvt.ops.Operator):
    def transform(self, columns, gdf):
        for column in columns.names:
            col = gdf[column]
            item_first_timestamp = gdf['prod_first_ts']
            delta_days = (col - item_first_timestamp) / (60*60*24)
            gdf[column + "_age_days"] = delta_days * (delta_days >=0)
        return gdf

    def compute_selector(
        self,
        input_schema: Schema,
        selector: ColumnSelector,
        parents_selector: ColumnSelector,
        dependencies_selector: ColumnSelector,
    ) -> ColumnSelector:
        self._validate_matching_cols(input_schema, parents_selector, "computing input selector")
        return parents_selector

    def column_mapping(self, col_selector):
        column_mapping = {}
        for col_name in col_selector.names:
            column_mapping[col_name + "_age_days"] = [col_name]
        return column_mapping

    @property
    def dependencies(self):
        return ["prod_first_ts"]

    @property
    def output_dtype(self):
        return np.float64

In [9]:
recency_feats = ['ts'] >> ItemRecency()
recency_feats_norm = (
    recency_feats >>
    nvt.ops.LogOp() >>
    nvt.ops.Normalize(out_dtype=np.float32) >>
    nvt.ops.Rename(name='product_recency_day_log_norm')
)

In [10]:
time_features = (
    session_time +
    sessiontime_weekday +
    weekday_sin +
    weekday_cos +
    recency_feats_norm
)

In [11]:
groupby_feats = ['ts', 'session'] + cat_feats + time_features

In [12]:
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=['session'],
    sort_cols=['ts'],
    aggs={
        'aid': ['list', 'count'],
        'type': ['list'],
        'ts': ['first'],
        'ts_dt': ['first'],
        't_dow_sin': ['list'],
        't_dow_cos': ['list'],
        'product_recency_day_log_norm': ['list']
    },
   name_sep="-"
)

In [13]:
groupby_features_list = groupby_features['aid-list', 'type-list', 't_dow_sin-list', 't_dow_cos-list', 'product_recency_day_log_norm-list']

In [14]:
groupby_features_trim = groupby_features_list >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH, pad=True)

In [15]:
day_index = (
    session_time >> 
    nvt.ops.LambdaOp(lambda col: ((col - col.min()).dt.days  + 1)) >> 
    nvt.ops.Rename(f=lambda col: "day_index") >>
    nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])
 )

In [16]:
sess_id = groupby_features['session']

selected_features = sess_id + groupby_features['aid-count'] + groupby_features_trim + day_index

In [17]:
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["aid-count"] >= MINIMUM_SESSION_LENGTH)

In [18]:
workflow = nvt.Workflow(filtered_sessions, client=client)



In [19]:
data_path = Path.cwd().parent / 'preprocess/data'
temp_path = Path.cwd() / 'data'
if not temp_path.is_dir():
    temp_path.mkdir()

## LB

In [20]:
lb_in = data_path / 'lb'
lb_out = temp_path / 'lb'
if not lb_out.is_dir():
    lb_out.mkdir()

In [21]:
train_path = lb_in / 'train_parquet'

data = []
for train_file in sorted(train_path.glob('*.parquet')):
    data.append(dd.read_parquet(train_file.as_posix()))

data = dd.concat(data)

item_first_interaction_df = data.groupby('aid').agg({'ts': 'min'}) \
    .reset_index().rename(columns={'ts': 'prod_first_ts'})
data = data.merge(item_first_interaction_df, on=['aid'], how='left').reset_index(drop=True)

del item_first_interaction_df
gc.collect()

dataset = nvt.Dataset(data, npartitions=4)
dataset.shuffle_by_keys('session')
del data
gc.collect()

412

In [22]:
%%time
workflow.fit_transform(dataset).to_parquet((lb_out / 'processed_nvt').as_posix())

del dataset
gc.collect()

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")


CPU times: user 7.45 s, sys: 2.88 s, total: 10.3 s
Wall time: 6min 14s


2983

In [23]:
workflow.output_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.cat_path,properties.domain.min,properties.domain.max,properties.domain.name,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.value_count.min,properties.value_count.max
0,session,(),"DType(name='int64', element_type=<ElementType....",False,False,,,,,,,,,,,
1,aid-count,"(Tags.ID, Tags.ITEM, Tags.CATEGORICAL)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.aid.parquet,0.0,1855605.0,aid,1855606.0,512.0,,
2,aid-list,"(Tags.ID, Tags.ITEM, Tags.CATEGORICAL, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,0.0,0.0,.//categories/unique.aid.parquet,0.0,1855605.0,aid,1855606.0,512.0,20.0,20.0
3,type-list,"(Tags.CATEGORICAL, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,0.0,0.0,.//categories/unique.type.parquet,0.0,5.0,type,6.0,16.0,20.0,20.0
4,t_dow_sin-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,,,,,,,,,20.0,20.0
5,t_dow_cos-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,,,,,,,,,20.0,20.0
6,product_recency_day_log_norm-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float32', element_type=<ElementTyp...",True,False,,,,,,,,,,20.0,20.0
7,day_index,(Tags.CATEGORICAL),"DType(name='int64', element_type=<ElementType....",False,False,,,,,,,,,,,


In [24]:
workflow_path = lb_out / 'workflow_etl'
workflow.save(workflow_path.as_posix())

del workflow
gc.collect()

70

In [25]:
client.shutdown()
client.close()