In [2]:
import os

import numpy as np
import pandas as pd

import nvtabular as nvt
from nvtabular.ops import *
from merlin.schema.tags import Tags
import cudf
import cupy
from merlin.schema import Schema
from merlin.io import Dataset
import datetime
from transformers4rec.utils.data_utils import save_time_based_splits
from nvtabular.workflow import Workflow

In [17]:
INPUT_DATA_DIR = "../../../data/train_10/"
TEST_INPUT_DATA_DIR = "../../../data/test_processed/"

In [9]:
behaviors_df = pd.read_parquet("gs://mp-ml-prod-image-encoder-777b-embeddings/recsys/all_10")

In [10]:
behaviors_df

Unnamed: 0,session_id,user_id,impression_time,read_time,article_id,is_premium,article_type,image_id,category,sentiment_score,topics_count,topic
0,1064485_session_25,1064485,2023-05-04 06:49:53,47.0,9748942,False,article_default,9748653.0,sport,0.9832,4,erhverv
1,1065388_session_52,1065388,2023-04-29 14:23:26,4.0,9742226,False,article_default,9742041.0,nyheder,0.9390,4,erhverv
2,2260198_session_18,2260198,2023-05-15 07:01:24,795.0,9764608,False,article_default,9764778.0,nyheder,0.8946,4,erhverv
3,1634216_session_13,1634216,2023-05-19 06:49:27,3.0,9772088,False,article_default,8057827.0,penge,0.5107,4,erhverv
4,1803938_session_10,1803938,2023-05-11 07:55:23,5.0,9759261,False,article_default,9756616.0,nyheder,0.7439,4,erhverv
...,...,...,...,...,...,...,...,...,...,...,...,...
24545882,261038_session_24,261038,2023-05-12 03:22:03,132.0,9761083,False,article_default,9761113.0,krimi,0.9973,2,kriminalitet
24545883,261542_session_14,261542,2023-05-09 08:34:01,6.0,9754087,False,article_scribblelive,9748448.0,krimi,0.9893,2,kriminalitet
24545884,262035_session_24,262035,2023-05-11 10:01:36,1232.0,9757920,False,article_default,9716376.0,krimi,0.9953,2,kriminalitet
24545885,262345_session_2,262345,2023-05-24 18:55:33,5.0,9779045,False,article_default,,nyheder,0.9831,2,kriminalitet


In [8]:
categ_feats = ['article_id', 'is_premium', 'article_type', 'category', 'topic'] >> nvt.ops.Categorify()

groupby_feats = categ_feats + ['session_id', 'impression_time', 'read_time', 'topics_count', 'sentiment_score']

groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    aggs={
        "article_id": ["list", "count"],
        "is_premium": ["list"],
        "article_type": ["list"],
        "category": ["list"],
        "topic": ["list"],
        "read_time": ["list"],
        "topics_count": ["list"],
        "sentiment_score": ["list"],
        "impression_time": ["min"],
        },
    name_sep="-")


sequence_features_item = (
    groupby_features['article_id-list']
    >> TagAsItemID()
    >> nvt.ops.AddMetadata(tags=[Tags.LIST, Tags.ITEM_ID])
)

sequence_features_cat = (
    groupby_features['is_premium-list', 'article_type-list', 'category-list', 'topic-list']
    >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL, Tags.LIST])
)
sequence_features_cont = (
    groupby_features['read_time-list', 'topics_count-list', 'sentiment_score-list']
    >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS, Tags.LIST])
)
day_index = ((groupby_features['impression_time-min'])  >>
             nvt.ops.LambdaOp(lambda col: (col - col.min()).dt.days +1) >>
             nvt.ops.Rename(f = lambda col: "day_index") >>
             nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])
            )

selected_features = (
    groupby_features['article_id-count', 'session_id'] +
    sequence_features_item +
    sequence_features_cat +
    sequence_features_cont +
    day_index
)

seq_feats_list = selected_features['article_id-list', 'is_premium-list', 'article_type-list', 'category-list', 'topic-list', 'read_time-list', 'topics_count-list', 'sentiment_score-list'] >>  nvt.ops.ValueCount()

workflow = nvt.Workflow(seq_feats_list+selected_features['session_id', 'day_index'])

dataset = nvt.Dataset(behaviors_df)
workflow.fit_transform(dataset).to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt"))

  df = _general_concat(dfs, ignore_index=True)
  df = _general_concat(dfs, ignore_index=True)
  df = _general_concat(dfs, ignore_index=True)
  df = _general_concat(dfs, ignore_index=True)
  df = _general_concat(dfs, ignore_index=True)
  df = _general_concat(dfs, ignore_index=True)
  df = _general_concat(dfs, ignore_index=True)
  df = _general_concat(dfs, ignore_index=True)
  df = _general_concat(dfs, ignore_index=True)
  df = _general_concat(dfs, ignore_index=True)


