PySpark installation

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 47.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=ec111458eb72d6056069459a0486bf4b08a277250191e5f16fde32318631f9b9
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


Importing libraries

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, Window
import pyspark.sql.types as t
import pyspark.sql.functions as f
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Creating SparkSession

In [None]:
spark_session = (SparkSession.builder
                             .master("local")
                             .appName("task app")                          
                             .config(conf=SparkConf())
                             .getOrCreate())
spark_session.conf.set('mapreduce.fileoutputcommitter.marksuccessfuljobs', 'false')

Creating schemas and dataframes for all 7 datasets

In [None]:
path = '/content/drive/MyDrive/imdb_project/name.basics.tsv'
name_basics_schema = t.StructType (
    [
     t.StructField('ncost', t.StringType(), True),
     t.StructField('primaryName', t.StringType(), True),
     t.StructField('birthYear', t.IntegerType(), False),
     t.StructField('deathYear', t.IntegerType(), True),
     t.StructField('primaryProfession', t.StringType(), True),
     t.StructField('knownForTitles', t.StringType(), True)
     ]
)

name_basics_df = spark_session.read.csv(path, sep=r'\t', header=True, nullValue='null', schema=name_basics_schema)
name_basics_df.show()

+---------+-------------------+---------+---------+--------------------+--------------------+
|    ncost|        primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+-------------------+---------+---------+--------------------+--------------------+
|nm0000001|       Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0072308,tt00504...|
|nm0000002|      Lauren Bacall|     1924|     2014|  actress,soundtrack|tt0037382,tt01170...|
|nm0000003|    Brigitte Bardot|     1934|     null|actress,soundtrac...|tt0057345,tt00564...|
|nm0000004|       John Belushi|     1949|     1982|actor,soundtrack,...|tt0077975,tt00725...|
|nm0000005|     Ingmar Bergman|     1918|     2007|writer,director,a...|tt0060827,tt00839...|
|nm0000006|     Ingrid Bergman|     1915|     1982|actress,soundtrac...|tt0034583,tt00381...|
|nm0000007|    Humphrey Bogart|     1899|     1957|actor,soundtrack,...|tt0042593,tt00432...|
|nm0000008|      Marlon Brando|     1924|     2004|actor,sou

In [None]:
name_basics_df.printSchema()

root
 |-- ncost: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: integer (nullable = true)
 |-- deathYear: integer (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)



In [None]:
path = '/content/drive/MyDrive/imdb_project/title.akas.tsv'
title_akas_schema = t.StructType (
    [
     t.StructField('titleId', t.StringType(), True),
     t.StructField('ordering', t.IntegerType(), True),
     t.StructField('title', t.StringType(), True),
     t.StructField('region', t.StringType(), True),
     t.StructField('language', t.StringType(), True),
     t.StructField('types', t.StringType(), True),
     t.StructField('attributes', t.StringType(), True),
     t.StructField('isOriginalTitle', t.IntegerType(), True)
     ]
)

title_akas_df = spark_session.read.csv(path, sep=r'\t', header=True, nullValue='\\N' ,schema=title_akas_schema)
title_akas_df.drop('language', 'attributes', 'types').show()

+---------+--------+--------------------+------+---------------+
|  titleId|ordering|               title|region|isOriginalTitle|
+---------+--------+--------------------+------+---------------+
|tt0000001|       1|          Карменсіта|    UA|              0|
|tt0000001|       2|          Carmencita|    DE|              0|
|tt0000001|       3|Carmencita - span...|    HU|              0|
|tt0000001|       4|          Καρμενσίτα|    GR|              0|
|tt0000001|       5|          Карменсита|    RU|              0|
|tt0000001|       6|          Carmencita|    US|              0|
|tt0000001|       7|          Carmencita|  null|              1|
|tt0000001|       8|      カルメンチータ|    JP|              0|
|tt0000002|       1|Le clown et ses c...|  null|              1|
|tt0000002|       2|Le clown et ses c...|    FR|              0|
|tt0000002|       3|   A bohóc és kutyái|    HU|              0|
|tt0000002|       4|Der Clown und sei...|    DE|              0|
|tt0000002|       5|Clovnul si c

