# DataFrame com Spark
---

## 1. Bibliotecas

In [16]:
# Instalando os pacotes necessários

#!conda install pyspark -y
#!conda install -c conda-forge findspark -y

In [17]:
import findspark
findspark.find()
findspark.init()
import pyspark

from pyspark.sql.functions import *

In [18]:
# Iniciando uma secção spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

## 2. Leitura dos dados

In [19]:
clientes = spark.read.parquet('../../dataset/Clientes.parquet', inferSchema=True)
clientes.show()

+---------+--------------------+------+------+--------+
|ClienteID|             Cliente|Estado|Genero|  Status|
+---------+--------------------+------+------+--------+
|        1|Adelina Buenaventura|    RJ|     M|  Silver|
|        2|        Adelino Gago|    RJ|     M|  Silver|
|        3|     Adolfo Patrício|    PE|     M|  Silver|
|        4|    Adriana Guedelha|    RO|     F|Platinum|
|        5|       Adélio Lisboa|    SE|     M|  Silver|
|        6|       Adérito Bahía|    MA|     M|  Silver|
|        7|       Aida Dorneles|    RN|     F|  Silver|
|        8|   Alarico Quinterno|    AC|     M|  Silver|
|        9|    Alberto Cezimbra|    AM|     M|  Silver|
|       10|    Alberto Monsanto|    RN|     M|    Gold|
|       11|       Albino Canela|    AC|     M|  Silver|
|       12|     Alceste Varanda|    RR|     F|  Silver|
|       13|  Alcides Carvalhais|    RO|     M|  Silver|
|       14|        Aldo Martins|    GO|     M|  Silver|
|       15|   Alexandra Tabares|    MG|     F|  

In [20]:
clientes.printSchema()

root
 |-- ClienteID: long (nullable = true)
 |-- Cliente: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Genero: string (nullable = true)
 |-- Status: string (nullable = true)



In [21]:
vendas = spark.read.parquet('../../dataset/Vendas.parquet', inerSchema=True)
vendas.show()

+--------+----------+---------+---------+--------+
|VendasID|VendedorID|ClienteID|     Data|   Total|
+--------+----------+---------+---------+--------+
|       1|         1|       91| 1/1/2019|  8053.6|
|       2|         6|      185| 1/1/2020|   150.4|
|       3|         7|       31| 2/1/2020|  6087.0|
|       4|         5|       31| 2/1/2019| 13828.6|
|       5|         5|       31| 3/1/2018|26096.66|
|       6|         5|       31| 4/1/2020| 18402.0|
|       7|         5|       31| 6/1/2019|  7524.2|
|       8|         5|      186| 6/1/2019| 12036.6|
|       9|         7|       91| 6/1/2020| 2804.75|
|      10|         2|      202| 6/1/2020|  8852.0|
|      11|         7|       58| 8/1/2019|16545.25|
|      12|         7|       58| 9/1/2018|11411.88|
|      13|         7|       58|10/1/2019| 15829.7|
|      14|         3|      249|12/1/2020| 6154.36|
|      15|         4|      249|12/1/2018| 3255.08|
|      16|         7|      192|13/1/2020| 2901.25|
|      17|         2|       79|

In [22]:
vendas.printSchema()

root
 |-- VendasID: long (nullable = true)
 |-- VendedorID: long (nullable = true)
 |-- ClienteID: long (nullable = true)
 |-- Data: string (nullable = true)
 |-- Total: double (nullable = true)



## 3. Consultas

In [23]:
df_reordenado = clientes.select('Cliente', 'Estado', 'Status')
df_reordenado.show()

+--------------------+------+--------+
|             Cliente|Estado|  Status|
+--------------------+------+--------+
|Adelina Buenaventura|    RJ|  Silver|
|        Adelino Gago|    RJ|  Silver|
|     Adolfo Patrício|    PE|  Silver|
|    Adriana Guedelha|    RO|Platinum|
|       Adélio Lisboa|    SE|  Silver|
|       Adérito Bahía|    MA|  Silver|
|       Aida Dorneles|    RN|  Silver|
|   Alarico Quinterno|    AC|  Silver|
|    Alberto Cezimbra|    AM|  Silver|
|    Alberto Monsanto|    RN|    Gold|
|       Albino Canela|    AC|  Silver|
|     Alceste Varanda|    RR|  Silver|
|  Alcides Carvalhais|    RO|  Silver|
|        Aldo Martins|    GO|  Silver|
|   Alexandra Tabares|    MG|  Silver|
|      Alfredo Cotrim|    SC|  Silver|
|     Almeno Figueira|    SC|  Silver|
|      Alvito Peralta|    AM|  Silver|
|     Amadeu Martinho|    RN|  Silver|
|      Amélia Estévez|    PE|  Silver|
+--------------------+------+--------+
only showing top 20 rows



In [24]:
df_filtrado = clientes.where('Status == "Platinum" or Status == "Gold"')
df_filtrado.show()

+---------+-------------------+------+------+--------+
|ClienteID|            Cliente|Estado|Genero|  Status|
+---------+-------------------+------+------+--------+
|        4|   Adriana Guedelha|    RO|     F|Platinum|
|       10|   Alberto Monsanto|    RN|     M|    Gold|
|       28|      Anna Carvajal|    RS|     F|    Gold|
|       49|      Bento Quintão|    SP|     M|    Gold|
|       68|      Carminda Dias|    AM|     F|    Gold|
|       83|      Cláudio Jorge|    TO|     M|    Gold|
|      121|    Dionísio Saltão|    PR|     M|    Gold|
|      166|   Firmino Meireles|    AM|     M|    Gold|
|      170|      Flor Vilanova|    CE|     M|Platinum|
|      220|Honorina Villaverde|    PE|     F|    Gold|
|      230|    Ibijara Botelho|    RR|     F|Platinum|
|      237|  Iracema Rodríguez|    BA|     F|    Gold|
|      247|         Joana Ataí|    GO|     F|Platinum|
+---------+-------------------+------+------+--------+



In [25]:
cont_status = clientes.groupby('Status').agg(count('Status')).orderBy('count(status)', ascending=False)
cont_status.show()

+--------+-------------+
|  Status|count(Status)|
+--------+-------------+
|  Silver|          237|
|    Gold|            9|
|Platinum|            4|
+--------+-------------+



In [26]:
# O spark não salva o DataFrame por que não aceita os caracteres do nome da
# coluna que foi agregada
cont_status = cont_status.withColumnRenamed('count(Status)', 'Qtd_status')
cont_status.columns

['Status', 'Qtd_status']

## 4. Salvando os resultados

In [27]:
df_reordenado.write.mode('overwrite').format('csv').save('../../dataset/resultados/df_reordenado_csv')
df_reordenado.write.mode('overwrite').format('json').save('../../dataset/resultados/df_reordenado_json')

In [28]:
df_filtrado.write.mode('overwrite').format('orc').save('../../dataset/resultados/df_filtrado_orc')
df_filtrado.write.mode('overwrite').format('parquet').save('../../dataset/resultados/df_filtrado_parquet')

In [29]:
cont_status.write.mode('overwrite').format('csv').save('../../dataset/resultados/cont_status_csv')
cont_status.write.mode('overwrite').format('json').save('../../dataset/resultados/cont_status_json')
cont_status.write.mode('overwrite').format('orc').save('../../dataset/resultados/cont_status_orc')
cont_status.write.mode('overwrite').format('parquet').save('../../dataset/resultados/cont_status_parquet')