In [1]:
# !pip install elephas

In [2]:
import os
import subprocess
import time
import findspark
findspark.init()

### Experiment setup

In [3]:
# Inclusive release-year window used to select the movie subset.
start_year = 1950
end_year = 1960
# Worker count passed to the Spark configuration and to the elephas SparkModel.
num_workers = 8
# When True, every step spawns the monitor script and forces a Spark action.
collect_stats = True
# Interpreter and script used to launch the resource-monitor subprocess.
executable = 'python.exe'
monitor = 'resources_monitor.py'

# Run identifier appended to output file names:
# last two digits of end_year followed by the worker count.
setup = '_' + str(end_year)[-2:] + '_' + str(num_workers)

In [23]:
# Create the output folder tree for the per-step resource statistics.
# One sub-folder per pipeline step; the list order matters because the step
# cells index into it with their `step` number.
folders = ['data_loading','ratings_prep','movies_prep','movies_ratings_join','subsets_stats','ratings_array','model_train_eval']
folder_path = "movie_lens_stats"

# makedirs(exist_ok=True) is idempotent, so re-running this cell is safe.
os.makedirs(folder_path, exist_ok=True)
for stage_folder in folders:
    stage_dir = os.path.join(folder_path, stage_folder)
    if os.path.isdir(stage_dir):
        continue  # already there: stay silent
    os.makedirs(stage_dir, exist_ok=True)
    print(f'created folder {stage_folder}')

created folder data_loading
created folder ratings_prep
created folder movies_prep
created folder movies_ratings_join
created folder subsets_stats
created folder ratings_array
created folder model_train_eval


In [5]:
from pyspark import SparkContext, SparkConf

# Assemble the Spark configuration one setting at a time, then start the context.
conf = SparkConf()
conf.setAppName('AE_Rec_Sys')
conf.setMaster('local[*]')
conf.set("spark.driver.memory", "9g")
conf.set("spark.executor.memory", "2g")
conf.set("spark.driver.maxResultSize", "2g")
conf.set("spark.executor.instances", str(num_workers))

# NOTE(review): the master is 'local[*]'; check in the Spark docs whether the
# spark.executor.* settings take effect in local mode, or whether
# 'local[<num_workers>]' was intended for the worker-count experiment.
sc = SparkContext(conf=conf)

In [6]:
# Display the SparkContext created above as the cell output.
sc

### Step 0: data loading

In [7]:
step = 0
dest = f'./{folder_path}/{folders[step]}/{folders[step]}{setup}.csv'
# Spawn the resource monitor only when stats are requested. The last argument
# is forwarded to the monitor script as-is (presumably its sampling interval,
# confirm in resources_monitor.py).
stats_collector = subprocess.Popen([executable, monitor, dest, '1']) if collect_stats else None
try:
    ratings = sc.textFile('ml-25m/ratings.csv')

    if collect_stats:
        # Force an action so the monitor observes the actual read.
        action = ratings.count()
finally:
    # Always stop and reap the monitor, even if the Spark action failed,
    # so no orphan or zombie process is left behind.
    if stats_collector is not None:
        stats_collector.kill()
        stats_collector.wait()
if num_workers == 1: ratings = ratings.repartition(1)

### Step 1: ratings data cleaning and reshaping

In [8]:
# GET (movieId, (userId, rating)) ratings RDD

step = 1
dest = f'./{folder_path}/{folders[step]}/{folders[step]}{setup}.csv'
# Spawn the resource monitor only when stats are requested.
stats_collector = subprocess.Popen([executable, monitor, dest, '1']) if collect_stats else None
try:
    # Drop the CSV header, split each line on ',', and key every rating by
    # movieId: (movieId, (userId, rating)).
    ratings = ratings.filter(lambda line: line != 'userId,movieId,rating,timestamp') \
                .map(lambda line: line.split(',')) \
                .map(lambda rating: (rating[1], (rating[0], float(rating[2]))))

    if collect_stats:
        # Force an action so the monitor observes the transformation.
        action = ratings.count()
finally:
    # Always stop and reap the monitor, even if the Spark action failed.
    if stats_collector is not None:
        stats_collector.kill()
        stats_collector.wait()
ratings.take(10)

[('296', ('1', 5.0)),
 ('306', ('1', 3.5)),
 ('307', ('1', 5.0)),
 ('665', ('1', 5.0)),
 ('899', ('1', 3.5)),
 ('1088', ('1', 4.0)),
 ('1175', ('1', 3.5)),
 ('1217', ('1', 3.5)),
 ('1237', ('1', 5.0)),
 ('1250', ('1', 4.0))]

