# Non Personalized Recommender Systems

In [1]:
from pyspark.sql import functions as f, types as t, Window
from pyspark.ml.linalg import Vectors, VectorUDT

### MovieLens dataset: 20m ratings

In [7]:
import os

common_path = '/data/MobodMovieLens/train'

# ratings = spark.read.csv(os.path.join(common_path, 'ratings.csv'), header=True).cache()
movies = spark.read.csv(os.path.join(common_path, 'movies.csv'), header=True).cache()

In [11]:
"""
ratings_tmp = (
    ratings
    .withColumn(
        'order',
        f.row_number().over(Window.partitionBy('userId').orderBy('timestamp')) /
        f.count('*').over(Window.partitionBy('userId'))
    )
    .withColumn('hash', f.abs(f.hash('userId')) % 211)
)
ratings_train_A = (
    ratings_tmp
    .filter((f.col('hash') > 0) & (f.col('hash') <= 105) & (f.col('order') < 0.905))
    .drop('order', 'hash')
    .cache()
)
ratings_train_B = (
    ratings_tmp
    .filter((f.col('hash') > 106) & (f.col('order') < 0.905))
    .drop('order', 'hash')
    .cache()
)
ratings_dev = (
    ratings_tmp
    .filter((f.col('hash') == 0) | (f.col('order') >= 0.905))
    .drop('order', 'hash')
    .cache()
)
""";

In [12]:
def load(paths, has_rating=True):
    if not isinstance(paths, list):
        paths = [paths]
    result = (
        spark.read.csv(paths, header=True)
        .withColumn('userId', f.col('userId').cast('int'))
        .withColumn('movieId', f.col('movieId').cast('int'))
        .withColumn('timestamp', f.col('timestamp').cast('int'))
        .filter(f.col('userId').isNotNull())
    )
    if has_rating:
        result = result.withColumn('rating', f.col('rating').cast('float'))
    return result.cache()

In [13]:
ratings_train_AB = load(['ratings_train_A.csv', 'ratings_train_B.csv'])
# ratings_train_A = load('ratings_train_A.csv')
# ratings_train_B = load('ratings_train_B.csv')
ratings_dev = load('ratings_dev.csv')

In [14]:
ratings_test = load('/data/MobodMovieLens/test/ratings.csv', has_rating=False)

In [15]:
print('movies count in movie table: ', movies.count())
print('ratings in train:', ratings_train_AB.count())
print('ratings in dev:', ratings_dev.count())
print('ratings in test:', ratings_test.count())
print('users in train: ', ratings_train_AB.agg(f.countDistinct('userId').alias('user_cnt')).collect()[0].user_cnt)
print('users in dev: ', ratings_dev.agg(f.countDistinct('userId').alias('user_cnt')).collect()[0].user_cnt)
print(
    'movies in train: ',
    ratings_train_AB.agg(f.countDistinct('movieId').alias('movie_cnt')).collect()[0].movie_cnt
)
print(
    'movies in dev: ',
    ratings_dev.agg(f.countDistinct('movieId').alias('movie_cnt')).collect()[0].movie_cnt
)

movies count in movie table:  27278
ratings in train: 14453730
ratings in dev: 1690360
ratings in test: 3790860
users in train:  133296
users in dev:  134597
movies in train:  20693
movies in dev:  16904


In [16]:
def calc_score(data_with_predictions, prediction_col='prediction', rating_col='rating', need_print=True):
    score = (
        data_with_predictions
        .withColumn(
            'multiplier',
            f.when((f.col(rating_col) < 4) & (f.col(prediction_col) >= 4), 2).otherwise(1)
        )
        .agg(f.avg(f.col('multiplier') * (f.col(rating_col) - f.col(prediction_col)) ** 2).alias('score'))
    ).collect()
    score = float(score[0].score)
    if need_print:
        print('score: ', score)
    return score

In [5]:
ratings_dev.agg(f.avg(f.col('rating')).alias('mean_rating')).collect()[0]['mean_rating']

3.4941828959511585

## Let's train user - movie ALS

In [17]:
def simple_transform(ALS_model, ratings_dev, predictions_col='prediction'):
    return ALS_model.transform(ratings_dev).filter(predictions_col + ' != "NaN"')

In [18]:
from pyspark.ml.recommendation import ALS

import copy
import itertools
import random
import time


def search_ALS_params(
        ratings_train, ratings_dev,
        model_path, max_iters, reg_params, ranks,
        num_user_blocks=10, num_item_blocks=10, random_search_iters=20,
        rating_col='rating', item_col='movieId', user_col='userId',
        seen_params=dict(), transform_dev=simple_transform,
        **kwargs
):
    best_params = None
    seen_params = copy.copy(seen_params)
    best_score = min(seen_params.values()) if seen_params else 1000
    best_model = None
    param_set_variants = list(itertools.product(max_iters, reg_params, ranks))
    error_seq_len = 0
    
    for i in range(random_search_iters):
        try:
            params_triple = random.choice(param_set_variants)
            if params_triple in seen_params:
                continue
            max_iter, reg_params, rank = params_triple
            params = {
                'maxIter': max_iter, 'regParam': reg_params, 'rank': rank,
            }
            ALS_model_creator = ALS(
                userCol=user_col, itemCol=item_col, ratingCol=rating_col,
                numUserBlocks=num_user_blocks, numItemBlocks=num_item_blocks,
                **params
            )
            ALS_model = ALS_model_creator.fit(ratings_train)
            model_predictions_dev = transform_dev(ALS_model, ratings_dev, **kwargs)
            score = calc_score(model_predictions_dev, need_print=False)
            print(params_triple, score)
            if score < best_score:
                print('new best')
                best_score = score
                best_model = ALS_model
                best_model.write().overwrite().save(model_path)
                best_params = params_triple
            seen_params[params_triple] = score
            error_seq_len = 0
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(str(e)[:500])
            time.sleep(10 + 20 * error_seq_len)
            error_seq_len += 1
            if error_seq_len > 5:
                break
            continue

    return best_model, best_params, best_score, seen_params

