<a href="https://colab.research.google.com/github/devrafael26/AutomacaoDeProcesso/blob/main/Projeto_An%C3%A1lise_de_Dados_com_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

- Primeiramente fizemos os uploads dos dataframes.
- Após, a instalação do pyspark.

In [None]:
pip install pyspark

Collecting pyspark
  Using cached pyspark-3.5.0-py2.py3-none-any.whl
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


Importamos do pyspark, o SparkSeession para abrir nossa sessão, as funções SQL e a classe type para trabalharmos os tipos de dados.

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Aqui abrimos e atribuímos nossa sessão ao objeto spark.
spark = SparkSession.builder.master('colab').appName('vendas3').getOrCreate()

In [None]:
df = spark.read.csv('/content/vendas3.csv', header=True, inferSchema=True)
df.show(5, truncate=False)

+---------+-----------------------------------------+----------------+--------------------+-----------------------+------------------------+
|ID       |Descrição do Produto                     |Data de Emissão |Total da Nota Fiscal|Valor Impostos Federais|Valor Impostos Estaduais|
+---------+-----------------------------------------+----------------+--------------------+-----------------------+------------------------+
|99003200 | BOBINA TERMICA 1 VIA 80X40M             |2022-01-03      |270.0               |49.17                  |45.9                    |
|99003200 | BOBINA TERMICA 1 VIA 80X40M             |2022-01-03      |280.0               |50.99                  |47.6                    |
|99010356 | PLACA MAE BRX H81 LGA 1150 DDR3 (M) (UN)|2022-01-03      |750.0               |198.68                 |52.5                    |
|99003200 | BOBINA TERMICA 1 VIA 80X40M             |2022-01-03      |135.0               |24.58                  |22.95                   |
|99003200 | B

In [None]:
# Aqui chamamos o dataframe através do método printSchema() para verificarmos os tipos de dados de cada coluna.
# Em seguida, verificamos valores ausentes.
df.printSchema()
df.toPandas().isna().sum()

root
 |-- ID: string (nullable = true)
 |-- Descrição do Produto: string (nullable = true)
 |-- Data de Emissão : string (nullable = true)
 |-- Total da Nota Fiscal: string (nullable = true)
 |-- Valor Impostos Federais: double (nullable = true)
 |-- Valor Impostos Estaduais: double (nullable = true)



ID                          0
Descrição do Produto        0
Data de Emissão             0
Total da Nota Fiscal        0
Valor Impostos Federais     0
Valor Impostos Estaduais    0
dtype: int64

Renomeando algumas colunas.

In [None]:
df = df.withColumnRenamed("Descrição do Produto", "Descricao")\
.withColumnRenamed("Total da Nota Fiscal", "Total_NF")\
.withColumnRenamed("Valor Impostos Federais", "IPI")\
.withColumnRenamed("Valor Impostos Estaduais", "ICMS")
df.show(5, truncate=False)

+---------+-----------------------------------------+----------------+--------+------+-----+
|ID       |Descricao                                |Data de Emissão |Total_NF|IPI   |ICMS |
+---------+-----------------------------------------+----------------+--------+------+-----+
|99003200 | BOBINA TERMICA 1 VIA 80X40M             |2022-01-03      |270.0   |49.17 |45.9 |
|99003200 | BOBINA TERMICA 1 VIA 80X40M             |2022-01-03      |280.0   |50.99 |47.6 |
|99010356 | PLACA MAE BRX H81 LGA 1150 DDR3 (M) (UN)|2022-01-03      |750.0   |198.68|52.5 |
|99003200 | BOBINA TERMICA 1 VIA 80X40M             |2022-01-03      |135.0   |24.58 |22.95|
|99003200 | BOBINA TERMICA 1 VIA 80X40M             |2022-01-03      |135.0   |24.58 |22.95|
+---------+-----------------------------------------+----------------+--------+------+-----+
only showing top 5 rows



In [None]:
df.show(5)