### STEP 2: movies data cleaning, treatment and filtering by year range

In [9]:
# Read the raw movies file; a single-worker run keeps it in one partition.
movies = sc.textFile('ml-25m/movies.csv')
if num_workers == 1:
    movies = movies.repartition(1)

In [10]:
# GET (movie_id, (title, year)) movies RDD
# remove 'movieId,title,genres' header line and all films without a specified year
# movie title may contain ','
# movie titles containing ',' are enclosed in double quotes

import re
# Regex matching a four-digit number in parentheses; used both to keep only
# lines that carry a year and to extract that year from the title.
pattern = r'\(\d{4}\)' # '(yyyy)' year format

def remove_enclosing_double_quotes(movie_string):
    """Strip one leading and one trailing double quote from ``movie_string``.

    Each side is handled independently, so a string quoted on one side only
    loses just that quote. Empty strings and a lone ``"`` are handled without
    raising (indexing ``[0]`` / ``[-1]`` would fail on an empty string).

    Args:
        movie_string: title text, possibly wrapped in double quotes.

    Returns:
        The text without its enclosing double quotes.
    """
    if movie_string.startswith('"'):
        movie_string = movie_string[1:]
    if movie_string.endswith('"'):
        movie_string = movie_string[:-1]
    return movie_string

step = 2
dest = f'./{folder_path}/{folders[step]}/{folders[step]}{setup}.csv'
# Spawn the resource monitor only when stats are requested.
stats_collector = subprocess.Popen([executable, monitor, dest, 'no_interval']) if collect_stats else None
try:
    # Stage by stage:
    #   1. keep only lines containing a '(yyyy)' group (this also drops the header)
    #   2. split on ',' and re-join the middle fields, since titles may contain ','
    #   3. strip the double quotes that enclose such titles
    #   4. cut the last 7 characters (' (yyyy)') off the title and take the last '(yyyy)' match
    #   5. remove the parentheses and convert the year to int
    #   6. keep movies whose year lies in [start_year, end_year]
    movies = movies.filter(lambda line: re.search(pattern, line)) \
                .map(lambda line: line.split(',')) \
                .map(lambda movie: (movie[0], ','.join(movie[1:-1]))) \
                .map(lambda movie: (movie[0], remove_enclosing_double_quotes(movie[1]))) \
                .map(lambda movie: (movie[0], movie[1][:-7], re.findall(pattern, movie[1])[-1])) \
                .map(lambda movie: (movie[0], movie[1], movie[2].translate(str.maketrans('', '', '()')))) \
                .map(lambda movie: (movie[0], (movie[1], int(movie[2])))) \
                .filter(lambda movie: start_year <= movie[1][1] <= end_year)

    # (movie_id, (title, year))
    if collect_stats:
        # Force an action so the monitor observes the transformation.
        action = movies.count()
finally:
    # Always stop and reap the monitor, even if the Spark action failed.
    if stats_collector is not None:
        stats_collector.kill()
        stats_collector.wait()
movies.take(10)

[('659', ('Purple Noon (Plein soleil)', 1960)),
 ('668', ('Song of the Little Road (Pather Panchali)', 1955)),
 ('670', ('World of Apu, The (Apur Sansar)', 1959)),
 ('755', ('Kim', 1950)),
 ('820', ('Death in the Garden (Mort en ce jardin, La)', 1956)),
 ('841', ('Eyes Without a Face (Yeux sans visage, Les)', 1959)),
 ('854', ('Ballad of Narayama, The (Narayama Bushiko)', 1958)),
 ('899', ("Singin' in the Rain", 1952)),
 ('900', ('American in Paris, An', 1951)),
 ('901', ('Funny Face', 1957))]

### STEP 3: JOIN ratings and movies RDDs

In [11]:
# joined rdds results in (movie_id, ((userId, rating), (title, year)))

step = 3
dest = f'./{folder_path}/{folders[step]}/{folders[step]}{setup}.csv'
# Spawn the resource monitor only when stats are requested.
stats_collector = subprocess.Popen([executable, monitor, dest, '1']) if collect_stats else None
try:
    # filtered_ratings: the join keeps only ratings whose movieId is in the
    # year-filtered movies RDD, then the record is flattened.
    ratings = ratings.join(movies) \
                .map(lambda rating: (rating[1][0][0], rating[0], rating[1][0][1]))

    if collect_stats:
        # Force an action so the monitor observes the join.
        action = ratings.count()
