## Análise de dados: Comércio eletrônico brasileiro

Este projeto é composto por um conjunto de dados públicos de comércio eletrônico brasileiro, disponibilizados pelo site Olist, são registros que compõem todo o processo de venda de um produto, da compra, pagamento, entrega e avaliação, além de dados de geolocalização, produtos e vendedores. Estas informações serão tratadas e analisadas, de modo a responder questões de negócio.


### Importação de bibliotecas

In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, sum, hour


### Criação e iniciação de uma sessão Spark

In [39]:

appName = 'PySpark - Olist'

spark = SparkSession.builder \
    .appName('PySpark - Olist') \
    .config('spark.driver.memory', '8g') \
    .config('spark.driver.cores', '2') \
    .config('spark.executor.memory', '8g') \
    .config('spark.executor.cores', '4') \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

spark


### Criação dos datasets a partir da leitura dos arquivos *.csv


In [40]:
df_orders = spark.read.csv('dados\olist_orders_dataset.csv', sep=',', header=True, 
                           encoding='utf-8', inferSchema=True)
df_customers = spark.read.csv('dados\olist_customers_dataset.csv', sep=',', header=True, 
                              encoding='utf-8', inferSchema=True)
df_geolocation = spark.read.csv('dados\olist_geolocation_dataset.csv', sep=',', header=True, 
                                encoding='utf-8', inferSchema=True)
df_order_items = spark.read.csv('dados\olist_order_items_dataset.csv', sep=',', header=True, 
                                encoding='utf-8', inferSchema=True)
df_order_payments = spark.read.csv('dados\olist_order_payments_dataset.csv', sep=',', header=True, 
                                   encoding='utf-8', inferSchema=True)
df_order_reviews = spark.read.csv('dados\olist_order_reviews_dataset.csv', sep=',', header=True, 
                                  encoding='utf-8', inferSchema=True, 
                                  multiLine=True, ignoreLeadingWhiteSpace=True, escape='"', quote='"')
df_products = spark.read.csv('dados\olist_products_dataset.csv', sep=',', header=True, 
                             encoding='utf-8', inferSchema=True)
df_sellers = spark.read.csv('dados\olist_sellers_dataset.csv', sep=',', header=True, 
                            encoding='utf-8', inferSchema=True)

### Verificando os tipos das colunas

In [41]:
df_orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [42]:
df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [43]:
df_geolocation.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [44]:
df_order_items.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [45]:
df_order_payments.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [46]:
df_order_reviews.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: timestamp (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)



In [47]:
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [48]:
df_sellers.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



### Verificando a existência de registros nulos

In [49]:
def check_nulls(dataframe, name) -> None:
    '''
    Verifica e exibe a quantidade de valores nulos em cada coluna do dataframe.

    :param dataframe: DataFrame
        Dataframe a ser analisado.
    :param name: str
        Nome identificando o dataframe para exibição na saída.
    '''
    print(f'\n{name.upper()} { "-" * (100 - len(name))}')
    for coluna in dataframe.columns:
        qty = dataframe.filter(dataframe[coluna].isNull()).count()
        if qty >= 1:
            print(f'{coluna}: {qty}')


In [50]:
check_nulls(df_orders, 'df_orders')
check_nulls(df_customers, 'df_customers')
check_nulls(df_geolocation, 'df_geolocation')
check_nulls(df_order_items, 'df_order_items')
check_nulls(df_order_payments, 'df_order_payments')
check_nulls(df_order_reviews, 'df_order_reviews')
check_nulls(df_products, 'df_products')
check_nulls(df_sellers, 'df_sellers')



DF_ORDERS -------------------------------------------------------------------------------------------
order_approved_at: 160
order_delivered_carrier_date: 1783
order_delivered_customer_date: 2965

DF_CUSTOMERS ----------------------------------------------------------------------------------------

DF_GEOLOCATION --------------------------------------------------------------------------------------

DF_ORDER_ITEMS --------------------------------------------------------------------------------------

DF_ORDER_PAYMENTS -----------------------------------------------------------------------------------

DF_ORDER_REVIEWS ------------------------------------------------------------------------------------
review_comment_title: 87658
review_comment_message: 58256

DF_PRODUCTS -----------------------------------------------------------------------------------------
product_category_name: 610
product_name_lenght: 610
product_description_lenght: 610
product_photos_qty: 610
product_weight_g: 2


