## Libraries

In [None]:
# install the dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# set the environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# make pyspark importable
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [None]:
from google.colab import drive

drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from os.path import abspath

In [None]:
import findspark

findspark.init()

In [None]:
warehouse_location = abspath('../data/spark-warehouse')

## Setup

In [None]:
spark = (
    SparkSession
    .builder
    .config("spark.driver.memory", '8g')
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# df_titles = spark.read.parquet('../data/imdb/title_basics')
df_titles = spark.read.format("parquet").load('/content/gdrive/MyDrive/pyspark/data/imdb/title_basics')

## Databases and Catalog

Spark's metadata catalog is accessed through the `SparkSession.catalog` object.

Its main functions are:

* `listDatabases()`: lists all available databases;
* `listTables()`: lists all tables (and temporary views) in a given database, the current one by default;
* `listFunctions()`: lists the functions available in a given database;
* `refreshTable()`: refreshes the cached data and metadata of a given table;
* `uncacheTable()`: removes a given table from the in-memory cache;
* `clearCache()`: removes all cached tables from memory.

In [None]:
spark.catalog

<pyspark.sql.catalog.Catalog at 0x7f513e2d7b90>

In [None]:
spark.catalog.clearCache()

In [39]:
spark.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='file:/data/spark-warehouse')]

In [None]:
spark.catalog.listTables()

[]

Spark databases are a way of organizing tables. They can, and should, be thought of as very close to the databases of relational database servers. By default Spark uses a database called `default`, where tables and views are created and queries are run when the user has not defined a database of their own. An important point is that these structures persist across sessions (given a persistent metastore, such as the one behind the Hive support enabled in the setup): if the user switches to another database, all tables remain in the previous one and have to be referenced differently.

A few SQL commands are important when working with databases:

* `SHOW DATABASES`: lists all available databases, analogous to the Catalog's `listDatabases()`;
* `CREATE DATABASE <db_name>`: creates a database;
* `USE <db_name>`: sets the database used as the current one for queries;
    * **Note**: after switching databases, tables from another database remain accessible by prefixing the table name with `<db_name>.`. Example:
        ```
        USE db2
        SELECT * FROM db1.table
        ```
* `SELECT current_database()`: returns the name of the current database;
* `DROP DATABASE IF EXISTS <db_name>`: drops the given database if it exists (add `CASCADE` to also drop a database that still contains tables). Warning: never drop Spark's `default` database.
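The name-resolution rule behind `USE` and the `<db_name>.` prefix can be sketched in plain Python. This is a toy model, not Spark's catalog API: `ToyCatalog`, its methods and the database contents are hypothetical and only mimic how an unqualified name is looked up in the current database, while a qualified name states its database explicitly.

```python
class ToyCatalog:
    """Hypothetical toy catalog: maps database names to the set of their tables."""

    def __init__(self, tables_by_db):
        self.tables_by_db = tables_by_db
        self.current_database = "default"  # Spark starts in the `default` database

    def use(self, db_name):
        """Mimics `USE <db_name>`: only changes where unqualified names are looked up."""
        if db_name not in self.tables_by_db:
            raise ValueError(f"Database '{db_name}' not found")
        self.current_database = db_name

    def resolve(self, name):
        """Returns (database, table) for 'table' or 'db.table'."""
        if "." in name:
            db_name, table = name.split(".", 1)
        else:
            db_name, table = self.current_database, name
        if table not in self.tables_by_db.get(db_name, set()):
            raise ValueError(f"Table or view not found: {db_name}.{table}")
        return db_name, table


catalog = ToyCatalog({"default": set(), "db1": {"table"}, "db2": set()})
catalog.use("db2")
print(catalog.resolve("db1.table"))  # ('db1', 'table'): the prefix still reaches db1
```

After `use("db2")`, resolving the bare name `table` fails, because the lookup now happens in `db2`.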


## Tables and Views

### Tables

* **Managed Tables**: Spark manages both the data and the metadata of the table, so operations such as `DROP TABLE` also delete the data written to disk;
* **Unmanaged Tables**: Spark manages only the table's metadata; dropping the table removes it from the catalog but leaves the data files on disk untouched.
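The practical difference shows up on `DROP TABLE`. The plain-Python sketch below models it with two dictionaries, one standing in for the metastore and one for the files on disk. The function names and the `spark-warehouse/<name>` location are hypothetical simplifications, not Spark code.

```python
files = {}      # stand-in for the filesystem: path -> data
metastore = {}  # stand-in for the catalog: table name -> metadata


def save_as_table(name, data, path=None):
    """Without an explicit path the table is managed and stored under the warehouse dir."""
    managed = path is None
    location = f"spark-warehouse/{name}" if managed else path
    files[location] = data
    metastore[name] = {"location": location, "managed": managed}


def drop_table(name):
    """Metadata is always removed; data files are deleted only for managed tables."""
    entry = metastore.pop(name)
    if entry["managed"]:
        del files[entry["location"]]


save_as_table("title_basics_managed", ["row1", "row2"])
save_as_table("title_basics_unmanaged", ["row1", "row2"],
              path="../data/imdb/title_basics_unmanaged")
drop_table("title_basics_managed")
drop_table("title_basics_unmanaged")
print(metastore)  # {}
print(files)      # {'../data/imdb/title_basics_unmanaged': ['row1', 'row2']}
```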

In [None]:
df_titles.limit(5).toPandas()

| | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000001 | short | Carmencita | Carmencita | 0 | 1894 | \N | 1 | Documentary,Short |
| 1 | tt0000002 | short | Le clown et ses chiens | Le clown et ses chiens | 0 | 1892 | \N | 5 | Animation,Short |
| 2 | tt0000003 | short | Pauvre Pierrot | Pauvre Pierrot | 0 | 1892 | \N | 4 | Animation,Comedy,Romance |
| 3 | tt0000004 | short | Un bon bock | Un bon bock | 0 | 1892 | \N | 12 | Animation,Short |
| 4 | tt0000005 | short | Blacksmith Scene | Blacksmith Scene | 0 | 1893 | \N | 1 | Comedy,Short |


In [None]:
df_titles_sample = df_titles.sample(fraction = 0.1)

#### Creating Managed Tables

In [None]:
df_titles_sample.write.saveAsTable("title_basics_managed")

`CREATE TABLE title_basics_managed (schema) USING parquet`

#### Creating Unmanaged Tables

In [None]:
df_titles_sample.write.option('path', '../data/imdb/title_basics_unmanaged').saveAsTable("title_basics_unmanaged")

`CREATE TABLE title_basics_unmanaged (schema) 
 USING parquet OPTIONS (path '../data/imdb/title_basics_unmanaged')`

In [None]:
spark.catalog.listTables()

[Table(name='title_basics_managed', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='title_basics_unmanaged', database='default', description=None, tableType='EXTERNAL', isTemporary=False)]

### Views

#### Creating Views

In [None]:
df_titles_sample.createOrReplaceTempView('title_basics_view')

`CREATE OR REPLACE TEMP VIEW title_basics_view AS
 SELECT * FROM <table_name>`

#### Creating Global Views

In [None]:
df_titles_sample.createOrReplaceGlobalTempView('title_basics_global_view')

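`CREATE OR REPLACE GLOBAL TEMP VIEW title_basics_global_view AS
 SELECT * FROM <table_name>`

Global temporary views are registered in the system database `global_temp` and must be referenced with that prefix (for example `SELECT * FROM global_temp.title_basics_global_view`). That is why this view does not appear in the `listTables()` output below, which lists the current database.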

In [None]:
spark.catalog.listTables()

[Table(name='title_basics_managed', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='title_basics_unmanaged', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='title_basics_view', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

#### Dropping Views

In [None]:
spark.catalog.dropTempView("title_basics_view")

In [None]:
spark.catalog.dropGlobalTempView("title_basics_global_view")

## Accessing the SQL Query Interface

In [None]:
spark.sql('SHOW DATABASES').toPandas()

| | databaseName |
|---|---|
| 0 | default |


In [40]:
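# assumes another database was selected beforehand (for example `USE db2`):
# in `default`, title_basics_managed already exists and this statement would fail
spark.sql("""
CREATE TABLE title_basics_managed 
(tconst STRING, 
 titleType STRING, 
 primaryTitle STRING,
 originalTitle STRING,
 isAdult STRING,
 startYear STRING, 
 endYear STRING,
 runtimeMinutes STRING,
 genres STRING)
""").toPandas()

In [41]:
# copies the rows of default.title_basics_managed into the table of the same name
# in the current database
spark.sql("""
INSERT INTO title_basics_managed SELECT * FROM default.title_basics_managed
""").toPandas()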

In [None]:
spark.sql('DROP TABLE title_basics_managed').toPandas()

In [None]:
spark.sql('USE DEFAULT').toPandas()

In [None]:
spark.sql('DROP TABLE title_basics_unmanaged').toPandas()

In [None]:
spark.sql('SHOW TABLES').toPandas()

In [None]:
df_titles.count()

8135937

## Configuring Spark

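Configuration precedence, from highest to lowest: SparkSession -> spark-submit -> spark-defaults.conf (settings made in the application override `spark-submit` flags, which in turn override the `spark-defaults.conf` file).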
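That precedence order can be pictured as dictionaries layered from lowest to highest priority, where later layers overwrite earlier ones. This is a plain-Python sketch; the property values are illustrative and not read from a real installation.

```python
spark_defaults_conf = {"spark.driver.memory": "1g", "spark.sql.shuffle.partitions": "200"}
spark_submit_flags = {"spark.driver.memory": "4g"}  # e.g. spark-submit --driver-memory 4g
session_builder = {"spark.driver.memory": "8g"}     # .config('spark.driver.memory', '8g')


def effective_conf(*sources_lowest_first):
    """Later sources override earlier ones, mirroring Spark's precedence order."""
    merged = {}
    for source in sources_lowest_first:
        merged.update(source)
    return merged


conf = effective_conf(spark_defaults_conf, spark_submit_flags, session_builder)
print(conf["spark.driver.memory"])           # 8g
print(conf["spark.sql.shuffle.partitions"])  # 200
```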

In [None]:
import findspark

findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .config('spark.driver.memory', '8g')
    .getOrCreate()
)

In [None]:
spark.conf.get('spark.driver.memory')

'8g'

In [None]:
spark.conf.get('spark.sql.shuffle.partitions')

'200'

In [None]:
spark.conf.set('spark.sql.shuffle.partitions', 100)

In [None]:
spark.conf.get('spark.sql.shuffle.partitions')

'100'

* `spark.master`: the cluster manager the application connects to (for example `local[*]`, `yarn` or `spark://host:port`). It is covered in more detail in the next chapter.
* `spark.driver.memory`: amount of memory assigned to the application's driver.
* `spark.executor.memory`: amount of memory assigned to each of the application's executors.
* `spark.serializer`: class used to serialize objects during execution. Setting it to `org.apache.spark.serializer.KryoSerializer` is recommended for faster processing, since Kryo can be up to 10x faster than the default Java serializer.
* `spark.executor.heartbeatInterval`: interval between the heartbeats each executor sends to the driver. Raising it (together with `spark.network.timeout`, which must remain significantly larger) can help applications that suffer from timeouts.
* `spark.sql.adaptive.enabled`: enables Adaptive Query Execution, which re-optimizes the execution plan while the query runs, based on metrics collected along the way. Enabling it can speed up processing significantly.
* `spark.sql.shuffle.partitions`: default number of partitions used when shuffling data for joins and aggregations.
* `spark.sql.broadcastTimeout`: timeout, in seconds, for the broadcast step of broadcast joins, which are covered at the end of the chapter.

### Making Spark Scalable

* `spark.dynamicAllocation.enabled`: enables dynamic allocation, which adds and removes executors according to the workload.
* `spark.dynamicAllocation.executorIdleTimeout`: maximum time an executor can stay idle before dynamic allocation removes it.
* `spark.dynamicAllocation.initialExecutors`: initial number of executors when dynamic allocation is used.
* `spark.dynamicAllocation.maxExecutors`: maximum number of executors when dynamic allocation is used.
* `spark.dynamicAllocation.minExecutors`: minimum number of executors when dynamic allocation is used.
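As a rough mental model of how these settings interact, the sketch below estimates how many executors dynamic allocation aims for: enough to run all pending and running tasks at once, clamped between `minExecutors` and `maxExecutors`. It is a simplification written for illustration, and the function and its parameters are hypothetical. Spark's actual allocation manager also ramps requests up gradually and releases executors that stay idle longer than `executorIdleTimeout`.

```python
import math


def target_executors(pending_tasks, running_tasks, executor_cores, task_cpus=1,
                     min_executors=0, max_executors=10):
    """Simplified estimate of the executor count dynamic allocation aims for."""
    tasks_per_executor = executor_cores // task_cpus  # tasks one executor runs at once
    needed = math.ceil((pending_tasks + running_tasks) / tasks_per_executor)
    return max(min_executors, min(needed, max_executors))


print(target_executors(pending_tasks=100, running_tasks=20, executor_cores=4))  # 10 (capped)
print(target_executors(pending_tasks=6, running_tasks=0, executor_cores=4,
                       min_executors=1))                                        # 2
print(target_executors(pending_tasks=0, running_tasks=0, executor_cores=4,
                       min_executors=1))                                        # 1 (floor)
```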

## Persisting Data in Memory

In [44]:
df_titles_sample = df_titles.sample(fraction = 0.1)

In [45]:
df_titles_sample.limit(5).toPandas()

| | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000015 | short | Autour d'une cabine | Autour d'une cabine | 0 | 1894 | \N | 2 | Animation,Short |
| 1 | tt0000020 | short | The Derby 1895 | The Derby 1895 | 0 | 1895 | \N | 1 | Documentary,Short,Sport |
| 2 | tt0000028 | short | Fishing for Goldfish | La pêche aux poissons rouges | 0 | 1895 | \N | 1 | Documentary,Short |
| 3 | tt0000036 | short | Awakening of Rip | Awakening of Rip | 0 | 1896 | \N | 0 | Drama,Short |
| 4 | tt0000040 | short | Barque sortant du port de Trouville | Barque sortant du port de Trouville | 0 | 1896 | \N | \N | Documentary,Short |


In [46]:
# df_ratings = spark.read.format('parquet').load('../data/imdb/title_ratings')
df_ratings = spark.read.format("parquet").load('/content/gdrive/MyDrive/pyspark/data/imdb/title_ratings')

In [47]:
df_ratings.limit(5).toPandas()

| | tconst | averageRating | numVotes |
|---|---|---|---|
| 0 | tt0000001 | 5.7 | 1759 |
| 1 | tt0000002 | 6.0 | 223 |
| 2 | tt0000003 | 6.5 | 1516 |
| 3 | tt0000004 | 6.1 | 144 |
| 4 | tt0000005 | 6.2 | 2330 |


In [48]:
int_cols = ['startYear', 'endYear', 'runtimeMinutes', 'isAdult']
# cast the numeric columns to int (non-numeric values such as \N become null)
for c in int_cols:
    df_titles_sample = (
        df_titles_sample
        .withColumn(c, f.col(c).cast('int'))
    )
# clean up the string columns: trim surrounding spaces and capitalize each word
str_cols = ['primaryTitle', 'originalTitle', 'titleType']
for c in str_cols:
    df_titles_sample = (
        df_titles_sample
        .withColumn(c, f.initcap(f.trim(f.col(c))))
    )
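The two cleaning steps can be reproduced in plain Python to check what they do to a single value. This is an approximation written for illustration, not Spark's implementation. It assumes that `initcap` lowercases the string and uppercases the first letter after each space, which agrees with the joined output further down (`Autour d'une cabine` becomes `Autour D'une Cabine`), and that casting an invalid string to `int` yields null. Python's `str.title()` is not an equivalent, because it also capitalizes the letter after an apostrophe.

```python
def spark_like_trim(value):
    # assumption: like Spark's trim, only leading/trailing space characters are removed
    return value.strip(" ")


def spark_like_initcap(value):
    # lowercase everything, then uppercase the first character of each space-separated word
    return " ".join(word[:1].upper() + word[1:] for word in value.lower().split(" "))


def spark_like_cast_int(value):
    # non-numeric strings such as the IMDb null marker \N become None (null)
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


print(spark_like_initcap(spark_like_trim("  Autour d'une cabine ")))  # Autour D'une Cabine
print("Autour d'une cabine".title())                                # Autour D'Une Cabine
print(spark_like_cast_int("\\N"), spark_like_cast_int("1894"))      # None 1894
```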

In [49]:
df_join = (
    df_titles_sample
    .replace('\\N', None)
    .withColumn('genres', f.split(f.col('genres'), ','))
    .join(df_ratings, 'tconst', 'left')
)

In [50]:
df_join.limit(5).toPandas()

| | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres | averageRating | numVotes |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000015 | Short | Autour D'une Cabine | Autour D'une Cabine | 0 | 1894 | | 2.0 | [Animation, Short] | 6.2 | 914 |
| 1 | tt0000020 | Short | The Derby 1895 | The Derby 1895 | 0 | 1895 | | 1.0 | [Documentary, Short, Sport] | 5.0 | 299 |
| 2 | tt0000028 | Short | Fishing For Goldfish | La Pêche Aux Poissons Rouges | 0 | 1895 | | 1.0 | [Documentary, Short] | 5.2 | 930 |
| 3 | tt0000036 | Short | Awakening Of Rip | Awakening Of Rip | 0 | 1896 | | 0.0 | [Drama, Short] | 4.4 | 542 |
| 4 | tt0000040 | Short | Barque Sortant Du Port De Trouville | Barque Sortant Du Port De Trouville | 0 | 1896 | | | [Documentary, Short] | 4.3 | 47 |


In [51]:
df_final = (
    df_join
    .withColumn('genres', f.explode(f.col('genres')))
    .groupBy('titleType')
    .pivot('genres')
    .agg(f.round(f.mean('averageRating'), 2))
    .fillna(0)
)
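The following small pure-Python equivalent, which operates on a list of dictionaries, shows what `explode` + `groupBy` + `pivot` + `mean` + `fillna(0)` compute. It is an illustrative sketch, `explode_pivot_mean` is a hypothetical helper, and the rounding step is left out. Rows whose genre list is null are dropped, as `explode` does. Null ratings are ignored, as `mean` does. The pivot columns come out sorted, and empty cells receive the fill value.

```python
from collections import defaultdict


def explode_pivot_mean(rows, group_col, list_col, value_col, fill=0.0):
    """Hypothetical helper: explode list_col, group by group_col, pivot, average value_col."""
    sums, counts = defaultdict(float), defaultdict(int)
    groups, pivot_values = set(), set()
    for row in rows:
        for item in row[list_col] or []:        # explode drops null/empty lists
            groups.add(row[group_col])
            pivot_values.add(item)
            if row[value_col] is not None:      # mean() ignores null values
                sums[(row[group_col], item)] += row[value_col]
                counts[(row[group_col], item)] += 1
    columns = sorted(pivot_values)              # pivot columns come out sorted
    table = {
        group: {
            col: (sums[(group, col)] / counts[(group, col)]
                  if counts[(group, col)] else fill)  # fillna(0) for empty cells
            for col in columns
        }
        for group in sorted(groups)
    }
    return columns, table


rows = [
    {"titleType": "Short", "genres": ["Animation", "Short"], "averageRating": 6.0},
    {"titleType": "Short", "genres": ["Short"], "averageRating": 5.0},
    {"titleType": "Movie", "genres": ["Drama"], "averageRating": None},
    {"titleType": "Movie", "genres": None, "averageRating": 7.0},
]
columns, table = explode_pivot_mean(rows, "titleType", "genres", "averageRating")
print(columns)         # ['Animation', 'Drama', 'Short']
print(table["Short"])  # {'Animation': 6.0, 'Drama': 0.0, 'Short': 5.5}
```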

In [52]:
df_final.limit(5).toPandas()

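| | titleType | Action | Adult | Adventure | Animation | Biography | Comedy | Crime | Documentary | Drama | ... | News | Reality-TV | Romance | Sci-Fi | Short | Sport | Talk-Show | Thriller | War | Western |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Tvmovie | 5.73 | 5.4 | 6.03 | 6.52 | 6.89 | 6.47 | 6.3 | 7.11 | 6.59 | ... | 6.8 | 5.91 | 6.17 | 5.29 | 5.8 | 6.94 | 5.0 | 5.65 | 7.03 | 6.5 |
| 1 | Tvseries | 6.97 | 6.28 | 7.06 | 6.76 | 7.35 | 6.79 | 7.0 | 7.3 | 7.04 | ... | 6.28 | 6.12 | 6.94 | 6.81 | 6.89 | 7.02 | 6.2 | 7.27 | 7.47 | 7.33 |
| 2 | Tvepisode | 7.47 | 6.31 | 7.37 | 7.31 | 7.35 | 7.36 | 7.56 | 7.51 | 7.63 | ... | 6.69 | 7.03 | 7.56 | 7.47 | 7.04 | 7.03 | 6.76 | 7.48 | 7.79 | 7.7 |
| 3 | Video | 6.03 | 6.5 | 5.9 | 6.35 | 7.24 | 6.39 | 5.95 | 7.09 | 6.27 | ... | 5.98 | 6.82 | 6.19 | 6.14 | 6.77 | 7.09 | 7.29 | 5.05 | 7.0 | 5.99 |
| 4 | Videogame | 7.01 | 0.0 | 7.26 | 7.24 | 4.5 | 7.23 | 7.34 | 4.5 | 7.64 | ... | 0.0 | 0.0 | 7.17 | 7.04 | 0.0 | 6.64 | 0.0 | 6.39 | 7.07 | 7.97 |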


In [53]:
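# Spark 2.4's explain() only takes a boolean `extended` flag, so any non-empty string
# such as 'formatted' counts as True; the 'formatted' mode was added in Spark 3.0
df_final.explain(True)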

== Parsed Logical Plan ==
'Project [titleType#259, coalesce(nanvl(Action#380, null), cast(0.0 as double)) AS Action#466, coalesce(nanvl(Adult#381, null), cast(0.0 as double)) AS Adult#467, coalesce(nanvl(Adventure#382, null), cast(0.0 as double)) AS Adventure#468, coalesce(nanvl(Animation#383, null), cast(0.0 as double)) AS Animation#469, coalesce(nanvl(Biography#384, null), cast(0.0 as double)) AS Biography#470, coalesce(nanvl(Comedy#385, null), cast(0.0 as double)) AS Comedy#471, coalesce(nanvl(Crime#386, null), cast(0.0 as double)) AS Crime#472, coalesce(nanvl(Documentary#387, null), cast(0.0 as double)) AS Documentary#473, coalesce(nanvl(Drama#388, null), cast(0.0 as double)) AS Drama#474, coalesce(nanvl(Family#389, null), cast(0.0 as double)) AS Family#475, coalesce(nanvl(Fantasy#390, null), cast(0.0 as double)) AS Fantasy#476, coalesce(nanvl(Film-Noir#391, null), cast(0.0 as double)) AS Film-Noir#477, coalesce(nanvl(Game-Show#392, null), cast(0.0 as double)) AS Game-Show#478, coa

In [54]:
%%time
df_final.limit(5).toPandas()

CPU times: user 67.4 ms, sys: 9.82 ms, total: 77.3 ms
Wall time: 8.45 s


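| | titleType | Action | Adult | Adventure | Animation | Biography | Comedy | Crime | Documentary | Drama | ... | News | Reality-TV | Romance | Sci-Fi | Short | Sport | Talk-Show | Thriller | War | Western |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Tvmovie | 5.73 | 5.4 | 6.03 | 6.52 | 6.89 | 6.47 | 6.3 | 7.11 | 6.59 | ... | 6.8 | 5.91 | 6.17 | 5.29 | 5.8 | 6.94 | 5.0 | 5.65 | 7.03 | 6.5 |
| 1 | Tvseries | 6.97 | 6.28 | 7.06 | 6.76 | 7.35 | 6.79 | 7.0 | 7.3 | 7.04 | ... | 6.28 | 6.12 | 6.94 | 6.81 | 6.89 | 7.02 | 6.2 | 7.27 | 7.47 | 7.33 |
| 2 | Tvepisode | 7.47 | 6.31 | 7.37 | 7.31 | 7.35 | 7.36 | 7.56 | 7.51 | 7.63 | ... | 6.69 | 7.03 | 7.56 | 7.47 | 7.04 | 7.03 | 6.76 | 7.48 | 7.79 | 7.7 |
| 3 | Video | 6.03 | 6.5 | 5.9 | 6.35 | 7.24 | 6.39 | 5.95 | 7.09 | 6.27 | ... | 5.98 | 6.82 | 6.19 | 6.14 | 6.77 | 7.09 | 7.29 | 5.05 | 7.0 | 5.99 |
| 4 | Videogame | 7.01 | 0.0 | 7.26 | 7.24 | 4.5 | 7.23 | 7.34 | 4.5 | 7.64 | ... | 0.0 | 0.0 | 7.17 | 7.04 | 0.0 | 6.64 | 0.0 | 6.39 | 7.07 | 7.97 |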


In [55]:
%%time
df_join.cache()
df_join.count()

CPU times: user 105 ms, sys: 14.4 ms, total: 120 ms
Wall time: 16.7 s


In [56]:
df_final.cache()
df_final.count()

10

In [57]:
%%time
df_final.limit(5).toPandas()

CPU times: user 12.2 ms, sys: 647 µs, total: 12.8 ms
Wall time: 485 ms


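| | titleType | Action | Adult | Adventure | Animation | Biography | Comedy | Crime | Documentary | Drama | ... | News | Reality-TV | Romance | Sci-Fi | Short | Sport | Talk-Show | Thriller | War | Western |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Tvmovie | 5.73 | 5.4 | 6.03 | 6.52 | 6.89 | 6.47 | 6.3 | 7.11 | 6.59 | ... | 6.8 | 5.91 | 6.17 | 5.29 | 5.8 | 6.94 | 5.0 | 5.65 | 7.03 | 6.5 |
| 1 | Tvseries | 6.97 | 6.28 | 7.06 | 6.76 | 7.35 | 6.79 | 7.0 | 7.3 | 7.04 | ... | 6.28 | 6.12 | 6.94 | 6.81 | 6.89 | 7.02 | 6.2 | 7.27 | 7.47 | 7.33 |
| 2 | Tvepisode | 7.47 | 6.31 | 7.37 | 7.31 | 7.35 | 7.36 | 7.56 | 7.51 | 7.63 | ... | 6.69 | 7.03 | 7.56 | 7.47 | 7.04 | 7.03 | 6.76 | 7.48 | 7.79 | 7.7 |
| 3 | Video | 6.03 | 6.5 | 5.9 | 6.35 | 7.24 | 6.39 | 5.95 | 7.09 | 6.27 | ... | 5.98 | 6.82 | 6.19 | 6.14 | 6.77 | 7.09 | 7.29 | 5.05 | 7.0 | 5.99 |
| 4 | Videogame | 7.01 | 0.0 | 7.26 | 7.24 | 4.5 | 7.23 | 7.34 | 4.5 | 7.64 | ... | 0.0 | 0.0 | 7.17 | 7.04 | 0.0 | 6.64 | 0.0 | 6.39 | 7.07 | 7.97 |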
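The speed-up above comes from the fact that DataFrames are lazy. Each action re-runs the whole lineage unless the result was cached. `cache()` itself only marks the DataFrame, so the first action afterwards (here, the `count()` calls) pays the cost of materializing it. The toy class below mimics that behavior in plain Python by counting how many times the lineage is evaluated. `LazyFrame` is hypothetical, and it ignores storage levels, eviction and partitions.

```python
class LazyFrame:
    """Hypothetical toy: each action re-runs `compute` unless cache() was called."""

    def __init__(self, compute):
        self._compute = compute
        self._cache_enabled = False
        self._cached = None
        self.evaluations = 0

    def cache(self):
        self._cache_enabled = True  # lazy, like DataFrame.cache(): nothing computed yet
        return self

    def unpersist(self):
        self._cache_enabled = False
        self._cached = None
        return self

    def _materialize(self):
        if self._cache_enabled and self._cached is not None:
            return self._cached
        self.evaluations += 1
        result = self._compute()
        if self._cache_enabled:
            self._cached = result
        return result

    def count(self):
        return len(self._materialize())


frame = LazyFrame(lambda: [x * x for x in range(1_000)])
frame.count()
frame.count()
print(frame.evaluations)  # 2: no cache, the lineage ran twice
frame.cache()
frame.count()             # materializes and stores the result
frame.count()             # served from the cache
print(frame.evaluations)  # 3
```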


#### Removing Data from the In-Memory Cache

In [58]:
df_final.unpersist()

DataFrame[titleType: string, Action: double, Adult: double, Adventure: double, Animation: double, Biography: double, Comedy: double, Crime: double, Documentary: double, Drama: double, Family: double, Fantasy: double, Film-Noir: double, Game-Show: double, History: double, Horror: double, Music: double, Musical: double, Mystery: double, News: double, Reality-TV: double, Romance: double, Sci-Fi: double, Short: double, Sport: double, Talk-Show: double, Thriller: double, War: double, Western: double]

In [59]:
spark.catalog.clearCache()

## Data Partitioning Strategies

### Bucketing

In [60]:
df_ratings = spark.read.format("parquet").load('/content/gdrive/MyDrive/pyspark/data/imdb/title_ratings')

In [61]:
df_titles.limit(5).toPandas()

| | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000001 | short | Carmencita | Carmencita | 0 | 1894 | \N | 1 | Documentary,Short |
| 1 | tt0000002 | short | Le clown et ses chiens | Le clown et ses chiens | 0 | 1892 | \N | 5 | Animation,Short |
| 2 | tt0000003 | short | Pauvre Pierrot | Pauvre Pierrot | 0 | 1892 | \N | 4 | Animation,Comedy,Romance |
| 3 | tt0000004 | short | Un bon bock | Un bon bock | 0 | 1892 | \N | 12 | Animation,Short |
| 4 | tt0000005 | short | Blacksmith Scene | Blacksmith Scene | 0 | 1893 | \N | 1 | Comedy,Short |


In [62]:
df_ratings.limit(5).toPandas()

| | tconst | averageRating | numVotes |
|---|---|---|---|
| 0 | tt0000001 | 5.7 | 1759 |
| 1 | tt0000002 | 6.0 | 223 |
| 2 | tt0000003 | 6.5 | 1516 |
| 3 | tt0000004 | 6.1 | 144 |
| 4 | tt0000005 | 6.2 | 2330 |


In [63]:
df_titles.write.format('parquet').bucketBy(5, 'tconst').saveAsTable('title_basics')

In [64]:
df_ratings.write.format('parquet').bucketBy(5, 'tconst').saveAsTable('title_ratings')

In [65]:
df_titles_bucket = spark.sql('SELECT * FROM title_basics')

In [66]:
df_ratings_bucket = spark.sql('SELECT * FROM title_ratings')

In [67]:
%%time
df_titles.join(df_ratings, 'tconst').count()

CPU times: user 37.5 ms, sys: 4.28 ms, total: 41.8 ms
Wall time: 4.12 s


1174232

In [68]:
%%time
df_titles_bucket.join(df_ratings_bucket, 'tconst').count()

CPU times: user 32.1 ms, sys: 6.22 ms, total: 38.3 ms
Wall time: 3.73 s


1174232
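Bucketing helps because both tables were written with the same number of buckets on the same column. Rows with equal `tconst` values therefore land in buckets with the same index, and the join can process bucket *i* of one side against bucket *i* of the other without a shuffle. The plain-Python sketch below illustrates the idea. It uses `zlib.crc32` as a stand-in hash (Spark uses its own Murmur3-based hash), and `bucketize` and `bucketed_join` are hypothetical helpers.

```python
import zlib


def bucket_id(key, num_buckets):
    # deterministic stand-in for Spark's Murmur3-based bucket hash
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def bucketize(rows, key_col, num_buckets):
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_id(row[key_col], num_buckets)].append(row)
    return buckets


def bucketed_join(left_buckets, right_buckets, key_col):
    # bucket i is only ever joined with bucket i: no rows move between buckets
    if len(left_buckets) != len(right_buckets):
        raise ValueError("both sides must have the same number of buckets")
    joined = []
    for left_bucket, right_bucket in zip(left_buckets, right_buckets):
        index = {}
        for right_row in right_bucket:
            index.setdefault(right_row[key_col], []).append(right_row)
        for left_row in left_bucket:
            for right_row in index.get(left_row[key_col], []):
                joined.append({**left_row, **right_row})
    return joined


titles = [{"tconst": f"tt{i:07d}", "startYear": 1890 + i} for i in range(100)]
ratings = [{"tconst": f"tt{i:07d}", "averageRating": i / 10} for i in range(0, 100, 2)]
result = bucketed_join(bucketize(titles, "tconst", 5), bucketize(ratings, "tconst", 5), "tconst")
print(len(result))  # 50
```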

### Partitioning by Columns

In [69]:
df_titles.filter('titleType = "short"').explain(True)  # extended plan (Spark 2.4 API)

== Parsed Logical Plan ==
'Filter ('titleType = short)
+- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet

== Analyzed Logical Plan ==
tconst: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string
Filter (titleType#1 = short)
+- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet

== Optimized Logical Plan ==
Filter (isnotnull(titleType#1) && (titleType#1 = short))
+- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet

== Physical Plan ==
*(1) Project [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
+- *(1) Filter (isnotnull(titleType#1) && (titleType#1 = short))
   +- *(1) FileS

In [70]:
(
    df_titles
    .write
    .format('parquet')
    .partitionBy('titleType')
    .save('../data/imdb/df_titles_partitioned')
)

In [71]:
df_titles_partitions = spark.read.parquet('../data/imdb/df_titles_partitioned')

In [72]:
df_titles_partitions.filter('titleType = "short"').explain(True)  # extended plan (Spark 2.4 API)

== Parsed Logical Plan ==
'Filter ('titleType = short)
+- Relation[tconst#35890,primaryTitle#35891,originalTitle#35892,isAdult#35893,startYear#35894,endYear#35895,runtimeMinutes#35896,genres#35897,titleType#35898] parquet

== Analyzed Logical Plan ==
tconst: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string, titleType: string
Filter (titleType#35898 = short)
+- Relation[tconst#35890,primaryTitle#35891,originalTitle#35892,isAdult#35893,startYear#35894,endYear#35895,runtimeMinutes#35896,genres#35897,titleType#35898] parquet

== Optimized Logical Plan ==
Filter (isnotnull(titleType#35898) && (titleType#35898 = short))
+- Relation[tconst#35890,primaryTitle#35891,originalTitle#35892,isAdult#35893,startYear#35894,endYear#35895,runtimeMinutes#35896,genres#35897,titleType#35898] parquet

== Physical Plan ==
*(1) FileScan parquet [tconst#35890,primaryTitle#35891,originalTitle#35892,isAdult#35893,start
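The difference between the two physical plans is partition pruning. `partitionBy('titleType')` writes one directory per value (`titleType=short`, `titleType=movie`, ...), and a filter on that column only needs to list and read the matching directory. The sketch below imitates that layout with a dictionary. The helper names are hypothetical, and file formats and statistics are left out. Note how the partition column is re-attached at the end of each row, mirroring the plan above, where `titleType` became the last column of the schema.

```python
from collections import defaultdict


def write_partitioned(rows, partition_col):
    """Groups rows by directory name '<col>=<value>'; the column itself lives in the path."""
    layout = defaultdict(list)
    for row in rows:
        directory = f"{partition_col}={row[partition_col]}"
        layout[directory].append({k: v for k, v in row.items() if k != partition_col})
    return dict(layout)


def read_equal_filter(layout, partition_col, value):
    """Reads only the directory matching `partition_col = value` (partition pruning)."""
    directory = f"{partition_col}={value}"
    if directory not in layout:
        return [], []
    rows = [{**row, partition_col: value} for row in layout[directory]]
    return rows, [directory]


titles = [
    {"tconst": "tt0000001", "titleType": "short"},
    {"tconst": "tt0000002", "titleType": "short"},
    {"tconst": "tt0000009", "titleType": "movie"},
]
layout = write_partitioned(titles, "titleType")
rows, directories_read = read_equal_filter(layout, "titleType", "short")
print(directories_read)  # ['titleType=short']
print(len(rows))         # 2
```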

## Repartitioning DataFrames

In [74]:
df_titles.limit(5).toPandas()

| | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000001 | short | Carmencita | Carmencita | 0 | 1894 | \N | 1 | Documentary,Short |
| 1 | tt0000002 | short | Le clown et ses chiens | Le clown et ses chiens | 0 | 1892 | \N | 5 | Animation,Short |
| 2 | tt0000003 | short | Pauvre Pierrot | Pauvre Pierrot | 0 | 1892 | \N | 4 | Animation,Comedy,Romance |
| 3 | tt0000004 | short | Un bon bock | Un bon bock | 0 | 1892 | \N | 12 | Animation,Short |
| 4 | tt0000005 | short | Blacksmith Scene | Blacksmith Scene | 0 | 1893 | \N | 1 | Comedy,Short |


In [75]:
df_titles.rdd.getNumPartitions()

3

In [76]:
df_titles.repartition(5).rdd.getNumPartitions()

5

In [77]:
df_titles.coalesce(5).rdd.getNumPartitions()

3
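The outputs above show that `repartition(5)` raised the number of partitions from 3 to 5, while `coalesce(5)` left it at 3. `coalesce` only merges existing partitions and never shuffles, so it cannot increase the count. The plain-Python sketch below models both operations. It is a simplification: Spark's round-robin starts at a random partition, and `coalesce` groups partitions with data locality in mind rather than strictly by position. The function names are hypothetical.

```python
def repartition_round_robin(partitions, n):
    """Full shuffle: rows are dealt one by one into exactly n new partitions."""
    out = [[] for _ in range(n)]
    i = 0
    for part in partitions:
        for row in part:
            out[i % n].append(row)
            i += 1
    return out


def coalesce(partitions, n):
    """No shuffle: neighbouring partitions are merged; the count can only go down."""
    if n >= len(partitions):
        return [list(part) for part in partitions]
    out = [[] for _ in range(n)]
    for idx, part in enumerate(partitions):
        out[idx * n // len(partitions)].extend(part)
    return out


partitions = [[1, 2, 3, 4], [5, 6], [7, 8, 9]]  # 3 input partitions
print(len(repartition_round_robin(partitions, 5)))  # 5
print(len(coalesce(partitions, 5)))                 # 3
print(coalesce(partitions, 2))                      # [[1, 2, 3, 4, 5, 6], [7, 8, 9]]
```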

In [78]:
df_titles.repartition(5).explain(True)  # extended plan (Spark 2.4 API)

== Parsed Logical Plan ==
Repartition 5, true
+- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet

== Analyzed Logical Plan ==
tconst: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string
Repartition 5, true
+- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet

== Optimized Logical Plan ==
Repartition 5, true
+- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet

== Physical Plan ==
Exchange RoundRobinPartitioning(5)
+- *(1) FileScan parquet [tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/content/gdrive/MyDrive/pyspark/

1,000,000 MB / 50 partitions = 20 GB per partition

1,000,000 MB / 5 partitions = 200 GB per partition
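The same arithmetic can be written as a small helper, together with the inverse question of how many partitions keep each one under a target size. The 128 MB target is an assumption chosen for illustration (it matches the default `spark.sql.files.maxPartitionBytes` used when reading files). The sketch uses decimal units, 1 GB = 1,000 MB, as in the lines above.

```python
import math


def partition_size_mb(total_mb, num_partitions):
    return total_mb / num_partitions


def partitions_for_target(total_mb, target_mb=128):
    """Smallest partition count that keeps every partition at or below target_mb."""
    return math.ceil(total_mb / target_mb)


print(partition_size_mb(1_000_000, 50) / 1_000)  # 20.0 (GB)
print(partition_size_mb(1_000_000, 5) / 1_000)   # 200.0 (GB)
print(partitions_for_target(1_000_000))          # 7813
```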

In [79]:
df_titles.coalesce(5).explain(True)  # extended plan (Spark 2.4 API)

== Parsed Logical Plan ==
Repartition 5, false
+- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet

== Analyzed Logical Plan ==
tconst: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string
Repartition 5, false
+- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet

== Optimized Logical Plan ==
Repartition 5, false
+- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet

== Physical Plan ==
Coalesce 5
+- *(1) FileScan parquet [tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/content/gdrive/MyDrive/pyspark/data/imdb/title_basic

In [80]:
df_titles.repartition(50).write.parquet('../data/imdb/df_titles_repartioned')

In [81]:
df_titles.coalesce(1).write.parquet('../data/imdb/df_titles_coalesced')

## Choosing the Best Join Type

* **Broadcast Hash Join (BHJ)**: sends a complete copy of the smaller DataFrame to every executor, so the larger DataFrame does not need to be shuffled. Spark usually picks this join automatically based on a few settings, such as `spark.sql.autoBroadcastJoinThreshold`, which sets the maximum size of the smaller DataFrame for this method to be chosen (10 MB by default). Still, it is always worth analyzing each case and knowing when to request it explicitly;
* **Sort Merge Join (SMJ)**: Spark's default algorithm, since it remains viable regardless of the size of the DataFrames. Both sides are shuffled between executors by the join key and then sorted, so that matching rows end up in the same partition and in the same order and can be merged in a single pass;
* **Shuffle Hash Join (SHJ)**: also relies on shuffles, but builds a hash table from the smaller side of each partition, which removes the need to sort the data. It requires one DataFrame to be significantly smaller than the other, though not as small as for a BHJ.
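The core of these strategies can be sketched in plain Python on a single machine. `hash_join` builds a hash table on the smaller side and probes it with the larger one. A broadcast hash join does this on each executor, and a shuffle hash join does it inside each shuffled partition. `sort_merge_join` sorts both sides by the key and walks them in step. These are illustrative single-partition versions written as hypothetical helpers, and the broadcast and shuffle steps themselves are left out.

```python
def hash_join(build_rows, probe_rows, key):
    """Build a hash table on the (smaller) build side, then probe it row by row."""
    table = {}
    for row in build_rows:
        table.setdefault(row[key], []).append(row)
    return [{**probe, **match} for probe in probe_rows for match in table.get(probe[key], [])]


def sort_merge_join(left_rows, right_rows, key):
    """Sort both sides by key, then advance two cursors and emit matching groups."""
    left_sorted = sorted(left_rows, key=lambda r: r[key])
    right_sorted = sorted(right_rows, key=lambda r: r[key])
    i = j = 0
    out = []
    while i < len(left_sorted) and j < len(right_sorted):
        left_key, right_key = left_sorted[i][key], right_sorted[j][key]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # find the run of equal keys on the right, then pair it with the left run
            j_end = j
            while j_end < len(right_sorted) and right_sorted[j_end][key] == left_key:
                j_end += 1
            while i < len(left_sorted) and left_sorted[i][key] == left_key:
                out.extend({**left_sorted[i], **r} for r in right_sorted[j:j_end])
                i += 1
            j = j_end
    return out


left_rows = [{"k": "a", "l": 1}, {"k": "b", "l": 2}, {"k": "b", "l": 3}, {"k": "c", "l": 4}]
right_rows = [{"k": "b", "r": 10}, {"k": "c", "r": 20}, {"k": "c", "r": 30}, {"k": "d", "r": 40}]
print(len(sort_merge_join(left_rows, right_rows, "k")),
      len(hash_join(right_rows, left_rows, "k")))  # 4 4
```

Both functions return the same rows. Which one is cheaper depends on sizes: the hash table must fit in memory, while sorting works for any size at the cost of the sort itself.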


In [82]:
df_titles.limit(5).toPandas()

| | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000001 | short | Carmencita | Carmencita | 0 | 1894 | \N | 1 | Documentary,Short |
| 1 | tt0000002 | short | Le clown et ses chiens | Le clown et ses chiens | 0 | 1892 | \N | 5 | Animation,Short |
| 2 | tt0000003 | short | Pauvre Pierrot | Pauvre Pierrot | 0 | 1892 | \N | 4 | Animation,Comedy,Romance |
| 3 | tt0000004 | short | Un bon bock | Un bon bock | 0 | 1892 | \N | 12 | Animation,Short |
| 4 | tt0000005 | short | Blacksmith Scene | Blacksmith Scene | 0 | 1893 | \N | 1 | Comedy,Short |


In [83]:
df_ratings.limit(5).toPandas()

| | tconst | averageRating | numVotes |
|---|---|---|---|
| 0 | tt0000001 | 5.7 | 1759 |
| 1 | tt0000002 | 6.0 | 223 |
| 2 | tt0000003 | 6.5 | 1516 |
| 3 | tt0000004 | 6.1 | 144 |
| 4 | tt0000005 | 6.2 | 2330 |


In [84]:
df_titles.join(df_ratings.hint('broadcast'), 'tconst').explain(True)  # extended plan (Spark 2.4 API)

== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(tconst))
:- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet
+- ResolvedHint (broadcast)
   +- Relation[tconst#35785,averageRating#35786,numVotes#35787] parquet

== Analyzed Logical Plan ==
tconst: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string, averageRating: string, numVotes: string
Project [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8, averageRating#35786, numVotes#35787]
+- Join Inner, (tconst#0 = tconst#35785)
   :- Relation[tconst#0,titleType#1,primaryTitle#2,originalTitle#3,isAdult#4,startYear#5,endYear#6,runtimeMinutes#7,genres#8] parquet
   +- ResolvedHint (broadcast)
      +- Relation[tconst#35785,averageRating#35786,numVotes#35787] parquet

== Optimized Log

In [85]:
import time
import numpy as np

#### Sort Merge Join

In [86]:
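times = []
for i in range(100):
    start = time.time()
    # note: the 'merge' hint is only recognized from Spark 3.0 onwards; Spark 2.4 ignores
    # unknown hints and lets the optimizer choose the join strategy itself
    df_titles.join(df_ratings.hint('merge'), 'tconst').count()
    end = time.time()
    times.append(end - start)

print('Mean: ', np.mean(times), '\n',
      'SD: ', np.std(times))

Mean:  3.5233033323287963 
 SD:  0.33850659415565226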


#### Shuffled Hash Join

In [None]:
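times = []
for i in range(100):
    start = time.time()
    # note: the 'shuffle_hash' hint is only recognized from Spark 3.0 onwards; Spark 2.4
    # ignores unknown hints and lets the optimizer choose the join strategy itself
    df_titles.join(df_ratings.hint('shuffle_hash'), 'tconst').count()
    end = time.time()
    times.append(end - start)
print('Mean: ', np.mean(times), '\n',
      'SD: ', np.std(times))

Mean:  3.400471501350403 
 SD:  0.28339930071275393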


#### Broadcast Join

In [None]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('broadcast'), 'tconst').count()
    end = time.time()
    times.append(end - start)
print('Mean: ', np.mean(times), '\n',
      'SD: ', np.std(times))

Mean:  3.445064709186554 
 SD:  0.3599514298278037
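The three timing cells repeat the same loop, and a small helper would keep them consistent. The helper below is a sketch, not part of the original notebook. It uses `time.perf_counter`, which is better suited to measuring intervals than `time.time`. It also uses `statistics.pstdev`, which matches the population standard deviation that `np.std` computes by default. In the notebook it could be called as `benchmark(lambda: df_titles.join(df_ratings.hint('broadcast'), 'tconst').count())`.

```python
import statistics
import time


def benchmark(action, runs=100):
    """Calls `action` `runs` times; returns (mean, population std) of the elapsed seconds."""
    times = []
    for _ in range(runs):
        start = time.perf_counter()
        action()
        times.append(time.perf_counter() - start)
    return statistics.mean(times), statistics.pstdev(times)


mean, sd = benchmark(lambda: sum(range(10_000)), runs=20)
print(f"Mean: {mean:.6f} s | SD: {sd:.6f} s")
```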
