<a href="https://colab.research.google.com/github/Olhaiva/PySpark/blob/main/PySpark_movies.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# PYSPARK
!pip install pyspark

# Import modules
from pyspark import SparkConf
from pyspark.sql import SparkSession, Window
import pyspark.sql.types as t
import pyspark.sql.functions as f

from google.colab import drive
drive.mount ('/content/drive')

# Creating a sparksession
spark_session = (SparkSession.builder
                             .master ("local")
                             .appName ("task app")
                             .config (conf=SparkConf())
                             .getOrCreate())


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=cc2ce5565931c4489fcdc64327391ddfd6cdc27ccb36571c502ccfbcf65b9f00
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
Mounted at /content/drive


In [None]:
# Reading data to df
# IMDb dumps are tab-separated with a header row and use NO quoting: titles may contain
# or start with a literal '"'. Spark's CSV reader treats '"' as a quote character by
# default, which can mis-split such rows, so quoting is disabled with an empty quote string.
# No schema inference is requested, so every column is read as a string.
def read_imdb_tsv(dataset_dir):
    """Read Diploma_spark/<dataset_dir>/data.tsv from Google Drive into a DataFrame."""
    dataset_path = '/content/drive/MyDrive/Diploma_spark/' + dataset_dir + '/data.tsv'
    return spark_session.read.csv(dataset_path, sep='\t', header=True, quote='')


title_akas_df = read_imdb_tsv('title.akas.tsv')
name_basics_df = read_imdb_tsv('name.basics.tsv.gz')
title_basics_df = read_imdb_tsv('title.basics.tsv')
# Keep only credits that name a character ('\N' marks "not applicable").
title_principals_df = (read_imdb_tsv('title.principals.tsv')
                       .filter(f.col('characters') != '\\N')
                       .select('tconst', 'nconst', 'characters'))
title_episode_df = read_imdb_tsv('title.episode.tsv')
title_ratings_df = read_imdb_tsv('title.ratings.tsv')

In [None]:
'''
Task 1. Get all titles of series/movies etc. that are available in Ukrainian
https://www.imdb.com/interfaces/
title.akas.tsv.gz

    title (string) – the localized title
    language (string) - the language of the title
    region (string) - the region for this version of the title
'''
# Transforming data df for 1 task.
# Languages are ISO 639-1 codes, in which Ukrainian is 'uk' ('ua' is the country code,
# not a language code). Titles localized for the Ukrainian region ('UA') are accepted too.
# The filter runs before the projection so the predicate only uses columns still present.
task1_df = (title_akas_df
            .filter((f.col('language') == 'uk') | (f.col('region') == 'UA'))
            .select('title')
            .dropDuplicates())

# writing results to the file
path = '/content/drive/MyDrive/Diploma_spark/'
path_for_task1 = path + 'task1'
task1_df.write.csv(path_for_task1, header=True, mode='overwrite')


In [None]:
'''
Task 2. Get the list of people's names, who were born in the 19th century.
https://www.imdb.com/interfaces/
name.basics.tsv.gz
    primaryName (string)– name by which the person is most often credited
    birthYear – in YYYY format
    primaryProfession (array of strings)– the top-3 professions of the person
'''
# transforming data df for 2 task (modifying types, filtering, choosing columns)
# The 19th century spans 1801-1900 inclusive (exactly 100 years). For the colloquial
# "1800s" reading, use 1800 and 1899 instead.
CENTURY_FIRST_YEAR, CENTURY_LAST_YEAR = 1801, 1900

task2_df = (name_basics_df
            # Non-numeric values such as '\N' become null when cast (Spark's default
            # non-ANSI mode) and are then dropped by the range filter.
            .withColumn('birthYear', f.col('birthYear').cast(t.IntegerType()))
            .filter(f.col('birthYear').between(CENTURY_FIRST_YEAR, CENTURY_LAST_YEAR))
            .select('primaryName'))

# writing results to the file
path_for_task2 = path + 'task2'
task2_df.write.csv(path_for_task2, header=True, mode='overwrite')

In [None]:
'''
Task 3
Get titles of all movies that last more than 2 hours.

title.basics.tsv.gz
    titleType (string) – the type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc)
    originalTitle (string) - original title, in the original language
    runtimeMinutes – primary runtime of the title, in minutes
'''
# Transforming data for 3 task.
# Filter first, then project, so the predicates only reference columns still present.
task3_df = (title_basics_df
            .filter(f.col('titleType') == 'movie')
            # "more than 2 hours" means strictly longer than 120 minutes
            .filter(f.col('runtimeMinutes').cast(t.IntegerType()) > 120)
            .select('originalTitle'))

