## Step 1: Data Preparation

In [1]:
#1.1
!pip install kaggle

[0m

1.2: Configure Kaggle Credentials
Create a .kaggle folder and move the Kaggle API credentials file (kaggle.json) into it.
Adjust the file permissions, then use the command-line tool to download the specified dataset.
Configure Kaggle API credentials and download the target dataset.

In [2]:
#1.2
#!mkdir -p ~/.kaggle
#!mv kaggle.json ~/.kaggle/
#!chmod 600 ~/.kaggle/kaggle.json

#!kaggle datasets download -d chadgostopp/recsys-challenge-2015 --file yoochoose-clicks.dat

#!unzip yoochoose-clicks.dat.zip

1.3: Import Dependencies
Import the required libraries, 
including tools for data processing, 
GPU acceleration, and specific recommender system utilities.
Provide the necessary tools for data loading, processing, 
and building the recommendation system.

In [2]:
#1.3
import os
import glob
import numpy as np
import pandas as pd
import gc
import calendar
import datetime

import cudf
import cupy
import nvtabular as nvt
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
  from .autonotebook import tqdm as notebook_tqdm


1.4: Disable CUDA Occupancy Warnings
Resolve warnings caused by insufficient GPU occupancy to ensure a stable CUDA runtime environment.
Disable low occupancy warnings to prevent interference with log output.

In [3]:
#1.4
#Avoid Numba low occupancy warnings:
from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0

## Step 2: Data Loading and Preprocessing

2.1 Load Raw Data
Define input and output data paths, as well as a flag indicating whether to use synthetic datasets.
Prepare directories and file paths for loading and saving data in subsequent steps.

In [4]:
#2.1
# Define Data Input and Output Paths
DATA_FOLDER = "data"  # Specify the data folder directly
FILENAME_PATTERN = 'yoochoose-clicks.dat'  # File name pattern
DATA_PATH = os.path.join(DATA_FOLDER, FILENAME_PATTERN)  # Combine full data path

OUTPUT_FOLDER = "transformed_data"  # Output folder path
OVERWRITE = False  # Flag to determine whether to overwrite existing files

USE_SYNTHETIC = False  # Flag to indicate whether to use a synthetic dataset


2.2 Generate Synthetic User-Item Interaction Dataset
Define a function to generate synthetic clickstream datasets based on random distributions.
Create simulated clickstream data, including session, item, and category features. The synthetic dataset mimics user interactions with items within a specified time range

In [1]:
#2.2

import datetime
import numpy as np
import pandas as pd
import calendar

def generate_synthetic_data(
    start_date: datetime.date, end_date: datetime.date, rows_per_day: int = 10000
) -> pd.DataFrame:
    # Ensure end_date is later than start_date
    assert end_date > start_date, "end_date must be later than start_date"

    # Calculate the number of days in the range
    number_of_days = (end_date - start_date).days
    # Calculate total number of rows
    total_number_of_rows = number_of_days * rows_per_day

    # Generate a long-tail distribution of item interactions to simulate varying item popularity
    long_tailed_item_distribution = np.clip(
        np.random.lognormal(mean=3.0, sigma=1.0, size=total_number_of_rows).astype(np.int64), 1, 50000
    )

    # Generate random session and item interaction features
    df = pd.DataFrame({
        "session_id": np.random.randint(low=70000, high=80000, size=total_number_of_rows),  # Session ID
        "item_id": long_tailed_item_distribution  # Item ID
    })

    # Generate category mapping for each item_id
    df["category"] = pd.cut(df["item_id"], bins=334, labels=np.arange(1, 335)).astype(np.int64)

    max_session_length = 60 * 60  # Maximum session length in seconds (1 hour)

    # Define function to add timestamps to sessions
    def add_timestamp_to_session(session: pd.DataFrame):
        # Randomly generate start date and time for a session
        random_start_date_and_time = calendar.timegm(
            (
                start_date +
                datetime.timedelta(days=np.random.randint(0, number_of_days)) +  # Add day offset
                datetime.timedelta(seconds=np.random.randint(0, 86_400))  # Add time offset within the day
            ).timetuple()
        )
        # Generate timestamps for each interaction in the session
        session["timestamp"] = random_start_date_and_time + np.clip(
            np.random.lognormal(mean=3.0, sigma=1.0, size=len(session)).astype(np.int64),
            0, max_session_length
        )
        return session

    # Apply timestamp generation for each session
    df = df.groupby("session_id").apply(add_timestamp_to_session).reset_index(drop=True)

    return df


2.3: Load and Clean Data
Load data files (real or synthetic) and sort them by session_id and timestamp.
Load the data and ensure it is ordered correctly, establishing a foundation for subsequent processing.

In [2]:
#2.3
import os
import datetime
import cudf

# Ensure the 'data' directory exists
if not os.path.exists('data'):
    os.makedirs('data')

# Check if the file exists in the data directory
file_path = 'yoochoose-clicks.dat'

# Determine data source based on the USE_SYNTHETIC environment variable
USE_SYNTHETIC = os.environ.get("USE_SYNTHETIC", "False") == "True"  
# Set USE_SYNTHETIC flag from environment or default to False

if USE_SYNTHETIC:
    # Retrieve date range from environment variables or use defaults, format: 'YYYY/MM/DD'
    # Synthetic data generation logic
    START_DATE = os.environ.get("START_DATE", "2014/4/1")
    END_DATE = os.environ.get("END_DATE", "2014/4/5")
    
    # Call the generate_synthetic_data function to create synthetic data
    interactions_df = generate_synthetic_data(
        datetime.datetime.strptime(START_DATE, '%Y/%m/%d'),
        datetime.datetime.strptime(END_DATE, '%Y/%m/%d')
    )
    
    # Convert synthetic data from pandas DataFrame to cudf DataFrame
    interactions_df = cudf.from_pandas(interactions_df)
    
# Real data loading logic
else:
    # If the file exists, read it using cudf
    column_names = ['session_id', 'timestamp', 'item_id', 'category']  # Assume these are the column names
    interactions_merged_df = cudf.read_csv('yoochoose-clicks.dat', names=column_names, header=None)

# Display the first few rows of the DataFrame
interactions_merged_df.head()  # Use interactions_merged_df instead of interactions_df


Unnamed: 0,session_id,timestamp,item_id,category
0,1,2014-04-07T10:51:09.277Z,214536502,0
1,1,2014-04-07T10:54:09.868Z,214536500,0
2,1,2014-04-07T10:54:46.998Z,214536506,0
3,1,2014-04-07T10:57:00.306Z,214577561,0
4,2,2014-04-07T13:56:37.614Z,214662742,0


2.4: Data Deduplication
Remove duplicate item interactions within the same session to avoid data redundancy.
Eliminate duplicate interactions to improve the efficiency of model training.

In [3]:
#2.4
import os
import datetime
import cudf

# Set the USE_SYNTHETIC variable (based on environment variable or directly defined)
USE_SYNTHETIC = os.environ.get("USE_SYNTHETIC", "False") == "True"

# Define the data file path
file_path = 'yoochoose-clicks.dat'

# Sort by timestamp, check the previous interaction, and remove duplicate interactions within the same session
# Sort the DataFrame by session_id and timestamp, and create helper columns prev_item_id and prev_session_id
# Filter duplicates, then drop the helper columns

# If USE_SYNTHETIC is True, generate synthetic data; otherwise, load real data
if USE_SYNTHETIC:
    interactions_df = generate_synthetic_data(
        start_date=datetime.datetime.strptime(START_DATE, '%Y/%m/%d'),
        end_date=datetime.datetime.strptime(END_DATE, '%Y/%m/%d')
    )
    # Convert synthetic data from pandas DataFrame to cudf DataFrame
    interactions_df = cudf.from_pandas(interactions_df)
else:
    # If the file exists, load real data
    if os.path.exists(file_path):
        interactions_df = cudf.read_csv(
            file_path, sep=',', 
            names=['session_id','timestamp', 'item_id', 'category'], 
            dtype=['int', 'datetime64[s]', 'int', 'int']
        )
    else:
        print(f"File {file_path} not found!")
        # If the file does not exist, generate synthetic data
        interactions_df = generate_synthetic_data(
            datetime.datetime.strptime("2014/4/1", '%Y/%m/%d'),
            datetime.datetime.strptime("2014/4/5", '%Y/%m/%d')
        )
        interactions_df = cudf.from_pandas(interactions_df)

# Ensure the data is sorted by session_id and timestamp
interactions_df = interactions_df.sort_values(['session_id', 'timestamp'])

# Create new columns prev_item_id and prev_session_id for comparison
interactions_df['prev_item_id'] = interactions_df['item_id'].shift(1)
interactions_df['prev_session_id'] = interactions_df['session_id'].shift(1)

# Filter out duplicate interactions within the same session
interactions_df = interactions_df[
    ~((interactions_df['session_id'] == interactions_df['prev_session_id']) & 
       (interactions_df['item_id'] == interactions_df['prev_item_id']))
]

# Drop helper columns
interactions_df = interactions_df.drop(columns=['prev_item_id', 'prev_session_id'])

# Output the count of data after removing duplicates
print("Count after removing in-session repeated interactions: {}".format(len(interactions_df)))


Count after removing in-session repeated interactions: 28971543


## Step 3: Feature Engineering

3.1: Generate Temporal Features
Create a new feature for each item that indicates the time it first appeared.
Generate item-related features to provide the model with additional information.

In [6]:
#3.1
# Create a new feature for 
#each item_id indicating the first time the item appeared (timestamp)

# Create a DataFrame with the first timestamp for each item_id
items_first_ts_df = interactions_merged_df.groupby('item_id')['timestamp'].min().reset_index()
items_first_ts_df = items_first_ts_df.rename(columns={'timestamp': 'itemid_ts_first'})

# Merge the new feature back to the original DataFrame
interactions_merged_df = interactions_merged_df.merge(items_first_ts_df, on='item_id', how='left')

# Display the result
print(interactions_merged_df.head())



   session_id                 timestamp    item_id category  \
0        1793  2014-04-02T10:54:58.414Z  214716980        0   
1        2034  2014-04-06T10:14:56.353Z  214821275        0   
2        2033  2014-04-07T15:58:45.078Z  214748338        0   
3        2034  2014-04-06T10:32:18.768Z  214821275        0   
4        2032  2014-04-05T23:23:07.289Z  214826728        0   

            itemid_ts_first  
0  2014-04-01T03:48:52.585Z  
1  2014-04-01T03:24:08.036Z  
2  2014-04-01T05:10:27.244Z  
3  2014-04-01T03:24:08.036Z  
4  2014-04-01T07:08:47.338Z  


3.2 Data Format Conversion and Saving
Save the cleaned data as a Parquet file for use in subsequent steps.
Persist the data to enhance the modularity of the processing workflow.

** Parquet format allows for fast loading, cross-platform usage, compression, and partitioning, making it ideal for large-scale data processing.

In [10]:
#3.2
# Save the interactions_merged_df to disk to be able to use in the inference step.

# Define the directory to save the file (use the existing 'data' folder in the root directory)
DATA_FOLDER = "/data"

# Ensure the directory exists before saving the file
os.makedirs(DATA_FOLDER, exist_ok=True)

# Save the DataFrame as a Parquet file
output_file = os.path.join(DATA_FOLDER, 'interactions_merged_df.parquet')
interactions_merged_df.to_parquet(output_file)

# Print the number of unique items in the dataset
unique_items_count = interactions_merged_df['item_id'].nunique()
print(f"Total unique items: {unique_items_count}")


Total unique items: 52739


3.3 Clear GPU Memory
Release unused or temporary data during processing, such as raw or cleaned interaction data, session-related variables, and DataFrames storing the first appearance times of items.Release unused or temporary data during processing, such as raw or cleaned interaction data, session-related variables, and DataFrames storing the first appearance times of items.


In [11]:
#3.3
# Free GPU memory

# Delete only the variables that are defined in the current session
if 'interactions_df' in globals(): 
    del interactions_df  # Stores raw or cleaned interaction data

if 'session_past_ids' in globals():
    del session_past_ids  # Possibly a feature variable related to sessions

if 'items_first_ts_df' in globals():
    del items_first_ts_df  # Stores the DataFrame containing the first appearance time of items

# Run garbage collection
import gc
gc.collect()


815

## Step 4: Model Construction (Using NVTabular Libraries)

4.1 Define Feature Engineering Workflow:
This workflow includes steps such as categorical feature encoding, 
temporal feature construction, item recency calculation, 
and feature normalization to generate input features 
for the recommendation system and prepare data for model training.

1. ColumnSelector
Functionality: Selects columns to define inputs for feature engineering.
cat_feats = ColumnSelector(['category', 'item_id']) >> Categorify()
session_ts = ColumnSelector(['timestamp'])

2. Categorify
Functionality: Encodes categorical features by mapping category variables to continuous integers.
cat_feats = ColumnSelector(['category', 'item_id']) >> Categorify()

3. LambdaOp
Functionality: Applies custom functions to transform columns.
session_time = session_ts >> LambdaOp(lambda col: cudf.to_datetime(col, unit='s'))

4. Rename
Functionality: Renames columns to make generated features more meaningful.
session_time >> Rename(name='event_time_dt')

5. LogOp
Functionality: Applies a logarithmic transformation to continuous features, smoothing the distribution.
recency_features >> LogOp()

6. Normalize
Functionality: Standardizes continuous features, outputting normalized values.
recency_features >> Normalize(out_dtype=np.float32)

7. stom Feature Operations
Functionality: Allows the definition of complex feature engineering by inheriting from nvt.ops.Operator.
class ItemRecency(nvt.ops.Operator):
    def transform(self, columns, gdf):

8. Workflow（in later cells）
unctionality: Defines and executes a feature engineering workflow.
workflow = nvt.Workflow(features)
transformed_data = workflow.transform(input_dataframe)


In [15]:
#4.1
import nvtabular as nvt
import cudf
import numpy as np
import pandas as pd
from nvtabular import ColumnSelector
from nvtabular.ops import Categorify, LambdaOp, Rename, LogOp, Normalize
from merlin.schema import Schema


# s1
# Encode categorical features (category and item_id) as continuous integer indices.
# This processing improves model training efficiency, especially for embedding layers.
cat_feats = ColumnSelector(['category', 'item_id']) >> Categorify()

# s2 Time feature construction
# Create time features and Convert timestamps
session_ts = ColumnSelector(['timestamp'])

# Use LambdaOp to call cudf.to_datetime to convert second-level timestamps to datetime
# Use Rename to rename the column to event_time_dt
session_time = (
    session_ts >> 
    LambdaOp(lambda col: cudf.to_datetime(col, unit='s')) >> 
    Rename(name='event_time_dt')
)

# Extract weekday information
# Use LambdaOp to call col.dt.weekday, generating an integer value representing the day of the week
sessiontime_weekday = (
    session_time >> 
    LambdaOp(lambda col: col.dt.weekday) >> 
    Rename(name='et_dayofweek')
)

# Derive cyclical features: Define a custom lambda function
def get_cycled_feature_value_sin(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_sin = np.sin(2 * np.pi * value_scaled)
    return value_sin

# Use the custom function get_cycled_feature_value_sin to calculate sine values based on the periodic value
weekday_sin = sessiontime_weekday >> (lambda col: get_cycled_feature_value_sin(col + 1, 7)) >> Rename(name='et_dayofweek_sin')

# s3
# Item recency calculation: the number of days between the first appearance of an item and the current interaction
# Use itemid_ts_first (first appearance time of the item) to calculate the time difference for each interaction
# Set negative values (invalid time differences) to 0
class ItemRecency(nvt.ops.Operator):
    def transform(self, columns, gdf):
        for column in columns.names:
            col = gdf[column]
            item_first_timestamp = gdf['itemid_ts_first']
            delta_days = (col - item_first_timestamp) / (60 * 60 * 24)
            gdf[column + "_age_days"] = delta_days * (delta_days >= 0)  # Ensure no negative recency
        return gdf

    def compute_selector(
        self,
        input_schema: Schema,
        selector: ColumnSelector,
        parents_selector: ColumnSelector,
        dependencies_selector: ColumnSelector,
    ) -> ColumnSelector:
        self._validate_matching_cols(input_schema, parents_selector, "computing input selector")
        return parents_selector

    def column_mapping(self, col_selector):
        column_mapping = {}
        for col_name in col_selector.names:
            column_mapping[col_name + "_age_days"] = [col_name]
        return column_mapping

    @property
    def dependencies(self):
        return ["itemid_ts_first"]

    @property
    def output_dtype(self):
        return np.float64

recency_features = session_ts >> ItemRecency()

# s4
# Apply standardization to this continuous feature
# Feature standardization helps accelerate model convergence and avoids training issues caused by varying feature scales.
recency_features_norm = recency_features >> LogOp() >> Normalize(out_dtype=np.float32) >> Rename(name='product_recency_days_log_norm')

# s5
# Combine time-related features (event_time_dt, weekday, cyclical features, and item recency)
time_features = (
    session_time +
    sessiontime_weekday +
    weekday_sin + 
    recency_features_norm
)

# s6
# Define the final feature set by combining categorical features and time features
# Combine categorical features (category and item_id) with time features to form the final feature set
# The feature set includes session information (session_id and timestamp), item categories, temporal information, and recency.
features = ColumnSelector(['session_id', 'timestamp']) + cat_feats + time_features


4.2 Groupby Operations and Feature Preprocessing Workflow Based on NVTabular
Convert timestamp to datetime format and extract time-related features, 
such as hour and weekday. Perform aggregation, feature generation, 
and filtering operations on session-level data. 
Provide time-related context for the recommendation model, 
ultimately generating a feature set suitable for input to the recommendation system.


In [16]:
#4.2 
import nvtabular as nvt
from nvtabular import ops
from nvtabular.ops import ListSlice

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

# Define column selector
columns = nvt.ColumnSelector(["session_id", "item_id", "category", "timestamp", "event_time_dt", "et_dayofweek_sin", "product_recency_days_log_norm"])

# Define Groupby Operator
# Group session data by session_id to generate new aggregated features
groupby_features = columns >> nvt.ops.Groupby(
    groupby_cols=["session_id"], 
    sort_cols=["timestamp"],
    aggs={
        'item_id': ["list", "count"],
        'category': ["list"],  
        'timestamp': ["first"],
        'event_time_dt': ["first"],
        'et_dayofweek_sin': ["list"],
        'product_recency_days_log_norm': ["list"]
    },
    name_sep="-"
)

# Truncate sequence features to first 20 interacted items
SESSIONS_MAX_LENGTH = 20

# Tag items and continuous features
item_feat = groupby_features["item_id-list"] >> nvt.ops.TagAsItemID()
cont_feats = groupby_features['et_dayofweek_sin-list', 'product_recency_days_log_norm-list'] >> nvt.ops.AddMetadata(tags=["CONTINUOUS"])

# Group features together
groupby_features_list = item_feat + cont_feats + groupby_features['category-list']

# Apply ListSlice to truncate sequences
# Truncate sequence features (e.g., item list, continuous feature list) to a fixed length SESSIONS_MAX_LENGTH.
groupby_features_truncated = groupby_features_list >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)

# Calculate session day index based on 'event_time_dt-first' column
day_index = (
    groupby_features['event_time_dt-first'] >> 
    nvt.ops.LambdaOp(lambda col: (col - col.min()).dt.days + 1) >> 
    nvt.ops.Rename(f=lambda col: "day_index") >>
    nvt.ops.AddMetadata(tags=["CATEGORICAL"])
)

# Tag session_id column for serving with legacy API
sess_id = groupby_features['session_id'] >> nvt.ops.AddMetadata(tags=["CATEGORICAL"])

# Select features for training
selected_features = sess_id + groupby_features['item_id-count'] + groupby_features_truncated + day_index

# Filter out sessions with fewer than 2 interactions to ensure data quality
MINIMUM_SESSION_LENGTH = 2
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH)