In [None]:
title_akas_df.printSchema()

root
 |-- titleId: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- region: string (nullable = true)
 |-- language: string (nullable = true)
 |-- types: string (nullable = true)
 |-- attributes: string (nullable = true)
 |-- isOriginalTitle: integer (nullable = true)



In [None]:
path = '/content/drive/MyDrive/imdb_project/title.basics.tsv'
title_basics_schema = t.StructType (
    [
     t.StructField('tconst', t.StringType(), True),
     t.StructField('titleType', t.StringType(), True),
     t.StructField('primaryTitle', t.StringType(), True),
     t.StructField('originalTitle', t.StringType(), True),
     t.StructField('isAdult', t.IntegerType(), True),
     t.StructField('startYear', t.IntegerType(), True),
     t.StructField('endYear', t.IntegerType(), True),
     t.StructField('runtimeMinutes', t.IntegerType(), True),
     t.StructField('genres', t.StringType(), True)
     ]
)

title_basics_df = spark_session.read.csv(path, sep=r'\t', header=True, nullValue='null', schema=title_basics_schema)
title_basics_df.show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|   null|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|   null|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|   null|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|   null|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|   null|             1|        Comedy

In [None]:
title_basics_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- startYear: integer (nullable = true)
 |-- endYear: integer (nullable = true)
 |-- runtimeMinutes: integer (nullable = true)
 |-- genres: string (nullable = true)



In [None]:
path = '/content/drive/MyDrive/imdb_project/title.crew.tsv'
title_crew_df = spark_session.read.csv(path, sep=r'\t', header=True, nullValue='\\N')
title_crew_df.show()

+---------+-------------------+---------+
|   tconst|          directors|  writers|
+---------+-------------------+---------+
|tt0000001|          nm0005690|     null|
|tt0000002|          nm0721526|     null|
|tt0000003|          nm0721526|     null|
|tt0000004|          nm0721526|     null|
|tt0000005|          nm0005690|     null|
|tt0000006|          nm0005690|     null|
|tt0000007|nm0374658,nm0005690|     null|
|tt0000008|          nm0005690|     null|
|tt0000009|          nm0085156|nm0085156|
|tt0000010|          nm0525910|     null|
|tt0000011|          nm0804434|     null|
|tt0000012|nm0525910,nm0525908|     null|
|tt0000013|          nm0525910|     null|
|tt0000014|          nm0525910|     null|
|tt0000015|          nm0721526|     null|
|tt0000016|          nm0525910|     null|
|tt0000017|nm0804434,nm1587194|     null|
|tt0000018|          nm0804434|     null|
|tt0000019|          nm0932055|     null|
|tt0000020|          nm0010291|     null|
+---------+-------------------+---

In [None]:
title_crew_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- writers: string (nullable = true)



In [None]:
path = '/content/drive/MyDrive/imdb_project/title.episode.tsv'
title_episode_schema = t.StructType (
    [
     t.StructField('tconst', t.StringType(), True),
     t.StructField('parentTconst', t.StringType(), True),
     t.StructField('seasonNumber', t.IntegerType(), True),
     t.StructField('episodeNumber', t.IntegerType(), True)
     ]
)

title_episode_df = spark_session.read.csv(path, sep=r'\t', header=True, nullValue='null', schema=title_episode_schema)
title_episode_df.show()

+---------+------------+------------+-------------+
|   tconst|parentTconst|seasonNumber|episodeNumber|
+---------+------------+------------+-------------+
|tt0020666|  tt15180956|           1|            2|
|tt0020829|  tt15180956|           1|            1|
|tt0021166|  tt15180956|           1|            3|
|tt0021612|  tt15180956|           2|            2|
|tt0021655|  tt15180956|           2|            5|
|tt0021663|  tt15180956|           2|            6|
|tt0021664|  tt15180956|           2|            4|
|tt0021701|  tt15180956|           2|            1|
|tt0021802|  tt15180956|           2|           11|
|tt0022009|  tt15180956|           2|           10|
|tt0022031|  tt15180956|           2|            8|
|tt0022127|  tt15180956|           2|            9|
|tt0022152|  tt15180956|           2|            7|
|tt0022385|  tt15180956|           2|            3|
|tt0022604|  tt15180956|           3|            8|
|tt0022610|  tt15180956|           3|           10|
|tt0022631| 

