In [27]:
import pyspark
from pyspark import SparkContext, SparkConf

In [3]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [4]:
conf = SparkConf().set("spark.executor.instances", "2").set("spark.executor.cores", "1").set("spark.executor.memory", "1g")

In [5]:
sc = SparkContext(appName="shein_spark", master="yarn")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/20 22:52:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [28]:
!hdfs dfs -rm -r ml-latest-small
!hdfs dfs -put ml-latest-small .

Deleted ml-latest-small


In [29]:
rdd_ratings = sc.textFile("ml-latest-small/ratings.csv")

In [30]:
rdd_ratings.count()

100837

In [31]:
rdd_ratings.getNumPartitions()

2

### Вывод
Был выполнен один stage и две таски

In [32]:
rdd_tags = sc.textFile("ml-latest-small/tags.csv")

In [33]:
rdd_tags.count()

3684

In [34]:
rdd_tags.getNumPartitions()

2

### Вывод
Был выполнен один stage и две таски

# Вторая часть

In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f

In [37]:
spark = SparkSession.builder.master("yarn").appName("pyspark_sql_api").getOrCreate()

In [39]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

In [40]:
ratings = spark.read.format("csv").option("header", "True").schema(ratings_schema).load("ml-latest-small/ratings.csv")

In [59]:
# количество уникальных фильмов
ratings.groupBy(f.col("movieId")).count().count()

9724

In [58]:
# количество уникальных пользователей 
ratings.groupBy(f.col("userId")).count().count()

610

In [54]:
# оценок не меньших 4.0
ratings.filter(f.col("rating") >= 4.0).count()

48580

In [74]:
# топ-100 фильмов по рейтингу
ratings.groupBy(f.col("movieId")).avg("rating").orderBy("avg(rating)", ascending=False).show(100)

+-------+-----------+
|movieId|avg(rating)|
+-------+-----------+
|  80124|        5.0|
|  74226|        5.0|
| 175387|        5.0|
|    148|        5.0|
| 158027|        5.0|
|  67618|        5.0|
|  42556|        5.0|
| 142444|        5.0|
|  44943|        5.0|
|  27523|        5.0|
| 113829|        5.0|
|   3073|        5.0|
| 152711|        5.0|
| 132333|        5.0|
|     53|        5.0|
|  27704|        5.0|
| 147300|        5.0|
| 187717|        5.0|
|    633|        5.0|
|   8238|        5.0|
|    876|        5.0|
| 120138|        5.0|
|  53578|        5.0|
| 163072|        5.0|
| 160644|        5.0|
| 172909|        5.0|
|   5490|        5.0|
| 172583|        5.0|
|   2972|        5.0|
|   2196|        5.0|
|  25887|        5.0|
| 118834|        5.0|
|   3795|        5.0|
| 156025|        5.0|
|   3941|        5.0|
| 120130|        5.0|
|    626|        5.0|
|   1349|        5.0|
|  26928|        5.0|
| 122092|        5.0|
|  40491|        5.0|
|  71268|        5.0|
| 167064| 

In [79]:
tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

In [80]:
tags = spark.read.format("csv").option("header", "True").schema(tags_schema).load("ml-latest-small/tags.csv")

In [102]:
# средняя дельта по времени
ratings_with_tags = ratings.alias("r").join(tags.alias("t"), on=["userId", "movieId"], how="inner")
deltas = ratings_with_tags.select((f.col("t.timestamp") - f.col("r.timestamp")).alias("delta"))
deltas.select(f.avg(f.col("delta"))).show()

+--------------------+
|          avg(delta)|
+--------------------+
|2.6243727372266974E7|
+--------------------+



In [99]:
# Средняя от средних оценок пользователей
avgs = ratings.groupBy(f.col("userId")).avg("rating")
avgs.select(f.avg(f.col("avg(rating)"))).show()

+------------------+
|  avg(avg(rating))|
+------------------+
|3.6572223377474016|
+------------------+

