## Bibliotecas

In [32]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from os.path import abspath

In [33]:
warehouse_location = abspath('./data/spark-warehouse')

In [34]:
warehouse_location

'C:\\Users\\barba\\Desktop\\bootcamp-dados-igti\\semana02\\data\\spark-warehouse'

In [35]:
import findspark

findspark.init()

## Setup

In [36]:
spark = (
    SparkSession
    .builder
    .config("spark.driver.memory", '8g')
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

In [40]:
imdb_path = '../data/tp/'

In [42]:
df_titles = (spark.read
    .format('csv')
    .options(sep='\t', header=True)
    .load(imdb_path + 'title_basics.tsv')

    
)


In [43]:
df_titles.limit(5).show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|             1|        Comedy

## Databases e Catalog

O catálogo de metadados do Spark pode ser acessado pelo objeto

`SparkSession.catalog` 

As principais funcionalidades são:

* `listDatabases()`: lista todas os databases disponíveis;
* `listTables()`: lista todas as tabelas disponíveis em um determinado database;
* `listFucntions()`: lista as funções disponíveis em um determinado database;
* `refreshTable()`: atualiza os metadados de uma determinada tabela
* `uncacheTable()`: remove uma tabela salva em memória
* `clearCache()`: remove todas as tabelas salvas em memória

In [44]:
spark.catalog

<pyspark.sql.catalog.Catalog at 0x1be37f71188>

In [45]:
spark.catalog.clearCache()

In [46]:
spark.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='file:/E:/bootcamp-dados-igti/data/spark-warehouse')]

In [47]:
spark.catalog.listTables()

[Table(name='title_basics_managed', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='title_basics_unmanaged', database='default', description=None, tableType='EXTERNAL', isTemporary=False)]

Os databases do Spark são uma ferramenta para organizar tabelas. Eles podem e devem ser vistos como algo muito próximo dos databases de servidores de bancos de dados relacionais. O Spark utiliza por padrão um database chamado default, que serve para criar tabelas, views e realizar consultas caso o usuário não tenha definido o seu próprio. Um ponto importante é que essas estruturas persistem em diferentes sessões: se o usuário mudar de database, todas as tabelas permanecerão no database anterior e vão precisar ser consultadas de maneira diferente.

Existem alguns comandos do SQL importantes na hora de se trabalhar com databases. Else são:

* `SHOW DATABASES`: lista todas os databases disponíveis, de forma análoga ao Catalog ;
* `CREATE DATABASE <nome_do_db>`: cria um database
* `USE <nome_do_db>`: define o database como o atual para a realização de queries
    * **Obs**: ao se mudar de database, é possível acessar tabelas de um database anterior usando o prefixo “nome_do_db.” antes do nome da tabela. Exemplo:
        ```
        USE db2
        SELECT * FROM db1.table
        ```
* `SELECT current_database()`: retorna qual o database definido como o atual
* `DROP DATABASE IF EXISTS <nome_do_db>`: deleta determinado database dentre aqueles que foram definidos. Atenção: nunca delete o database default do Spark.


## Tabelas e Views

### Tabelas

* **Managed Tables**: o Spark administra tanto os dados quanto os metadados das tabelas, de forma que operações como DROP TABLE afetam também os dados escritos em disco;
* **Unmanaged Tables**: o Spark administra somente os metadados da tabela, e os dados escritos em disco não são alterados em nenhum momento

In [None]:
df_titles.limit(5).toPandas()

In [11]:
df_titles_sample = df_titles.sample(fraction = 0.1)

#### Criando Managed Tables

In [12]:
df_titles_sample.write.saveAsTable("title_basics_managed")

AnalysisException: Table `title_basics_managed` already exists.

`CREATE TABLE title_basics_managed (schema)`

#### Criando Unmanaged Tables

In [13]:
df_titles_sample.write.option('path', './data/spark-warehouse/title_basics_unmanaged').saveAsTable("title_basics_unmanaged")

AnalysisException: Table `title_basics_unmanaged` already exists.

`CREATE EXTERNAL TABLE title_basics_unmanaged (schema) 
 USING parquet OPTIONS (path '../data/imdb/title_basics_unmanaged')`