In [None]:
title_episode_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- parentTconst: string (nullable = true)
 |-- seasonNumber: integer (nullable = true)
 |-- episodeNumber: integer (nullable = true)



In [None]:
path = '/content/drive/MyDrive/imdb_project/title.principals.tsv'
title_principals_schema = t.StructType (
    [
     t.StructField('tconst', t.StringType(), True),
     t.StructField('ordering', t.IntegerType(), True),
     t.StructField('nconst', t.StringType(), True),
     t.StructField('category', t.StringType(), True),
     t.StructField('job', t.StringType(), True),
     t.StructField('characters', t.StringType(), True)
     ]
)

title_principals_df = spark_session.read.csv(path, sep=r'\t', header=True, nullValue='\\N', schema=title_principals_schema)
title_principals_df.show()

+---------+--------+---------+---------------+--------------------+--------------+
|   tconst|ordering|   nconst|       category|                 job|    characters|
+---------+--------+---------+---------------+--------------------+--------------+
|tt0000001|       1|nm1588970|           self|                null|      ["Self"]|
|tt0000001|       2|nm0005690|       director|                null|          null|
|tt0000001|       3|nm0374658|cinematographer|director of photo...|          null|
|tt0000002|       1|nm0721526|       director|                null|          null|
|tt0000002|       2|nm1335271|       composer|                null|          null|
|tt0000003|       1|nm0721526|       director|                null|          null|
|tt0000003|       2|nm1770680|       producer|            producer|          null|
|tt0000003|       3|nm1335271|       composer|                null|          null|
|tt0000003|       4|nm5442200|         editor|                null|          null|
|tt0

In [None]:
title_principals_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



In [None]:
path = '/content/drive/MyDrive/imdb_project/title.ratings.tsv'
title_ratings_schema = t.StructType (
    [
     t.StructField('tconst', t.StringType(), True),
     t.StructField('averageRating', t.FloatType(), True),
     t.StructField('numVotes', t.IntegerType(), True)
     ]
)

title_ratings_df = spark_session.read.csv(path, sep=r'\t', header=True, nullValue='null', schema=title_ratings_schema)
title_ratings_df.show()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1891|
|tt0000002|          5.9|     253|
|tt0000003|          6.5|    1684|
|tt0000004|          5.7|     166|
|tt0000005|          6.2|    2502|
|tt0000006|          5.1|     170|
|tt0000007|          5.4|     782|
|tt0000008|          5.4|    2030|
|tt0000009|          5.3|     197|
|tt0000010|          6.9|    6845|
|tt0000011|          5.3|     351|
|tt0000012|          7.4|   11741|
|tt0000013|          5.7|    1814|
|tt0000014|          7.1|    5265|
|tt0000015|          6.2|    1010|
|tt0000016|          5.9|    1419|
|tt0000017|          4.6|     310|
|tt0000018|          5.3|     569|
|tt0000019|          5.2|      31|
|tt0000020|          4.8|     337|
+---------+-------------+--------+
only showing top 20 rows



In [None]:
title_ratings_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- averageRating: float (nullable = true)
 |-- numVotes: integer (nullable = true)



Get all titles of series/movies etc. that are available in Ukrainian

In [None]:
titles_available_in_Ukraine_df = title_akas_df.select('title', 'region').filter(f.col('region') == 'UA')
titles_available_in_Ukraine_df.show(truncate = False)

