<a href="https://colab.research.google.com/github/erikafranca/desafio_ae/blob/main/DesafioLuizaLab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
## instalar pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark




In [5]:
## definindo os paths para cada arquivo de dados

categoria = "/content/luizalab/categoria.csv"
cidade = "/content/luizalab/cidade.csv"
cliente = "/content/luizalab/cliente.csv"
estado = "/content/luizalab/estado.csv"
filial = "/content/luizalab/filial.csv"
item_pedido = "/content/luizalab/item_pedido.csv"
parceiro = "/content/luizalab/parceiro.csv"
pedido = "/content/luizalab/pedido.csv"
produto = "/content/luizalab/produto.csv"
subcategoria = "/content/luizalab/subcategoria.csv"

In [6]:
# iniciar SESSAO SPARK
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [52]:
## definindo os dataframes a partir dos dados transacionais recebidos

df_categoria = spark.read.csv(categoria, header = "true", inferSchema="true", sep='|')
df_cidade = spark.read.csv(cidade, header = "true", inferSchema="true", sep='|')
df_cliente = spark.read.csv(cliente, header = "true", inferSchema="true", sep='|')
df_estado = spark.read.csv(estado, header = "true", inferSchema="true", sep='|')
df_filial = spark.read.csv(filial, header = "true", inferSchema="true", sep='|')
df_item_pedido = spark.read.csv(item_pedido, header = "true", inferSchema="true", sep='|')
df_parceiro = spark.read.csv(parceiro, header = "true", inferSchema="true", sep='|')
df_pedido = spark.read.csv(pedido, header = "true", inferSchema="true", sep='|')
df_produto = spark.read.csv(produto, header = "true", inferSchema="true", sep='|')
df_subcategoria = spark.read.csv(subcategoria, header = "true", inferSchema="true", sep='|')

In [38]:
## teste em um dataframe
df_categoria.select(df_categoria["id_categoria"]).show()

+------------+
|id_categoria|
+------------+
|           1|
|           2|
|           3|
|           4|
|           5|
|           6|
|           7|
|           8|
|           9|
|          10|
|          11|
|          12|
|          13|
|          14|
|          15|
|          16|
|          17|
|          18|
|          19|
|          20|
+------------+
only showing top 20 rows



In [64]:
## unindo dataframes a partir do relacionamento chave entre eles 

df_lake = df_pedido.join(df_item_pedido, df_pedido['id_pedido'] == df_item_pedido['id_pedido'], 'left') \
        .join(df_cliente, df_pedido['id_cliente'] == df_cliente['id_cliente'], 'left') \
        .join(df_parceiro, df_pedido['id_parceiro'] == df_parceiro['id_parceiro'], 'left') \
        .join(df_filial, df_pedido['id_filial'] == df_filial['id_filial'], 'left') \
        .join(df_cidade, df_filial['id_cidade'] == df_cidade['id_cidade'], 'left') \
        .join(df_estado, df_cidade['id_estado'] == df_estado['id_estado'], 'left') \
        .join(df_produto, df_item_pedido['id_produto'] == df_produto['id_produto'], 'left')\
        .join(df_subcategoria, df_produto['id_subcategoria'] == df_subcategoria['id_subcategoria'], 'left') \
        .join(df_categoria, df_subcategoria['id_categoria'] == df_categoria['id_categoria'], 'left') \
        
        
        
        

In [96]:
## verificando dados do datalake 

df_lake.show()

# verificando schema 

df_lake.printSchema()

+-----------+-------------------+-----------+----------+---------+-------------+-----------+----------+----------+-----------+----------+--------------------+---------+-----------+--------------------+---------+---------------+---------+---------+---------------+---------+---------+---------+----------+--------------------+---------------+---------------+--------------------+------------+------------+---------------+-------------+
|  id_pedido|          dt_pedido|id_parceiro|id_cliente|id_filial|vr_total_pago|  id_pedido|id_produto|quantidade|vr_unitario|id_cliente|          nm_cliente|flag_ouro|id_parceiro|         nm_parceiro|id_filial|      ds_filial|id_cidade|id_cidade|      ds_cidade|id_estado|id_estado|ds_estado|id_produto|          ds_produto|id_subcategoria|id_subcategoria|     ds_subcategoria|id_categoria|id_categoria|   ds_categoria|perc_parceiro|
+-----------+-------------------+-----------+----------+---------+-------------+-----------+----------+----------+-----------+----

In [78]:
#remoção de colunas repetidas com função python

def dropDupDfCols(df):
    newcols = []
    dupcols = []

    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)

df_pronto = dropDupDfCols(df_lake)

In [85]:
## salvar os dados multidimensionais para que as áreas interessadas tenham acesso aos dados analíticos

df_pronto.repartition(1).write.csv("/content/luizalab/vendas", sep=',',header='true')




In [95]:
## renomear arquivo para que fique compreensível

import os 
path = '/content/luizalab/vendas'

for file in os.listdir(path):
    if file.endswith('.csv'):
       file= (os.path.join(path, file))
       os.rename(file, path + '/vendas.csv')



In [122]:
## utilizando a spark sql 

df_pronto.registerTempTable("tbl_vendas")

## calculo de valor de comissão por produto e agrupando valores totais por parceiro, mes e ano

df_com_0= spark.sql("""
SELECT
id_parceiro,
nm_parceiro,
ROUND(SUM(vr_total_pago),2) as vr_total_pago,
ROUND(SUM((vr_total_pago * perc_parceiro)/100),2) as vr_comissao,
month(dt_pedido) as mes,
year(dt_pedido) as ano
FROM tbl_vendas
GROUP BY 
id_parceiro,
nm_parceiro,
month(dt_pedido),
year(dt_pedido)
""")

