In [1]:
from pyspark.sql import SparkSession, functions as f

In [2]:
spark = (
    SparkSession
    .builder
    .appName("Hands-on-5")
    .master("local[*]")
    .getOrCreate()
)

In [3]:
df_ratings = (
    spark
    .read
    .csv(
        path="../../data-sets/ml-latest/ratings.csv",
        header=True,
        encoding="UTF-8",
        sep=",",
        quote='"',
        schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
    )
    .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp")))
)

df_tags = (
    spark
    .read
    .csv(
        path="../../data-sets/ml-latest/tags.csv",
        header=True,
        encoding="UTF-8",
        sep=",",
        quote='"',
        schema="userId INT, movieId INT, tag STRING, timestamp INT",
    )
    .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp")))
)

In [4]:
df_ratings.show(n=5, truncate=False)
df_ratings.printSchema()
df_tags.show(n=5, truncate=False)
df_tags.printSchema()

+------+-------+------+-------------------+
|userId|movieId|rating|timestamp          |
+------+-------+------+-------------------+
|1     |307    |3.5   |2009-10-28 02:30:21|
|1     |481    |3.5   |2009-10-28 02:34:16|
|1     |1091   |1.5   |2009-10-28 02:34:31|
|1     |1257   |4.5   |2009-10-28 02:34:20|
|1     |1449   |4.5   |2009-10-28 02:31:04|
+------+-------+------+-------------------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+------+-------+------------+-------------------+
|userId|movieId|tag         |timestamp          |
+------+-------+------------+-------------------+
|14    |110    |epic        |2015-09-25 08:05:38|
|14    |110    |Medieval    |2015-09-25 08:05:32|
|14    |260    |sci-fi      |2015-09-14 00:06:50|
|14    |260    |space action|2015-09-14 00:07:01|
|14    |318    |imdb top 250|2015-09-19 03:56:35|
+------+----

### Let's use .agg on groupBy for interesting aggregations

In [5]:
(
    df_ratings
    .groupBy("movieId")
    .agg(
        f.count("*"),
        f.min("rating"),
        f.max("rating"),
        f.avg("rating"),
        f.min("timestamp"),
        f.max("timestamp"),
    )
).show()

+-------+--------+-----------+-----------+------------------+-------------------+-------------------+
|movieId|count(1)|min(rating)|max(rating)|       avg(rating)|     min(timestamp)|     max(timestamp)|
+-------+--------+-----------+-----------+------------------+-------------------+-------------------+
|   1591|    6508|        0.5|        5.0|2.6466656422864165|1997-07-18 13:10:59|2018-09-26 06:38:31|
|   1645|   15215|        0.5|        5.0|3.5352941176470587|1997-10-27 19:32:53|2018-09-26 04:57:00|
|    471|   12308|        0.5|        5.0| 3.652908677283068|1996-03-01 05:30:00|2018-09-23 04:34:55|
|   1088|   14100|        0.5|        5.0|3.2480141843971633|1996-09-25 01:19:39|2018-09-26 05:14:08|
|   1580|   44287|        0.5|        5.0| 3.578533203874726|1997-07-04 16:24:16|2018-09-26 10:33:50|
|   3997|    2484|        0.5|        5.0| 2.072866344605475|2000-11-27 05:01:44|2018-09-22 06:50:23|
|   3175|   16713|        0.5|        5.0|3.5861305570513973|1999-12-26 07:12:01|2

In [6]:
(
    df_tags
    .groupBy("movieId")
    .agg(
        f.collect_set("tag")
    )
).show()