Foram identificados valores nulos em 3 dataframes, df_orders, df_order_reviews e df_products, entretando no caso de **df_orders** os dados representam operações de venda, logo possui vários estágios podendo ser uma venda concluída, cancelada, processamento ou mesmo em trânsito, ou seja, dependendo do estágio algumas colunas podem ficarem vazias (nulas), em **df_order_reviews** há campos com reviews dos compradores sobre suas compras, não é obrigatório um cliente escrever um review e em **df_products** há produtos com nome e descrição ausentes, porém constam em pedidos de clientes.

### Verificando a existência de registros duplicados

In [51]:
def check_duplicates(dataframe, fields) -> None:
    '''
    Verifica e exibe uma amostra de 5 registros duplicados com base em um ou mais campos especificados.

    :param dataframe: DataFrame
        Dataframe a ser analisado.
    :param fields: str ou list de str
        Nome do campo ou lista de campos a serem usados como referência para identificar duplicatas.
    '''
    duplicate = dataframe.groupBy(fields) \
        .agg(count('*').alias('qty')) \
        .where(col('qty') > 1) \
        .orderBy(desc('qty'))   
    duplicate.show(5, truncate=False)

In [52]:
check_duplicates(df_customers, 'customer_unique_id')

+--------------------------------+---+
|customer_unique_id              |qty|
+--------------------------------+---+
|8d50f5eadf50201ccdcedfb9e2ac8455|17 |
|3e43e6105506432c953e165fb2acf44c|9  |
|ca77025e7201e3b30c44b472ff346268|7  |
|1b6c7548a2a1f9037c1fd3ddfed95f33|7  |
|6469f99c1f9dfae7733b25662e7f1782|7  |
+--------------------------------+---+
only showing top 5 rows



Em **df_customers** existem dados duplicados na coluna _customer_unique_id_, porém conforme a descrição da tabela no site Kaggle está correto conforme regra estabelecida pela Olist, basicamente esse campo permite que se identifique clientes que fizeram recompras.

In [53]:
check_duplicates(df_geolocation, ['geolocation_lat', 'geolocation_lng'])

+-------------------+------------------+---+
|geolocation_lat    |geolocation_lng   |qty|
+-------------------+------------------+---+
|-27.102098999999946|-48.62961349999995|314|
|-23.495901469908656|-46.87468669635919|190|
|-23.506049208479613|-46.71737739541604|141|
|-23.490617505282753|-46.86900366603934|127|
|-23.00551425914832 |-43.37596441256672|102|
+-------------------+------------------+---+
only showing top 5 rows



In [54]:
df_geolocation = df_geolocation.dropDuplicates()
        
df_geolocation.show(5, truncate=False)

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat    |geolocation_lng    |geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|1020                       |-23.551247650297142|-46.628331542472104|sao paulo       |SP               |
|1015                       |-23.548031800450076|-46.63357615958799 |sao paulo       |SP               |
|1033                       |-23.54077242862274 |-46.63658627751788 |são paulo       |SP               |
|1122                       |-23.529995958364616|-46.64040623812689 |sao paulo       |SP               |
|1103                       |-23.539337475993985|-46.62899422217096 |são paulo       |SP               |
+---------------------------+-------------------+-------------------+----------------+-----------------+
only showing top 5 rows



Existiam linhas duplicadas em **df_geolocation**, porém nesse dataframe não há uma coluna com um identificador exclusivo para cada linha então foi criada uma tabela com os dados exclusivos baseado nas colunas _geolocation_lat_ e _geolocation_lng_ e substituído dataframe anterior.

In [55]:
check_duplicates(df_order_items, 'order_id')

+--------------------------------+---+
|order_id                        |qty|
+--------------------------------+---+
|8272b63d03f5f79c56e9e4120aec44ef|21 |
|1b15974a0141d54e36626dca3fdc731a|20 |
|ab14fdcfbe524636d65ee38360e22ce8|20 |
|428a2f660dc84138d969ccd69a0ab6d5|15 |
|9ef13efd6949e4573a18964dd1bbe7f5|15 |
+--------------------------------+---+
only showing top 5 rows



Há linhas duplicadas em **df_order_items**, este dataframe contêm os itens que compõem uma compra, logo uma ordem de venda pode conter um ou vários produtos distintos ou não, portanto não há necessidade de efetuar qualquer ajuste.

In [56]:
check_duplicates(df_order_payments, 'order_id')

