<a href="https://colab.research.google.com/github/JoaoPauloSarzedasRibeiro/data_manipulation_with_Python/blob/main/Spark_Configurando_e_Otimizando.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing PySpark on Google Colab

In [None]:
# install the dependencies Spark needs: Java 8, the Spark 2.4.4 binaries and findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# set the environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# make pyspark "importable"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

# Configuring Spark

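Configuration precedence, from highest to lowest: properties set in code (e.g. through `SparkSession.builder.config`) → flags passed to `spark-submit` → entries in `spark-defaults.conf`.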
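The precedence rule can be pictured as layered dictionaries in which later layers override earlier ones. A minimal sketch in plain Python, assuming each layer is a dict of string values; `parse_defaults_conf` and `resolve_conf` are hypothetical helpers, not PySpark APIs, and the file contents are made up:

```python
# Conceptual sketch only: Spark merges these layers itself when the
# application starts; here each layer is a plain dict of strings.


def parse_defaults_conf(text):
    """Parse spark-defaults.conf-style text: 'key value' per line, '#' comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        parts = line.split(None, 1)  # key, then the rest of the line as value
        if len(parts) == 2:
            conf[parts[0]] = parts[1].strip()
    return conf


def resolve_conf(defaults_file, submit_flags, session_conf):
    """Later layers win: spark-defaults.conf < spark-submit < code."""
    resolved = {}
    for layer in (defaults_file, submit_flags, session_conf):
        resolved.update(layer)
    return resolved


defaults = parse_defaults_conf("""
# hypothetical spark-defaults.conf
spark.driver.memory            2g
spark.sql.shuffle.partitions   200
""")
submit = {'spark.driver.memory': '4g'}             # as if: spark-submit --driver-memory 4g
session = {'spark.sql.shuffle.partitions': '100'}  # as if: .config(...) in the builder

print(resolve_conf(defaults, submit, session))
# {'spark.driver.memory': '4g', 'spark.sql.shuffle.partitions': '100'}
```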

In [None]:
import findspark

findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .config('spark.driver.memory', '8g')
    .getOrCreate()
)

In [None]:
spark.conf.get('spark.driver.memory')

'8g'

In [None]:
spark.conf.get('spark.sql.shuffle.partitions')

'200'

In [None]:
spark.conf.set('spark.sql.shuffle.partitions', 100)

In [None]:
spark.conf.get('spark.sql.shuffle.partitions')

'100'

* `spark.master`: the master URL, i.e. where the application runs (e.g. `local[*]` for local mode, `yarn`, or `spark://host:port`). Covered in more detail in the next chapter.
* `spark.driver.memory`: amount of memory allocated to the application's driver.
* `spark.executor.memory`: amount of memory allocated to each of the application's executors.
* `spark.serializer`: class used to serialize objects during execution. Setting it to `org.apache.spark.serializer.KryoSerializer` is recommended for faster processing, since Kryo is often up to 10x faster than the default Java serialization.
* `spark.executor.heartbeatInterval`: interval between the heartbeats each executor sends to the driver. Increasing it reduces heartbeat traffic, but it must stay well below `spark.network.timeout`, the setting that actually determines when an unresponsive executor is treated as lost.
* `spark.sql.adaptive.enabled`: enables Adaptive Query Execution (AQE, Spark 3.0+), which re-optimizes the execution plan while the query runs, based on statistics collected at runtime. Turning it on can speed up processing significantly.
* `spark.sql.shuffle.partitions`: default number of partitions used when shuffling data for joins and aggregations (200 by default, as shown above).
* `spark.sql.broadcastTimeout`: timeout, in seconds, for broadcast operations such as the broadcast join covered at the end of the chapter.
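One way to keep the options listed above together is a single dict applied to the session builder. A sketch under stated assumptions: the values are illustrative, not recommendations, and `TUNING_CONF`, `apply_conf` and `RecordingBuilder` are hypothetical names. The only Spark call the helper relies on is `builder.config(key, value)`:

```python
# Sketch: gather the options from the list above in one dict and apply
# them to a SparkSession builder. Values are illustrative assumptions.

TUNING_CONF = {
    'spark.driver.memory': '8g',
    'spark.executor.memory': '4g',
    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    'spark.sql.adaptive.enabled': 'true',    # AQE needs Spark 3.0+
    'spark.sql.shuffle.partitions': '100',
    'spark.sql.broadcastTimeout': '600',     # seconds
}


def apply_conf(builder, conf):
    """Call builder.config(key, value) for every entry; return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


class RecordingBuilder:
    """Hypothetical stand-in for SparkSession.builder, for dry runs without Spark."""

    def __init__(self):
        self.options = {}

    def config(self, key, value):
        self.options[key] = value
        return self


dry_run = apply_conf(RecordingBuilder(), TUNING_CONF)
print(dry_run.options['spark.serializer'])
```

With PySpark available, the same helper should work on the real builder: `spark = apply_conf(SparkSession.builder.master('local[*]'), TUNING_CONF).getOrCreate()`. Static properties such as `spark.driver.memory` and `spark.serializer` generally have to be set this way, before the session starts; runtime SQL properties such as `spark.sql.shuffle.partitions` can also be changed later with `spark.conf.set`, as done above.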

### Making Spark Scalable

* `spark.dynamicAllocation.enabled`: enables dynamic allocation, which adds and removes executors according to the workload.
* `spark.dynamicAllocation.executorIdleTimeout`: maximum time an executor may stay idle before dynamic allocation removes it.
* `spark.dynamicAllocation.initialExecutors`: initial number of executors when dynamic allocation is enabled.
* `spark.dynamicAllocation.maxExecutors`: maximum number of executors when dynamic allocation is enabled.
* `spark.dynamicAllocation.minExecutors`: minimum number of executors when dynamic allocation is enabled.
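How these settings interact can be sketched with a deliberately simplified model. It is not Spark's actual allocation algorithm, and `target_executors` and `executors_to_release` are hypothetical helpers. It only shows the idea: scale the executor count with the backlog of tasks, clamp it between `minExecutors` and `maxExecutors`, and release executors idle for longer than `executorIdleTimeout`. In a real cluster, dynamic allocation also needs an external shuffle service (`spark.shuffle.service.enabled`) or, from Spark 3.0, shuffle tracking (`spark.dynamicAllocation.shuffleTracking.enabled`).

```python
# Simplified model of dynamic allocation, NOT Spark's actual scheduler code:
# scale the executor count with the task backlog, clamp it to
# [minExecutors, maxExecutors], and release executors idle for longer
# than executorIdleTimeout.
import math


def target_executors(pending_tasks, cores_per_executor, min_executors, max_executors):
    """Executors needed to run every pending task at once, within the bounds."""
    needed = math.ceil(pending_tasks / cores_per_executor)
    return max(min_executors, min(max_executors, needed))


def executors_to_release(idle_seconds, idle_timeout, min_executors):
    """idle_seconds maps executor id -> seconds idle (0 for busy executors)."""
    expired = [eid for eid, idle in sorted(idle_seconds.items()) if idle > idle_timeout]
    # never go below minExecutors
    max_removable = max(0, len(idle_seconds) - min_executors)
    return expired[:max_removable]


print(target_executors(pending_tasks=10, cores_per_executor=4, min_executors=1, max_executors=20))  # 3
print(executors_to_release({'exec-1': 120, 'exec-2': 5, 'exec-3': 90}, idle_timeout=60, min_executors=1))
# ['exec-1', 'exec-3']
```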

# Importing Data

In [None]:
# start a local SparkSession (bound to `spark`) with Hive support and a local warehouse directory
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from os.path import abspath

warehouse_location = abspath('../data/spark-warehouse')

spark = (
    SparkSession
    .builder
    .master('local[*]')
    .config("spark.driver.memory", '8g')
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
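    # if a session already exists, getOrCreate() returns it, and static
    # settings such as spark.driver.memory may not take effect
    .getOrCreate()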
)

In [None]:
# connect to Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# load a DataFrame from data already stored as Parquet
df_titles = spark.read.format('parquet').load('drive/MyDrive/DataLake/df_titles')

In [None]:
df_titles.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)



In [None]:
# load a DataFrame from data already stored as Parquet
df_ratings = spark.read.format('parquet').load('drive/MyDrive/DataLake/df_ratings')

In [None]:
df_ratings.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



# Persisting Data in Memory

In [None]:
df_titles_sample = df_titles.sample(fraction = 0.1)

In [None]:
df_titles_sample.count()

819475

In [None]:
df_titles_sample.limit(5).toPandas()

|   | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000013 | short | The Photographical Congress Arrives in Lyon | Le débarquement du congrès de photographie à Lyon | 0 | 1895 | \N | 1 | Documentary,Short |
| 1 | tt0000019 | short | The Clown Barber | The Clown Barber | 0 | 1898 | \N | \N | Comedy,Short |
| 2 | tt0000026 | short | The Messers. Lumière at Cards | Partie d'écarté | 0 | 1896 | \N | 1 | Documentary,Short |
| 3 | tt0000044 | short | Le bivouac | Le bivouac | 0 | 1896 | \N | 1 | Short |
| 4 | tt0000051 | short | The Bohemian Encampment | Campement de bohémiens | 0 | 1896 | \N | \N | Documentary,Short |


In [None]:
df_ratings.limit(5).toPandas()

|   | tconst | averageRating | numVotes |
|---|---|---|---|
| 0 | tt0000001 | 5.7 | 1809 |
| 1 | tt0000002 | 6.0 | 233 |
| 2 | tt0000003 | 6.5 | 1560 |
| 3 | tt0000004 | 6.1 | 152 |
| 4 | tt0000005 | 6.2 | 2383 |


In [None]:
# cast the numeric columns stored as strings to int (non-numeric values such as '\N' become null)
int_cols = ['startYear', 'endYear', 'runtimeMinutes', 'isAdult']
for c in int_cols:
    df_titles_sample = (
        df_titles_sample
        .withColumn(c, f.col(c).cast('int'))
    )
# clean the string columns: trim surrounding whitespace and capitalize each word (initcap)
str_cols = ['primaryTitle', 'originalTitle', 'titleType']
for c in str_cols:
    df_titles_sample = (
        df_titles_sample
        .withColumn(c, f.initcap(f.trim(f.col(c))))
    )

In [None]:
df_join = (
    df_titles_sample
    .replace('\\N', None)
    .withColumn('genres', f.split(f.col('genres'), ','))
    .join(df_ratings, 'tconst', 'left')
)

In [None]:
df_join.limit(5).toPandas()

|   | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres | averageRating | numVotes |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000013 | Short | The Photographical Congress Arrives In Lyon | Le Débarquement Du Congrès De Photographie À Lyon | 0 | 1895 |  | 1.0 | [Documentary, Short] | 5.8 | 1726 |
| 1 | tt0000019 | Short | The Clown Barber | The Clown Barber | 0 | 1898 |  |  | [Comedy, Short] | 5.3 | 28 |
| 2 | tt0000026 | Short | The Messers. Lumière At Cards | Partie D'écarté | 0 | 1896 |  | 1.0 | [Documentary, Short] | 5.7 | 1420 |
| 3 | tt0000044 | Short | Le Bivouac | Le Bivouac | 0 | 1896 |  | 1.0 | [Short] | 4.4 | 39 |
| 4 | tt0000051 | Short | The Bohemian Encampment | Campement De Bohémiens | 0 | 1896 |  |  | [Documentary, Short] | 3.7 | 31 |


In [None]:
df_final = (
    df_join
    .withColumn('genres', f.explode(f.col('genres')))
    .groupBy('titleType')
    .pivot('genres')
    .agg(f.round(f.mean('averageRating'), 2))
    .fillna(0)
)

In [None]:
df_final.limit(5).toPandas()

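|   | titleType | Action | Adult | Adventure | Animation | Biography | Comedy | Crime | Documentary | Drama | ... | News | Reality-TV | Romance | Sci-Fi | Short | Sport | Talk-Show | Thriller | War | Western |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Tvepisode | 7.5 | 7.34 | 7.39 | 7.3 | 7.42 | 7.34 | 7.56 | 7.54 | 7.63 | ... | 6.63 | 7.04 | 7.54 | 7.41 | 6.98 | 7.1 | 6.77 | 7.5 | 7.81 | 7.71 |
| 1 | Video | 5.89 | 6.5 | 6.04 | 6.51 | 7.33 | 6.36 | 5.77 | 6.94 | 6.15 | ... | 4.52 | 6.12 | 6.05 | 6.0 | 6.72 | 6.79 | 7.29 | 5.01 | 6.03 | 6.35 |
| 2 | Videogame | 6.98 | 0.0 | 7.26 | 6.77 | 0.0 | 7.03 | 7.24 | 6.55 | 7.52 | ... | 0.0 | 6.95 | 7.57 | 7.01 | 0.0 | 6.67 | 0.0 | 6.9 | 7.12 | 5.8 |
| 3 | Tvminiseries | 6.83 | 5.03 | 7.03 | 6.5 | 7.52 | 6.92 | 6.93 | 7.61 | 7.12 | ... | 5.37 | 6.35 | 6.92 | 7.06 | 6.88 | 7.31 | 6.08 | 6.81 | 7.17 | 7.57 |
| 4 | Tvmovie | 5.66 | 6.0 | 5.97 | 6.7 | 6.9 | 6.45 | 6.21 | 7.11 | 6.58 | ... | 7.1 | 5.99 | 6.1 | 5.8 | 0.0 | 6.93 | 0.0 | 5.67 | 7.06 | 6.3 |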


In [None]:
df_final.explain('formatted')

== Parsed Logical Plan ==
'Project [titleType#146, coalesce(nanvl(Action#267, null), cast(0.0 as double)) AS Action#353, coalesce(nanvl(Adult#268, null), cast(0.0 as double)) AS Adult#354, coalesce(nanvl(Adventure#269, null), cast(0.0 as double)) AS Adventure#355, coalesce(nanvl(Animation#270, null), cast(0.0 as double)) AS Animation#356, coalesce(nanvl(Biography#271, null), cast(0.0 as double)) AS Biography#357, coalesce(nanvl(Comedy#272, null), cast(0.0 as double)) AS Comedy#358, coalesce(nanvl(Crime#273, null), cast(0.0 as double)) AS Crime#359, coalesce(nanvl(Documentary#274, null), cast(0.0 as double)) AS Documentary#360, coalesce(nanvl(Drama#275, null), cast(0.0 as double)) AS Drama#361, coalesce(nanvl(Family#276, null), cast(0.0 as double)) AS Family#362, coalesce(nanvl(Fantasy#277, null), cast(0.0 as double)) AS Fantasy#363, coalesce(nanvl(Film-Noir#278, null), cast(0.0 as double)) AS Film-Noir#364, coalesce(nanvl(Game-Show#279, null), cast(0.0 as double)) AS Game-Show#365, coa

In [None]:
%%time
df_final.limit(5).toPandas()

CPU times: user 65.3 ms, sys: 5.68 ms, total: 71 ms
Wall time: 9.09 s


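|   | titleType | Action | Adult | Adventure | Animation | Biography | Comedy | Crime | Documentary | Drama | ... | News | Reality-TV | Romance | Sci-Fi | Short | Sport | Talk-Show | Thriller | War | Western |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Tvepisode | 7.5 | 7.34 | 7.39 | 7.3 | 7.42 | 7.34 | 7.56 | 7.54 | 7.63 | ... | 6.63 | 7.04 | 7.54 | 7.41 | 6.98 | 7.1 | 6.77 | 7.5 | 7.81 | 7.71 |
| 1 | Video | 5.89 | 6.5 | 6.04 | 6.51 | 7.33 | 6.36 | 5.77 | 6.94 | 6.15 | ... | 4.52 | 6.12 | 6.05 | 6.0 | 6.72 | 6.79 | 7.29 | 5.01 | 6.03 | 6.35 |
| 2 | Videogame | 6.98 | 0.0 | 7.26 | 6.77 | 0.0 | 7.03 | 7.24 | 6.55 | 7.52 | ... | 0.0 | 6.95 | 7.57 | 7.01 | 0.0 | 6.67 | 0.0 | 6.9 | 7.12 | 5.8 |
| 3 | Tvminiseries | 6.83 | 5.03 | 7.03 | 6.5 | 7.52 | 6.92 | 6.93 | 7.61 | 7.12 | ... | 5.37 | 6.35 | 6.92 | 7.06 | 6.88 | 7.31 | 6.08 | 6.81 | 7.17 | 7.57 |
| 4 | Tvmovie | 5.66 | 6.0 | 5.97 | 6.7 | 6.9 | 6.45 | 6.21 | 7.11 | 6.58 | ... | 7.1 | 5.99 | 6.1 | 5.8 | 0.0 | 6.93 | 0.0 | 5.67 | 7.06 | 6.3 |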


In [None]:
%%time
df_join.cache()
df_join.count()

CPU times: user 75.8 ms, sys: 10.8 ms, total: 86.6 ms
Wall time: 13.2 s


In [None]:
%%time
df_join.count()

CPU times: user 1.66 ms, sys: 86 µs, total: 1.75 ms
Wall time: 103 ms


819475

In [None]:
df_final.cache()
df_final.count()

10

In [None]:
%%time
df_final.limit(5).toPandas()

CPU times: user 11.8 ms, sys: 4.12 ms, total: 15.9 ms
Wall time: 869 ms


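|   | titleType | Action | Adult | Adventure | Animation | Biography | Comedy | Crime | Documentary | Drama | ... | News | Reality-TV | Romance | Sci-Fi | Short | Sport | Talk-Show | Thriller | War | Western |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Tvepisode | 7.5 | 7.34 | 7.39 | 7.3 | 7.42 | 7.34 | 7.56 | 7.54 | 7.63 | ... | 6.63 | 7.04 | 7.54 | 7.41 | 6.98 | 7.1 | 6.77 | 7.5 | 7.81 | 7.71 |
| 1 | Video | 5.89 | 6.5 | 6.04 | 6.51 | 7.33 | 6.36 | 5.77 | 6.94 | 6.15 | ... | 4.52 | 6.12 | 6.05 | 6.0 | 6.72 | 6.79 | 7.29 | 5.01 | 6.03 | 6.35 |
| 2 | Videogame | 6.98 | 0.0 | 7.26 | 6.77 | 0.0 | 7.03 | 7.24 | 6.55 | 7.52 | ... | 0.0 | 6.95 | 7.57 | 7.01 | 0.0 | 6.67 | 0.0 | 6.9 | 7.12 | 5.8 |
| 3 | Tvminiseries | 6.83 | 5.03 | 7.03 | 6.5 | 7.52 | 6.92 | 6.93 | 7.61 | 7.12 | ... | 5.37 | 6.35 | 6.92 | 7.06 | 6.88 | 7.31 | 6.08 | 6.81 | 7.17 | 7.57 |
| 4 | Tvmovie | 5.66 | 6.0 | 5.97 | 6.7 | 6.9 | 6.45 | 6.21 | 7.11 | 6.58 | ... | 7.1 | 5.99 | 6.1 | 5.8 | 0.0 | 6.93 | 0.0 | 5.67 | 7.06 | 6.3 |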


#### Removing data from the in-memory cache

In [None]:
df_final.unpersist()

DataFrame[titleType: string, Action: double, Adult: double, Adventure: double, Animation: double, Biography: double, Comedy: double, Crime: double, Documentary: double, Drama: double, Family: double, Fantasy: double, Film-Noir: double, Game-Show: double, History: double, Horror: double, Music: double, Musical: double, Mystery: double, News: double, Reality-TV: double, Romance: double, Sci-Fi: double, Short: double, Sport: double, Talk-Show: double, Thriller: double, War: double, Western: double]

In [None]:
spark.catalog.clearCache()
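Why the second `df_join.count()` above took about 100 ms, while the first one (which also filled the cache) took 13 s, can be pictured with a plain-Python analogy. This is not how Spark is implemented: transformations only record steps, every action re-runs the whole lineage, and `cache()` merely marks the data so that the next action stores its result. `LazyDataset` is a hypothetical class; in PySpark, `persist()` additionally accepts a `StorageLevel`, and `unpersist()` or `spark.catalog.clearCache()` release the cached data as shown above.

```python
# Analogy in plain Python (not Spark internals): transformations only
# record steps; every action re-runs the whole lineage unless the
# dataset was cached, in which case the first action stores the result.


class LazyDataset:
    def __init__(self, source, steps=()):
        self.source = source        # callable returning the raw rows
        self.steps = list(steps)    # recorded transformations
        self.computations = 0       # how many times the lineage actually ran
        self._cache_requested = False
        self._cached = None

    def filter(self, predicate):
        def step(rows):
            return [r for r in rows if predicate(r)]
        return LazyDataset(self.source, self.steps + [step])

    def map(self, fn):
        def step(rows):
            return [fn(r) for r in rows]
        return LazyDataset(self.source, self.steps + [step])

    def cache(self):
        # like DataFrame.cache(): only marks the dataset, nothing runs yet
        self._cache_requested = True
        return self

    def unpersist(self):
        # like DataFrame.unpersist(): drop the stored result
        self._cache_requested = False
        self._cached = None
        return self

    def count(self):
        # an "action": this is where work actually happens
        if self._cached is not None:
            return len(self._cached)
        self.computations += 1
        rows = self.source()
        for step in self.steps:
            rows = step(rows)
        if self._cache_requested:
            self._cached = rows
        return len(rows)


ds = LazyDataset(lambda: list(range(1_000))).filter(lambda x: x % 2 == 0)
ds.count()
ds.count()                 # the lineage runs twice
ds.cache()
ds.count()                 # runs once more and stores the result
ds.count()                 # served from the cache
print(ds.computations)     # 3
```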

In [None]:
spark

# Data Partitioning Strategies

### Bucketing

In [None]:
df_titles.limit(5).toPandas()

|   | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000001 | short | Carmencita | Carmencita | 0 | 1894 | \N | 1 | Documentary,Short |
| 1 | tt0000002 | short | Le clown et ses chiens | Le clown et ses chiens | 0 | 1892 | \N | 5 | Animation,Short |
| 2 | tt0000003 | short | Pauvre Pierrot | Pauvre Pierrot | 0 | 1892 | \N | 4 | Animation,Comedy,Romance |
| 3 | tt0000004 | short | Un bon bock | Un bon bock | 0 | 1892 | \N | 12 | Animation,Short |
| 4 | tt0000005 | short | Blacksmith Scene | Blacksmith Scene | 0 | 1893 | \N | 1 | Comedy,Short |


In [None]:
df_ratings.limit(5).toPandas()

|   | tconst | averageRating | numVotes |
|---|---|---|---|
| 0 | tt0000001 | 5.7 | 1809 |
| 1 | tt0000002 | 6.0 | 233 |
| 2 | tt0000003 | 6.5 | 1560 |
| 3 | tt0000004 | 6.1 | 152 |
| 4 | tt0000005 | 6.2 | 2383 |


In [None]:
df_titles.write.format('parquet').bucketBy(5, 'tconst').saveAsTable('title_basics')

In [None]:
df_ratings.write.format('parquet').bucketBy(5, 'tconst').saveAsTable('title_ratings')

In [None]:
df_titles_bucket = spark.sql('SELECT * FROM title_basics')

In [None]:
df_ratings_bucket = spark.sql('SELECT * FROM title_ratings')

In [None]:
%%time
df_titles.join(df_ratings, 'tconst').count()

CPU times: user 24.9 ms, sys: 3.4 ms, total: 28.3 ms
Wall time: 4.01 s


1182639

In [None]:
%%time
df_titles_bucket.join(df_ratings_bucket, 'tconst').count()

CPU times: user 26.8 ms, sys: 1.2 ms, total: 28 ms
Wall time: 3.56 s


1182639
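The idea behind `bucketBy(5, 'tconst')` can be sketched in plain Python: every row is assigned to bucket `hash(key) % num_buckets`, so when two tables are bucketed on the same key with the same number of buckets, bucket *i* of one table only needs to be joined with bucket *i* of the other, without shuffling rows between them. A sketch under assumptions: `zlib.crc32` stands in for the Murmur3 hash Spark uses (so bucket numbers will not match Spark's files), the helpers are hypothetical, and the sample rows are taken from the tables above.

```python
# Conceptual sketch of bucketing. Assumption: zlib.crc32 stands in for the
# Murmur3 hash Spark uses, so bucket numbers will not match Spark's files.
import zlib
from collections import defaultdict


def bucket_id(key, num_buckets):
    return zlib.crc32(key.encode('utf-8')) % num_buckets


def bucketize(rows, key, num_buckets):
    """Group rows by bucket, as bucketBy(num_buckets, key) does when writing."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_id(row[key], num_buckets)].append(row)
    return buckets


def bucketed_join(left, right, key, num_buckets):
    """Inner join that only pairs bucket i with bucket i: no rows move between buckets."""
    left_b = bucketize(left, key, num_buckets)
    right_b = bucketize(right, key, num_buckets)
    joined = []
    for b in range(num_buckets):
        index = defaultdict(list)
        for r in right_b.get(b, []):
            index[r[key]].append(r)
        for left_row in left_b.get(b, []):
            for right_row in index.get(left_row[key], []):
                joined.append({**left_row, **right_row})
    return joined


titles = [
    {'tconst': 'tt0000001', 'primaryTitle': 'Carmencita'},
    {'tconst': 'tt0000002', 'primaryTitle': 'Le clown et ses chiens'},
    {'tconst': 'tt0000003', 'primaryTitle': 'Pauvre Pierrot'},
]
ratings = [
    {'tconst': 'tt0000001', 'averageRating': 5.7},
    {'tconst': 'tt0000003', 'averageRating': 6.5},
]
for row in bucketed_join(titles, ratings, 'tconst', num_buckets=5):
    print(row)
```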

### Partitioning by Columns

In [None]:
df_titles.filter('titleType = "short"').explain("formatted")

== Parsed Logical Plan ==
'Filter ('titleType = short)
+- Relation[tconst#28,titleType#29,primaryTitle#30,originalTitle#31,isAdult#32,startYear#33,endYear#34,runtimeMinutes#35,genres#36] parquet

== Analyzed Logical Plan ==
tconst: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string
Filter (titleType#29 = short)
+- Relation[tconst#28,titleType#29,primaryTitle#30,originalTitle#31,isAdult#32,startYear#33,endYear#34,runtimeMinutes#35,genres#36] parquet

== Optimized Logical Plan ==
Filter (isnotnull(titleType#29) && (titleType#29 = short))
+- Relation[tconst#28,titleType#29,primaryTitle#30,originalTitle#31,isAdult#32,startYear#33,endYear#34,runtimeMinutes#35,genres#36] parquet

== Physical Plan ==
*(1) Project [tconst#28, titleType#29, primaryTitle#30, originalTitle#31, isAdult#32, startYear#33, endYear#34, runtimeMinutes#35, genres#36]
+- *(1) Filter (isnotnull(titleType#29) &&

In [None]:
(
    df_titles
    .write
    .format('parquet')
    .partitionBy('titleType')
    .save('drive/MyDrive/DataLake/df_titles_partitioned')
)

In [None]:
df_titles_partitions = spark.read.parquet('drive/MyDrive/DataLake/df_titles_partitioned')

In [None]:
df_titles_partitions.filter('titleType = "short"').explain("formatted")

== Parsed Logical Plan ==
'Filter ('titleType = short)
+- Relation[tconst#51648,primaryTitle#51649,originalTitle#51650,isAdult#51651,startYear#51652,endYear#51653,runtimeMinutes#51654,genres#51655,titleType#51656] parquet

== Analyzed Logical Plan ==
tconst: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string, titleType: string
Filter (titleType#51656 = short)
+- Relation[tconst#51648,primaryTitle#51649,originalTitle#51650,isAdult#51651,startYear#51652,endYear#51653,runtimeMinutes#51654,genres#51655,titleType#51656] parquet

== Optimized Logical Plan ==
Filter (isnotnull(titleType#51656) && (titleType#51656 = short))
+- Relation[tconst#51648,primaryTitle#51649,originalTitle#51650,isAdult#51651,startYear#51652,endYear#51653,runtimeMinutes#51654,genres#51655,titleType#51656] parquet

== Physical Plan ==
*(1) FileScan parquet [tconst#51648,primaryTitle#51649,originalTitle#51650,isAdult#51651,start

In [None]:
(
    df_titles_partitions
    .filter('titleType = "short"')
    .filter('genres = "Action"')
    .explain("formatted")
)

== Parsed Logical Plan ==
'Filter ('genres = Action)
+- Filter (titleType#51656 = short)
   +- Relation[tconst#51648,primaryTitle#51649,originalTitle#51650,isAdult#51651,startYear#51652,endYear#51653,runtimeMinutes#51654,genres#51655,titleType#51656] parquet

== Analyzed Logical Plan ==
tconst: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string, titleType: string
Filter (genres#51655 = Action)
+- Filter (titleType#51656 = short)
   +- Relation[tconst#51648,primaryTitle#51649,originalTitle#51650,isAdult#51651,startYear#51652,endYear#51653,runtimeMinutes#51654,genres#51655,titleType#51656] parquet

== Optimized Logical Plan ==
Filter (((isnotnull(titleType#51656) && isnotnull(genres#51655)) && (titleType#51656 = short)) && (genres#51655 = Action))
+- Relation[tconst#51648,primaryTitle#51649,originalTitle#51650,isAdult#51651,startYear#51652,endYear#51653,runtimeMinutes#51654,genres#51655,titleTyp
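Writing with `partitionBy('titleType')` creates one directory per title type, so a filter on that column lets Spark skip whole directories (partition pruning), whereas the extra filter on `genres`, a regular column, still has to be evaluated on the data that is read. A plain-Python sketch of the pruning step, assuming made-up file names; `partition_values` and `prune` are hypothetical helpers:

```python
# Sketch of partition pruning over a partitionBy('titleType') layout.
# Assumption: the file names are made up; Spark writes one directory per
# value, named column=value, and puts the part files inside it.

files = [
    'df_titles_partitioned/titleType=movie/part-00000.parquet',
    'df_titles_partitioned/titleType=short/part-00000.parquet',
    'df_titles_partitioned/titleType=short/part-00001.parquet',
    'df_titles_partitioned/titleType=tvEpisode/part-00000.parquet',
]


def partition_values(path):
    """Read partition columns from the directory names, e.g. {'titleType': 'short'}."""
    values = {}
    for segment in path.split('/')[:-1]:  # the last segment is the file name
        if '=' in segment:
            column, _, value = segment.partition('=')
            values[column] = value
    return values


def prune(paths, column, value):
    """Keep only the files whose directory matches column=value; others are never opened."""
    return [p for p in paths if partition_values(p).get(column) == value]


for path in prune(files, 'titleType', 'short'):
    print(path)
```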

## Repartitioning DataFrames

In [None]:
df_titles.limit(5).toPandas()

|   | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000001 | short | Carmencita | Carmencita | 0 | 1894 | \N | 1 | Documentary,Short |
| 1 | tt0000002 | short | Le clown et ses chiens | Le clown et ses chiens | 0 | 1892 | \N | 5 | Animation,Short |
| 2 | tt0000003 | short | Pauvre Pierrot | Pauvre Pierrot | 0 | 1892 | \N | 4 | Animation,Comedy,Romance |
| 3 | tt0000004 | short | Un bon bock | Un bon bock | 0 | 1892 | \N | 12 | Animation,Short |
| 4 | tt0000005 | short | Blacksmith Scene | Blacksmith Scene | 0 | 1893 | \N | 1 | Comedy,Short |


In [None]:
df_titles.rdd.getNumPartitions()

12

In [None]:
df_titles.repartition(5).rdd.getNumPartitions()

5

In [None]:
df_titles.coalesce(5).rdd.getNumPartitions()

5

In [None]:
df_titles.repartition(5).explain('formatted')

== Physical Plan ==
Exchange (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/Pedro Toledo/Documents/igti/edc-mod3-igti/data/imdb/title_basics]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) ColumnarToRow [codegen id : 1]
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]

(3) Exchange
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Arguments: RoundRobinPartitioning(5), REPARTITION_WITH_NUM, [id=#1269]




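For scale, with 1,000,000 MB (about 1 TB) of data: 50 partitions give 1,000,000 MB / 50 = 20,000 MB (about 20 GB) per partition, while 5 partitions give 1,000,000 MB / 5 = 200,000 MB (about 200 GB) per partition, so reducing the partition count makes each task process far more data.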
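The same arithmetic as two small helpers, plus the inverse question of how many partitions keep each one under a target size. The 128 MB target is an assumption, chosen because it matches the default of `spark.sql.files.maxPartitionBytes`; both function names are hypothetical.

```python
import math


def partition_size_mb(total_mb, num_partitions):
    """Average partition size when total_mb is split evenly."""
    return total_mb / num_partitions


def partitions_for_target(total_mb, target_mb):
    """Smallest partition count that keeps each partition at or below target_mb."""
    return math.ceil(total_mb / target_mb)


print(partition_size_mb(1_000_000, 50))        # 20000.0 MB, about 20 GB
print(partition_size_mb(1_000_000, 5))         # 200000.0 MB, about 200 GB
print(partitions_for_target(1_000_000, 128))   # 7813
```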

In [None]:
df_titles.coalesce(5).explain('formatted')

== Physical Plan ==
Coalesce (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/Pedro Toledo/Documents/igti/edc-mod3-igti/data/imdb/title_basics]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) ColumnarToRow [codegen id : 1]
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]

(3) Coalesce
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Arguments: 5
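The two plans differ in one node: `repartition(5)` adds an `Exchange` with `RoundRobinPartitioning(5)`, i.e. a full shuffle, while `coalesce(5)` only adds a `Coalesce` node that merges existing partitions. A simplified plain-Python model of both; it is not Spark's implementation (Spark's round robin starts at a random position, and its coalescer also considers data locality), and both function names are hypothetical:

```python
# Simplified model (not Spark's scheduler): repartition(n) redistributes
# every row round-robin (a full shuffle), while coalesce(n) only merges
# existing partitions, so rows are not redistributed individually but
# the result can be unbalanced.


def repartition_round_robin(partitions, n):
    out = [[] for _ in range(n)]
    i = 0
    for part in partitions:
        for row in part:
            out[i % n].append(row)
            i += 1
    return out


def coalesce(partitions, n):
    """Merge contiguous groups of input partitions into n outputs (never more)."""
    if n >= len(partitions):
        return [list(p) for p in partitions]
    out = [[] for _ in range(n)]
    for idx, part in enumerate(partitions):
        out[idx * n // len(partitions)].extend(part)
    return out


parts = [[1, 2, 3, 4, 5, 6], [7], [8], [9]]              # skewed input
print([len(p) for p in repartition_round_robin(parts, 2)])  # [5, 4]
print([len(p) for p in coalesce(parts, 2)])                 # [7, 2]
```

The trade-off this illustrates: `coalesce` avoids the shuffle, which is why `coalesce(1)` is a cheap way to produce a single output file, but it cannot fix skew or increase the partition count; `repartition` pays for a shuffle to get evenly sized partitions.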




In [None]:
df_titles.repartition(50).write.parquet('../data/imdb/df_titles_repartioned')

In [None]:
df_titles.coalesce(1).write.parquet('../data/imdb/df_titles_coalesced')

## Choosing the Best Join Type

* **Broadcast Hash Join (BHJ)**: the smaller DataFrame is sent in full to every executor, so each executor can join its partitions of the larger DataFrame locally and the larger side never has to be shuffled. Spark usually chooses this join automatically based on settings such as `spark.sql.autoBroadcastJoinThreshold`, the maximum size (10 MB by default) the smaller DataFrame may have for this strategy to be picked; still, it is worth analyzing each case and requesting it explicitly when appropriate;
* **Sort Merge Join (SMJ)**: Spark's default algorithm for equi-joins, since the size of the DataFrames does not limit its feasibility. Both DataFrames are shuffled between executors so that rows with the same key end up in the same partition, and each partition is then sorted by key so the two sides can be merged in order;
* **Shuffle Hash Join (SHJ)**: also relies on shuffles, but compensates by building a hash table from the smaller side of each partition, which removes the need to sort. It works best when one DataFrame is significantly smaller than the other (but not small enough for a BHJ), since each hash table must fit in memory.

The `merge` and `shuffle_hash` join hints used below require Spark 3.0 or later.
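The three strategies can be sketched in plain Python on lists of `(key, value)` tuples. These are simplified illustrations of the algorithms, not Spark's code: Python's built-in `hash()` decides the shuffle partition (Spark uses Murmur3), nothing spills to disk, and all function names are hypothetical. The sample rows come from the tables above.

```python
# Plain-Python sketches of the three join strategies on (key, value) tuples.
# Simplified: Python's hash() decides the shuffle partition (Spark uses
# Murmur3), and nothing spills to disk.


def build_table(rows):
    """Hash table from the build (smaller) side: key -> list of values."""
    table = {}
    for key, value in rows:
        table.setdefault(key, []).append(value)
    return table


def probe(table, rows):
    """Stream the probe (larger) side through the hash table."""
    return [(key, value, match) for key, value in rows for match in table.get(key, [])]


def shuffle(rows, num_partitions):
    """Hash-partition rows so equal keys land in the same partition."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in rows:
        parts[hash(key) % num_partitions].append((key, value))
    return parts


def broadcast_hash_join(small, large_partitions):
    """BHJ: build the table once and reuse it for every partition of the large side."""
    table = build_table(small)     # this is what gets broadcast
    result = []
    for part in large_partitions:  # the large side is never shuffled
        result.extend(probe(table, part))
    return result


def shuffle_hash_join(small, large, num_partitions):
    """SHJ: shuffle both sides, then hash-join partition by partition (no sorting)."""
    result = []
    for small_part, large_part in zip(shuffle(small, num_partitions), shuffle(large, num_partitions)):
        result.extend(probe(build_table(small_part), large_part))
    return result


def sort_merge_join(left, right, num_partitions):
    """SMJ: shuffle both sides, sort each partition by key, then merge in order."""
    result = []
    for left_part, right_part in zip(shuffle(left, num_partitions), shuffle(right, num_partitions)):
        ls = sorted(left_part, key=lambda kv: kv[0])
        rs = sorted(right_part, key=lambda kv: kv[0])
        i = j = 0
        while i < len(ls) and j < len(rs):
            if ls[i][0] < rs[j][0]:
                i += 1
            elif ls[i][0] > rs[j][0]:
                j += 1
            else:
                key = ls[i][0]
                j_end = j
                while j_end < len(rs) and rs[j_end][0] == key:
                    j_end += 1
                # pair every left row with this key with every right row with it
                while i < len(ls) and ls[i][0] == key:
                    result.extend((key, ls[i][1], rs[m][1]) for m in range(j, j_end))
                    i += 1
                j = j_end
    return result


titles = [('tt0000001', 'Carmencita'), ('tt0000002', 'Le clown et ses chiens'),
          ('tt0000003', 'Pauvre Pierrot'), ('tt0000004', 'Un bon bock')]
ratings = [('tt0000001', 5.7), ('tt0000003', 6.5), ('tt0000004', 6.1)]
print(sorted(sort_merge_join(titles, ratings, num_partitions=3)))
# [('tt0000001', 'Carmencita', 5.7), ('tt0000003', 'Pauvre Pierrot', 6.5), ('tt0000004', 'Un bon bock', 6.1)]
```

All three return the same rows; what differs is the work done to get there: BHJ copies the small side instead of shuffling, SHJ shuffles but skips sorting, and SMJ shuffles and sorts, which is what lets it handle inputs of any size in Spark.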


In [None]:
df_titles.limit(5).toPandas()

|   | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000001 | short | Carmencita | Carmencita | 0 | 1894 | \N | 1 | Documentary,Short |
| 1 | tt0000002 | short | Le clown et ses chiens | Le clown et ses chiens | 0 | 1892 | \N | 5 | Animation,Short |
| 2 | tt0000003 | short | Pauvre Pierrot | Pauvre Pierrot | 0 | 1892 | \N | 4 | Animation,Comedy,Romance |
| 3 | tt0000004 | short | Un bon bock | Un bon bock | 0 | 1892 | \N | 12 | Animation,Short |
| 4 | tt0000005 | short | Blacksmith Scene | Blacksmith Scene | 0 | 1893 | \N | 1 | Comedy,Short |


In [None]:
df_ratings.limit(5).toPandas()

|   | tconst | averageRating | numVotes |
|---|---|---|---|
| 0 | tt0000001 | 5.7 | 1759 |
| 1 | tt0000002 | 6.0 | 223 |
| 2 | tt0000003 | 6.5 | 1516 |
| 3 | tt0000004 | 6.1 | 144 |
| 4 | tt0000005 | 6.2 | 2330 |


In [None]:
df_titles.join(df_ratings.hint('broadcast'), 'tconst').explain('formatted')

== Physical Plan ==
* Project (9)
+- * BroadcastHashJoin Inner BuildRight (8)
   :- * Filter (3)
   :  +- * ColumnarToRow (2)
   :     +- Scan parquet  (1)
   +- BroadcastExchange (7)
      +- * Filter (6)
         +- * ColumnarToRow (5)
            +- Scan parquet  (4)


(1) Scan parquet 
Output [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/Pedro Toledo/Documents/igti/edc-mod3-igti/data/imdb/title_basics]
PushedFilters: [IsNotNull(tconst)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) ColumnarToRow [codegen id : 2]
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]

(3) Filter [codegen id : 2]
Input [9]: [tconst#0, titleType#1, primaryTitle

In [None]:
import time
import numpy as np

#### Sort Merge Join

In [None]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('merge'), 'tconst').count()
    end = time.time()
    times.append(end - start)

print('Mean: ', np.mean(times), '\n',
      'SD: ', np.std(times))

Mean:  1.880633978843689 
 SD:  0.2220382350619025


#### Shuffled Hash Join

In [None]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('shuffle_hash'), 'tconst').count()
    end = time.time()
    times.append(end - start)
print('Mean: ', np.mean(times), '\n',
      'SD: ', np.std(times))

Mean:  1.9259305787086487 
 SD:  0.447783649974506


#### Broadcast Join

In [None]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('broadcast'), 'tconst').count()
    end = time.time()
    times.append(end - start)
print('Mean: ', np.mean(times), '\n',
      'SD: ', np.std(times))

Mean:  0.961837158203125 
 SD:  0.10518858795581905
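The three timing loops above repeat the same code. A hypothetical `benchmark` helper could factor it out; this sketch uses `time.perf_counter`, which is monotonic and intended for measuring intervals, and `statistics.pstdev`, which matches the population standard deviation that `np.std` computes by default.

```python
import statistics
import time


def benchmark(action, repeats=100):
    """Run action() `repeats` times; return (mean, population std dev) in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()   # monotonic clock meant for intervals
        action()
        durations.append(time.perf_counter() - start)
    return statistics.mean(durations), statistics.pstdev(durations)
```

With the DataFrames above, a call would look like `mean_s, sd_s = benchmark(lambda: df_titles.join(df_ratings.hint('broadcast'), 'tconst').count())`. The first iterations may include warm-up costs, so discarding them before comparing strategies could give a fairer picture.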
