# ETL process: NVTabular vs. Pandas

## Import libraries

In [0]:
# Standard library
import glob
import os
import shutil
from os import path

# External dependencies
import numpy as np
import nvtabular as nvt
import pandas as pd
from nvtabular.utils import download_file

# Get dataframe library - cudf or pandas
from nvtabular.dispatch import get_lib

df_lib = get_lib()

## Define data path

In [0]:
# Root folder for the MovieLens files. The INPUT_DATA_DIR environment variable
# takes precedence; otherwise a folder under the user's home directory is used.
_default_data_dir = os.path.expanduser("~/nvt-examples/movielens/data/")
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", _default_data_dir)

## Download data

In [0]:
# Fetch the MovieLens 25M archive, targeting ml-25m.zip inside the data folder.
ml25m_url = "http://files.grouplens.org/datasets/movielens/ml-25m.zip"
ml25m_archive = os.path.join(INPUT_DATA_DIR, "ml-25m.zip")
download_file(ml25m_url, ml25m_archive)

## Read data using NVT

In [0]:
%%time
movies = df_lib.read_csv(os.path.join(INPUT_DATA_DIR, "ml-25m/movies.csv"))

In [0]:
%%time
ratings = df_lib.read_csv(os.path.join(INPUT_DATA_DIR, "ml-25m", "ratings.csv"))
ratings.head()
print(ratings.shape)

In [0]:
# Report the size first, then leave the preview as the final expression so the
# notebook actually renders it (a bare `.head()` mid-cell is discarded).
print(movies.shape)
movies.head()

## Read data using Pandas

In [0]:
%%time
pd_movies = pd.read_csv(os.path.join(INPUT_DATA_DIR, "ml-25m/movies.csv"))

In [0]:
# Report the size first, then leave the preview as the final expression so the
# notebook actually renders it (a bare `.head()` mid-cell is discarded).
print(pd_movies.shape)
pd_movies.head()

In [0]:
%%time
pd_ratings = pd.read_csv(os.path.join(INPUT_DATA_DIR, "ml-25m", "ratings.csv"))
ratings.head()
print(ratings.shape)

## Data cleaning NVT

In [0]:
%%time
movies["genres"] = movies["genres"].str.split("|")
movies = movies.drop("title", axis=1)
movies.head()

Unnamed: 0,movieId,genres
0,1,"[Adventure, Animation, Children, Comedy, Fantasy]"
1,2,"[Adventure, Children, Fantasy]"
2,3,"[Comedy, Romance]"
3,4,"[Comedy, Drama, Romance]"
4,5,[Comedy]


## Data cleaning Pandas

In [0]:
%%time
pd_movies["genres"] = pd_movies["genres"].str.split("|")
pd_movies = pd_movies.drop("title", axis=1)
pd_movies.head()

Unnamed: 0,movieId,genres
0,1,"[Adventure, Animation, Children, Comedy, Fantasy]"
1,2,"[Adventure, Children, Fantasy]"
2,3,"[Comedy, Romance]"
3,4,"[Comedy, Drama, Romance]"
4,5,[Comedy]


## Convert to Parquet using NVT

In [0]:
%%time 
movies.to_parquet(os.path.join(INPUT_DATA_DIR, "movies_convertedNVT.parquet"))

## Convert to Parquet using Pandas

In [0]:
%%time 
pd_movies.to_parquet(os.path.join(INPUT_DATA_DIR, "movies_convertedPD.parquet"))

## Preprocess data NVT

In [0]:
# Remove the timestamp column and store the ratings as Parquet; this file is
# the input of the NVT workflow further down.
ratings = ratings.drop(columns=["timestamp"])
ratings_parquet = os.path.join(INPUT_DATA_DIR, "ratings.parquet")
ratings.to_parquet(ratings_parquet)

## Preprocess data Pandas

