In [6]:
# reading the files in RDD
credits = sc.textFile('data/credits.csv')
keywords = sc.textFile('data/keywords.csv')
links = sc.textFile('data/links.csv')
links_small = sc.textFile('data/links_small.csv')
ratings = sc.textFile('data/ratings.csv')
ratings_small = sc.textFile('data/ratings_small.csv')
movies_metadata = sc.textFile('data/movies_metadata.csv')

In [7]:
# get size of each RDD
cred_count = credits.count()
key_count = keywords.count()
link_count = links.count()
rating_count = ratings.count()
movies_metadata_count = movies_metadata.count()

# print the size of each RDD
print('credits: ', cred_count)
print('keywords: ', key_count)
print('links: ', link_count)
print('ratings: ', rating_count)
print('movies_metadata: ', movies_metadata_count)

                                                                                

credits:  45477
keywords:  46420
links:  45844
ratings:  26024290
movies_metadata:  45573


In [8]:
# reduce the duplicate data
cred_distinct = credits.distinct()
key_distinct = keywords.distinct()
link_distinct = links.distinct()
rating_distinct = ratings.distinct()
movies_metadata_distinct = movies_metadata.distinct()

print(f'There are {cred_count - cred_distinct.count()} duplicate data in credits: ')
print(f'There are {key_count - key_distinct.count()} duplicate data in keywords: ')
print(f'There are {link_count - link_distinct.count()} duplicate data in links: ')
print(f'There are {rating_count - rating_distinct.count()} duplicate data in ratings: ')
print(f'There are {movies_metadata_count - movies_metadata_distinct.count()} duplicate data in movies_metadata: ')

                                                                                

There are 37 duplicate data in credits: 
There are 987 duplicate data in keywords: 
There are 0 duplicate data in links: 


                                                                                

There are 0 duplicate data in ratings: 



[Stage 15:>                                                         (0 + 2) / 2]

There are 17 duplicate data in movies_metadata: 



                                                                                

In [120]:
from pyspark.sql.functions import from_json, col, explode,\
   get_json_object, regexp_replace, array
from pyspark.sql.types import StructType, StructField,\
    StringType, IntegerType, ArrayType, MapType, DoubleType
cred_df = spark.read.csv('movie/credits.csv', header=True, escape='"')
# movi_df = spark.read.csv('movie/movies_metadata.csv', header=True, escape='"')
json_schema = ArrayType(StructType([
    StructField("cast_id", IntegerType(), True),
    StructField("character", StringType(), True),
    StructField("credit_id", StringType(), True),
    StructField("gender", IntegerType(), True),
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("order", IntegerType(), True),
    StructField("profile_path", StringType(), True)
]))

cred_df = cred_df.withColumn('cast',\
                             regexp_replace("cast", 'None', "'None'"))
cred_df = cred_df.withColumn("cast_array", from_json(col("cast"),\
                                                     json_schema))
cred_df = cred_df.withColumn("cast_array",\
                             array(col('cast_array')[0],\
                                   col('cast_array')[1],\
                                   col('cast_array')[2]))
cast_df = cred_df.select(col("id").alias("movieId"), explode(col("cast_array"))\
                         .alias("cast_item"))
cast_name_df = cast_df.withColumn("castId", col("cast_item")["id"])\
                    .drop('cast_item')
# cast_name_df.show()


+-------+------+
|movieId|castId|
+-------+------+
|    862|    31|
|    862| 12898|
|    862|  7167|
|   8844|  2157|
|   8844|  8537|
|   8844|   205|
|  15602|  6837|
|  15602|  3151|
|  15602| 13567|
|  31357|  8851|
|  31357|  9780|
|  31357| 18284|
|  11862| 67773|
|  11862|  3092|
|  11862|   519|
|    949|  1158|
|    949|   380|
|    949|  5576|
|  11860|     3|
|  11860| 15887|
+-------+------+
only showing top 20 rows



In [126]:
ratings_df = spark.read.csv('movie/ratings.csv', header=True, escape='"')
ratings_df = ratings_df.withColumn('rating', ratings_df['rating']\
                                   .cast(DoubleType()))
ratings_df = ratings_df.groupBy("movieId")\
                        .avg("rating")\
                        .withColumnRenamed("avg(rating)", "rating")

In [128]:
cast_name_with_rating = cast_name_df\
                        .join(ratings_df,\
                        cast_name_df.movieId == ratings_df.movieId,"inner")\
                        .groupBy("castId")\
                        .avg("rating")\
                        .withColumnRenamed("avg(rating)", "rating")
cast_name_with_rating.show()



+-------+------------------+
| castId|            rating|
+-------+------------------+
|1362204|             3.375|
|  18979|3.7976480836236934|
|   1959| 3.250567650909128|
|  69637|3.7530362663626198|
| 107536|2.9383561643835616|
|  11317| 3.397006755182856|
|1468011| 3.617865085248332|
|  54190|             3.175|
|   4935|3.1809296622875527|
|  77234| 2.857900845501748|
|  96044|3.5194805194805197|
| 137501| 3.392857142857143|
|  16861| 2.758401760038804|
|1539681| 3.790260404374164|
|  42373|  2.97196261682243|
|  33722|2.4183381088825215|
|   1088|3.0361702127659576|
|  49855|3.0555555555555554|
|   4818|3.1184075420129953|
|  12799| 2.084072104018913|
+-------+------------------+
only showing top 20 rows



                                                                                

In [130]:
cast_name_with_rating.count()

                                                                                

12335