# <center>Rocket Lab Dados - 2025.2</center>
# <center> Introdução à Pyspark</center>
___
Todo o conteúdo que você terá acesso ao longo desse período é confidencial, não sendo possível compartilhar ou comercializar os links ou os materiais recebidos que sejam de propriedade do Programa Rocket Lab da V(dev)

Dessa forma, ao participar do curso você está aceitando os termos de confidencialidade e não-comercialização dos conteúdos que serão recebidos.
___

# <center> Objetivos de aprendizado </center>
- Familiarizar-se com as funcionalidades básicas do PySpark
- Ser capaz de carregar dados em um DataFrame
- Ser capaz de realizar manipulações básicas de dados
___


### 1. Juntando DataFrames

É muito comum ter a necessidade de juntar *DataFrames* diferentes. Se você já utilizou SQL ou qualquer outro banco de dados relacional, deve conhecer isso como *join*. O Pandas também tem a mesma função utilizando o método *.merge()*. Antes do exemplo, vamos aprender/relembrar os tipos de *joins* mais comuns:<br>
![Joining Methods](https://i.imgur.com/HaSBT91.jpg) <br>
Agora, vamos carregar um DataFrame mais simples para testar os tipos de *merge*.

Para os exemplos abaixo iremos utilizar o Datafram: **metal_bands**, contendo as informações sobre bandas de metal do mundo todo, suas origens e estilos musicais.

Principais colunas:
- Band — nome da banda
- Origin — país de origem
- Fans — número aproximado de fãs
- Formed — ano de formação
- Split — ano de separação ('-', se ainda ativa)
- Style — subgênero do metal (ex: Heavy Metal, Black Metal, Thrash Metal)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Criar sessão spark
spark = SparkSession.builder.appName("AtividadePraticaSpark").getOrCreate()

# Execute esta célula para carregar o dataframe metal_bands com dados de bandas de metal
metal_bands = spark.table("workspace.default.metal_bands")

metal_bands.printSchema()
display(metal_bands.limit(5))

Vamos separar alguns dataframes a partir de *metal_bands* para testar os merges. Observe a célula abaixo.

In [0]:
# ano de formação e país das bandas
bands_origin = metal_bands.select('id','band_name','formed','origin')

# estilo das bandas
bands_style = metal_bands.select('id','band_name','style') # estilo das bandas

# bandas que se separaram
bands_split = (metal_bands
               .select('id','band_name','split')
               .where(F.column('split') != "-")
               )

# bandas com mais de 4000 fans
bands_4000_fans = (metal_bands
                   .select('id','band_name','fans')
                   .where(F.column('fans') > 4000)
                   )

# bandas formadas nos EUA
bands_USA = (metal_bands
             .select('id','band_name','formed','origin')
             .where(F.column('origin') == "USA")
             )

# bandas formadas na Suécia
bands_Sweden = (metal_bands
                .select('id','band_name','formed','origin')
                .where(F.column('origin') == 'Sweden')
                )

Vamos criar um DataFrame a partir de ```bands_origin``` e ```bands_split```, utilizando *merge*.

In [0]:
origin_split = (bands_origin # o DataFrame da esquerda
                .join(bands_split, # o DataFrame da direita
                      on=['id', 'band_name'], # baseado em quais valores em comum (chave)
                      how='inner' # o tipo de join que queremos fazer
                      )
                )
display(origin_split.limit(5))

Ótimo! Conseguimos fazer o *Join* de dois *DataFrames*. Observe que utilizamos o argumento ```how='inner'```. Lembre-se que *inner*, *left*, *right* e *outer* terão resultados diferentes, observe os merges abaixo e a explicação ao final.

In [0]:
left_origin_split = (bands_origin
                     .join(bands_split,
                           on= ['id', 'band_name'],
                           how="left"
                           )
                     )
display(left_origin_split.limit(5))

In [0]:
right_origin_split = (bands_origin
                     .join(bands_split,
                           on= ['id', 'band_name'],
                           how="right"
                           )
                     )
display(right_origin_split.limit(5))

In [0]:
print('Numero de linhas do DataFrame bands_4000_fans:', bands_4000_fans.count())
print('Numero de linhas do DataFrame bands_USA:', bands_USA.count())
print('----------------------------------------------')

outer_origin_split = (bands_4000_fans
                     .join(bands_USA,
                           on= ['id', 'band_name'],
                           how="outer"
                           )
                     )

print('Numero de linhas do DataFrame após Outer entre bands_4000_fans & bands_USA:', outer_origin_split.count())
display(outer_origin_split.limit(5))

In [0]:
display(bands_4000_fans)
display(bands_USA.where("id IN ('0', '1', '50', '51')"))

Como podemos ver, os resultados são de fato bem diferentes.

O *inner* mantém apenas os dados das bandas encontradas nos dois dataframes (onde há correspondência de *id*), dessa forma, a posição do dataframe não faz diferença.

No *left*, mantemos os dados do dataframe à esquerda, e trazemos os dados do dataframe à direita no qual encontrou-se a chave (neste exemplo, o *id* da banda).

Por outro lado, no *right* ocorre o contrário, mantemos os dados do dataframe à direita e, quando há correspondência da chave, trazemos os dados do dataframe à esquerda. Note que o número de entradas (*entries*) é diferente do caso com o *left*. Isso ocorre porque no *left* mantemos os dados de formação das bandas (ou seja, o dataframe contém todas as bandas do .csv), enquanto no *right*, mantemos apenas os dados de bandas que se separaram (e existem muitas bandas que ainda continuam juntas).

Por fim, no *outer* utilizamos dois dataframes diferentes dos anteriores para facilitar o entendimento. Observe pelos prints que existem apenas 4 bandas com mais de 4000 fans e 1139 bandas formadas nos EUA. Quando fazemos o *join* com *outer*, observe que o total de linhas passa a ser 1143. O que acontece é que esse tipo de join mantém os dados de ambos os dataframes, independente se houve correspondência de chave ou não.

Podemos também querer apenas concatenar dois *DataDrames*, isto é, juntá-los colocando um abaixo do outro. Para isso, utilizamos o método *.union()*:

In [0]:
# concatenando bandas formadas nos EUA e bandas formadas na Suécia
USA_Sweden = bands_USA.union(bands_Sweden)

print('Numero de linhas do DataFrame bands_USA:', bands_USA.count())
print('Numero de linhas do DataFrame bands_Sweden:', bands_Sweden.count())
print('Numero de linhas do DataFrame após union entre bands_USA & bands_Sweden:', USA_Sweden.count())
display(USA_Sweden.limit(5))

## Exercício 1
O Ultimate Team (FUT) é um modo do jogo FIFA no qual o jogador monta seu próprio time adquirindo atletas virtuais.
Cada atleta possui atributos que influenciam seu desempenho em campo — como drible, chute, passe, defesa, velocidade e físico.

Principais colunas:
- `player_id` — identificador único do jogador
- `player_name` — nome do atleta
- `nationality` — país de origem
- `club` — clube atual
- `overall` — nota geral do jogador
- `potential` — potencial máximo de evolução
- `value_eur, wage_eur` — valor de mercado e salário
- `age, height_cm, weight_kg` — características físicas
- `pace, shooting, passing, dribbling, defending, physic` — atributos técnicos

_**Preencha os espacos ____ para carregar os dados e realizar as consultas propostas.**_

### Exercício 1.1 Faça a leitura do arquivo fut_players (fut_player_data.csv) e retorne as 5 primeiras linhas

In [0]:
# Faça a leitura do arquivo fut_players 
fut_players = spark.table('workspace.default.fut_players_data')

# Retorne as 5 primeiras linhas do DF
display(fut_players.limit(5))

### Exercício 1.2 - Retorna a nacionalidade dos jogadores "The Bests"

São considerados jogadores The Bests os que possuem os atributos de drible (_dribbling_) e chute (_shooting_) superior a 90. 
Após a geração do DF _The_Best_ realize o join com o df _nationalities_ para obter a nacionalidade dos jogadores.

A sua tabela final deve conter as seguintes informações:
- `player_id`
- `player_name`
- `nationality`
- `position`
- `dribbling`
- `shooting`
- `overall`

In [0]:
# Aplique os filtros para retornar os jogadores the bests
the_best = fut_players.select(
    'player_id', 'player_name', 'position', 'dribbling', 'shooting', 'overall'
    ).where(F.column('dribbling') > 90)\
    .where(F.column('shooting') > 90)

# nationalities é um DataFrame da nacionalidade dos jogadores
nationalities = (fut_players.select('player_id', 'player_name', 'nationality'))

# faça um join dos dois DataDrames, mantendo todos os jogadores de the_best e obtendo suas nacionalidades (dica: a chave é o id)
the_best_nationality = the_best.join(nationalities, on= ['player_id', 'player_name'], how='left')

the_best_nationality.display()

## 2. Alterando o dataframe


Agora iremos utilizar o DataFrame _**pokemon_data**_. Essa base reúne informações sobre os Pokémons das diversas gerações da franquia, contendo atributos, classificações e estatísticas de batalha.

Principais Colunas:
- Name — nome do Pokémon 
- Type 1, Type 2 — tipos primário e secundário (ex: Fire, Water, Grass) 
- HP, Attack, Defense, Sp. Atk, Sp. Def, Speed — atributos de combate 
- Generation — geração à qual pertence
- Legendary - Se e ou não um Pokémon lendário

In [0]:
pkmn = spark.table("workspace.default.pokemon_data")

display(pkmn.limit(5))

Até o momento apenas utilizamos os dados da forma que nos foram fornecidos, mas e se precisássemos criar alguma coluna que fosse a combinação das demais? Por exemplo, caso eu deseje criar uma coluna que corresponde à soma do ataque e velocidade dos Pokémons? Observe abaixo:

In [0]:
# Criando a coluna desejada
pkmn = pkmn.withColumn("Sum_Attack_Speed", F.col("Attack") + F.col("Speed"))
display(pkmn.limit(5))

Observe como foi fácil! Apenas utilizamos o operador de soma com as duas colunas necessárias. Você pode fazer isso com outras operações também, basta utilizar ```-```, ```/``` ou ```*```. Além disso, você pode combinar quantas colunas quiser!

Mas e se precisarmos alterar apenas algumas linhas do nosso DataFrame?

Por exemplo, suponha que você percebeu que seus dados estão errados, e todos os Pokémons com velocidade acima de 100 deveriam estar marcados como Type_1 = 'Fire', podemos seguir o procedimento abaixo:

In [0]:
# Observe os valores unicos da coluna Type_1 para os Pokémons com mais de 100 de velocidade
pkmn.filter(pkmn["Speed"] > 100).select("Type 1").distinct().display()

In [0]:
# Vamos alterar os casos onde Speed é superior a 100 para Fire
pkmn = pkmn.withColumn(
    "Type 1",
    F.when(pkmn["Speed"] > 100, "Fire").otherwise(pkmn["Type 1"])
)

In [0]:
# Observe como os valores mudaram
pkmn.filter(pkmn["Speed"] > 100).select("Type 1").distinct().display()

Relendo o arquivo para desconsiderar os tratamentos de exemplos que fizemos acima

In [0]:

pkmn = spark.table("workspace.default.pokemon_data")

# Renomeando as colunas
pkmn = (
    pkmn
    .withColumnRenamed("Type 1", "Type_1")
    .withColumnRenamed("Type 2", "Type_2")
    .withColumnRenamed("Sp. Atk", "Sp_Atk")
    .withColumnRenamed("Sp. Def", "Sp_Def")
)



## 3. Operações em grupo

Com PySpark nós podemos aplicar operações em grupos usando o método *.groupby()*. Ele é muito útil por ser uma forma bem simples de extrair informação de dados agregados. Para utilizá-lo, passamos as colunas nas quais queremos agrupar os dados e a operação que queremos fazer. Para exemplificar, vamos ver quantos Pokémons lendários cada geração tem:

In [0]:
pkmn_soma = (pkmn
            .groupBy("Generation") # Campo que sera agrupado
            .agg(
                F.sum(F.col("Legendary").cast("int")) # Converte a coluna "Legendary" em inteiro e faz a soma
                .alias("Qtd_Legendary") # Nomeando a coluna que receberá o resultado da soma
                )
            )
pkmn_soma.display()

Podemos obter um relatório da média de diversas colunas para cada tipo de Pokémon:

In [0]:
pkmn_media = (pkmn
                .groupBy("Type_1")
                .agg(
                    F.mean("HP").alias("HP_medio"),
                    F.mean("Attack").alias("Attack_medio"),
                    F.mean("Defense").alias("Defense_medio")
                    )
                )
pkmn_media.display()



###  Exercício 2
Use o método *.groupby()* para descobrir qual país tem o melhor *overall* médio. Crie a coluna 'avg_overall'

Seu df country_avg_overall deve conter as seguintes colunas:
- `nationality`
- `overall`
- `avg_overall`

In [0]:
country_avg_overall = fut_players.groupBy('nationality').agg(F.mean('overall').alias('avg_overall'))

# Retornar a nacionalidade com maior overall médio e o overall médio do brasil
melhor = (
    country_avg_overall
    .orderBy(F.col("avg_overall").desc())
    .limit(1)
    .collect()[0]
)

brasil = (
    country_avg_overall
    .filter(F.col("nationality") == "Brazil")
    .collect()[0]
)

display({
    "Melhor overall médio": f"{melhor['nationality']}: {melhor['avg_overall']:.2f}",
    "Overall médio do Brasil": round(brasil['avg_overall'], 2)
})

Agora nós já cobrimos toda a parte básica do Spark! Vamos praticar essa última parte!

### Exercício 2.1
Crie um racional que retorne a classificação para o jogador de acordo com as instruções abaixo, então aplique isso para o dataframe fut_players.

*Observação:* considere os limites dentro do intervalo de classificação.
exemplo

-50 contém todos os valores menores que 50 e o valor 50 incluso;


51-60 contém todos os valores entre 51 e 60 com os limites [51,60] inclusos no grupo;


e assim por diante ...

In [0]:

"""
    Através do overall do jogador retorne a classificação conforme a seguir:
    Overall -> classification
    -50     -> "Amador"
    51-60   -> "Ruim"
    61-70   -> "Ok"
    71-80   -> "Bom"
    81-90   -> "Ótimo"
    91+     -> "Lenda"
    
    I: int overall
    O: string
"""
# Dica utilize as clasulas when e otherwise
fut_players = (fut_players
               .select('player_id', 'overall')
               .withColumn('classification', 
                           F.when(F.col('overall') <= 50, 'Amador')
                            .when((F.col('overall') >= 51) & 
                                  (F.col('overall') <= 60), 'Ruim')
                            .when((F.col('overall') >= 61) & 
                                  (F.col('overall') <= 70), 'Ok')
                            .when((F.col('overall') >= 71) &
                                  (F.col('overall') <= 80), 'Bom')
                            .when((F.col('overall') >= 81) &
                                  (F.col('overall') <= 90), 'Ótimo')
                            .when(F.col('overall') >= 91, 'Lenda')
                            .otherwise(None)
                          )
              )

# Contar quantos jogadores há em cada classificação
fut_players.groupBy("classification").count().orderBy("count", ascending=False).display()

## Desafio — Montando o Time dos Sonhos do 🇧🇷

Ainda utilizando a base **`fut_players_data`**, imagine que você é um grande fã do jogo *FIFA*, e deseja montar o **Time dos Sonhos (Dream Team)** do **Brasil**, selecionando os **melhores jogadores por posição**, ou seja, aqueles com o **maior overall** dentro de cada grupo de posição.

Para isso, adote a **formação tática 4-4-2**, composta por:

- **1 Goleiro (GK)**  
- **4 Defensores (Defesa)**  
- **4 Meio-campistas (Meio)**  
- **2 Atacantes (Ataque)**  

### Objetivo
Criar um *DataFrame* com **11 linhas**, representando o **melhor jogador de cada posição dentro da formação 4-4-2**, com as seguintes colunas:

- `nationality` — nacionalidade do jogador  
- `position_group` — posição agrupada (Goleiro, Defesa, Meio, Ataque)  
- `player_name` — nome do jogador  
- `overall` — nota geral (overall)

---

### Agrupamento de posições
Para facilitar a análise, agrupe as posições originais da base conforme a tabela abaixo:

| **position_group** | **Posições incluídas (`position`)** | **Descrição** |
|:--------------------|:------------------------------------|:---------------|
| **Goleiro** | `GK` | Jogadores que atuam exclusivamente no gol. |
| **Defesa** | `CB`, `LB`, `RB`, `LWB`, `RWB` | Zagueiros e laterais (defensores). |
| **Meio** | `CM`, `CDM`, `CAM`, `LM`, `RM` | Meio-campistas centrais, volantes e meias ofensivos/laterais. |
| **Ataque** | `ST`, `CF`, `LW`, `RW`, `LF`, `RF` | Atacantes e pontas. |
| **Outros** | *(demais posições não classificadas)* | Jogadores fora do esquema tático principal (ex: cartas especiais). |

---

### 🏁 Entrega esperada
Seu *DataFrame final* deve retornar **11 jogadores**, representando o **Time dos Sonhos do Brasil (formação 4-4-2)**, conforme os critérios acima.

In [0]:
# Realize novamente o import da base
fut_players = spark.table('workspace.default.fut_players_data')

fut_players_br = fut_players.filter(F.col('nationality') == 'Brazil')

fut_players_and_group = fut_players_br\
                        .withColumn('position_group', 
                       F.when(F.col('position').isin('GK'), 'Goleiro')
                        .when(F.col('position').isin('CB', 'LB', 'RB', 'LWB', 'RWB'), 'Defesa')
                        .when(F.col('position').isin('CM', 'CDM', 'CAM', 'LM', 'RM'), 'Meio')
                        .when(F.col('position').isin('ST', 'CF', 'LW', 'RW', 'LF', 'RF'), 'Ataque')
                        .otherwise(None)
                        ).select('nationality', 'player_name', 'overall', 'position_group')

janela = Window.partitionBy('position_group').orderBy(F.col('overall').desc())

# rank() gera duplicatas no rankeamento, row_number() não
players_ranked = fut_players_and_group.withColumn('rank', F.row_number().over(janela))

best_11 = players_ranked.filter(
    ((F.col('position_group') == 'Goleiro') & (F.col('rank') == 1)) |
    ((F.col('position_group') == 'Ataque') & (F.col('rank') <= 2)) |
    ((F.col('position_group') == 'Meio') & (F.col('rank') <= 4)) |
    ((F.col('position_group') == 'Defesa') & (F.col('rank') <= 4)) 
).drop('rank')

display(best_11)

### Desafio Bônus

Você deve ter notado que **Neymar** aparece tanto entre os melhores jogadores de **ataque** quanto do **meio-campo**.  
Isso acontece porque o dataset contém **múltiplas versões do mesmo jogador**, inclusive atuando em **outras posições**, o que é típico dos modos do *FIFA/Ultimate Team*.

O seu desafio agora é **refazer o exercício anterior**, garantindo que **cada jogador apareça apenas uma vez** no *DataFrame final*.

- Caso o jogador possua mais de uma versão (carta), **considere apenas aquela com o maior valor de `overall`**.  
- Em seguida, **reaplique a lógica da formação 4-4-2**, selecionando os melhores por grupo de posição.

---

### 🏁 Entrega Esperada
Seu *DataFrame final* deve retornar **11 jogadores únicos**, representando o **Dream Team do Brasil** na **formação tática 4-4-2**, **sem repetição de atletas**, conforme os critérios estabelecidos acima.


In [0]:
fut_players_br = fut_players.filter(F.col('nationality') == 'Brazil')

# para evitar repetições, vamos retirar as linhas com os nomes repetidos dos players - critério de desempate é o 'overall'
fut_players_unique = fut_players_br.orderBy(F.col('overall').desc()).dropDuplicates(['player_name'])

fut_players_and_group = fut_players_unique\
                        .withColumn('position_group', 
                       F.when(F.col('position').isin('GK'), 'Goleiro')
                        .when(F.col('position').isin('CB', 'LB', 'RB', 'LWB', 'RWB'), 'Defesa')
                        .when(F.col('position').isin('CM', 'CDM', 'CAM', 'LM', 'RM'), 'Meio')
                        .when(F.col('position').isin('ST', 'CF', 'LW', 'RW', 'LF', 'RF'), 'Ataque')
                        .otherwise(None)
                        ).select('nationality', 'player_name', 'overall', 'position_group')

janela = Window.partitionBy('position_group').orderBy(F.col('overall').desc())

# rank() gera duplicatas no rankeamento, row_number() não
players_ranked = fut_players_and_group.withColumn('rank', F.row_number().over(janela))

best_11 = players_ranked.filter(
    ((F.col('position_group') == 'Goleiro') & (F.col('rank') == 1)) |
    ((F.col('position_group') == 'Ataque') & (F.col('rank') <= 2)) |
    ((F.col('position_group') == 'Meio') & (F.col('rank') <= 4)) |
    ((F.col('position_group') == 'Defesa') & (F.col('rank') <= 4)) 
).drop('rank')

display(best_11)

# Declaração de Inexistência de Plágio:

1. Eu sei que plágio é utilizar o trabalho de outra pessoa e apresentar como meu.
2. Eu sei que plágio é errado e declaro que este notebook foi feito por mim.
3. Tenho consciência de que a utilização do trabalho de terceiros é antiético e está sujeito a medidas administrativas.
4. Declaro também que não compartilhei e não compartilharei meu trabalho com o intuito de que seja copiado e submetido por outra pessoa.

# Fim da aula!

Obrigado por participar do curso, você acaba de finalizar o Módulo de Pyspark. Neste momento você já deve ser capaz de manipular seus dados no Spark, utilizando as bibliotecas que acabamos de aprender!

Lembre-se que sempre que surgir alguma dúvida, você pode olhar a documentação do [PySpark](https://spark.apache.org/docs/latest/api/python/reference).