In [1]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

In [2]:
import sys
sys.path.append('../src')

import numpy as np
import pandas as pd

from spark import SparkSessionFactory, read_csv, write_csv, column_values, train_test_split, shuffle_df, Column
from util import remove_dir, Config, LoggerFactory

import pyspark.sql.types as t
import pyspark.sql as s
import pyspark.sql.functions as f

import logging

## Helpers

In [3]:
def log_counts(train_set, val_set, test_set):
    logging.info(f'Train set count = {train_set.count()} observations.')
    logging.info(f'Validation set count = {val_set.count()} observations.')
    logging.info(f'Test set count = {test_set.count()} observations.')

def show(df, limit=5): return df.limit(limit).toPandas()

def show_columns(df, columns=[], limit=5): return show(df.select(*[f.col(c) for c in columns]), limit)

def show_counts(df, columns=[]):
    logging.info('Count:')
    for column in columns:
        logging.info(f'- {column}: {df.select(column).distinct().count()}')
        
class TrainUserMovieFilter:
    def __init__(self, train_set):
        self.__train_user_seqs = train_set.select('user_seq').distinct().rdd.map(lambda r: r.user_seq).collect()
        self.__train_movie_seqs = train_set.select('movie_seq').distinct().rdd.map(lambda r: r.movie_seq).collect()

    def perform(self, obs_set):
        obs_set2 = obs_set.filter(obs_set['user_seq'].isin(self.__train_user_seqs))
        obs_set3 = obs_set2.filter(obs_set2['movie_seq'].isin(self.__train_movie_seqs))

        logging.info(f'Excluded users: {abs(obs_set.count() - obs_set2.count())}')
        logging.info(f'Excluded movies: {abs(obs_set2.count() - obs_set3.count())}')
        
        return obs_set3

## Logger config

In [4]:
config = Config(path='../config/config.yaml')
LoggerFactory(config['logger']).create()

<RootLogger root (INFO)>

# Prepare model input data

**Step 1**: Create a predefined spack session. this is used to create a pipeline that build the model input features. 

In [5]:
session = SparkSessionFactory.create()
session

In [6]:
session.sparkContext.getConf().getAll()

[('spark.driver.port', '37089'),
 ('spark.executor.instances', '12'),
 ('spark.sql.warehouse.dir',
  'file:/home/adrian/development/machine-learning/recommendations/user-movie-genres-model/spark-warehouse'),
 ('spark.executor.id', 'driver'),
 ('spark.app.startTime', '1645883591594'),
 ('spark.app.id', 'local-1645883592120'),
 ('spark.executor.memory', '1G'),
 ('spark.app.name', 'recommendations'),
 ('spark.driver.host', 'skynet'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.memory', '4G'),
 ('spark.ui.showConsoleProgress', 'true')]

**Note**: When create a spark session this run a new spark cluster with one instance in localhost. You can monitor instance jobs clicking the **Spark UI** link. 

**Step 2**: Load raw dataset to a spark dataset. 

In [7]:
TEMP_PATH = './temp'
DATASET_PATH = f'{TEMP_PATH}/dataset'
TRAIN_PATH = f'{TEMP_PATH}/train'
VAL_PATH = f'{TEMP_PATH}/val'
TEST_PATH = f'{TEMP_PATH}/test'

In [8]:
dataset = shuffle_df(read_csv(session, f'{DATASET_PATH}/*.csv'))

**Step 2**: Get all users and movies ids and let's see how many elements has each.

In [9]:
show_counts(dataset, ['user_id', 'movie_id'])

2022-02-26 10:53:14 INFO Count:
2022-02-26 10:53:15 INFO - user_id: 610
2022-02-26 10:53:15 INFO - movie_id: 9724


**Step 3:** Let's see all dataset columns.

In [10]:
dataset.columns

['rating',
 'user_id',
 'movie_id',
 'gen_comedy',
 'gen_drama',
 'gen_romance',
 'gen_action',
 'gen_adventure',
 'gen_sci_fi',
 'gen_crime',
 'gen_thriller',
 'gen_war',
 'gen_documentary',
 'gen_mystery',
 'gen_imax',
 'gen_horror',
 'gen_children',
 'gen_fantasy',
 'gen_animation',
 'gen_musical',
 'gen_film_noir',
 'gen_western',
 'gen_none']

**Step 4**: add user an movies sequence index/new ids.

In [11]:
# dataset_part, _ = train_test_split(dataset, test_size=0.8)
dataset_part = dataset

In [12]:
dataset_part2 = Column.sequence(session, dataset_part, 'user_id', 'user_seq')
dataset_part3 = Column.sequence(session, dataset_part2, 'movie_id', 'movie_seq')

show_counts(dataset_part3, ['user_id', 'user_seq', 'movie_id', 'movie_seq'])

2022-02-26 10:53:21 INFO Count:
2022-02-26 10:53:22 INFO - user_id: 610
2022-02-26 10:53:22 INFO - user_seq: 610
2022-02-26 10:53:23 INFO - movie_id: 9724
2022-02-26 10:53:23 INFO - movie_seq: 9724


**Step 5**: Split data into train, validacion, test sets.

In [13]:
train_set, val_test_sets = train_test_split(dataset_part3, test_size=0.3)
val_set, test_set = train_test_split(val_test_sets, test_size=0.3)

log_counts(train_set, val_set, test_set)

2022-02-26 10:53:23 INFO Train set count = 70288 observations.
2022-02-26 10:53:23 INFO Validation set count = 21253 observations.
2022-02-26 10:53:24 INFO Test set count = 9295 observations.


**Step 6**: Get only test and val samples for users and movies that appears in the train set.

In [14]:
filter = TrainUserMovieFilter(train_set)

val_set2 = filter.perform(val_set)
test_set2 = filter.perform(test_set)

log_counts(train_set, val_set2, test_set2)

2022-02-26 10:53:30 INFO Excluded users: 0
2022-02-26 10:53:30 INFO Excluded movies: 1055
2022-02-26 10:53:36 INFO Excluded users: 0
2022-02-26 10:53:36 INFO Excluded movies: 444
2022-02-26 10:53:37 INFO Train set count = 70288 observations.
2022-02-26 10:53:38 INFO Validation set count = 20198 observations.
2022-02-26 10:53:38 INFO Test set count = 8851 observations.


In [15]:
remove_dir(TRAIN_PATH)
remove_dir(VAL_PATH)
remove_dir(TEST_PATH)

'./temp/test'

In [16]:
write_csv(train_set, TRAIN_PATH)
write_csv(val_set2, VAL_PATH)
write_csv(test_set2, TEST_PATH)

In [17]:
session.stop()