# Print the final processed result
print(filtered_sessions)


<Node Filter output>


## Step 5: Organize Feature Structure

In [13]:
# test cell
print(interactions_merged_df.info())


<class 'cudf.core.dataframe.DataFrame'>
RangeIndex: 33003944 entries, 0 to 33003943
Data columns (total 5 columns):
 #   Column           Dtype
---  ------           -----
 0   session_id       int64
 1   timestamp        object
 2   item_id          int64
 3   category         object
 4   itemid_ts_first  object
dtypes: int64(2), object(3)
memory usage: 2.4+ GB
None


In [14]:
# test cell
print(interactions_merged_df.columns)  


Index(['session_id', 'timestamp', 'item_id', 'category', 'itemid_ts_first'], dtype='object')


5.1: Data Preprocessing and Feature Construction
A complete data preprocessing workflow is defined 
with the aim of generating the dataset required 
for feature engineering in a recommendation system. 
This process uses NVTabular and cuDF to carry out data preprocessing, 
feature engineering, metadata enhancement, and saving the output. 
The final output includes the processed data files along with 
the schema information of the features, generating more time-based and interaction-related features

In [17]:
#5.1 
import cudf
import numpy as np
import nvtabular as nvt
import os
from datetime import datetime
from nvtabular import ops

# Define data loading
# GPU-accelerated cudf provides efficient data loading capabilities, suitable for large-scale datasets
column_names = ['session_id', 'timestamp', 'item_id', 'category']
interactions_merged_df = cudf.read_csv('yoochoose-clicks.dat', names=column_names, header=None)

