In [1]:
!pip install pyspark
!apt install openjdk-8-headless -qql

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 42.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=4ca4418dd5c6d32475e738580a50a31c14172ba796dda472cad7d46275b74d56
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1
E: Command line option 'l' [from -qql] is not understood in combination with the other options.


In [2]:
import pyspark
from pyspark.sql import *

In [3]:
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.types import *

In [6]:
schema = StructType([
    StructField('userId', IntegerType(), True),
    StructField('movieId', IntegerType(), True),
    StructField('rating', FloatType(), True),
])
rating_df = spark.read.option('header',True).schema(schema).csv('/content/drive/MyDrive/3-2 university/Big_data_process_and_application/input/uRating.csv')
rating_df.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     0|      2|   3.0|
|     0|      3|   1.0|
|     0|      5|   2.0|
|     0|      9|   4.0|
|     0|     11|   1.0|
|     0|     12|   2.0|
|     0|     15|   1.0|
|     0|     17|   1.0|
|     0|     19|   1.0|
|     0|     21|   1.0|
|     0|     23|   1.0|
|     0|     26|   3.0|
|     0|     27|   1.0|
|     0|     28|   1.0|
|     0|     29|   1.0|
|     0|     30|   1.0|
|     0|     31|   1.0|
|     0|     34|   1.0|
|     0|     37|   1.0|
|     0|     41|   2.0|
+------+-------+------+
only showing top 20 rows



In [14]:
train, test = rating_df.randomSplit([0.8, 0.2])

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


In [16]:
# MaxIter : 최대 반복횟수
# UserCol : 참조할 user 열
# ItemCol : 참조할 item 열
# RatingCol : 참조할 rating 열
als = ALS().setMaxIter(5).setUserCol("userId").setItemCol('movieId').setRatingCol('rating')

alsModel = als.fit(train)

In [18]:
#모든 사용자에게 5개씩 추천
alsModel.recommendForAllUsers(5).first()

Row(userId=20, recommendations=[Row(movieId=22, rating=3.6156795024871826), Row(movieId=94, rating=3.3749289512634277), Row(movieId=77, rating=3.2377731800079346), Row(movieId=75, rating=3.1932477951049805), Row(movieId=88, rating=3.186223268508911)])

In [20]:
#추천 데이터를 임시 view로 생성
userRecommend = alsModel.recommendForAllUsers(5)
userRecommend.createOrReplaceTempView('userRecommend')

#inline(단항 표현식) 으로 recommendations열의 movieId, rating 분리
userRecTable = spark.sql('select inline(recommendations) from userRecommend where userId = 20')
userRecTable.show()

+-------+---------+
|movieId|   rating|
+-------+---------+
|     22|3.6156795|
|     94| 3.374929|
|     77|3.2377732|
|     75|3.1932478|
|     88|3.1862233|
+-------+---------+



In [23]:
schema = StructType([
    StructField('movieId', IntegerType(), True),
    StructField('title', StringType(), True),
])

movie_df = spark.read.option('header', True).schema(schema).csv('/content/drive/MyDrive/3-2 university/Big_data_process_and_application/input/uItem.csv')
movie_df.show()

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|    GoldenEye (1995)|
|      3|   Four Rooms (1995)|
|      4|   Get Shorty (1995)|
|      5|      Copycat (1995)|
|      6|Shanghai Triad (Y...|
|      7|Twelve Monkeys (1...|
|      8|         Babe (1995)|
|      9|Dead Man Walking ...|
|     10|  Richard III (1995)|
|     11|Seven (Se7en) (1995)|
|     12|Usual Suspects, T...|
|     13|Mighty Aphrodite ...|
|     14|  Postino, Il (1994)|
|     15|Mr. Holland's Opu...|
|     16|French Twist (Gaz...|
|     17|From Dusk Till Da...|
|     18|White Balloon, Th...|
|     19|Antonia's Line (1...|
|     20|Angels and Insect...|
+-------+--------------------+
only showing top 20 rows



In [27]:
userRecTitle = userRecTable.join(movie_df, userRecTable.movieId == movie_df.movieId, 'inner').select(userRecTable.movieId, 'rating','title')

userRecTitle.show()

+-------+---------+--------------------+
|movieId|   rating|               title|
+-------+---------+--------------------+
|     22|3.6156795|   Braveheart (1995)|
|     94| 3.374929|   Home Alone (1990)|
|     77|3.2377732|    Firm, The (1993)|
|     75|3.1932478|Brother Minister:...|
|     88|3.1862233|Sleepless in Seat...|
+-------+---------+--------------------+



In [29]:
predictions = alsModel.transform(test)
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     2|     78|   1.0|0.89010644|
|     2|     34|   4.0|0.72920734|
|     2|     12|   3.0| 1.7875842|
|     2|     40|   4.0| 1.0190963|
|     1|     57|   1.0| 1.4010577|
|     0|     96|   1.0| 1.8156339|
|     0|     19|   1.0|  1.676346|
|     2|     15|   2.0| 1.0905502|
|     2|     37|   5.0| 1.1270897|
|     1|      9|   3.0| 1.3716092|
|     1|     72|   1.0| 1.0222877|
|     1|      4|   2.0| 1.4676647|
|     2|     87|   2.0| 1.8413959|
|     0|     51|   1.0| 1.2781427|
|     3|     62|   1.0| 2.4622784|
|     3|     58|   1.0| 1.1714917|
|     0|     11|   1.0|0.93623483|
|     2|     71|   3.0| 1.2085121|
|     0|     71|   1.0| 1.3127949|
|     3|      2|   1.0|  1.910242|
+------+-------+------+----------+
only showing top 20 rows



In [30]:
evaluator = RegressionEvaluator().setMetricName('rmse').setLabelCol('rating').setPredictionCol('prediction')

#MetricName = 측정할 메트릭 이름 (RMSE, MAE 등등)
#LabelCol = 실제 값(label)으로 참조할 열
#predictionCol = 예측 값으로 참조할 열

rmse = evaluator.evaluate(predictions)

print('Root-mean-square error = %f' %rmse)

Root-mean-square error = 1.165621
