## # Objetivos do trabalho

O objetivo deste script é simular a criação de um data warehouse com foco em vendas de uma rede de supermercados fictícia. Usaremos a base  **Online Retail** obtida em "https://archive.ics.uci.edu/static/public/352/online+retail.zip para realizar analises estatiticas em cima dos dados coletados.

## # Caracteristicas da base de dados

A **Online Retail** é um conjunto de dados transacionais que contém todas as transações ocorridas entre 01/12/2010 e 09/12/2011 para um varejo online sem loja física, com sede e registro no Reino Unido. As caracteristicas desta base são _**Multivariate, Sequential, Time-Series**_ e possui as seguintes colunas e tipos de dados.

| **Nome da Variável** | **Papel**  | **Tipo**        | **Descrição**                                                                                      | **Unidades** |
|----------------------|------------|------------------|----------------------------------------------------------------------------------------------------|--------------|
| **InvoiceNo**        | ID         | Categórica       | Um número inteiro de 6 dígitos atribuído exclusivamente a cada transação. Se começar com a letra 'C', indica um cancelamento. | –            |
| **StockCode**        | ID         | Categórica       | Um número inteiro de 5 dígitos atribuído exclusivamente a cada produto distinto.                  | –            |
| **Description**      | Atributo   | Categórica       | Nome do produto.                                                                                  | –            |
| **Quantity**         | Atributo   | Inteiro          | Quantidade de cada produto (item) por transação.                                                  | –            |
| **InvoiceDate**      | Atributo   | Data             | Dia e hora em que cada transação foi gerada.                                                      | –            |
| **UnitPrice**        | Atributo   | Contínua         | Preço do produto por unidade.                                                                     | Libras Esterlinas |
| **CustomerID**       | Atributo   | Categórica       | Número inteiro de 5 dígitos atribuído exclusivamente a cada cliente.                              | –            |
| **Country**          | Atributo   | Categórica       | Nome do país onde reside cada cliente.                                                             | –            |




## Perguntas de negócio
Para o varejista que cedeu a base online retail pode ser importante saber qual é o produto que mais vende em cada pais e também datas especificas onde tem-se as maiores saidas. Com base nisso teremos duas perguntas a serem respondidas pelo datalake proposto:
- Os produtos que vendem mais em cada pais
- Quais datas tem-se mais vendas.

In [0]:
!pip install openpyxl