# Time feature construction
# Provide richer time information to capture the cyclical patterns of user behavior
interactions_merged_df['event_time_dt'] = cudf.to_datetime(interactions_merged_df['timestamp'])
interactions_merged_df['et_dayofweek_sin'] = np.sin(interactions_merged_df['event_time_dt'].dt.dayofweek * (2 * np.pi / 7))

# Calculate the "recency" feature of products (recent activity feature of items)
# Calculate the difference between the current time and the most recent interaction time of the item (in days).
# Generate the product recency feature `product_recency_days_log_norm` by taking the logarithm to smooth the data distribution.
current_time = datetime.now()
current_time_cudf = cudf.to_datetime([current_time])[0]  # Use cudf.to_datetime
time_difference = current_time_cudf - interactions_merged_df['event_time_dt']
recency_days = time_difference.astype('float64') / (60 * 60 * 24 * 1e9)  # Convert nanoseconds to days
interactions_merged_df['product_recency_days_log_norm'] = np.log(1 + recency_days)

selected_features = ['session_id', 'timestamp', 'item_id', 'category', 'event_time_dt', 'et_dayofweek_sin', 'product_recency_days_log_norm']

# Create a workflow node containing all columns
# Define the feature set `selected_features`, including all generated features
# Use AddMetadata to add more metadata information
workflow_node = (
    nvt.ColumnSelector(selected_features) 
    >> ops.AddMetadata(tags=["selected_features"], properties={
        "num_buckets": None,
        "freq_threshold": 0,
        "max_size": 0,
        "start_index": 0,
        "domain": {"min": 0, "max": 52739, "name": "item_id"},
        "embedding_sizes": {"cardinality": 52740, "dimension": 512},
        "value_count": {"min": 0, "max": 20}
    })
)

