In [0]:
import pyspark.sql.functions as F

# Load movies.csv: the first row supplies column names and Spark infers column types.
# `spark` is assumed to be the notebook-provided SparkSession (Databricks) — not created here.
df = (
    spark.read
    .options(header = 'True', inferSchema='True', delimiter = ',')
    .csv('dbfs:/FileStore/movies.csv')
)

In [0]:
# Print the column names and types that inferSchema produced for the movies table.
df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [0]:
# Python UDF demo: add one to movieId.
# F.udf defaults to returnType=StringType, which would silently turn the integer
# movieId column into strings, so the return type is declared as "int" (IntegerType).
# movieId is nullable, so nulls are passed through instead of raising TypeError on the executor.
# (Outside a UDF demo, the built-in F.col("movieId") + 1 avoids Python UDF overhead.)
plusone_udf = F.udf(lambda v: None if v is None else v + 1, "int")
df.withColumn("movieId", plusone_udf(F.col("movieId"))).show(5, False)


+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|2      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|3      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|4      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|5      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|6      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



In [0]:
# Python UDF demo: upper-case the genres string.
# A null genres value would make `v.upper()` raise AttributeError on the executor,
# so nulls are passed through as null. The return type is stated explicitly ("string").
# (Outside a UDF demo, the built-in F.upper("genres") does the same without a Python UDF.)
upper_udf = F.udf(lambda v: None if v is None else v.upper(), "string")
df.withColumn("genres", upper_udf(F.col("genres"))).show(5, False)


+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |ADVENTURE|ANIMATION|CHILDREN|COMEDY|FANTASY|
|2      |Jumanji (1995)                    |ADVENTURE|CHILDREN|FANTASY                 |
|3      |Grumpier Old Men (1995)           |COMEDY|ROMANCE                             |
|4      |Waiting to Exhale (1995)          |COMEDY|DRAMA|ROMANCE                       |
|5      |Father of the Bride Part II (1995)|COMEDY                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



# GroupBy and Aggregation

In [0]:
# Peek at the raw rows before reshaping; the default truncate=True cuts long cells to 20 chars.
df.show(n=10)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows



In [0]:
import pyspark.sql.functions as F

# Reshape to one row per (movie, genre): split the pipe-delimited genres string
# and explode it, repeating movieId and title for every genre of the movie.
# '[|]' is a regex character class; a bare '|' would be regex alternation.
# NOTE: this rebinds `df`; the following cells operate on the exploded form.
genre_list = F.split(F.col('genres'), '[|]', -1)
df = df.select(F.col('movieId'), F.col('title'), F.explode(genre_list).alias('genres'))
df.show(10, False)

+-------+-----------------------+---------+
|movieId|title                  |genres   |
+-------+-----------------------+---------+
|1      |Toy Story (1995)       |Adventure|
|1      |Toy Story (1995)       |Animation|
|1      |Toy Story (1995)       |Children |
|1      |Toy Story (1995)       |Comedy   |
|1      |Toy Story (1995)       |Fantasy  |
|2      |Jumanji (1995)         |Adventure|
|2      |Jumanji (1995)         |Children |
|2      |Jumanji (1995)         |Fantasy  |
|3      |Grumpier Old Men (1995)|Comedy   |
|3      |Grumpier Old Men (1995)|Romance  |
+-------+-----------------------+---------+
only showing top 10 rows



In [0]:
# groupBy + sum mechanics demo: total of movieId per genre (the sum of IDs carries
# no domain meaning). The result column is auto-named 'sum(movieId)'.
# Row order after groupBy is not guaranteed, so the 3 genres shown may vary.
(
    df.groupBy('genres')
    .agg(F.sum('movieId'))
    .show(3, False)
)

+--------+------------+
|genres  |sum(movieId)|
+--------+------------+
|Crime   |46970315    |
|Romance |49276621    |
|Thriller|79085328    |
+--------+------------+
only showing top 3 rows



In [0]:
# Mean movieId per genre, exposed under the short column name 'avg'.
(
    df.groupBy('genres')
    .agg(F.avg('movieId').alias('avg'))
    .show(3, False)
)

+--------+------------------+
|genres  |avg               |
+--------+------------------+
|Crime   |39174.57464553795 |
|Romance |30875.07581453634 |
|Thriller|41755.717001055964|
+--------+------------------+
only showing top 3 rows



In [0]:
# Number of (movie, genre) rows per genre, most frequent genre first.
(
    df.groupBy('genres')
    .count()
    .orderBy(F.col('count').desc())
    .show(5, False)
)

+--------+-----+
|genres  |count|
+--------+-----+
|Drama   |4361 |
|Comedy  |3756 |
|Thriller|1894 |
|Action  |1828 |
|Romance |1596 |
+--------+-----+
only showing top 5 rows



In [0]:
# Distinct titles per genre. This can be lower than the per-genre row count
# (e.g. Romance 1596 rows vs 1595 titles) — presumably because some titles
# appear under more than one movieId; confirm against the data.
(
    df.groupBy('genres')
    .agg(F.countDistinct(F.col('title')).alias('titleCount'))
    .show(3, False)
)

+--------+----------+
|genres  |titleCount|
+--------+----------+
|Crime   |1198      |
|Romance |1595      |
|Thriller|1892      |
+--------+----------+
only showing top 3 rows



In [0]:
# Largest movieId per genre; the dict form of agg yields the column name 'max(movieId)'.
df.groupBy('genres').agg({'movieId': 'max'}).show(3, False)

+--------+------------+
|genres  |max(movieId)|
+--------+------------+
|Crime   |189713      |
|Romance |190207      |
|Thriller|190183      |
+--------+------------+
only showing top 3 rows



# Join