### Tune params

In [None]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'baseline_ALS_path.bin',
    [10], [0.1], [10],
    random_search_iters=1,
)

(10, 0.1, 10) 0.6982353615384809
new best


In [7]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [10, 20, 40], [0.01, 0.03, 0.1, 0.3, 1, 3], [10, 20, 40],
    random_search_iters=100,
)

(10, 0.01, 10) 0.7574962993602554
new best
(20, 0.01, 40) 0.9130430194152095


In [7]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [5, 10, 20], [0.03, 0.1, 0.3, 1, 3], [5, 10, 20],
    random_search_iters=1,
)

(10, 0.1, 5) 0.7160095896427573
new best


In [8]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [5, 10, 20], [0.03, 0.1, 0.3, 1, 3], [5, 10, 20],
    random_search_iters=1,
)

(5, 0.1, 20) 0.7025028693269061
new best


In [10]:
seen_params[(10, 0.1, 5)] = 0.7160095896427573
seen_params[(10, 0.1, 10)] = 0.6982353615384809

In [11]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [5, 10, 20], [0.03, 0.1, 0.3, 1, 3], [5, 10, 20],
    random_search_iters=2,
    seen_params=seen_params,
)

(5, 0.1, 5) 0.7295197135719883
(5, 3, 20) 13.30271867642405


In [12]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [10, 20], [0.03, 0.1, 0.3, 1], [10, 20],
    random_search_iters=2,
    seen_params=seen_params,
)

(10, 0.1, 20) 0.6931621179184757
new best
(20, 1, 20) 1.7084884885819966


In [13]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [10, 20], [0.03, 0.1, 0.3], [10, 20, 30],
    random_search_iters=3,
    seen_params=seen_params,
)

(20, 0.1, 10) 0.6957023947386827
(20, 0.03, 10) 0.7076571886144899
(20, 0.3, 20) 0.843705206619917


In [14]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [10, 20], [0.008, 0.1, 0.12], [10, 20, 30],
    random_search_iters=5,
    seen_params=seen_params,
)

(10, 0.12, 10) 0.7123227535759822
(20, 0.12, 20) 0.7095432021696254
(10, 0.1, 30) 0.6946954576813209
(20, 0.12, 10) 0.7106308495371556


In [15]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    # screwed up with reg param
    [10, 15, 20], [0.005, 0.008, 0.1], [15, 20, 25],
    random_search_iters=5,
    seen_params=seen_params,
)

(15, 0.008, 20) 0.844300048970145
(10, 0.008, 15) 0.8015766673539505
(10, 0.005, 20) 0.8838917470973003
(20, 0.005, 15) 0.8404780661967218
(20, 0.1, 20) 0.6905044480791418
new best


In [16]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [15, 20, 25, 30], [0.05, 0.08, 0.1], [15, 20, 25],
    random_search_iters=5,
    seen_params=seen_params,
)

(20, 0.1, 15) 0.6919322933457315
(30, 0.05, 20) 0.676320875018052
new best
(15, 0.08, 20) 0.6758411467896261
new best
(15, 0.08, 25) 0.6744421653507363
new best
(25, 0.05, 25) 0.673449213208945
new best


In [17]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [15, 20, 25, 30], [0.04, 0.05, 0.06, 0.07, 0.08], [15, 20, 25, 30],
    random_search_iters=5,
    seen_params=seen_params,
)

(25, 0.07, 25) 0.6679456009476727
new best
(20, 0.06, 15) 0.6759796200086172
(15, 0.05, 25) 0.6755824428336334
(30, 0.07, 30) 0.6664263232109933
new best
(20, 0.07, 15) 0.6758338571591643


In [18]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [15, 20, 25, 30], [0.04, 0.05, 0.06, 0.07, 0.08], [15, 20, 25, 30],
    random_search_iters=5,
    seen_params=seen_params,
)

(30, 0.05, 15) 0.6804691905484942
(20, 0.06, 30) 0.6663734698102138
new best
(15, 0.04, 20) 0.6888317230688668
(15, 0.05, 15) 0.6812982087636854
(25, 0.04, 30) 0.6871361318946303


In [19]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [15, 20, 25, 30], [0.04, 0.05, 0.06, 0.07, 0.08], [15, 20, 25, 30],
    random_search_iters=5,
    seen_params=seen_params,
)

(15, 0.05, 20) 0.6770177492807228
(25, 0.08, 30) 0.6716263631001365
(25, 0.05, 30) 0.6718353888808027
(30, 0.06, 20) 0.6711789996242946


In [20]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [20, 25, 30], [0.04, 0.05, 0.06, 0.07], [30, 35, 40],
    random_search_iters=5,
    seen_params=seen_params,
)

(20, 0.04, 40) 0.6862104790585454
(30, 0.06, 40) 0.6634529269050783
new best
(25, 0.07, 30) 0.6664495282205287
(25, 0.04, 35) 0.6864194350278456
(25, 0.06, 30) 0.6658786971023828


In [21]:
best_model, best_params, best_score, seen_params = search_ALS_params(
    ratings_train_AB, ratings_dev,
    'user_movie_ALS_AB_model.bin',
    [20, 25, 30, 35, 40], [0.04, 0.05, 0.06, 0.07], [30, 35, 40, 45, 50],
    random_search_iters=5,
    seen_params=seen_params,
)