In [None]:
spark.catalog.listTables()

### Views

#### Criando Views

In [None]:
df_titles_sample.createOrReplaceTempView('title_basics_view')

`CREATE OR REPLACE TEMP VIEW AS title_basics_view
 SELECT * FROM <nome da tabela>`

#### Criando Views Globais

In [None]:
df_titles_sample.createOrReplaceGlobalTempView('title_basics_global_view')

`CREATE OR REPLACE GLOBAL TEMP VIEW AS title_basics_global_view
 SELECT * FROM <nome da tabela>`

In [None]:
spark.catalog.listTables()

#### Deletando Views

In [None]:
spark.catalog.dropTempView("title_basics_view")

In [None]:
spark.catalog.dropGlobalTempView("title_basics_global_view")

## Acessando a interface de queries

In [None]:
spark.sql('SHOW DATABASES').toPandas()

In [None]:
spark.sql("""
CREATE TABLE title_basics_managed 
(tconst STRING, 
 titleType STRING, 
 primaryTitle STRING,
 originalTitle STRING,
 isAdult STRING,
 startYear STRING, 
 endYear STRING,
 runtimeMinutes STRING,
 genres STRING)
""").toPandas()

In [None]:
spark.sql("""
            INSERT INTO title_basics_managed  SELECT * FROM default.title_basics_managed
          """
).toPandas()

In [None]:
spark.sql('DROP TABLE title_basics_managed').toPandas()

In [None]:
spark.sql('USE DEFAULT').toPandas()

In [None]:
spark.sql('DROP TABLE title_basics_unmanaged').toPandas()

In [None]:
spark.sql('SHOW TABLES').toPandas()

In [None]:
spark.sql('SELECT * FROM title_basics_view').count()

In [None]:
spark.sql('SELECT CAST(runTimeMinutes as INT) FROM title_basics_view')\
.withColumn('teste', f.col('runTimeMinutes') + 1).limit(5).toPandas()

In [None]:
df_titles.count()

## Configurando o Spark

SparkSession -> spark-submit -> spark-defaults.conf

In [48]:
import findspark

findspark.init()

In [49]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    
    .builder
    .config('spark.driver.memory', '8g')
    .getOrCreate()
)

In [50]:
spark.conf.get('spark.spark.serializer')

Py4JJavaError: An error occurred while calling o204.get.
: java.util.NoSuchElementException: spark.spark.serializer
	at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:3732)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.internal.SQLConf.getConfString(SQLConf.scala:3732)
	at org.apache.spark.sql.RuntimeConfig.get(RuntimeConfig.scala:71)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [53]:
spark.conf.get('spark.driver.memory')

'8g'

In [52]:
spark.conf.get('spark.sql.shuffle.partitions')

'200'

In [None]:
spark.conf.set('spark.sql.shuffle.partitions', 100)

In [51]:
spark.conf.get('spark.sql.shuffle.partitions')

'200'

* `spark.master`: seleciona o modo de deploy da aplicação Spark. Será tratado em mais detalhes no capítulo seguinte.
* `spark.driver.memory`: quantidade de memória atribuída para o driver da aplicação.
* `spark.executor.memory`: quantidade de memória atribuída para cada um dos executores da aplicação.
* `spark.serializer`: classe utilizada para realizar a serialização de objetos durante a execução. É recomendado utilizar o valor org.apache.spark.serializer.KryoSerializer para ganhar em velocidade de processamento, uma vez que chega a ser até 10x mais rápido que o default.
* `spark.executor.heartbeatInterval`:  intervalo entre notificações dos executores ao driver. Aumentar esse valor evita que a aplicação sofra com timeouts.
* `spark.sql.adaptive.enabled`: habilita o Adaptive Query Execution, programa que atualiza o plano de execução durante a execução, a partir de métricas coletadas durante o processo. Ativar essa configuração pode otimizar processamentos significativamente.
* `spark.sql.shuffle.partitions`: número padrão de partições utilizadas em shuffles de operações de junção (joins) e agregações (agg). 
* `spark.sql.broadcastTimeout`: tempo de timeout em segundos para operações de broadcast join, a serem tratadas no fim do capítulo.