+--------------------------------+---+
|order_id                        |qty|
+--------------------------------+---+
|fa65dad1b0e818e3ccc5cb0e39231352|29 |
|ccf804e764ed5650cd8759557269dc13|26 |
|285c2e15bebd4ac83635ccc563dc71f4|22 |
|895ab968e7bb0d5659d16cd74cd1650c|21 |
|ee9ca989fc93ba09a6eddc250ce01742|19 |
+--------------------------------+---+
only showing top 5 rows



O dataframe **df_order_payments** consiste na cadastro de meios de pagamentos, parcelas (quando aplicável), então uma ordem de venda pode ser com uma ou várias formas de pagamento e estas podem ser parceladas em várias vezes, nenhuma alteração executada.

In [57]:
check_duplicates(df_order_reviews, 'order_id')

+--------------------------------+---+
|order_id                        |qty|
+--------------------------------+---+
|df56136b8031ecd28e200bb18e6ddb2e|3  |
|8e17072ec97ce29f0e1f111e598b0c85|3  |
|03c939fd7fd3b38f8485a0f95798f1f6|3  |
|c88b1d1b157a9999ce368f218a407141|3  |
|f63a31c3349b87273468ff7e66852056|2  |
+--------------------------------+---+
only showing top 5 rows



Após a conclusão da entrega do pedido o cliente recebe um e-mail convidando a efetuar uma avaliação, entretanto tanto o título quanto o mensagem da review são opcionais. Apesar de existirem pedidos com mais de uma avaliação os demais campos apresentam notas reviews, data e horas distintas, são essas informações que alimentam o dataframe **df_order_reviews**. Não foram feitas alterações.

In [58]:
check_duplicates(df_orders, 'order_id')

+--------+---+
|order_id|qty|
+--------+---+
+--------+---+



Um cliente pode efetuar um ou várias compras e que podem estar em várias estágios desde a criação da ordem até a sua entrega, ainda entre esses estágios algum outro processo pode determinar a continuidade da venda ou não, resultando em por exemplo o cancelamento, sendo assim pode ocorrer de existir registros de operações de venda ainda não concluídos, estas são as informaçõe que compõem o **df_orders**.

In [59]:
check_duplicates(df_products, 'product_id')

+----------+---+
|product_id|qty|
+----------+---+
+----------+---+



O dataframe **df_products** contém o cadastro de produtos, descrição, dimensões, entre outras informações.

In [60]:
check_duplicates(df_sellers, 'seller_id')

+---------+---+
|seller_id|qty|
+---------+---+
+---------+---+



O dataframe **df_sellers** contém o cadastro de vendedores e sua localização (cep, cidade e estado).

### Criando views temporárias para uso do Spark SQL

In [61]:
df_orders.createOrReplaceTempView('tb_orders')
df_customers.createOrReplaceTempView('tb_customers')
df_geolocation.createOrReplaceTempView('tb_geolocation')
df_order_items.createOrReplaceTempView('tb_order_items')
df_order_payments.createOrReplaceTempView('tb_order_payments')
df_order_reviews.createOrReplaceTempView('tb_order_reviews')
df_products.createOrReplaceTempView('tb_products')
df_sellers.createOrReplaceTempView('tb_sellers')


### Criando views SQL temporárias 

In [62]:
spark.sql('''
    CREATE TEMP VIEW vw_order_costumer AS 
    SELECT 
        oo.customer_id,
        oo.order_id,
        cc.customer_unique_id,
        cc.customer_city,
        cc.customer_state        
    FROM tb_orders oo
    INNER JOIN tb_customers cc 
    ON cc.customer_id = oo.customer_id;
''')