An error occurred while calling o4298.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6969.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6969.0 (TID 38610, mipt-node02.atp-fivt.org, executor 55): java.lang.StackOverflowError
	at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2661)
	at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2677)
	at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:3178)
	at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1682)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1743)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
	at sun.reflect.Gener

An error occurred while calling o4384.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 7053.0 failed 4 times, most recent failure: Lost task 1.3 in stage 7053.0 (TID 39370, mipt-node04.atp-fivt.org, executor 77): java.lang.StackOverflowError
	at org.apache.spark.util.ByteBufferInputStream.read(ByteBufferInputStream.scala:35)
	at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2639)
	at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2946)
	at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2956)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1736)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
	at s

An error occurred while calling o4470.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 7126.0 failed 4 times, most recent failure: Lost task 8.3 in stage 7126.0 (TID 39922, mipt-node05.atp-fivt.org, executor 73): ExecutorLostFailure (executor 73 exited caused by one of the running tasks) Reason: Container marked as failed: container_1572363582174_5689_02_000027 on host: mipt-node05.atp-fivt.org. Exit status: 50. Diagnostics: Exception from container-launch.
Container id: container_1572363582174_5689_02_000027
Exit code: 50
Stack trace: ExitCodeException exitCode=50: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:601)
	at org.apache.hadoop.util.Shell.run(Shell.java:504)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:786)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.

### Make predictions on train and dev, using cross train

In [19]:
class CrossTrainer:
    def __init__(
        self, ratings, train_func, transform_func, prediction_col, model_path,
        max_hash=211, user_fold_cnt=3, movie_fold_cnt=3,
        train_func_params={}, transform_func_params={},
    ):
        self.train_func = train_func
        self.train_func_params = train_func_params
        self.transform_func = transform_func
        self.transform_func_params = transform_func_params
        self.prediction_col = prediction_col
        self.max_hash = max_hash
        self.user_fold_cnt = user_fold_cnt
        self.movie_fold_cnt = movie_fold_cnt
        self.ratings = ratings
        self.ratings_with_hash = (
            ratings
            .withColumn('user_hash', f.abs(f.hash('userId')) % max_hash)
            .withColumn('movie_hash', f.abs(f.hash('movieId')) % max_hash)
            .cache()
        )
        self.model = None
        self.model_path = model_path
        self.result = None
        
    def get_model(self):
        if self.model is None:
            self.model = self.train_func(self.ratings, prediction_col=self.prediction_col, **self.train_func_params)
            self.model.write().overwrite().save(self.model_path)
        return self.model
    
    def _get_bounds(self, fold_cnt, part_ind):
        return int(self.max_hash / fold_cnt * part_ind), int(self.max_hash / fold_cnt * (part_ind + 1))
    
    def get_part(self, user_part_ind, movie_part_ind):
        user_lower_bound, user_upper_bound = self._get_bounds(self.user_fold_cnt, user_part_ind)
        movie_lower_bound, movie_upper_bound = self._get_bounds(self.movie_fold_cnt, movie_part_ind)
        return (
            self.ratings_with_hash
            .filter(
                (f.col('user_hash') >= user_lower_bound) & (f.col('user_hash') < user_upper_bound) &
                (f.col('movie_hash') >= movie_lower_bound) & (f.col('movie_hash') < movie_upper_bound)
            )
            .drop('movie_hash', 'user_hash')
        )
        
    def calc_part(self, user_part_ind, movie_part_ind):
        leaved_out_ratings = self.get_part(user_part_ind, movie_part_ind)
        user_lower_bound, user_upper_bound = self._get_bounds(self.user_fold_cnt, user_part_ind)
        movie_lower_bound, movie_upper_bound = self._get_bounds(self.movie_fold_cnt, movie_part_ind)
        ratings_train = (
            self.ratings_with_hash
            .filter(
                ~(
                    (f.col('user_hash') >= user_lower_bound) & (f.col('user_hash') < user_upper_bound) &
                    (f.col('movie_hash') >= movie_lower_bound) & (f.col('movie_hash') < movie_upper_bound)
                )
            )
            .drop('movie_hash', 'user_hash')
        )
        model = self.train_func(ratings_train, prediction_col=self.prediction_col, **self.train_func_params)
        ratings_with_predictions = self.transform_func(model, leaved_out_ratings, **self.transform_func_params)
            
        if self.result is None:
            self.result = ratings_with_predictions
        else:
            self.result = self.result.union(ratings_with_predictions)
        return ratings_with_predictions
            
    def get_result(self):
        return self.result
    
    def transform_dev(self, dev):
        return self.transform_func(self.get_model(), dev, **self.transform_func_params)

In [20]:
def train_user_movie_ALS(
        ratings_train, rating_col='rating', item_col='movieId', user_col='userId',
        prediction_col='prediction', params=(20, 0.06, 30),
):
    max_iter, reg_params, rank = params
    params = {
        'maxIter': max_iter, 'regParam': reg_params, 'rank': rank,
    }
    ALS_model_creator = ALS(userCol=user_col, itemCol=item_col, ratingCol=rating_col, **params)
    ALS_model_creator.setPredictionCol(prediction_col)
    return ALS_model_creator.fit(ratings_train)


def transform_no_filter(ALS_model, ratings_dev):
    return ALS_model.transform(ratings_dev)

In [21]:
cross_trainer = CrossTrainer(
    ratings_train_AB, train_user_movie_ALS, transform_no_filter, 'user_movie_ALS',
    'user_movie_ALS_model.bin',
)

In [13]:
cross_trainer.calc_part(0, 0)
cross_trainer.calc_part(0, 1)
cross_trainer.calc_part(0, 2)

