In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [2]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [3]:
!hdfs dfs -rm -r ml-latest-small
!hdfs dfs -put ml-latest-small .

Deleted ml-latest-small


In [4]:
conf = SparkConf().set("spark.executor.instances", "2").set("spark.executor.cores", "1").set("spark.executor.memory", "1g")

In [5]:
spark = SparkSession.builder.config(conf=conf).master(master="yarn").appName("buyantuev_spark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/15 17:25:06 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [6]:
spark

### Скриншоты
Скриншоты находятся в папке /notebooks/images

## Playground

In [7]:
%%time
df = spark.range(int(1e6)).withColumn("value", f.lit("some value"))

CPU times: user 8.53 ms, sys: 4.53 ms, total: 13.1 ms
Wall time: 7.98 s


In [8]:
%%time
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+---+----------+
| id|     value|
+---+----------+
|  0|some value|
|  1|some value|
|  2|some value|
|  3|some value|
|  4|some value|
|  5|some value|
|  6|some value|
|  7|some value|
|  8|some value|
|  9|some value|
| 10|some value|
| 11|some value|
| 12|some value|
| 13|some value|
| 14|some value|
| 15|some value|
| 16|some value|
| 17|some value|
| 18|some value|
| 19|some value|
+---+----------+
only showing top 20 rows

CPU times: user 9.83 ms, sys: 10.1 ms, total: 19.9 ms
Wall time: 7.34 s


                                                                                

## Ratings and Tags

In [9]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

ratings_df = spark.read.format("csv").option("header", "True").schema(ratings_schema).load("/ml-latest-small/ratings.csv")

In [10]:
%%time
ratings_df.count()

[Stage 3:>                                                          (0 + 1) / 1]

CPU times: user 15.9 ms, sys: 12.6 ms, total: 28.6 ms
Wall time: 8.38 s


                                                                                

100836

In [11]:
tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

tags_df = spark.read.format("csv").option("header", "True").schema(tags_schema).load("/ml-latest-small/tags.csv")

In [12]:
%%time
tags_df.count()

[Stage 4:>                                                          (0 + 1) / 1]

CPU times: user 12.8 ms, sys: 1.37 ms, total: 14.2 ms
Wall time: 3.12 s


                                                                                

3683

### Все скриншоты находятся в соседней папке /images/block1

P.S. при этом запуске почему-то появились skipped stages, когда запускал команды `ratings_df.show()` и `tags_df.show()` такого не было

# Task 2

In [13]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [19]:
# Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”. (5 баллов)
ratings_df.select('movieId').distinct().count()

9724

In [20]:
# Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”. (5 баллов)
ratings_df.select('userId').distinct().count()

610

In [28]:
# Посчитать, сколько было поставлено оценок >= 4.0. (5 баллов)
ratings_df.select('rating').filter(f.col('rating') >= 4.0).count()

48580

In [29]:
# Вывести топ100 фильмов с самым высоким рейтингом. (6 баллов)
ratings_df.sort(f.col('rating').desc()).show(n=100)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1617|   5.0|964982951|
|     1|   1625|   5.0|964983504|
|     1|     47|   5.0|964983815|
|     1|   1732|   5.0|964981125|
|     1|    101|   5.0|964980868|
|     1|   1804|   5.0|964983034|
|     1|    157|   5.0|964984100|
|     1|   1927|   5.0|964981497|
|     1|    216|   5.0|964981208|
|     1|   1954|   5.0|964982176|
|     1|    260|   5.0|964981680|
|     1|   2005|   5.0|964981710|
|     1|    362|   5.0|964982588|
|     1|   2018|   5.0|964980523|
|     1|    527|   5.0|964984002|
|     1|   2033|   5.0|964982903|
|     1|    596|   5.0|964982838|
|     1|   2048|   5.0|964982791|
|     1|    661|   5.0|964982838|
|     1|   2058|   5.0|964982400|
|     1|    923|   5.0|964981529|
|     1|   2078|   5.0|964982838|
|     1|    954|   5.0|964983219|
|     1|   2090|   5.0|964982838|
|     1|   1024|   5.0|964982876|
|     1|   2094|   5.0|964982653|
|     1|   102

In [40]:
# Посчитать разницу во времени в секундах между временем тегирования 
# пользователя данного фильма и временем, когда пользователь поставил оценку фильму.
# В качестве ответа выведете среднюю дельту по времени. (7 баллов)

In [41]:
ratings_tags_df = ratings_df.alias('rating').join(tags_df.alias('tag'), how='inner', on=['userId', 'movieId'])

In [42]:
ratings_tags_df.show()

+------+-------+------+----------+-----------------+----------+
|userId|movieId|rating| timestamp|              tag| timestamp|
+------+-------+------+----------+-----------------+----------+
|     2|  60756|   5.0|1445714980|     will ferrell|1445714992|
|     2|  60756|   5.0|1445714980|  Highly quotable|1445714996|
|     2|  60756|   5.0|1445714980|            funny|1445714994|
|     2|  89774|   5.0|1445715189|        Tom Hardy|1445715205|
|     2|  89774|   5.0|1445715189|              MMA|1445715200|
|     2|  89774|   5.0|1445715189|     Boxing story|1445715207|
|     2| 106782|   5.0|1445714966|  Martin Scorsese|1445715056|
|     2| 106782|   5.0|1445714966|Leonardo DiCaprio|1445715051|
|     2| 106782|   5.0|1445714966|            drugs|1445715054|
|     7|  48516|   1.0|1169687318|     way too long|1169687325|
|    18|    431|   4.0|1462138790|            mafia|1462138755|
|    18|    431|   4.0|1462138790|         gangster|1462138749|
|    18|    431|   4.0|1462138790|      

In [43]:
timestamp_diff_df = ratings_tags_df.select(
    f.col('rating.timestamp').alias('rating_timestamp'), 
    f.col('tag.timestamp').alias('tag_timestamp'),
    (f.col('tag.timestamp') - f.col('rating.timestamp')).alias('timestamp_diff')
)

In [44]:
timestamp_diff_df.show()

+----------------+-------------+--------------+
|rating_timestamp|tag_timestamp|timestamp_diff|
+----------------+-------------+--------------+
|      1445714980|   1445714992|            12|
|      1445714980|   1445714996|            16|
|      1445714980|   1445714994|            14|
|      1445715189|   1445715205|            16|
|      1445715189|   1445715200|            11|
|      1445715189|   1445715207|            18|
|      1445714966|   1445715056|            90|
|      1445714966|   1445715051|            85|
|      1445714966|   1445715054|            88|
|      1169687318|   1169687325|             7|
|      1462138790|   1462138755|           -35|
|      1462138790|   1462138749|           -41|
|      1462138790|   1462138765|           -25|
|      1460242083|   1461699303|       1457220|
|      1460242083|   1461699306|       1457223|
|      1455735416|   1455735479|            63|
|      1455735416|   1455735472|            56|
|      1455049870|   1456948283|       1

In [53]:
# / 1e3 для того чтобы преобразовать в секунды из мс
timestamp_diff_df.select((f.avg('timestamp_diff') / 1e3).alias('avg_timestamp_seconds')).show()

+---------------------+
|avg_timestamp_seconds|
+---------------------+
|   26243.727372266974|
+---------------------+



In [52]:
# Посчитать среднюю оценку от каждого пользователя, в качестве ответа выведете 
# среднее от всех усредненных оценок всех пользователей. (7 баллов)

In [64]:
avg_ratings = ratings_df.groupBy('userId').avg('rating')
avg_ratings.show()

+------+------------------+
|userId|       avg(rating)|
+------+------------------+
|   148|3.7395833333333335|
|   463| 3.787878787878788|
|   471|             3.875|
|   496| 3.413793103448276|
|   243| 4.138888888888889|
|   392|               3.2|
|   540|               4.0|
|    31|              3.92|
|   516|3.6923076923076925|
|    85|3.7058823529411766|
|   137| 3.978723404255319|
|   251| 4.869565217391305|
|   451|3.7941176470588234|
|   580| 3.529816513761468|
|    65| 4.029411764705882|
|   458|4.1525423728813555|
|    53|               5.0|
|   255|2.5681818181818183|
|   481| 2.806451612903226|
|   588|              3.25|
+------+------------------+
only showing top 20 rows



In [71]:
avg_ratings.agg(f.avg('avg(rating)')).show()

+------------------+
|  avg(avg(rating))|
+------------------+
|3.6572223377474016|
+------------------+



# Task 3

In [72]:
ratings_tags_df.show()

+------+-------+------+----------+-----------------+----------+
|userId|movieId|rating| timestamp|              tag| timestamp|
+------+-------+------+----------+-----------------+----------+
|     2|  60756|   5.0|1445714980|     will ferrell|1445714992|
|     2|  60756|   5.0|1445714980|  Highly quotable|1445714996|
|     2|  60756|   5.0|1445714980|            funny|1445714994|
|     2|  89774|   5.0|1445715189|        Tom Hardy|1445715205|
|     2|  89774|   5.0|1445715189|              MMA|1445715200|
|     2|  89774|   5.0|1445715189|     Boxing story|1445715207|
|     2| 106782|   5.0|1445714966|  Martin Scorsese|1445715056|
|     2| 106782|   5.0|1445714966|Leonardo DiCaprio|1445715051|
|     2| 106782|   5.0|1445714966|            drugs|1445715054|
|     7|  48516|   1.0|1169687318|     way too long|1169687325|
|    18|    431|   4.0|1462138790|            mafia|1462138755|
|    18|    431|   4.0|1462138790|         gangster|1462138749|
|    18|    431|   4.0|1462138790|      

In [73]:
tag_rating_df = ratings_tags_df.select('tag', 'rating')
tag_rating_pdf = tag_rating_df.toPandas()
tag_rating_pdf.head()

Unnamed: 0,tag,rating
0,will ferrell,5.0
1,Highly quotable,5.0
2,funny,5.0
3,Tom Hardy,5.0
4,MMA,5.0


In [74]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor

In [83]:
# fit vectorizer on tags
tfidf_vectorizer = TfidfVectorizer()
tfidf_vectorizer.fit(tag_rating_pdf['tag'])

In [84]:
# extract features
features = tfidf_vectorizer.transform(tag_rating_pdf['tag'])

In [85]:
# fit regressor
sgd_regressor = SGDRegressor()
sgd_regressor.fit(features, tag_rating_pdf['rating'])

In [133]:
# predict rating
@f.udf(DoubleType())
def predict_ratings(tags):
    features = tfidf_vectorizer.transform([tags])
    pred_ratings = sgd_regressor.predict(features)
    return pred_ratings.item()

In [134]:
pred_ratings = predict_ratings(f.col('tag'))

In [135]:
tag_rating_pred_df = tag_rating_df.withColumn('pred_rating', pred_ratings)

In [142]:
tag_rating_pred_df.show(50)

[Stage 132:>                                                        (0 + 1) / 1]

+--------------------+------+------------------+
|                 tag|rating|       pred_rating|
+--------------------+------+------------------+
|        will ferrell|   5.0| 4.031698632075657|
|     Highly quotable|   5.0| 3.947672391325878|
|               funny|   5.0| 4.366629736064444|
|           Tom Hardy|   5.0|3.8392031080428284|
|                 MMA|   5.0| 3.420916147368894|
|        Boxing story|   5.0| 3.925578391957534|
|     Martin Scorsese|   5.0| 3.850810963585713|
|   Leonardo DiCaprio|   5.0| 4.247305779367469|
|               drugs|   5.0|4.3038837673231205|
|        way too long|   1.0|3.4290982787726616|
|               mafia|   4.0| 4.159357567358086|
|            gangster|   4.0| 3.623891768880919|
|           Al Pacino|   4.0|  3.86081795770398|
|               Mafia|   5.0| 4.159357567358086|
|           Al Pacino|   5.0|  3.86081795770398|
|          true story|   4.5|3.8517460702505626|
|           holocaust|   4.5|3.9723142370643676|
|        twist endin

                                                                                

In [143]:
# rmse
# formula: sqrt( sum( (y_pred - y_true)^2 ) / n )
# sum / n = avg

In [141]:
tag_rating_pred_df.select(f.sqrt(f.avg(f.pow(f.col('rating') - f.col('pred_rating'), 2))).alias('rmse')).show()

[Stage 128:>                                                        (0 + 1) / 1]

+------------------+
|              rmse|
+------------------+
|0.8734283467400913|
+------------------+



                                                                                

### Все скриншоты находятся в соседней папке /images/block3