# Processamento de dados usando o Databricks - PySpark
* Este notebok visa fazer o procesamento de dados usando o PySpark.    
* Será ultilizando para este labioratoria 4 arquivos do tipo parquet.
* No final do processamento será gerado um unico arquivo parquert contendo as transformações.

In [0]:
# Importando as bibliotecas que serão usadas no laboratorio.

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Importando os arquivos para processamento.

In [0]:
# Tipo do arquivos
file_type = "parquet"

# Path dos arquivos
file_location1 = "/FileStore/tables/Clientes.parquet"
file_location2 = "/FileStore/tables/ItensVendas.parquet"
file_location3 = "/FileStore/tables/Produtos.parquet"
file_location4 = "/FileStore/tables/Vendas.parquet"
file_location5 = "/FileStore/tables/Vendedores.parquet"

arq_Clientes =  spark \
                .read.format(file_type)\
                .option("inferSchema","True")\
                .option("header", "True")\
                .parquet(file_location1)

#arq_ItensVendas =   spark \
              #  .read.format(file_type)\
              #  .option("inferSchema","True")\
              #  .option("header", "True")\
               # .parquet(file_location2)

arq_Produtos =   spark \
                .read.format(file_type)\
                .option("inferSchema","True")\
                .option("header", "True")\
                .parquet(file_location3)

arq_Vendas =   spark \
                .read.format(file_type)\
                .option("inferSchema","True")\
                .option("header", "True")\
                .parquet(file_location4)

arq_Vendedores =  spark \
                .read.format(file_type)\
                .option("inferSchema","True")\
                .option("header", "True")\
                .parquet(file_location5)

# Modificando o nome das colunas do dataframe  ** arq_Vendas **
arq_Vendas = arq_Vendas.withColumnRenamed("VendasID", "IdVendasID") \
                       .withColumnRenamed("VendedorID", "IdVendedor") \
                       .withColumnRenamed("ClienteID", "IdCliente")

# Modificando o nome das colunas do dataframe  ** arq_ItensVendas **
arq_ItensVendas = arq_ItensVendas.withColumnRenamed("ProdutoID", "IdProduto")


# Conhecendo os arquivos.  
* Inspescionando o schema de cada arquivo.

In [0]:
%fs ls /FileStore/tables/Clientes.parquet 

path,name,size,modificationTime
dbfs:/FileStore/tables/Clientes.parquet,Clientes.parquet,9311,1654426456000


In [0]:
print("Clientes - ",arq_Clientes.take)
print("ItensVendas - ",arq_ItensVendas.take)
print("Produtos - ",arq_Produtos.take)
print("Vendas - ",arq_Vendas.take)
print("Vendedores - ",arq_Vendedores.take)

Clientes -  <bound method DataFrame.take of DataFrame[ClienteID: bigint, Cliente: string, Estado: string, Genero: string, Status: string]>
ItensVendas -  <bound method DataFrame.take of DataFrame[IdProduto: bigint, VendasID: bigint, Quantidade: bigint, ValorUnitario: double, ValorTotal: double, Desconto: string, TotalComDesconto: double]>
Produtos -  <bound method DataFrame.take of DataFrame[ProdutoID: bigint, Produto: string, Preco: string]>
Vendas -  <bound method DataFrame.take of DataFrame[IdVendasID: bigint, IdVendedor: bigint, IdCliente: bigint, Data: string, Total: double]>
Vendedores -  <bound method DataFrame.take of DataFrame[VendedorID: bigint, Vendedor: string]>


#Gerando cache dos DataFrames.
O objetivo é facilitar o processamento uma vez que estaremos realizando algumas operações neles.

In [0]:
arq_Clientes.cache()
arq_Produtos.cache()
arq_Vendas.cache()
arq_Vendedores.cache()

Out[105]: DataFrame[VendedorID: bigint, Vendedor: string]

#Verificando a quantidade de registros de cada arquivo.

In [0]:
print("Qte. de Linhas - Clientes #", arq_Clientes.count())
print("Qte. de Linhas - Prosdutos #",arq_Produtos.count())
print("Qte. de Linhas - Vendas #",arq_Vendas.count())
print("Qte. de Linhas - Vendedores #",arq_Vendedores.count())
print("Qte. de Linhas - Produtos #",arq_ItensVendas.count())

Qte. de Linhas - Clientes # 250
Qte. de Linhas - Prosdutos # 10
Qte. de Linhas - Vendas # 400
Qte. de Linhas - Vendedores # 10
Qte. de Linhas - Produtos # 940


#Display dos dados.

In [0]:
display(arq_Clientes.limit(5))
display(arq_Produtos.limit(5))
display(arq_Vendas.limit(5))
display(arq_Vendedores.limit(5))
display(arq_ItensVendas.limit(5))


