# PySpark # 3

Link da aula: https://www.youtube.com/watch?v=EoI3XwxCkfI

##### Índice:

    # Importação bibliotecas / funções
    # Criar Sessão PySpark
    # Criar DF / ler arquivo
    
    # Collect()
    # When() / Otherwise()
    # Union (Concat)
    # Joins
    # Join - Simples
    # Inner Join
    # Left Join
    # Right Join
    # Full Join
    # Semi Join
    # Anti Join

##### Importação bibliotecas / funções

In [None]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window # Importando window function

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

##### Criar / Iniciar Sessão PySpark

In [None]:
spark = (
    SparkSession.builder
    .master('local')
    .appName('PySpark_01')
    .getOrCreate()
)

##### Criar DF / ler arquivo

In [None]:
df = spark.read.csv('archive/wc2018-players.csv', header=True, inferSchema=True)

##### Alterações Aula PySpark 01

In [None]:
df = df.withColumnRenamed('Team', 'Selecao').withColumnRenamed('#', 'Numero').withColumnRenamed('Pos.', 'Posicao')\
.withColumnRenamed('FIFA Popular Name', 'Nome_FIFA').withColumnRenamed('Birth Date', 'Nascimento')\
.withColumnRenamed('Shirt Name', 'Nome Camiseta').withColumnRenamed('Club', 'Time').withColumnRenamed('Height', 'Altura')\
.withColumnRenamed('Weight', 'Peso')

dia = udf(lambda data: data.split('.')[0])
mes = udf(lambda data: data.split('.')[1])
ano = udf(lambda data: data.split('.')[2])

df = df.withColumn('Dia', dia('Nascimento')).withColumn('Mes', mes('Nascimento')).withColumn('Ano', ano('nascimento'))
df = df.withColumn('Data_Nascimento', concat_ws('-', 'Ano', 'Mes', 'Dia').cast(DateType()))

##### Alterações Aula PySpark 02

In [None]:
df2 = df
df = df.drop('Nascimento', 'Nome_FIFA')

##### Distinct()

In [None]:
df.select(col('Selecao')).distinct().show(50)

##### Collect()

In [None]:
lista = df.select(col('Selecao')).distinct().collect()
lista

In [None]:
type(lista[0][0])

In [None]:
lista[1][0]

In [None]:
paises =[]

for pais in lista:
    paises.append(pais[0])
    
paises

##### When() / Otherwise()

In [None]:
df.withColumn('Coluna_Nova', when(col('Selecao') == "Argentina", 'Argentinos :)').otherwise('NÃO ARGENTINO')).show(50)

In [None]:
europa = ['Sweden', 'Germany', 'France', 'Belgium', 'Croatia', 'Spain', 'Denmark', 'Iceland', 'Switzerland', 'England', 'Poland', 'Portugal', 'Serbia']
asia = ['Russia', 'IR Iran', 'Nigeria', 'Korea Republic', 'Saudi Arabia', 'Japan', ]
africa = ['Senegal', 'Morocco', 'Tunisia', 'Egypt']
oceania = ['Australia']
america_norte = ['Panama', 'Mexico', 'Costa Rica']
america_sul = ['Argentina', 'Peru', 'Uruguay', 'Brazil', 'Colombia']

In [None]:
df = df.withColumn('Continente', when(col('Selecao').isin(europa), 'Europa')\
             .when(col('Selecao').isin(asia), 'Ásia')\
             .when(col('Selecao').isin(africa), 'África')\
             .when(col('Selecao').isin(oceania), 'Oceania')\
             .when(col('Selecao').isin(america_norte), 'América do Norte')\
             .when(col('Selecao').isin(america_sul), 'América do Sul')\
             .otherwise('Verificar'))
df.show(5)

In [None]:
df.filter('Continente = "Verificar"').show(50)

##### Union (Concat)