# Data transformation and saving
# Use NVTabular to define and execute the data preprocessing workflow
workflow = nvt.Workflow(workflow_node)

# Create a dataset object
# Convert the loaded data to NVTabular's Dataset format for workflow processing
dataset = nvt.Dataset(interactions_merged_df)

# Learn feature statistics and transform the data, output to a Parquet file
DATA_FOLDER = './processed_data'
workflow.fit_transform(dataset).to_parquet(os.path.join(DATA_FOLDER, "processed_nvt"))

# Print the output schema to observe detailed information of each column (output and processed schema)
output_schema = workflow.output_schema
output_list = []

# Format the output
for col in output_schema.column_schemas.values():
    output_list.append({
        'name': col.name,
        'tags': col.tags,
        'dtype': col.dtype,
        'is_list': col.is_list,
        'is_ragged': col.is_ragged,
        'properties.num_buckets': col.properties.get('num_buckets', 'NaN'),
        'properties.freq_threshold': col.properties.get('freq_threshold', 'NaN'),
        'properties.max_size': col.properties.get('max_size', 'NaN'),
        'properties.start_index': col.properties.get('start_index', 'NaN'),
        'properties.cat_path': col.properties.get('cat_path', 'NaN'),
        'properties.domain.min': col.properties.get('domain', {}).get('min', 'NaN'),
        'properties.domain.max': col.properties.get('domain', {}).get('max', 'NaN'),
        'properties.domain.name': col.properties.get('domain', {}).get('name', 'NaN'),
        'properties.embedding_sizes.cardinality': col.properties.get('embedding_sizes', {}).get('cardinality', 'NaN'),
        'properties.embedding_sizes.dimension': col.properties.get('embedding_sizes', {}).get('dimension', 'NaN'),
        'properties.value_count.min': col.properties.get('value_count', {}).get('min', 'NaN'),
        'properties.value_count.max': col.properties.get('value_count', {}).get('max', 'NaN')
    })

# Finally, convert the result to a Pandas DataFrame and print the output.
import pandas as pd
output_df = pd.DataFrame(output_list)
print(output_df)


                            name                 tags  \
0                     session_id  (selected_features)   
1                      timestamp  (selected_features)   
2                        item_id  (selected_features)   
3                       category  (selected_features)   
4                  event_time_dt  (selected_features)   
5               et_dayofweek_sin  (selected_features)   
6  product_recency_days_log_norm  (selected_features)   

                                               dtype  is_list  is_ragged  \