+-----------------------------------------+------+
|title                                    |region|
+-----------------------------------------+------+
|Карменсіта                               |UA    |
|Бідний П'єро                             |UA    |
|Ковальська сцена                         |UA    |
|Чхання Фреда Отта                        |UA    |
|Вихід робітників із фабрики              |UA    |
|Прибуття потяга на вокзал Ла-Сьота       |UA    |
|Прибуття делегатів на фотоконгрес у Ліоні|UA    |
|Политий поливальник                      |UA    |
|Навколо кабінки                          |UA    |
|Ковалі                                   |UA    |
|Морське купання                          |UA    |
|Партія в карти                           |UA    |
|Площа Кордельє в Ліоні                   |UA    |
|Виловлювання червоних рибок              |UA    |
|Сніданок немовляти                       |UA    |
|Стрибок через брезент                    |UA    |
|Вольтижування                 

In [None]:
path_to_save = '/content/drive/MyDrive/imdb_project/imdb_results/titles_available_in_Ukraine'
titles_available_in_Ukraine_df.coalesce(1).write.csv(path_to_save, header=True)

2.	Get the list of people’s names, who were born in the 19th century

In [None]:
born_in_19th_century_df = name_basics_df.select('primaryName', 'birthYear').filter(f.col('birthYear') < 1901)
born_in_19th_century_df.show()

+------------------+---------+
|       primaryName|birthYear|
+------------------+---------+
|      Fred Astaire|     1899|
|   Humphrey Bogart|     1899|
|      James Cagney|     1899|
|  Alfred Hitchcock|     1899|
|     Buster Keaton|     1895|
|      Groucho Marx|     1890|
|     Alfred Newman|     1900|
|Edward G. Robinson|     1893|
|    Randolph Scott|     1898|
|       Max Steiner|     1888|
|     Spencer Tracy|     1900|
|      Victor Young|     1900|
|   Charles Chaplin|     1889|
|      Robert Ellis|     1892|
|      Robert Ellis|     1888|
|       Annie Rosar|     1888|
|       Luis Buñuel|     1900|
|         John Ford|     1894|
|     D.W. Griffith|     1875|
|     Boris Karloff|     1887|
+------------------+---------+
only showing top 20 rows



In [None]:
path_to_save = '/content/drive/MyDrive/imdb_project/imdb_results/born_in_19th_century'
born_in_19th_century_df.coalesce(1).write.csv(path_to_save, header=True)

3.	Get titles of all movies that last more than 2 hours

In [None]:
last_more_than_2h = title_basics_df.select('primaryTitle', 'originalTitle', 'runtimeMinutes').filter(f.col('runtimeMinutes') > 120).filter(f.col('titleType') == 'movie')
last_more_than_2h.show(truncate = False)

+---------------------------------------+---------------------------------------+--------------+
|primaryTitle                           |originalTitle                          |runtimeMinutes|
+---------------------------------------+---------------------------------------+--------------+
|Atlantis                               |Atlantis                               |121           |
|Germinal; or, The Toll of Labor        |Germinal                               |150           |
|Les Misérables, Part 2: Fantine        |Les misérables - Époque 2: Fantine     |300           |
|The Active Life of Dolly of the Dailies|The Active Life of Dolly of the Dailies|170           |
|The Beloved Adventurer                 |The Beloved Adventurer                 |450           |
|Cabiria                                |Cabiria                                |148           |
|L'enfant de Paris                      |L'enfant de Paris                      |124           |
|The Exploits of Elaine       

In [None]:
path_to_save = '/content/drive/MyDrive/imdb_project/imdb_results/last_more_than_2h'
last_more_than_2h.coalesce(1).write.csv(path_to_save, header=True)

5.	Get information about how many adult movies/series etc. there are per region. Get the top 100 of them from the region with the biggest count to the region with the smallest one

In [None]:
adult_movies_per_region_df = title_akas_df.join(title_basics_df, title_akas_df.titleId == title_basics_df.tconst, 'left').select('region', 'isAdult').filter(f.col('isAdult') == 1)
adult_movies_per_region_df = adult_movies_per_region_df.groupBy('region').agg(f.sum('isAdult').alias('isAdult')).orderBy(f.desc('isAdult')).dropna().limit(100)
adult_movies_per_region_df.show()

