# **Tratamento de dados com Pyspark!**

## Neste exemplo vamoscarregar uma sessão Pyspark no Anaconda, rodando em SO Windows 10.
#### Aqui vamos considerar que o Anaconda está instalado rodando na sua máquina.
-- Confirmando em qual env do CONDA você está --

In [1]:
import sys
print(sys.executable)


C:\Users\f07699b\Anaconda3\envs\spark\python.exe


## **Instalando JAVA_HOME e SPARK_HOME!**

In [7]:
#!curl -O https://enos.itcollege.ee/~jpoial/allalaadimised/jdk8/jdk-8u291-linux-x64.tar.gz
#!tar xf jdk-8u291-linux-x64.tar.gz
#!curl -O http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz

In [8]:
import os
os.environ["JAVA_HOME"] = "jre1.8.0_231" 
os.environ["SPARK_HOME"] = "spark-2.3.1-bin-hadoop2.7"

### **Criando o SparkContext e SparkSession**


In [9]:

import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
sc

In [10]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

###  **Baixando o Dataset e criando o Dataframe!**

Retire os comentários das linhas abaixo apenas se você não tem o arquivo.
Caso já tenha, deixe comentado. <br>
Esse é um dataset de **registro de crimes da cidade de Chicago** de **2001 até hoje** e tem quase **2GB**

In [None]:
#!curl -O https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
#!mv 'rows.csv?accessType=DOWNLOAD' reported-crimes.csv

In [11]:
from pyspark.sql.functions import to_timestamp, col, lit
rc = spark.read.csv('reported-crimes.csv', header=True).withColumn('Date', to_timestamp(col('Date'),'MM/dd/yyy hh:mm:ss a')).filter(col('Date') <= lit('2018-11-11'))
rc.show(5, truncate = True)

+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|10224738|   HY411648|2015-09-05 13:30:00|     043XX S WOOD ST|0486|     BATTERY|DOMESTIC BATTERY ...|           RESIDENCE| false|    true|0924|     00

## **Dando aquela olhada nos dados!**

#### **take()** 
#### Retorna  o conteudo de linhas do dataframe. O numero que passamos como argumento a funão vai representar o númro de linhas coletadas rc.head() tem exatamente a mesma saída de rc.take(), lembrando que esse head() aqui do contecto Spark não é parecido com o head() do pandas. 

In [259]:
rc.take(1) 

[Row(ID='10224738', Case Number='HY411648', Date=datetime.datetime(2015, 9, 5, 13, 30), Block='043XX S WOOD ST', IUCR='0486', Primary Type='BATTERY', Description='DOMESTIC BATTERY SIMPLE', Location Description='RESIDENCE', Arrest='false', Domestic='true', Beat='0924', District='009', Ward='12', Community Area='61', FBI Code='08B', X Coordinate='1165074', Y Coordinate='1875917', Year='2015', Updated On='02/10/2018 03:50:01 PM', Latitude='41.815117282', Longitude='-87.669999562', Location='(41.815117282, -87.669999562)')]

#### **collect()**
#### Coleta todos os dados do dataframe. Cuidado ao usar, pois pode cauisar um crash no driver node!
#### Se após rodar o collect() acontecer esse problema com seu Jupter Noteboolk -->>  **Exception: Java gateway process exited before sending the driver its port number**, apague e descompacte novamente o diretório do **SPARK_HOME**

In [261]:
#Vai testar?
#rc.collect()  

#### **show()**
#### Vai printar 3 linhas do dataset incuindo o header.  Esse sim é igualzinho a saída do .head() do Pandas.

In [12]:
rc.show(3)

+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|10224738|   HY411648|2015-09-05 13:30:00|     043XX S WOOD ST|0486|     BATTERY|DOMESTIC BATTERY ...|           RESIDENCE| false|    true|0924|     00

In [None]:
rc.count()

## Schemas

O Pyspark, com base nos dados define de forma automática o tipo de dados que está sendo importando. Porpem em situaçãoes de produção, é recomendado que o schema seja definido pelo usuário. Um exemplo são **datas** que na maioroia das vezes são importadas como **strings**<p>
Para trabalhar com schemas precisamos importar algumas coisas antes da biblioteca **pyspark.sql.types**<br>
**StructType**-->>Encapsula a estrutura do schema<br>
**StructField**-->> É usado na definição de cada campo<br>
**Type()**-->> Se refere ao tipo de campo. Pode ser **IntegerType**, **StringType**, **BooleanType**, etc... Acho que deu pra pegar a ideia.
    


In [55]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType, FloatType, TimestampType, DoubleType
#Obs.: Não vamos usar tudo isso, mas é bom saber que temos várias opções

