# **Módulo 3 - Manipulando dados com Spark - Parte I**

## **Importando bibliotecas do Spark**

In [None]:
%%bash

# Instal Java
apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
pip install -q pyspark

In [None]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

**OBS :** A preparação do ambiente não é igual ao da aula porque estou fazendo 
o exercício no Google Colab. 

In [None]:
df = spark.createDataFrame([('Fulano','1'),
                            ('Ciclano','2')], 
                           schema='nome STRING, id STRING')

In [None]:
df.show()

+-------+---+
|   nome| id|
+-------+---+
| Fulano|  1|
|Ciclano|  2|
+-------+---+



## **Acessando os Tipos do Spark**

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import col

In [None]:
int_type = IntegerType()

In [None]:
array_type = ArrayType(IntegerType())

In [None]:
array_type

ArrayType(IntegerType,true)

## **Convertendo os tipos de Colunas**

In [None]:
df.dtypes

[('nome', 'string'), ('id', 'string')]

In [None]:
df.select('nome',col('id').cast(IntegerType()))

DataFrame[nome: string, id: int]

In [None]:
df.show()

+-------+---+
|   nome| id|
+-------+---+
| Fulano|  1|
|Ciclano|  2|
+-------+---+



In [None]:
df.select('nome',col('id').cast('int'))

DataFrame[nome: string, id: int]

In [None]:
df.show()

+-------+---+
|   nome| id|
+-------+---+
| Fulano|  1|
|Ciclano|  2|
+-------+---+



## **Schema e Criação de DataFrames**

Um schema no Spark é uma especificação de tipos de colunas de um DataFrame. Eles são usadaos na leitura de dados externos e criação de DataFrames, e podem
ser passados diretamente no Spark ou podem ser inferidos. Passar um schema na 
leitura traz benefícios interessantes como : 

  - Evita que o Spark faça inferência de tipos, o que é custoso e demorado 
  dependendo do tamanho do arquivo, laém de propenso a erros; 
  - Permite que usuário identifique erros nos dados logo na leitura, caso 
  não sigam o schema especificado. 

In [None]:
df = spark.createDataFrame([('Fulano'  ,1),
                            ('Ciclano' ,2),
                            ('Beltrano',3),
                            ('Deltrano',4)], 
                            schema=['nome','id'])

In [None]:
df.show()

+--------+---+
|    nome| id|
+--------+---+
|  Fulano|  1|
| Ciclano|  2|
|Beltrano|  3|
|Deltrano|  4|
+--------+---+



In [None]:
df.dtypes

[('nome', 'string'), ('id', 'bigint')]

## **Criando schemas programaticamente**

In [None]:
schema = \
  StructType([
    StructField('nome',StringType()),
    StructField('id'  ,IntegerType())
  ])

In [None]:
df = spark.createDataFrame([('Fulano'  ,1),
                            ('Ciclano' ,2),
                            ('Beltrano',3),
                            ('Deltrano',4)], 
                            schema=schema)

In [None]:
df.show()

+--------+---+
|    nome| id|
+--------+---+
|  Fulano|  1|
| Ciclano|  2|
|Beltrano|  3|
|Deltrano|  4|
+--------+---+



In [None]:
df.dtypes

[('nome', 'string'), ('id', 'int')]

## **Criando schemas com DDL**

In [None]:
schema = 'nome STRING, id INT' 

In [None]:
df = spark.createDataFrame([('Fulano'  ,1),
                            ('Ciclano' ,2),
                            ('Beltrano',3),
                            ('Deltrano',4)], 
                            schema=schema)

In [None]:
df.show()

+--------+---+
|    nome| id|
+--------+---+
|  Fulano|  1|
| Ciclano|  2|
|Beltrano|  3|
|Deltrano|  4|
+--------+---+



In [None]:
df.dtypes

[('nome', 'string'), ('id', 'int')]

## **Criando DataFrames**

In [None]:
data = [('Fulano'  ,1),
        ('Ciclano' ,2),
        ('Beltrano',3),
        ('Deltrano',4)]

In [None]:
schema = 'nome STRING, id INT' 

In [None]:
df = spark.createDataFrame(data,schema=schema)

In [None]:
df.dtypes

[('nome', 'string'), ('id', 'int')]

In [None]:
df.printSchema()

root
 |-- nome: string (nullable = true)
 |-- id: integer (nullable = true)



In [None]:
spark.range(100).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows



## **Leitura e Escrita de Dados**

In [None]:
link_tab = '/content/drive/MyDrive/igti_bootcamps/eng_dados_cloud/mod3/tab_cnae.csv'

### **DataFrameReader**

      `spark.read.format(format).option(args).load(file_path)`

### **DataFrameWriter**

      `spark.write.format(format).option(args).load(file_path)`

### **Lendo e Escrevendo CSV**
Opções mais comuns : 
  - header
  - inferSchema
  - sep
  - encoding

In [None]:
#df = spark.read.format('csv').load(link_tab)
df = spark.read.csv(link_tab,sep=',',header=True)

In [None]:
df.limit(5).show()

