# Manipulação de Dados - Parte 2

##### Joins
##### Explode
##### Collect List
##### Case When
##### Pivot
##### Window

# Configurações Iniciais

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

spark = SparkSession.builder.getOrCreate()

# Import das Bases

In [2]:
dfPokemon = (spark.read.format('json')
             .load('/home/jovyan/files/pokemons.json'))

dfTypes = (spark.read.format('json')
           .load('/home/jovyan/files/types.json'))

# Conhecendo as Bases

In [None]:
possuo 2 dataframes pyspark onde o primeiro contem o seguinte schema: "altura: bigint, experiencia: bigint, formas: array<struct<name:string,url:string>>, id: bigint, nome: string, peso: bigint, type: string" e o segundo possui o seguinte schema: "moves: array<struct<name:string,url:string>>, name: string", gere pra mim 10 exercicios praticos utilizando: "join, explode, collect_list, when, pivot e Window"

In [56]:
dfTypes.printSchema

<bound method DataFrame.printSchema of DataFrame[moves: array<struct<name:string,url:string>>, name: string]>

In [3]:
dfPokemon.show(10, False)

+------+-----------+---------------------------------------------------------+---+----------+----+-----+
|altura|experiencia|formas                                                   |id |nome      |peso|type |
+------+-----------+---------------------------------------------------------+---+----------+----+-----+
|7     |64         |[{bulbasaur, https://pokeapi.co/api/v2/pokemon-form/1/}] |1  |bulbasaur |69  |grass|
|10    |142        |[{ivysaur, https://pokeapi.co/api/v2/pokemon-form/2/}]   |2  |ivysaur   |130 |grass|
|20    |263        |[{venusaur, https://pokeapi.co/api/v2/pokemon-form/3/}]  |3  |venusaur  |1000|grass|
|6     |62         |[{charmander, https://pokeapi.co/api/v2/pokemon-form/4/}]|4  |charmander|85  |fire |
|11    |142        |[{charmeleon, https://pokeapi.co/api/v2/pokemon-form/5/}]|5  |charmeleon|190 |fire |
|17    |267        |[{charizard, https://pokeapi.co/api/v2/pokemon-form/6/}] |6  |charizard |905 |fire |
|5     |63         |[{squirtle, https://pokeapi.co/api/

In [4]:
dfTypes.show(10)

+--------------------+--------+
|               moves|    name|
+--------------------+--------+
|[{pound, https://...|  normal|
|[{karate-chop, ht...|fighting|
|[{twineedle, http...|     bug|
|[{night-shade, ht...|   ghost|
|[{steel-wing, htt...|   steel|
|[{fire-punch, htt...|    fire|
|[{thunder-punch, ...|electric|
|[{psybeam, https:...| psychic|
|[{bite, https://p...|    dark|
|[{sweet-kiss, htt...|   fairy|
+--------------------+--------+
only showing top 10 rows



# Exemplos

### Joins

In [None]:
dfPokemon.join(dfTypes, dfPokemon.type == dfTypes.name, how='inner').show()

### Explode

In [None]:
(dfPokemon
 .select(fn.explode_outer('formas').alias('formas_explode'), '*')
).show(30)

### Collect List

In [None]:
(dfPokemon
.groupby('altura')
.agg(fn.collect_list('nome').alias('colecao'),
     fn.count('nome').alias('count'),
     fn.collect_set('nome').alias('colecao_distinct'),
     fn.countDistinct('nome').alias('countDistinct'))
.show(4, False)
)

### Case When

In [None]:
(dfPokemon
 .withColumn('classe_altura', fn.when(fn.col('altura') < 20, 'baixinho' )
                                .otherwise(fn.when(fn.col('altura').between(20,50), 'medios' )
                                             .otherwise('altinhos')))
 .select('nome','altura','classe_altura')
 .show()
)

### Pivot

In [None]:
df_pivot = (dfPokemon
 .withColumn('classe_altura', fn.when(fn.col('altura') < 20, 'baixinho' )
                                .otherwise(fn.when(fn.col('altura').between(20,50), 'medios' )
                                             .otherwise('altinhos')))
 .select('type','classe_altura')
)

df_pivot.groupby('type').pivot('classe_altura').count().show()

### Window

In [None]:
from pyspark.sql.window import Window

windowSpec = Window.partitionBy('type').orderBy(fn.col('experiencia').desc())

(dfPokemon
 .withColumn('rank', fn.rank().over(windowSpec))
 .select('type', 'nome', 'experiencia', 'rank')
 .filter(fn.col('rank') <= 3)
 .show(100)
)

# Exercícios

### Exercício 01

In [31]:
# mostre na tela 10 pokemons e sua lista de movimentos
(dfPokemon
 .join(dfTypes, dfPokemon.type == dfTypes.name , how='inner')
 .select('nome', 'moves')
 .limit(10)
).show(10,False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Exercício 02

In [32]:
# mostre um explode SOMENTE dos pokemons que possuem mais de uma forma
(dfPokemon
 .filter(fn.size('formas') > 1)
 .select('nome', fn.explode_outer('formas').alias('forma'))
 .select('nome', 'forma.name') #PLUS
 .show(10, False)
)

+-----+-----------------+
|nome |name             |
+-----+-----------------+
|pichu|pichu            |
|pichu|pichu-spiky-eared|
|unown|unown-a          |
|unown|unown-b          |
|unown|unown-c          |
|unown|unown-d          |
|unown|unown-e          |
|unown|unown-f          |
|unown|unown-g          |
|unown|unown-h          |
+-----+-----------------+
only showing top 10 rows



### Exercício 03

In [35]:
(dfPokemon
.join(dfTypes, dfPokemon.type == dfTypes.name, how='inner')
.select('id', 'nome','type', fn.col('moves')['name'].alias('lista_movimentos')).show(100,False)
)

+---+----------+--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [30]:
# mostre o nome dos pokemons e uma lista com o nome de seus movimentos
(dfPokemon
 .join(dfTypes, dfPokemon.type == dfTypes.name, how='inner')
 .select('nome', fn.explode_outer('moves').alias('move'))
 .select('nome', 'move.name')
 .groupBy('nome').agg(fn.collect_list('name').alias('lista_movimentos'))
 .show(12, False)
)

+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Exercício 04

In [39]:
# dos pokemons do tipo 'rock', quantos movimentos possuem os pokemons mais pesados?
max_peso = (dfPokemon
            .filter(fn.col('type') == 'rock')
            .select(fn.max('peso')).first()[0]
           )

(dfPokemon
 .filter(fn.col('peso') == max_peso)
 .filter(fn.col('type') == 'rock')
 .join(dfTypes, dfPokemon.type == dfTypes.name , how='inner')
 .select('nome', 'type', 'peso', fn.size('moves').alias('qtd_movimentos'))
 .show(100, False) 
)

+--------------+----+-----+--------------+
|nome          |type|peso |qtd_movimentos|
+--------------+----+-----+--------------+
|coalossal-gmax|rock|10000|25            |
+--------------+----+-----+--------------+



### Exercício 05

In [41]:
# considerando que pokemons leves estão abaixo de 1000kg, mostre quantos pokemos leves e pesados existem em cada categoria de peso
(dfPokemon
 .withColumn('classe_peso', fn.when(fn.col('peso') < 1000, 'leves')
                              .otherwise('pesados'))
 .groupby('classe_peso').agg(fn.count('nome').alias('qtd'))
 .show(100,False)
)

+-----------+----+
|classe_peso|qtd |
+-----------+----+
|pesados    |270 |
|leves      |1011|
+-----------+----+



### DESAFIO

In [50]:
# mostrar na tela um ranking dos 3 pokemons mais experientes de cada tipo. 
# o ranking deve estar agrupado e para cada pokemon mostrar o nome e sua posição

from pyspark.sql.window import Window

windowSpec = Window.partitionBy('type').orderBy(fn.col('experiencia').desc())

(dfPokemon
 .filter(fn.col('experiencia').isNotNull())
 .withColumn('rank', fn.rank().over(windowSpec))
 .filter(fn.col('rank') <= 3)
 .select('nome', 'type', 'experiencia', 'rank')
 .groupBy('type').agg(fn.collect_list(fn.concat_ws('-','nome','rank')).alias('nome'))
 .show(30, False)
)

+--------+----------------------------------------------------------------------------------------------------------------------------------------------+
|type    |nome                                                                                                                                          |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+
|bug     |[genesect-1, buzzwole-2, pheromosa-2]                                                                                                         |
|dark    |[yveltal-1, hydreigon-2, zarude-2, zarude-dada-2]                                                                                             |
|dragon  |[zygarde-complete-1, rayquaza-mega-2, kyurem-black-3, kyurem-white-3]                                                                         |
|electric|[zeraora-1, zapdos-2, raikou-2, thundurus-incarnate-2, regieleki-2

In [53]:
from pyspark.sql.window import Window

group_exp_type = (dfPokemon
.groupby('type','experiencia')
.agg(fn.collect_list('nome').alias('pokemons'),
     fn.count('nome').alias('qtd_pokemons'))
.orderBy(fn.col('type'), fn.col('experiencia').desc())
)

windowSpec = Window.partitionBy('type').orderBy(fn.col('experiencia').desc())

(group_exp_type
.withColumn('rank', fn.rank().over(windowSpec))
.select('type','pokemons', 'rank','qtd_pokemons','experiencia')
.filter(fn.col('rank') <= 3)
.orderBy(fn.col('type'), fn.col('rank'))
.show(truncate=False)
)

+--------+-------------------------------------------------------------------+----+------------+-----------+
|type    |pokemons                                                           |rank|qtd_pokemons|experiencia|
+--------+-------------------------------------------------------------------+----+------------+-----------+
|bug     |[genesect]                                                         |1   |1           |300        |
|bug     |[buzzwole, pheromosa]                                              |2   |2           |285        |
|bug     |[volcarona]                                                        |3   |1           |275        |
|dark    |[yveltal]                                                          |1   |1           |340        |
|dark    |[hydreigon, zarude, zarude-dada]                                   |2   |3           |300        |
|dark    |[moltres-galar]                                                    |3   |1           |290        |
|dragon  |[zygarde-