ClienteID,Cliente,Estado,Genero,Status
1,Adelina Buenaventura,RJ,M,Silver
2,Adelino Gago,RJ,M,Silver
3,Adolfo Patrício,PE,M,Silver
4,Adriana Guedelha,RO,F,Platinum
5,Adélio Lisboa,SE,M,Silver


ProdutoID,Produto,Preco
1,Bicicleta Aro 29 Mountain Bike Endorphine 6.3 - 24 Marchas - Shimano - Alumínio,"8.852,00"
2,Bicicleta Altools Stroll Aro 26 Freio À Disco 21 Marchas,"9.201,00"
3,Bicicleta Gts Advanced 1.0 Aro 29 Freio Disco Câmbio Traseiro Shimano 24 Marchas,"4.255,00"
4,Bicicleta Trinc Câmbios Shimano Aro 29 Freio A Disco 24v,"7.658,00"
5,Bicicleta Gometws Endorphine 7.3 - Shimano Alumínio Aro 29 - 24 Marchas,"2.966,00"


IdVendasID,IdVendedor,IdCliente,Data,Total
1,1,91,1/1/2019,8053.6
2,6,185,1/1/2020,150.4
3,7,31,2/1/2020,6087.0
4,5,31,2/1/2019,13828.6
5,5,31,3/1/2018,26096.66


VendedorID,Vendedor
1,Armando Lago
2,Capitolino Bahía
3,Daniel Pirajá
4,Godo Capiperibe
5,Hélio Liberato


IdProduto,VendasID,Quantidade,ValorUnitario,ValorTotal,Desconto,TotalComDesconto
2,400,2,9201.0,18402.0,625668,12145.32
2,385,2,9201.0,18402.0,570462,12697.38
4,395,2,6892.2,13784.4,510023,8684.17
4,367,2,6509.3,13018.6,481688,8201.72
2,380,2,7038.77,14077.54,436404,9713.5


#Fazendo Inner Join dos arquivos
* Objetivo dos Joins é unir todas as tabelas auxiliares gerando somente uma tabela Fato.

In [0]:
ft_vendas = arq_Vendas.join(arq_Clientes,    arq_Vendas.IdVendasID ==  arq_Clientes.ClienteID , "inner")   \
                      .join(arq_Vendedores,  arq_Vendas.IdVendedor == arq_Vendedores.VendedorID , "inner") \
                      .join(arq_ItensVendas, arq_Vendas.IdVendasID == arq_ItensVendas.VendasID , "inner")   \
                      .join(arq_Produtos,    arq_ItensVendas.IdProduto == arq_Produtos.ProdutoID , "inner")


In [0]:
 ft_vendas.show()

+----------+----------+---------+--------+--------+---------+--------------------+------+------+--------+----------+----------------+---------+--------+----------+-------------+----------+--------+----------------+---------+--------------------+---------+
|IdVendasID|IdVendedor|IdCliente|    Data|   Total|ClienteID|             Cliente|Estado|Genero|  Status|VendedorID|        Vendedor|IdProduto|VendasID|Quantidade|ValorUnitario|ValorTotal|Desconto|TotalComDesconto|ProdutoID|             Produto|    Preco|
+----------+----------+---------+--------+--------+---------+--------------------+------+------+--------+----------+----------------+---------+--------+----------+-------------+----------+--------+----------------+---------+--------------------+---------+
|         1|         1|       91|1/1/2019|  8053.6|        1|Adelina Buenaventura|    RJ|     M|  Silver|         1|    Armando Lago|        2|       1|         1|      7820.85|   7820.85|     -  |         7820.85|        2|Biciclet

#Ordenando as colunas para melhorar a visualização
* Nesta etapa foi selecionado apenas algumas colunas da união de todas os arquivos, aplicando a ordenação Ascendente. (Do maior Valor para o Menor)

In [0]:
ft_vendas_ordenado = ft_vendas.select("VendasID","Data","VendedorID","Vendedor","ClienteID","Cliente","Status", \
                                      "ProdutoID","Produto","Quantidade","ValorUnitario","ValorTotal","Desconto","TotalComDesconto") \
                                      .orderBy(desc("ValorTotal"))
ft_vendas_ordenado.show(10)


