In [1]:
import os
import gc
import cupy
import cudf
import nvtabular as nvt
from merlin.dag import ColumnSelector
from merlin.schema import Tags

2023-09-20 18:47:09.608251: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
  warn(f"Triton dtype mappings did not load successfully due to an error: {exc.msg}")


In [2]:
# Set Numba's CUDA_LOW_OCCUPANCY_WARNINGS option to 0 (off), presumably to keep
# low-occupancy performance warnings out of the output of the GPU cells below.
from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0

In [3]:
# Input: the raw click log. The folder can be overridden with the DATA_FOLDER
# environment variable; otherwise ./dataset is used.
FILENAME_PATTERN = 'yoochoose-clicks.dat'
DATA_FOLDER = os.getenv("DATA_FOLDER", "./dataset")
DATA_PATH = os.path.join(DATA_FOLDER, FILENAME_PATTERN)

# Output: folder that receives the artifacts produced by this notebook.
# OVERWRITE is defined here but not referenced by the cells visible in this notebook.
OVERWRITE = False
OUTPUT_FOLDER = "./yoochoose_transformed"

In [4]:
# Column name -> dtype for the click log, listed in the order handed to read_csv.
click_columns = {
    'session_id': 'int',
    'timestamp': 'datetime64[s]',
    'item_id': 'int',
}

# Load the raw interactions onto the GPU as a cuDF DataFrame.
interactions_df = cudf.read_csv(
    DATA_PATH,
    sep=',',
    names=list(click_columns.keys()),
    dtype=list(click_columns.values()),
)

In [5]:
print("Count with in-session repeated interactions: {}".format(len(interactions_df)))

# Sort by session and timestamp so that repeated clicks on the same item inside
# a session end up on adjacent rows.
interactions_df['timestamp'] = interactions_df['timestamp'].astype(int)
interactions_df = interactions_df.sort_values(['session_id', 'timestamp'])

# shift(1) exposes the previous row's ids and leaves a null in the very first row.
# Fill that null with an explicit sentinel: calling fillna() without a value does
# not replace it, so the first row's comparison would be null instead of False.
# Assumes real ids are never negative (true for the values shown in this notebook).
NO_PREVIOUS_ID = -1
past_ids = interactions_df['item_id'].shift(1).fillna(NO_PREVIOUS_ID)
session_past_ids = interactions_df['session_id'].shift(1).fillna(NO_PREVIOUS_ID)

# Drop a row when it has the same session AND the same item as the row right
# before it, i.e. keep only non-consecutive repetitions within a session.
interactions_df = interactions_df[
    ~(
        (interactions_df['session_id'] == session_past_ids)
        & (interactions_df['item_id'] == past_ids)
    )
]

print("Count after removed in-session repeated interactions: {}".format(len(interactions_df)))

Count with in-session repeated interactions: 33003944
Count after removed in-session repeated interactions: 28971543


In [6]:
# Earliest timestamp at which each item appears in the interactions.
items_first_ts_df = (
    interactions_df
    .groupby('item_id')
    .agg({'timestamp': 'min'})
    .reset_index()
    .rename(columns={'timestamp': 'itemid_ts_first'})
)

# Attach that first-seen timestamp to every interaction of the item.
interactions_merged_df = interactions_df.merge(
    items_first_ts_df,
    how='left',
    on=['item_id'],
)
print(interactions_merged_df.head())

   session_id   timestamp    item_id  itemid_ts_first
0        3456  1396794685  214706437       1396358113
1        3456  1396794755  214684715       1396339706
2        3456  1396794800  214711438       1396340076
3        3456  1396794841  214717507       1396328724
4        3456  1396794884  214695195       1396339742


In [7]:
# Make sure the output folder exists. makedirs(exist_ok=True) is a no-op when the
# folder is already there and also creates missing parent folders.
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# Persist the merged interactions as parquet.
# NOTE(review): the file is written under DATA_FOLDER, not the OUTPUT_FOLDER
# created above -- confirm this is the intended location.
interactions_merged_df.to_parquet(os.path.join(DATA_FOLDER, 'interactions_merged_df.parquet'))

In [8]:
# Report how many distinct item ids the merged interactions contain.
print(interactions_merged_df['item_id'].nunique())

52739


In [9]:
# Free GPU memory: drop every intermediate that is no longer needed now that
# interactions_merged_df exists. past_ids is as large as session_past_ids and is
# not used by any later cell visible here, so it is released as well.
del interactions_df, past_ids, session_past_ids, items_first_ts_df
gc.collect()