+------+--------------------+---------+-----------+
|  CNAE|           DESCRIÇÃO|CÓD.SETOR| NOME SETOR|
+------+--------------------+---------+-----------+
|111301|    Cultivo de arroz|        1|AGRICULTURA|
|111302|    Cultivo de milho|        1|AGRICULTURA|
|111303|    Cultivo de trigo|        1|AGRICULTURA|
|111399|Cultivo de outros...|        1|AGRICULTURA|
|112101|Cultivo de algodã...|        1|AGRICULTURA|
+------+--------------------+---------+-----------+



## **Definindo o schema**

In [None]:
schema = 'cod_cnae STRING, descricao STRING, cod_setor INT, nome_setor STRING '

In [None]:
df = spark.read.csv(link_tab,sep=',',header=True, schema=schema)

In [None]:
df.dtypes

[('cod_cnae', 'string'),
 ('descricao', 'string'),
 ('cod_setor', 'int'),
 ('nome_setor', 'string')]

outra forma de fazer seria

In [None]:
df = (
    spark.read
    .format('csv')
    .option('header','true')
    .option('sep',',')
    .schema(schema)
    .load(link_tab)
    )

df.limit(5).show()

+--------+--------------------+---------+-----------+
|cod_cnae|           descricao|cod_setor| nome_setor|
+--------+--------------------+---------+-----------+
|  111301|    Cultivo de arroz|        1|AGRICULTURA|
|  111302|    Cultivo de milho|        1|AGRICULTURA|
|  111303|    Cultivo de trigo|        1|AGRICULTURA|
|  111399|Cultivo de outros...|        1|AGRICULTURA|
|  112101|Cultivo de algodã...|        1|AGRICULTURA|
+--------+--------------------+---------+-----------+



In [None]:
df = (
    spark.read
    .format('csv')
    .options(header=True,sep=',')  
    .schema(schema)
    .load(link_tab)
)
df.show(5)

+--------+--------------------+---------+-----------+
|cod_cnae|           descricao|cod_setor| nome_setor|
+--------+--------------------+---------+-----------+
|  111301|    Cultivo de arroz|        1|AGRICULTURA|
|  111302|    Cultivo de milho|        1|AGRICULTURA|
|  111303|    Cultivo de trigo|        1|AGRICULTURA|
|  111399|Cultivo de outros...|        1|AGRICULTURA|
|  112101|Cultivo de algodã...|        1|AGRICULTURA|
+--------+--------------------+---------+-----------+
only showing top 5 rows



**OBS :** Utilizando o método "options" podemos parametrizar melhor nossa função
 usando um dicionário

In [None]:
options_dict = {
    'sep' : ',' , 
    'header' : 'True'
}

df = (
      spark.read
    .format('csv')
    .options(**options_dict)  
    .schema(schema)
    .load(link_tab)
)
df.show(5)

+--------+--------------------+---------+-----------+
|cod_cnae|           descricao|cod_setor| nome_setor|
+--------+--------------------+---------+-----------+
|  111301|    Cultivo de arroz|        1|AGRICULTURA|
|  111302|    Cultivo de milho|        1|AGRICULTURA|
|  111303|    Cultivo de trigo|        1|AGRICULTURA|
|  111399|Cultivo de outros...|        1|AGRICULTURA|
|  112101|Cultivo de algodã...|        1|AGRICULTURA|
+--------+--------------------+---------+-----------+
only showing top 5 rows



In [None]:
df.printSchema()

root
 |-- cod_cnae: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- cod_setor: integer (nullable = true)
 |-- nome_setor: string (nullable = true)



In [None]:
schema = 'cod_cnae INT, descricao STRING, cod_setor INT, nome_setor STRING '

In [None]:
options_dict = {
    'sep' : ',' , 
    'header' : 'True'
}

df = (
      spark.read
    .format('csv')
    .options(**options_dict)  
    .schema(schema)
    .load(link_tab)
)
df.show(5)

+--------+--------------------+---------+-----------+
|cod_cnae|           descricao|cod_setor| nome_setor|
+--------+--------------------+---------+-----------+
|  111301|    Cultivo de arroz|        1|AGRICULTURA|
|  111302|    Cultivo de milho|        1|AGRICULTURA|
|  111303|    Cultivo de trigo|        1|AGRICULTURA|
|  111399|Cultivo de outros...|        1|AGRICULTURA|
|  112101|Cultivo de algodã...|        1|AGRICULTURA|
+--------+--------------------+---------+-----------+
only showing top 5 rows



In [None]:
df.printSchema()

root
 |-- cod_cnae: integer (nullable = true)
 |-- descricao: string (nullable = true)
 |-- cod_setor: integer (nullable = true)
 |-- nome_setor: string (nullable = true)



In [None]:
path_save = '/content/drive/MyDrive/igti_bootcamps/eng_dados_cloud/mod3/'

In [None]:
#df.write.format('csv').save(path_save + 'df_cnae_teste',header=True)

AnalysisException: ignored

In [None]:
spark.read.format('csv').load(path_save + 'df_cnae_teste',header=True).printSchema()

root
 |-- cod_cnae: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- cod_setor: string (nullable = true)
 |-- nome_setor: string (nullable = true)



In [None]:
df.toPandas().to_csv(
    path_save + 'df_cnae_teste.csv',
    index=False, header=True
    )