In [36]:
from pyspark.sql import SparkSession, Column, Window, types
import pyspark.sql.functions as fn
import re
    
session = SparkSession.builder.master("spark://gateway:7077").getOrCreate()

In [40]:
dataset = session.read.option("escape", '"').option("header",True).option("multiLine",True).option("quote",'"').csv("hdfs://node1:9000/tmp/tmdb_5000_movies.csv")
dataset.printSchema()

root
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)



In [41]:
dataset.filter(dataset['vote_count'].cast('int').isNull()).collect()

[]

## First Req. : most popular film in each original language

In [51]:
dataset = dataset.withColumn('popularity', dataset['popularity'].cast('int'))
win = Window.partitionBy('original_language').orderBy('popularity')
req1 = dataset.withColumn('ranking', fn.rank().over(win)).where(fn.col('ranking') == 1).select(['original_language','title'])
req1.show()

+-----------------+--------------------+
|original_language|               title|
+-----------------+--------------------+
|               af|              Tsotsi|
|               ar|          The Square|
|               ar|             Caramel|
|               cn|             Z Storm|
|               cs|     Dancin' It's On|
|               da|               Ordet|
|               de|     The Tooth Fairy|
|               de|         Fascination|
|               de|          The Timber|
|               de|My Last Day Witho...|
|               de|         Food Chains|
|               el|            Dogtooth|
|               en|             Déjà Vu|
|               en|              Inchon|
|               en|  Warriors of Virtue|
|               en| Black Water Transit|
|               en|     The Magic Flute|
|               en|The Bridge of San...|
|               en|Megiddo: The Omeg...|
|               en|        Darling Lili|
+-----------------+--------------------+
only showing top

In [53]:
req1.write.csv('hdfs://node1:9000//tmp/popular_films.csv',mode='overwrite')

## Second Req. : a new file on HDFS called Genres_Agggregations.csv with the id, name and number of movies for each genre

In [108]:
dataset.withColumn('genres',fn.btrim(dataset['genres'],fn.lit('[]'))).head(5)

[Row(budget='237000000', genres='{"id": 28, "name": "Action"}, {"id": 12, "name": "Adventure"}, {"id": 14, "name": "Fantasy"}, {"id": 878, "name": "Science Fiction"}', homepage='http://www.avatarmovie.com/', id='19995', keywords='[{"id": 1463, "name": "culture clash"}, {"id": 2964, "name": "future"}, {"id": 3386, "name": "space war"}, {"id": 3388, "name": "space colony"}, {"id": 3679, "name": "society"}, {"id": 3801, "name": "space travel"}, {"id": 9685, "name": "futuristic"}, {"id": 9840, "name": "romance"}, {"id": 9882, "name": "space"}, {"id": 9951, "name": "alien"}, {"id": 10148, "name": "tribe"}, {"id": 10158, "name": "alien planet"}, {"id": 10987, "name": "cgi"}, {"id": 11399, "name": "marine"}, {"id": 13065, "name": "soldier"}, {"id": 14643, "name": "battle"}, {"id": 14720, "name": "love affair"}, {"id": 165431, "name": "anti war"}, {"id": 193554, "name": "power relations"}, {"id": 206690, "name": "mind and soul"}, {"id": 209714, "name": "3d"}]', original_language='en', original

In [114]:
req2 = dataset.withColumn('genres',fn.split(fn.btrim(dataset['genres'],fn.lit('[]')),'(?<=\})(, *)(?=\{)')).withColumn('genres',fn.explode('genres'))

In [151]:
req2_1 = req2.groupBy(req2['genres']).count().withColumn('genres',fn.from_json(req2['genres'],'MAP<STRING,STRING>'))

In [160]:
req2_2 = req2_1.select(req2_1['genres']['id'],req2_1['genres']['name'],req2_1['count']).show()

+----------+---------------+-----+
|genres[id]|   genres[name]|count|
+----------+---------------+-----+
|       878|Science Fiction|  535|
|        12|      Adventure|  790|
|        28|         Action| 1154|
|        14|        Fantasy|  424|
|     10752|            War|  144|
|     10402|          Music|  185|
|        35|         Comedy| 1722|
|     10749|        Romance|  894|
|     10769|        Foreign|   34|
|        37|        Western|   82|
|        36|        History|  197|
|        99|    Documentary|  110|
|        18|          Drama| 2297|
|     10751|         Family|  513|
|        27|         Horror|  519|
|        80|          Crime|  696|
|        16|      Animation|  234|
|      NULL|           NULL|   28|
|        53|       Thriller| 1274|
|      9648|        Mystery|  348|
+----------+---------------+-----+
only showing top 20 rows



In [161]:
req2_2.write.csv('hdfs://node1:9000//tmp/Genres_Aggregations.csv',mode='overwrite')