#### **printSchema()**
#### Usamos esse comando para ver o schema do dataframe. 
#### Baseado no output abaixo vemos que o campo "Date" é um timestamp, porém o campo "Updated On" que também é uma data, está como string. Precisamos ajustar isso.

In [56]:
rc.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



#### **Para ajustar as os campos usamos a sintaxe abaixo**.
StrucType vai receber uma lista onde cada item é um StructField que recebe 3 argumentos<br>
#1 - Coluna<br>
#2 - Filed Type<br>
#3 - Se o campo pode ter nulos ou não (True | False)<br>
Ex.:<br>
**rc_schema = StructType([StructField('ID', StringType, True),StructField('Case Number', StringType, True)])**

Ps.: Devemos fazer a sequencia **StructField(Coluna, Type, True|False)** para TODAS as colunas. (╬ಠ益ಠ)
<p>
**DICA**<br>
Quando estiver trabalhando com vários campos, a melhor coisa a fazer é tratar as colunas que são específicas primeiro.<br>
Depois criar uma tratativa padrão para as demais.<br>
No meu caso foi transformar elguns campos em Timestamp, Boolean e Double e depois todo o resto en string

In [57]:
labels = rc.columns
labels
for coluna in range(len(labels)):
    if labels[coluna] == 'Date'or labels[coluna] == 'Updated On':
        labels[coluna] = (labels[coluna], TimestampType(), True)
    elif labels[coluna] == 'Arrest' or labels[coluna] == 'Domestic':
        labels[coluna] = (labels[coluna], BooleanType(), True)
    elif labels[coluna] == 'Year':
        labels[coluna] = (labels[coluna], IntegerType(),True)
    elif labels[coluna] =='Latitude' or labels[coluna] =='Longitude':
        labels[coluna] = (labels[coluna], DoubleType(), True)
    else: labels[coluna] = (labels[coluna], StringType(), True)
      
print(labels)


[('ID', StringType, True), ('Case Number', StringType, True), ('Date', TimestampType, True), ('Block', StringType, True), ('IUCR', StringType, True), ('Primary Type', StringType, True), ('Description', StringType, True), ('Location Description', StringType, True), ('Arrest', BooleanType, True), ('Domestic', BooleanType, True), ('Beat', StringType, True), ('District', StringType, True), ('Ward', StringType, True), ('Community Area', StringType, True), ('FBI Code', StringType, True), ('X Coordinate', StringType, True), ('Y Coordinate', StringType, True), ('Year', IntegerType, True), ('Updated On', TimestampType, True), ('Latitude', DoubleType, True), ('Longitude', DoubleType, True), ('Location', StringType, True)]


Aqui usamos uma **lambda function**, que vai passar por todos os items da lista labels e executar ação que adiciona os 3 valores 
de cada item daquela iteração na variavel screma. Os valores são: **Index 0** -> Coluna / **Index 1** -> field type / **Index 2** -> True ou False 

In [58]:
schema = StructType([StructField (x[0],x[1],x[2]) for x in labels])
schema

StructType(List(StructField(ID,StringType,true),StructField(Case Number,StringType,true),StructField(Date,TimestampType,true),StructField(Block,StringType,true),StructField(IUCR,StringType,true),StructField(Primary Type,StringType,true),StructField(Description,StringType,true),StructField(Location Description,StringType,true),StructField(Arrest,BooleanType,true),StructField(Domestic,BooleanType,true),StructField(Beat,StringType,true),StructField(District,StringType,true),StructField(Ward,StringType,true),StructField(Community Area,StringType,true),StructField(FBI Code,StringType,true),StructField(X Coordinate,StringType,true),StructField(Y Coordinate,StringType,true),StructField(Year,IntegerType,true),StructField(Updated On,TimestampType,true),StructField(Latitude,DoubleType,true),StructField(Longitude,DoubleType,true),StructField(Location,StringType,true)))

#### **Carregando o Schema**
Aqui a gente pega o schema que foi criado e usa para carregar o CSV. Simples assim. 

In [59]:
rc_schema = spark.read.csv('reported-crimes.csv', schema = schema, header = True)
rc_schema.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



# **Trabalhando com Colunas**

#### **Selecionando colunas**

Podemos acessar as colunas em dataframe dem Pyspark de duas maneiras:<p>
Por indexing  -> **df['Column_name']**<br>
Por função    -> **df.select('column_name')**<p>
É importante lembrar que se o nome da coluna tiver espaços ou nomes reservados você **não vai conseguir acessar** usando o acesso via atributo

In [60]:
rc.select('ID','Date','Arrest').show(5)

