# Esse notebook tem como objetivo ingestionar e tratar uma base de dados






# Vendas em supermercados

link do dataset  https://www.kaggle.com/datasets/aungpyaeap/supermarket-sales
#### Registro histórico de dados de vendas em 3 supermercados diferentes


In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import to_timestamp, date_format,to_date,current_timestamp
from delta.tables import * 

In [0]:
file_path = "dbfs:/FileStore/shared_uploads/correiarafaelsilva@outlook.com/supermarket_sales.csv"
delta_table_full_name = "tabela_venda_supermercado"
path = '/databricks/driver/'
path_df_porcentagem_quantidade_sexo = f"{path}df_porcentagem_quantidade_sexo"
path_df_total_vendas = f"{path}df_total_vendas"
path_df_vendas_por_mes = f"{path}df_vendas_por_mes"
path_df_receita_produtolinha_produto = f"{path}df_receita_produtolinha_produto"

In [0]:
df = spark.read.format("csv")\
    .option("header", "true")\
    .load(file_path)

## Renomear as colunas para nome em português e tipar as colunas

In [0]:
# Renomear as colunas pois no arquivo possui nomes com caracteres especiais
df = df.withColumnRenamed("Invoice ID","id_fatura") \
    .withColumnRenamed("Branch","supermercado") \
    .withColumnRenamed("City","cidade")\
    .withColumnRenamed("Customer type","tipo_cliente") \
    .withColumnRenamed("Gender","sexo_cliente") \
    .withColumnRenamed("Product line","linha_produto") \
    .withColumnRenamed("Unit price","preco_unitario_dolar") \
    .withColumnRenamed("Quantity","quantidade") \
    .withColumnRenamed("Tax 5%","taxa_imposto") \
    .withColumnRenamed("Total","total_com_imposto") \
    .withColumnRenamed("Date","data_compra") \
    .withColumnRenamed("Time","hora")\
    .withColumnRenamed("Payment","metodo_pagamento") \
    .withColumnRenamed("cogs","custo_produtos_vendidos") \
    .withColumnRenamed("gross margin percentage","margem_percentual")\
    .withColumnRenamed("gross income","receita")\
    .withColumnRenamed("Rating","classificacao_cliente_geral_compra")

In [0]:
# Tirar as colunas calculadas

df = df.select(
    "id_fatura",\
    "supermercado",\
    "cidade",\
    "tipo_cliente",\
    "sexo_cliente",\
    "linha_produto",\
    "preco_unitario_dolar",\
    "quantidade",\
    "taxa_imposto",\
    "data_compra",\
    "hora",\
    "metodo_pagamento",\
    "custo_produtos_vendidos",\
    "margem_percentual",\
    "classificacao_cliente_geral_compra")

In [0]:
#Colocar a  tipagem corretar
df = df.withColumn("preco_unitario_dolar",col("preco_unitario_dolar").cast("Double"))\
    .withColumn("quantidade",col("quantidade").cast("Integer"))\
    .withColumn("taxa_imposto",col("taxa_imposto").cast("Double"))\
    .withColumn("data_compra", to_date("data_compra", "M/d/yyyy"))\
    .withColumn("hora", to_timestamp("hora", "HH:mm").cast(TimestampType()))\
    .withColumn("hora", date_format("hora", "HH:mm"))\
    .withColumn("custo_produtos_vendidos",col("custo_produtos_vendidos").cast("Double"))\
    .withColumn("classificacao_cliente_geral_compra",col("classificacao_cliente_geral_compra").cast("Double"))


# Vou recriar as colunas calculadas 
## exemplo:"total_com_imposto" e tratar os campos nulos com comandos sql

In [0]:
df.createOrReplaceTempView('vendas_supermercado')

In [0]:
df = spark.sql("""
      
      select       
        id_fatura,
        supermercado,
        coalesce(cidade, "indefinido") as cidade,
        case 
            when tipo_cliente = "Member" then "membro"
            when tipo_cliente = "Normal" then "normal"
            else "indefinido"
        end as tipo_cliente,
        case
            when sexo_cliente = "Female" then "feminino" 
            when sexo_cliente = "Male" then "masculino" 
            else "não declarado"
        end as sexo_cliente,
        case
            when linha_produto = "Home and lifestyle" then "casa e estilo de vida"
            when linha_produto = "Fashion accessories" then "acessorios fashions"
            when linha_produto = "Health and beauty" then "saude e beleza"
            when linha_produto = "Electronic accessories" then "acessorios eletronicos"
            when linha_produto = "Food and beverages" then "Alimentos e bebidas"
            when linha_produto = "Sports and travel" then "esportes e viagens"
            else "indefinido"
        end as linha_produto,
        coalesce(round(preco_unitario_dolar,2),0) as preco_unitario_dolar,
        coalesce(round(quantidade,2),0) as quantidade,
        coalesce(round(taxa_imposto,2),0) as taxa_imposto,
        coalesce(round(((preco_unitario_dolar * quantidade) + taxa_imposto),2),0) as total_com_imposto,
        data_compra,
        coalesce(hora,"00:00") as hora,
        case
            when metodo_pagamento = "Ewallet" then "carteira eletronica"
            when metodo_pagamento = "Cash" then "dinheiro"
            when metodo_pagamento = "Credit card" then "cartao de credito"
            else "indefinido"
        end as  metodo_pagamento,
        coalesce(round(custo_produtos_vendidos,2),0) as custo_produtos_vendidos,
        coalesce(round(margem_percentual,2),0) as margem_percentual,
        coalesce(round((((preco_unitario_dolar*quantidade) + taxa_imposto ) - custo_produtos_vendidos),2),0) as receita,
        coalesce(classificacao_cliente_geral_compra,0) as classificacao_cliente_geral_compra


      from
        vendas_supermercado      
      """)


