<a href="https://colab.research.google.com/github/OBERDAN-sys/Prevendo_vendas_usando_pyspark/blob/main/sales_forest_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [172]:
# Remove Colab's default sample_data folder
! rm -rf /content/sample_data

In [173]:
# Create a folder in Google Drive to hold the datasets
! mkdir -p "/content/drive/MyDrive/Colab_datasets"

In [174]:
# Create a subfolder in Google Drive for the datasets of this sector (retail)
! mkdir -p "/content/drive/MyDrive/Colab_datasets/retail"

In [175]:
# Create a subfolder in Google Drive for the datasets of this project
! mkdir -p "/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast"

In [176]:
# Create the raw and processed data folders for this project in Google Drive
! mkdir -p "/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw"
! mkdir -p "/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/processed"

In [177]:
# Unzip the archive stored in Drive into the Colab working directory
! unzip /content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/store_sales.zip

unzip:  cannot find or open /content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/store_sales.zip, /content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/store_sales.zip.zip or /content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/store_sales.zip.ZIP.


In [178]:
# Remove the zipped archive from Google Drive
! rm -rf /content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/store_sales.zip

SETTING UP THE WORKING ENVIRONMENT FOR PYSPARK

In [179]:
# Import the package used to set environment variables
import os

In [180]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [181]:
# Download Apache Spark
! wget -q https://dlcdn.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz

In [182]:
# Extract Apache Spark
! tar -xf spark-3.4.3-bin-hadoop3.tgz

In [183]:
# Remove the downloaded archive
! rm -rf spark-3.4.3-bin-hadoop3.tgz

In [184]:
# Set the Java and Spark environment variables
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ['SPARK_HOME'] = "/content/spark-3.4.3-bin-hadoop3"

# FORECASTING SALES OF DIFFERENT PRODUCTS IN FAVORITA STORES USING PYSPARK MLLIB

EXECUTIVE SUMMARY

In this project I work with a dataset covering 2013 to 2017 that contains sales information for the stores of Corporación Favorita, a major grocery retailer based in Ecuador. The main goal is to forecast unit sales of different products across different Favorita stores. Accurate store sales forecasts are crucial because they underpin several aspects of a business's operations. They allow Favorita store managers to make better decisions on inventory management, resource allocation, and marketing strategy. The whole process was developed following the methodology of the BIG DATA ANALYTICAL INTELLIGENCE FOR BUSINESS framework. PySpark is used to simplify and speed up large-scale data processing and analysis.

In [185]:
# Install findspark, which locates the PySpark installation
! pip install -q findspark

In [241]:
# Import libraries
import findspark
findspark.init()

from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, DateType, TimestampType, BooleanType
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Window
from collections import defaultdict

# Data handling
import pandas as pd
import numpy as np
from pyspark.ml.feature import Imputer

# Feature processing
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import DenseVector

# Algorithms
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns

In [187]:
! pip install pandas seaborn



In [188]:
# Create the PySpark session
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .appName('prev_vend_ml')
    .getOrCreate() )

Loading, reading, displaying, and checking the schema of the datasets

In [189]:
# Load, read, display, and check the schema of the train dataset
path_train = ("/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/train.csv")

df_train = spark.read\
    .format("csv")\
    .option('sep', ',')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv(path_train)

print("Visualizando os dados :")
df_train.show(5)

print("Schema dos dados :")
df_train.printSchema()

Visualizando os dados :
+---+----------+---------+----------+-----+-----------+
| id|      date|store_nbr|    family|sales|onpromotion|
+---+----------+---------+----------+-----+-----------+
|  0|2013-01-01|        1|AUTOMOTIVE|  0.0|          0|
|  1|2013-01-01|        1| BABY CARE|  0.0|          0|
|  2|2013-01-01|        1|    BEAUTY|  0.0|          0|
|  3|2013-01-01|        1| BEVERAGES|  0.0|          0|
|  4|2013-01-01|        1|     BOOKS|  0.0|          0|
+---+----------+---------+----------+-----+-----------+
only showing top 5 rows

Schema dos dados :
root
 |-- id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- store_nbr: integer (nullable = true)
 |-- family: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- onpromotion: integer (nullable = true)



Checking column integrity of the train dataset

In [190]:
# Count null values per column in train
df_train.select([count(when(isnull(c), c))\
          .alias(c) for c in df_train.columns]).show()

+---+----+---------+------+-----+-----------+
| id|date|store_nbr|family|sales|onpromotion|
+---+----+---------+------+-----+-----------+
|  0|   0|        0|     0|    0|          0|
+---+----+---------+------+-----+-----------+



In [191]:
# Load, read, display, and check the schema of the test.csv dataset
path_test = ("/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/test.csv")

df_test = spark.read\
    .format("csv")\
    .option('sep', ',')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv(path_test)

print("Visualizando os dados :")
df_test.show(5)

print("Schema dos dados :")
df_test.printSchema()

Visualizando os dados :
+-------+----------+---------+----------+-----------+
|     id|      date|store_nbr|    family|onpromotion|
+-------+----------+---------+----------+-----------+
|3000888|2017-08-16|        1|AUTOMOTIVE|          0|
|3000889|2017-08-16|        1| BABY CARE|          0|
|3000890|2017-08-16|        1|    BEAUTY|          2|
|3000891|2017-08-16|        1| BEVERAGES|         20|
|3000892|2017-08-16|        1|     BOOKS|          0|
+-------+----------+---------+----------+-----------+
only showing top 5 rows

Schema dos dados :
root
 |-- id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- store_nbr: integer (nullable = true)
 |-- family: string (nullable = true)
 |-- onpromotion: integer (nullable = true)



Checking column integrity of the test dataset

In [192]:
# Count null values per column in test
df_test.select([count(when(isnull(c), c))\
          .alias(c) for c in df_test.columns]).show()

+---+----+---------+------+-----------+
| id|date|store_nbr|family|onpromotion|
+---+----+---------+------+-----------+
|  0|   0|        0|     0|          0|
+---+----+---------+------+-----------+



In [193]:
# Load, read, display, and check the schema of the stores dataset
path_stores = ("/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/stores.csv")

df_stores = spark.read\
    .format("csv")\
    .option('sep', ',')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv(path_stores)

print("Visualizando os dados :")
df_stores.show(5)

print("Schema dos dados :")
df_stores.printSchema()

Visualizando os dados :
+---------+-------------+--------------------+----+-------+
|store_nbr|         city|               state|type|cluster|
+---------+-------------+--------------------+----+-------+
|        1|        Quito|           Pichincha|   D|     13|
|        2|        Quito|           Pichincha|   D|     13|
|        3|        Quito|           Pichincha|   D|      8|
|        4|        Quito|           Pichincha|   D|      9|
|        5|Santo Domingo|Santo Domingo de ...|   D|      4|
+---------+-------------+--------------------+----+-------+
only showing top 5 rows

Schema dos dados :
root
 |-- store_nbr: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- type: string (nullable = true)
 |-- cluster: integer (nullable = true)



Checking column integrity of the stores dataset

In [194]:
# Count null values per column in stores
df_stores.select([count(when(isnull(c), c))\
          .alias(c) for c in df_stores.columns]).show()

+---------+----+-----+----+-------+
|store_nbr|city|state|type|cluster|
+---------+----+-----+----+-------+
|        0|   0|    0|   0|      0|
+---------+----+-----+----+-------+



In [195]:
# Load, read, display, and check the schema of the transactions dataset
path_transacao = ("/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/transactions.csv")
df_transacao = spark.read\
    .format("csv")\
    .option('sep', ',')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv(path_transacao)

print("Visualizando os dados :")
df_transacao.show(5)

print("Schema dos dados :")
df_transacao.printSchema()

Visualizando os dados :
+----------+---------+------------+
|      date|store_nbr|transactions|
+----------+---------+------------+
|2013-01-01|       25|         770|
|2013-01-02|        1|        2111|
|2013-01-02|        2|        2358|
|2013-01-02|        3|        3487|
|2013-01-02|        4|        1922|
+----------+---------+------------+
only showing top 5 rows

Schema dos dados :
root
 |-- date: date (nullable = true)
 |-- store_nbr: integer (nullable = true)
 |-- transactions: integer (nullable = true)



Checking column integrity of the transactions dataset

In [196]:
df_transacao.select([count(when(isnull(c), c))\
          .alias(c) for c in df_transacao.columns]).show()

+----+---------+------------+
|date|store_nbr|transactions|
+----+---------+------------+
|   0|        0|           0|
+----+---------+------------+



In [197]:
# Load, read, display, and check the schema of the holidays_events dataset
path_holidays = ("/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/holidays_events.csv")

df_holidays = spark.read\
    .format("csv")\
    .option('sep', ',')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv(path_holidays)

print("Visualizando os dados :")
df_holidays.show(5)

print("Schema dos dados :")
df_holidays.printSchema()

Visualizando os dados :
+----------+-------+--------+-----------+--------------------+-----------+
|      date|   type|  locale|locale_name|         description|transferred|
+----------+-------+--------+-----------+--------------------+-----------+
|2012-03-02|Holiday|   Local|      Manta|  Fundacion de Manta|      false|
|2012-04-01|Holiday|Regional|   Cotopaxi|Provincializacion...|      false|
|2012-04-12|Holiday|   Local|     Cuenca| Fundacion de Cuenca|      false|
|2012-04-14|Holiday|   Local|   Libertad|Cantonizacion de ...|      false|
|2012-04-21|Holiday|   Local|   Riobamba|Cantonizacion de ...|      false|
+----------+-------+--------+-----------+--------------------+-----------+
only showing top 5 rows

Schema dos dados :
root
 |-- date: date (nullable = true)
 |-- type: string (nullable = true)
 |-- locale: string (nullable = true)
 |-- locale_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- transferred: boolean (nullable = true)



Checking column integrity of the holidays_events dataset

In [198]:
# Count null values per column in holidays_events
df_holidays.select([count(when(isnull(c), c))\
          .alias(c) for c in df_holidays.columns]).show()

+----+----+------+-----------+-----------+-----------+
|date|type|locale|locale_name|description|transferred|
+----+----+------+-----------+-----------+-----------+
|   0|   0|     0|          0|          0|          0|
+----+----+------+-----------+-----------+-----------+



In [199]:
# Load, read, display, and check the schema of the oil dataset
path_oil = ("/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/raw/oil.csv")

df_oil = spark.read\
    .format("csv")\
    .option('sep', ',')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv(path_oil)

print("Visualizando os dados :")
df_oil.show(5)

print("Schema dos dados :")
df_oil.printSchema()

Visualizando os dados :
+----------+----------+
|      date|dcoilwtico|
+----------+----------+
|2013-01-01|      null|
|2013-01-02|     93.14|
|2013-01-03|     92.97|
|2013-01-04|     93.12|
|2013-01-07|      93.2|
+----------+----------+
only showing top 5 rows

Schema dos dados :
root
 |-- date: date (nullable = true)
 |-- dcoilwtico: double (nullable = true)



Checking column integrity of the oil dataset

In [200]:
# Count null values per column in oil
df_oil.select([count(when(isnull(c), c))\
          .alias(c) for c in df_oil.columns]).show()

+----+----------+
|date|dcoilwtico|
+----+----------+
|   0|        43|
+----+----------+



Renaming the `type` column in the holidays and stores datasets so the two can be told apart after merging

In [201]:
df_holidays = df_holidays.withColumnRenamed("type","type_holy")
df_stores = df_stores.withColumnRenamed("type","type_store")

Merging the train dataset with the stores, transactions, holidays_events, and oil datasets

In [202]:
# Merge the datasets
df_merge = df_train.join(df_stores, on='store_nbr', how='inner')
df_merge1 = df_merge.join(df_transacao, on=['date', 'store_nbr'], how='inner')
df_merge2 = df_merge1.join(df_holidays, on='date', how='inner')
df_merge3 = df_merge2.join(df_oil, on='date', how='inner')

In [203]:
# Count the rows and columns of df_merge3
linhas = df_merge3.count()
print(f"Contagem de linhas do DataFrame  : {linhas}")
colunas = len(df_merge3.columns)
print(f"Contagem de colunas do DataFrame : {colunas}")

Contagem de linhas do DataFrame  : 322047
Contagem de colunas do DataFrame : 17
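
The row count above is far smaller than the roughly 3 million rows suggested by the `id` range (test ids start at 3000888). That is consistent with the inner joins keeping only dates that appear in every table, for example only dates listed in holidays_events. The plain-Python sketch below is not the notebook's code: the toy rows and the helper names `inner_join` and `left_join` are made up for illustration. It shows how an inner join drops unmatched rows while a left join keeps them.

```python
def inner_join(left, right, key):
    """Keep only left rows whose key appears in right (one output row per match)."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]


def left_join(left, right, key, right_fields):
    """Keep every left row; fill right-hand fields with None when there is no match."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    empty = {field: None for field in right_fields}
    out = []
    for l in left:
        matches = index.get(l[key], [empty])
        out.extend({**l, **r} for r in matches)
    return out


# Toy data (made up for illustration): three sales days, one of them a holiday.
sales = [
    {"date": "2013-01-01", "sales": 0.0},
    {"date": "2013-01-02", "sales": 810.0},
    {"date": "2013-01-03", "sales": 655.0},
]
holidays = [{"date": "2013-01-01", "type_holy": "Holiday"}]

inner_rows = inner_join(sales, holidays, "date")
left_rows = left_join(sales, holidays, "date", ["type_holy"])

print(len(inner_rows))  # only the holiday date survives
print(len(left_rows))   # every sales row is kept
```

In PySpark the corresponding change would be `how='left'` in `DataFrame.join`. Whether that is appropriate here depends on whether non-holiday days should be part of the model.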


In [204]:
# Count null values per column in df_merge3
df_merge3.select([count(when(isnull(c), c))\
          .alias(c) for c in df_merge3.columns]).show()

# Impute the column mean into the missing values of dcoilwtico
input_cols=['dcoilwtico']
output_cols_mean=["dcoilwtico_new"]

i_mean = Imputer(strategy='mean', inputCols=input_cols, outputCols=output_cols_mean)

imputer_model_mean = i_mean.fit(df_merge3)

imputed_df=imputer_model_mean.transform(df_merge3)


+----+---------+---+------+-----+-----------+----+-----+----------+-------+------------+---------+------+-----------+-----------+-----------+----------+
|date|store_nbr| id|family|sales|onpromotion|city|state|type_store|cluster|transactions|type_holy|locale|locale_name|description|transferred|dcoilwtico|
+----+---------+---+------+-----+-----------+----+-----+----------+-------+------------+---------+------+-----------+-----------+-----------+----------+
|   0|        0|  0|     0|    0|          0|   0|    0|         0|      0|           0|        0|     0|          0|          0|          0|     22044|
+----+---------+---+------+-----+-----------+----+-----+----------+-------+------------+---------+------+-----------+-----------+-----------+----------+
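
As a minimal sketch of what mean imputation does, the plain-Python function below replaces missing entries with the mean of the observed ones. The helper name `impute_mean` is hypothetical and this is not Spark's `Imputer` implementation. The sample values are the first five oil prices displayed earlier. Note that, in the cells shown here, `imputed_df` is not reused afterwards and `dcoilwtico` is dropped before modeling, so the imputed column does not appear to reach the model.

```python
def impute_mean(values):
    """Replace None entries with the mean of the observed (non-None) entries."""
    observed = [v for v in values if v is not None]
    if not observed:
        raise ValueError("cannot impute a column with no observed values")
    mean = sum(observed) / len(observed)
    return [mean if v is None else v for v in values]


# First oil prices shown earlier in the notebook; the first day is missing.
oil = [None, 93.14, 92.97, 93.12, 93.2]
filled = impute_mean(oil)
print(filled[0])  # mean of the four observed prices
```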



Checking the unique values of the main categorical columns

In [205]:
# Check the unique values of the categorical columns
print("Valores unicos de 'type_store':")
(df_merge3.select("type_store").distinct()).show()

print("Valores unicos de 'type_holy':")
(df_merge3.select("type_holy").distinct()).show()

(df_merge3.select("state").distinct()).show()

Valores unicos de 'type_store':
+----------+
|type_store|
+----------+
|         E|
|         B|
|         D|
|         C|
|         A|
+----------+

Valores unicos de 'type_holy':
+----------+
| type_holy|
+----------+
|     Event|
|   Holiday|
|  Transfer|
|    Bridge|
|Additional|
+----------+

+--------------------+
|               state|
+--------------------+
|              Manabi|
|            Cotopaxi|
|           Pichincha|
|          Chimborazo|
|              Guayas|
|                Loja|
|         Santa Elena|
|            Imbabura|
|              El Oro|
|               Azuay|
|             Bolivar|
|          Tungurahua|
|Santo Domingo de ...|
|            Los Rios|
|          Esmeraldas|
|             Pastaza|
+--------------------+



In [206]:
# Create an aliased reference to the DataFrame (Spark DataFrames are immutable)
df_merge_c = df_merge3.alias('df_merge_c')

**PREPARING THE DATA FOR THE FORECASTING MODELS: FEATURE ENGINEERING IN PARTS**

FEATURE ENGINEERING PART 1

In [207]:
# Count the distinct values in the onpromotion column
df_count = df_merge_c.select("onpromotion").distinct()
print(df_count.count())

285


In [208]:
# Check the minimum and maximum values
result = df_count.select([min("onpromotion"), max("onpromotion")])
result.show()

+----------------+----------------+
|min(onpromotion)|max(onpromotion)|
+----------------+----------------+
|               0|             716|
+----------------+----------------+



In [209]:
# Drop columns that are not needed in the dataset
df_mesc = df_merge_c.drop('city','date','id', 'locale', 'locale_name', 'description', 'transferred', 'state', 'dcoilwtico' )
df_mesc.show(5)

+---------+----------+-----+-----------+----------+-------+------------+---------+
|store_nbr|    family|sales|onpromotion|type_store|cluster|transactions|type_holy|
+---------+----------+-----+-----------+----------+-------+------------+---------+
|       25|AUTOMOTIVE|  0.0|          0|         D|      1|         770|  Holiday|
|       25| BABY CARE|  0.0|          0|         D|      1|         770|  Holiday|
|       25|    BEAUTY|  2.0|          0|         D|      1|         770|  Holiday|
|       25| BEVERAGES|810.0|          0|         D|      1|         770|  Holiday|
|       25|     BOOKS|  0.0|          0|         D|      1|         770|  Holiday|
+---------+----------+-----+-----------+----------+-------+------------+---------+
only showing top 5 rows



Defining categories for the family column

In [210]:
# Check the unique values of the family column
print("Valores unicos de 'family':")
(df_mesc.select("family").distinct())

Valores unicos de 'family':


DataFrame[family: string]

In [211]:
# Define the list of families for each product category
food_families = ['BEVERAGES', 'BREAD/BAKERY', 'FROZEN FOODS', 'MEATS', 'PREPARED FOODS', 'DELI','PRODUCE', 'DAIRY','POULTRY','EGGS','SEAFOOD']
home_families = ['HOME AND KITCHEN I', 'HOME AND KITCHEN II', 'HOME APPLIANCES']
# Note: the data spells this family 'LADIESWEAR', so 'LADYSWARE' matches no rows
clothing_families = ['LINGERIE', 'LADYSWARE']
grocery_families = ['GROCERY I', 'GROCERY II']
stationery_families = ['BOOKS', 'MAGAZINES','SCHOOL AND OFFICE SUPPLIES']
cleaning_families = ['HOME CARE', 'BABY CARE','PERSONAL CARE']
hardware_families = ['PLAYERS AND ELECTRONICS','HARDWARE']

In [212]:
# Recode the 'family' column using the predefined product categories
df_mesc = df_mesc.withColumn('family', when(df_mesc['family'].isin(food_families), 'FOODS').otherwise(df_mesc['family']))
df_mesc = df_mesc.withColumn('family', when(df_mesc['family'].isin(home_families), 'HOME').otherwise(df_mesc['family']))
df_mesc = df_mesc.withColumn('family', when(df_mesc['family'].isin(clothing_families), 'CLOTHING').otherwise(df_mesc['family']))
df_mesc = df_mesc.withColumn('family', when(df_mesc['family'].isin(grocery_families), 'GROCERY').otherwise(df_mesc['family']))
df_mesc = df_mesc.withColumn('family', when(df_mesc['family'].isin(stationery_families), 'STATIONERY').otherwise(df_mesc['family']))
df_mesc = df_mesc.withColumn('family', when(df_mesc['family'].isin(cleaning_families), 'CLEANING').otherwise(df_mesc['family']))
df_mesc = df_mesc.withColumn('family', when(df_mesc['family'].isin(hardware_families), 'HARDWARE').otherwise(df_mesc['family']))
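
The chained `when(...).otherwise(...)` calls above amount to a lookup from family to group. The plain-Python sketch below, not the notebook's code, expresses that lookup as a dictionary and adds a small check for list entries that never occur in the data. The names `GROUPS`, `categorize`, and `unmatched` are hypothetical, only three of the groups are reproduced, and the `observed` list is an illustrative subset of family names.

```python
# Three of the group lists from the notebook, copied as written there.
GROUPS = {
    "HOME": ["HOME AND KITCHEN I", "HOME AND KITCHEN II", "HOME APPLIANCES"],
    "CLOTHING": ["LINGERIE", "LADYSWARE"],
    "GROCERY": ["GROCERY I", "GROCERY II"],
}

# Invert the groups into a family -> group lookup table.
FAMILY_TO_GROUP = {
    family: group for group, families in GROUPS.items() for family in families
}


def categorize(family):
    """Return the coarse group for a family, or the family itself if unmapped."""
    return FAMILY_TO_GROUP.get(family, family)


def unmatched(groups, observed_families):
    """List group members that never occur in the data (likely spelling mismatches)."""
    observed = set(observed_families)
    return sorted(f for fams in groups.values() for f in fams if f not in observed)


# Illustrative subset of the family values present in the data.
observed = [
    "LINGERIE", "LADIESWEAR", "GROCERY I", "GROCERY II",
    "HOME AND KITCHEN I", "HOME AND KITCHEN II", "HOME APPLIANCES",
]

print(categorize("LINGERIE"))         # CLOTHING
print(categorize("LADIESWEAR"))       # LADIESWEAR (left unmapped)
print(unmatched(GROUPS, observed))    # ['LADYSWARE']
```

A check like `unmatched` would explain why LADIESWEAR still shows up as its own category in the grouped counts that follow.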

In [213]:
# Group df_mesc by family and count the rows in each group
df_mesc.groupBy("family") \
    .agg(count("*").alias("count")) \
    .show(truncate=False)

+----------------+------+
|family          |count |
+----------------+------+
|LADIESWEAR      |9759  |
|LAWN AND GARDEN |9759  |
|AUTOMOTIVE      |9759  |
|HOME            |29277 |
|CELEBRATION     |9759  |
|STATIONERY      |29277 |
|LIQUOR,WINE,BEER|9759  |
|FOODS           |107349|
|CLEANING        |39036 |
|HARDWARE        |19518 |
|GROCERY         |19518 |
|PET SUPPLIES    |9759  |
|BEAUTY          |9759  |
|CLOTHING        |9759  |
+----------------+------+



Handling the ONPROMOTION column: duplicating it to convert promotion counts into a 0/1 flag

In [214]:
# Duplicate the onpromotion column
df_mesc_dup = df_mesc.withColumn("onpromotion_d", df_mesc["onpromotion"])

In [223]:
# Set onpromotion to 1 wherever the original count is non-zero
df_mesc1 = df_mesc_dup.withColumn("onpromotion", \
              when(df_mesc_dup["onpromotion_d"] != 0, 1).otherwise(df_mesc_dup["onpromotion"]))

In [224]:
df_mesc2 = df_mesc1.drop("onpromotion_d")

Checking the feature schema and null values in the dataset

In [225]:
# Check the schema
df_mesc2.printSchema()

# Count null values per column in df_mesc2
df_mesc2.select([count(when(isnull(c), c))\
          .alias(c) for c in df_mesc2.columns]).show()

root
 |-- store_nbr: integer (nullable = true)
 |-- family: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- onpromotion: integer (nullable = true)
 |-- type_store: string (nullable = true)
 |-- cluster: integer (nullable = true)
 |-- transactions: integer (nullable = true)
 |-- type_holy: string (nullable = true)

+---------+------+-----+-----------+----------+-------+------------+---------+
|store_nbr|family|sales|onpromotion|type_store|cluster|transactions|type_holy|
+---------+------+-----+-----------+----------+-------+------------+---------+
|        0|     0|    0|          0|         0|      0|           0|        0|
+---------+------+-----+-----------+----------+-------+------------+---------+



Saving the dataset

In [226]:
df_mesc2.write.option("header",True) \
    .option("delimiter",",") \
    .format("csv") \
    .mode("overwrite") \
    .save("/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/processed/df_base1_train.csv")

Reading the saved file

In [227]:
caminho_csv = "/content/drive/MyDrive/Colab_datasets/retail/data_sales_forcast/processed/df_base1_train.csv"

df_base = spark.read\
          .format("csv") \
          .option("header", "True")\
          .option("sep", ",")\
          .option("inferSchema", "True")\
          .load(caminho_csv)

In [228]:
# Count the rows and columns of df_base
linhas = df_base.count()
print(f"Contagem de linhas do DataFrame  : {linhas}")
colunas = len(df_base.columns)
print(f"Contagem de colunas do DataFrame : {colunas}")

Contagem de linhas do DataFrame  : 322047
Contagem de colunas do DataFrame : 8


Encoding categorical variables

In [229]:
# ML algorithms cannot work with non-numeric data, so group the column names by data type
type_data = defaultdict(list)
for entry in df_base.schema.fields:
  type_data[str(entry.dataType)].append(entry.name)
print(type_data)

defaultdict(<class 'list'>, {'IntegerType()': ['store_nbr', 'onpromotion', 'cluster', 'transactions'], 'StringType()': ['family', 'type_store', 'type_holy'], 'DoubleType()': ['sales']})


In [230]:
# Convert categorical data to numeric indices with StringIndexer
# Index the family column
indexer = StringIndexer(inputCol="family", outputCol="family_index").fit(df_base)
indexer_df = indexer.transform(df_base)

# Drop the family column
indexed_df  = indexer_df.drop('family')

# Index the type_store column
indexed_2 = StringIndexer(inputCol='type_store', outputCol='tstore_num').fit(indexed_df)
indexed_2_df = indexed_2.transform(indexed_df)

# Drop the type_store column
indexed_2_df  = indexed_2_df.drop('type_store')

# Index the type_holy column (fit on the same DataFrame that is transformed)
indexed_3 = StringIndexer(inputCol='type_holy', outputCol='tholy_num').fit(indexed_2_df)
indexed_3_df = indexed_3.transform(indexed_2_df)

# Drop the type_holy column
indexed_3_df  = indexed_3_df.drop('type_holy')
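
To make the indexing step concrete, the sketch below mimics in plain Python what `StringIndexer` does under its default ordering (`stringOrderType='frequencyDesc'`): the most frequent label gets index 0.0, with ties broken alphabetically. The helper name `fit_string_indexer` and the toy labels are assumptions for illustration; this is not Spark's implementation.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Toy store types (made up): D is most frequent, then A, then B and C tie.
store_types = ["D", "D", "D", "A", "A", "B", "C"]
mapping = fit_string_indexer(store_types)
print(mapping)  # {'D': 0.0, 'A': 1.0, 'B': 2.0, 'C': 3.0}
```

The resulting indices are arbitrary labels with no numeric meaning of their own, which is why the next cell one-hot encodes them instead of feeding them to the model directly.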

In [231]:
# Apply OneHotEncoder to the indexed columns
encoder = OneHotEncoder(inputCol='family_index', outputCol="family_vector").fit(indexed_3_df)
encoded_df = encoder.transform(indexed_3_df)

# Drop the family_index column
encoded_df  = encoded_df.drop('family_index')

# Apply OneHotEncoder to the tstore_num column
encoder1 = OneHotEncoder(inputCol='tstore_num', outputCol="tstore__vector").fit(encoded_df)
encoded_df1 = encoder1.transform(encoded_df)

# Drop the tstore_num column
encoded_df1  = encoded_df1.drop('tstore_num')

# Apply OneHotEncoder to the tholy_num column
encoder2 = OneHotEncoder(inputCol='tholy_num', outputCol="tholy__vector").fit(encoded_df1)
encoded_df2 = encoder2.transform(encoded_df1)

# Drop the tholy_num column
encoded_df2  = encoded_df2.drop('tholy_num')
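
The encoded vectors are one slot shorter than the number of categories: `family` has 14 grouped categories but `family_vector` has size 13, and the five store types give a size-4 vector. That matches `OneHotEncoder`'s default `dropLast=True`, where the last category is represented by an all-zero vector. The plain-Python sketch below illustrates the idea; the helper name `one_hot` is hypothetical and the function returns a dense list rather than Spark's sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a 0/1 list; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


print(one_hot(0, 5))  # [1.0, 0.0, 0.0, 0.0]
print(one_hot(4, 5))  # [0.0, 0.0, 0.0, 0.0]  (last category, dropped)
```

Dropping one category avoids a redundant column, since the encoded columns would otherwise always sum to one.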

Assembling the features with VectorAssembler

In [232]:
# Choose the features to be assembled into a vector
col_features = ['store_nbr', 'onpromotion', 'cluster', 'transactions', 'family_vector', 'tstore__vector', 'tholy__vector']
col_unscaled = ['sales']

In [233]:
# Assemble the inputs into a new "features" column that holds the model inputs (without the target)
assembler = VectorAssembler(inputCols=col_features, outputCol='features')

In [234]:
assembled_df = assembler.transform(encoded_df2)
assembled_df.show(10, truncate=False)

+---------+-------+-----------+-------+------------+--------------+--------------+-------------+---------------------------------------------------------+
|store_nbr|sales  |onpromotion|cluster|transactions|family_vector |tstore__vector|tholy__vector|features                                                 |
+---------+-------+-----------+-------+------------+--------------+--------------+-------------+---------------------------------------------------------+
|1        |4.0    |0          |13     |1792        |(13,[6],[1.0])|(4,[0],[1.0]) |(4,[0],[1.0])|(25,[0,2,3,10,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])      |
|1        |0.0    |0          |13     |1792        |(13,[1],[1.0])|(4,[0],[1.0]) |(4,[0],[1.0])|(25,[0,2,3,5,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])       |
|1        |4.0    |1          |13     |1792        |(13,[7],[1.0])|(4,[0],[1.0]) |(4,[0],[1.0])|(25,[0,1,2,3,11,17,21],[1.0,1.0,13.0,1792.0,1.0,1.0,1.0])|
|1        |2145.0 |1          |13     |1792        |(13,[0],[1.0])|(4,
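
The `features` column is printed in Spark's sparse notation `(size, [indices], [values])`. As a reading aid, the sketch below expands the first row of the output above into a dense list. The helper name `to_dense` is hypothetical. Given the order of `col_features`, slots 0 to 3 hold store_nbr, onpromotion, cluster, and transactions, slots 4 to 16 the family vector, 17 to 20 the store type, and 21 to 24 the holiday type.

```python
def to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# First row of the assembled output shown above.
row = to_dense(25, [0, 2, 3, 10, 17, 21], [1.0, 13.0, 1792.0, 1.0, 1.0, 1.0])
print(row[:4])   # [1.0, 0.0, 13.0, 1792.0] -> store_nbr, onpromotion, cluster, transactions
print(row[10])   # 1.0 -> family slot 6 (offset 4 + 6)
```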

Now that all features have been assembled into a single vector (Spark displays it in sparse form), standardize the data

In [235]:
# Initialize the StandardScaler
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

In [236]:
# Fit the scaler to the DataFrame and transform it
scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)
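
With its defaults (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation without centering it, so sparse zeros stay zero. This agrees with the scaled output in the notebook, where store_nbr = 1.0 becomes about 0.064, roughly 1 divided by the store_nbr standard deviation of about 15.6 reported later by `describe()`. The plain-Python sketch below shows the same arithmetic on a toy column; the helper name `scale_by_std` and the data are assumptions for illustration.

```python
import statistics


def scale_by_std(column):
    """Divide each value by the column's sample standard deviation, without centering."""
    std = statistics.stdev(column)  # sample (n - 1) standard deviation
    if std == 0:
        return [0.0 for _ in column]
    return [v / std for v in column]


store_nbr = [1, 2, 3, 4, 5]  # toy values, not the real column
scaled = scale_by_std(store_nbr)
print(scaled)
print(statistics.stdev(scaled))  # close to 1.0 after scaling
```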

In [237]:
# Create an aliased reference to the final dataset
df_final = scaled_df.alias('df_final')

In [238]:
# Inspect the result of the standardization
df_final.select("features", "features_scaled").show(10, truncate=False)

+---------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                 |features_scaled                                                                                                                                              |
+---------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(25,[0,2,3,10,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])      |(25,[0,2,3,10,17,21],[0.06412240026792974,2.757854709492489,1.7061223026700696,5.8336218876708426,2.108100881156672,2.0709727525254626])                     |
|(25,[0,2,3,5,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])       |(25,[0,2,3,5,17,21],[0.06412240026792974,2.757854709492489,1.70612230

In [239]:
# Overview of the standardized dataset
df_final.show(10, truncate=False)

+---------+-------+-----------+-------+------------+--------------+--------------+-------------+---------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|store_nbr|sales  |onpromotion|cluster|transactions|family_vector |tstore__vector|tholy__vector|features                                                 |features_scaled                                                                                                                                              |
+---------+-------+-----------+-------+------------+--------------+--------------+-------------+---------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1        |4.0    |0          |13     |1792        |(13,[6],[

**DATA MODELING**

Building a machine learning model with Spark ML

In [240]:
# Select the input features and the target variable for easier inspection
df_final.select("features", "sales").show(10, truncate=False)

+---------------------------------------------------------+-------+
|features                                                 |sales  |
+---------------------------------------------------------+-------+
|(25,[0,2,3,10,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])      |4.0    |
|(25,[0,2,3,5,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])       |0.0    |
|(25,[0,1,2,3,11,17,21],[1.0,1.0,13.0,1792.0,1.0,1.0,1.0])|4.0    |
|(25,[0,1,2,3,4,17,21],[1.0,1.0,13.0,1792.0,1.0,1.0,1.0]) |2145.0 |
|(25,[0,2,3,7,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])       |0.0    |
|(25,[0,2,3,4,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])       |411.941|
|(25,[0,2,3,12,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])      |27.0   |
|(25,[0,1,2,3,5,17,21],[1.0,1.0,13.0,1792.0,1.0,1.0,1.0]) |648.0  |
|(25,[0,1,2,3,4,17,21],[1.0,1.0,13.0,1792.0,1.0,1.0,1.0]) |719.0  |
|(25,[0,2,3,4,17,21],[1.0,13.0,1792.0,1.0,1.0,1.0])       |157.052|
+---------------------------------------------------------+-------+
only showing top 10 rows



Splitting the data into training and test sets to train the model



In [242]:
# Split the data into training and test sets to train the model
# randomSplit() assigns rows at random, so the split sizes are only approximately proportional to the given weights
train_data, test_data = df_final.randomSplit(weights = [0.80, 0.20], seed = 42)
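
The split sizes reported in the notebook (257,645 and 64,402 rows) are close to 80/20 but not exact, which is what one would expect if each row is assigned to a split independently at random. The plain-Python sketch below illustrates that idea only; it is not Spark's `randomSplit` implementation, and the helper name `random_split` is hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    cumulative = []
    running = 0.0
    for w in weights:
        running += w / total
        cumulative.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(cumulative):
            # The last split also catches any floating-point remainder.
            if u < bound or i == len(weights) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(1000)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 800 and 200, not exactly
```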

In [243]:
# Check the summary statistics of the training set
train_data.describe().show()

+-------+------------------+------------------+-------------------+-----------------+------------------+
|summary|         store_nbr|             sales|        onpromotion|          cluster|      transactions|
+-------+------------------+------------------+-------------------+-----------------+------------------+
|  count|            257645|            257645|             257645|           257645|            257645|
|   mean|26.993483281259095|407.14736154710033| 0.2595509324846203|8.534677560208815|1736.2126258999785|
| stddev|15.603070947412105|1220.5082827218823|0.43838908729623716|4.713213218788782|1053.4330925759366|
|    min|                 1|               0.0|                  0|                1|                54|
|    max|                54|          89576.36|                  1|               17|              8359|
+-------+------------------+------------------+-------------------+-----------------+------------------+



In [244]:
# Check the summary statistics of the test set
test_data.describe().show()

+-------+------------------+------------------+------------------+-----------------+------------------+
|summary|         store_nbr|             sales|       onpromotion|          cluster|      transactions|
+-------+------------------+------------------+------------------+-----------------+------------------+
|  count|             64402|             64402|             64402|            64402|             64402|
|   mean|26.999425483680632| 403.3273755182037|0.2574298934815689|8.517297599453434|1725.7374926244527|
| stddev|15.563662464331024|1347.2398248202974|0.4372215819199028|4.716202065656625|1037.8142544170275|
|    min|                 1|               0.0|                 0|                1|                54|
|    max|                54|          124717.0|                 1|               17|              8359|
+-------+------------------+------------------+------------------+-----------------+------------------+



In [245]:
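# Build the linear regression model
# Note: it trains on the unscaled 'features' column; LinearRegression standardizes features internally by default (standardization=True)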
lr = LinearRegression(featuresCol = 'features', labelCol='sales', maxIter=10, regParam=0.3, elasticNetParam=0.8)
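
For reference, the objective that Spark's `LinearRegression` documentation gives for elastic-net regularization is written out below, with `regParam` as $\lambda$ and `elasticNetParam` as $\alpha$. The symbols $w$, $b$, $x_i$, $y_i$, and $n$ are notation introduced here for the coefficients, intercept, feature vectors, labels, and number of training rows.

```latex
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2
+ \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right),
\qquad \lambda = 0.3,\quad \alpha = 0.8
```

With these values the L1 term has weight $\lambda\alpha = 0.24$ and the squared L2 term has weight $\lambda(1-\alpha)/2 = 0.03$, so the penalty is mostly lasso-like. The exact 0.0 among the coefficients printed further down is the kind of sparsity an L1 term tends to produce.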

In [246]:
# Fit the model to the training data
lr_model = lr.fit(train_data)

In [247]:
# Print the coefficients and intercept of the linear regression model
print("Coeficientes: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coeficientes: [0.0,734.5416408376735,-2.0361645857088613,0.22257780248396414,287.5998008860713,51.7902350637495,-156.96101784507334,-85.83995876855985,1826.2419066924085,-72.40238576129724,-86.96762470878114,-149.8655994224939,-88.25730809533064,-81.42998036721738,-61.4014275593803,-102.27677429989032,-92.06106618787155,-45.453844233329335,-31.18832052687046,23.23090150515221,-51.54348305433546,24.555690448759247,17.352791648827843,-14.16723122161026,-11.591707592357725]
Intercept: -307.6008158283497


In [248]:
# Summarize the model on the training set and print some metrics:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 1028.873306
r2: 0.289369


In [250]:
# Make predictions on the test data using the transform() method
predictions = lr_model.transform(test_data)

In [251]:
# Show the predicted values next to the actual values
predictions.select("prediction","sales","features").show(10)

+-------------------+-----+--------------------+
|         prediction|sales|            features|
+-------------------+-----+--------------------+
|-426.68282872085007|  0.0|(25,[0,2,3,6,17,2...|
| -419.5874102982706|  0.0|(25,[0,2,3,11,17,...|
|-210.36393052757245|  0.0|(25,[0,2,3,5,17,2...|
| -349.1217903001031|  0.0|(25,[0,2,3,10,17,...|
|-346.65865754497804|  0.0|(25,[0,2,3,7,17,2...|
| -412.8830049668443|  0.0|(25,[0,2,3,6,17,2...|
|-334.63945621084395|  0.0|(25,[0,2,3,7,17,2...|
| -334.8768109411294|  0.0|(25,[0,2,3,10,17,...|
| -401.9766926451301|  0.0|(25,[0,2,3,6,17,2...|
|-330.85563356861655|  0.0|(25,[0,2,3,7,17,2...|
+-------------------+-----+--------------------+
only showing top 10 rows



Evaluating the model: gauging how well it performs by looking at some performance metrics

In [252]:
#R2 score on test set
r2_test= lr_model.evaluate(test_data).r2
print('R2 score on test set: ', r2_test)

#RMSE on test set
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='sales',metricName='rmse')
rmse_test = evaluator.evaluate(predictions)
print('RMSE on test set: ', rmse_test)

R2 score on test set:  0.2378794673216732
RMSE on test set:  1176.1247023116605
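
To make the two metrics concrete, here is a minimal plain-Python sketch of how RMSE and R² are defined. The helper names and the toy numbers are made up for illustration and are not taken from the model's predictions.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: square root of the mean squared residual."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def r2_score(y_true, y_pred):
    """Coefficient of determination: 1 - residual sum of squares / total sum of squares."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


# Toy values (made up): actual sales and a model's predictions.
actual = [0.0, 4.0, 27.0, 648.0]
predicted = [10.0, 0.0, 30.0, 600.0]

print(rmse(actual, predicted))
print(r2_score(actual, predicted))
```

Read this way, the test R² of about 0.24 above means the model explains roughly a quarter of the variance in sales, and the RMSE of about 1176 is large relative to the mean sales of about 403 in the test set.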
