## Dados trabalhados

Este conjunto de dados foi fornecido pela maior loja de departamentos do mercado brasileiro. Depois que um cliente compra o produto, um vendedor é notificado para atender a esse pedido. Assim que o cliente recebe o produto, ou a data prevista de entrega está prevista, o cliente recebe um inquérito de satisfação por e-mail onde pode dar nota da experiência de compra e deixar alguns comentários. 

### Pontos importantes
1.     Um pedido pode ter vários itens.
2.     Cada item pode ser atendido por um vendedor distinto.
3.     Todo o texto identificando lojas e parceiros foi substituído pelos nomes das grandes casas de Game of Thrones.

### Esquema de dados

Os dados são divididos em vários conjuntos de dados para melhor compreensão e organização. Consulte o esquema de dados a seguir ao trabalhar com ele:

![Data Schema](https://i.imgur.com/HRhd2Y0.png)

## Configuração do Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!pip install pyspark==3.3.2
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

import findspark
findspark.init()

# Criação da Sessão do Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Iniciando com Spark') \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

spark

# Download do arquivo
!wget -c --no-check-certificate "https://onedrive.live.com/download?cid=08A3A632FA9EDBB1&resid=8A3A632FA9EDBB1%2110217&authkey=ACij76UvvMhutw8" -O dataset.zip

# Salvando CSVs
!unzip /content/dataset.zip -d /content/dataset



### 1 - Adequação dos dados

In [None]:
# Criação dos datasets
df_customers = spark.read.csv('/content/dataset/olist_customers_dataset.csv', header=True, inferSchema=True)
df_geolocation = spark.read.csv('/content/dataset/olist_geolocation_dataset.csv', header=True, inferSchema=True)
df_order_items = spark.read.csv('/content/dataset/olist_order_items_dataset.csv', header=True, inferSchema=True)
df_order_payments = spark.read.csv('/content/dataset/olist_order_payments_dataset.csv', header=True, inferSchema=True)
df_order_reviews = spark.read.csv('/content/dataset/olist_order_reviews_dataset.csv', header=True, inferSchema=True)
df_orders = spark.read.csv('/content/dataset/olist_orders_dataset.csv', header=True, inferSchema=True)
df_products = spark.read.csv('/content/dataset/olist_products_dataset.csv', header=True, inferSchema=True)
df_sellers = spark.read.csv('/content/dataset/olist_sellers_dataset.csv', header=True, inferSchema=True)
df_product_category_name = spark.read.csv('/content/dataset/product_category_name_translation.csv', header=True, inferSchema=True)

In [None]:
# Importar funções 
from pyspark.sql.functions import *

In [None]:
## Adequação dos dados

# Adequação customers
def preprocessing(dataframe):
    return (
        dataframe
        .select(
        col('customer_id').cast('string'),
        col('customer_unique_id').cast('string'),
        col('customer_zip_code_prefix').cast('integer'),
        col('customer_city').cast('string'),
        col('customer_state').cast('string'),
        )
    )
    
processed_customers = preprocessing(df_customers)

# Adequação geolocation
def preprocessing(dataframe):
    return (
        dataframe
        .select(
        col('geolocation_zip_code_prefix').cast('integer'),
        col('geolocation_lat').cast('integer'),
        col('geolocation_lng').cast('integer'),
        col('geolocation_city').cast('string'),
        col('geolocation_state').cast('string'),
        )
    )
    
processed_geolocation = preprocessing(df_geolocation)

# Adequação order_items
def preprocessing(dataframe):
    return (
        dataframe
        .select(
        col('order_id').cast('string'),
        col('order_item_id').cast('string'),
        col('product_id').cast('string'),
        col('seller_id').cast('string'),
        col('shipping_limit_date').cast('timestamp'),
        col('price').cast('double'),
        col('freight_value').cast('double'),
        )
    )
    
processed_order_items = preprocessing(df_order_items)

# Adequação order_payments
def preprocessing(dataframe):
    return (
        dataframe
        .select(
        col('order_id').cast('string'),
        col('payment_sequential').cast('integer'),
        col('payment_type').cast('string'),
        col('payment_installments').cast('integer'),
        col('payment_value').cast('double'),
        )
    )
    
processed_order_payments = preprocessing(df_order_payments)

# Adequação order_reviews
def preprocessing(dataframe):
    return (
        dataframe
        .select(
        col('review_id').cast('string'),
        col('order_id').cast('string'),
        col('review_score').cast('integer'),
        col('review_comment_title').cast('string'),
        col('review_comment_message').cast('string'),
        col('review_creation_date').cast('timestamp'),
        col('review_answer_timestamp').cast('timestamp'),
        )
    )
    
processed_order_reviews = preprocessing(df_order_reviews)

# Adequação orders
def preprocessing(dataframe):
    return (
        dataframe
        .select(
        col('order_id').cast('string'),
        col('customer_id').cast('string'),
        col('order_status').cast('string'),
        col('order_purchase_timestamp').cast('timestamp'),
        col('order_approved_at').cast('timestamp'),
        col('order_delivered_carrier_date').cast('timestamp'),
        col('order_delivered_customer_date').cast('timestamp'),
        col('order_estimated_delivery_date').cast('timestamp'),
        )
    )
    
processed_orders = preprocessing(df_orders)

# Adequação products
def preprocessing(dataframe):
    return (
        dataframe
        .select(
        col('product_id').cast('string'),
        col('product_category_name').cast('string'),
        col('product_name_lenght').cast('integer'),
        col('product_description_lenght').cast('integer'),
        col('product_photos_qty').cast('integer'),
        col('product_weight_g').cast('integer'),
        col('product_length_cm').cast('integer'),
        col('product_height_cm').cast('integer'),
        col('product_width_cm').cast('integer'),
        )
    )
    
processed_products = preprocessing(df_products)

# Adequação sellers
def preprocessing(dataframe):
    return (
        dataframe
        .select(
        col('seller_id').cast('string'),
        col('seller_zip_code_prefix').cast('integer'),
        col('seller_city').cast('string'),
        col('seller_state').cast('string'),
        )
    )
    
processed_sellers = preprocessing(df_sellers)

# Adequação product_category_name
def preprocessing(dataframe):
    return (
        dataframe
        .select(
        col('product_category_name').cast('string'),
        col('product_category_name_english').cast('string'),
        )
    )
    
processed_product_category_name = preprocessing(df_product_category_name)

## 2 - Criando uma base unificada
Unificação dos datasets:
- olist_orders_dataset
- olist_order_items_dataset
- olist_order_payments_dataset
- olist_order_reviews_dataset

In [None]:
#criacao das tabelas temporárias
processed_orders.createOrReplaceTempView("sql_orders")
processed_order_items.createOrReplaceTempView("sql_items")
processed_order_payments.createOrReplaceTempView("sql_payments")
processed_order_reviews.createOrReplaceTempView("sql_reviews")

In [None]:
#junção das tabelas em que seus IDs forem iguais
df_orders_join = spark.sql(""" SELECT *
FROM sql_orders
INNER JOIN sql_items ON sql_orders.order_id = sql_items.order_id
INNER JOIN sql_payments ON sql_orders.order_id = sql_payments.order_id
INNER JOIN sql_reviews ON sql_orders.order_id = sql_reviews.order_id;
""")

In [None]:
df_orders_join.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+--------------------+------------------+------------+--------------------+-------------+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|            order_id|payment_sequential|payment_type|payment_installments|payment_value|           review_id|            order_id|review_score|review_c

## 3 - Perguntas de negócio:
- Qual foi o faturamento mês a mês? Qual o mês com maior número de pedidos?
- Qual o ticket médio dos pedidos?
- Qual o dia da semana com maior número de pedidos?
- Quais são os Top 10 produtos mais bem avaliados? O produto deve ter pelo menos 5 avaliações.
- Qual o faturamento mensal por método de pagamento?
- Qual a categoria mais vendida na empresa?
- Qual vendedor teve a melhor performance mês a mês?

## 4 - Qual foi o faturamento mês a mês? Qual o mês com maior número de pedidos?

In [None]:
#extraindo os dados do DF original e extraindo o mês, o dia da semana e o ano
df_vendas_por_ano_mes = df_orders_join \
    .withColumn("ano", date_format("order_purchase_timestamp", "yyyy")) \
    .withColumn("mes", date_format("order_purchase_timestamp", "MM")) \
    .withColumn("dia_da_semana", date_format("order_purchase_timestamp", "E"))


In [None]:
#agrupa os dados por mes e ano, soma o preco dos agrupamentos e ordena-os
df_faturamento_ano_mes = df_vendas_por_ano_mes \
    .groupBy("mes", "ano") \
    .agg(sum("price").alias("faturamento")) \
    .orderBy("ano", "mes") \

#mostra o faturamento por ano e mes
df_faturamento_ano_mes.show()

+---+----+------------------+
|mes| ano|       faturamento|
+---+----+------------------+
| 09|2016|132.39000000000001|
| 10|2016|50272.470000000096|
| 12|2016|              10.9|
| 01|2017| 128589.0600000002|
| 02|2017|259477.97999999818|
| 03|2017|392081.03999999457|
| 04|2017|389909.31999999506|
| 05|2017| 544303.1299999962|
| 06|2017|  453843.429999994|
| 07|2017| 531931.7299999974|
| 08|2017| 599081.0599999983|
| 09|2017| 660526.8199999983|
| 10|2017| 689624.8199999984|
| 11|2017|1045441.0000000176|
| 12|2017| 761677.3200000012|
| 01|2018| 982295.3000000179|
| 02|2018| 882310.8200000107|
| 03|2018|1018392.5100000163|
| 04|2018|1024458.7700000148|
| 05|2018|1025932.0700000152|
+---+----+------------------+
only showing top 20 rows



In [None]:
##descobrindo o mês com maior numero de pedidos

#agrupar as vendas por mes
pedidos_por_mes = df_vendas_por_ano_mes \
    .groupBy("mes") \
    .agg(count("sql_items.order_id").alias("numero vendas")) 

#ordena por numero de vendas e pega o primeiro item em ordem decrescente
mes_maior_numero_vendas = pedidos_por_mes.orderBy(desc("numero vendas")).first()["mes"] 

#mostra o mês com maior número de vendas
print("O mês com o maior número de vendas foi o de número:", mes_maior_numero_vendas)


O mês com o maior número de vendas foi o de número: 08


## 5 - Qual o ticket médio dos pedidos?

In [None]:
df_vendas_por_ano_mes.agg({'price': 'avg'}).show()

+------------------+
|        avg(price)|
+------------------+
|120.52434879704333|
+------------------+



## 6 - Qual o dia da semana com maior número de pedidos?

In [None]:
#conta os pedidos e os agrupa por dia da semana
df_pedidos_por_semana = df_vendas_por_ano_mes \
  .groupBy('dia_da_semana') \
  .agg(count('sql_items.order_id').alias('numero_pedidos')) \

#ordena o agrupamento do maior para o menor e pega o primeiro item
dia_semana_com_mais_pedidos = df_pedidos_por_semana.orderBy(desc('numero_pedidos')).first()['dia_da_semana']

#imprime o dia da semana de maior numero de vendas
print("O dia da semana com o maior número de vendas foi:", dia_semana_com_mais_pedidos)


O dia da semana com o maior número de vendas foi: Tue


## 7 - Quais são os Top 10 produtos mais bem avaliados? O produto deve ter pelo menos 5 avaliações.

In [None]:
#criando uma tabela temporária onde tem o numero de pedidos
df_orders_join.createOrReplaceTempView('sql_orders_join')

In [None]:
#agrupo por product_id, depois conto o numero de avaliacoes e somo as a quantidade de estrelas recebidas de cada produto
#retiro os produtos que receberam menos de 5 avaliacoes com o .filter e ordeno a tabela do maior para o menor produto em soma de estrelas
df_produtos_mais_bem_avaliados = df_orders_join \
  .groupBy('product_id') \
  .agg(count('review_score').alias('num_avaliacoes'), sum('review_score').alias('soma_estrelas')) \
  .filter("num_avaliacoes >= 5") \
  .orderBy(desc('soma_estrelas'))

#mostro os 10 produtos mais bem avaliados (com maior número de estrelas)
df_produtos_mais_bem_avaliados.show(10)

+--------------------+--------------+-------------+
|          product_id|num_avaliacoes|soma_estrelas|
+--------------------+--------------+-------------+
|aca2eb7d00ea1a7b8...|           533|         2143|
|99a4788cb24856965...|           517|         2024|
|422879e10f4668299...|           507|         1991|
|389d119b48cf3043d...|           405|         1663|
|368c6c730842d7801...|           395|         1544|
|53759a2ecddad2bb8...|           389|         1511|
|d1c427060a0f73f6b...|           354|         1450|
|53b36df67ebb7c415...|           324|         1361|
|154e7e31ebfa09220...|           294|         1270|
|3dd2a17168ec895c7...|           276|         1161|
+--------------------+--------------+-------------+
only showing top 10 rows



## 8 - Qual o faturamento mensal por método de pagamento?

In [None]:
#agrupa os dados por mes, ano e tipo de pagamento, soma o valor do preco da venda de cada agrupamento e os ordena
faturamento_por_metodo_pgto = df_vendas_por_ano_mes \
  .groupBy('mes', 'ano', 'payment_type') \
  .agg(sum('price').alias('faturamento')) \
  .orderBy('ano', 'mes') \
  .show()

+---+----+------------+------------------+
|mes| ano|payment_type|       faturamento|
+---+----+------------+------------------+
| 09|2016| credit_card|132.39000000000001|
| 10|2016|      boleto|           7128.73|
| 10|2016| credit_card| 41335.49000000005|
| 10|2016|  debit_card|209.89000000000001|
| 10|2016|     voucher|           1598.36|
| 12|2016| credit_card|              10.9|
| 01|2017|      boleto| 19897.47999999997|
| 01|2017| credit_card| 98289.38000000027|
| 01|2017|     voucher| 9813.329999999998|
| 01|2017|  debit_card|            588.87|
| 02|2017| credit_card|200339.37999999945|
| 02|2017|      boleto| 47357.02000000008|
| 02|2017|     voucher|10512.299999999994|
| 02|2017|  debit_card|           1269.28|
| 03|2017| credit_card|298680.48999999755|
| 03|2017|      boleto| 66600.67000000023|
| 03|2017|     voucher| 23674.32000000001|
| 03|2017|  debit_card|3125.5600000000004|
| 04|2017|  debit_card|2292.7200000000003|
| 04|2017|      boleto| 67829.74000000024|
+---+----+-

## 9 - Qual a categoria mais vendida na empresa?

In [None]:
#cria a tabela temporária dos produtos porque preciso relacionar sua categoria com a order_id
processed_products.createOrReplaceTempView("sql_products")

#join da tabela order_items e products para pegar sua caregoria
df_vendas_e_categorias = spark.sql(""" SELECT *
  FROM sql_items
  INNER JOIN sql_products
  ON sql_items.product_id == sql_products.product_id;
""")

#agrupar as vendas por categoria
df_categorias_mais_vendidas = df_vendas_e_categorias \
  .groupBy('product_category_name') \
  .agg(count('order_id').alias('num_vendas')) \

#criação da variável da categoria mais vendida, ordenação para decrescente e funcao para pegar o primeiro item
categoria_mais_vendida = df_categorias_mais_vendidas.orderBy(desc('num_vendas')).first()['product_category_name']
print("A categoria com o maior número de vendas foi:", categoria_mais_vendida)

A categoria com o maior número de vendas foi: cama_mesa_banho


## 10 - Qual vendedor teve a melhor performance mês a mês?

In [None]:
#melhor performance por numero de vendas, ordenado por maior numero de vendas
df_melhores_vendedores_por_mes = df_vendas_por_ano_mes \
  .groupBy('mes', 'ano', 'seller_id') \
  .agg(count('sql_items.order_id').alias('num_vendas')) \
  .orderBy(desc('num_vendas')) \
  .show()

#melhor performance por faturamento na célula abaixo


+---+----+--------------------+----------+
|mes| ano|           seller_id|num_vendas|
+---+----+--------------------+----------+
| 11|2017|1f50f920176fa81da...|       371|
| 05|2018|955fee9216a65b617...|       262|
| 01|2018|955fee9216a65b617...|       256|
| 04|2018|955fee9216a65b617...|       241|
| 11|2017|cc419e0650a3c5ba7...|       222|
| 07|2018|6560211a19b47992c...|       220|
| 05|2018|7d13fca1522535862...|       216|
| 01|2018|3d871de0142ce09b7...|       213|
| 08|2018|6560211a19b47992c...|       211|
| 11|2017|4a3ca9315b744ce9f...|       204|
| 03|2018|7c67e1448b00f6e96...|       196|
| 11|2017|6560211a19b47992c...|       192|
| 12|2017|cc419e0650a3c5ba7...|       192|
| 01|2018|da8622b14eb17ae28...|       192|
| 02|2018|1f50f920176fa81da...|       190|
| 05|2018|1025f0e2d44d7041d...|       187|
| 01|2018|8b321bb669392f516...|       185|
| 01|2018|ea8482cd71df3c196...|       185|
| 02|2018|8b321bb669392f516...|       183|
| 06|2018|6560211a19b47992c...|       180|
+---+----+-

In [None]:
#melhor performance por faturamento ordenado por maior faturamento
df_melhores_vendedores_por_mes = df_vendas_por_ano_mes \
  .groupBy('mes', 'ano', 'seller_id') \
  .agg(sum('price').alias('faturamento')) \
  .orderBy(desc('faturamento')) \
  .show()


+---+----+--------------------+------------------+
|mes| ano|           seller_id|       faturamento|
+---+----+--------------------+------------------+
| 09|2017|53243585a1d6dc264...| 75533.67000000003|
| 10|2017|53243585a1d6dc264...|           43614.4|
| 08|2017|53243585a1d6dc264...|           35776.0|
| 05|2018|7d13fca1522535862...|32219.809999999998|
| 03|2018|7d13fca1522535862...|          28612.44|
| 05|2018|955fee9216a65b617...|28064.309999999976|
| 05|2017|7e93a43ef30c4f03f...|27744.420000000006|
| 04|2018|7d13fca1522535862...| 27600.88999999999|
| 10|2017|7e93a43ef30c4f03f...|27381.509999999995|
| 03|2018|4869f7a5dfa277a7d...|27169.999999999993|
| 03|2018|7c67e1448b00f6e96...|25843.969999999972|
| 03|2018|5dceca129747e92ff...|           24981.1|
| 11|2017|53243585a1d6dc264...|          23213.96|
| 12|2017|fa1c13f2614d7b5c4...| 22845.60999999999|
| 04|2017|59417c56835dd8e2e...|           22665.0|
| 11|2017|46dc3b2cc0980fb8e...|22388.819999999978|
| 04|2018|955fee9216a65b617...|