<a href="https://colab.research.google.com/github/FthiagoOliveiraS/Engenharia_dados_spark/blob/main/Main_pipeline_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
## Instalando as dependências

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [3]:
## Configurando as variaveis de ambiente

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [4]:
## Localizando o SPARK
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master('local[2]') \
        .appName("Iniciando com Spark") \
        .getOrCreate()

In [6]:
spark

# EMPRESA 01

### IMPORTANDO AS BASES DE DADOS

In [7]:
# Resgatando os dados da empresa 01
# O arquivo original é um json

df1 = spark.read.json("/content/dados_empresaA.json")
df1.show(truncate=False)   # Mostra os dados
df1.printSchema()          # Mostra a estrutura (campos e tipos)

+--------------------+---------+--------------------+---------------------+---------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Categoria do Produto|Filial   |Nome do Produto     |Preço do Produto (R$)|Quantidade em Estoque|_corrupt_record                                                                                                                                                         |
+--------------------+---------+--------------------+---------------------+---------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|null                |null     |null                |null                 |null                 |[{"Nome do Produto":"Blush em p\u00f3","Categoria do Produto":"Eletrodom\u00e9sticos","Pre\u00e7o do P

## EXPLORANDO OS DADOS

In [8]:
df1.describe().show()

+-------+--------------------+--------+---------------+---------------------+---------------------+--------------------+
|summary|Categoria do Produto|  Filial|Nome do Produto|Preço do Produto (R$)|Quantidade em Estoque|     _corrupt_record|
+-------+--------------------+--------+---------------+---------------------+---------------------+--------------------+
|  count|                3122|    3122|           3122|                 3122|                 3122|                   1|
|   mean|                null|    null|           null|    50.59896860986547|    49.78891736066624|                null|
| stddev|                null|    null|           null|    28.28181202706821|   28.697931251283403|                null|
|    min|           Alimentos|Filial 1|   Base líquida|                 1.03|                    1|[{"Nome do Produt...|
|    max|              Roupas|Filial 9|Sombra de olhos|                99.98|                  100|[{"Nome do Produt...|
+-------+--------------------+--

## DATA CLEAN

In [9]:
# Percebemos que uma única linha não foi reconhecida devidamente, vamos elimina-la.

df1 = df1.drop("_corrupt_record")

In [10]:
# Certificando o novo formato

df1.show(truncate=False)

+--------------------+---------+--------------------+---------------------+---------------------+
|Categoria do Produto|Filial   |Nome do Produto     |Preço do Produto (R$)|Quantidade em Estoque|
+--------------------+---------+--------------------+---------------------+---------------------+
|null                |null     |null                |null                 |null                 |
|Eletrodomésticos    |Filial 8 |Lápis de sobrancelha|85.47                |78                   |
|Roupas              |Filial 8 |Base líquida        |75.02                |34                   |
|Roupas              |Filial 10|Base líquida        |44.94                |90                   |
|Alimentos           |Filial 7 |Lápis de sobrancelha|58.63                |91                   |
|Roupas              |Filial 7 |Blush em pó         |50.79                |77                   |
|Eletrodomésticos    |Filial 4 |Corretivo           |20.46                |2                    |
|Roupas             

# EMPRESA 02

### IMPORTANDO AS BASES DE DADOS

In [14]:
# Resgatando os dados da empresa 01
# O arquivo original é um CSV

df2 = spark.read.csv("/content/dados_empresaB.csv", header=True, sep=',')
df2.show(truncate=False)     # Mostra os dados
df2.printSchema()            # Mostra a estrutura (campos e tipos)

+--------------------+------------------------+-------------------+---------------------+------------+--------------------------+
|Nome do Item        |Classificação do Produto|Valor em Reais (R$)|Quantidade em Estoque|Nome da Loja|Data da Venda             |
+--------------------+------------------------+-------------------+---------------------+------------+--------------------------+
|Lápis de sobrancelha|Roupas                  |55.17              |62                   |Filial 1    |2023-04-13 18:58:06.794203|
|Batom matte         |Eletrônicos             |74.15              |48                   |Filial 9    |2023-06-03 18:58:06.794203|
|Corretivo           |Roupas                  |54.86              |36                   |Filial 5    |2023-06-08 18:58:06.794203|
|Delineador líquido  |Roupas                  |91.29              |1                    |Filial 8    |2023-02-27 18:58:06.794203|
|Batom líquido       |Roupas                  |44.65              |52                   |F

## EXPLORANDO OS DADOS

In [15]:
df2.describe().show()

+-------+---------------+------------------------+-------------------+---------------------+------------+--------------------+
|summary|   Nome do Item|Classificação do Produto|Valor em Reais (R$)|Quantidade em Estoque|Nome da Loja|       Data da Venda|
+-------+---------------+------------------------+-------------------+---------------------+------------+--------------------+
|  count|           1323|                    1323|               1323|                 1323|        1323|                1323|
|   mean|           null|                    null|  51.07123960695392|    51.33106575963719|        null|                null|
| stddev|           null|                    null| 28.784596700957888|    29.09516693853117|        null|                null|
|    min|   Base líquida|               Alimentos|               1.08|                    1|    Filial 1|2022-09-05 18:58:...|
|    max|Sombra de olhos|                  Roupas|              99.98|                   99|    Filial 9|2023-0

## DATA CLEAN

In [17]:
# A data de venda não precisa ter dado de horas. Isso aumenta o tempo de processamento.
# Vamos tratar esse dado

from pyspark.sql.functions import to_date

df2 = df2.withColumn("Data da Venda", to_date("Data da Venda"))

In [18]:
df2.show(truncate=False)

+--------------------+------------------------+-------------------+---------------------+------------+-------------+
|Nome do Item        |Classificação do Produto|Valor em Reais (R$)|Quantidade em Estoque|Nome da Loja|Data da Venda|
+--------------------+------------------------+-------------------+---------------------+------------+-------------+
|Lápis de sobrancelha|Roupas                  |55.17              |62                   |Filial 1    |2023-04-13   |
|Batom matte         |Eletrônicos             |74.15              |48                   |Filial 9    |2023-06-03   |
|Corretivo           |Roupas                  |54.86              |36                   |Filial 5    |2023-06-08   |
|Delineador líquido  |Roupas                  |91.29              |1                    |Filial 8    |2023-02-27   |
|Batom líquido       |Roupas                  |44.65              |52                   |Filial 2    |2022-09-05   |
|Base líquida        |Eletrônicos             |5.22             

# PREPARANDO A UNIÃO DAS BASES

## VAMOS MANTER UM PADRÃO ENTRE OS ATRIBUTOS DE CADA BASE

### RENOMEANDO OS ATRIBUTOS DA BASE DA EMPRESA 01

In [19]:
df1.columns

['Categoria do Produto',
 'Filial',
 'Nome do Produto',
 'Preço do Produto (R$)',
 'Quantidade em Estoque']

In [20]:
df1 = df1.withColumnRenamed("Nome do Produto", "Nome_Produto") \
       .withColumnRenamed("Categoria do Produto", "Categoria_Produto") \
       .withColumnRenamed("Preço do Produto (R$)", "Preco_Produto") \
       .withColumnRenamed("Quantidade em Estoque", "Quantidade_Estoque") \
       .withColumnRenamed("Filial", "Filial") \
       .withColumnRenamed("Data da Venda", "Data_Venda")

df1.printSchema()
df1.show(5, truncate=False)

root
 |-- Categoria_Produto: string (nullable = true)
 |-- Filial: string (nullable = true)
 |-- Nome_Produto: string (nullable = true)
 |-- Preco_Produto: double (nullable = true)
 |-- Quantidade_Estoque: long (nullable = true)

+-----------------+---------+--------------------+-------------+------------------+
|Categoria_Produto|Filial   |Nome_Produto        |Preco_Produto|Quantidade_Estoque|
+-----------------+---------+--------------------+-------------+------------------+
|null             |null     |null                |null         |null              |
|Eletrodomésticos |Filial 8 |Lápis de sobrancelha|85.47        |78                |
|Roupas           |Filial 8 |Base líquida        |75.02        |34                |
|Roupas           |Filial 10|Base líquida        |44.94        |90                |
|Alimentos        |Filial 7 |Lápis de sobrancelha|58.63        |91                |
+-----------------+---------+--------------------+-------------+------------------+
only showing t

### RENOMEANDO OS ATRIBUTOS DA BASE DA EMPRESA 02

In [21]:
df2.columns

['Nome do Item',
 'Classificação do Produto',
 'Valor em Reais (R$)',
 'Quantidade em Estoque',
 'Nome da Loja',
 'Data da Venda']

In [23]:
df2 = df2.withColumnRenamed("Nome do Item", "Nome_Produto") \
       .withColumnRenamed("Classificação do Produto", "Categoria_Produto") \
       .withColumnRenamed("Valor em Reais (R$)", "Preco_Produto") \
       .withColumnRenamed("Quantidade em Estoque", "Quantidade_Estoque") \
       .withColumnRenamed("Nome da Loja", "Filial") \
       .withColumnRenamed("Data da Venda", "Data_Venda")

df2.printSchema()
df2.show(5, truncate=False)

root
 |-- Nome_Produto: string (nullable = true)
 |-- Categoria_Produto: string (nullable = true)
 |-- Preco_Produto (R$): string (nullable = true)
 |-- Quantidade_Estoque: string (nullable = true)
 |-- Filial: string (nullable = true)
 |-- Data_Venda: date (nullable = true)

+--------------------+-----------------+------------------+------------------+--------+----------+
|Nome_Produto        |Categoria_Produto|Preco_Produto (R$)|Quantidade_Estoque|Filial  |Data_Venda|
+--------------------+-----------------+------------------+------------------+--------+----------+
|Lápis de sobrancelha|Roupas           |55.17             |62                |Filial 1|2023-04-13|
|Batom matte         |Eletrônicos      |74.15             |48                |Filial 9|2023-06-03|
|Corretivo           |Roupas           |54.86             |36                |Filial 5|2023-06-08|
|Delineador líquido  |Roupas           |91.29             |1                 |Filial 8|2023-02-27|
|Batom líquido       |Roupas  

### CRIANDO A COLUNA DATA_VENDA NO DF1

In [25]:
# A quantidade de atributos entre as bases deverá ser exatamente a mesma

from pyspark.sql.functions import lit

df1 = df1.withColumn("Data_Venda", lit(None))  # cria a coluna nula

In [26]:
df1.show(5, truncate=False)

+-----------------+---------+--------------------+-------------+------------------+----------+
|Categoria_Produto|Filial   |Nome_Produto        |Preco_Produto|Quantidade_Estoque|Data_Venda|
+-----------------+---------+--------------------+-------------+------------------+----------+
|null             |null     |null                |null         |null              |null      |
|Eletrodomésticos |Filial 8 |Lápis de sobrancelha|85.47        |78                |null      |
|Roupas           |Filial 8 |Base líquida        |75.02        |34                |null      |
|Roupas           |Filial 10|Base líquida        |44.94        |90                |null      |
|Alimentos        |Filial 7 |Lápis de sobrancelha|58.63        |91                |null      |
+-----------------+---------+--------------------+-------------+------------------+----------+
only showing top 5 rows



### UNINDO AS BASES DAS 2 EMPRESAS

In [27]:
df3 = df1.union(df2)
df2.show(20, truncate=False)

+--------------------+-----------------+------------------+------------------+---------+----------+
|Nome_Produto        |Categoria_Produto|Preco_Produto (R$)|Quantidade_Estoque|Filial   |Data_Venda|
+--------------------+-----------------+------------------+------------------+---------+----------+
|Lápis de sobrancelha|Roupas           |55.17             |62                |Filial 1 |2023-04-13|
|Batom matte         |Eletrônicos      |74.15             |48                |Filial 9 |2023-06-03|
|Corretivo           |Roupas           |54.86             |36                |Filial 5 |2023-06-08|
|Delineador líquido  |Roupas           |91.29             |1                 |Filial 8 |2023-02-27|
|Batom líquido       |Roupas           |44.65             |52                |Filial 2 |2022-09-05|
|Base líquida        |Eletrônicos      |5.22              |85                |Filial 3 |2022-09-11|
|Máscara de cílios   |Eletrônicos      |1.08              |31                |Filial 6 |2023-02-08|


In [29]:
df3.describe().show()

+-------+-----------------+---------+------------------+------------------+------------------+
|summary|Categoria_Produto|   Filial|      Nome_Produto|     Preco_Produto|Quantidade_Estoque|
+-------+-----------------+---------+------------------+------------------+------------------+
|  count|             4445|     4445|              4445|              4445|              4445|
|   mean|             null|     null| 51.07123960695392|50.816868391451074| 49.78891736066624|
| stddev|             null|     null|28.784596700957888| 28.52500154422628|28.697931251283403|
|    min|        Alimentos|Alimentos|              1.08|                 1|                 1|
|    max|  Sombra de olhos|   Roupas|   Sombra de olhos|             99.98|          Filial 9|
+-------+-----------------+---------+------------------+------------------+------------------+



# SALVANDO OS DADOS EM ARQUIVO PARQUET

In [30]:
df3.write.parquet("/content/df3.parquet")