In [14]:
cross_trainer.calc_part(1, 0)
cross_trainer.calc_part(1, 1)
cross_trainer.calc_part(1, 2)

In [15]:
cross_trainer.calc_part(2, 0)
cross_trainer.calc_part(2, 1)
cross_trainer.calc_part(2, 2)

In [16]:
ratings_train_with_ALS = cross_trainer.result.cache()

In [18]:
ratings_dev_with_ALS = cross_trainer.transform_dev(ratings_dev)

In [22]:
ratings_test_with_ALS = cross_trainer.transform_dev(ratings_test)

## Now train user - genre ALS

In [23]:
@f.udf(t.ArrayType(t.StringType()))
def split_genres(genres):
    """Given as a string of genres concatenated with '|', splits it into array"""
    return genres.split('|')

In [24]:
exploded_movies = (
    movies
    .drop('title')
    .withColumn('genres', split_genres(movies.genres))
    .withColumn('genre', f.explode('genres')).drop('genres')
)

In [25]:
from pyspark.ml.feature import StringIndexer

# fit StringIndexer (think about how does it work and why do we need a model)
genre_indexer_creator = StringIndexer(inputCol='genre', outputCol='genreId')
genre_indexer = genre_indexer_creator.fit(exploded_movies)

In [26]:
def group_by_genres(ratings, exploded_movies, genre_indexer):
    grouped_ratings = (
        ratings
        .join(f.broadcast(exploded_movies), on='movieId')
        .groupBy('genre', 'userId').agg(f.avg('rating').alias('avg_rating'))
        .select('userId', 'genre', 'avg_rating')
    )
    return genre_indexer.transform(grouped_ratings).drop('genre')

In [49]:
ratings_grouped_train_AB = group_by_genres(ratings_train_AB, exploded_movies, genre_indexer)

In [23]:
def transform_genre(ALS_model, ratings_dev, exploded_movies, genre_indexer, aggregation_func=f.avg):
    exploded_ratings = (
        ratings_dev
        .join(f.broadcast(exploded_movies), on='movieId')
    )
    exploded_ratings = genre_indexer.transform(exploded_ratings).drop('genre')
    return (
        ALS_model.transform(exploded_ratings)
        .filter('prediction != "NaN"')
        .groupBy('userId', 'movieId', 'timestamp', 'rating')
        .agg(aggregation_func('prediction').alias('prediction'))
    )

### Search hyperparams

In [None]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [10], [0.1], [10],
    random_search_iters=1,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
)

In [52]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [5, 10, 15], [0.01, 0.03, 0.1, 0.3], [2, 4, 6, 8, 10],
    random_search_iters=5,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
)

(5, 0.3, 8) 0.979655915131543
new best
(15, 0.01, 8) 0.9620052563834316
new best
(10, 0.1, 8) 0.9622491112275654
(5, 0.03, 2) 1.0008761617477449
(15, 0.1, 4) 0.9672685600621342


In [53]:
geners_seen_params[(10, 0.1, 10)] = 0.9563308129440355

In [55]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [10, 15, 20], [0.01, 0.03, 0.1, 0.3], [2, 4, 6, 8, 10],
    random_search_iters=5,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
    seen_params=geners_seen_params,
)

(20, 0.01, 8) 0.9680335244497905
(15, 0.1, 10) 0.9556957315357621
new best
(15, 0.01, 6) 0.9747926921137647
(10, 0.3, 8) 0.9918427643669453
(15, 0.3, 4) 0.9947536718482373


In [56]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [10, 15, 20], [0.03, 0.05, 0.07, 0.1], [2, 4, 6, 8, 10, 15, 20],
    random_search_iters=5,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
    seen_params=geners_seen_params,
)

(10, 0.07, 10) 0.9495442109353974
new best
(20, 0.1, 8) 0.9586582301168608
(20, 0.03, 2) 0.9986277057441895
(10, 0.05, 20) 0.9385298472300967
new best
(20, 0.05, 8) 0.9617087469908908


In [58]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [10, 15, 20], [0.03, 0.05, 0.06, 0.07, 0.08], [4, 6, 10, 15, 20, 30],
    random_search_iters=5,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
    seen_params=geners_seen_params,
)

(10, 0.08, 4) 0.9753434527342677
(10, 0.03, 15) 0.9414623837321071
(15, 0.08, 4) 0.9729688243635309
(15, 0.07, 20) 0.9467049654015341
(15, 0.08, 20) 0.9483446613409774


In [59]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [10, 15, 20], [0.03, 0.05, 0.06, 0.07, 0.08], [15, 20, 30],
    random_search_iters=5,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
    seen_params=geners_seen_params,
)

(20, 0.07, 20) 0.9503370261933348
(20, 0.03, 15) 0.9531434939214563
(15, 0.08, 15) 0.9499206980488664
(15, 0.06, 20) 0.9450943955369574
(10, 0.08, 20) 0.9448084148605389


In [60]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [10, 15], [0.03, 0.05, 0.06, 0.07, 0.08], [15, 20, 25, 30],
    random_search_iters=5,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
    seen_params=geners_seen_params,
)

(10, 0.03, 30) 0.935826622275196
new best
(10, 0.05, 15) 0.940677844106879
(10, 0.03, 20) 0.9354548840611082
new best
(15, 0.03, 25) 0.9413865682595082
(15, 0.08, 30) 0.9493123711194692


In [61]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [8, 10, 15], [0.02, 0.03, 0.04, 0.05, 0.06, 0.07], [15, 20, 25],
    random_search_iters=5,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
    seen_params=geners_seen_params,
)

(8, 0.06, 15) 0.9405350246259917
(8, 0.04, 15) 0.9398678556002609
(8, 0.05, 15) 0.9393402820189869
(8, 0.07, 25) 0.9413584156928311
(15, 0.03, 20) 0.9414423782237241


