In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [2]:
df = spark.read.csv('combined_data_2.txt')
df = df.withColumnRenamed("_c0", "user_id") \
        .withColumnRenamed("_c1", "rating") \
        .withColumnRenamed("_c2", "date_created")
df.limit(10)

user_id,rating,date_created
2532865,4,2005-07-26
573364,3,2005-06-20
1696725,3,2004-02-27
1253431,3,2004-03-31
1265574,2,2003-09-01
1049643,1,2003-11-15
1601348,4,2005-04-05
1495289,5,2005-07-09
1254903,3,2003-09-02
2604070,3,2005-05-15


In [3]:
df.columns

['user_id', 'rating', 'date_created']

In [4]:
df.select(df.user_id)

user_id
2532865
573364
1696725
1253431
1265574
1049643
1601348
1495289
1254903
2604070


average rating by year

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

netflix_with_year = df.withColumn("year", year(df["date_created"]))
result = netflix_with_year.groupBy("year").agg(avg("rating").alias("average_rating"))
result.show()

+----+------------------+
|year|    average_rating|
+----+------------------+
|2003|3.4083814501413294|
|NULL|              NULL|
|2004| 3.578503101120277|
|2001| 3.382906912772836|
|2005| 3.660584134874719|
|2000|3.3424697306162163|
|1999| 3.365296803652968|
|2002| 3.374683384360372|
+----+------------------+



top users by record count

In [7]:
user_rating_counts = df.groupBy("user_id").agg(count("rating").alias("rating_count"))
top_users_by_rating_count = user_rating_counts.orderBy(desc("rating_count")).limit(5)
top_users_by_rating_count.show()

+-------+------------+
|user_id|rating_count|
+-------+------------+
| 305344|        4682|
| 387418|        4636|
|2439493|        4360|
|1664010|        4203|
|2118461|        3899|
+-------+------------+



average rating by year and month

In [8]:
year_month_avg_rating = df.withColumn("year", year("date_created")).withColumn("month", month("date_created"))
avg_rating_by_year_month = year_month_avg_rating.groupBy("year", "month").agg(avg("rating").alias("average_rating"))
avg_rating_by_year_month.show()

+----+-----+------------------+
|year|month|    average_rating|
+----+-----+------------------+
|2005|    5| 3.656830882713842|
|2001|    1| 3.380106976214863|
|2002|    3| 3.415450875369734|
|2004|    6|3.5522613557476546|
|2005|   10|3.6906041917813397|
|2000|    4| 3.344291986827662|
|2004|    8| 3.604412025211573|
|2003|    2|3.4003521101354326|
|1999|   12|3.3646153846153846|
|2002|   12| 3.437042103788434|
|2001|    9|3.3490489982383327|
|2003|   10| 3.443140114145375|
|2004|   10| 3.660078595519999|
|2000|    7| 3.352878716375649|
|2001|    6| 3.384667637069384|
|2002|    8|3.3906751089548557|
|2005|    6|3.6668165870170397|
|2003|    3|3.3537817744735787|
|NULL| NULL|              NULL|
|2002|   11| 3.426248396348987|
+----+-----+------------------+
only showing top 20 rows



top users by average rating with at least 1000 records

In [6]:
user_avg_ratings = df.groupBy("user_id").agg(avg("rating").alias("avg_rating"), count("date_created").alias("rating_count"))

top_users_with_1000_or_more_ratings = user_avg_ratings.filter(user_avg_ratings["rating_count"] >= 1000)

top_5_users_by_rating = top_users_with_1000_or_more_ratings.orderBy(desc("avg_rating")).limit(5)

top_5_users_by_rating.show()

+-------+------------------+------------+
|user_id|        avg_rating|rating_count|
+-------+------------------+------------+
| 794999|  4.99627143922446|        1341|
| 447759| 4.942363112391931|        1041|
| 722591|4.4698235840297125|        1077|
| 716173| 4.344579646017699|        1808|
|1664010| 4.257673090649536|        4203|
+-------+------------------+------------+



In [4]:
from pyspark.sql.functions import dayofweek, collect_list, size

df.withColumn("year", year("date_created")).select("user_id", "year").distinct().groupBy("year").agg(count("user_id").alias("count")).order_by("count")

year,count
2003.0,110125
,4710
2004.0,241356
2001.0,18473
2005.0,428593
2000.0,7697
1999.0,71
2002.0,47317
