# Configurando o ambiente

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz

In [3]:
!tar xf /content/spark-3.4.0-bin-hadoop3.tgz

In [4]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.4.0-bin-hadoop3'

In [5]:
!pip install -q findspark

In [6]:
import findspark
findspark.init('')

In [7]:
from google.colab import files
arq = files.upload()

Saving movies_dataset.csv to movies_dataset.csv


In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master('local')\
        .appName('sparkmovies')\
        .getOrCreate()

In [9]:
# o caracter § é usado como delimitador pois o dataset possui atributos com valores que podem
# apresentar caracteres que são comumente usados como delimitadores
movies = spark.read.option('delimiter', '§').csv('movies_dataset.csv', header=True, inferSchema=True)

# Explorando os dados

In [10]:
movies.show(10)

+-----+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+--------------------+--------------------+-----------------+
|index|   budget|              genres|            homepage|    id|            keywords|original_language|      original_title|            overview|        popularity|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|                cast|                crew|         director|
+-----+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+------------------+--------------------+-------------------

In [11]:
movies.printSchema()

root
 |-- index: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- director: string (nullable = true)



In [12]:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, DoubleType, IntegerType

In [13]:
# Conversão de tipos de alguns atributos (inicialmete todos são string)
movies = movies.withColumn('budget', col('budget').cast(IntegerType()))\
      .withColumn('popularity', col('popularity').cast(DoubleType()))\
      .withColumn('release_date', col('release_date').cast(DateType()))\
      .withColumn('revenue', col('revenue').cast(DoubleType()))\
      .withColumn('runtime', col('runtime').cast(DoubleType()))\
      .withColumn('vote_average', col('vote_average').cast(DoubleType()))\
      .withColumn('vote_count', col('vote_count').cast(IntegerType()))

In [14]:
movies.printSchema()

root
 |-- index: string (nullable = true)
 |-- budget: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- director: string (nullable = true)



# Filtrando dados

In [15]:
# Atributos menos releantes para situação
movies = movies.drop('homepage', 'id', 'overview', 'production_countries', 'spoken_languages',
            'status', 'tagline', 'cast', 'crew')

In [16]:
from pyspark.sql.functions import current_date

In [17]:
movies.count()

4814

In [18]:
# Filmes que possuem uma nota média superior a 10 ou uma data de lançamento superior a data atual
movies.filter((movies.vote_average > 10) | (movies.runtime > 300.0) | (movies.release_date > current_date())).count()

195

In [19]:
# Removendo linhas com valores nulos
movies = movies.dropna()

In [20]:
movies = movies.filter((movies.vote_average <= 10) & (movies.runtime <= 300.0) & (movies.release_date <= current_date()))

In [21]:
movies.count()

4071

# Convertendo atributo production_companies do tipo string para uma lista

In [22]:
companies = movies.select('production_companies')
companies = companies.rdd.map(lambda x: (x[0][1:-1],)).toDF(['production_companies'])

In [23]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import json

In [24]:
# Função que converte cada elemento string do atributo production_companies em uma lista de strings com os nomes das produtoras
def convert_str_to_dict(dict_string):
  item_str = dict_string.replace('""', '"')
  item_str = item_str.replace('},', '}§')
  item_str = item_str[1:-1]
  item_list = item_str.split('§')
  col_dic = []
  for il in item_list:
    if '" "' in il:
      il = il.replace('" "', '", "')
    il = json.loads(il)
    col_dic.append(il['name'])
  return col_dic

In [25]:
udf_convert = udf(convert_str_to_dict, ArrayType(StringType()))

In [26]:
companies = companies.withColumn('production_companies_names', udf_convert('production_companies'))
companies = companies.drop('production_companies')

In [27]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

In [28]:
companies = companies.withColumn('index', row_number().over(Window.orderBy(monotonically_increasing_id())))
movies = movies.join(companies, on = ['index'])
movies.show(5, truncate=False)

+-----+---------+--------------------------------+----------------------------------------------------------------------------+-----------------+----------------------------------------+------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-------------+-------+----------------------------------------+------------+----------+-----------------+---------------------------------------------------------------------------------------------------------------+
|index|budget   |genres                          |keywords                                                                    |original_language|original_title                          |popularity        |production_companies                                                                                                                                                             

### Range de valores dos principais atributos numéricos

In [29]:
from pyspark.sql.functions import min, max

In [30]:
movies.select(min(movies.budget), max(movies.budget)).show()
movies.select(min(movies.release_date), max(movies.release_date)).show()
movies.select(min(movies.revenue), max(movies.revenue)).show()
movies.select(min(movies.runtime), max(movies.runtime)).show()
movies.select(min(movies.vote_average), max(movies.vote_average)).show()
movies.select(min(movies.vote_count), max(movies.vote_count)).show()

+-----------+-----------+
|min(budget)|max(budget)|
+-----------+-----------+
|          0|  380000000|
+-----------+-----------+

+-----------------+-----------------+
|min(release_date)|max(release_date)|
+-----------------+-----------------+
|       1927-01-10|       2016-09-16|
+-----------------+-----------------+