### Tornando o Spark Escalável

* `spark.dynamicAllocation.enabled`: habilita o uso do recurso de dynamic allocation na aplicação.
* `spark.dynamicAllocation.executorIdleTimeout`: configura o tempo máximo de ociosidade de um executor até que o dynamic allocation o derrube.
* `spark.dynamicAllocation.initialExecutors`: quantidade inicial de executores na aplicação ao utilizar o dynamic allocation.
* `spark.dynamicAllocation.maxExecutors`: quantidade mínima de executores na aplicação ao utilizar o dynamic allocation.
* `spark.dynamicAllocation.minExecutors`: quantidade máxima de executores na aplicação ao utilizar o dynamic allocation.

## Persistência de Dados na Memória

In [None]:
df_titles_sample = df_titles.sample(fraction = 0.1)

In [None]:
df_titles_sample.limit(5).toPandas()

In [56]:
df_ratings = spark.read.format('csv').load('../data/tp/title_ratings.tsv', header=True, sep='\t')

In [57]:
df_ratings.limit(5).toPandas()

Unnamed: 0,tconst,averageRating,numVotes
0,tt0000001,5.7,1809
1,tt0000002,6.0,233
2,tt0000003,6.5,1560
3,tt0000004,6.1,152
4,tt0000005,6.2,2383


In [58]:
int_cols = ['startYear', 'endYear', 'runtimeMinutes', 'isAdult']
for c in int_cols:
    df_titles_sample = (
        df_titles_sample
        .withColumn(c, f.col(c).cast('int'))
    )
# Limpa os Strings
str_cols = ['primaryTitle', 'originalTitle', 'titleType']
for c in str_cols:
    df_titles_sample = (
        df_titles_sample
        .withColumn(c, f.initcap(f.trim(f.col(c))))
    )

AnalysisException: cannot resolve '`startYear`' given input columns: [];
'Project [cast('startYear as int) AS startYear#205]
+- Sample 0.0, 0.1, false, -155773922351212477
   +- Relation[] csv


In [59]:
df_join = (
    df_titles_sample
    .replace('\\N', None)
    .withColumn('genres', f.split(f.col('genres'), ','))
    .join(df_ratings, 'tconst', 'left')
)

AnalysisException: cannot resolve '`genres`' given input columns: [];
'Project [split('genres, ,, -1) AS genres#206]
+- Sample 0.0, 0.1, false, -155773922351212477
   +- Relation[] csv


In [None]:
df_join.limit(5).toPandas()

In [60]:
df_final = (
    df_join
    .withColumn('genres', f.explode(f.col('genres')))
    .groupBy('titleType')
    .pivot('genres')
    .agg(f.round(f.mean('averageRating'), 2))
    .fillna(0)
)

NameError: name 'df_join' is not defined

In [61]:
df_final.limit(5).toPandas()

NameError: name 'df_final' is not defined

In [None]:
df_final.explain('formatted')

In [None]:
%%time
df_final.limit(5).toPandas()

In [None]:
%%time
df_join.cache()
df_join.count()

In [None]:
df_final.cache()
df_final.count()

In [None]:
%%time
df_final.limit(5).toPandas()

#### Retirando dados da persistência em memória

In [62]:
spark.catalog.unpersistAll()

AttributeError: 'Catalog' object has no attribute 'unpersistAll'

In [None]:
spark


In [63]:
spark.catalog.clearCache()

## Estratégias de Particionamento de Dados

### Bucketing

In [64]:
df_ratings = spark.read.format('csv').load('../data/tp/title_ratings.tsv', header=True, sep='\t')

In [65]:
df_titles.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [66]:
df_ratings.limit(5).toPandas()

Unnamed: 0,tconst,averageRating,numVotes
0,tt0000001,5.7,1809
1,tt0000002,6.0,233
2,tt0000003,6.5,1560
3,tt0000004,6.1,152
4,tt0000005,6.2,2383


In [67]:
df_titles.write.format('parquet').bucketBy(5, 'tconst').saveAsTable('title_basics')