In [16]:
workflow.output_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.cat_path,properties.domain.min,properties.domain.max,properties.domain.name,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.value_count.min,properties.value_count.max
0,article_id-list,"(Tags.LIST, Tags.ITEM, Tags.CATEGORICAL, Tags.ID)","DType(name='int64', element_type=<ElementType....",True,True,,0.0,0.0,.//categories/unique.article_id.parquet,0.0,24747.0,article_id,24748.0,462.0,2.0,20.0
1,is_premium-list,"(Tags.LIST, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",True,True,,0.0,0.0,.//categories/unique.is_premium.parquet,0.0,4.0,is_premium,5.0,16.0,2.0,20.0
2,article_type-list,"(Tags.LIST, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",True,True,,0.0,0.0,.//categories/unique.article_type.parquet,0.0,15.0,article_type,16.0,16.0,2.0,20.0
3,category-list,"(Tags.LIST, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",True,True,,0.0,0.0,.//categories/unique.category.parquet,0.0,29.0,category,30.0,16.0,2.0,20.0
4,topic-list,"(Tags.LIST, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",True,True,,0.0,0.0,.//categories/unique.topic.parquet,0.0,38.0,topic,39.0,16.0,2.0,20.0
5,read_time-list,"(Tags.LIST, Tags.CONTINUOUS)","DType(name='float64', element_type=<ElementTyp...",True,True,,,,,,,,,,2.0,20.0
6,topics_count-list,"(Tags.LIST, Tags.CONTINUOUS)","DType(name='int64', element_type=<ElementType....",True,True,,,,,,,,,,2.0,20.0
7,sentiment_score-list,"(Tags.LIST, Tags.CONTINUOUS)","DType(name='float64', element_type=<ElementTyp...",True,True,,,,,,,,,,2.0,20.0
8,session_id,(),"DType(name='object', element_type=<ElementType...",False,False,,,,,,,,,,,
9,day_index,(Tags.CATEGORICAL),"DType(name='int64', element_type=<ElementType....",False,False,,,,,,,,,,,


Save NVTabular workflow.

In [17]:
workflow.save(os.path.join(INPUT_DATA_DIR, "workflow_etl"))

# test set export

In [3]:
test_df = pd.read_parquet("gs://mp-ml-prod-image-encoder-777b-embeddings/recsys/test_final/")

In [None]:
test_df['impression_time'] = pd.to_datetime(test_df['impression_time'] / 1e9, unit='s') 
test_df.columns = ['session_id', 'user_id', 'impression_id', 'impression_time',
       'article_id', 'read_time', 'is_premium', 'image_id', 'article_type',
       'category', 'topic', 'topics_count', 'sentiment_score']

In [10]:
test_df

Unnamed: 0,session_id,user_id,impression_id,impression_time,article_id,read_time,is_premium,image_id,article_type,category,topic,topics_count,sentiment_score
0,6670763,31320,282937272,2023-05-30 07:39:01,9787679,1.0,False,9787697.0,article_default,sport,erhverv,6,0.9736
1,9551999,1599139,419846066,2023-05-31 11:42:52,9789977,1.0,False,9789994.0,article_default,sport,kendt,5,0.8886
2,11664075,1187450,508763259,2023-05-30 21:44:00,9779365,1.0,True,9775276.0,article_default,krimi,politik,4,0.9830
3,7464328,2232924,320523412,2023-05-31 16:26:15,9789745,1.0,False,,article_default,nyheder,dyr,3,0.7373
4,5692130,251427,235599779,2023-05-31 16:51:11,9673564,1.0,True,9677844.0,article_default,forbrug,økonomi,2,0.7402
...,...,...,...,...,...,...,...,...,...,...,...,...,...
134633557,9604495,136175,420434536,2023-06-01 04:50:24,9788677,59.0,False,9701785.0,article_default,underholdning,erhverv,5,0.9586
134633558,13114569,1321868,571390470,2023-06-01 04:58:43,9788677,64.0,False,9701785.0,article_default,underholdning,erhverv,5,0.9586
134633559,4717890,893036,207879173,2023-06-01 03:56:40,9788677,79.0,False,9701785.0,article_default,underholdning,erhverv,5,0.9586
134633560,12931115,316055,554881448,2023-06-01 05:21:20,9788677,117.0,False,9701785.0,article_default,underholdning,erhverv,5,0.9586


In [None]:
workflow = Workflow.load(os.path.join(INPUT_DATA_DIR, "workflow_etl"))

In [29]:
max_session_id = test_df.session_id.max()
decile_idx = [max_session_id * i // 10 for i in range(1, 10)]

# Split the dataframe into ten parts based on session_id
dfs = []
dfs.append(test_df.loc[test_df['session_id'] < decile_idx[0]])

for i in range(1, 9):
    dfs.append(test_df.loc[(test_df['session_id'] >= decile_idx[i-1]) & (test_df['session_id'] < decile_idx[i])])

dfs.append(test_df.loc[test_df['session_id'] >= decile_idx[8]])

processing 1
processing 2
processing 3
processing 4
processing 5
processing 6
processing 7
processing 8
processing 9
processing 10


In [None]:
for i, df in enumerate(dfs, start=1):
    print("processing", i)
    workflow.transform(nvt.Dataset(df)).to_parquet(os.path.join(TEST_INPUT_DATA_DIR, "processed_nvt", str(i)))
    gc.collect()
    torch.cuda.empty_cache()

In [28]:
gc.collect()
torch.cuda.empty_cache()

## Export pre-processed data by day

In [18]:
OUTPUT_DIR = os.environ.get("OUTPUT_DIR",os.path.join(INPUT_DATA_DIR, "sessions_by_ts"))
sessions_gdf = pd.read_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt/part_0.parquet"))
save_time_based_splits(data=nvt.Dataset(sessions_gdf),
                       output_dir= OUTPUT_DIR,
                       partition_col='day_index',
                       timestamp_col='session_id',
                       cpu=False,
                      )