In [62]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [8, 10, 15], [0.02, 0.03, 0.04, 0.05], [15, 20, 25],
    random_search_iters=5,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
    seen_params=geners_seen_params,
)

(15, 0.03, 15) 0.9471120508205193
(8, 0.04, 20) 0.9365839739769821
(10, 0.03, 25) 0.9356244596158605


In [63]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [7, 8, 10, 12], [0.02, 0.03, 0.04, 0.05], [20],
    random_search_iters=5,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
    seen_params=geners_seen_params,
)

(12, 0.03, 20) 0.9373185081382436
(12, 0.04, 20) 0.937632009468036
(10, 0.04, 20) 0.9358105206930074
(12, 0.05, 20) 0.9395175028899768


In [64]:
geners_best_model, geners_best_params, geners_best_score, geners_seen_params = search_ALS_params(
    ratings_grouped_train_AB, ratings_dev,
    'baseline_genres_ALS_path.bin',
    [7, 8], [0.02, 0.03], [20],
    random_search_iters=8,
    rating_col='avg_rating', item_col='genreId',
    transform_dev=transform_genre,
    exploded_movies=exploded_movies, genre_indexer=genre_indexer,
    seen_params=geners_seen_params,
)

(8, 0.02, 20) 0.9403485537816613
(8, 0.03, 20) 0.9369480383323424
(7, 0.02, 20) 0.9466752696329187
(7, 0.03, 20) 0.9416662008075773


In [27]:
def train_ALS_by_genre(ratings, prediction_col, exploded_movies, genre_indexer):
    ratings_grouped_train = group_by_genres(ratings, exploded_movies, genre_indexer)
    return train_user_movie_ALS(
        ratings_grouped_train, prediction_col=prediction_col, params=(10, 0.03, 20),
        item_col='genreId', rating_col='avg_rating',
    )

def transform_genre_no_filter(
        ALS_model, ratings_dev, exploded_movies, genre_indexer, prediction_col, group_by_columns=[],
        aggregation_func=f.avg,
):
    exploded_ratings = (
        ratings_dev
        .join(f.broadcast(exploded_movies), on='movieId')
    )
    exploded_ratings = genre_indexer.transform(exploded_ratings).drop('genre')
    return (
        ALS_model.transform(exploded_ratings)
        .groupBy('userId', 'movieId', 'timestamp', *group_by_columns)
        .agg(aggregation_func(prediction_col).alias(prediction_col))
    )

In [28]:
ALS_by_genre_cross_trainer = CrossTrainer(
    ratings_train_AB, train_ALS_by_genre, transform_genre_no_filter, 'user_genre_ALS',
    'user_genre_ALS_model.bin',
    train_func_params={'exploded_movies': exploded_movies, 'genre_indexer': genre_indexer},
    transform_func_params={
        'exploded_movies': exploded_movies, 'genre_indexer': genre_indexer,
        'prediction_col': 'user_genre_ALS', 'group_by_columns': ['rating', 'user_movie_ALS'],
    },
)

In [40]:
ALS_by_genre_cross_trainer.calc_part(0, 0)
ALS_by_genre_cross_trainer.calc_part(0, 1)
ALS_by_genre_cross_trainer.calc_part(0, 2)

In [39]:
ALS_by_genre_cross_trainer.result.show(5)

+------+-------+----------+------+--------------+------------------+
|userId|movieId| timestamp|rating|user_movie_ALS|    user_genre_ALS|
+------+-------+----------+------+--------------+------------------+
|  5074|   2706| 948075039|   4.0|     2.1839106|3.0895506143569946|
| 25639|    852|1212972505|   3.5|     3.2534451|3.4901952743530273|
| 36066|    497| 857028279|   3.0|     4.0408645|3.5944039821624756|
| 45027|    440|1122655883|   2.5|     3.4902065|3.1727263927459717|
| 45027|   8916|1122247741|   2.5|     3.2732892|3.1727263927459717|
+------+-------+----------+------+--------------+------------------+
only showing top 5 rows



In [41]:
ALS_by_genre_cross_trainer.calc_part(1, 0)
ALS_by_genre_cross_trainer.calc_part(1, 1)
ALS_by_genre_cross_trainer.calc_part(1, 2)

In [42]:
ALS_by_genre_cross_trainer.calc_part(2, 0)
ALS_by_genre_cross_trainer.calc_part(2, 1)
ALS_by_genre_cross_trainer.calc_part(2, 2)

### Save results for movies and genres

In [43]:
ALS_by_genre_cross_trainer.result.write.csv(
    'ratings_train_with_ALS_predictions.csv', header=True, mode='overwrite',
)

In [27]:
ratings_dev_with_ALS = ALS_by_genre_cross_trainer.transform_dev(ratings_dev_with_ALS)
ratings_dev_with_ALS.repartition(1).write.csv(
    'ratings_dev_with_ALS_predictions.csv', header=True, mode='overwrite',
)

In [36]:
ratings_dev_with_ALS.show(5)

+------+-------+---------+------+------------------+--------------+
|userId|movieId|timestamp|rating|    user_genre_ALS|user_movie_ALS|
+------+-------+---------+------+------------------+--------------+
| 75781|    148|895230335|   3.0|3.5878705978393555|     3.0964668|
| 77165|    148|840699559|   3.0| 3.399810314178467|     3.1482704|
|  9084|    148|833674024|   2.0| 3.945089101791382|     2.7101426|
| 14282|    148|940520793|   3.0| 3.560908794403076|     3.2439415|
| 41770|    148|832326067|   3.0|3.1053566932678223|     2.5573483|
+------+-------+---------+------+------------------+--------------+
only showing top 5 rows



