In [1]:
from pyspark.sql.functions import to_date, months_between, current_date, datediff, col
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, BooleanType

spark = SparkSession.builder.getOrCreate()

# Ignore all logs except errors when running this script on pyspark (i.e. spark-submit)
spark.sparkContext.setLogLevel('ERROR')

# Task 1
# Read json dataset into a DataFrame and apply the schema defined above.
df = spark.read.csv('hw_2_user_data.csv', header=True) 

# Rename columns. 
df = df.withColumnRenamed('_c0', 'Index')\
       .withColumn('UserID', df['UserID'].cast(IntegerType()))\
       .withColumn('SessionLength', df['SessionLength'].cast(IntegerType()))\
       .withColumn('WatchedMovie', df['WatchedMovie'].cast(BooleanType()))\
       .withColumn('LastLoginDate', to_date(df['LastLoginDate']))



# Display the updated schema 
df.printSchema()
# Display data
df.show()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/07 19:18:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/07 19:18:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


root
 |-- Index: string (nullable = true)
 |-- UserID: integer (nullable = true)
 |-- UserName: string (nullable = true)
 |-- WatchedMovie: boolean (nullable = true)
 |-- MovieGenre: string (nullable = true)
 |-- SessionLength: integer (nullable = true)
 |-- LastLoginDate: date (nullable = true)

+-----+------+------------------+------------+--------------------+-------------+-------------+
|Index|UserID|          UserName|WatchedMovie|          MovieGenre|SessionLength|LastLoginDate|
+-----+------+------------------+------------+--------------------+-------------+-------------+
|    0| 34557|    Leslie Shelton|       false|                NULL|          175|   2012-04-30|
|    1| 48013|    Hannah Sanders|       false|                NULL|         1409|   2020-05-30|
|    2| 13230|Christopher Torres|        true|     Adventure Drama|          181|   2009-11-25|
|    3| 18988|Christopher Stokes|        true|Animation Adventu...|          179|   2022-02-15|
|    4| 29844|          Joel C

In [2]:
# Task 2
# Calculate the total number of watched movies for each genre.
df.filter(df.WatchedMovie == True).groupBy('MovieGenre').count()\
       .orderBy('MovieGenre').show()

+--------------------+-----+
|          MovieGenre|count|
+--------------------+-----+
|    Action Adventure| 1557|
|Action Adventure ...| 1624|
|Action Adventure ...| 1577|
|Action Adventure ...| 1643|
|Action Adventure ...| 1583|
|Action Adventure ...| 1598|
|Action Adventure ...| 1603|
|Action Adventure ...| 1582|
|Action Adventure ...| 1553|
|Action Adventure ...| 1598|
|Action Adventure ...| 1628|
|Action Adventure ...| 1583|
|Action Biography ...| 1642|
|        Action Crime| 1565|
|Action Crime Dram...| 1580|
|Action Crime Fant...| 1506|
|Action Crime Thri...| 1563|
|Action Drama Thri...| 1553|
|    Action Drama War| 1600|
|Action Mystery Sc...| 1609|
+--------------------+-----+
only showing top 20 rows



In [3]:
# Task 3
# Identify the number of unique users who logged in during the last three months.
df.filter(months_between(current_date(), df.LastLoginDate) <= 3)\
       .select('UserName').distinct().count()

1878

In [4]:
# Task 4
# Determine the average session length for users who have watched more than two movies.

# find users who have watched 
df.filter(df.WatchedMovie == True)\
    .groupBy('UserID').agg({'WatchedMovie': 'count', 'SessionLength': 'mean'})\
    .where(col('count(WatchedMovie)') > 2).orderBy('UserID').show()


+------+-------------------+------------------+
|UserID|count(WatchedMovie)|avg(SessionLength)|
+------+-------------------+------------------+
|     2|                  3|             839.0|
|     6|                  4|            783.75|
|     9|                  3| 788.6666666666666|
|    13|                  4|             998.5|
|    17|                  3| 979.6666666666666|
|    20|                  4|             737.0|
|    21|                  3|             702.0|
|    22|                  5|             490.0|
|    26|                  3| 350.6666666666667|
|    31|                  5|             447.8|
|    32|                  3| 695.3333333333334|
|    35|                  3| 840.6666666666666|
|    36|                  4|            454.75|
|    37|                  3| 762.6666666666666|
|    38|                  4|            631.75|
|    40|                  4|             717.5|
|    41|                  3| 484.6666666666667|
|    44|                  3| 419.6666666

In [5]:
# Task 5
# Add a new column indicating the days since the last login for each user.

df = df.withColumn('DaysSinceLastLogin', datediff(current_date(), df.LastLoginDate)) 
# df.printSchema()
df.show()

+-----+------+------------------+------------+--------------------+-------------+-------------+------------------+
|Index|UserID|          UserName|WatchedMovie|          MovieGenre|SessionLength|LastLoginDate|DaysSinceLastLogin|
+-----+------+------------------+------------+--------------------+-------------+-------------+------------------+
|    0| 34557|    Leslie Shelton|       false|                NULL|          175|   2012-04-30|              4360|
|    1| 48013|    Hannah Sanders|       false|                NULL|         1409|   2020-05-30|              1408|
|    2| 13230|Christopher Torres|        true|     Adventure Drama|          181|   2009-11-25|              5247|
|    3| 18988|Christopher Stokes|        true|Animation Adventu...|          179|   2022-02-15|               782|
|    4| 29844|          Joel Cox|       false|                NULL|          227|   2012-06-09|              4320|
|    5| 12305|       Jerry Perez|       false|                NULL|          288