In [0]:
# Load ratings.csv with the same CSV reader options used for movies.csv.
rate = (
    spark.read
    .options(header='True', inferSchema='True' , delimiter = ',')
    .csv('dbfs:/FileStore/ratings.csv')
)

In [0]:
# Inspect the inferred ratings schema. timestamp comes back as a plain integer
# (presumably Unix epoch seconds — confirm), not a Spark timestamp type.
rate.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [0]:
# Inner equi-join on movieId. Because df has one row per (movie, genre), each rating
# is repeated once per genre of its movie. Passing the key as a list keeps a single
# movieId column in the result.
rate.join(df, on=['movieId'], how='inner').show(10, False)

+-------+------+------+---------+-----------------------+---------+
|movieId|userId|rating|timestamp|title                  |genres   |
+-------+------+------+---------+-----------------------+---------+
|1      |1     |4.0   |964982703|Toy Story (1995)       |Fantasy  |
|1      |1     |4.0   |964982703|Toy Story (1995)       |Comedy   |
|1      |1     |4.0   |964982703|Toy Story (1995)       |Children |
|1      |1     |4.0   |964982703|Toy Story (1995)       |Animation|
|1      |1     |4.0   |964982703|Toy Story (1995)       |Adventure|
|3      |1     |4.0   |964981247|Grumpier Old Men (1995)|Romance  |
|3      |1     |4.0   |964981247|Grumpier Old Men (1995)|Comedy   |
|6      |1     |4.0   |964982224|Heat (1995)            |Thriller |
|6      |1     |4.0   |964982224|Heat (1995)            |Crime    |
|6      |1     |4.0   |964982224|Heat (1995)            |Action   |
+-------+------+------+---------+-----------------------+---------+
only showing top 10 rows



In [0]:
# Left outer join: every rating is kept, and ratings with no matching movieId in df
# would get null title/genres. The first rows shown all match, so they look like
# the inner join.
rate.join(df, on=['movieId'], how='left_outer').show(10, False)

+-------+------+------+---------+-----------------------+---------+
|movieId|userId|rating|timestamp|title                  |genres   |
+-------+------+------+---------+-----------------------+---------+
|1      |1     |4.0   |964982703|Toy Story (1995)       |Fantasy  |
|1      |1     |4.0   |964982703|Toy Story (1995)       |Comedy   |
|1      |1     |4.0   |964982703|Toy Story (1995)       |Children |
|1      |1     |4.0   |964982703|Toy Story (1995)       |Animation|
|1      |1     |4.0   |964982703|Toy Story (1995)       |Adventure|
|3      |1     |4.0   |964981247|Grumpier Old Men (1995)|Romance  |
|3      |1     |4.0   |964981247|Grumpier Old Men (1995)|Comedy   |
|6      |1     |4.0   |964982224|Heat (1995)            |Thriller |
|6      |1     |4.0   |964982224|Heat (1995)            |Crime    |
|6      |1     |4.0   |964982224|Heat (1995)            |Action   |
+-------+------+------+---------+-----------------------+---------+
only showing top 10 rows



In [0]:
# Cartesian product of every rating with every (movie, genre) row — potentially huge,
# so only 10 rows are displayed. Both inputs have a movieId column, so the result
# carries two columns named movieId and referencing 'movieId' on it would be ambiguous.
rate.crossJoin(df).show(n=10, truncate=False)

+------+-------+------+---------+-------+-----------------------+---------+
|userId|movieId|rating|timestamp|movieId|title                  |genres   |
+------+-------+------+---------+-------+-----------------------+---------+
|1     |1      |4.0   |964982703|1      |Toy Story (1995)       |Adventure|
|1     |1      |4.0   |964982703|1      |Toy Story (1995)       |Animation|
|1     |1      |4.0   |964982703|1      |Toy Story (1995)       |Children |
|1     |1      |4.0   |964982703|1      |Toy Story (1995)       |Comedy   |
|1     |1      |4.0   |964982703|1      |Toy Story (1995)       |Fantasy  |
|1     |1      |4.0   |964982703|2      |Jumanji (1995)         |Adventure|
|1     |1      |4.0   |964982703|2      |Jumanji (1995)         |Children |
|1     |1      |4.0   |964982703|2      |Jumanji (1995)         |Fantasy  |
|1     |1      |4.0   |964982703|3      |Grumpier Old Men (1995)|Comedy   |
|1     |1      |4.0   |964982703|3      |Grumpier Old Men (1995)|Romance  |
+------+----

# Window

In [0]:
from pyspark.sql.window import Window

# For each genre, keep only the row with the highest movieId:
# partition rows by genre, order each partition by movieId descending,
# number the rows within each partition with row_number(), then keep row 1.
w = Window.partitionBy('genres').orderBy(F.col('movieId').desc())
(
    df.withColumn('row', F.row_number().over(w))
    .where(F.col('row') == 1)
    .drop('row')
    .show(10, False)
)

+-------+----------------------------------------+------------------+
|movieId|title                                   |genres            |
+-------+----------------------------------------+------------------+
|182727 |A Christmas Story Live! (2017)          |(no genres listed)|
|193587 |Bungo Stray Dogs: Dead Apple (2018)     |Action            |
|191005 |Gintama (2017)                          |Adventure         |
|193587 |Bungo Stray Dogs: Dead Apple (2018)     |Animation         |
|187595 |Solo: A Star Wars Story (2018)          |Children          |
|193609 |Andrew Dice Clay: Dice Rules (1991)     |Comedy            |
|189713 |BlacKkKlansman (2018)                   |Crime             |
|193579 |Jon Stewart Has Left the Building (2015)|Documentary       |
|193585 |Flint (2017)                            |Drama             |
|193583 |No Game No Life: Zero (2017)            |Fantasy           |
+-------+----------------------------------------+------------------+
only showing top 10 