+------+-------+
|region|isAdult|
+------+-------+
|    US|  93035|
|    JP|  21044|
|    DE|  12412|
|    FR|   8010|
|    ES|   6219|
|    IT|   5923|
|    CA|   5360|
|    GB|   4451|
|    VE|   3685|
|    PT|   3488|
|    IN|   3151|
|   XWW|   2688|
|    NL|   2011|
|    BR|   1937|
|    CZ|   1536|
|    SE|   1362|
|   XWG|   1162|
|    HU|    864|
|    GR|    857|
|    DK|    807|
+------+-------+
only showing top 20 rows



In [None]:
path_to_save = '/content/drive/MyDrive/imdb_project/imdb_results/adult_movies_per_region'
adult_movies_per_region_df.coalesce(1).write.csv(path_to_save, header=True)

6.	Get information about how many episodes in each TV Series. Get the top 50 of them starting from the TV Series with the biggest quantity of episodes

In [None]:
episodes_in_each_series_df = title_basics_df.join(title_episode_df, title_basics_df.tconst == title_episode_df.parentTconst, 'inner').select(title_basics_df.primaryTitle, title_episode_df.episodeNumber).dropna()
episodes_in_each_series_df = episodes_in_each_series_df.groupBy('primaryTitle').agg(f.count('episodeNumber').alias('episodeNumber')).orderBy(f.desc('episodeNumber')).limit(50)
episodes_in_each_series_df.show(truncate = False)

+-----------------------------+-------------+
|primaryTitle                 |episodeNumber|
+-----------------------------+-------------+
|Days of Our Lives            |14428        |
|The Young and the Restless   |12490        |
|See the World by Train       |10674        |
|Coronation Street            |10495        |
|Eat Bulaga                   |9948         |
|ASAP                         |9567         |
|Neighbours                   |9525         |
|Ohayou Tokushima             |9502         |
|Unser Sandmännchen           |8990         |
|Jeopardy!                    |8810         |
|The Bold and the Beautiful   |8773         |
|The Price Is Right           |8440         |
|Emmerdale Farm               |8278         |
|It's Okay to Laugh!          |8055         |
|Six O'Clock News             |8032         |
|Saksi                        |8023         |
|Home and Away                |7851         |
|Wheel of Fortune             |7769         |
|Countdown                    |767

In [None]:
path_to_save = '/content/drive/MyDrive/imdb_project/imdb_results/episodes_in_each_series'
episodes_in_each_series_df.coalesce(1).write.csv(path_to_save, header=True)

4.	Get names of people, corresponding movies/series and characters they played in those films

In [None]:
actors_roles_df = name_basics_df.join(title_principals_df, name_basics_df.ncost == title_principals_df.nconst, 'inner').filter(f.col('category').contains('act')).drop('birthYear', 'deathYear', 'primaryProfession', 'ordering', 'job', 'knownForTitles', 'ncost')

In [None]:
actors_roles_movies_df = actors_roles_df.join(title_basics_df, actors_roles_df.tconst == title_basics_df.tconst, 'inner').select('primaryTitle', 'primaryName', 'characters').dropna()
actors_roles_movies_df.show(truncate = False)

+-------------------------------+------------------------+--------------------------+
|primaryTitle                   |primaryName             |characters                |
+-------------------------------+------------------------+--------------------------+
|Rip Leaving Sleepy Hollow      |Joseph Jefferson        |["Rip Van Winkle"]        |
|Výstavní párkar a lepic plakátù|Josef Sváb-Malostranský |["Sausage Vendor"]        |
|Výstavní párkar a lepic plakátù|Ferdinand Gýra          |["Sticker"]               |
|Summoning the Spirits          |Georges Méliès          |["L'illusioniste"]        |
|A táncz                        |Emilia Márkus           |["Salome"]                |
|A táncz                        |Ilona Hegedüsné Berzétey|["Táncos"]                |
|Mary Jane's Mishap             |Laura Bayley            |["Mary Jane"]             |
|En foræring til min Kone       |Robert Storm Petersen   |["Petersen"]              |
|That Fatal Sneeze              |Gertie Potter        