finally:
    # Always stop and reap the monitor, even if the Spark action failed.
    if stats_collector is not None:
        stats_collector.kill()
        stats_collector.wait()

# (userId, movie_id, rating)
ratings.take(10)

[('1', '1250', 4.0),
 ('9', '1250', 5.0),
 ('20', '1250', 4.5),
 ('38', '1250', 3.5),
 ('58', '1250', 4.5),
 ('59', '1250', 4.0),
 ('72', '1250', 4.0),
 ('75', '1250', 3.5),
 ('86', '1250', 5.0),
 ('99', '1250', 3.5)]

In [12]:
# The join yields more partitions; a single-worker run collapses them into one.
if num_workers == 1:
    ratings = ratings.repartition(1)
# Mark the joined RDD as persistent because several later steps read it.
ratings.persist()

PythonRDD[14] at RDD at PythonRDD.scala:53

### STEP 4: collecting subset stats 

In [13]:
step = 4
dest = f'./{folder_path}/{folders[step]}/{folders[step]}{setup}.csv'
# Spawn the resource monitor only when stats are requested.
stats_collector = subprocess.Popen([executable, monitor, dest, '1']) if collect_stats else None
try:
    # {'movie_id': rating_array_index}
    movies_ids = ratings.map(lambda x: x[1]).distinct()
    movies_id_index_pairs = {film: i for i, film in enumerate(movies_ids.collect())}
    # NOTE(review): this RDD is never persisted, so unpersist() looks like a
    # no-op here (same for the two below); kept to leave the workload unchanged.
    movies_ids.unpersist()

    # number of movies
    number_of_movies = len(movies_id_index_pairs)

    # max rating (useful for normalization)
    unique_ratings = ratings.map(lambda x: x[2]).distinct()
    max_rating = max(unique_ratings.collect())
    unique_ratings.unpersist()

    # number of users
    users_ids = ratings.map(lambda x: x[0]).distinct()
    number_of_users = users_ids.count()
    users_ids.unpersist()

    # number of ratings
    number_of_ratings = ratings.count()
    numer_of_ratings = number_of_ratings  # original (misspelled) name kept as an alias
finally:
    # Always stop and reap the monitor, even if a Spark action failed.
    if stats_collector is not None:
        stats_collector.kill()
        stats_collector.wait()

# subset stats:
print('number of movies', number_of_movies)
print('number of users', number_of_users)
print('number of ratings', number_of_ratings)

number of movies 2821
number of users 77048
number of ratings 565530


### STEP 5: users' ratings arrays creation + normalization to [0, 1]

In [14]:
import numpy as np 

def full_ratings_array(ratings, num_films, films_idx):
    """Build one dense rating vector from a collection of (film id, rating) records.

    Args:
        ratings: iterable of records whose item 0 is a film id and item 1 its rating.
        num_films: length of the returned vector.
        films_idx: mapping from film id to its position in the vector.

    Returns:
        A numpy array of length ``num_films``; positions without a rating stay 0.
    """
    dense = np.zeros(num_films)
    for entry in ratings:
        position = films_idx[entry[0]]
        dense[position] = entry[1]
    return dense
    
step = 5
dest = f'./{folder_path}/{folders[step]}/{folders[step]}{setup}.csv'
# Spawn the resource monitor only when stats are requested.
stats_collector = subprocess.Popen([executable, monitor, dest, 'no_interval']) if collect_stats else None
try:
    # Group the (movie_id, rating) pairs per user, expand each group into a
    # dense vector of length number_of_movies, then divide by max_rating.
    aggregated_user_ratings = ratings.map(lambda x: (x[0], (x[1], x[2]))) \
                                .groupByKey() \
                                .map(lambda x : full_ratings_array(x[1], number_of_movies, movies_id_index_pairs)) \
                                .map(lambda rating: rating/max_rating)

    if collect_stats:
        # Force an action so the monitor observes the aggregation.
        action = aggregated_user_ratings.count()
finally:
    # Always stop and reap the monitor, even if the Spark action failed.
    if stats_collector is not None:
        stats_collector.kill()
        stats_collector.wait()

# (array_ratings[])
aggregated_user_ratings.take(10)

