In [1]:
import os

import numpy as np 
# import cupy as cp
import glob

# import cudf
import nvtabular as nvt

from nvtabular.ops import Operator
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags
import pandas as pd

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
  warn(f"Triton dtype mappings did not load successfully due to an error: {exc.msg}")


In [2]:
# Intended to silence numba's CUDA low-occupancy warnings. The setting below is
# currently commented out, so this cell only imports `config` and changes nothing.
from numba import config
# config.CUDA_LOW_OCCUPANCY_WARNINGS = 0

In [3]:
# Root folder for all data read and written by this notebook.
# Override it by exporting the INPUT_DATA_DIR environment variable; defaults to ./data/.
INPUT_DATA_DIR = os.getenv("INPUT_DATA_DIR", "./data/")

In [7]:
%%time
df = pd.read_parquet(os.path.join(INPUT_DATA_DIR, 'test_processed.parquet'))  

CPU times: user 563 ms, sys: 298 ms, total: 861 ms
Wall time: 819 ms


In [8]:
# Add an integer copy of the purchase timestamp (nanoseconds since the Unix
# epoch, as `purchase_date` is datetime64[ns]).
# The dtype is spelled out as 'int64' on purpose: plain 'int' resolves to the
# platform's default integer, which is 32-bit on some setups (e.g. Windows with
# NumPy < 2) and cannot hold nanosecond timestamps.
df['purchase_date_ts'] = df['purchase_date'].astype('int64')

In [9]:
# Persist the frame (now including `purchase_date_ts`) under the `process/`
# sub-folder of the data directory.
# `to_parquet` does not create missing parent directories, so make sure the
# target folder exists before writing.
processed_path = os.path.join(INPUT_DATA_DIR, 'process/test_processed.parquet')
os.makedirs(os.path.dirname(processed_path), exist_ok=True)
df.to_parquet(processed_path)

Let's check if there is any column with nulls.

In [6]:
# Per-column flag: True when the column contains at least one missing value.
df.isna().any()

item_id             False
y                   False
session_id          False
feature             False
purchase_date       False
wf                  False
purchase_date_ts    False
dtype: bool

In [7]:
# Show the frame's dimensions as (rows, columns).
df.shape

(918379, 7)

In [8]:
# Preview the first three rows to sanity-check the column contents.
df.head(n=3)

Unnamed: 0,item_id,y,session_id,feature,purchase_date,wf,purchase_date_ts
0,"[9655, 9655]",15085,3,"[[0, 105, 353, 156, 110, 117, 23, 35, 26, 203,...",2020-12-18 21:26:47.986,0.863068,1608326807986000000
1,[15654],18626,13,"[[0, 129, 102, 2, 1, 138, 46, 137, 98, 83, 15,...",2020-03-13 19:36:15.507,0.574915,1584128175507000000
2,"[4026, 2507, 18316]",24911,18,"[[40, 135, 66, 42, 78, 27, 28, 26, 131, 21, 92...",2020-08-26 19:20:32.049,0.745691,1598469632049000000


In [9]:
# Scratch frame for ad-hoc exploration.
# Take a copy rather than aliasing `df`: columns added to `tmp` would otherwise
# also appear on `df` and leak into anything later built from it.
tmp = df.copy()

In [11]:
# For every row, record the largest id found in its `item_id` sequence.
tmp["max"] = tmp["item_id"].map(lambda item_ids: item_ids.max())


In [14]:
# Display the (largest, smallest) session id present in the frame.
session_ids = tmp["session_id"]
session_ids.max(), session_ids.min()

(5029, 3)

In [6]:
# Keep only the first 1000 rows for the rest of the notebook.
df = df.iloc[:1000]

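We see that `'category_code'` and `'brand'` columns have null values, and in the following cell we are going to fill these nulls via the Categorify op, after which all categorical columns will be encoded as contiguous integers. The Categorify op automatically maps nulls to `1` and OOVs to `2`. We reserve `0` for padding the sequence features, so the encoding of each category starts from `3`. (Note: this paragraph does not match the current dataset — the null check above reports no nulls, there are no `'category_code'` or `'brand'` columns, and the cell below only attaches tags with `AddMetadata`; it does not apply Categorify.)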

In [7]:
# NOTE(review): the wildcard import is kept for compatibility, but the definitions
# below only reference `nvt.ops.AddMetadata` explicitly.
from nvtabular.ops import *

# One graph branch per input column: select the column, then pipe it into
# `AddMetadata` to attach the listed schema tags.
item_id = ColumnSelector(['item_id']) >> nvt.ops.AddMetadata(
    tags=[Tags.LIST, Tags.ITEM_ID, Tags.CATEGORICAL]
)
y = ColumnSelector(['y']) >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])
sess_id = ColumnSelector(['session_id']) >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])
feature = ColumnSelector(['feature']) >> nvt.ops.AddMetadata(tags=[Tags.LIST, Tags.ITEM])
wf = ColumnSelector(['wf']) >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
purchase_date = ColumnSelector(['purchase_date']) >> nvt.ops.AddMetadata(
    tags=[Tags.CONTINUOUS]
)
purchase_date_ts = ColumnSelector(['purchase_date_ts']) >> nvt.ops.AddMetadata(
    tags=[Tags.CONTINUOUS]
)

In [8]:
# Combine all tagged column branches into a single workflow and wrap the
# DataFrame in an NVTabular Dataset.
workflow = nvt.Workflow(
    item_id + y + sess_id + feature + wf + purchase_date + purchase_date_ts
)
dataset = nvt.Dataset(df)

# Fit the workflow on the dataset, transform it, and export the result as
# parquet files. Per the original note, a schema.pbtxt file is expected to be
# written alongside the parquet output in this folder.
processed_dir = os.path.join(INPUT_DATA_DIR, "processed_nvt")
transformed = workflow.fit_transform(dataset)
transformed.to_parquet(processed_dir)



In [18]:
# Inspect which class the workflow uses to represent its output schema.
type(workflow.output_schema)

merlin.schema.schema.Schema

In [10]:
# Display the output schema: one row per column with its tags, dtype and list/ragged flags.
workflow.output_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.value_count.min,properties.value_count.max
0,item_id,"(Tags.ITEM, Tags.CATEGORICAL, Tags.ID, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,True,0.0,
1,y,(Tags.CATEGORICAL),"DType(name='int64', element_type=<ElementType....",False,False,,
2,session_id,(Tags.CATEGORICAL),"DType(name='int64', element_type=<ElementType....",False,False,,
3,feature,"(Tags.ITEM, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,True,0.0,
4,wf,(Tags.CONTINUOUS),"DType(name='float64', element_type=<ElementTyp...",False,False,,
5,purchase_date,(Tags.CONTINUOUS),"DType(name='datetime64[ns]', element_type=<Ele...",False,False,,
6,purchase_date_ts,(Tags.CONTINUOUS),"DType(name='int64', element_type=<ElementType....",False,False,,


Above, we created an NVTabular Dataset object from our input DataFrame. We then called `workflow.fit_transform()`, which first calculates the statistics the workflow needs on the input dataset (the `workflow.fit()` step, normally run on the training set) and then applies the transformation. Once fitted, our Workflow can use these stats to transform any given input.

In [11]:
# Save the workflow to INPUT_DATA_DIR/workflow_etl so it can be loaded again later.
workflow_path = os.path.join(INPUT_DATA_DIR, 'workflow_etl')
workflow.save(workflow_path)