+--------+-------------------+------+
|      ID|               Date|Arrest|
+--------+-------------------+------+
|10224738|2015-09-05 13:30:00| false|
|10224739|2015-09-04 11:30:00| false|
|11646166|2018-09-01 00:01:00| false|
|10224740|2015-09-05 12:45:00|  true|
|10224741|2015-09-05 13:00:00| false|
+--------+-------------------+------+
only showing top 5 rows



In [61]:
rc['ID', 'Date','Arrest'].show(5)

+--------+-------------------+------+
|      ID|               Date|Arrest|
+--------+-------------------+------+
|10224738|2015-09-05 13:30:00| false|
|10224739|2015-09-04 11:30:00| false|
|11646166|2018-09-01 00:01:00| false|
|10224740|2015-09-05 12:45:00|  true|
|10224741|2015-09-05 13:00:00| false|
+--------+-------------------+------+
only showing top 5 rows



#### **Trabalhando com os headers**
Para retornar o header de um Dataframe em **Pyspark** fazemos igual ao **Pandas**.<br>
E como sa saída é uma lista, podemos acessar essa lista via index ou usar outras ações aplicaveis a listas

In [None]:
#Vaocê tirar os brackets e testar as saídas
rc.columns[0:3]
#rc.columns
#list(reversed(rc.columns))
#rc.columns[::-1]
#len(rc.columns)

#### **Adicionando novas colunas**

Pandas -> **df['coluna_nova'] = df['coluna_velha'] * 2**<br>
Pyspark -> **df.withColumn('coluna_nova', 2 * df['coluna_velha']**

In [None]:
rc = rc.withColumn('coluna_nova_2', rc['ID'] / 2)
rc.select('ID', 'coluna_nova_2').show(2) 

#### **Removendo colunas**

Pyspark -> **df = df.drop('column')**

In [None]:
rc = rc.drop('coluna_nova_2')
rc.columns

# **Trabalhando Filtros e Linhas**

#### **filter()**
#### Diferente do Pandas, onde podemos filtrar direto na seleção da coluna, ex: df['coluna' > 50], em pyspark nós usamos a função filter()

In [36]:
rc.filter(col('Date')  > '2017-11-11' ).show(2)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11646166|   JC213529|2018-09-01 00:01:00|082XX S INGLESIDE...|0810|             THEFT|           OVER $500|           RESIDENCE| false|    true|0631|     006|   8|            44|      06|     

#### **distinct()**
#### Selecionando valores únicos em um dataframe
#### No pandas usamos **df['coluna'].unique()**, já aqui é um pouco diferente.

In [51]:
rc.select('Arrest').distinct().show()

+------+
|Arrest|
+------+
| false|
|  true|
+------+



#### **count()**
#### Com ele contamos os valores selecionados

In [49]:
prisoes = rc.filter(col('Arrest') == True).count() 
print(prisoes)


1874411


#### **orderBy()**
#### Usamos para fazer a ordenação do dataframe de acordo com a coluna selecionada

In [70]:
rc.filter(col('District') != 'null').select(col('District')).distinct().orderBy(col('District')).show()

+--------+
|District|
+--------+
|     001|
|     002|
|     003|
|     004|
|     005|
|     006|
|     007|
|     008|
|     009|
|     010|
|     011|
|     012|
|     014|
|     015|
|     016|
|     017|
|     018|
|     019|
|     020|
|     021|
+--------+
only showing top 20 rows



#### **union()**
#### **Concatenando Dataframes**
#### Seguindo as premissas já conhecidas de Python, Dataframes são imutaveis, deste modo não podemos fazer um append como fazemos com listas. Neste caso devemos concatenar os ataframes uns con os outros. 
#### **Critérios para concatenação**
Os Dataframes devem ter o mesmo numero de colunas<br>
Os Dataframes devem ter o mesmo schema
#### No Pandas nós usamnos **pd.concat(df,df2)**, no Pyspark usamos:

In [82]:
#Para esse exemplo usaremso uma parte menor do Dataframe
print(f"Para o Distrito 008 temos {rc.filter(col('District') == '008').count()} registros")
print(f"Para o Distrito 009 temos {rc.filter(col('District') == '009').count()} registros")
rc1 = rc.filter(col('District') == '008')
rc2 = rc.filter(col('District') == '009')
print(f"Apos usar o union() o total de registros do novo dataframe é {rc1.union(rc2).count()}, que representa a soma dos totais anteriores.")
     

Para o Distrito 008 temos 459542 registros
Para o Distrito 009 temos 335902 registros
Apos usar o union() o total de registros do novo dataframe é 795444, que representa a soma dos totais anteriores.