In [None]:
df_america_sul = df.filter('Continente = "América do Sul"')
df_america_sul.select('Selecao').distinct().show()

In [None]:
df_america_norte = df.filter('Continente = "América do Norte"')
df_america_norte.select('Selecao').distinct().show()

In [None]:
df_americas = df_america_sul.union(df_america_norte)

In [None]:
df_americas.show(100)

In [None]:
df_americas.select('Selecao').distinct().show()

In [None]:
# A QUANTIDADE DE COLUNAS DEVE SER A MESMA

In [None]:
df_america_sul = df_america_sul.drop('Continente')

In [None]:
df_america_norte.show(4)

In [None]:
df_americas = df_america_sul.union(df_america_norte)

In [None]:
# COLUNAS COM TIPOS OU NOMES DIFERENTES TAMBÉM CONCATENAM

# OBS: Cuidado para não concatenar colunas em posições diferentes

In [None]:
df_america_norte = df_america_norte.withColumn('Continente', lit(100))
df_america_norte.printSchema()

In [None]:
df_america_norte.show(6)

In [None]:
df_americas = df_america_norte.union(df_america_sul)
df_americas.show(200)

In [None]:
df_america_sul = df_america_sul.withColumnRenamed('Continente', 'NOVA COLUNA DOIDA')

In [None]:
df_america_norte.show(5)

In [None]:
df_americas = df_america_sul.union(df_america_norte)
df_americas.show(200)

##### Joins

In [None]:
df.show(5)

In [None]:
arg = df.filter('Selecao = "Argentina"')
bra = df.filter('Selecao = "Brazil"')

In [None]:
arg = arg.drop('Time', 'Dia', 'Mes', 'Ano', 'Continente', 'Peso', 'Data_Nascimento')
bra = bra.drop('Time', 'Dia', 'Mes', 'Ano', 'Continente', 'Peso', 'Data_Nascimento')

In [None]:
arg.show(5)

In [None]:
bra.show(50)

In [None]:
arg.count()

##### Join - Simples

In [None]:
# Dados dos joins 100% correspondentes

In [None]:
dfnovo = arg.join(bra, arg.Numero == bra.Numero)
dfnovo.show(25)

In [None]:
arg = arg.withColumn('Numero', col('Numero') + 1)
arg.show(23)

##### Inner Join

In [None]:
# Mostra apenas os dados que têm correspondências

In [None]:
dfnovo = arg.join(bra, arg['Numero'] == bra['Numero'], 'inner')
dfnovo.show(40)

##### Left Join

In [None]:
# Mostra todos os dados do DF do lado ESQUERDO
# (valores do lado dir que não tiverem correspondências, serão mostrados como nulos)

In [None]:
dfnovo = arg.join(bra, arg['Numero'] == bra['Numero'], 'left')
dfnovo.show(40)

##### Right Join

In [None]:
# Mostra todos os dados do DF do lado DIREITO
# (valores do lado esquerdo que não tiverem correspondências, serão mostrados como nulos)

In [None]:
dfnovo = arg.join(bra, arg['Numero'] == bra['Numero'], 'right')
dfnovo.show(40)

##### Full Join

In [None]:
# Será mostrado todas as linhas
# (valores que não corresponderem, serão mostrado como nulos em ambos os lados)

In [None]:
dfnovo = arg.join(bra, arg['Numero'] == bra['Numero'], 'full')
dfnovo.show(40)

##### Semi Join

In [None]:
# Similar ao INNER JOIN, porém apenas os dados do DataFrame esquerdo é mostrado

In [None]:
dfnovo = bra.join(arg, arg['Numero'] == bra['Numero'], 'semi')
dfnovo.show(40)

##### Anti Join

In [None]:
# Mostra os dados do DF do lado esquerdo que NÃO possuem correspondências

In [None]:
dfnovo = arg.join(bra, arg['Numero'] == bra['Numero'], 'anti')
dfnovo.show(40)