In [0]:
# Remove the timestamp column and store the pandas ratings as Parquet; this
# file is read back for the pandas side of the comparison.
pd_ratings = pd_ratings.drop(columns=["timestamp"])
ratings_pd_parquet = os.path.join(INPUT_DATA_DIR, "ratingsPD.parquet")
pd_ratings.to_parquet(ratings_pd_parquet)

## Read Parquet data NVT

In [0]:
%%time
movies_nvt = df_lib.read_parquet(os.path.join(INPUT_DATA_DIR, "movies_convertedNVT.parquet"))
movies_nvt.head()

Unnamed: 0,movieId,genres
0,1,"[Adventure, Animation, Children, Comedy, Fantasy]"
1,2,"[Adventure, Children, Fantasy]"
2,3,"[Comedy, Romance]"
3,4,"[Comedy, Drama, Romance]"
4,5,[Comedy]


## Read Parquet data Pandas

In [0]:
%%time
movies_pd = pd.read_parquet(os.path.join(INPUT_DATA_DIR, "movies_convertedPD.parquet"))
movies_pd.head()

Unnamed: 0,movieId,genres
0,1,"[Adventure, Animation, Children, Comedy, Fantasy]"
1,2,"[Adventure, Children, Fantasy]"
2,3,"[Comedy, Romance]"
3,4,"[Comedy, Drama, Romance]"
4,5,[Comedy]


In [0]:
# Column roles: identifier columns treated as categoricals, and the label.
CATEGORICAL_COLUMNS = [
    "userId",
    "movieId",
]
LABEL_COLUMNS = [
    "rating",
]

In [0]:
!pip install graphviz

## Join operation NVT

In [0]:
%%time
joined = ["userId", "movieId"] >> nvt.ops.JoinExternal(movies_nvt, on=["movieId"])
join_graph = joined.graph

In [0]:
# Print the textual source of the graph built for the join step.
print(join_graph.source)

## Lambda operation NVT on a specific column

In [0]:
%%time
ratings = nvt.ColumnGroup(["rating"]) >> (lambda col: (col > 3).astype("int8"))

## Set up NVT workflow

In [0]:
# Combine the joined userId/movieId columns with the binarized rating column
# into the single output definition handed to the workflow.
output = joined + ratings

## Define workflow

In [0]:
# Wrap the combined column definition in an NVT Workflow.
workflow = nvt.Workflow(output)

In [0]:
# Output dtypes passed to the Parquet writer: 64-bit integers for the
# categorical id columns, 32-bit floats for the label column.
dict_dtypes = {
    **dict.fromkeys(CATEGORICAL_COLUMNS, np.int64),
    **dict.fromkeys(LABEL_COLUMNS, np.float32),
}

In [0]:
# Point an NVT Dataset at the ratings Parquet file written earlier.
ratings_parquet_files = [os.path.join(INPUT_DATA_DIR, "ratings.parquet")]
rating_dataset = nvt.Dataset(ratings_parquet_files)

## Execute NVT workflow

In [0]:
%%time
# Time the fit step of the workflow on the ratings dataset.
workflow.fit(rating_dataset)

In [0]:
%%time
workflow.transform(rating_dataset).to_parquet(
    output_path=os.path.join(INPUT_DATA_DIR, "output"),
    shuffle=nvt.io.Shuffle.PER_PARTITION,
    cats=["userId", "movieId", "genres"],
    labels=["rating"],
    dtypes=dict_dtypes,
)

## Save workflow for further use

In [0]:
# Persist the workflow under the data folder so it can be reused later.
workflow_dir = os.path.join(INPUT_DATA_DIR, "workflow")
workflow.save(workflow_dir)

## Load and check NVT processed data

In [0]:
# Collect the Parquet files written by the workflow (`glob` is imported with
# the other dependencies in the first cell).
output_dir = os.path.join(INPUT_DATA_DIR, "output")
PATHS = sorted(glob.glob(os.path.join(output_dir, "*.parquet")))