## mostrando agrupamento

df_com_0.show()




+-----------+--------------------+-------------+-----------+---+----+
|id_parceiro|         nm_parceiro|vr_total_pago|vr_comissao|mes| ano|
+-----------+--------------------+-------------+-----------+---+----+
|         11|Parceiro Magalu - 11|     15135.12|     461.67|  8|2021|
|          6|Parceiro Magalu - 06|   8451003.97|  377452.09|  7|2021|
|         11|Parceiro Magalu - 11|     25832.17|     584.21|  6|2021|
|          5|Parceiro Magalu - 05|    931522.23|    22733.8|  9|2021|
|         10|Parceiro Magalu - 10|     306363.4|    7335.57|  6|2021|
|         16|Parceiro Magalu - 16|   6182042.29|   125916.4|  9|2021|
|          4|Parceiro Magalu - 04|        871.0|      17.42|  6|2021|
|         10|Parceiro Magalu - 10|    357296.67|    8345.91|  7|2021|
|          4|Parceiro Magalu - 04|       1092.0|      26.94|  9|2021|
|          5|Parceiro Magalu - 05|   1665491.01|   38720.53|  7|2021|
|         13|Parceiro Magalu - 13|2.494106252E7|  310002.21|  7|2021|
|         16|Parceir

In [129]:
## calculo de bonus

df_com_0.registerTempTable("tbl_com_0")

df_com_1 = spark.sql(""" 
SELECT 
id_parceiro,
nm_parceiro,
vr_total_pago,
CASE WHEN vr_total_pago/10000 > 0 then CAST((vr_total_pago/10000) as int)*100 else 0 end as bonus,
vr_comissao,
mes,
ano
FROM tbl_com_0 
""")
df_com_1.show()



+-----------+--------------------+-------------+------+-----------+---+----+
|id_parceiro|         nm_parceiro|vr_total_pago| bonus|vr_comissao|mes| ano|
+-----------+--------------------+-------------+------+-----------+---+----+
|         11|Parceiro Magalu - 11|     15135.12|   100|     461.67|  8|2021|
|          6|Parceiro Magalu - 06|   8451003.97| 84500|  377452.09|  7|2021|
|         11|Parceiro Magalu - 11|     25832.17|   200|     584.21|  6|2021|
|          5|Parceiro Magalu - 05|    931522.23|  9300|    22733.8|  9|2021|
|         10|Parceiro Magalu - 10|     306363.4|  3000|    7335.57|  6|2021|
|         16|Parceiro Magalu - 16|   6182042.29| 61800|   125916.4|  9|2021|
|          4|Parceiro Magalu - 04|        871.0|     0|      17.42|  6|2021|
|         10|Parceiro Magalu - 10|    357296.67|  3500|    8345.91|  7|2021|
|          4|Parceiro Magalu - 04|       1092.0|     0|      26.94|  9|2021|
|          5|Parceiro Magalu - 05|   1665491.01| 16600|   38720.53|  7|2021|

In [141]:
## calculo final de comissao, considerando o valor de bonus 
df_com_1.registerTempTable("tbl_com_1")


df_com_2 = spark.sql(""" 
SELECT 
id_parceiro,
nm_parceiro,
vr_total_pago,
CASE WHEN bonus <= 100 then 0 else round((vr_comissao - bonus),2) end as vr_comissao_final,
mes,
ano
from 
tbl_com_1
ORDER BY 
id_parceiro, mes, ano DESC
""")
## mostrando valores finais, agrupados por mês e ano

df_com_2.show()




+-----------+--------------------+-------------+-----------------+---+----+
|id_parceiro|         nm_parceiro|vr_total_pago|vr_comissao_final|mes| ano|
+-----------+--------------------+-------------+-----------------+---+----+
|          1|Parceiro Magalu - 01|     90047.64|          2034.33|  6|2021|
|          1|Parceiro Magalu - 01|     61109.64|          1365.88|  7|2021|
|          1|Parceiro Magalu - 01|     10568.14|              0.0|  8|2021|
|          1|Parceiro Magalu - 01|     64093.16|          1407.16|  9|2021|
|          2|Parceiro Magalu - 02|      3280.23|              0.0|  9|2021|
|          3|Parceiro Magalu - 03|    527613.19|         11366.06|  6|2021|
|          3|Parceiro Magalu - 03|    483907.58|          10130.7|  7|2021|
|          3|Parceiro Magalu - 03|    339750.34|          4738.73|  8|2021|
|          3|Parceiro Magalu - 03|    377602.01|          5001.44|  9|2021|
|          4|Parceiro Magalu - 04|        871.0|              0.0|  6|2021|
|          4

In [142]:
# salvar relatorio de comissao para as áreas interessadas

df_com_2.repartition(1).write.csv("/content/luizalab/comissao", sep=',',header='true')

## renomear arquivo para que fique compreensível

import os 
path = '/content/luizalab/comissao'

for file in os.listdir(path):
    if file.endswith('.csv'):
       file= (os.path.join(path, file))
       os.rename(file, path + '/comissoes.csv')


# Propostas de métricas interessantes para análise de comissionamento e performance:

1 - Agrupamento do valor total de vendas do parceiro por cidade para analisar se há grande disparidade de valor entre as localidades e talvez propor uma regra e/ou valor de comissionamento diferente nestes locais


2 - Ranking de parceiros que vendem (em valor) mais por mês


3 - Ranking de parceiros que vendem mais produtos por mês