[array([1., 0., 0., ..., 0., 0., 0.]),
 array([0.6, 0. , 0. , ..., 0. , 0. , 0. ]),
 array([0.8, 0. , 0. , ..., 0. , 0. , 0. ]),
 array([0.6, 0. , 0. , ..., 0. , 0. , 0. ]),
 array([0.8, 0. , 0. , ..., 0. , 0. , 0. ]),
 array([0.8, 0. , 0. , ..., 0. , 0. , 0. ]),
 array([1., 0., 0., ..., 0., 0., 0.]),
 array([0.7, 0. , 0. , ..., 0. , 0. , 0. ]),
 array([0.8, 0. , 0. , ..., 0. , 0. , 0. ]),
 array([0.8, 0. , 0. , ..., 0. , 0. , 0. ])]

In [15]:
# The ratings RDD is no longer needed: undo the earlier ratings.persist().
ratings.unpersist()

PythonRDD[14] at RDD at PythonRDD.scala:53

### Autoencoder definition

In [16]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import Adam

def create_autoencoder(samples_dim):
    """Build and compile a fully connected autoencoder.

    Layer widths are samples_dim -> 150 -> 25 -> 150 -> samples_dim, with ReLU
    after every hidden Dense layer and a sigmoid after the output layer.
    The model is compiled with mean squared error and Adam (learning rate 0.001).

    Args:
        samples_dim: size of one input sample (and of the reconstruction).

    Returns:
        The compiled Keras Sequential model.
    """
    # Layers are instantiated in the same order in which they are stacked.
    layer_stack = [
        # encoder
        Dense(150, input_dim=samples_dim),
        Activation('relu'),
        Dense(25),
        Activation('relu'),
        # decoder
        Dense(150),
        Activation('relu'),
        Dense(samples_dim),
        Activation('sigmoid'),
    ]

    model = Sequential()
    for layer in layer_stack:
        model.add(layer)

    optimizer = Adam(learning_rate=0.001)
    model.compile(loss='mean_squared_error', optimizer=optimizer)

    return model

In [17]:
# Build the autoencoder with input/output size equal to the number of movies
# in the subset, then print its layer summary.
autoencoder = create_autoencoder(number_of_movies)
autoencoder.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 150)               423300    
                                                                 
 activation (Activation)     (None, 150)               0         
                                                                 
 dense_1 (Dense)             (None, 25)                3775      
                                                                 
 activation_1 (Activation)   (None, 25)                0         
                                                                 
 dense_2 (Dense)             (None, 150)               3900      
                                                                 
 activation_2 (Activation)   (None, 150)               0         
                                                                 
 dense_3 (Dense)             (None, 2821)              4

In [18]:
# # Self-made epoch by epoch model training just to plot loss curves and show training behaviour. 
# # Elephas fit method does not provide losses history


# from evaluable_training import evaluable_distributed_training
# import numpy as np

# train_losses, val_losses = evaluable_distributed_training(sc, aggregated_user_ratings, autoencoder, num_workers, 100)
# np.save(f'train_losses{setup}.npy', np.array(train_losses))
# np.save(f'val_losses{setup}.npy', np.array(val_losses))

### STEP 6: model training

In [19]:
# Seeded 80/20 split of the per-user rating vectors.
train_rdd, test_rdd = aggregated_user_ratings.randomSplit([0.8, 0.2], seed=42)


def _partition_row_count(partition):
    """Return a one-element list holding the number of records in ``partition``."""
    return [sum(1 for _ in partition)]


# CHECK IF SAMPLES ARE WELL DISTRIBUTED OVER PARTITIONS
partition_sizes = train_rdd.mapPartitions(_partition_row_count)
print(partition_sizes.collect())

[2733, 2642, 2707, 2723, 2692, 2697, 2621, 2653, 2703, 2750, 2662, 2755, 2650, 2601, 2588, 2722, 2624, 2710, 2678, 2571, 2611, 2577, 2821]


In [20]:
from elephas.spark_model import SparkModel

# (input, target) elephas rdd required format.
# The pairs go into their own variable so train_rdd keeps holding the plain
# vectors: the cell stays re-runnable even if training fails half-way, and no
# "unwrap" step is needed afterwards.
train_pairs_rdd = train_rdd.map(lambda rating_array: (rating_array,rating_array))
epochs = 50
spark_ae_model = SparkModel(autoencoder, frequency='epoch', mode='synchronous', num_workers=num_workers)

# Make sure the output folder exists before the long training run, so the
# save below cannot fail because the directory is missing.
model_path = f'trained AEs/AE_model{setup}.keras'
os.makedirs(os.path.dirname(model_path), exist_ok=True)