0  DType(name='int64', element_type=<ElementType....    False      False   
1  DType(name='object', element_type=<ElementType...    False      False   
2  DType(name='int64', element_type=<ElementType....    False      False   
3  DType(name='object', element_type=<ElementType...    False      False   
4  DType(name='datetime64[ns]', element_type=<Ele...    False      False   
5  DType(name='float64', element_type=<ElementTyp...    False      False   
6  DType(na

5.2 NVTabular Workflow Saved to Disk for Reuse

The NVTabular workflow is saved to disk 
so that it can be reused later without needing to 
redefine or retrain the statistical information 
(such as categorical feature dictionary mappings, means, standard deviations, etc.).

In [20]:
#5.2

# Save the workflow
# Use workflow.save() to save the workflow to the specified path for later loading and reuse
WORKFLOW_SAVE_PATH = os.path.join(DATA_FOLDER, "workflow_etl")
workflow.save(WORKFLOW_SAVE_PATH)

# Inform the user where the workflow has been saved
print(f"Workflow saved at: {WORKFLOW_SAVE_PATH}")


Workflow saved at: ./processed_data/workflow_etl


## Step 6: Session-Level Data Processing

6.1 Process Session-Level Data

Process a session-level dataset by splitting it 
into multiple subsets based on days. Further, split each subset into training, 
validation, and test sets. Finally, save these datasets as separate Parquet files.

Split the processed data by day and aggregate each session to 
generate sequence features. Focus on item interaction records, 
time features, and categorical features for each session.

Determine feature types, including categorical features, continuous features, and target labels.

In [23]:
#6.1
import cudf
import os
from datetime import datetime

# Export data split by day
# Define dataset path
DATA_FOLDER = './processed_data'

# Read in the processed train dataset (load and filter data)
# Read in the processed train dataset
sessions_gdf = cudf.read_parquet(os.path.join(DATA_FOLDER, "processed_nvt/part_0.parquet"))

# Ensure the 'day_index' column exists, calculate 'day_index' based on timestamp
if 'day_index' not in sessions_gdf.columns:
    # Assuming 'timestamp' is a string representing a date, convert to datetime
    sessions_gdf['timestamp'] = cudf.to_datetime(sessions_gdf['timestamp'])
    epoch = datetime(1970, 1, 1)
    sessions_gdf['day_index'] = (sessions_gdf['timestamp'] - epoch).dt.days

# If using a synthetic dataset, filter based on environment variable THRESHOLD_DAY_INDEX
USE_SYNTHETIC = os.environ.get("USE_SYNTHETIC", "False") == "True"
if USE_SYNTHETIC:
    THRESHOLD_DAY_INDEX = int(os.environ.get("THRESHOLD_DAY_INDEX", '1'))
    sessions_gdf = sessions_gdf[sessions_gdf['day_index'] >= THRESHOLD_DAY_INDEX]
else:
    sessions_gdf = sessions_gdf[sessions_gdf['day_index'] >= 178]

# Print the first few rows of the processed data to confirm data loading and filtering
print(sessions_gdf.head(3))

# Perform session-level aggregation (aggregate by session)
# Grouping by 'session_id' to get list features for each session
grouped_sessions_gdf = sessions_gdf.groupby('session_id').agg({
    'item_id': list,
    'et_dayofweek_sin': list,
    'product_recency_days_log_norm': list,
    'category': list,
    'day_index': 'first'  # We take the first 'day_index' as the session level day_index
})

# Rename columns to match original output format
grouped_sessions_gdf = grouped_sessions_gdf.rename(columns={
    'item_id': 'item_id-list',
    'et_dayofweek_sin': 'et_dayofweek_sin-list',
    'product_recency_days_log_norm': 'product_recency_days_log_norm-list',
    'category': 'category-list'
})

# We are only interested in the last 5 days
LAST_N_DAYS = 5
unique_days = sorted(grouped_sessions_gdf['day_index'].unique().to_pandas())[-LAST_N_DAYS:]

# Define output directory
EXPORT_FOLDER = './exported_by_day'
if not os.path.exists(EXPORT_FOLDER):
    os.makedirs(EXPORT_FOLDER)

# Create folders by day and split dataset
for day in unique_days:
    day_data = grouped_sessions_gdf[grouped_sessions_gdf['day_index'] == day]

    # Split dataset into train, validation, and test sets (split data by day)
    # Here we are arbitrarily splitting: 70% train, 15% validation, 15% test
    train_size = int(len(day_data) * 0.7)
    val_size = int(len(day_data) * 0.15)

    train_data = day_data.iloc[:train_size]
    val_data = day_data.iloc[train_size:train_size + val_size]
    test_data = day_data.iloc[train_size + val_size:]

    # Create a separate folder for each day
    day_folder = os.path.join(EXPORT_FOLDER, f'day_{day}')
    if not os.path.exists(day_folder):
        os.makedirs(day_folder)

    # Save data to Parquet files
    train_data.to_parquet(os.path.join(day_folder, "train.parquet"))
    val_data.to_parquet(os.path.join(day_folder, "validation.parquet"))
    test_data.to_parquet(os.path.join(day_folder, "test.parquet"))

    print(f"Data for day {day} exported to folder: {day_folder}")

# Inform the user that the data has been exported
print(f"All data successfully exported to {EXPORT_FOLDER}")


   session_id               timestamp    item_id category  \
0           1 2014-04-07 10:51:09.277  214536502        0   
1           1 2014-04-07 10:54:09.868  214536500        0   
2           1 2014-04-07 10:54:46.998  214536506        0   

            event_time_dt  et_dayofweek_sin  product_recency_days_log_norm  \
0 2014-04-07 10:51:09.277               0.0                       8.265496   
1 2014-04-07 10:54:09.868               0.0                       8.265496   
2 2014-04-07 10:54:46.998               0.0                       8.265496   

   day_index  
0      16167  
1      16167  
2      16167  
Data for day 16339 exported to folder: ./exported_by_day/day_16339
Data for day 16340 exported to folder: ./exported_by_day/day_16340
Data for day 16341 exported to folder: ./exported_by_day/day_16341
Data for day 16342 exported to folder: ./exported_by_day/day_16342
Data for day 16343 exported to folder: ./exported_by_day/day_16343
All data successfully exported to ./exported_by

6.2: Simplify Data Splitting
Use the save_time_based_splits function to split the dataset by time 
(based on timestamps and partition columns) 
and save it to the specified output directory. 
This simplifies the splitting and saving logic, 
relying on the efficient data processing capabilities of Transformers4Rec and NVTabular.

In [21]:
#6.2
from transformers4rec.utils.data_utils import save_time_based_splits
import nvtabular as nvt
import cudf
import os
from datetime import datetime

# Define dataset path
DATA_FOLDER = './processed_data'

# Read in the processed train dataset
sessions_gdf = cudf.read_parquet(os.path.join(DATA_FOLDER, "processed_nvt/part_0.parquet"))

# Ensure the 'day_index' column exists, calculate 'day_index' based on timestamp
if 'day_index' not in sessions_gdf.columns:
    # Assuming 'timestamp' is a string representing a date, convert to datetime
    sessions_gdf['timestamp'] = cudf.to_datetime(sessions_gdf['timestamp'])
    epoch = datetime(1970, 1, 1)
    sessions_gdf['day_index'] = (sessions_gdf['timestamp'] - epoch).dt.days

# Define the output folder
OUTPUT_FOLDER = os.path.join(DATA_FOLDER, "preproc_sessions_by_day")
if not os.path.exists(OUTPUT_FOLDER):
    os.makedirs(OUTPUT_FOLDER)

# Split and save by time
save_time_based_splits(
    data=nvt.Dataset(sessions_gdf),  # Input dataset
    output_dir=OUTPUT_FOLDER,  # Output directory
    partition_col='day_index',  # Column for splitting the data (split by day index)
    timestamp_col='event_time_dt',  # Use event_time_dt as the timestamp column
)

print(f"Data successfully split and saved to {OUTPUT_FOLDER}")


Creating time-based splits: 100%|██████████| 183/183 [00:30<00:00,  6.06it/s]


Data successfully split and saved to ./processed_data/preproc_sessions_by_day


6.3: Release GPU Memory
Delete unnecessary variables and clean up GPU memory to free resources for subsequent processing.

In [22]:
#6.3
import gc
import cupy as cp
import rmm

# Free GPU memory

# Check if the variable exists, and delete only if it does
if 'sessions_gdf' in locals() or 'sessions_gdf' in globals():
    del sessions_gdf  # Delete session dataset to release GPU memory resources

# Delete other potentially large datasets if they exist (optional)
# if 'other_large_dataframe' in locals() or 'other_large_dataframe' in globals():
#     del other_large_dataframe  # Delete other large datasets if necessary

# Force Python's garbage collection to ensure Python-level memory is released
gc.collect()  # Python's garbage collector can release CPU memory

# Clean GPU memory
cp.get_default_memory_pool().free_all_blocks()  # Use CuPy's memory pool to release GPU memory
rmm.reinitialize()  # Reinitialize RMM to release any potentially locked GPU memory

# Inform the user that memory has been released
print("GPU memory has been successfully released.")


GPU memory has been successfully released.


## Step 7: Training Data Preparation and Model Fine-Tuning
End-to-end session-based recommendations with PyTorch

7.1 Prepare Training Data
Load a dataset in Parquet format, extract its schema information, 
and select specific features for model training.

In [24]:
#7.1
# Import required libraries
import os
from merlin.schema import Schema
from merlin.io import Dataset

# Function: Load dataset
def load_dataset(input_dir, file_name):
    """
    Load parquet dataset
    """
    return Dataset(os.path.join(input_dir, file_name))

# Function: Extract schema and select specific features
def get_schema(dataset, selected_features):
    """
    Extract schema from the dataset and select features for model training
    """
    schema = dataset.schema
    return schema.select_by_name(selected_features)

# Set environment variables and paths
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", "")
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", f"{INPUT_DATA_DIR}/preproc_sessions_by_day")

# Execute logic
train_dataset = load_dataset(INPUT_DATA_DIR, "processed_nvt/part_0.parquet")
selected_features = ['item_id-list', 'category-list', 'product_recency_days_log_norm-list', 'et_dayofweek_sin-list']
schema = get_schema(train_dataset, selected_features)

# Print schema information
schema


Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.start_index,properties.cat_path,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.domain.min,properties.domain.max,properties.domain.name,properties.value_count.min,properties.value_count.max
0,item_id-list,"(Tags.ID, Tags.CATEGORICAL, Tags.LIST, Tags.IT...","DType(name='int64', element_type=<ElementType....",True,True,,0.0,0.0,0.0,.//categories/unique.item_id.parquet,52740.0,512.0,0.0,52739.0,item_id,0,20
1,category-list,"(Tags.LIST, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",True,True,,0.0,0.0,0.0,.//categories/unique.category.parquet,335.0,42.0,0.0,334.0,category,0,20
2,product_recency_days_log_norm-list,"(Tags.LIST, Tags.CONTINUOUS)","DType(name='float32', element_type=<ElementTyp...",True,True,,,,,,,,,,,0,20
3,et_dayofweek_sin-list,"(Tags.LIST, Tags.CONTINUOUS)","DType(name='float64', element_type=<ElementTyp...",True,True,,,,,,,,,,,0,20


7.2 Model Construction
Create a session-based recommendation model based on XLNet, 
including input modules, prediction tasks, and Transformer configurations.


In [25]:
#7.2
# Import necessary modules
from transformers4rec import torch as tr

# Define function: Create input module
def create_input_module(schema, max_sequence_length, d_model):
    """
    Create TabularSequenceFeatures input module
    """
    return tr.TabularSequenceFeatures.from_schema(
        schema=schema,
        max_sequence_length=max_sequence_length,
        continuous_projection=64,
        aggregation="concat",
        d_output=d_model,
        masking="mlm"
    )

# Define function: Create prediction task
def create_prediction_task():
    """
    Create next item prediction task
    """
    return tr.NextItemPredictionTask(weight_tying=True)

# Define function: Create Transformer configuration
def create_transformer_config(d_model, n_head, n_layer, max_sequence_length):
    """
    Create XLNet configuration
    """
    return tr.XLNetConfig.build(
        d_model=d_model,
        n_head=n_head,
        n_layer=n_layer,
        total_seq_length=max_sequence_length
    )

# Define function: Connect modules to build model
def build_model(schema, max_sequence_length, d_model, n_head, n_layer):
    """
    Build end-to-end model
    """
    input_module = create_input_module(schema, max_sequence_length, d_model)
    prediction_task = create_prediction_task()
    transformer_config = create_transformer_config(d_model, n_head, n_layer, max_sequence_length)
    return transformer_config.to_torch_model(input_module, prediction_task)

# Execute model creation
max_sequence_length, d_model, n_head, n_layer = 20, 320, 8, 2
model = build_model(schema, max_sequence_length, d_model, n_head, n_layer)

# Print model
model


Projecting inputs of NextItemPredictionTask to'64' As weight tying requires the input dimension '320' to be equal to the item-id embedding dimension '64'


Model(
  (heads): ModuleList(
    (0): Head(
      (body): SequentialBlock(
        (0): TabularSequenceFeatures(
          (_aggregation): ConcatFeatures()
          (to_merge): ModuleDict(
            (continuous_module): SequentialBlock(
              (0): ContinuousFeatures(
                (filter_features): FilterFeatures()
                (_aggregation): ConcatFeatures()
              )
              (1): SequentialBlock(
                (0): DenseBlock(
                  (0): Linear(in_features=2, out_features=64, bias=True)
                  (1): ReLU(inplace=True)
                )
              )
              (2): AsTabular()
            )
            (categorical_module): SequenceEmbeddingFeatures(
              (filter_features): FilterFeatures()
              (embedding_tables): ModuleDict(
                (item_id-list): Embedding(52740, 64, padding_idx=0)
                (category-list): Embedding(335, 64, padding_idx=0)
              )
            )
          )
      

7.3 Define Training Parameters and Initialize Trainer
Train the session-based recommendation model using Trainer, with support for half-precision (FP16) acceleration.

In [26]:
#7.3
# Import the torch module from Transformers4Rec
from transformers4rec import torch as tr
import os

# Set batch sizes for training and validation
BATCH_SIZE_TRAIN = int(os.environ.get("BATCH_SIZE_TRAIN", "512"))  # Default training batch size is 512
BATCH_SIZE_VALID = int(os.environ.get("BATCH_SIZE_VALID", "256"))  # Default validation batch size is 256

# Step 1: Define training arguments
training_args = tr.trainer.T4RecTrainingArguments(
    output_dir="./tmp",  # Path to save training results
    max_sequence_length=20,  # Maximum sequence length
    data_loader_engine='merlin',  # Data loading engine (default is GPU-optimized Merlin)
    num_train_epochs=10,  # Number of training epochs
    dataloader_drop_last=False,  # Whether to drop the last batch if it is smaller than batch size
    per_device_train_batch_size=BATCH_SIZE_TRAIN,  # Training batch size per device
    per_device_eval_batch_size=BATCH_SIZE_VALID,  # Validation batch size per device
    learning_rate=0.0005,  # Learning rate
    fp16=True,  # Use half-precision (FP16) for accelerated training
    report_to=[],  # No external logging systems (e.g., wandb)
    logging_steps=200  # Log every 200 steps
)

# Step 2: Instantiate the trainer
recsys_trainer = tr.Trainer(
    model=model,  # Use the model defined earlier
    args=training_args,  # Training arguments
    schema=schema,  # Schema for the model input
    compute_metrics=True  # Compute evaluation metrics (e.g., ranking metrics)
)

# Inform the user that the trainer has been successfully instantiated
print("Trainer initialized successfully!")


Using amp fp16 backend


Trainer initialized successfully!


7.4 Start Daily Fine-Tuning and Evaluation
Use fit_and_evaluate to perform daily fine-tuning and evaluation, 
supporting time-window-based training loops.

In [27]:
#7.4

# Import the fit_and_evaluate method
from transformers4rec.torch.utils.examples_utils import fit_and_evaluate

# Set output directory
OUTPUT_DIR = 'preproc_sessions_by_day'

# Get time index from environment variables or use default values
start_time_idx = int(os.environ.get("START_TIME_INDEX", "178"))  # Start time index
end_time_idx = int(os.environ.get("END_TIME_INDEX", "180"))  # End time index

# Use the fit_and_evaluate method to perform fine-tuning and evaluation over the time window
OT_results = fit_and_evaluate(
    recsys_trainer,  # Instantiated recommendation system trainer
    start_time_index=start_time_idx,  # Start time index
    end_time_index=end_time_idx,  # End time index
    input_dir=OUTPUT_DIR  # Data input directory
)

# Print training and evaluation results
print("Training and evaluation completed.")
print(f"Results: {OT_results}")




***** Launch training for day 178: *****


***** Running training *****
  Num examples = 28672
  Num Epochs = 10
  Instantaneous batch size per device = 512
  Total train batch size (w. parallel, distributed & accumulation) = 512
  Gradient Accumulation steps = 1
  Total optimization steps = 560


Step,Training Loss
200,7.5898
400,6.4854


Saving model checkpoint to ./tmp/checkpoint-500
Trainer.model is not a `PreTrainedModel`, only saving its state dict.


Training completed. Do not forget to share your model on huggingface.co/models =)




***** Running training *****
  Num examples = 20480
  Num Epochs = 10
  Instantaneous batch size per device = 512
  Total train batch size (w. parallel, distributed & accumulation) = 512
  Gradient Accumulation steps = 1
  Total optimization steps = 400



***** Evaluation results for day 179:*****

 eval_/next-item/avg_precision@10 = 0.08657721430063248
 eval_/next-item/avg_precision@20 = 0.09137023240327835
 eval_/next-item/ndcg@10 = 0.11436359584331512
 eval_/next-item/ndcg@20 = 0.13220059871673584
 eval_/next-item/recall@10 = 0.2034682035446167
 eval_/next-item/recall@20 = 0.27398842573165894

***** Launch training for day 179: *****


Step,Training Loss
200,6.8129
400,6.396




Training completed. Do not forget to share your model on huggingface.co/models =)





***** Evaluation results for day 180:*****

 eval_/next-item/avg_precision@10 = 0.056897178292274475
 eval_/next-item/avg_precision@20 = 0.06076783314347267
 eval_/next-item/ndcg@10 = 0.0797809362411499
 eval_/next-item/ndcg@20 = 0.09389662742614746
 eval_/next-item/recall@10 = 0.15524475276470184
 eval_/next-item/recall@20 = 0.21165500581264496


***** Running training *****
  Num examples = 16896
  Num Epochs = 10
  Instantaneous batch size per device = 512
  Total train batch size (w. parallel, distributed & accumulation) = 512
  Gradient Accumulation steps = 1
  Total optimization steps = 330



***** Launch training for day 180: *****


Step,Training Loss
200,6.9496




Training completed. Do not forget to share your model on huggingface.co/models =)





***** Evaluation results for day 181:*****

 eval_/next-item/avg_precision@10 = 0.12699222564697266
 eval_/next-item/avg_precision@20 = 0.13346321880817413
 eval_/next-item/ndcg@10 = 0.16788730025291443
 eval_/next-item/ndcg@20 = 0.1924923062324524
 eval_/next-item/recall@10 = 0.29870128631591797
 eval_/next-item/recall@20 = 0.3961038887500763
Training and evaluation completed.
Results: {'indexed_by_time_eval_/next-item/avg_precision@10': [0.08657721430063248, 0.056897178292274475, 0.12699222564697266], 'indexed_by_time_eval_/next-item/avg_precision@20': [0.09137023240327835, 0.06076783314347267, 0.13346321880817413], 'indexed_by_time_eval_/next-item/ndcg@10': [0.11436359584331512, 0.0797809362411499, 0.16788730025291443], 'indexed_by_time_eval_/next-item/ndcg@20': [0.13220059871673584, 0.09389662742614746, 0.1924923062324524], 'indexed_by_time_eval_/next-item/recall@10': [0.2034682035446167, 0.15524475276470184, 0.29870128631591797], 'indexed_by_time_eval_/next-item/recall@20': [0.27

7.5 Visualize Evaluation Metrics Over Time
‘OT_results’ is a list of evaluation scores (e.g., accuracy metrics) 
calculated based on the given start and end time indices.
In this example, we evaluate the data at time indices 
179, 180, and 181, so OT_results contains metric scores for these three time points.
The following code will calculate the average of each metric over time and output the results

In [28]:
#7.5 
# Import numpy library
import numpy as np

# Print the contents of OT_results for verification
print("OT_results:", OT_results)


OT_results: {'indexed_by_time_eval_/next-item/avg_precision@10': [0.08657721430063248, 0.056897178292274475, 0.12699222564697266], 'indexed_by_time_eval_/next-item/avg_precision@20': [0.09137023240327835, 0.06076783314347267, 0.13346321880817413], 'indexed_by_time_eval_/next-item/ndcg@10': [0.11436359584331512, 0.0797809362411499, 0.16788730025291443], 'indexed_by_time_eval_/next-item/ndcg@20': [0.13220059871673584, 0.09389662742614746, 0.1924923062324524], 'indexed_by_time_eval_/next-item/recall@10': [0.2034682035446167, 0.15524475276470184, 0.29870128631591797], 'indexed_by_time_eval_/next-item/recall@20': [0.27398842573165894, 0.21165500581264496, 0.3961038887500763]}


In [11]:
#7.6 
# Calculate the average metric values over time
avg_results = {k: np.mean(v) for k, v in OT_results.items()}

# Sort by metric name and print results
for key in sorted(avg_results.keys()): 
    print(" %s = %s" % (key, str(avg_results[key])))

# Inform that calculation is complete
print("Average metrics calculation complete.")


 indexed_by_time_eval_/next-item/avg_precision@10 = 0.07962210476398468
 indexed_by_time_eval_/next-item/avg_precision@20 = 0.08440909907221794
 indexed_by_time_eval_/next-item/ndcg@10 = 0.1085995187362035
 indexed_by_time_eval_/next-item/ndcg@20 = 0.12668888767560324
 indexed_by_time_eval_/next-item/recall@10 = 0.20082703729470572
 indexed_by_time_eval_/next-item/recall@20 = 0.2726915429035823
Average metrics calculation complete.


7.7: Model Tracing
Convert the model to TorchScript format using torch.jit.trace to support inference and deployment.

In [12]:
#7.7
# Import necessary modules
# torch: Used for deep learning model inference and tracing.
# cudf: For GPU-accelerated DataFrame operations.
# merlin and nvtabular: For data manipulation and preprocessing in recommendation systems.
import os
import torch
import cudf
from merlin.io import Dataset
from nvtabular import Workflow

from merlin.table import TensorTable, TorchColumn
from merlin.table.conversions import convert_col

# Set input data directory and time index
# Dynamically read environment variables to specify the start time index and data input directory.
# If environment variables are not set, use default values.
start_time_idx = int(os.environ.get("START_TIME_INDEX", "178"))
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", "")

# Load processed data as a sample input
# Read training data from Parquet files.
df = cudf.read_parquet(
    os.path.join(INPUT_DATA_DIR, f"preproc_sessions_by_day/{start_time_idx}/train.parquet"), 
    # Load specific columns based on the model's input schema to ensure data matches the model's expected format.
    columns=model.input_schema.column_names
)

# Construct a TensorTable from the DataFrame, selecting the first 100 rows, suitable for small-scale testing.
table = TensorTable.from_df(df.iloc[:100])

# Set the model's Top-K to 20 to control the number of recommendation candidates.
topk = 20
model.top_k = topk

# Convert the data to TorchColumn format for compatibility with PyTorch models.
for column in table.columns:
    table[column] = convert_col(table[column], TorchColumn)

# Convert the TensorTable to a dictionary, to be used as model input.
model_input_dict = table.to_dict()

# Switch the model to evaluation mode to disable dropout and other training-specific operations.
model.eval()

# Perform model inference using the model_input_dict, generating output.
output = model(model_input_dict)

# Use torch.jit.trace to trace the model and generate an optimized TorchScript representation.
# strict=True ensures complete consistency between the inputs and model architecture.
traced_model = torch.jit.trace(model, (model_input_dict,), strict=True)

# Indicate that tracing is complete.
print("Model tracing completed successfully!")

# Check the 'item_id-list__values' column in the input dictionary.
print("Sample input from 'item_id-list__values':", model_input_dict['item_id-list__values'])


Model tracing completed successfully!
Sample input from 'item_id-list__values': tensor([  603,   877,   741,    89,  4776,  1582,  3445,  8082,  3445,  4017,
          741,  4776,   183, 12287,  2064,   429,     5,    29,     5,   156,
         1986,  2589, 10854,  8216,  4209,  8710,  4241,    80,   111,  4241,
         5731,  6809,    32,     5,    72,   663,  2311,  7123,  9112,   444,
         1156,   773,   684,   429,  1944,   474,   596,   288,   165,    28,
          341,   288,    32,   422,   165,   479,  2771,   287,   961,  4000,
         2049,  3273,   498,  1218,   394,  1635, 11838, 10713, 11106,   288,
          165,   649,  1084,   301,    87,   649,   213,   303,   176,   316,
          422,     5,  3817,   930,  2185,  1084,   205,   686,  3830,   686,
          201,    19,    42,    19,  2033, 10456,    19,    20,  1125,  2814,
         4209, 34263,   829,   773,   619,  2049,  1986,  1078,  4712,  1335,
          660,   288,   429,   862,  4828,  5785, 19155, 17269

Inspecting Offsets in Input Data

In the model input dictionary, the item_id-list__offsets column represents the value offsets, which indicate the range of values corresponding to each example.
This is particularly important for handling variable-length sequence data, such as click behavior sequences in user sessions.

The following code demonstrates how to inspect the item_id-list__offsets column:

In [13]:
# Check the 'item_id-list__offsets' column in the input dictionary
print("Sample offsets from 'item_id-list__offsets':", model_input_dict['item_id-list__offsets'])


Sample offsets from 'item_id-list__offsets': tensor([  0,  12,  14,  16,  19,  21,  23,  26,  32,  35,  45,  47,  49,  51,
         56,  59,  61,  66,  69,  72,  76,  78,  80,  83,  87,  91,  94,  96,
         98, 100, 102, 104, 107, 111, 113, 122, 124, 128, 130, 133, 136, 141,
        143, 161, 164, 167, 172, 176, 180, 182, 184, 191, 193, 196, 199, 201,
        209, 214, 234, 237, 243, 245, 265, 267, 274, 276, 281, 289, 295, 298,
        300, 305, 307, 312, 314, 317, 320, 324, 327, 331, 335, 337, 339, 343,
        345, 348, 351, 354, 356, 360, 362, 364, 367, 374, 376, 381, 383, 386,
        388, 396, 401], device='cuda:0', dtype=torch.int32)


7.8: An inference operation graph is built to integrate feature engineering workflow and PyTorch model into an inference pipeline, and stored in a specified path, which provides the basis for online inference of the recommendation system.


In [14]:
# 7.8

# Import necessary modules
# os and shutil: Used for file system operations, such as path management and directory cleanup.
# nvtabular.Workflow: Loads the previously saved feature engineering workflow for preprocessing input data.
# PredictPyTorch and TransformWorkflow:
# PredictPyTorch: Responsible for invoking the TorchScript model for predictions.
# TransformWorkflow: Applies the feature engineering workflow.
import os
import shutil
from nvtabular import Workflow
from merlin.systems.dag.ops.pytorch import PredictPyTorch
from merlin.systems.dag.ops.workflow import TransformWorkflow

# Set the model storage path
# Dynamically retrieve the model storage path using the environment variable `ens_model_path`
# or use the default path: {INPUT_DATA_DIR}/models
ens_model_path = os.environ.get("ens_model_path", f"{INPUT_DATA_DIR}/models")

# Clean up the existing directory and create a new folder
# Ensure the model storage path is a clean directory to avoid confusion with old files.
if os.path.isdir(ens_model_path):
    shutil.rmtree(ens_model_path)  # Delete the existing directory
os.mkdir(ens_model_path)  # Create a new directory

# Load the previously saved workflow
# A workflow defines feature transformations and records the logic of feature engineering.
workflow = Workflow.load(os.path.join(INPUT_DATA_DIR, "workflow_etl"))

# Build the operation graph with workflow and TorchScript model
torch_op = (
    workflow.input_schema.column_names  # Input features
    >> TransformWorkflow(workflow)  # Apply the workflow for feature transformations
    >> PredictPyTorch(traced_model, model.input_schema, model.output_schema)  # Use the PyTorch model for predictions
)

# Indicate completion of the operation
print(f"Model operation graph created and stored in: {ens_model_path}")


Model operation graph created and stored in: /models




# 7.8: Saving Model parameter for attack model.

df = cudf.read_parquet(os.path.join(INPUT_DATA_DIR, f"preproc_sessions_by_day/{start_time_idx}/train.parquet"), columns=model.input_schema.column_names)
df.head()

<div>
<style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
</style>
<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>product_recency_days_log_norm-list</th>
      <th>et_dayofweek_sin-list</th>
      <th>item_id-list</th>
      <th>category-list</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>0</th>
      <td>[-2.751772, -3.0140762, -3.0140762, -2.7857492...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[18299, 16220, 15865, 10992, 9671, 10416, 2192...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]</td>
    </tr>
    <tr>
      <th>1</th>
      <td>[-2.6465325, -2.6465726, -2.6463258, -2.646005...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[370, 650, 650, 370, 273, 339, 191, 2340, 1758...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]</td>
    </tr>
    <tr>
      <th>2</th>
      <td>[-2.6599658, -2.6397069, -2.6337621, -2.638489...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[740, 18, 766, 18, 531, 531, 18, 740, 18, 150,...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...</td>
    </tr>
    <tr>
      <th>4</th>
      <td>[-2.55788, -2.572989, -2.5622084, -2.5649703, ...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[858, 4392, 38, 90, 1034, 2654, 594, 7102, 345...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]</td>
    </tr>
    <tr>
      <th>5</th>
      <td>[-2.6032808, -2.601593, -2.6026015, -2.6122754...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[113, 74, 113, 132, 113, 113, 132, 113, 3206, ...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...</td>
    </tr>
  </tbody>
</table>
</div>


In [None]:
row_seq = 10
table = TensorTable.from_df(df.iloc[:row_seq])
for column in table.columns:
    table[column] = convert_col(table[column], TorchColumn)
model_input_dict = table.to_dict()

In [None]:
model.eval()
item_scores = model(model_input_dict)
item_scores, len(item_scores)

(tensor([[ 0.3122, -0.1045, -0.0541,  ...,  0.3079,  0.1996, -0.0404],
         [ 0.3153, -0.1078, -0.0513,  ...,  0.3066,  0.2012, -0.0439],
         [ 0.3490, -0.1086, -0.0560,  ...,  0.2799,  0.2165, -0.0544],
         ...,
         [ 0.3157, -0.1130, -0.0496,  ...,  0.3101,  0.2063, -0.0433],
         [ 0.3613, -0.0942, -0.0499,  ...,  0.2601,  0.2228, -0.0615],
         [ 0.3429, -0.0999, -0.0530,  ...,  0.2790,  0.2091, -0.0528]],
        device='cuda:0', grad_fn=<DivBackward0>),
 10)

In [None]:
torch.save(item_scores, f"mia/output_member_{row_seq}_seqs.pt")

In [None]:
df_test = cudf.read_parquet(os.path.join(INPUT_DATA_DIR, f"preproc_sessions_by_day/{start_time_idx}/test.parquet"), columns=model.input_schema.column_names)
df_test.head()

<div>
<style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
</style>
<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>product_recency_days_log_norm-list</th>
      <th>et_dayofweek_sin-list</th>
      <th>item_id-list</th>
      <th>category-list</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>3</th>
      <td>[-2.5880072, -2.5885816, -2.5964737, -2.587951...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[671, 1757, 1296, 1757, 1296, 1296, 1296, 233,...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]</td>
    </tr>
    <tr>
      <th>14</th>
      <td>[-2.8195918, -2.8189173, -2.8187954, -2.814405...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[715, 1498, 1498, 550, 339, 2219, 2775, 2689, ...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]</td>
    </tr>
    <tr>
      <th>21</th>
      <td>[-2.6729205, -2.6728654, -2.7348363, -2.726106...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[433, 433, 1227, 1786, 1786, 1910, 3418, 3418,...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]</td>
    </tr>
    <tr>
      <th>30</th>
      <td>[-2.7185333, -2.7196856, -2.7212174, -2.721435...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[1202, 2168, 715, 1922, 1498, 468, 3059, 2168,...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]</td>
    </tr>
    <tr>
      <th>39</th>
      <td>[-2.611448, -2.6098468, -2.609604, -2.6047618,...</td>
      <td>[0.9749277124471076, 0.9749277124471076, 0.974...</td>
      <td>[1655, 1655, 1655, 12942, 12035, 4820, 7783, 3...</td>
      <td>[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]</td>
    </tr>
  </tbody>
</table>
</div>

In [None]:
row_seq = 10
table = TensorTable.from_df(df_test.iloc[:row_seq])
for column in table.columns:
    table[column] = convert_col(table[column], TorchColumn)
model_input_dict_test = table.to_dict()

In [None]:
model.eval()
item_scores_test = model(model_input_dict_test)
item_scores_test, len(item_scores_test)

(tensor([[ 0.2834, -0.1043, -0.0532,  ...,  0.3190,  0.1886, -0.0442],
         [ 0.3393, -0.1000, -0.0495,  ...,  0.2840,  0.2094, -0.0501],
         [ 0.3394, -0.1019, -0.0567,  ...,  0.2746,  0.2147, -0.0532],
         ...,
         [ 0.3410, -0.1085, -0.0532,  ...,  0.2819,  0.2083, -0.0525],
         [ 0.3723, -0.0842, -0.0578,  ...,  0.2486,  0.2149, -0.0606],
         [ 0.3594, -0.0950, -0.0568,  ...,  0.2632,  0.2167, -0.0586]],
        device='cuda:0', grad_fn=<DivBackward0>),
 10)

In [None]:
torch.save(item_scores_test, f"mia/output_nonmember_{row_seq}_seqs.pt")