+---------+--------------------+----------------+--------+------+-----+
|       ID|           Descricao|Data de Emissão |Total_NF|   IPI| ICMS|
+---------+--------------------+----------------+--------+------+-----+
|99003200 | BOBINA TERMICA 1...|      2022-01-03|   270.0| 49.17| 45.9|
|99003200 | BOBINA TERMICA 1...|      2022-01-03|   280.0| 50.99| 47.6|
|99010356 | PLACA MAE BRX H8...|      2022-01-03|   750.0|198.68| 52.5|
|99003200 | BOBINA TERMICA 1...|      2022-01-03|   135.0| 24.58|22.95|
|99003200 | BOBINA TERMICA 1...|      2022-01-03|   135.0| 24.58|22.95|
+---------+--------------------+----------------+--------+------+-----+
only showing top 5 rows



Importamos do módulo Datetime a classe date.

In [None]:
from datetime import date
df = df.withColumn("Total_NF", col("Total_NF").cast(FloatType()))
df = df.withColumn("IPI", col("IPI").cast(FloatType()))
df = df.withColumn("ICMS", col("ICMS").cast(FloatType()))
df = df.withColumn("ID", col("ID").cast(IntegerType()))
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Descricao: string (nullable = true)
 |-- Data de Emissão : string (nullable = true)
 |-- Total_NF: float (nullable = true)
 |-- IPI: float (nullable = true)
 |-- ICMS: float (nullable = true)



Usando a função sum(), calculamos qual o valor total pago em todas as vendas, de IPI e ICMS, ao longo de todo o período.


In [None]:
df.select(sum(df.ICMS)).show()
df.select(sum(df.IPI)).show()

+-----------------+
|        sum(ICMS)|
+-----------------+
|175146.3895679824|
+-----------------+

+-----------------+
|         sum(IPI)|
+-----------------+
|814306.3698568121|
+-----------------+



Usando o select, chamamos as colulas Descricao e Total_NF, e filtramos pela coluna Total_NF apenas valores maior igual a 1000.

In [None]:
df.select("Descricao", "Total_NF").filter(df.Total_NF >= 1000).show(10)

+--------------------+--------+
|           Descricao|Total_NF|
+--------------------+--------+
| COMPUTADOR PENTI...|  1550.0|
| PROCESSADOR CORE I7|  1799.0|
| CENTRAL DE COMAN...|  2040.0|
| PLACA MAE ASUS P...|  1338.0|
| CONJ. PPA DZ JET...|  2200.0|
| CADEIRA FORTREK ...|  1599.0|
| CADEIRA GAMER TH...|  1999.0|
| NOBREAK 600VA SM...|  1440.0|
| COMPUTADOR NTC P...|  5980.0|
| TELA DE NOTEBOOK...|  1100.0|
+--------------------+--------+
only showing top 10 rows



Agora, aplicamos duas condições em nossa busca, usando o & (and).

In [None]:
df.select("ID", "Total_NF").filter((col("ID") == "99003200") & (col("Total_NF") >= 1000)).show()

+--------+--------+
|      ID|Total_NF|
+--------+--------+
|99003200|  3780.0|
|99003200|  2900.0|
+--------+--------+



Aqui também utilizamos duas condições em nosso filtro e usamos o operador like, para mostrar qualquer coisa da coluna Descricao que tenha a palavra MONITOR e que tenha sido uma venda maior igual a 2000.

In [None]:
df.select("Descricao", "Total_NF").filter(df.Descricao.like("%MONITOR%") & (col("Total_NF") >= 2000)).show(10)

+--------------------+--------+
|           Descricao|Total_NF|
+--------------------+--------+
|" MONITOR 23,6"" ...|  2440.0|
|" MONITOR LED 21,...| 5749.78|
| MONITOR 18,5 VGA...|  2196.0|
| MONITOR 18,5 VGA...|  2995.0|
| MONITOR 18,5 VGA...|  3434.7|
| MONITOR 18,5 VGA...|  3414.6|
| MONITOR 18,5 VGA...|  3414.6|
| MONITOR 18,5 VGA...| 3876.75|
| MONITOR 18,5 VGA...|  2047.8|
|" MONITOR 23,6"" ...| 2525.89|
+--------------------+--------+
only showing top 10 rows