In [39]:
ALS_by_genre_cross_trainer = CrossTrainer(
    ratings_train_AB, train_ALS_by_genre, transform_genre_no_filter, 'user_genre_ALS',
    'user_genre_ALS_model.bin',
    train_func_params={'exploded_movies': exploded_movies, 'genre_indexer': genre_indexer},
    transform_func_params={
        'exploded_movies': exploded_movies, 'genre_indexer': genre_indexer,
        'prediction_col': 'user_genre_ALS', 'group_by_columns': ['user_movie_ALS'],
    },
)
ratings_test_with_ALS = ALS_by_genre_cross_trainer.transform_dev(ratings_test_with_ALS)
ratings_test_with_ALS.repartition(1).write.csv(
    'ratings_test_with_ALS_predictions.csv', header=True, mode='overwrite',
)

In [3]:
ratings_train_with_ALS_predictions = load('ratings_train_with_ALS_predictions.csv')

In [5]:
splitter = CrossTrainer(ratings_train_with_ALS_predictions, None, None, None, None)

In [6]:
splitter.get_part(0, 0).repartition(1).write.csv(
    'ratings_train_with_ALS_predictions_u_0_m_0.csv', header=True, mode='overwrite',
)
splitter.get_part(0, 1).repartition(1).write.csv(
    'ratings_train_with_ALS_predictions_u_0_m_1.csv', header=True, mode='overwrite',
)
splitter.get_part(0, 2).repartition(1).write.csv(
    'ratings_train_with_ALS_predictions_u_0_m_2.csv', header=True, mode='overwrite',
)

In [7]:
splitter.get_part(1, 0).repartition(1).write.csv(
    'ratings_train_with_ALS_predictions_u_1_m_0.csv', header=True, mode='overwrite',
)
splitter.get_part(1, 1).repartition(1).write.csv(
    'ratings_train_with_ALS_predictions_u_1_m_1.csv', header=True, mode='overwrite',
)
splitter.get_part(1, 2).repartition(1).write.csv(
    'ratings_train_with_ALS_predictions_u_1_m_2.csv', header=True, mode='overwrite',
)

In [8]:
splitter.get_part(2, 0).repartition(1).write.csv(
    'ratings_train_with_ALS_predictions_u_2_m_0.csv', header=True, mode='overwrite',
)
splitter.get_part(2, 1).repartition(1).write.csv(
    'ratings_train_with_ALS_predictions_u_2_m_1.csv', header=True, mode='overwrite',
)
splitter.get_part(2, 2).repartition(1).write.csv(
    'ratings_train_with_ALS_predictions_u_2_m_2.csv', header=True, mode='overwrite',
)

## Now, let's train user - actor ALS

In [32]:
movies_imdb = (
    spark.read.csv('movies_imdb.csv', header=True)
    .withColumn('movieId', f.col('movieId').cast('int'))
    .cache()
)

In [33]:
@f.udf(t.ArrayType(t.IntegerType()))
def split_actors(actors):
    """Given as a string of concatenated actor Ids in format 'nm<some_int>', splits it into array"""
    return list(map(int, actors.split('nm')[1:])) if actors else []

In [34]:
exploded_by_actors_movies = (
    movies_imdb
    .select('movieId', 'nconst')
    .withColumn('actors', split_actors('nconst'))
    .withColumn('actor', f.explode('actors'))
    .drop('actors', 'nconst')
)

In [35]:
def group_by_actors(ratings, exploded_movies, min_cnt=0):
    grouped = (
        ratings
        .join(f.broadcast(exploded_movies), on='movieId')
        .groupBy('actor', 'userId').agg(
            f.avg('rating').alias('avg_rating'),
            f.count('rating').alias('count')
        )
    )
    if min_cnt > 0:
        grouped = (
            grouped
            .withColumn(
                'actor_cnt',
                f.sum('count').over(Window.partitionBy('actor'))
            )
            .filter('actor_cnt > {}'.format(min_cnt))
        )
    return grouped.select('userId', 'actor', 'avg_rating').cache()


def transform_actor(ALS_model, ratings_dev, exploded_movies, aggregation_func=f.avg):
    exploded_ratings = (
        ratings_dev
        .join(f.broadcast(exploded_movies), on='movieId')
    )
    return (
        ALS_model.transform(exploded_ratings)
        .filter('prediction != "NaN"')
        .groupBy('userId', 'movieId', 'timestamp', 'rating')
        .agg(aggregation_func('prediction').alias('prediction'))
    )

In [13]:
grouped_by_actor_ratings_train = group_by_actors(ratings_train_AB, exploded_by_actors_movies)

### Tune params

In [15]:
actors_seen_params = {}

In [38]:
actors_best_model, actors_best_params, actors_best_score, actors_seen_params = search_ALS_params(
    grouped_by_actor_ratings_train, ratings_dev,
    'actors_ALS_path.bin',
    [10], [0.1], [10],
    num_user_blocks=20, random_search_iters=1,
    rating_col='avg_rating', item_col='actor',
    transform_dev=transform_actor,
    exploded_movies=exploded_by_actors_movies,
)

(10, 0.1, 10) 0.7624251027119892
new best


In [17]:
actors_best_model, actors_best_params, actors_best_score, actors_seen_params = search_ALS_params(
    grouped_by_actor_ratings_train, ratings_dev,
    'actors_ALS_path.bin',
    [10, 15, 20], [0.01, 0.03, 0.05, 0.07, 0.1], [10, 20, 30],
    num_user_blocks=20, random_search_iters=5,
    rating_col='avg_rating', item_col='actor',
    seen_params=actors_seen_params,
    transform_dev=transform_actor, exploded_movies=exploded_by_actors_movies,
)