In [0]:
df = df.withColumn("process_time", current_timestamp())

## Criando e gravando em uma tabela delta com o merge usando com paramento append

In [0]:
# Criando uma tabela delta
DeltaTable.createIfNotExists(spark) \
  .tableName(delta_table_full_name) \
  .addColumns(df.schema) \
  .execute()

Out[101]: <delta.tables.DeltaTable at 0x7f9331d2e730>

In [0]:
# gravando em uma tabela delta
df.write.mode("overwrite").saveAsTable(delta_table_full_name)

# Com a tabela criada e salva como delta agora será criada alguns analises

In [0]:

# Total de vendas das lojas e por cidade

df_total_vendas = spark.sql(f"""
select 
  supermercado,
  cidade,
  round(sum(total_com_imposto),2) as total_vendas_com_impostos
from
    {delta_table_full_name}

group by 
  supermercado,
  cidade    
order by 
  supermercado,
  cidade
""")

In [0]:
df_total_vendas.write.mode("overwrite").parquet(path_df_total_vendas)

In [0]:
# Verificando a base vimos que so tem vendas do ano 2019 então faremos o agrupamento mensal

df_vendas_por_mes = spark.sql(f"""
select 
  supermercado,
  date_format(data_compra,"yyyy-MM") as mes,
  round(sum(total_com_imposto),2) as total_vendas_com_impostos
from
  {delta_table_full_name}
group by 
  supermercado,
  date_format(data_compra,"yyyy-MM")
order by 
  supermercado,
  date_format(data_compra,"yyyy-MM")
  """)

In [0]:
df_vendas_por_mes.write.mode("overwrite").parquet(path_df_vendas_por_mes)

In [0]:
# Receita por linha_produto no ano(podemos fazer por mês também)

df_receita_produtolinha_produto = spark.sql(f"""

select 
  supermercado,
  linha_produto,
  date_format(data_compra,"yyyy"),
  round(sum(receita),2) as receita_total
  
from
  {delta_table_full_name}
group by 
  supermercado,
  date_format(data_compra,"yyyy"),
  linha_produto  
order by supermercado,linha_produto
""")

In [0]:
df_receita_produtolinha_produto.write.mode("overwrite").parquet(path_df_receita_produtolinha_produto)

In [0]:
# Contagem de vendas da linha_produto pelo sexo dos clientes

df_porcentagem_quantidade_sexo = spark.sql(f"""
with
venda_total as (
select 
  supermercado,
  linha_produto,  
  count(id_fatura) as quantidade_de_compras
from
  {delta_table_full_name}
group by 
  supermercado,
  linha_produto
order by 
  supermercado,
  linha_produto
),
venda_por_sexo as ( 

select 
  supermercado,
  linha_produto,  
  sexo_cliente,
  count(id_fatura) as quantidade_de_compras_por_sexo
from
  {delta_table_full_name}
group by 
  supermercado,
  linha_produto,
  sexo_cliente
order by 
  supermercado,
  linha_produto,
  sexo_cliente
),
porcentagem as (
  select
    vs.supermercado,
    vs.linha_produto,
    vs.sexo_cliente,
    v.quantidade_de_compras as quantidade_total_de_compras_por_linha_produto,
    quantidade_de_compras_por_sexo,
    round(((100 * vs.quantidade_de_compras_por_sexo)/ v.quantidade_de_compras),0)  as porcentagem_compras_por_sexo
  from
    venda_total v
  inner join
    venda_por_sexo vs
      on vs.supermercado = v.supermercado 
        and vs.linha_produto = v.linha_produto

  order by 
    vs.supermercado,
    vs.linha_produto,
    vs.sexo_cliente

)
select 
  supermercado,
  linha_produto,
  sexo_cliente,
  quantidade_total_de_compras_por_linha_produto,
  quantidade_de_compras_por_sexo,
  porcentagem_compras_por_sexo
from 
  porcentagem
  """)

In [0]:
df_porcentagem_quantidade_sexo.write.mode("overwrite").parquet(path_df_vendas_por_mes)

Não foi possivel fazer o agendamento do notebook pois o community não dar acesso ao workflow