In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [2]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [8]:
#!hdfs fs -rm -r ml-latest-small
!hdfs dfs -put ml-latest-small /

In [9]:
conf = SparkConf()\
    .set("spark.executor.instances", "2")\
    .set("spark.executor.cores", "1")\
    .set("spark.executor.memory", "1g")

In [11]:
spark = SparkSession.builder.appName("Svirin_spark").config(conf=conf).master(master="yarn").getOrCreate()

Block1:

Приложить скрин YARN-а, где запущено приложение, приложить скрин UI приложения spark

![Spark UI.png](attachment:e02913ab-f048-4fdc-a0e0-83daf78d7486.png)![YARN.png](attachment:f6062a81-74b5-47f3-bcf1-2654596a8893.png)

Прочитать таблицы ratings, tags в директории ml-latest-small; отобразить количество строчек и в том, и в другом датасете.

In [12]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

In [23]:
%%time
ratings_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(ratings_schema)\
    .load("/ml-latest-small/ratings.csv")

CPU times: user 2.09 ms, sys: 0 ns, total: 2.09 ms
Wall time: 17.2 ms


In [24]:
%%time
tags_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(tags_schema)\
    .load("/ml-latest-small/tags.csv")

CPU times: user 1.55 ms, sys: 1.45 ms, total: 2.99 ms
Wall time: 15.8 ms


In [25]:
ratings_df.count()

100836

In [26]:
tags_df.count() 

3683

Приложить скрин spark-ui с выполненной job-ой (можно приложить прям в markdown ячейки, можно положить в той же папке, что и ноутбук). Написать, сколько было выполнено стейджей и тасок.

![jobs.png](attachment:2bfc1f53-fe03-44c3-a811-732a2960c68f.png)

![job11.png](attachment:b800078e-bd05-40cb-9523-b7c2e72e73b3.png)

Выполнено 2 стейджа, 2 таски.

Block2:

1) Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”.

In [29]:
ratings_df.select("movieId").distinct().count()

9724

In [30]:
ratings_df.select("userId").distinct().count()

610

2) Посчитать, сколько было поставлено оценок >= 4.0. 

In [31]:
ratings_df.filter(4 <= ratings_df["rating"]).count()

48580

3) Вывести топ100 фильмов с самым высоким рейтингом.

In [34]:
ratings_df.groupBy("movieId").agg(f.avg("rating").alias("average_rating")).orderBy(f.desc("average_rating")).take(100)

[Row(movieId=2196, average_rating=5.0),
 Row(movieId=88448, average_rating=5.0),
 Row(movieId=172909, average_rating=5.0),
 Row(movieId=67618, average_rating=5.0),
 Row(movieId=3496, average_rating=5.0),
 Row(movieId=148, average_rating=5.0),
 Row(movieId=142444, average_rating=5.0),
 Row(movieId=496, average_rating=5.0),
 Row(movieId=8911, average_rating=5.0),
 Row(movieId=118834, average_rating=5.0),
 Row(movieId=173963, average_rating=5.0),
 Row(movieId=156025, average_rating=5.0),
 Row(movieId=5513, average_rating=5.0),
 Row(movieId=120130, average_rating=5.0),
 Row(movieId=26350, average_rating=5.0),
 Row(movieId=1349, average_rating=5.0),
 Row(movieId=147300, average_rating=5.0),
 Row(movieId=122092, average_rating=5.0),
 Row(movieId=633, average_rating=5.0),
 Row(movieId=71268, average_rating=5.0),
 Row(movieId=876, average_rating=5.0),
 Row(movieId=99636, average_rating=5.0),
 Row(movieId=53578, average_rating=5.0),
 Row(movieId=6086, average_rating=5.0),
 Row(movieId=160644, a

4) Посчитать разницу во времени в секундах между временем тегирования пользователя данного фильма и временем, когда пользователь поставил оценку фильму. В качестве ответа выведете среднюю дельту по времени.

In [39]:
joined = ratings_df.withColumnRenamed("timestamp", "ratings_timestamp").join(tags_df, on=['userId', 'movieId'])
time_diff = joined.withColumn("time_diff", f.col("timestamp") -  f.col("ratings_timestamp"))
#в секундах
print(time_diff.agg(f.mean('time_diff')).collect()[0][0]  / 1000)

26243.727372266974


5. Посчитать среднюю оценку от каждого пользователя, в качестве ответа выведете среднее от всех усредненных оценок всех пользователей. (7 баллов)

In [40]:
ans = ratings_df.groupBy("userId").agg(f.avg("rating").alias("average_by_user")).agg(f.avg("average_by_user"))

In [42]:
ans.collect()[0][0]

3.6572223377474016

Block3:

Обучите модель предсказания оценок по тегам
с помощью TfidfVectorizer и SGDRegressor из модуля scikit-learn - тут уже можно сконвертировать два датасета в
pandas через .toPandas

In [84]:
rating_tag = joined.toPandas()

In [85]:
rating_tag_spark = joined

In [86]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor
import pandas as pd

сначала  TfidfVectorizer обучаете на колонке “tag”
получаете численные признаки transform-ом от tfidf на той же колонке “tag”

In [87]:
tfidf_vectorizer = TfidfVectorizer()
transformed = tfidf_vectorizer.fit_transform(rating_tag["tag"])

обучаете SGDRegressor на новых численных признаках от  TfidfVectorizer-а с лейблом “rating”


In [88]:
sgd_regressor = SGDRegressor()
sgd_regressor.fit(transformed, rating_tag["rating"])

Напишите UDF, которая делает предсказание рейтинга по столбцу “tag”

In [106]:
@f.pandas_udf(DoubleType())
def predict(tags):
    transformed = tfidf_vectorizer.transform(tags)
    return sgd_regressor.predict(transformed)


In [107]:
a = rating_tag_spark.withColumn("predicted", predict(f.col("tag")))



In [108]:
a.show(10)



24/02/05 00:07:19 WARN TaskSetManager: Lost task 0.0 in stage 73.0 (TID 61) (nodemanager1 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/sql/pandas/serializers.py", line 470, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/co

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/sql/pandas/serializers.py", line 470, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/sql/pandas/serializers.py", line 100, in dump_stream
    for batch in iterator:
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/sql/pandas/serializers.py", line 464, in init_stream_yield_batches
    batch = self._create_batch(series)
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/sql/pandas/serializers.py", line 450, in _create_batch
    arrs.append(self._create_array(s, t, arrow_cast=self._arrow_cast))
  File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1707060676298_0001/container_1707060676298_0001_01_000003/pyspark.zip/pyspark/sql/pandas/serializers.py", line 235, in _create_array
    if hasattr(series.array, "__arrow_array__"):
AttributeError: 'numpy.ndarray' object has no attribute 'array'