+--------+---------+----------+----------------+---------+--------------------+------+---------+--------------------+----------+-------------+----------+--------+----------------+
|VendasID|     Data|VendedorID|        Vendedor|ClienteID|             Cliente|Status|ProdutoID|             Produto|Quantidade|ValorUnitario|ValorTotal|Desconto|TotalComDesconto|
+--------+---------+----------+----------------+---------+--------------------+------+---------+--------------------+----------+-------------+----------+--------+----------------+
|     156|17/5/2019|         7|Jéssica Castelão|      156|   Faustino Maranhão|Silver|        2|Bicicleta Altools...|         2|       9201.0|   18402.0| 1104,12|        17297.88|
|     173| 4/6/2020|         5|  Hélio Liberato|      173|   Florinda Assunção|Silver|        2|Bicicleta Altools...|         2|       9201.0|   18402.0| 1840,20|         16561.8|
|     110|10/4/2019|         4| Godo Capiperibe|      110|Deolinda Castelbr...|Silver|        2|Bici

#Respondendo algumas perguntas apartir dos dados transformados

In [0]:
#---------  Qual cliente que menos gastou(comprou) ? ---------# 
#-------------------------------------------------------------#

ft_vendas_ordenado.select("VendasID","Cliente","ValorTotal").groupBy("Cliente").agg(sum("ValorTotal")).orderBy(desc(sum("ValorTotal"))).show()


+--------------------+------------------+
|             Cliente|   sum(ValorTotal)|
+--------------------+------------------+
|    David Carvalhais|29307.480000000003|
|     Floriano Siebra|28660.300000000003|
|     Joaquim Hurtado|26852.870000000003|
|       Adélio Lisboa|          26096.66|
|     Ibijara Botelho|          25146.85|
|Godofredo Mascareñas|           22751.4|
|       Iuri Guterres|          22711.68|
|   Florinda Assunção|          21471.75|
|Deolinda Castelbr...|           21357.0|
|      Belmira Colaço|          21155.46|
| Clotilde Carvalhoso|20543.440000000002|
|   Gertrudes Hidalgo|          20345.23|
|    Davide Alcántara|          20338.43|
|          Dora Rocha|          20082.85|
|    Feliciano Franca|          19528.45|
|        Hugo Covilhã|          19416.16|
|       Aníbal Bastos|          18838.35|
|   Faustino Maranhão|          18786.84|
|   Dinarte Mangueira|           18413.6|
|       Adérito Bahía|           18402.0|
+--------------------+------------

In [0]:
#---------  Qual cliente que menos gastou(comprou) ? ---------# 
#-------------------------------------------------------------#

ft_vendas_ordenado.select("VendasID","Cliente","ValorTotal").groupBy("Cliente").agg(sum("ValorTotal")).orderBy(asc(sum("ValorTotal"))).show()

+--------------------+---------------+
|             Cliente|sum(ValorTotal)|
+--------------------+---------------+
|           Cid Pardo|           91.8|
| Guadalupe Rodrigues|           92.0|
|          Joana Ataí|           97.2|
|    Ifigénia Lustosa|          97.75|
|      Cosme Zambujal|         103.28|
|  Guilhermina Vilaça|          103.5|
|    Catarina Montero|         114.75|
| Bartolomeu Vila-Chã|          115.0|
|       Estela Mattos|         115.06|
|   Francisca Ramallo|          121.5|
|      Cecília Carlos|         127.84|
|      Amélia Estévez|         131.75|
|       Flor Ginjeira|         131.75|
|Godinho ou Godim ...|          139.5|
|       Elvira Açores|          139.5|
|       Cássia Guerra|         143.82|
|        Adelino Gago|          150.4|
|      Célia Meireles|          155.0|
|     Amadeu Martinho|          155.0|
|     Dinarte Tabares|          155.0|
+--------------------+---------------+
only showing top 20 rows



In [0]:
#---------  Qual foi a quantidade total  de produtos vendidos?  ---------# 
#           Qual foi o valor total dos produtos vendidos ?               #
#------------------------------------------------------------------------#

ft_vendas_ordenado.select("ValorTotal","Quantidade").agg(sum("ValorTotal"),sum("Quantidade")).show()

+------------------+---------------+
|   sum(ValorTotal)|sum(Quantidade)|
+------------------+---------------+
|1939142.6400000032|            678|
+------------------+---------------+



In [0]:
#---------       Qual o total de clientes ?        ---------#
#           Qual o total de clientes por Status?   ---------#
#-----------------------------------------------------------#

ft_vendas_ordenado.select("ClienteID").agg(countDistinct("ClienteID")).show()
ft_vendas_ordenado.select("ClienteID","Status").groupBy("Status").agg(countDistinct("ClienteID")).show()

+--------+----------------+
|  Status|count(ClienteID)|
+--------+----------------+
|Platinum|               4|
|  Silver|             237|
|    Gold|               9|
+--------+----------------+

+----------------+
|count(ClienteID)|
+----------------+
|             250|
+----------------+