# writing results to the file
path_for_task3 = path + 'task3'
task3_df.write.csv(path_for_task3, header=True, mode='overwrite')

In [None]:
'''
Task 4
Get names of people, corresponding movies/series and characters they
played in those films.

title.principals.tsv.gz – Contains the principal cast/crew for titles
    tconst (string) - alphanumeric unique identifier of the title
    nconst (string) - alphanumeric unique identifier of the name/person
    characters (string) - the name of the character played if applicable, else '\\N'

title.basics.tsv.gz
    tconst (string) - alphanumeric unique identifier of the title
    originalTitle (string) - original title, in the original language

name.basics.tsv.gz – Contains the following information for names:
    nconst (string) - alphanumeric unique identifier of the name person
    primaryName (string)– name by which the person is most often credited
'''
# Task 4: give every character-bearing credit its title (matched on tconst) and its
# person's name (matched on nconst). Inner joins drop credits whose title or person
# is missing from the basics tables.
original_titles_df = title_basics_df.select('tconst', 'originalTitle')
person_names_df = name_basics_df.select('nconst', 'primaryName')

credits_with_title_df = title_principals_df.join(original_titles_df, on='tconst', how='inner')
credits_with_person_df = credits_with_title_df.join(person_names_df, on='nconst', how='inner')
task4_df = credits_with_person_df.select('primaryName', 'originalTitle', 'characters')

# writing results to the file
path_for_task4 = path + 'task4'
task4_df.write.csv(path_for_task4, header=True, mode='overwrite')


In [None]:
'''
Task 5
Get information about how many adult movies/series etc. there are per
region. Get the top 100 of them from the region with the biggest count to
the region with the smallest one.

title.akas.tsv.gz
    titleId (string) - a tconst, an alphanumeric unique identifier of the title
    region (string) - the region for this version of the title

title.basics.tsv.gz - Contains the following information for titles:
    tconst (string) - alphanumeric unique identifier of the title
    isAdult (boolean) - 0: non-adult title; 1: adult title
'''
# Creating dataframe for 5 task
# '\N' means "no region", so those rows are not counted as a region.
region_df = (title_akas_df
             .filter(f.col('region') != '\\N')
             .select(f.col('titleId').alias('tconst'), 'region'))
adult_df = (title_basics_df
            .filter(f.col('isAdult') == '1')
            .select('tconst'))

region_join_df = region_df.join(adult_df, on='tconst', how='inner')

# title.akas may hold several rows for the same title in one region (alternative
# localized names), so count distinct titles rather than rows. Region breaks ties
# so the top-100 cut is deterministic.
task5_df = (region_join_df
            .groupBy('region')
            .agg(f.countDistinct('tconst').alias('count'))
            .orderBy(f.desc('count'), f.asc('region'))
            .limit(100))

# writing results to the file
path_for_task5 = path + 'task5'
task5_df.write.csv(path_for_task5, header=True, mode='overwrite')

In [None]:
'''
Task 6
Get information about how many episodes in each TV Series. Get the top
50 of them starting from the TV Series with the biggest quantity of
episodes.

title.episode.tsv.gz – Contains the tv episode information. Fields include:
    tconst (string) - alphanumeric identifier of episode
    parentTconst (string) - alphanumeric identifier of the parent TV Series

title.basics.tsv.gz - Contains the following information for titles:
    tconst (string) - alphanumeric unique identifier of the title
    titleType (string) – the type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc)
    primaryTitle (string) – the more popular title / the title used by the filmmakers on promotional materials at the point of release
'''

# Creating dataframe for 6 task
# Number of episode rows per parent title.
episode_df = title_episode_df.groupby('parentTconst').count()

# Candidate parents: titles whose type is 'tvSeries' (filter before projecting).
titles_df = (title_basics_df
             .filter(f.col('titleType') == 'tvSeries')
             .select('tconst', 'primaryTitle'))

# Inner join: only parents that are TV series are kept. A left join would keep parents
# of any other titleType with a null primaryTitle, and those could take top-50 slots.
task6_df = (episode_df
            .join(titles_df, episode_df['parentTconst'] == titles_df['tconst'], how='inner')
            .select('primaryTitle', 'count')
            .orderBy(f.desc('count'), f.asc('primaryTitle'))
            .limit(50))

# writing results to the file
path_for_task6 = path + 'task6'
task6_df.write.csv(path_for_task6, header=True, mode='overwrite')


