# PySpark MovieLens Recommendation by ALS

In [1]:
import pyspark.sql.functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import DataFrame as SDF
from pyspark.sql import Row, SparkSession

## Utility関数定義

In [2]:
def parse_ratings(path: str) -> SDF:
    """Load a MovieLens ``ratings.dat`` file into a Spark DataFrame.

    Every input line is split on ``::`` and its first four fields are read
    as ``userId``, ``movieId``, ``rating`` and ``timestamp``.

    Args:
        path: Location of the ratings file, handed to ``spark.read.text``.

    Returns:
        A DataFrame built from one ``Row`` per input line, with integer
        ``userId``/``movieId``/``timestamp`` values and a float ``rating``.

    Note:
        Uses the notebook-level ``spark`` session, so the session must have
        been created before this function is called.
    """
    def to_rating(line):
        # `value` holds the raw text of the line.
        fields = line.value.split("::")
        return Row(
            userId=int(fields[0]),
            movieId=int(fields[1]),
            rating=float(fields[2]),
            timestamp=int(fields[3]),
        )

    rating_rows = spark.read.text(path).rdd.map(to_rating)
    return spark.createDataFrame(rating_rows)


def parse_movies(path: str) -> SDF:
    """Load a MovieLens ``movies.dat`` file into a Spark DataFrame.

    Every input line is split on ``::`` and its first three fields are read
    as ``movieId``, ``title`` and ``genre``.

    Args:
        path: Location of the movies file, handed to ``spark.read.text``.

    Returns:
        A DataFrame built from one ``Row`` per input line, with an integer
        ``movieId`` and string ``title``/``genre`` values.

    Note:
        Uses the notebook-level ``spark`` session, so the session must have
        been created before this function is called.
    """
    def to_movie(line):
        # `value` holds the raw text of the line.
        fields = line.value.split("::")
        return Row(
            movieId=int(fields[0]),
            title=str(fields[1]),
            genre=str(fields[2]),
        )

    movie_rows = spark.read.text(path).rdd.map(to_movie)
    return spark.createDataFrame(movie_rows)



In [3]:
# Options applied to the local session, kept as data so they are easy to scan
# and tweak in one place.
session_options = [
    ('spark.executor.memory', '8G'),
    ('spark.executor.cores', 2),
    ('spark.default.parallelism', 10),
    ('spark.sql.shuffle.partitions', 10),
    ('spark.executor.instances', 1),
]

# Run on the local machine using all available cores ('local[*]').
session_builder = SparkSession.builder.master('local[*]').appName('movielens_als')
for option_name, option_value in session_options:
    session_builder = session_builder.config(option_name, option_value)

# Reuses an already running session if there is one, otherwise starts a new one.
spark = session_builder.getOrCreate()
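# Alternative configuration (disabled): run against a Kubernetes cluster
# instead of the local session created above.
# spark = (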
#     SparkSession
#     .builder
#     .master('k8s://https://kubernetes.default.svc.cluster.local:443')
#     .appName('movielens_als')
#     .appName('spakr_on_k8s')
#     .config('spark.kubernetes.container.image', 'kanchishimono/pyspark-worker:latest')
#     .config('spark.kubernetes.pyspark.pythonVersion', 3)
#     .config('spark.executor.instances', 1)
#     .config('spark.kubernetes.namespace', 'notebook')
#     .config('spark.port.maxRetries', 3)
#     .config('spark.history.ui.port', True)
#     .config('spark.ui.enabled', True)
#     .config('spark.ui.port', 4040)
#     .config('spark.driver.host', 'jbxnfic.notebook.svc.cluster.local')
#     .config('spark.driver.port', 29413)
#     .config('spark.executor.memory', '1G')
#     .config('spark.executor.cores', 1)
#     .config('spark.default.parallelism', 10)
#     .config('spark.sql.shuffle.partitions', 10)
#     .config('spark.eventLog.compress', True)
#     .config('spark.eventLog.enabled', True)
#     .config('spark.eventLog.dir', 'file:///tmp/spark-events')
#     .getOrCreate()
# )


In [4]:
# Echo the session object so the notebook renders it as the cell output.
spark

## MovieLensデータセット読み込み

In [5]:
# Input files of the dataset and the number of partitions each parsed
# DataFrame is redistributed into.
RATINGS_PATH = '/home/work/ml-1m/ratings.dat'
MOVIES_PATH = '/home/work/ml-1m/movies.dat'
NUM_PARTITIONS = 10

ratings_df = parse_ratings(RATINGS_PATH).repartition(NUM_PARTITIONS)
movies_df = parse_movies(MOVIES_PATH).repartition(NUM_PARTITIONS)

In [6]:
# Convert the ratings to a pandas DataFrame so the notebook renders them as a table.
# NOTE(review): toPandas() brings every row to the driver; for a quick look,
# ratings_df.limit(n).toPandas() would avoid materialising the whole dataset.
ratings_df.toPandas()

Unnamed: 0,movieId,rating,timestamp,userId
0,971,5.0,978101573,35
1,2793,1.0,982008112,1101
2,1258,4.0,977202210,957
3,947,4.0,975350893,850
4,1610,4.0,975572791,710
...,...,...,...,...
1000204,2085,2.0,960398522,5403
1000205,2805,1.0,960522967,5636
1000206,3868,5.0,967496970,5493
1000207,1196,4.0,958170627,5771


In [7]:
# Convert the movie catalogue to a pandas DataFrame so the notebook renders it as a table.
# NOTE(review): toPandas() brings every row to the driver; prefer
# movies_df.limit(n).toPandas() when only a preview is needed.
movies_df.toPandas()

Unnamed: 0,genre,movieId,title
0,Thriller,2180,Torn Curtain (1966)
1,Drama|Romance,105,"Bridges of Madison County, The (1995)"
2,Action|War,2476,Heartbreak Ridge (1986)
3,Comedy|Drama|Romance,1244,Manhattan (1979)
4,Crime|Drama|Thriller,1620,Kiss the Girls (1997)
...,...,...,...
3878,Comedy,2150,"Gods Must Be Crazy, The (1980)"
3879,Drama|Romance,49,When Night Is Falling (1995)
3880,Documentary,3890,Back Stage (2000)
3881,Comedy,2555,Baby Geniuses (1999)


In [8]:
# Split the ratings with weights 0.6 / 0.4 into a training set and a held-out
# test set; the fixed seed keeps the random assignment repeatable.
train_df, test_df = ratings_df.randomSplit([0.6, 0.4], seed=12345)
# Persist the training split because the grid search fits on it repeatedly.
# As the last expression of the cell, this also echoes the DataFrame schema.
train_df.persist()

DataFrame[movieId: bigint, rating: double, timestamp: bigint, userId: bigint]

## レコメンドモデル (ALS) 定義
### パラメーター

|パラメータ名|説明|
|:--|:--|
|userCol|ユーザーIDが記録されているカラム名|
|itemCol|アイテムIDが記録されているカラム名|
|ratingCol|ユーザーのアイテムに対する評価値が記録されているカラム名。レビュー値のように明示的なものと、アクセス回数など暗黙的なものどちらかを使用する。|
|coldStartStrategy|訓練データに含まれない未知のユーザーやアイテムの取り扱い方法。'nan'は未知のIDに対する推論値をnanで返す。'drop'は未知のIDが含まれる行を落とす。|
|numUserBlocks|ユーザーの潜在因子行列のDataFrameパーティション数|
|numItemBlocks|アイテムの潜在因子行列のDataFrameパーティション数|
|implicitPrefs|ratingColに暗黙的な値を使用するか。Trueの場合、暗黙的評価値用の計算式が内部で使用される。|
|nonnegative|最小二乗法を解く際に、潜在因子行列の値に非負制約を課すか。|
|maxIter|収束計算の繰り返し回数|
|rank|ユーザー、アイテムの潜在因子行列の次元数|
|alpha|implicitPrefs=Trueの時のみ有効。暗黙的評価値の信頼度の高さを示す値。|
|regParam|正則化パラメータ|


In [9]:
# Explicit-feedback ALS estimator over (userId, movieId, rating) triples.
# maxIter, rank and regParam are left unset here; the grid search supplies them.
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    # Drop predictions for users/movies that were not seen during fitting, so
    # NaN predictions cannot leak into the RMSE.
    coldStartStrategy='drop',
    numUserBlocks=2,
    numItemBlocks=2,
    implicitPrefs=False,
    # Constrain the latent factors to be non-negative.
    nonnegative=True,
    # Pin the seed used to initialise the factors. When omitted, PySpark
    # derives a default from Python's hash(), which is salted per process, so
    # results would not be repeatable across kernel restarts.
    seed=12345)

## レコメンドモデル計算
### グリッドサーチパラメータ定義

In [10]:
# Candidate values for each tuned hyper-parameter.
max_iter_candidates = [5, 10, 15]
rank_candidates = [10, 15, 20]
reg_param_candidates = [0.01, 0.05, 0.1, 0.5, 1.0]

# Full cartesian product of the candidates: 3 * 3 * 5 = 45 parameter maps.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.maxIter, max_iter_candidates)
    .addGrid(als.rank, rank_candidates)
    # alpha only takes effect when implicitPrefs is True, so it is not tuned here.
    # .addGrid(als.alpha, [1.0, 10.0, 100.0])
    .addGrid(als.regParam, reg_param_candidates)
    .build()
)

### 評価指標選択 (RMSE)

In [11]:
# RMSE between the observed 'rating' column and the model's 'prediction' column.
# The same evaluator is used for model selection and for the final test score.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction')

### グリッドサーチ実行

In [12]:
# Hyper-parameter search with a single train/validation split of the data
# passed to fit().
tsv = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    # 80% of the rows are used for fitting, the remaining 20% for validation.
    trainRatio=0.8,
    # Keep every fitted candidate model, not only the best one.
    collectSubModels=True,
    # Pin the seed of the internal train/validation split so the selected
    # hyper-parameters are repeatable across kernel restarts.
    seed=12345)

In [13]:
# Run the grid search on the training split: every parameter map in
# `param_grid` is fitted and scored, which makes this the expensive cell.
models = tsv.fit(train_df)

In [14]:
# The model selected by the search according to the evaluator's metric.
best_model = models.bestModel

## モデル評価

In [15]:
# Score the held-out test split, which was not used during the grid search.
prediction = best_model.transform(test_df)
# `evaluator` is configured for RMSE between 'rating' and 'prediction'.
rmse = evaluator.evaluate(prediction)
print('RMSE: {:.3f}'.format(rmse))

RMSE: 0.871


In [16]:
# Pair each validation metric with the parameter map that produced it
# (both sequences follow the order of `param_grid`).
metric_param_sets = list(zip(models.validationMetrics, param_grid))

## レコメンド結果確認
### 確認対象ユーザー選択

In [17]:
# One-row DataFrame holding the user whose recommendations are inspected below.
is_target_user = F.col('userId') == 55
users = ratings_df.select('userId').distinct().where(is_target_user)

In [18]:
# Show the selected user(s) as a pandas table.
users.toPandas()

Unnamed: 0,userId
0,55


### 確認対象ユーザー評価履歴確認

In [19]:
# Ratings given by the selected user(s) ...
selected_ratings = ratings_df.join(users, ['userId'], 'inner')
# ... enriched with the title and genre of each rated movie.
rated_movies = selected_ratings.join(movies_df, ['movieId'], 'inner')
# Highest-rated movies first; rendered as a pandas table.
rated_movies.orderBy('userId', F.desc('rating')).toPandas()

Unnamed: 0,movieId,userId,rating,timestamp,genre,title
0,110,55,5.0,977943155,Action|Drama|War,Braveheart (1995)
1,318,55,5.0,977942882,Drama,"Shawshank Redemption, The (1994)"
2,356,55,5.0,977948435,Comedy|Romance|War,Forrest Gump (1994)
3,589,55,5.0,977948346,Action|Sci-Fi|Thriller,Terminator 2: Judgment Day (1991)
4,3114,55,5.0,977943112,Animation|Children's|Comedy,Toy Story 2 (1999)
5,2761,55,5.0,977948346,Animation|Children's,"Iron Giant, The (1999)"
6,527,55,5.0,977942911,Drama|War,Schindler's List (1993)
7,1704,55,5.0,977943112,Drama,Good Will Hunting (1997)
8,457,55,5.0,977948394,Action|Thriller,"Fugitive, The (1993)"
9,2762,55,5.0,977943181,Thriller,"Sixth Sense, The (1999)"


### レコメンド結果確認

In [20]:
# Top-10 recommendations for each selected user; the result carries one
# array-valued 'recommendations' column per user.
top_recommendations = best_model.recommendForUserSubset(users, 10)

# Flatten the array: one row per recommended movie with its predicted rating.
recommendation_rows = (
    top_recommendations
    .withColumn('temp', F.explode('recommendations'))
    .select(
        'userId',
        F.col('temp').getItem('movieId').alias('movieId'),
        F.col('temp').getItem('rating').alias('rating'),
    )
)

# Attach title/genre and list the strongest recommendations first.
# NOTE(review): movies the user has already rated are not filtered out, so
# they can show up among the recommendations.
(
    recommendation_rows
    .join(movies_df, ['movieId'], 'inner')
    .orderBy(F.desc('rating'))
    .toPandas()
)

Unnamed: 0,movieId,userId,rating,genre,title
0,2776,55,4.967983,Documentary,"Marcello Mastroianni: I Remember Yes, I Rememb..."
1,3092,55,4.908737,Drama,Chushingura (1962)
2,2197,55,4.881322,Drama,Firelight (1997)
3,37,55,4.789057,Documentary,Across the Sea of Time (1995)
4,3114,55,4.732663,Animation|Children's|Comedy,Toy Story 2 (1999)
5,687,55,4.721839,Drama|Romance,Country Life (1994)
6,1197,55,4.714823,Action|Adventure|Comedy|Romance,"Princess Bride, The (1987)"
7,2905,55,4.692392,Action|Adventure,Sanjuro (1962)
8,1198,55,4.691902,Action|Adventure,Raiders of the Lost Ark (1981)
9,260,55,4.680371,Action|Adventure|Fantasy|Sci-Fi,Star Wars: Episode IV - A New Hope (1977)


In [21]:
# Shut down the Spark session at the end of the notebook.
spark.stop()