204

In [10]:
# Session-level limits used by the operators below.
SESSIONS_MAX_LENGTH = 20
MINIMUM_SESSION_LENGTH = 2

# Group the interactions by session, ordered by timestamp, producing the list of
# item ids ('item_id-list') and their count ('item_id-count') for each session.
groupby_features = ColumnSelector(['session_id', 'timestamp', "item_id"]) >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    sort_cols=["timestamp"],
    aggs={'item_id': ["list", "count"]},
    name_sep="-",
)

# Categorify the item ids, tag the column as the item id, then slice each list
# with a negative start, which keeps the LAST SESSIONS_MAX_LENGTH items.
item_feat = (
    groupby_features['item_id-list']
    >> nvt.ops.Categorify(out_path=OUTPUT_FOLDER)
    >> nvt.ops.TagAsItemID()
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
)

# Tag session_id as categorical (needed for serving with the legacy API).
sess_id = groupby_features['session_id'] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])

# Columns carried forward: session id, interaction count, encoded item list.
selected_features = sess_id + groupby_features['item_id-count'] + item_feat

# Keep only sessions with at least MINIMUM_SESSION_LENGTH interactions.
filtered_sessions = selected_features >> nvt.ops.Filter(
    f=lambda sessions: sessions["item_id-count"] >= MINIMUM_SESSION_LENGTH
)

In [11]:
# Destinations for the transformed sessions and for the fitted workflow.
processed_path = os.path.join(OUTPUT_FOLDER, "processed_nvt")
workflow_path = os.path.join(OUTPUT_FOLDER, "workflow_etl")

# Build the workflow on the two output columns, fit and apply it to the merged
# interactions, write the result as parquet, then save the fitted workflow.
workflow = nvt.Workflow(filtered_sessions['session_id', 'item_id-list'])
dataset = nvt.Dataset(interactions_merged_df)
workflow.fit_transform(dataset).to_parquet(processed_path)
workflow.save(workflow_path)

In [12]:
SPLIT_DIR = os.path.join(OUTPUT_FOLDER, "split")
os.makedirs(SPLIT_DIR, exist_ok=True)

val_size = 0.1
train_size = 1.0 - val_size
SPLIT_SEED = 42  # fixed seed so Restart & Run All reproduces the same split

# NOTE(review): only part_0.parquet is read; if the workflow ever writes more
# than one partition, the remaining files would be ignored here.
sessions_gdf = cudf.read_parquet(os.path.join(OUTPUT_FOLDER, "processed_nvt", "part_0.parquet"))

# Draw one uniform value in [0, 1) per session; the two complementary
# comparisons against train_size decide which split the session lands in.
cupy.random.seed(SPLIT_SEED)
random_values = cupy.random.rand(len(sessions_gdf))

train_set = sessions_gdf[random_values <= train_size]
train_set.to_parquet(os.path.join(SPLIT_DIR, "train.parquet"))

valid_set = sessions_gdf[random_values > train_size]
valid_set.to_parquet(os.path.join(SPLIT_DIR, "valid.parquet"))

# Every session must end up in exactly one of the two splits.
assert len(train_set) + len(valid_set) == len(sessions_gdf)
valid_set.head()

Unnamed: 0,session_id,item_id-list
28,48,"[11549, 6358, 7549, 2106]"
46,77,"[658, 836, 2771, 1283]"
51,86,"[18401, 16041, 15462, 10767, 9360, 10118, 2263..."
68,111,"[906, 1892]"
95,154,"[496, 1204, 1412, 91]"


In [13]:
# Build the test set from the validation sessions: the last item of each
# sequence becomes the target and the remaining items stay as the input sequence.
test_set = valid_set.to_pandas()
full_sequences = test_set['item_id-list']
test_set['target'] = full_sequences.map(lambda seq: seq[-1])
test_set['item_id-list'] = full_sequences.map(lambda seq: seq[:-1])

test_path = os.path.join(SPLIT_DIR, "test.parquet")
test_set.to_parquet(test_path)
test_set.head()

Unnamed: 0,session_id,item_id-list,target
28,48,"[11549, 6358, 7549]",2106
46,77,"[658, 836, 2771]",1283
51,86,"[18401, 16041, 15462, 10767, 9360, 10118, 2263...",797
68,111,[906],1892
95,154,"[496, 1204, 1412]",91