Aqui utilizamos apenas o filter, o operador like e as condições, para que tenhamos de retorno a tabela com todas as colunas.

In [None]:
df.filter(df.Descricao.like("%PLACA MAE%") & (col("Total_NF") >= 1500)).show(30, truncate=False)

+--------+------------------------------------------------------------------+----------------+--------+-------+------+
|ID      |Descricao                                                         |Data de Emissão |Total_NF|IPI    |ICMS  |
+--------+------------------------------------------------------------------+----------------+--------+-------+------+
|99000069| COMPUTADOR AMD ATHLON 200GE / PLACA MAE GIGABYTE A320M           |2022-02-21      |5144.8  |1362.86|360.14|
|99000069| COMPUTADOR AMD ATHLON 200GE / PLACA MAE GIGABYTE A320M           |2022-02-28      |1687.0  |446.89 |118.09|
|99010365| PLACA MAE ASUS PRIME H510M                                       |2022-03-14      |2286.0  |605.56 |160.02|
|99010365| PLACA MAE ASUS PRIME H510M                                       |2022-03-15      |2100.0  |556.29 |147.0 |
|99010365| PLACA MAE ASUS PRIME H510M                                       |2022-03-30      |1500.0  |397.35 |105.0 |
|99000298| PLACA MAE INTEL 1200 PCWARE H510G (UN

Aqui nós agrupamos pela coluna Descricao através da função groupBy(), utilizamos a função agg() para fazermos a soma de quanto foi pago de IPI por produto ao longo de todo período, ordenamos com o OrderBy e colocamos em ordem decrescente.

In [None]:
df.groupBy('Descricao').agg({'IPI' : 'sum'}).orderBy(('sum(IPI)'), ascending=False).show(10, truncate=False)

+---------------------------------------------------------+------------------+
|Descricao                                                |sum(IPI)          |
+---------------------------------------------------------+------------------+
| MONITOR 18,5 VGA AOC LED E970SWHNL US ( F ) (UN)        |14571.32992553711 |
| PROCESSADOR INTEL CORE I5                               |13894.719947814941|
| PLACA MAE INTEL 1200 GIGABYTE  H410M H V3 (UN)          |10339.959976196289|
| Memoria 16GB DDR4 Servidor HP DL160 (UN)                |10194.690124511719|
| NOTEBOOK DELL INSPIRON i7                               |9185.170166015625 |
| SSD 240GB KINGSTON (UN)                                 |9038.160026550293 |
| AP UNIFI AC LITE BR U6 (UN)                             |8727.410095214844 |
| IMPRESSORA FISCAL BEMATECH MP                           |8250.89990234375  |
| CONTROLE DE ACESSO FACIAL HIKVISION (UN)                |8176.7999267578125|
| IMPRESSORA TÉRMICA NÃO FISCAL ELGIN I9 USB/ETHERNE

Outra maneira um pouco diferente sintaticamente para chamar a função agg, porém agora para saber quanto foi pago de ICMS.

In [None]:
df.groupBy("Descricao").agg(sum("ICMS")).orderBy("sum(ICMS)", ascending=False).show(10, truncate=False)


+---------------------------------------------------------+------------------+
|Descricao                                                |sum(ICMS)         |
+---------------------------------------------------------+------------------+
| BOBINA TERMICA 1 VIA 80X40M                             |4603.469985187054 |
| PROCESSADOR INTEL CORE I5                               |3828.169990539551 |
| MONITOR 18,5 VGA AOC LED E970SWHNL US ( F ) (UN)        |3360.109998703003 |
| PISO ELEVADO (M)                                        |3221.9998779296875|
| CONTROLE DE ACESSO FACIAL HIKVISION (UN)                |1863.4999389648438|
| NOTEBOOK LENOVO I7                                      |1826.2700500488281|
| PROCESSADOR INTEL CORE I3                               |1772.879991531372 |
| SSD 240GB KINGSTON (UN)                                 |1700.1600069999695|
| SERVIDOR 96 RAM DELL R540 (UN)                          |1624.0            |
| IMPRESSORA TÉRMICA NÃO FISCAL ELGIN I9 USB/ETHERNE

Criamos uma coluna com o withColumn, nomeamos como "Melhores Vendas" e aplicamos uma condição com o when e o otherwise. São analogamente um if e else. Atribuímos uma condição ao when, caso nao seja verdadeiro o otherwise será validado.
Se as vendas forem maior ou igual a 500 reais, será escrito ao lado "Venda BOA", caso contrário, irá aparecer um "emoji" triste.

In [None]:
df.withColumn("Melhores Vendas", when(col("Total_NF") >= 500, "Venda BOA").otherwise(":(")).show(10)

+--------+--------------------+----------------+--------+------+-----+---------------+
|      ID|           Descricao|Data de Emissão |Total_NF|   IPI| ICMS|Melhores Vendas|
+--------+--------------------+----------------+--------+------+-----+---------------+
|99003200| BOBINA TERMICA 1...|      2022-01-03|   270.0| 49.17| 45.9|             :(|
|99003200| BOBINA TERMICA 1...|      2022-01-03|   280.0| 50.99| 47.6|             :(|
|99010356| PLACA MAE BRX H8...|      2022-01-03|   750.0|198.68| 52.5|      Venda BOA|
|99003200| BOBINA TERMICA 1...|      2022-01-03|   135.0| 24.58|22.95|             :(|
|99003200| BOBINA TERMICA 1...|      2022-01-03|   135.0| 24.58|22.95|             :(|
|99003200| BOBINA TERMICA 1...|      2022-01-03|   138.0| 25.13|23.46|             :(|
|99010464| COMPUTADOR PENTI...|      2022-01-03|  1550.0| 410.6|108.5|      Venda BOA|
|99008788| SSD 120GB HIKVIS...|      2022-01-03|   200.0| 84.56| 14.0|             :(|
|99009622| IMPRESSORA GEREN...|      2022-0

In [None]:
# segunda maneira de chamar o comando acima usando o lit().
#df.withColumn("Melhores Vendas", when(col("Total_NF") == lit(2000), "Venda BOA :)").otherwise(':/')).show(30)

Carregando o dataframe compras.csv.

In [None]:
df_compras = spark.read.csv('/content/compras.csv', header=True, inferSchema=True)
df_compras.show(10, truncate=False)

+--------------------------------------------------------------------+----------------+-------------------------------------------+-----------+----------+-------------------+------------+-------------+
|Descrição do Produto                                                |Data de Registro|Fornecedor                                 |Nota Fiscal|Quantidade|Total de Mercadoria|Valor do IPI|Valor do ICMS|
+--------------------------------------------------------------------+----------------+-------------------------------------------+-----------+----------+-------------------+------------+-------------+
| 46I9USECKD02_SC ELGIN IMPRESSORA I9 FULL USB SERIAL ETHERNET C GUIL|2021-11-24      |SCANSOURCE DO BRASIL                       |21905      |15.0      |7875.0             |0.0         |945.0        |
| PLACA MON PRINCIPAL C BABY BOARD L42PRO                            |2021-11-24      |ELGIN S A - Matriz                         |307034     |1.0       |1.0                |0.15        |0.04 

Carregando o dataframe fornecedores.csv.

In [None]:
df_fornec = spark.read.csv('/content/fornecedores.csv', header=True, inferSchema=True, sep=";")
df_fornec.show(10, truncate=False)

+------------------+----------------------------------------------------+------------------+---------------------------+--------------------------+------------------+--------------------+
|CNPJ / CPF        |Nome Fantasia                                       |Telefone          |E-mail                     |Cidade                    |Inscrição Estadual|Contribuinte do ICMS|
+------------------+----------------------------------------------------+------------------+---------------------------+--------------------------+------------------+--------------------+
|42.846.634/0001-00|PAULINERIS TRANSP. E ENCOMENDAS LTDA.               |NULL              |NULL                       |Alfenas (MG)              |NULL              |Não                 |
|28.542.149/0002-00|JS LOCADORA DE VEICULOS LTDA EM RECUPERACAO JUDICIAL|(47) 9615-9276    |societario@blu.ideal.cnt.br|São Paulo (SP)            |123.953.965.112   |Sim                 |
|48.740.351/0003-27|BRASPRESS                               

Verificando valores ausentes.

In [None]:
df_fornec.toPandas().isna().sum()

CNPJ / CPF                2
Nome Fantasia             0
Telefone                463
E-mail                  975
Cidade                    0
Inscrição Estadual      176
Contribuinte do ICMS      0
dtype: int64

Excluindo a coluna e-mail pelo excesso de valores ausentes.

In [None]:
df_fornec = df_fornec.drop('E-mail')
df_fornec.show(10, truncate=False)

+------------------+----------------------------------------------------+------------------+--------------------------+------------------+--------------------+
|CNPJ / CPF        |Nome Fantasia                                       |Telefone          |Cidade                    |Inscrição Estadual|Contribuinte do ICMS|
+------------------+----------------------------------------------------+------------------+--------------------------+------------------+--------------------+
|42.846.634/0001-00|PAULINERIS TRANSP. E ENCOMENDAS LTDA.               |NULL              |Alfenas (MG)              |NULL              |Não                 |
|28.542.149/0002-00|JS LOCADORA DE VEICULOS LTDA EM RECUPERACAO JUDICIAL|(47) 9615-9276    |São Paulo (SP)            |123.953.965.112   |Sim                 |
|48.740.351/0003-27|BRASPRESS                                           |(41) 412105-2800  |Curitiba (PR)             |9030546625        |Sim                 |
|48.740.351/0036-95|BRASPRESS           

Renomeando a coluna Nome Fantasia.

In [None]:
df_fornec = df_fornec.withColumnRenamed("Nome Fantasia", "Fornecedor")
df_fornec.show(5, truncate=False)

+------------------+----------------------------------------------------+------------------+--------------+------------------+--------------------+
|CNPJ / CPF        |Fornecedor                                          |Telefone          |Cidade        |Inscrição Estadual|Contribuinte do ICMS|
+------------------+----------------------------------------------------+------------------+--------------+------------------+--------------------+
|42.846.634/0001-00|PAULINERIS TRANSP. E ENCOMENDAS LTDA.               |NULL              |Alfenas (MG)  |NULL              |Não                 |
|28.542.149/0002-00|JS LOCADORA DE VEICULOS LTDA EM RECUPERACAO JUDICIAL|(47) 9615-9276    |São Paulo (SP)|123.953.965.112   |Sim                 |
|48.740.351/0003-27|BRASPRESS                                           |(41) 412105-2800  |Curitiba (PR) |9030546625        |Sim                 |
|48.740.351/0036-95|BRASPRESS                                           |NULL              |Maringa (PR)  |90319

Juntando informações entre duas tabelas pela coluna Fornecedor, através do INNER JOIN.

In [None]:
df_novo = df_compras.join(df_fornec, df_compras['Fornecedor'] == df_fornec['Fornecedor'], 'inner')
df_novo.show(10, truncate=False)

+--------------------------------------------------------------------+----------------+-------------------------------------------+-----------+----------+-------------------+------------+-------------+------------------+-------------------------------------------+----------------+-----------------------+------------------+--------------------+
|Descrição do Produto                                                |Data de Registro|Fornecedor                                 |Nota Fiscal|Quantidade|Total de Mercadoria|Valor do IPI|Valor do ICMS|CNPJ / CPF        |Fornecedor                                 |Telefone        |Cidade                 |Inscrição Estadual|Contribuinte do ICMS|
+--------------------------------------------------------------------+----------------+-------------------------------------------+-----------+----------+-------------------+------------+-------------+------------------+-------------------------------------------+----------------+-----------------------+---