# Part 1

In [1]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [3]:
conf = (
    SparkConf()
    .set("spark.executor.instances", "2")
    .set("spark.executor.cores", "1")
    .set("spark.executor.memory", "1g")
)

In [4]:
spark = (
    SparkSession.builder.appName("Solomatin_spark")
    .config(conf=conf)
    .master(master="yarn")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/02 14:41:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [5]:
!hdfs dfs -rm -r ml-latest-small
!hdfs dfs -put ml-latest-small .

rm: `ml-latest-small': No such file or directory


In [6]:
ratings_schema = StructType(
    fields=[
        StructField("userId", IntegerType()),
        StructField("movieId", IntegerType()),
        StructField("rating", DoubleType()),
        StructField("timestamp", LongType()),
    ]
)
ratings_df = (
    spark.read.format("csv")
    .option("header", "True")
    .schema(ratings_schema)
    .load("ml-latest-small/ratings.csv")
)

In [7]:
tags_schema = StructType(
    fields=[
        StructField("userId", IntegerType()),
        StructField("movieId", IntegerType()),
        StructField("tag", StringType()),
        StructField("timestamp", LongType()),
    ]
)

tags_df = (
    spark.read.format("csv")
    .option("header", "True")
    .schema(tags_schema)
    .load("ml-latest-small/tags.csv")
)

In [8]:
movies_schema = StructType(
    fields=[
        StructField("movieId", IntegerType()),
        StructField("title", StringType()),
        StructField("genres", StringType()),
    ]
)

movies_df = (
    spark.read.format("csv")
    .option("header", "True")
    .schema(movies_schema)
    .load("ml-latest-small/movies.csv")
)

In [9]:
tags_df.count(), ratings_df.count()

                                                                                

(3683, 100836)

Было выполнено по 1 стейдж и 1 таске для каждого count

# Part 2

In [10]:
(
    ratings_df.agg(f.countDistinct("movieId")).show(),
    ratings_df.agg(f.countDistinct("userId")).show(),
)

                                                                                

+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                   9724|
+-----------------------+

+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                   610|
+----------------------+



(None, None)

In [11]:
ratings_df.filter(ratings_df["rating"] >= 4).count()

48580

In [12]:
(
    ratings_df.groupBy("movieId")
    .agg(f.avg("rating").alias("average_rating"))
    .orderBy(f.col("average_rating").desc())
    .limit(100)
    .join(movies_df, "movieId")
    .show(100)
)

                                                                                

+-------+--------------+--------------------+--------------------+
|movieId|average_rating|               title|              genres|
+-------+--------------+--------------------+--------------------+
|    148|           5.0|Awfully Big Adven...|               Drama|
|    496|           5.0|What Happened Was...|Comedy|Drama|Roma...|
|    626|           5.0|Thin Line Between...|              Comedy|
|    633|           5.0|Denise Calls Up (...|              Comedy|
|    876|           5.0|Supercop 2 (Proje...|Action|Comedy|Cri...|
|   1151|           5.0| Lesson Faust (1994)|Animation|Comedy|...|
|   1349|           5.0|Vampire in Venice...|              Horror|
|   2196|           5.0|    Knock Off (1998)|              Action|
|   2824|           5.0| On the Ropes (1999)|   Documentary|Drama|
|   3086|           5.0|Babes in Toyland ...|Children|Comedy|F...|
|   3473|           5.0|Jonah Who Will Be...|              Comedy|
|   3496|           5.0|Madame Sousatzka ...|               Dr

In [13]:
joined_df = ratings_df.join(
    tags_df.withColumnRenamed("timestamp", "timestamp_tag"), ["userId", "movieId"]
)
(
    joined_df.withColumn(
        "time_difference", (f.col("timestamp_tag") - f.col("timestamp")) / 1000
    )
    .agg(f.avg("time_difference"))
    .show()
)

+--------------------+
|avg(time_difference)|
+--------------------+
|  26243.727372267072|
+--------------------+



In [14]:
(
    ratings_df.groupBy("userId")
    .agg(f.avg("rating").alias("avg_user_rating"))
    .agg(f.avg("avg_user_rating"))
    .show()
)

+--------------------+
|avg(avg_user_rating)|
+--------------------+
|  3.6572223377474016|
+--------------------+



# Task 3

In [15]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor
from sklearn.pipeline import Pipeline
import pandas as pd

In [16]:
tag_raiting = ratings_df.join(
    tags_df.withColumnRenamed("timestamp", "timestamp_tag"), ["userId", "movieId"]
)
tag_raiting.show(5)

+------+-------+------+----------+---------------+-------------+
|userId|movieId|rating| timestamp|            tag|timestamp_tag|
+------+-------+------+----------+---------------+-------------+
|     2|  60756|   5.0|1445714980|   will ferrell|   1445714992|
|     2|  60756|   5.0|1445714980|Highly quotable|   1445714996|
|     2|  60756|   5.0|1445714980|          funny|   1445714994|
|     2|  89774|   5.0|1445715189|      Tom Hardy|   1445715205|
|     2|  89774|   5.0|1445715189|            MMA|   1445715200|
+------+-------+------+----------+---------------+-------------+
only showing top 5 rows



In [17]:
tag_raiting_df = tag_raiting.toPandas()
tag_raiting_df.head()

Unnamed: 0,userId,movieId,rating,timestamp,tag,timestamp_tag
0,2,60756,5.0,1445714980,will ferrell,1445714992
1,2,60756,5.0,1445714980,Highly quotable,1445714996
2,2,60756,5.0,1445714980,funny,1445714994
3,2,89774,5.0,1445715189,Tom Hardy,1445715205
4,2,89774,5.0,1445715189,MMA,1445715200


In [18]:
tfidf_vectorizer = TfidfVectorizer(stop_words="english")
tfidf_features = tfidf_vectorizer.fit_transform(tag_raiting_df["tag"])

sgd_regressor = SGDRegressor()
sgd_regressor.fit(tfidf_features, tag_raiting_df["rating"])


@f.pandas_udf(returnType=DoubleType())
def predict_rating_udf(tag_series):
    tfidf_features = tfidf_vectorizer.transform(tag_series)
    rating_predictions = sgd_regressor.predict(tfidf_features)
    return pd.Series(rating_predictions)

In [19]:
tag_raiting = tag_raiting.withColumn(
    "predicted_rating", predict_rating_udf(f.col("tag"))
)

In [20]:
tag_raiting.show()

[Stage 40:>                                                         (0 + 1) / 1]

+------+-------+------+----------+-----------------+-------------+------------------+
|userId|movieId|rating| timestamp|              tag|timestamp_tag|  predicted_rating|
+------+-------+------+----------+-----------------+-------------+------------------+
|     2|  60756|   5.0|1445714980|     will ferrell|   1445714992|4.0097508059381655|
|     2|  60756|   5.0|1445714980|  Highly quotable|   1445714996| 3.953764252414835|
|     2|  60756|   5.0|1445714980|            funny|   1445714994| 4.381228647134793|
|     2|  89774|   5.0|1445715189|        Tom Hardy|   1445715205| 3.845329604650603|
|     2|  89774|   5.0|1445715189|              MMA|   1445715200| 3.431430028539658|
|     2|  89774|   5.0|1445715189|     Boxing story|   1445715207| 3.945523150434228|
|     2| 106782|   5.0|1445714966|  Martin Scorsese|   1445715056| 3.860100728122621|
|     2| 106782|   5.0|1445714966|Leonardo DiCaprio|   1445715051| 4.247528216899366|
|     2| 106782|   5.0|1445714966|            drugs|  

                                                                                

In [21]:
res = tag_raiting.select(
    f.sqrt(f.avg((f.col("rating") - f.col("predicted_rating")) ** 2)).alias("rmse")
)
res.show()

[Stage 42:>                                                         (0 + 1) / 1]

+------------------+
|              rmse|
+------------------+
|0.8731991743777643|
+------------------+



                                                                                

1 стейдж 1 таска
![](../imgs/rmse_stage.png)