# Pyspark Fundamentos
---

## O que é o Databricks?
- Plataforma de análise baseada no Apache Spark;
- Possui fluxos de trabalho simplificados e um workspace interativo que permite a colaboração entre cientistas, engenheiros e analistas de dados;
- Suporta desenvolvimento em Python, Scala, R e SQL;
- O Databricks é uma ferramenta paga, disponível no Azure, AWS e Google Cloude Platform;

## PySpark
- Apache Spark - escrito na linguagem Scala;
- Criado para apoiar a colaboração do Apache Spark e Python - é uma API Python para Spark

In [2]:
import os
import sys

In [None]:
from pyspark.sql import SparkSession

## Criar DF / ler arquivo
---

In [0]:
# header=True -> define a primeira linha como cabeçalho
# inferSchema=True -> define o tipo das colunas automaticamente
df = spark.read.csv("/Volumes/lcb_qas_sandbox_comercial/pedro_silva/learning/wc2018-players.csv", header=True, inferSchema=True)


## Exibir DF
---

In [0]:
# default value do .show() é 20
df.show(5)

+---------+---+----+------------------+----------+----------+--------------------+------+------+
|     Team|  #|Pos.| FIFA Popular Name|Birth Date|Shirt Name|                Club|Height|Weight|
+---------+---+----+------------------+----------+----------+--------------------+------+------+
|Argentina|  3|  DF|TAGLIAFICO Nicolas|1992-08-31|TAGLIAFICO|      AFC Ajax (NED)|   169|    65|
|Argentina| 22|  MF|    PAVON Cristian|1996-01-21|     PAVÓN|CA Boca Juniors (...|   169|    65|
|Argentina| 15|  MF|    LANZINI Manuel|1993-02-15|   LANZINI|West Ham United F...|   167|    66|
|Argentina| 18|  DF|    SALVIO Eduardo|1990-07-13|    SALVIO|    SL Benfica (POR)|   167|    69|
|Argentina| 10|  FW|      MESSI Lionel|1987-06-24|     MESSI|  FC Barcelona (ESP)|   170|    72|
+---------+---+----+------------------+----------+----------+--------------------+------+------+
only showing top 5 rows



In [0]:
# é melhor de visualizar
display(df)

Team,#,Pos.,FIFA Popular Name,Birth Date,Shirt Name,Club,Height,Weight
Argentina,3,DF,TAGLIAFICO Nicolas,1992-08-31,TAGLIAFICO,AFC Ajax (NED),169,65
Argentina,22,MF,PAVON Cristian,1996-01-21,PAVÓN,CA Boca Juniors (ARG),169,65
Argentina,15,MF,LANZINI Manuel,1993-02-15,LANZINI,West Ham United FC (ENG),167,66
Argentina,18,DF,SALVIO Eduardo,1990-07-13,SALVIO,SL Benfica (POR),167,69
Argentina,10,FW,MESSI Lionel,1987-06-24,MESSI,FC Barcelona (ESP),170,72
Argentina,4,DF,ANSALDI Cristian,1986-09-20,ANSALDI,Torino FC (ITA),181,73
Argentina,5,MF,BIGLIA Lucas,1986-01-30,BIGLIA,AC Milan (ITA),175,73
Argentina,7,MF,BANEGA Ever,1988-06-29,BANEGA,Sevilla FC (ESP),175,73
Argentina,14,DF,MASCHERANO Javier,1984-06-08,MASCHERANO,Hebei China Fortune FC (CHN),174,73
Argentina,21,FW,DYBALA Paulo,1993-11-15,DYBALA,Juventus FC (ITA),177,73


## Verificar tipos de colunas
---

In [0]:
# é como se fosse o df.info() do pandas
df.printSchema()

root
 |-- Team: string (nullable = true)
 |-- #: integer (nullable = true)
 |-- Pos.: string (nullable = true)
 |-- FIFA Popular Name: string (nullable = true)
 |-- Birth Date: date (nullable = true)
 |-- Shirt Name: string (nullable = true)
 |-- Club: string (nullable = true)
 |-- Height: integer (nullable = true)
 |-- Weight: integer (nullable = true)



## Verificando dados nulos
---

In [0]:
# no caso de um df menor, transformar para pandas e fazer uma contagem de valores nulos
# pois o pyspark não tem uma função nativa de verificação de nulos
df.toPandas().isna().sum()

Team                 0
#                    0
Pos.                 0
FIFA Popular Name    0
Birth Date           0
Shirt Name           0
Club                 0
Height               0
Weight               0
dtype: int64

In [0]:
# jeito que funciona no caso de um df grande, utilizando pyspark mesmo
for coluna in df.columns:
    print(coluna, df.filter(df[coluna].isNull()).count())

Team 0
# 0


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4853991865760292>, line 3[0m
[1;32m      1[0m [38;5;66;03m# jeito que funciona no caso de um df grande, utilizando pyspark mesmo[39;00m
[1;32m      2[0m [38;5;28;01mfor[39;00m coluna [38;5;129;01min[39;00m df[38;5;241m.[39mcolumns:
[0;32m----> 3[0m     [38;5;28mprint[39m(coluna, df[38;5;241m.[39mfilter(df[coluna][38;5;241m.[39misNull())[38;5;241m.[39mcount())

File [0;32m/databricks/spark/python/pyspark/sql/connect/dataframe.py:1861[0m, in [0;36mDataFrame.__getitem__[0;34m(self, item)[0m
[1;32m   1858[0m             [38;5;66;03m# Try best to verify the column name with cached schema[39;00m
[1;32m   1859[0m             [38;5;66;03m# If fails, fall back to the server side validation[39;00m
[1;32m   1860[0m             [38;5;28;01mif[39;00m [38;5;129;01mnot

## Renomeando Colunas
---

In [0]:
# no pyspark não conseguimos usar o replace, igual no pandas, pois os dfs são imutáveis
# temos que atribuir novamente a uma variável o df modificado
# a \ no final serve pra ignorar o próximo caractere, fazendo ela ignorar o 'enter' permitindo pular para a próxima linha
df = df.withColumnRenamed('Team', 'Selecao').withColumnRenamed('#', 'Numero').withColumnRenamed('Pos.', 'Posicao')\
    .withColumnRenamed('FIFA Popular Name', 'Nome_FIFA').withColumnRenamed('Birth Date', 'Nascimento').withColumnRenamed('Shirt Name', 'Nome Camiseta')\
    .withColumnRenamed('Club', 'Time').withColumnRenamed('Height', 'Altura').withColumnRenamed('Weight', 'Peso')

df.show(5)

+---------+------+-------+------------------+----------+-------------+--------------------+------+----+
|  Selecao|Numero|Posicao|         Nome_FIFA|Nascimento|Nome Camiseta|                Time|Altura|Peso|
+---------+------+-------+------------------+----------+-------------+--------------------+------+----+
|Argentina|     3|     DF|TAGLIAFICO Nicolas|1992-08-31|   TAGLIAFICO|      AFC Ajax (NED)|   169|  65|
|Argentina|    22|     MF|    PAVON Cristian|1996-01-21|        PAVÓN|CA Boca Juniors (...|   169|  65|
|Argentina|    15|     MF|    LANZINI Manuel|1993-02-15|      LANZINI|West Ham United F...|   167|  66|
|Argentina|    18|     DF|    SALVIO Eduardo|1990-07-13|       SALVIO|    SL Benfica (POR)|   167|  69|
|Argentina|    10|     FW|      MESSI Lionel|1987-06-24|        MESSI|  FC Barcelona (ESP)|   170|  72|
+---------+------+-------+------------------+----------+-------------+--------------------+------+----+
only showing top 5 rows



In [0]:
# mostra todas as colunas que temos no df
df.columns

['Selecao',
 'Numero',
 'Posicao',
 'Nome_FIFA',
 'Nascimento',
 'Nome Camiseta',
 'Time',
 'Altura',
 'Peso']

In [0]:
# se for um df grande é demorado pois o spark não foi feito para rodar de maneira procedural (uma sequência única), mas sim de maneira paralela
for coluna in df.columns:
    print(coluna, df.filter(df[coluna].isNull()).count())

Selecao 0
Numero 0
Posicao 0
Nome_FIFA 0
Nascimento 0
Nome Camiseta 0
Time 0
Altura 0
Peso 0


## Selecionar Colunas
---

In [0]:
# tendo em vista que o pyspark é mesclado com o sql, utiliza-se um pouco da sintaxe do sql dentro do pyspark
# o nome das colunas no select não é case sensitive, ou seja, vai ficar do jeito que eu escrever
df.select('Selecao', 'Nome_FIFA').show()

+---------+------------------+
|  Selecao|         Nome_FIFA|
+---------+------------------+
|Argentina|TAGLIAFICO Nicolas|
|Argentina|    PAVON Cristian|
|Argentina|    LANZINI Manuel|
|Argentina|    SALVIO Eduardo|
|Argentina|      MESSI Lionel|
|Argentina|  ANSALDI Cristian|
|Argentina|      BIGLIA Lucas|
|Argentina|       BANEGA Ever|
|Argentina| MASCHERANO Javier|
|Argentina|      DYBALA Paulo|
|Argentina|     AGUERO Sergio|
|Argentina|   HIGUAIN Gonzalo|
|Argentina|    DI MARIA Angel|
|Argentina|  LO CELSO Giovani|
|Argentina|  MEZA Maximiliano|
|Argentina|      ACUNA Marcos|
|Argentina|CABALLERO Wilfredo|
|Argentina|   MERCADO Gabriel|
|Argentina|  OTAMENDI Nicolas|
|Argentina|       ROJO Marcos|
+---------+------------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import col
# é necessário importar essa função pra utiliar ela
df.select(col('Selecao'), col('Altura')).show(5)

+---------+------+
|  Selecao|Altura|
+---------+------+
|Argentina|   169|
|Argentina|   169|
|Argentina|   167|
|Argentina|   167|
|Argentina|   170|
+---------+------+
only showing top 5 rows



In [0]:
df.select(df['Selecao']).show(5)

+---------+
|  Selecao|
+---------+
|Argentina|
|Argentina|
|Argentina|
|Argentina|
|Argentina|
+---------+
only showing top 5 rows



## Selecionar Colunas com ALIAS
---

In [0]:
df.select('Selecao'.alias('Time')).show(5)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-4853991865760310>, line 1[0m
[0;32m----> 1[0m df[38;5;241m.[39mselect([38;5;124m'[39m[38;5;124mSelecao[39m[38;5;124m'[39m[38;5;241m.[39malias([38;5;124m'[39m[38;5;124mTime[39m[38;5;124m'[39m))[38;5;241m.[39mshow([38;5;241m5[39m)

[0;31mAttributeError[0m: 'str' object has no attribute 'alias'

In [0]:
df.select(df['Selecao'].alias('Time')).show(5)

+---------+
|     Time|
+---------+
|Argentina|
|Argentina|
|Argentina|
|Argentina|
|Argentina|
+---------+
only showing top 5 rows



In [0]:
df.select(col('Selecao').alias('Time')).show(5)

+---------+
|     Time|
+---------+
|Argentina|
|Argentina|
|Argentina|
|Argentina|
|Argentina|
+---------+
only showing top 5 rows



In [0]:
# meio inútil mas funciona
df.select('Selecao Nome_FIFA Altura'.split()).show(5)

+---------+------------------+------+
|  Selecao|         Nome_FIFA|Altura|
+---------+------------------+------+
|Argentina|TAGLIAFICO Nicolas|   169|
|Argentina|    PAVON Cristian|   169|
|Argentina|    LANZINI Manuel|   167|
|Argentina|    SALVIO Eduardo|   167|
|Argentina|      MESSI Lionel|   170|
+---------+------------------+------+
only showing top 5 rows