In [None]:
'''
Task 7.
Get 10 titles of the most popular movies/series etc. by each decade.

title.ratings.tsv.gz – Contains the IMDb rating and votes information for titles
    tconst (string) - alphanumeric unique identifier of the title
    averageRating – weighted average of all the individual user ratings
    numVotes - number of votes the title has received

title.basics.tsv.gz - Contains the following information for titles:

    tconst (string) - alphanumeric unique identifier of the title
    titleType (string) – the type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc)
    primaryTitle (string) – the more popular title / the title used by the filmmakers on promotional materials at the point of release
    startYear (YYYY) – represents the release year of a title. In the case of TV Series, it is the series start year
    endYear (YYYY) – TV Series end year. ‘\\N’ for all other title types
'''
# Creating df
# Ratings cast to numeric types so they sort numerically. rating_df is also used by the
# Task 8 cell.
rating_df = (title_ratings_df
             .select('tconst', 'averageRating', 'numVotes')
             .withColumn('averageRating', f.col('averageRating').cast(t.FloatType()))
             .withColumn('numVotes', f.col('numVotes').cast(t.IntegerType())))

# Decade of release, e.g. 1994 -> 1990. A start year that does not cast to int
# (e.g. '\N') gives a null decade.
years_df = (title_basics_df
            .select('tconst', 'originalTitle', 'startYear')
            .withColumn('startYear', f.col('startYear').cast(t.IntegerType()))
            .withColumn('decade', (f.floor(f.col('startYear') / 10) * 10).cast(t.IntegerType()))
            .drop('startYear'))

# Titles without a known decade belong to no decade and are excluded.
task7_df = (rating_df
            .join(years_df, on='tconst', how='inner')
            .filter(f.col('decade').isNotNull()))

# Rank every decade in one pass instead of one filter/sort/limit job per decade followed
# by a union. This also works when task7_df is empty. tconst is the final tie-breaker,
# so the top 10 is deterministic.
# NOTE(review): ordering by averageRating first lets titles with very few votes outrank
# widely rated ones. Consider a minimum numVotes threshold if "popular" should mean well-known.
window = (Window.partitionBy('decade')
                .orderBy(f.desc('averageRating'), f.desc('numVotes'), f.asc('tconst')))

result_df = (task7_df
             .withColumn('order', f.row_number().over(window))
             .filter(f.col('order') <= 10)
             .select('decade', 'originalTitle', 'averageRating', 'numVotes', 'order')
             .orderBy('decade', 'order'))

# writing results to the file
path_for_task7 = path + 'task7'
result_df.write.csv(path_for_task7, header=True, mode='overwrite')

In [None]:
'''
Task 8.
Get 10 titles of the most popular movies/series etc. by each genre.

title.ratings.tsv.gz – Contains the IMDb rating and votes information for titles
    tconst (string) - alphanumeric unique identifier of the title
    averageRating – weighted average of all the individual user ratings
    numVotes - number of votes the title has received

title.basics.tsv.gz - Contains the following information for titles:

    tconst (string) - alphanumeric unique identifier of the title
    titleType (string) – the type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc)
    primaryTitle (string) – the more popular title / the title used by the filmmakers on promotional materials at the point of release
    genres (string array) – includes up to three genres associated with the title
'''
# Creating df
# One row per (title, genre). Titles whose genres field is '\N' have no genre and are
# skipped, so '\N' does not appear as a genre of its own.
genres_df = (title_basics_df
             .select('tconst', 'originalTitle', 'genres')
             .filter(f.col('genres') != '\\N')
             .withColumn('genre', f.explode(f.split(f.col('genres'), ','))))

# rating_df (averageRating as float, numVotes as int) is defined in the Task 7 cell.
task8_df = rating_df.join(genres_df, on='tconst', how='inner')

# Rank inside each genre by rating, then votes. tconst breaks the remaining ties so
# the top 10 is deterministic.
# NOTE(review): ordering by averageRating first lets titles with very few votes outrank
# widely rated ones. Consider a minimum numVotes threshold if "popular" should mean well-known.
window = (Window.partitionBy('genre')
                .orderBy(f.desc('averageRating'), f.desc('numVotes'), f.asc('tconst')))

task8_df = (task8_df
            .withColumn('order_number', f.row_number().over(window))
            .filter(f.col('order_number') <= 10)
            .select('order_number', 'originalTitle', 'genre')
            .orderBy('genre', 'order_number'))

# writing results to the file
path_for_task8 = path + 'task8'
task8_df.write.csv(path_for_task8, header=True, mode='overwrite')