In [None]:
path_to_save = '/content/drive/MyDrive/imdb_project/imdb_results/actors_roles_movies'
actors_roles_movies_df.coalesce(1).write.csv(path_to_save, header=True)

7.	Get 10 titles of the most popular movies/series etc. by each decade

In [None]:
popular_by_decade_df = title_basics_df.join(title_ratings_df, title_basics_df.tconst == title_ratings_df.tconst, 'inner')
popular_by_decade_df = popular_by_decade_df.drop('tconst', 'titleType', 'originalTitle', 'isAdult', 'endYear', 'runtimeMinutes', 'genres')
popular_by_decade_df = popular_by_decade_df.withColumn('decade', (f.floor(f.col('startYear')/10)*10).cast('int'))

In [None]:
windowPopularByDecade = Window.partitionBy('decade').orderBy(f.col('averageRating').desc(), f.col('numVotes').desc())
popular_by_decade_df = popular_by_decade_df.withColumn('row', f.row_number().over(windowPopularByDecade)).filter(f.col('row') <= 10).drop('row').dropna()
popular_by_decade_df.show(truncate = False)

+-----------------------------------------------+---------+-------------+--------+------+
|primaryTitle                                   |startYear|averageRating|numVotes|decade|
+-----------------------------------------------+---------+-------------+--------+------+
|Sallie Gardner at a Gallop                     |1878     |7.4          |2940    |1870  |
|Passage de Venus                               |1874     |6.9          |1644    |1870  |
|Le singe musicien                              |1878     |6.2          |220     |1870  |
|La Rosace Magique                              |1877     |5.9          |117     |1870  |
|The Tight-rope Dance                           |1877     |5.5          |26      |1870  |
|Skipping Rope                                  |1877     |5.4          |26      |1870  |
|Dancing on the Rope                            |1878     |5.4          |19      |1870  |
|Dzing. Boom. Boom!                             |1877     |5.3          |24      |1870  |
|Roundhay 

In [None]:
path_to_save = '/content/drive/MyDrive/imdb_project/imdb_results/popular_by_decade'
popular_by_decade_df.coalesce(1).write.csv(path_to_save, header=True)

8.	Get 10 titles of the most popular movies/series etc. by each genre

In [None]:
popular_by_genre_df = title_basics_df.drop('titleType', 'originalTitle', 'isAdult', 'startYear', 'endYear', 'runtimeMinutes')
popular_by_genre_df = popular_by_genre_df.withColumn('genres', f.explode(f.split('genres', ',')))
popular_by_genre_df = popular_by_genre_df.join(title_ratings_df, popular_by_genre_df.tconst == title_ratings_df.tconst, 'inner').drop('tconst')

In [None]:
windowPopularByGenre = Window.partitionBy('genres').orderBy(f.col('averageRating').desc(), f.col('numVotes').desc())
popular_by_genre_df = popular_by_genre_df.withColumn('row', f.row_number().over(windowPopularByGenre)).filter(f.col('row') <= 10).drop('row').dropna()
popular_by_genre_df.show(truncate = False)

+----------------------------------------------------------------------------+---------+-------------+--------+
|primaryTitle                                                                |genres   |averageRating|numVotes|
+----------------------------------------------------------------------------+---------+-------------+--------+
|Berlin Avantgarde Extreme 4 - Dr. Taurus & Heidi, die fröhliche Putzsklavin!|Adult    |10.0         |30      |
|Berlin Avantgarde Extreme 3 - Die gemeine Erpressung des Herrn Koboldsky    |Adult    |10.0         |30      |
|Avantgarde Extreme 10 - Die teuflische Erfindung des Dr. Brainstorm         |Adult    |10.0         |28      |
|Avantgarde Extreme 11 - Eine Sache des Blickwinkels                         |Adult    |10.0         |28      |
|All Anal Service                                                            |Adult    |10.0         |24      |
|The Best of Phoenix Marie                                                   |Adult    |10.0         |24

In [None]:
path_to_save = '/content/drive/MyDrive/imdb_project/imdb_results/popular_by_genre'
popular_by_genre_df.coalesce(1).write.csv(path_to_save, header=True)