In [None]:
# This script demonstrates PySpark functions to join,
# group, and aggregate data.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Obtain the Spark session for this notebook, registered under the
# application name "Aggregation".
spark = (
    SparkSession.builder
    .appName("Aggregation")
    .getOrCreate()
)

In [5]:
# Path to the ratings CSV (absolute local path; adjust for your environment).
ratings_location = "/home/datamaking/Documents/Hadoop/ml-latest/ratings.csv"

# Load the ratings with an explicit schema. The raw `timestamp` column is read
# as an int and is interpreted below as Unix epoch seconds.
ratings = spark.read.csv(
    path=ratings_location,
    sep=",",
    header=True,
    quote='"',
    schema="userId int, movieId int, rating double, timestamp int",
).withColumn(
    # Cast the epoch seconds directly to a timestamp. The earlier
    # from_unixtime -> to_timestamp chain rendered the value as a zone-less
    # local-time string and parsed it back, which is ambiguous during the
    # repeated hour at a DST fall-back and costs an extra string round trip.
    "timestamp",
    f.col("timestamp").cast("timestamp"),
)

In [6]:
# Preview the first five rating rows to sanity-check the load.
ratings.show(n=5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|    307|   3.5|2009-10-28 00:00:21|
|     1|    481|   3.5|2009-10-28 00:04:16|
|     1|   1091|   1.5|2009-10-28 00:04:31|
|     1|   1257|   4.5|2009-10-28 00:04:20|
|     1|   1449|   4.5|2009-10-28 00:01:04|
+------+-------+------+-------------------+
only showing top 5 rows



In [9]:
# Path to the tags CSV (absolute local path; adjust for your environment).
tags_location = "/home/datamaking/Documents/Hadoop/ml-latest/tags.csv"

# Load the tags with an explicit schema. The raw `timestamp` column is read
# as an int and is interpreted below as Unix epoch seconds.
tags = spark.read.csv(
    path=tags_location,
    sep=",",
    header=True,
    quote='"',
    schema="userId int, movieId int, tag string, timestamp int",
).withColumn(
    # Cast the epoch seconds directly to a timestamp. The earlier
    # from_unixtime -> to_timestamp chain rendered the value as a zone-less
    # local-time string and parsed it back, which is ambiguous during the
    # repeated hour at a DST fall-back and costs an extra string round trip.
    "timestamp",
    f.col("timestamp").cast("timestamp"),
)

# Confirm the resulting column types.
tags.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [10]:
# Preview the first five tag rows to sanity-check the load.
tags.show(n=5)

+------+-------+------------+-------------------+
|userId|movieId|         tag|          timestamp|
+------+-------+------------+-------------------+
|    14|    110|        epic|2015-09-25 05:35:38|
|    14|    110|    Medieval|2015-09-25 05:35:32|
|    14|    260|      sci-fi|2015-09-13 21:36:50|
|    14|    260|space action|2015-09-13 21:37:01|
|    14|    318|imdb top 250|2015-09-19 01:26:35|
+------+-------+------------+-------------------+
only showing top 5 rows



In [11]:
# Per-movie summary: number of ratings, lowest/highest/mean rating, and the
# earliest and latest rating timestamps.
(
    ratings
    .groupBy("movieId")
    .agg(
        f.count("*"),
        f.min("rating"),
        f.max("rating"),
        f.avg("rating"),
        f.min("timestamp"),
        f.max("timestamp"),
    )
    .show()
)

+-------+--------+-----------+-----------+------------------+-------------------+-------------------+
|movieId|count(1)|min(rating)|max(rating)|       avg(rating)|     min(timestamp)|     max(timestamp)|
+-------+--------+-----------+-----------+------------------+-------------------+-------------------+
|   1591|    6508|        0.5|        5.0|2.6466656422864165|1997-07-18 10:40:59|2018-09-26 04:08:31|
|   1645|   15215|        0.5|        5.0|3.5352941176470587|1997-10-27 17:02:53|2018-09-26 02:27:00|
|    471|   12308|        0.5|        5.0| 3.652908677283068|1996-03-01 03:00:00|2018-09-23 02:04:55|
|   1088|   14100|        0.5|        5.0|3.2480141843971633|1996-09-24 22:49:39|2018-09-26 02:44:08|
|   1580|   44287|        0.5|        5.0| 3.578533203874726|1997-07-04 13:54:16|2018-09-26 08:03:50|
|   3997|    2484|        0.5|        5.0| 2.072866344605475|2000-11-27 02:31:44|2018-09-22 04:20:23|
|   3175|   16713|        0.5|        5.0|3.5861305570513973|1999-12-26 04:42:01|2

In [16]:
# Per-user tagging profile: distinct tags, tag count, the (single) user id
# collected as a set, row count, and first/last tagging times. Rows are shown
# with the highest tag_count first.
(
    tags
    .groupBy("userId")
    .agg(
        f.collect_set("tag").alias("tags"),
        f.count("tag").alias("tag_count"),
        f.collect_set("userId").alias("users"),
        f.count("userId").alias("user_count"),
        f.min("timestamp").alias("first_tagged_date"),
        f.max("timestamp").alias("last_tagged_date"),
    )
    .orderBy(f.desc("tag_count"))
    .show()
)

+------+--------------------+---------+--------+----------+-------------------+-------------------+
|userId|                tags|tag_count|   users|user_count|  first_tagged_date|   last_tagged_date|
+------+--------------------+---------+--------+----------+-------------------+-------------------+
| 73406|[loch ness monste...|   188999| [73406]|    188999|2014-11-18 11:16:42|2018-08-22 04:30:59|
|103013|[china, David Mit...|    21452|[103013]|     21452|2006-05-19 13:46:38|2016-09-06 13:40:07|
|195628|[Ellen Goosenberg...|    14428|[195628]|     14428|2007-12-21 21:58:50|2018-09-18 03:03:05|
| 54594|[camera as portal...|    11632| [54594]|     11632|2006-01-13 09:13:38|2018-07-05 08:41:23|
|268681|[Technirama, Sovs...|    10701|[268681]|     10701|2006-03-19 05:49:07|2017-12-09 00:04:54|
| 60651|[girl, Heart of D...|    10664| [60651]|     10664|2007-01-02 02:45:53|2015-10-17 16:17:53|
| 47960|[girl, internet c...|    10184| [47960]|     10184|2011-02-19 01:43:04|2018-09-16 00:38:42|


In [20]:
# Per-user rating profile: distinct movies rated, number of ratings, mean
# rating, and lowest/highest rating. Rows are shown with the highest count first.
(
    ratings
    .groupBy("userId")
    .agg(
        f.collect_set("movieId").alias("movieId"),
        f.count("*").alias("count"),
        f.avg("rating").alias("Average_User_Rating"),
        f.min("rating"),
        f.max("rating"),
    )
    .orderBy(f.col("count").desc())
    .show()
)

+------+--------------------+-----+-------------------+-----------+-----------+
|userId|             movieId|count|Average_User_Rating|min(rating)|max(rating)|
+------+--------------------+-----+-------------------+-----------+-----------+
|123100|[171057, 3702, 25...|23715| 3.1306346194391734|        0.5|        5.0|
|117490|[4189, 25743, 629...| 9279| 3.2784243991809463|        0.5|        5.0|
|134596|[3702, 146544, 58...| 8381|  3.198305691444935|        0.5|        5.0|
|212343|[6296, 256, 12203...| 7884| 2.5880263825469303|        0.5|        5.0|
|242683|[4189, 6296, 4445...| 7515| 3.2083166999334662|        1.0|        5.0|
|111908|[6296, 5809, 3476...| 6645| 1.5249811888638074|        0.5|        5.0|
| 77609|[6296, 5809, 2594...| 6398| 2.8122069396686467|        0.5|        5.0|
| 63783|[6296, 45611, 256...| 6346| 3.4854238890639775|        0.5|        5.0|
|172357|[6296, 5809, 3958...| 5868|  2.442058623040218|        0.5|        5.0|
|141955|[118560, 6296, 58...| 5810| 2.87