In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder\
                    .appName('movie')\
                    .master('local')\
                    .getOrCreate()

23/12/13 23:03:54 WARN Utils: Your hostname, debian resolves to a loopback address: 127.0.1.1; using 192.168.1.8 instead (on interface wlp0s20f3)
23/12/13 23:03:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/12/13 23:03:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# define the schema for u.data
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("movie_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True),
    StructField("_corrupt_record", StringType(), True)
])

opt = {'header':False, 'sep':'\t', 'mode':'permissive', 'inferschema':False}
data_df = spark.read.format('csv')\
                    .options(**opt)\
                    .schema(schema)\
                    .load("/home/chaitanya/GIT/P-SPARK/case_study1/u.data")
data_df = data_df.withColumn('timestamp', from_unixtime(col('timestamp')).cast('timestamp'))
data_df.show(5)
data_df.printSchema()
data_df.columns

+-------+--------+------+-------------------+---------------+
|user_id|movie_id|rating|          timestamp|_corrupt_record|
+-------+--------+------+-------------------+---------------+
|    196|     242|     3|1997-12-04 21:25:49|           null|
|    186|     302|     3|1998-04-05 00:52:22|           null|
|     22|     377|     1|1997-11-07 12:48:36|           null|
|    244|      51|     2|1997-11-27 10:32:03|           null|
|    166|     346|     1|1998-02-02 11:03:16|           null|
+-------+--------+------+-------------------+---------------+
only showing top 5 rows

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- _corrupt_record: string (nullable = true)



['user_id', 'movie_id', 'rating', 'timestamp', '_corrupt_record']

In [14]:
data_df.count()
data_df.show(4)

+-------+--------+------+-------------------+---------------+
|user_id|movie_id|rating|          timestamp|_corrupt_record|
+-------+--------+------+-------------------+---------------+
|    196|     242|     3|1997-12-04 21:25:49|           null|
|    186|     302|     3|1998-04-05 00:52:22|           null|
|     22|     377|     1|1997-11-07 12:48:36|           null|
|    244|      51|     2|1997-11-27 10:32:03|           null|
+-------+--------+------+-------------------+---------------+
only showing top 4 rows



In [9]:

