In [1]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


# BLOCK 1

In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
conf = SparkConf().set("spark.executor.instances", "2").set("spark.executor.cores", "1").set("spark.executor.memory", "1g")

In [3]:
spark = SparkSession.builder.master("yarn").appName("vorkhlik_spark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 23:43:53 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [5]:
!hdfs dfs -rm -r ml-latest-small
!hdfs dfs -put ml-latest-small .

rm: `ml-latest-small': No such file or directory


In [4]:
from pyspark.sql.types import *

In [5]:
tags_schema = StructType([
    StructField("userId", IntegerType(), True), 
    StructField("movieId", IntegerType(), True), 
    StructField("tag", StringType(), True), 
    StructField("timestamp", LongType(), True)])

ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

In [6]:
%%time
ratings_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(ratings_schema)\
    .load("ml-latest-small/ratings.csv")

CPU times: user 9.98 ms, sys: 3.98 ms, total: 14 ms
Wall time: 11.2 s


In [7]:
%%time
tags_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(tags_schema)\
    .load("ml-latest-small/tags.csv")

CPU times: user 13.8 ms, sys: 5.7 ms, total: 19.5 ms
Wall time: 391 ms


In [32]:
print(tags_df.count(), ratings_df.count())

3683 100836


Для расчета было выполнено 2 стейджа для каждой операции count, в каждом стейдже 1 таска

# BLOCK 2

In [33]:
tags_df.show()

                                                                                

+------+-------+-----------------+----------+
|userId|movieId|              tag| timestamp|
+------+-------+-----------------+----------+
|     2|  60756|            funny|1445714994|
|     2|  60756|  Highly quotable|1445714996|
|     2|  60756|     will ferrell|1445714992|
|     2|  89774|     Boxing story|1445715207|
|     2|  89774|              MMA|1445715200|
|     2|  89774|        Tom Hardy|1445715205|
|     2| 106782|            drugs|1445715054|
|     2| 106782|Leonardo DiCaprio|1445715051|
|     2| 106782|  Martin Scorsese|1445715056|
|     7|  48516|     way too long|1169687325|
|    18|    431|        Al Pacino|1462138765|
|    18|    431|         gangster|1462138749|
|    18|    431|            mafia|1462138755|
|    18|   1221|        Al Pacino|1461699306|
|    18|   1221|            Mafia|1461699303|
|    18|   5995|        holocaust|1455735472|
|    18|   5995|       true story|1455735479|
|    18|  44665|     twist ending|1456948283|
|    18|  52604|  Anthony Hopkins|

In [34]:
ratings_df.show()

[Stage 25:>                                                         (0 + 1) / 1]

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



                                                                                

In [35]:
unique_movies_count = ratings_df.select("movieId").distinct().count()
unique_users_count = ratings_df.select("userId").distinct().count()
print(f"Unique Movies: {unique_movies_count}, Unique Users: {unique_users_count}")


[Stage 32:>                                                         (0 + 1) / 1]

Unique Movies: 9724, Unique Users: 610


                                                                                

In [36]:
high_ratings_count = ratings_df.filter(ratings_df["rating"] >= 4.0).count()
print(f"Number of Ratings >= 4.0: {high_ratings_count}")

[Stage 38:>                                                         (0 + 1) / 1]

Number of Ratings >= 4.0: 48580


                                                                                

In [9]:
from pyspark.sql import functions as F

In [46]:
top_rated_movies = ratings_df.groupBy("movieId").agg(F.avg("rating").alias("avg_rating")) \
    .orderBy([F.desc("avg_rating"), F.asc("movieId")]).limit(100)
top_rated_movies.show(100)

[Stage 54:>                                                         (0 + 1) / 1]

+-------+----------+
|movieId|avg_rating|
+-------+----------+
|     53|       5.0|
|     99|       5.0|
|    148|       5.0|
|    467|       5.0|
|    495|       5.0|
|    496|       5.0|
|    626|       5.0|
|    633|       5.0|
|    876|       5.0|
|   1140|       5.0|
|   1151|       5.0|
|   1310|       5.0|
|   1349|       5.0|
|   1631|       5.0|
|   1759|       5.0|
|   2075|       5.0|
|   2196|       5.0|
|   2512|       5.0|
|   2824|       5.0|
|   2969|       5.0|
|   2972|       5.0|
|   3073|       5.0|
|   3086|       5.0|
|   3096|       5.0|
|   3303|       5.0|
|   3473|       5.0|
|   3496|       5.0|
|   3531|       5.0|
|   3567|       5.0|
|   3637|       5.0|
|   3678|       5.0|
|   3687|       5.0|
|   3792|       5.0|
|   3795|       5.0|
|   3851|       5.0|
|   3939|       5.0|
|   3940|       5.0|
|   3941|       5.0|
|   3942|       5.0|
|   3951|       5.0|
|   4116|       5.0|
|   4135|       5.0|
|   4180|       5.0|
|   4402|       5.0|
|   4454|    

                                                                                

In [10]:
joined_df = ratings_df.join(tags_df.withColumnRenamed("timestamp", "tag_timestamp"), ["userId", "movieId"])
average_time_diff = joined_df.withColumn("time_difference", (F.col("tag_timestamp") - F.col("timestamp")) / 1000) \
    .agg(F.avg("time_difference")).first()[0]

print(f"Average Time Difference (seconds): {average_time_diff}")

[Stage 3:>                                                          (0 + 1) / 1]

Average Time Difference (seconds): 26243.727372267072


                                                                                

In [55]:
user_avg_ratings = ratings_df.groupBy("userId").agg(F.avg("rating").alias("avg_user_rating"))
average_of_averages = user_avg_ratings.agg(F.avg("avg_user_rating")).first()[0]
print(f"Average of Averages: {average_of_averages}")

[Stage 67:>                                                         (0 + 1) / 1]

Average of Averages: 3.6572223377474016


                                                                                

# BLOCK 3

In [11]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor
from sklearn.metrics import mean_squared_error
import pandas as pd

In [61]:
tags_df.show(1)

[Stage 78:>                                                         (0 + 1) / 1]

+------+-------+-----+-------------+
|userId|movieId|  tag|tag_timestamp|
+------+-------+-----+-------------+
|     2|  60756|funny|   1445714994|
+------+-------+-----+-------------+
only showing top 1 row



                                                                                

In [16]:
# Join the dataframes
joined_df = ratings_df.join(tags_df.withColumnRenamed("timestamp", "timestamp_tag"), ["userId", "movieId"])

# Step 1: Train TfidfVectorizer on the "tag" column
df = joined_df.toPandas()
tfidf_vectorizer = TfidfVectorizer()
tags_numerical_features = tfidf_vectorizer.fit_transform(df["tag"])

                                                                                

In [17]:
df.head()

Unnamed: 0,userId,movieId,rating,timestamp,tag,timestamp_tag
0,2,60756,5.0,1445714980,will ferrell,1445714992
1,2,60756,5.0,1445714980,Highly quotable,1445714996
2,2,60756,5.0,1445714980,funny,1445714994
3,2,89774,5.0,1445715189,Tom Hardy,1445715205
4,2,89774,5.0,1445715189,MMA,1445715200


In [18]:
sgd_regressor = SGDRegressor()
sgd_regressor.fit(tags_numerical_features, df["rating"])

In [20]:
from pyspark.sql.functions import col, udf

In [21]:
@udf(DoubleType())
def predict_rating(tag):
    tag_numerical_features = tfidf_vectorizer.transform([tag])
    return float(sgd_regressor.predict(tag_numerical_features))

In [23]:
result_df = joined_df.withColumn("predicted_rating", predict_rating(col("tag")))

In [24]:
result_pandas = result_df.select("rating", "predicted_rating").toPandas()

                                                                                

In [25]:
rmse = mean_squared_error(result_pandas["rating"], result_pandas["predicted_rating"], squared=False)
print(f"Root Mean Squared Error (RMSE): {rmse}")


Root Mean Squared Error (RMSE): 0.8734182429099265


In [26]:
result_df.show(50)

                                                                                

+------+-------+------+----------+--------------------+-------------+------------------+
|userId|movieId|rating| timestamp|                 tag|timestamp_tag|  predicted_rating|
+------+-------+------+----------+--------------------+-------------+------------------+
|     2|  60756|   5.0|1445714980|        will ferrell|   1445714992| 4.034152661202453|
|     2|  60756|   5.0|1445714980|     Highly quotable|   1445714996| 3.952807195306802|
|     2|  60756|   5.0|1445714980|               funny|   1445714994| 4.370088129140697|
|     2|  89774|   5.0|1445715189|           Tom Hardy|   1445715205|3.8407134876416285|
|     2|  89774|   5.0|1445715189|                 MMA|   1445715200|3.4214209469260037|
|     2|  89774|   5.0|1445715189|        Boxing story|   1445715207|3.9287583662664822|
|     2| 106782|   5.0|1445714966|     Martin Scorsese|   1445715056|3.8523537113695143|
|     2| 106782|   5.0|1445714966|   Leonardo DiCaprio|   1445715051| 4.242214632283111|
|     2| 106782|   5.

Для запуска был произведен 1 stage с 1 task