AnalysisException: [TEMP_TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create the temporary view `vw_order_costumer` because it already exists.
Choose a different name, drop or replace the existing view,  or add the IF NOT EXISTS clause to tolerate pre-existing views.

In [None]:
spark.sql('''
    CREATE TEMP VIEW vw_price_freight AS 
        SELECT
            cc.customer_city,
            cc.customer_state,
            oo.order_id,
            oo.order_purchase_timestamp,
            oi.price,
            oi.freight_value
        FROM tb_orders oo
        INNER JOIN tb_customers cc 
        ON cc.customer_id = oo.customer_id
        INNER JOIN tb_order_items oi 
        ON oi.order_id = oo.order_id;
''')


DataFrame[]

### 01 - Qual o total(quantidade) de vendas divididas por estado?

In [None]:
spark.sql('''
    SELECT 
        customer_state, 
        COUNT(order_id) amount_of_sales
    FROM vw_order_costumer 
        GROUP BY 
            customer_state 
        ORDER BY 
            amount_of_sales DESC;
''').show(31, truncate=False)


+--------------+---------------+
|customer_state|amount_of_sales|
+--------------+---------------+
|SP            |41746          |
|RJ            |12852          |
|MG            |11635          |
|RS            |5466           |
|PR            |5045           |
|SC            |3637           |
|BA            |3380           |
|DF            |2140           |
|ES            |2033           |
|GO            |2020           |
|PE            |1652           |
|CE            |1336           |
|PA            |975            |
|MT            |907            |
|MA            |747            |
|MS            |715            |
|PB            |536            |
|PI            |495            |
|RN            |485            |
|AL            |413            |
|SE            |350            |
|TO            |280            |
|RO            |253            |
|AM            |148            |
|AC            |81             |
|AP            |68             |
|RR            |46             |
+---------

### 02 - Qual o total(valor) de vendas e fretes divididos por estado?

In [None]:
spark.sql('''
    SELECT
        customer_state,
        CAST(SUM (price) AS DECIMAL(10,2)) AS total_price,
        CAST(SUM (freight_value) AS DECIMAL(10,2)) AS total_freight
    FROM vw_price_freight
        GROUP BY 
            customer_state
        ORDER BY 
            total_price DESC;
''').show(31, truncate=False)


+--------------+-----------+-------------+
|customer_state|total_price|total_freight|
+--------------+-----------+-------------+
|SP            |5202955.05 |718723.07    |
|RJ            |1824092.67 |305589.31    |
|MG            |1585308.03 |270853.46    |
|RS            |750304.02  |135522.74    |
|PR            |683083.76  |117851.68    |
|SC            |520553.34  |89660.26     |
|BA            |511349.99  |100156.68    |
|DF            |302603.94  |50625.50     |
|GO            |294591.95  |53114.98     |
|ES            |275037.31  |49764.60     |
|PE            |262788.03  |59449.66     |
|CE            |227254.71  |48351.59     |
|PA            |178947.81  |38699.30     |
|MT            |156453.53  |29715.43     |
|MA            |119648.22  |31523.77     |
|MS            |116812.64  |19144.03     |
|PB            |115268.08  |25719.73     |
|PI            |86914.08   |21218.20     |
|RN            |83034.98   |18860.10     |
|AL            |80314.81   |15914.59     |
|SE        

### 03 - Qual o total(quantidade) e distribuição(%) de vendas por hora?

In [None]:
spark.sql('''
    SELECT
        date_format(order_purchase_timestamp, 'HH') AS hour_24,
        COUNT(order_id) AS amount_per_hour,
        CAST(COUNT(*) * 100 / SUM(COUNT(*)) OVER() AS DECIMAL(10,2)) AS perc_score
    FROM tb_orders
        GROUP BY 
            hour_24
        ORDER BY 
            hour_24;
''').show(28, truncate=False)


+-------+---------------+----------+
|hour_24|amount_per_hour|perc_score|
+-------+---------------+----------+
|00     |2394           |2.41      |
|01     |1170           |1.18      |
|02     |510            |0.51      |
|03     |272            |0.27      |
|04     |206            |0.21      |
|05     |188            |0.19      |
|06     |502            |0.50      |
|07     |1231           |1.24      |
|08     |2967           |2.98      |
|09     |4785           |4.81      |
|10     |6177           |6.21      |
|11     |6578           |6.61      |
|12     |5995           |6.03      |
|13     |6518           |6.55      |
|14     |6569           |6.61      |
|15     |6454           |6.49      |
|16     |6675           |6.71      |
|17     |6150           |6.18      |
|18     |5769           |5.80      |
|19     |5982           |6.02      |
|20     |6193           |6.23      |
|21     |6217           |6.25      |
|22     |5816           |5.85      |
|23     |4123           |4.15      |
+

### 04 - Qual a média(valor) de vendas por hora?

In [None]:
spark.sql('''
    SELECT 
        hour_24,
        CAST(AVG (sum_total) AS DECIMAL(10,2)) AS avg_total
    FROM (
            SELECT 
                oo.order_id, 
                date_format(order_purchase_timestamp, 'HH') AS hour_24,
                SUM (price) AS sum_total
            FROM tb_orders oo
                INNER JOIN tb_order_items oi 
                ON oi.order_id = oo.order_id 
                GROUP BY 
                    oo.order_id, 
                    hour_24,
                    oo.order_purchase_timestamp
                ORDER BY 
                    hour_24
            ) AS total_amount_per_order
        GROUP BY 
            hour_24
        ORDER BY 
            hour_24;
''').show(28, truncate=False)


+-------+---------+
|hour_24|avg_total|
+-------+---------+
|00     |133.23   |
|01     |129.77   |
|02     |108.58   |
|03     |132.06   |
|04     |118.45   |
|05     |119.76   |
|06     |114.43   |
|07     |125.03   |
|08     |132.95   |
|09     |143.59   |
|10     |136.67   |
|11     |134.11   |
|12     |142.45   |
|13     |134.92   |
|14     |145.42   |
|15     |141.20   |
|16     |141.11   |
|17     |137.41   |
|18     |143.31   |
|19     |138.95   |
|20     |138.98   |
|21     |135.71   |
|22     |135.95   |
|23     |128.60   |
+-------+---------+



### 05 - Qual o ticket médio nos anos de 2016, 2017 e 2018?

In [None]:
spark.sql('''
    SELECT 
        ext_year,
        CAST(AVG (sum_total) AS DECIMAL(10,2)) AS avg_ticket
    FROM (
            SELECT 
                oo.order_id,
                EXTRACT (YEAR FROM order_purchase_timestamp) AS ext_year, 
                SUM (price) AS sum_total
            FROM tb_orders oo
                INNER JOIN tb_order_items oi 
                ON oi.order_id = oo.order_id 
                GROUP BY 
                    ext_year,
                    oo.order_id
                ORDER BY 
                    ext_year
            ) AS total_amount_per_order
        GROUP BY 
            ext_year
        ORDER BY 
            ext_year;
''').show(truncate=False)


+--------+----------+
|ext_year|avg_ticket|
+--------+----------+
|2016    |159.57    |
|2017    |138.09    |
|2018    |137.35    |
+--------+----------+



### 06 - Qual a distribuição(%) da pontuação do pedidos?

In [None]:
spark.sql('''
    SELECT 
        review_score,
        COUNT(*) AS qty_score,
        CAST(COUNT(*) * 100 / SUM(COUNT(*)) OVER() AS DECIMAL(10,2)) AS perc_score
    FROM tb_order_reviews
        GROUP BY
            review_score
        ORDER BY
            review_score DESC;
''').show(truncate=False)


+------------+---------+----------+
|review_score|qty_score|perc_score|
+------------+---------+----------+
|5           |57328    |57.78     |
|4           |19142    |19.29     |
|3           |8179     |8.24      |
|2           |3151     |3.18      |
|1           |11424    |11.51     |
+------------+---------+----------+



### 07 - Quais as 10 cidades com as maiores volumes(quantidade) de vendas?

In [None]:
spark.sql('''
    SELECT 
        customer_city, 
        customer_state, 
        COUNT(*) AS top_10
    FROM vw_order_costumer 
        GROUP BY 
            customer_city, 
            customer_state 
        ORDER BY 
            top_10 DESC
        LIMIT 10;
''').show(truncate=False)


+---------------------+--------------+------+
|customer_city        |customer_state|top_10|
+---------------------+--------------+------+
|sao paulo            |SP            |15540 |
|rio de janeiro       |RJ            |6882  |
|belo horizonte       |MG            |2773  |
|brasilia             |DF            |2131  |
|curitiba             |PR            |1521  |
|campinas             |SP            |1444  |
|porto alegre         |RS            |1379  |
|salvador             |BA            |1245  |
|guarulhos            |SP            |1189  |
|sao bernardo do campo|SP            |938   |
+---------------------+--------------+------+



### 08 - Quais as 10 cidades com os maiores volumes(valores) de vendas e fretes?

In [None]:
spark.sql('''
    SELECT 
        customer_city, 
        customer_state, 
        CAST(SUM (price) AS DECIMAL(10,2)) AS total_price,
        CAST(SUM (freight_value) AS DECIMAL(10,2)) AS total_freight
    FROM vw_price_freight 
        GROUP BY 
            customer_city, 
            customer_state 
        ORDER BY 
            total_price DESC
        LIMIT 10;
''').show(truncate=False)


+--------------+--------------+-----------+-------------+
|customer_city |customer_state|total_price|total_freight|
+--------------+--------------+-----------+-------------+
|sao paulo     |SP            |1914924.54 |255302.58    |
|rio de janeiro|RJ            |992538.86  |161695.16    |
|belo horizonte|MG            |355611.13  |61122.26     |
|brasilia      |DF            |301920.25  |50384.89     |
|curitiba      |PR            |211738.06  |33001.81     |
|porto alegre  |RS            |190562.08  |33502.01     |
|campinas      |SP            |187844.53  |24697.17     |
|salvador      |BA            |181104.42  |35667.98     |
|guarulhos     |SP            |144268.39  |19307.43     |
|niteroi       |RJ            |117907.12  |20012.26     |
+--------------+--------------+-----------+-------------+



### 09 - Qual a quantidade de produtos cadastrados por categoria?

In [None]:
spark.sql('''
    SELECT 
        product_category_name,
        COUNT(product_id) AS qty_product
    FROM tb_products
        GROUP BY
            product_category_name
        ORDER BY
            qty_product DESC;
''').show(78, truncate=False)


+----------------------------------------------+-----------+
|product_category_name                         |qty_product|
+----------------------------------------------+-----------+
|cama_mesa_banho                               |3029       |
|esporte_lazer                                 |2867       |
|moveis_decoracao                              |2657       |
|beleza_saude                                  |2444       |
|utilidades_domesticas                         |2335       |
|automotivo                                    |1900       |
|informatica_acessorios                        |1639       |
|brinquedos                                    |1411       |
|relogios_presentes                            |1329       |
|telefonia                                     |1134       |
|bebes                                         |919        |
|perfumaria                                    |868        |
|fashion_bolsas_e_acessorios                   |849        |
|papelaria              

### 10 - Qual a quantidade e distribuição(%) das categorias nos pedidos?

In [None]:
spark.sql('''
    SELECT
        product_category_name,
        COUNT(oi.product_id) AS qty_product,
        CAST(COUNT(oi.product_id) * 100 / SUM(COUNT(oi.product_id)) OVER() AS DECIMAL(10,2)) AS perc_category
    FROM tb_products po
        INNER JOIN tb_order_items oi 
        ON po.product_id = oi.product_id
        GROUP BY
            product_category_name
        ORDER BY
            perc_category DESC;
''').show(78, truncate=False)


+----------------------------------------------+-----------+-------------+
|product_category_name                         |qty_product|perc_category|
+----------------------------------------------+-----------+-------------+
|cama_mesa_banho                               |11115      |9.87         |
|beleza_saude                                  |9670       |8.58         |
|esporte_lazer                                 |8641       |7.67         |
|moveis_decoracao                              |8334       |7.40         |
|informatica_acessorios                        |7827       |6.95         |
|utilidades_domesticas                         |6964       |6.18         |
|relogios_presentes                            |5991       |5.32         |
|telefonia                                     |4545       |4.03         |
|ferramentas_jardim                            |4347       |3.86         |
|automotivo                                    |4235       |3.76         |
|brinquedos              

### 11 - Qual a quantidade de vendas por vendedor?

In [None]:
spark.sql('''
    SELECT
        se.seller_id,
        COUNT(DISTINCT oi.order_id) AS qty_order_by_seller
    FROM tb_sellers se
        INNER JOIN tb_order_items oi 
        ON se.seller_id = oi.seller_id
        GROUP BY 
            se.seller_id
        ORDER BY
            qty_order_by_seller DESC;
''').show(truncate=False)


+--------------------------------+-------------------+
|seller_id                       |qty_order_by_seller|
+--------------------------------+-------------------+
|6560211a19b47992c3666cc44a7e94c0|1854               |
|4a3ca9315b744ce9f8e9374361493884|1806               |
|cc419e0650a3c5ba77189a1882b7556a|1706               |
|1f50f920176fa81dab994f9023523100|1404               |
|da8622b14eb17ae2831f4ac5b9dab84a|1314               |
|955fee9216a65b617aa5c0531780ce60|1287               |
|7a67c85e85bb2ce8582c35f2203ad736|1160               |
|ea8482cd71df3c1969d7b9473ff13abc|1146               |
|4869f7a5dfa277a7dca6462dcf3b52b2|1132               |
|3d871de0142ce09b7081e2b9d1733cb1|1080               |
|7c67e1448b00f6e969d365cea6b010ab|982                |
|8b321bb669392f5163d04c59e235e066|943                |
|1025f0e2d44d7041d6cf58b6550e0bfa|915                |
|620c87c171fb2a6dd6e8bb4dec959fc6|740                |
|a1043bafd471dff536d0c462352beb48|718                |
|cca3071e3

In [None]:
spark.stop()