#define the schema for u.user
user_schema_field = [StructField('UserId', IntegerType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Gender',StringType(), True),
                     StructField('Occupation',StringType(),True),
                     StructField('ZipCode', IntegerType(), True),
                     StructField('_corrupt_record',StringType(), True)]

user_schema = StructType(user_schema_field)

u_opt= {'header':False, 'sep':'|', 'mode':'permissive'}
user_df = spark.read.format('csv')\
                    .options(**u_opt)\
                    .schema(user_schema)\
                    .load('/home/chaitanya/GIT/P-SPARK/case_study1/u.user')

user_df.show(5)                 
user_df.printSchema()

+------+---+------+----------+-------+---------------+
|UserId|Age|Gender|Occupation|ZipCode|_corrupt_record|
+------+---+------+----------+-------+---------------+
|     1| 24|     M|technician|  85711|           null|
|     2| 53|     F|     other|  94043|           null|
|     3| 23|     M|    writer|  32067|           null|
|     4| 24|     M|technician|  43537|           null|
|     5| 33|     F|     other|  15213|           null|
+------+---+------+----------+-------+---------------+
only showing top 5 rows

root
 |-- UserId: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- ZipCode: integer (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [10]:
user_df.count()

943

In [16]:
# 1. Give gender-wise breakup of the users 
gender_df = user_df.groupBy(col('Gender')).count()
gender_df.show()

+------+-----+
|Gender|count|
+------+-----+
|     F|  273|
|     M|  670|
+------+-----+



In [49]:
# 2 Give the top 5 movies which are reviewed maximum number of times 
from pyspark.sql.window import Window


data_groupby_df = data_df.groupBy(col('movie_id')).count().withColumnRenamed('count', 'movie_rated_count')
# data_groupby_df.show()
window_movie = Window.orderBy(col('movie_rated_count').desc())
review_df = data_groupby_df.withColumn('ranking', dense_rank().over(window_movie))
review_df.show(5)

23/12/13 23:48:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------+-----------------+-------+
|movie_id|movie_rated_count|ranking|
+--------+-----------------+-------+
|      50|              583|      1|
|     258|              509|      2|
|     100|              508|      3|
|     181|              507|      4|
|     294|              485|      5|
+--------+-----------------+-------+
only showing top 5 rows



In [66]:
# 3. Give the top 5 users who reviewed maximum number of movies 
from pyspark.sql.window import Window

# data_df.show(5)
user_review_df = data_df.groupBy(col('user_id')).count().withColumnRenamed('count', 'count_user')
# user_review_df.show()
window_user = Window.orderBy(col('count_user').desc())
user_review_count_df = user_review_df.withColumn('user_rating_rank', dense_rank().over(window_user))
user_review_count_df.show(5)

23/12/14 00:14:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------+----------+----------------+
|user_id|count_user|user_rating_rank|
+-------+----------+----------------+
|    405|       737|               1|
|    655|       685|               2|
|     13|       636|               3|
|    450|       540|               4|
|    276|       518|               5|
+-------+----------+----------------+
only showing top 5 rows



In [71]:
# 4. List the top 10 movies which received highest number of 5 star ratings 
from pyspark.sql.window import Window
window_star = Window.orderBy(col('count').desc())
movie_star_df = data_df.filter(col('rating') == 5)\
    .groupBy(col('movie_id')).count()\
    .withColumn('5_star_count', dense_rank().over(window_star))\
    .filter(col('5_star_count') <= 10)
movie_star_df.show()


23/12/14 00:22:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------+-----+------------+
|movie_id|count|5_star_count|
+--------+-----+------------+
|      50|  325|           1|
|     100|  227|           2|
|     127|  214|           3|
|     174|  202|           4|
|      56|  188|           5|
|     318|  186|           6|
|      98|  181|           7|
|     313|  179|           8|
|     172|  172|           9|
|     181|  171|          10|
+--------+-----+------------+



In [79]:
# 5. List the top 10 users who gave highest number of 5 star ratings
# data_df.show()
from pyspark.sql.window import Window

user_star_window_specs = Window.orderBy(col('count').desc())
user_count_star_df = data_df.filter(col('rating') == 5)\
                            .groupBy(col('user_id'))\
                            .count().orderBy(col('count').desc())\
                            .withColumn('user_gave_5_start_rank', dense_rank().over(user_star_window_specs))\
                            .filter(col('user_gave_5_start_rank') <= 10)
user_count_star_df.show()

23/12/14 00:29:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+-------+-----+----------------------+
|user_id|count|user_gave_5_start_rank|
+-------+-----+----------------------+
|    416|  172|                     1|
|      7|  161|                     2|
|     90|  147|                     3|
|    592|  145|                     4|
|    747|  142|                     5|
|    312|  139|                     6|
|    551|  137|                     7|
|     13|  136|                     8|
|     59|  134|                     9|
|    472|  126|                    10|
|    450|  126|                    10|
|    130|  126|                    10|
+-------+-----+----------------------+



                                                                                

In [81]:
user_df.show(5)

+------+---+------+----------+-------+---------------+
|UserId|Age|Gender|Occupation|ZipCode|_corrupt_record|
+------+---+------+----------+-------+---------------+
|     1| 24|     M|technician|  85711|           null|
|     2| 53|     F|     other|  94043|           null|
|     3| 23|     M|    writer|  32067|           null|
|     4| 24|     M|technician|  43537|           null|
|     5| 33|     F|     other|  15213|           null|
+------+---+------+----------+-------+---------------+
only showing top 5 rows



datetime.datetime(2023, 12, 14, 16, 39, 38, 789490)

2023-12-14 16:40:23.970065