An error occurred while calling o150.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 13.0 failed 4 times, most recent failure: Lost task 9.3 in stage 13.0 (TID 657, mipt-node03.atp-fivt.org, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1572363582174_6679_01_000003 on host: mipt-node03.atp-fivt.org. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Killed by external signal

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
	at scala.collection.mutable.ResizableArray$class.foreach(Resizable

An error occurred while calling o392.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 177.0 failed 4 times, most recent failure: Lost task 3.3 in stage 177.0 (TID 2629, mipt-node08.atp-fivt.org, executor 32): ExecutorLostFailure (executor 32 exited caused by one of the running tasks) Reason: Slave lost
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFail

In [18]:
actors_seen_params[(10, 0.1, 10)] = 0.7624251027119892
actors_seen_params[(20, 0.01, 10)] = 0.7833205610988209
actors_seen_params[(10, 0.07, 10)] = 0.7521941308027306

In [19]:
actors_best_model, actors_best_params, actors_best_score, actors_seen_params = search_ALS_params(
    grouped_by_actor_ratings_train, ratings_dev,
    'actors_ALS_path.bin',
    [10, 15, 20], [0.03, 0.05, 0.07, 0.1], [10],
    num_user_blocks=20, random_search_iters=5,
    rating_col='avg_rating', item_col='actor',
    seen_params=actors_seen_params,
    transform_dev=transform_actor, exploded_movies=exploded_by_actors_movies,
)

(10, 0.05, 10) 0.7513336984216085
new best
An error occurred while calling o564.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 71 in stage 290.0 failed 4 times, most recent failure: Lost task 71.3 in stage 290.0 (TID 4263, mipt-node07.atp-fivt.org, executor 36): java.lang.StackOverflowError
	at java.lang.reflect.Constructor.newInstance(Constructor.java:416)
	at java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:1079)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.j
An error occurred while calling o719.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 444.0 failed 4 times, most recent failure: Lost task 8.3 in stage 444.0 (TID 5758, mipt-node05.atp-fivt.org, executor 49): ExecutorLostFailure (executor 49 exited caused by one of the running tasks) Reason: Container marked as failed: container_1572363582174_6751_02_000017 on host: mipt-node05.atp-fivt.org. Exit sta

KeyboardInterrupt: 

In [19]:
actors_seen_params[(10, 0.05, 10)] = 0.7513336984216085

In [19]:
actors_best_model, actors_best_params, actors_best_score, actors_seen_params = search_ALS_params(
    grouped_by_actor_ratings_train, ratings_dev,
    'actors_ALS_path.bin',
    [10, 15, 20], [0.03, 0.05, 0.07, 0.1], [10],
    num_user_blocks=20, random_search_iters=3,
    rating_col='avg_rating', item_col='actor',
    seen_params=actors_seen_params,
    transform_dev=transform_actor, exploded_movies=exploded_by_actors_movies,
)

An error occurred while calling o272.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 156.0 failed 4 times, most recent failure: Lost task 2.3 in stage 156.0 (TID 1569, mipt-node02.atp-fivt.org, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container marked as failed: container_1572363582174_6819_01_000009 on host: mipt-node02.atp-fivt.org. Exit status: 143. Diagnostics: Container killed on req
An error occurred while calling o427.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 309.0 failed 4 times, most recent failure: Lost task 3.3 in stage 309.0 (TID 2893, mipt-node04.atp-fivt.org, executor 26): java.lang.StackOverflowError
	at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2956)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1736)
	at java.io.ObjectInputStream.readOrdinaryObje

In [22]:
actors_best_model, actors_best_params, actors_best_score, actors_seen_params = search_ALS_params(
    grouped_by_actor_ratings_train, ratings_dev,
    'actors_ALS_path.bin',
    [10], [0.04], [10],
    num_user_blocks=30, random_search_iters=1,
    rating_col='avg_rating', item_col='actor',
    transform_dev=transform_actor, exploded_movies=exploded_by_actors_movies,
)

An error occurred while calling o735.fit.
: org.apache.spark.SparkException: Job 39 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.sca


### try to filter rare actors

In [15]:
actors_best_model, actors_best_params, actors_best_score, actors_seen_params_mc_5 = search_ALS_params(
    group_by_actors(ratings_train_AB, exploded_by_actors_movies, min_cnt=10), ratings_dev,
    'actors_ALS_path.bin',
    [10], [0.03, 0.05, 0.07], [10],
    num_user_blocks=20, random_search_iters=1,
    rating_col='avg_rating', item_col='actor',
    transform_dev=transform_actor, exploded_movies=exploded_by_actors_movies,
)

(10, 0.03, 10) 0.7578032427842788
new best


### Lets predict

In [12]:
ratings_train_with_ALS_predictions = load('ratings_train_with_ALS_predictions.csv')

In [16]:
ratings_dev_with_ALS_predictions = load('ratings_dev_with_ALS_predictions.csv')

In [36]:
def train_ALS_by_actor(ratings, prediction_col, exploded_movies):
    ratings_grouped_train = group_by_actors(ratings, exploded_movies)
    return train_user_movie_ALS(
        ratings_grouped_train, prediction_col=prediction_col, params=(10, 0.05, 10),
        item_col='actor', rating_col='avg_rating',
    )

def transform_actor_no_filter(
        ALS_model, ratings_dev, exploded_movies, prediction_col, group_by_columns=[],
        aggregation_func=f.avg,
):
    exploded_ratings = (
        ratings_dev
        .join(f.broadcast(exploded_movies), on='movieId', how='left')
        .withColumn('actor', f.when(f.isnull('actor'), -1).otherwise(f.col('actor')))
    )
    return (
        ALS_model.transform(exploded_ratings)
        .groupBy('userId', 'movieId', 'timestamp', *group_by_columns)
        .agg(aggregation_func(prediction_col).alias(prediction_col))
    )