step = 6
dest = f'./{folder_path}/{folders[step]}/model_train{setup}.csv'
# Spawn the resource monitor only when stats are requested.
stats_collector = subprocess.Popen([executable, monitor, dest, '10']) if collect_stats else None
try:
    spark_ae_model.fit(train_pairs_rdd, epochs=epochs, batch_size=64, verbose=1, validation_split=0.1)
finally:
    # Always stop and reap the monitor, even if training failed.
    if stats_collector is not None:
        stats_collector.kill()
        stats_collector.wait()

# save the trained AE
spark_ae_model.save(model_path)

>>> Fit model
>>> Synchronous training complete.


### STEP 7: model evaluation

In [21]:
from evaluable_training import calculate_avg_mse_loss
import pandas as pd

def _reconstruct_and_score(samples_rdd):
    """Predict reconstructions for ``samples_rdd`` and compute their average MSE.

    Returns the predictions, the RDD built from them, and the loss value
    returned by calculate_avg_mse_loss.
    """
    reconstructions = spark_ae_model.predict(samples_rdd)
    reconstructions_rdd = sc.parallelize(reconstructions)
    loss = calculate_avg_mse_loss(samples_rdd, reconstructions_rdd)
    reconstructions_rdd.unpersist()
    return reconstructions, reconstructions_rdd, loss


train_recs, train_recs_rdd, train_mse = _reconstruct_and_score(train_rdd)
test_recs, test_recs_rdd, test_mse = _reconstruct_and_score(test_rdd)

# Same report for both splits, separated by one blank line.
for position, (split_name, split_mse) in enumerate((('train', train_mse), ('test', test_mse))):
    if position > 0:
        print()
    print(f"model's performances on {split_name} set:")
    print('\tmse:  ', split_mse)
    print('\trmse: ', np.sqrt(split_mse))

model's performances on train set:
	mse:   0.0011734025231792842
	rmse:  0.03425496348238141

model's performances on test set:
	mse:   0.001184529329908802
	rmse:  0.034416991877687424


In [27]:
# Write the evaluation metrics as a one-row CSV in the model_train_eval folder.
model_eval = {
    'train_mse': [train_mse],
    'train_rmse': [np.sqrt(train_mse)],
    'test_mse': [test_mse],
    'test_rmse': [np.sqrt(test_mse)],
}
df = pd.DataFrame(model_eval)
# The target folder is addressed explicitly instead of through the global
# `step`, whose value depends on which cell happened to run last.
eval_folder = folders[6]  # 'model_train_eval'
dest = f'./{folder_path}/{eval_folder}/model_eval{setup}.csv'
df.to_csv(dest, index=False)

### STEP 8: example of recommendation

In [22]:
# RECOMMENDATION METHOD: Users with similar preferences will have similar representations in the latent space. 
# If two users are close in the latent space, it suggests that their preferences are alike. 
# Consequently, items that are well-rated by one user are likely to be well-rated by the other. 

from html import escape

from tensorflow.keras.models import load_model
from IPython.display import display, HTML

AE = load_model(f'trained AEs/AE_model{setup}.keras')
user_ratings = aggregated_user_ratings.takeSample(False, 1)[0]  # sampling a random user's ratings

# The model expects a batch dimension: (1, number_of_movies).
user_ratings = np.expand_dims(user_ratings, axis=0)
rec = AE.predict(user_ratings, verbose=0)

# Flat views of the user's ratings and of the reconstructed scores.
user_vector = np.squeeze(user_ratings, axis=0)
rec_vector = np.squeeze(rec, axis=0)

# Only movies the user has not rated (stored as 0) are candidates.
unrated_positions = np.flatnonzero(user_vector == 0)
if unrated_positions.size == 0:
    raise ValueError('the sampled user has rated every movie in the subset: nothing to recommend')

# Pick the unrated movie with the highest reconstructed score
# (the first one in case of ties).
recom_movie_index = int(unrated_positions[np.argmax(rec_vector[unrated_positions])])
movie_id = list(movies_id_index_pairs.keys())[recom_movie_index]
movie_data = movies.filter(lambda movie: movie[0]==movie_id).collect()[0]
suggested_movie = f'{movie_data[1][0]} ({movie_data[1][1]})'

# The title comes from the dataset, so escape it before embedding it in HTML.
display(HTML(f'<br><font size="3">Suggested movie: </font><br><h1>{escape(suggested_movie)}</h1>'))

In [None]:
# Shut down the SparkContext created at the top of the notebook.
sc.stop()