+------------+-------------+
|min(revenue)| max(revenue)|
+------------+-------------+
|         0.0|1.845034188E9|
+------------+-------------+

+------------+------------+
|min(runtime)|max(runtime)|
+------------+------------+
|        41.0|       254.0|
+------------+------------+

+-----------------+-----------------+
|min(vote_average)|max(vote_average)|
+-----------------+-----------------+
|              0.0|              8.5|
+-----------------+-----------------+

+---------------+---------------+
|min(vote_count)|max(vote_count)|
+---------------+---------------+
|              0|          13752|
+---------------+---------------+



# Ranqueando alguns filmes

In [31]:
# Filmes que arrecadaram acima de 1 bilhão
print(movies.filter(movies.revenue >= 1.0E9).count())
movies.filter(movies.revenue >= 1.0E9)\
      .select('budget', 'original_title', 'release_date', 'revenue', 'title', 'vote_average', 'director')\
      .sort(col('revenue').desc())\
      .toPandas()

19


Unnamed: 0,budget,original_title,release_date,revenue,title,vote_average,director
0,200000000,Titanic,1997-11-18,1845034000.0,Titanic,7.5,James Cameron
1,220000000,The Avengers,2012-04-25,1519558000.0,The Avengers,7.4,Joss Whedon
2,150000000,Jurassic World,2015-06-09,1513529000.0,Jurassic World,6.5,Colin Trevorrow
3,190000000,Furious 7,2015-04-01,1506249000.0,Furious 7,7.3,James Wan
4,280000000,Avengers: Age of Ultron,2015-04-22,1405404000.0,Avengers: Age of Ultron,7.3,Joss Whedon
5,150000000,Frozen,2013-11-27,1274219000.0,Frozen,7.3,Chris Buck
6,200000000,Iron Man 3,2013-04-18,1215440000.0,Iron Man 3,6.8,Shane Black
7,74000000,Minions,2015-06-17,1156731000.0,Minions,6.4,Kyle Balda
8,250000000,Captain America: Civil War,2016-04-27,1153304000.0,Captain America: Civil War,7.1,Anthony Russo
9,195000000,Transformers: Dark of the Moon,2011-06-28,1123747000.0,Transformers: Dark of the Moon,6.1,Michael Bay


In [32]:
# Filmes com as maiores notas
movies.sort(col('vote_average').desc(), col('vote_count').desc())\
      .select('original_title', 'release_date', 'title', 'vote_average', 'vote_count', 'director')\
      .toPandas()\
      .head(10)

Unnamed: 0,original_title,release_date,title,vote_average,vote_count,director
0,The Shawshank Redemption,1994-09-23,The Shawshank Redemption,8.5,8205,Frank Darabont
1,The Godfather,1972-03-14,The Godfather,8.4,5893,Francis Ford Coppola
2,The Prisoner of Zenda,1937-09-03,The Prisoner of Zenda,8.4,11,John Cromwell
3,Fight Club,1999-10-15,Fight Club,8.3,9413,David Fincher
4,Pulp Fiction,1994-10-08,Pulp Fiction,8.3,8428,Quentin Tarantino
5,Schindler's List,1993-11-29,Schindler's List,8.3,4329,Steven Spielberg
6,Whiplash,2014-10-10,Whiplash,8.3,4254,Damien Chazelle
7,千と千尋の神隠し,2001-07-20,Spirited Away,8.3,3840,Hayao Miyazaki
8,The Godfather: Part II,1974-12-20,The Godfather: Part II,8.3,3338,Francis Ford Coppola
9,The Dark Knight,2008-07-16,The Dark Knight,8.2,12002,Christopher Nolan


In [33]:
# Filmes de língua não inglesa com as maiores notas
print(movies.filter(movies.original_language != 'en').count())
movies.filter(movies.original_language != 'en')\
      .sort(col('vote_average').desc())\
      .select('budget', 'original_language', 'original_title', 'release_date', 'runtime', 'title', 'vote_average')\
      .toPandas()\
      .head(10)

171


Unnamed: 0,budget,original_language,original_title,release_date,runtime,title,vote_average
0,15000000,ja,千と千尋の神隠し,2001-07-20,125.0,Spirited Away,8.3
1,24000000,ja,ハウルの動く城,2004-11-19,119.0,Howl's Moving Castle,8.2
2,26500000,ja,もののけ姫,1997-07-12,134.0,Princess Mononoke,8.2
3,5000000,it,C'era una volta il West,1968-12-21,175.0,Once Upon a Time in the West,8.1
4,3300000,pt,Cidade de Deus,2002-02-05,130.0,City of God,8.1
5,92620000,de,Metropolis,1927-01-10,153.0,Metropolis,8.0
6,3000000,ko,올드보이,2003-01-01,120.0,Oldboy,8.0
7,14000000,de,Das Boot,1981-09-16,149.0,Das Boot,7.9
8,0,fr,Polisse,2011-10-06,127.0,Polisse,7.9
9,6800000,fr,Incendies,2010-09-04,130.0,Incendies,7.9