In [15]:
ALS_by_actor_cross_trainer = CrossTrainer(
    ratings_train_AB, train_ALS_by_actor, transform_actor_no_filter, 'user_actor_ALS',
    'user_actor_ALS_model.bin',
    train_func_params={'exploded_movies': exploded_by_actors_movies},
    transform_func_params={
        'exploded_movies': exploded_by_actors_movies,
        'prediction_col': 'user_actor_ALS',
        'group_by_columns': ['rating'], 
    },
)

In [14]:
def write_train_part_with_new_ALS(ALS_cross_trainer, user_part_ind, movie_part_ind, ratings_with_other_ALS):
    """Calc ALS on lighter pool, than join other ALS predictions"""
    path = 'ratings_train_with_actor_ALS_predictions_u_{}_m_{}.csv'.format(user_part_ind, movie_part_ind)
    ratings_with_new_ALS = ALS_cross_trainer.calc_part(user_part_ind, movie_part_ind)
    (
        ratings_with_other_ALS
        .join(f.broadcast(ratings_with_new_ALS), on=['userId', 'movieId', 'timestamp', 'rating'])
        .repartition(1).write.csv(path, header=True, mode='overwrite')
    )

In [30]:
write_train_part_with_new_ALS(ALS_by_actor_cross_trainer, 0, 0, ratings_train_with_ALS_predictions)

In [21]:
write_train_part_with_new_ALS(ALS_by_actor_cross_trainer, 0, 1, ratings_train_with_ALS_predictions)

In [22]:
write_train_part_with_new_ALS(ALS_by_actor_cross_trainer, 0, 2, ratings_train_with_ALS_predictions)

In [19]:
write_train_part_with_new_ALS(ALS_by_actor_cross_trainer, 1, 0, ratings_train_with_ALS_predictions)

In [21]:
write_train_part_with_new_ALS(ALS_by_actor_cross_trainer, 1, 1, ratings_train_with_ALS_predictions)

In [22]:
write_train_part_with_new_ALS(ALS_by_actor_cross_trainer, 1, 2, ratings_train_with_ALS_predictions)

In [19]:
write_train_part_with_new_ALS(ALS_by_actor_cross_trainer, 2, 0, ratings_train_with_ALS_predictions)

In [20]:
write_train_part_with_new_ALS(ALS_by_actor_cross_trainer, 2, 1, ratings_train_with_ALS_predictions)

In [17]:
write_train_part_with_new_ALS(ALS_by_actor_cross_trainer, 2, 2, ratings_train_with_ALS_predictions)

In [37]:
ALS_by_actor_cross_trainer = CrossTrainer(
    ratings_train_AB, train_ALS_by_actor, transform_actor_no_filter, 'user_actor_ALS',
    'user_actor_ALS_model.bin',
    train_func_params={'exploded_movies': exploded_by_actors_movies},
    transform_func_params={
        'exploded_movies': exploded_by_actors_movies,
        'prediction_col': 'user_actor_ALS',
        'group_by_columns': ['user_movie_ALS', 'user_genre_ALS', 'rating']
    },
)
ratings_dev_with_actor_ALS = ALS_by_actor_cross_trainer.transform_dev(ratings_dev_with_ALS_predictions)
ratings_dev_with_actor_ALS.repartition(1).write.csv(
    'ratings_dev_with_actor_ALS_predictions.csv', header=True, mode='overwrite',
)

In [19]:
ratings_dev_with_actor_ALS.show()

+------+-------+----------+------+--------------+------------------+------------------+
|userId|movieId| timestamp|rating|user_movie_ALS|    user_genre_ALS|    user_actor_ALS|
+------+-------+----------+------+--------------+------------------+------------------+
| 35514|   1344|1112508523|   2.0|      3.457928| 3.223349650700887| 3.383542037010193|
| 71163|   1344| 952800448|   3.0|     3.4519303|3.5229710737864175| 3.471204400062561|
|137286|   1344| 878554377|   3.0|     3.6283116| 3.641315460205078|3.4424354314804075|
|106159|   1344| 955742333|   3.0|     3.3002014|3.4272267818450928|3.3249454259872437|
|117527|   3052|1216132826|   4.0|     3.5423348|2.8819111982981362|  3.36899291144477|
| 60172|   4734|1176143366|   4.0|     3.7282295|3.6279306411743164|3.7272368355801233|
| 61662|   1917| 965552693|   5.0|     3.6590412| 3.614311158657074|3.4472132205963133|
| 86484|   3987|1139630091|   2.0|     2.9700406| 3.610045075416565| 3.410357928276062|
| 68180|  55290|1247247569|   4.

In [44]:
ALS_by_actor_cross_trainer = CrossTrainer(
    ratings_train_AB, train_ALS_by_actor, transform_actor_no_filter, 'user_actor_ALS',
    'user_actor_ALS_model.bin',
    train_func_params={'exploded_movies': exploded_by_actors_movies},
    transform_func_params={
        'exploded_movies': exploded_by_actors_movies,
        'prediction_col': 'user_actor_ALS',
        'group_by_columns': ['user_movie_ALS', 'user_genre_ALS']
    },
)
ratings_test_with_actor_ALS = ALS_by_actor_cross_trainer.transform_dev(ratings_test_with_ALS)
ratings_test_with_actor_ALS.repartition(1).write.csv(
    'ratings_test_with_actor_ALS_predictions.csv', header=True, mode='overwrite',
)

## Make predictions on test