import requests,zipfile,os,io
import pandas as pd
from io import BytesIO
from pyspark.sql import SparkSession,Column
from pyspark import SparkContext
from pyspark.sql.window import Window
from pyspark.sql.functions import col, trim,concat_ws, collect_list, isnan,count, row_number,split, flatten, array_distinct
import matplotlib.pyplot as plt
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import functions as F

Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
[?25l[K     |█▎                              | 10 kB 24.4 MB/s eta 0:00:01[K     |██▋                             | 20 kB 6.0 MB/s eta 0:00:01[K     |████                            | 30 kB 8.7 MB/s eta 0:00:01[K     |█████▎                          | 40 kB 7.5 MB/s eta 0:00:01[K     |██████▌                         | 51 kB 5.5 MB/s eta 0:00:01[K     |███████▉                        | 61 kB 6.4 MB/s eta 0:00:01[K     |█████████▏                      | 71 kB 7.4 MB/s eta 0:00:01[K     |██████████▌                     | 81 kB 6.2 MB/s eta 0:00:01[K     |███████████▊                    | 92 kB 6.9 MB/s eta 0:00:01[K     |█████████████                   | 102 kB 7.5 MB/s eta 0:00:01[K     |██████████████▍                 | 112 kB 7.5 MB/s eta 0:00:01[K     |███████████████▊                | 122 kB 7.5 MB/s eta 0:00:01[K     |█████████████████               | 133 kB 7.5 MB/s eta 0:00:



In [0]:

url = "https://archive.ics.uci.edu/static/public/352/online+retail.zip"
response = requests.get(url)
with zipfile.ZipFile(io.BytesIO(response.content)) as z:
    file_name = z.namelist()[0]
    with z.open(file_name) as f:
        df = pd.read_excel(f)

Para iniciar as análises o excel será carregado diretamente em um dataframe spark definindo assim a camada BRONZE

In [0]:
spark = SparkSession.builder.appName("LerExcelParaBronze").config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5").getOrCreate()

df_spark = spark.createDataFrame(df)
bronze_path = "/mnt/data/bronze/compras"
df_spark.write.format("delta").mode("overwrite").save(bronze_path)
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
spark.sql(f"CREATE TABLE IF NOT EXISTS bronze.compras USING DELTA LOCATION '{bronze_path}'")

  Could not convert 'C536379' with type str: tried to convert to int64
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Out[9]: DataFrame[]

Para a análise de itens frequentes, informações em falta como Numero da nota fiscal, codigo do produto, descrição do produto e pais origem invalidam o registro, então para o conjunto PRATA estas linhas serão deletadas.

In [0]:

df_bronze_compras_tmp = spark.sql("SELECT * FROM bronze.compras")
#removendo linhas inuteis
print ("Antes da limpeza: "+str(df_bronze_compras_tmp.count()) )    
df_bronze_compras_limpo = df_bronze_compras_tmp.filter(
    ~(
        col("InvoiceNo").isNull() | (trim(col("InvoiceNo")) == "") |
        col("StockCode").isNull() | (trim(col("StockCode")) == "") |
        col("Description").isNull() | isnan(col("Description")) | (trim(col("Description")) == "") | (trim(col("Description")) == "Damaged") |
        col("CustomerID").isNull() | (trim(col("CustomerID")) == "") |
        col("Country").isNull() | (trim(col("Country")) == "") |  (trim(col("Country")) == "Unspecified") |
        col("InvoiceDate").isNull() | (trim(col("InvoiceDate")) == "")
    )
)
print("depois da limpeza :"+str(df_bronze_compras_limpo.count()))

#criando camada prata
prata_path = "/mnt/data/prata/compras"
df_bronze_compras_limpo.write.format("delta").mode("overwrite").save(prata_path)
spark.sql("CREATE DATABASE IF NOT EXISTS prata")
spark.sql(f"CREATE TABLE IF NOT EXISTS prata.compras USING DELTA LOCATION '{prata_path}'")

Antes da limpeza: 541909
depois da limpeza :539995
Out[4]: DataFrame[]

Para o conjunto **OURO** os dados foram organizados como **fato_venda,dim_cliente,dim_pais,dim_produto**

Tabela: **fato_venda**

- InvoiceNo     (string)   → Número único da nota fiscal (se começar com 'C', é cancelamento)
- InvoiceDate   (timestamp)→ Data e hora da transação
- ClienteID     (long)     → FK para dim_cliente
- PaisID        (long)     → FK para dim_pais
- ProdutoID     (long)     → FK para dim_produto
- Quantity      (integer)  → Quantidade de produtos vendidos
- UnitPrice     (double)   → Preço unitário do produto (libras esterlinas)


Tabela: **dim_cliente**

- ClienteID      (long)     → ID único da dimensão cliente (gerado)
- CodigoCliente  (integer)  → Código original do cliente (5 dígitos)
- PaisID         (long)     → FK para dim_pais

Tabela: **dim_pais**

- PaisID     (long)   → ID único do país (gerado)
- NomePais   (string) → Nome do país do cliente

Tabela: **dim_produto**

- ProdutoID         (long)   → ID único do produto (gerado)
- CodigoProduto     (string) → Código original do produto (5 dígitos)
- ProdutoDescricao  (string) → Nome do produto
- PrecoUnitario     (double) → Preço padrão do produto (libras esterlinas)



In [0]:
df_dados_tmp = spark.sql("SELECT * FROM prata.compras")

df_pais = df_dados_tmp.select("Country").dropDuplicates() \
    .withColumnRenamed("Country", "NomePais") \
    .withColumn("PaisID", F.monotonically_increasing_id())

df_cliente_raw = df_dados_tmp.select("CustomerID", "Country") \
    .where("CustomerID IS NOT NULL") \
    .dropDuplicates(["CustomerID"])

df_cliente = df_cliente_raw.join(
    df_pais, df_cliente_raw["Country"] == df_pais["NomePais"], "left"
).select("CustomerID", "PaisID") \
 .withColumn("ClienteID", F.monotonically_increasing_id())

df_produto = df_dados_tmp.select("StockCode", "Description", "UnitPrice") \
    .dropDuplicates(["StockCode"]) \
    .withColumnRenamed("Description", "ProdutoDescricao") \
    .withColumnRenamed("UnitPrice", "ProdutoPrecoUnitario") \
    .withColumn("ProdutoID", F.monotonically_increasing_id())

df_venda_raw = df_dados_tmp.select("InvoiceNo", "InvoiceDate", "StockCode", "CustomerID", "Quantity", "UnitPrice") \
    .where("CustomerID IS NOT NULL") \
    .filter(~F.col("InvoiceNo").startswith("C"))  # opcional: ignora cancelamentos

# Junta com Produto
df_venda = df_venda_raw.join(
    df_produto.select("StockCode", "ProdutoID"), on="StockCode", how="left"
)

# Junta com Cliente (que já tem PaisID)
df_venda = df_venda.join(
    df_cliente.select("CustomerID", "ClienteID", "PaisID"), on="CustomerID", how="left"
)

# Seleciona colunas finais
df_venda = df_venda.select(
    "InvoiceNo", "InvoiceDate", "ClienteID", "PaisID", "ProdutoID", "Quantity", "UnitPrice"
)

ouro_pais_path = "/mnt/data/ouro/pais"
df_pais.write.format("delta").mode("overwrite").save(ouro_pais_path)
spark.sql("CREATE DATABASE IF NOT EXISTS ouro")
spark.sql(f"CREATE TABLE IF NOT EXISTS ouro.dim_pais USING DELTA LOCATION '{ouro_pais_path}'")

ouro_produto_path = "/mnt/data/ouro/produto"
df_produto.write.format("delta").mode("overwrite").save(ouro_produto_path)
spark.sql("CREATE DATABASE IF NOT EXISTS ouro")
spark.sql(f"CREATE TABLE IF NOT EXISTS ouro.dim_produto USING DELTA LOCATION '{ouro_produto_path}'")

ouro_venda_path = "/mnt/data/ouro/venda"
df_venda.write.format("delta").mode("overwrite").save(ouro_venda_path)
spark.sql("CREATE DATABASE IF NOT EXISTS ouro")
spark.sql(f"CREATE TABLE IF NOT EXISTS ouro.fato_venda USING DELTA LOCATION '{ouro_venda_path}'")

ouro_cliente_path = "/mnt/data/ouro/cliente"
df_cliente.write.format("delta").mode("overwrite").save(ouro_cliente_path)
spark.sql("CREATE DATABASE IF NOT EXISTS ouro")
spark.sql(f"CREATE TABLE IF NOT EXISTS ouro.dim_cliente USING DELTA LOCATION '{ouro_cliente_path}'")



Out[30]: DataFrame[]

> Como objetivos sobre os dados do conjunto **OURO** em questão queremos
-   Obter estatisticas como:
> -     Produtos mais vendidos por pais
> -     Produtos mais vendidos por pais / data
-  Obter associações entre produtos 


In [0]:
df_vendas_por_pais = spark.sql("SELECT c.NomePais,b.ProdutoDescricao,count(*)  as frequencia FROM ouro.fato_venda a INNER JOIN ouro.dim_produto b on a.ProdutoID = b.ProdutoID  INNER JOIN ouro.dim_pais c on c.PaisID = a.PaisID  group by c.NomePais,b.ProdutoDescricao order by frequencia desc")

janela = Window.partitionBy("NomePais").orderBy(df_vendas_por_pais["frequencia"].desc())
df_top_produtos_por_pais = df_vendas_por_pais.withColumn(
    "rank", row_number().over(janela)
).filter("rank = 1").drop("rank")

df_top_produtos_por_pais.show(38,truncate=False)



+--------------------+-----------------------------------+----------+
|NomePais            |ProdutoDescricao                   |frequencia|
+--------------------+-----------------------------------+----------+
|Australia           |GREEN POLKADOT BOWL                |9         |
|Austria             |DISCO BALL ROTATOR BATTERY OPERATED|12        |
|Bahrain             |12 PENCILS TALL TUBE RED RETROSPOT |2         |
|Belgium             |DISCO BALL ROTATOR BATTERY OPERATED|96        |
|Brazil              |PAPER LANTERN 5 POINT SEQUIN STAR  |1         |
|Canada              |GREEN GOOSE FEATHER TREE 60CM      |2         |
|Channel Islands     |SWEET PUDDING STICKER SHEET        |7         |
|Cyprus              |thrown away                        |10        |
|Czech Republic      |SMALL HEART FLOWERS HOOK           |1         |
|Denmark             |DISCO BALL ROTATOR BATTERY OPERATED|13        |
|EIRE                |VINTAGE CHRISTMAS GIFT BAG LARGE   |98        |
|European Community 

In [0]:
df_vendas_por_pais_data = spark.sql("SELECT a.InvoiceDate,b.ProdutoDescricao,count(*)  as frequencia FROM ouro.fato_venda a INNER JOIN ouro.dim_produto b on a.ProdutoID = b.ProdutoID  INNER JOIN ouro.dim_pais c on c.PaisID = a.PaisID  group by a.InvoiceDate,b.ProdutoDescricao order by frequencia desc")

df_vendas_por_pais_data.show(100,truncate=False)


+-------------------+-----------------------------------+----------+
|InvoiceDate        |ProdutoDescricao                   |frequencia|
+-------------------+-----------------------------------+----------+
|2011-06-05 11:37:00|TROPICAL PASSPORT COVER            |20        |
|2011-06-05 11:37:00|SET/6 COLLAGE PAPER CUPS           |12        |
|2011-10-26 12:46:00|SET OF 10 LANTERNS FAIRY LIGHT STAR|8         |
|2011-11-23 14:07:00|BEADED CRYSTAL HEART GREEN ON STICK|7         |
|2011-10-24 10:43:00|PAPER BUNTING VINTAGE PAISLEY      |7         |
|2010-12-12 14:27:00|SCENTED VELVET LOUNGE CANDLE       |6         |
|2011-01-16 16:25:00|JAM JAR WITH GREEN LID             |6         |
|2011-01-16 16:25:00|BLUE BUNNY EASTER EGG BASKET       |6         |
|2011-11-04 12:45:00|FOOD CONTAINER SET 3 LOVE HEART    |6         |
|2011-01-09 12:53:00|SCENTED VELVET LOUNGE CANDLE       |6         |
|2011-05-10 15:07:00|STAR  T-LIGHT HOLDER               |5         |
|2011-05-10 15:07:00|RUSTIC STRAWB

# Conclusão
 Após análise na base **_Online Retail_**,tive poucos problemas com a limpeza dos dados, havia apenas poucos registros com paises indefinidos e produtos sem descrição. A camada ouro foi a mais custosa para a montagem do modelo estrela, dado a natureza desnormalizada que os dados fornecidos continham, tive que exercer as etapas iniciais de normalização dos dados para que fosse possível estabelecer o esquema estrela.
 As perguntas de negócios traçadas, poderiam ser melhores dado um conhecimento maior sobre o Varejista fornecedor dos dados, porém, para uma empresa multinacional como a que está em foco, saber o produto que mais vende em cada país e as datas chave que se tem mais vendas pode ser um Ás na manga para traçar promoções. Por fim, uma análise que poderia ser adicional, é a conhecida busca por itens frequentes, como a que foi realizada pelo walmart em um experimento, onde descobriu-se que no dia em que acontece o _Superbol _ tinha muita saida de Cerveja e fraldas.