+-------+--------------------+
|movieId|    collect_set(tag)|
+-------+--------------------+
|    148|[nudity (topless)...|
|    471|[suicide, rags to...|
|    496|[Sundance award w...|
|    833|[Jon Lovitz, ZAZ,...|
|   1088|[music, ff, docto...|
|   1238|[CLV, Scotland, D...|
|   1342|[Grimy, weak paci...|
|   1580|[comic book, acti...|
|   1591|[comic book, CLV,...|
|   1645|[lawyer as protag...|
|   1829|[independent film...|
|   1959|[CLV, Malick Bowe...|
|   2122|[Eric's Dvds, sup...|
|   2142|[mice, comedy, we...|
|   2366|[Bruce Cabot, DVD...|
|   2659|[movie business, ...|
|   2866|[gary busey, Stev...|
|   3175|[star trek, DVD-V...|
|   3749|[John Malkovich, ...|
|   3794|[gay, independent...|
+-------+--------------------+
only showing top 20 rows



### f.collect_set is the sort of the inverse of f.explode we used in earlier Hands-on. f.collect_set is used for collecting distinct values in a array, there is also f.collect_list for collecting duplicates one too. Let's add some more interesting information

In [7]:
(
    df_tags
    .groupBy("movieId")
    .agg(
        f.collect_set("tag"),
        f.count("tag"),
        f.collect_set("userId"),
        f.count("userId"),
        f.min("timestamp"),
        f.max("timestamp"),
    )
).show()

+-------+--------------------+----------+--------------------+-------------+-------------------+-------------------+
|movieId|    collect_set(tag)|count(tag)| collect_set(userId)|count(userId)|     min(timestamp)|     max(timestamp)|
+-------+--------------------+----------+--------------------+-------------+-------------------+-------------------+
|    148|[nudity (topless)...|         8|[146340, 40716, 1...|            8|2006-09-20 12:16:17|2018-06-07 11:14:07|
|    471|[suicide, rags to...|       179|[198033, 258249, ...|          179|2006-01-13 20:47:10|2018-06-12 05:25:47|
|    496|[Sundance award w...|        16|[54594, 124534, 1...|           16|2006-12-08 08:14:49|2018-06-07 11:31:44|
|    833|[Jon Lovitz, ZAZ,...|        12|[197461, 195628, ...|           12|2007-12-31 03:52:15|2018-06-09 07:29:24|
|   1088|[music, ff, docto...|       259|[27594, 199122, 1...|          259|2006-01-13 22:06:05|2018-09-09 23:53:39|
|   1238|[CLV, Scotland, D...|        44|[268681, 126741, ...|  

### You can notice that the columns generated by default have ugly names. We can fix that with the .alias method

In [8]:
(
    df_tags
    .groupBy("movieId")
    .agg(
        f.collect_set("tag").alias("tags"),
        f.count("tag").alias("# of tags"),
        f.collect_set("userId").alias("users"),
        f.count("userId").alias("# of users"),
        f.min("timestamp").alias("first tagged on"),
        f.max("timestamp").alias("last tagged on"),
    )
).show()

+-------+--------------------+---------+--------------------+----------+-------------------+-------------------+
|movieId|                tags|# of tags|               users|# of users|    first tagged on|     last tagged on|
+-------+--------------------+---------+--------------------+----------+-------------------+-------------------+
|    148|[nudity (topless)...|        8|[146340, 40716, 1...|         8|2006-09-20 12:16:17|2018-06-07 11:14:07|
|    471|[suicide, rags to...|      179|[198033, 258249, ...|       179|2006-01-13 20:47:10|2018-06-12 05:25:47|
|    496|[Sundance award w...|       16|[54594, 124534, 1...|        16|2006-12-08 08:14:49|2018-06-07 11:31:44|
|    833|[Jon Lovitz, ZAZ,...|       12|[197461, 195628, ...|        12|2007-12-31 03:52:15|2018-06-09 07:29:24|
|   1088|[music, ff, docto...|      259|[27594, 199122, 1...|       259|2006-01-13 22:06:05|2018-09-09 23:53:39|
|   1238|[CLV, Scotland, D...|       44|[268681, 126741, ...|        44|2006-09-14 06:07:06|2018

### Let's sort this by ascending order of # of tags

In [9]:
(
    df_tags
    .groupBy("movieId")
    .agg(
        f.collect_set("tag").alias("tags"),
        f.count("tag").alias("# of tags"),
        f.collect_set("userId").alias("users"),
        f.count("userId").alias("# of users"),
        f.min("timestamp").alias("first tagged on"),
        f.max("timestamp").alias("last tagged on"),
    )
).sort("# of tags").show()

+-------+--------------------+---------+--------+----------+-------------------+-------------------+
|movieId|                tags|# of tags|   users|# of users|    first tagged on|     last tagged on|
+-------+--------------------+---------+--------+----------+-------------------+-------------------+
|  76121|  [independent film]|        1| [73406]|         1|2018-05-25 08:23:05|2018-05-25 08:23:05|
|    876|      [martial arts]|        1| [73406]|         1|2018-05-28 01:18:25|2018-05-28 01:18:25|
|   2835|      [Hugh Johnson]|        1|[195628]|         1|2012-01-27 12:40:14|2012-01-27 12:40:14|
|  26831|[not available fr...|        1|[103013]|         1|2009-07-27 23:18:52|2009-07-27 23:18:52|
| 105308| [lackluster sequel]|        1| [10797]|         1|2013-11-09 12:29:14|2013-11-09 12:29:14|
| 108435|     [serial killer]|        1| [73406]|         1|2018-05-19 05:44:24|2018-05-19 05:44:24|
| 108476|[Alekos Sakellarios]|        1|[195628]|         1|2014-01-29 16:04:04|2014-01-29 

### Let's sort by descending order of # of tags

In [10]:
# 1st way
(
    df_tags
    .groupBy("movieId")
    .agg(
        f.collect_set("tag").alias("tags"),
        f.count("tag").alias("# of tags"),
        f.collect_set("userId").alias("users"),
        f.count("userId").alias("# of users"),
        f.min("timestamp").alias("first tagged on"),
        f.max("timestamp").alias("last tagged on"),
    )
).sort(f.col("# of tags").desc()).show()

+-------+--------------------+---------+--------------------+----------+-------------------+-------------------+
|movieId|                tags|# of tags|               users|# of users|    first tagged on|     last tagged on|
+-------+--------------------+---------+--------------------+----------+-------------------+-------------------+
|    260|[Classic, Space e...|     9478|[271990, 118560, ...|      9478|2006-01-13 06:50:26|2018-09-19 18:41:56|
|    296|[ontology, Biblio...|     4963|[17546, 13488, 55...|      4963|2006-01-13 02:33:58|2018-09-23 22:19:46|
|  79132|[Intense, mystery...|     4670|[89073, 201766, 1...|      4670|2010-07-18 18:52:51|2018-09-23 22:25:45|
|   2571|[DVD-Video, Oscar...|     3915|[17546, 55578, 17...|      3915|2006-01-12 03:55:11|2018-09-23 22:24:54|
|   2959|[sabotage, dvd, H...|     3864|[55578, 179602, 1...|      3864|2006-01-14 07:20:45|2018-09-12 07:51:14|
|    318|[friendship, comp...|     3834|[27594, 201766, 1...|      3834|2006-01-13 03:03:27|2018

In [11]:
# 2nd way
(
    df_tags
    .groupBy("movieId")
    .agg(
        f.collect_set("tag").alias("tags"),
        f.count("tag").alias("# of tags"),
        f.collect_set("userId").alias("users"),
        f.count("userId").alias("# of users"),
        f.min("timestamp").alias("first tagged on"),
        f.max("timestamp").alias("last tagged on"),
    )
).sort("# of tags", ascending=False).show()

+-------+--------------------+---------+--------------------+----------+-------------------+-------------------+
|movieId|                tags|# of tags|               users|# of users|    first tagged on|     last tagged on|
+-------+--------------------+---------+--------------------+----------+-------------------+-------------------+
|    260|[Classic, Space e...|     9478|[271990, 118560, ...|      9478|2006-01-13 06:50:26|2018-09-19 18:41:56|
|    296|[ontology, Biblio...|     4963|[17546, 13488, 55...|      4963|2006-01-13 02:33:58|2018-09-23 22:19:46|
|  79132|[Intense, mystery...|     4670|[89073, 201766, 1...|      4670|2010-07-18 18:52:51|2018-09-23 22:25:45|
|   2571|[DVD-Video, Oscar...|     3915|[17546, 55578, 17...|      3915|2006-01-12 03:55:11|2018-09-23 22:24:54|
|   2959|[sabotage, dvd, H...|     3864|[55578, 179602, 1...|      3864|2006-01-14 07:20:45|2018-09-12 07:51:14|
|    318|[friendship, comp...|     3834|[27594, 201766, 1...|      3834|2006-01-13 03:03:27|2018

### I prefer the former one, as it's more intuitive

### As you can see we can do some really powerful things with .groupBy and .agg

In [12]:
spark.stop()