In [68]:
df_ratings.write.format('parquet').bucketBy(5, 'tconst').saveAsTable('title_ratings')

In [26]:
df_titles_bucket = spark.sql('SELECT * FROM title_basics')

AnalysisException: Table or view not found: title_basics; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [title_basics], [], false


In [None]:
df_ratings_bucket = spark.sql('SELECT * FROM title_ratings')

In [None]:
%%time
df_titles.join(df_ratings, 'tconst').count()

In [None]:
%%time
df_titles_bucket.join(df_ratings_bucket, 'tconst').count()

### Partiticionando por Colunas

In [None]:
df_titles.filter('titleType = "short"').explain("formatted")

In [None]:
(
    df_titles
    .write
    .format('parquet')
    .partitionBy('titleType')
    .save('../data/imdb/df_titles_partitioned')
)

In [None]:
df_titles_partitions = spark.read.parquet('../data/imdb/df_titles_partitioned')

In [None]:
df_titles_partitions.filter('titleType = "short"').explain("formatted")

In [None]:
df_further_partitions = spark.read.parquet('../data/imdb/df_titles_partitioned_further')

In [None]:
(
    df_further_partitions
    .filter('titleType = "short"')
    .filter('genres = "Action"')
    .explain("formatted")
)

## Reparticionando DataFrames

In [None]:
df_titles.limit(5).toPandas()

In [None]:
df_titles.rdd.getNumPartitions()

In [None]:
df_titles.repartition(5).rdd.getNumPartitions()

In [None]:
df_titles.coalesce(5).rdd.getNumPartitions()

In [None]:
df_titles.repartition(5).explain('formatted')

1000000 MB / 50 = 20GB/partição

1000000 MB / 5 = 200GB/partição

In [None]:
df_titles.coalesce(5).explain('formatted')

In [27]:
df_titles.repartition(50).write.parquet('../data/imdb/df_titles_repartioned')

AnalysisException: 
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
         

In [None]:
df_titles.coalesce(1).write.parquet('../data/imdb/df_titles_coalesced')

## Escolhendo o melhor tipo de join

* **Broadcast Hash Join (BHJ)**: a estratégia consiste em enviar os dados completos para cada um dos executores, de forma que só há necessidade de realizar o shuffle uma vez. O Spark costuma utilizar esse join automaticamente com base em algumas configurações, como o `spark.sql.autoBroadcastJoinThreshold`, que define o tamanho máximo do menor DataFrame para que esse método seja escolhido, mas é sempre interessante analisar cada situação e ter autonomia para indicar o seu uso;
* **Sort Merge Join (SMJ)**: é o algoritmo padrão do Spark, uma vez que o tamanho dos DataFrames não impacta na viabilidade do algoritmo. Nesse caso, os dados são enviados entre os executores via shuffle e os posteriormente ordenados, para que os dados estejam particionados corretamente e na mesma ordem;
* **Shuffle Hash Join (SHJ)**: é um algoritmo que também usa shuffles, mas compensa essa operação com o uso de um mapa de hash que exime a necessidade de ordenação dos dados. A única condição é que um dos DataFrames seja significativamente menor do que o outro, mas não tanto quanto o BHJ.


In [None]:
df_titles.limit(5).toPandas()

In [None]:
df_ratings.limit(5).toPandas()

In [None]:
df_titles.join(df_ratings.hint('broadcast'), 'tconst').explain('formatted')

In [None]:
import time
import numpy as np

#### Sort Merge Join

In [None]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('merge'), 'tconst').count()
    end = time.time()
    times.append(end - start)

print('Média: ', np.mean(times), '\n',
      'DP: ', np.std(times))

#### Shuffled Hash Join

In [None]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('shuffle_hash'), 'tconst').count()
    end = time.time()
    times.append(end - start)
print('Média: ', np.mean(times), '\n',
      'DP: ', np.std(times))

#### Broadcast Join

In [None]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('broadcast'), 'tconst').count()
    end = time.time()
    times.append(end - start)
print('Média: ', np.mean(times), '\n',
      'DP: ', np.std(times))