# Fail with an explicit message rather than an IndexError on PATHS[0] when the
# transform step has not produced any output yet.
if not PATHS:
    raise FileNotFoundError(f"No Parquet files found in {output_dir}")

# Load the first output file indexed by movieId; set_index returns a new frame,
# so no in-place mutation is needed.
df = df_lib.read_parquet(PATHS[0]).set_index("movieId")
df

Unnamed: 0_level_0,userId,genres,rating
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
31696,22804,"[Action, Fantasy, Horror, Thriller]",1.0
30810,9193,"[Adventure, Comedy, Fantasy]",1.0
48385,9193,[Comedy],1.0
37733,22804,"[Action, Crime, Drama, Thriller]",1.0
508,63323,[Drama],1.0
...,...,...,...
79134,63205,[Comedy],0.0
7895,22716,"[Crime, Drama, Thriller]",0.0
97752,9094,"[Drama, Sci-Fi, IMAX]",1.0
89745,9094,"[Action, Adventure, Sci-Fi, IMAX]",1.0


## Perform same operations as NVT using Pandas

In [0]:
# pandas documents `path` as a single string / path object / file-like object,
# so pass the file path directly instead of wrapping it in a list (this also
# matches how the movies Parquet file is read with pandas earlier).
pd_ratings_dataset = pd.read_parquet(os.path.join(INPUT_DATA_DIR, "ratingsPD.parquet"))

In [0]:
%%time
pd_ratings_dataset.set_index('movieId', inplace=True)
movies_pd.set_index('movieId', inplace=True)

In [0]:
# Preview the pandas ratings frame after it was indexed by movieId.
pd_ratings_dataset.head()

Unnamed: 0_level_0,userId,rating
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
296,1,5.0
306,1,3.5
307,1,5.0
665,1,5.0
899,1,3.5


In [0]:
# Preview the pandas movies frame after it was indexed by movieId.
movies_pd.head()

Unnamed: 0_level_0,genres
movieId,Unnamed: 1_level_1
1,"[Adventure, Animation, Children, Comedy, Fantasy]"
2,"[Adventure, Children, Fantasy]"
3,"[Comedy, Romance]"
4,"[Comedy, Drama, Romance]"
5,[Comedy]


## Join operation Pandas

In [0]:
%%time

joined_pd = movies_pd.join(pd_ratings_dataset, on="movieId", how="left")
joined_pd

Unnamed: 0_level_0,genres,userId,rating
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,"[Adventure, Animation, Children, Comedy, Fantasy]",2.0,3.5
1,"[Adventure, Animation, Children, Comedy, Fantasy]",3.0,4.0
1,"[Adventure, Animation, Children, Comedy, Fantasy]",4.0,3.0
1,"[Adventure, Animation, Children, Comedy, Fantasy]",5.0,4.0
1,"[Adventure, Animation, Children, Comedy, Fantasy]",8.0,4.0
...,...,...,...
209157,[Drama],119571.0,1.5
209159,[Documentary],115835.0,3.0
209163,"[Comedy, Drama]",6964.0,4.5
209169,[(no genres listed)],119571.0,3.0


## Lambda operation Pandas

In [0]:
%%time
joined_pd["rating"] = joined_pd["rating"].apply((lambda col: (col > 3))).astype("int8")

In [0]:
# Display the joined pandas frame with the binarized rating column.
joined_pd

Unnamed: 0_level_0,genres,userId,rating
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,"[Adventure, Animation, Children, Comedy, Fantasy]",2.0,1
1,"[Adventure, Animation, Children, Comedy, Fantasy]",3.0,1
1,"[Adventure, Animation, Children, Comedy, Fantasy]",4.0,0
1,"[Adventure, Animation, Children, Comedy, Fantasy]",5.0,1
1,"[Adventure, Animation, Children, Comedy, Fantasy]",8.0,1
...,...,...,...
209157,[Drama],119571.0,0
209159,[Documentary],115835.0,0
209163,"[Comedy, Drama]",6964.0,1
209169,[(no genres listed)],119571.0,0
