In [1]:
pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
spark = SparkSession.builder.appName('ecommerce').getOrCreate()

In [4]:
df_customers = spark.read.csv('/content/df_Customers.csv', header=True, inferSchema=True)

In [5]:
df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [6]:
df_orders = spark.read.csv('/content/df_Orders.csv', header=True, inferSchema=True)

In [7]:
df_orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_timestamp: timestamp (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)



In [8]:
df_products = spark.read.csv('/content/df_Products.csv', header=True, inferSchema=True)

In [9]:
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_weight_g: double (nullable = true)
 |-- product_length_cm: double (nullable = true)
 |-- product_height_cm: double (nullable = true)
 |-- product_width_cm: double (nullable = true)



In [10]:
df_payments = spark.read.csv('/content/df_Payments.csv', header=True, inferSchema=True)

In [11]:
df_payments.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [12]:
df_orderitems = spark.read.csv('/content/df_OrderItems.csv', header=True, inferSchema=True)

In [13]:
df_orderitems.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- shipping_charges: double (nullable = true)



# 1 - Qual é o valor total de vendas por estado?

In [14]:
df_valor_vendas_uf = df_orders.join(df_orderitems, df_orders.order_id == df_orderitems.order_id).join(df_customers, df_orders.customer_id == df_customers.customer_id)

df_valor_vendas_uf = df_valor_vendas_uf.groupBy("customer_state").agg((F.sum(df_orderitems.price + df_orderitems.shipping_charges)).alias("total_sales")).orderBy(F.desc("total_sales"))

df_valor_vendas_uf.show()

+--------------+--------------------+
|customer_state|         total_sales|
+--------------+--------------------+
|            SP|1.4586002190000033E7|
|            RJ|   4347299.469999973|
|            MG|  3972702.5099999947|
|            RS|  1884154.7600000007|
|            PR|  1807890.2500000023|
|            SC|  1210286.6000000022|
|            BA|  1204627.9900000007|
|            GO|           768086.19|
|            ES|   695583.5900000005|
|            DF|   641402.1700000003|
|            PE|   613616.6499999991|
|            CE|  466328.37000000034|
|            PA|            343767.5|
|            MT|   321673.9700000002|
|            MA|  243749.88000000012|
|            MS|  240606.81999999995|
|            PB|  201391.18000000005|
|            RN|  154890.03000000003|
|            PI|  140538.71999999997|
|            SE|  119347.12000000008|
+--------------+--------------------+
only showing top 20 rows



# 2 - Quais são os 10 produtos mais vendidos em termos de quantidade?

In [22]:
df_top10 = df_orderitems.join(df_products, df_orderitems.product_id == df_products.product_id).select(df_orderitems["product_id"], df_products["product_category_name"], df_orderitems["order_id"])

df_top10 = df_top10.groupBy("product_id","product_category_name").agg(F.count("order_id").alias("quantity_sold")).orderBy(F.desc("quantity_sold")).limit(10)

df_top10.show()


+------------+---------------------+-------------+
|  product_id|product_category_name|quantity_sold|
+------------+---------------------+-------------+
|0vbEvli2JYJu|                 toys|       164025|
|UgkSjxoiV9Ev|                 toys|       146689|
|9NwzO0Pm0fDM|                 toys|       146689|
|SLTlrWtcYt1m|                 toys|       103041|
|Biwi1BNtUB7l|                 toys|        87025|
|ro08JPncYzLh|         garden_tools|        84100|
|1edaUIVffPFF|        watches_gifts|        65025|
|sCONCvmO1cEY| computers_accesso...|        58564|
|dAz11YkQkoue|        health_beauty|        51529|
|Ffe8gTdmyO3U|                 toys|        43681|
+------------+---------------------+-------------+



# 3 - Qual é o tempo médio entre a data de compra e a data de entrega para cada status de pedido?

In [26]:
# Filtrar as ordens onde order_delivered_timestamp não é nulo
df_tempo_entrega = df_orders.filter(df_orders.order_delivered_timestamp.isNotNull())

# Calcular a média do tempo de entrega e agrupar por order_status
df_tempo_entrega = df_tempo_entrega \
    .groupBy("order_status") \
    .agg(F.avg(F.datediff(df_orders.order_delivered_timestamp, df_orders.order_purchase_timestamp)).alias("avg_delivery_time")) \
    .orderBy(F.asc("avg_delivery_time"))

# Exibir o resultado
df_tempo_entrega.show()


+------------+------------------+
|order_status| avg_delivery_time|
+------------+------------------+
|   delivered|12.372297590995402|
|    canceled|              17.2|
+------------+------------------+



# 4 - Qual foi o método de pagamento mais utilizado para pedidos acima de R$ 1000,00?

In [25]:
# Realizar os joins entre payments, orders e order_items
df_forma_pagto = df_payments \
    .join(df_orders, df_payments.order_id == df_orders.order_id) \
    .join(df_orderitems, df_orders.order_id == df_orderitems.order_id)

# Agrupar por payment_type, contar o número de pedidos e aplicar o filtro do HAVING
df_forma_pagto = df_forma_pagto \
    .groupBy("payment_type") \
    .agg(
        F.count(df_payments.order_id).alias("num_orders"),
        F.sum(df_orderitems.price + df_orderitems.shipping_charges).alias("total_sales")
    ) \
    .filter(F.col("total_sales") > 1000) \
    .orderBy(F.desc("num_orders"))

# Exibir o resultado
df_forma_pagto.show()


+------------+----------+-------------------+
|payment_type|num_orders|        total_sales|
+------------+----------+-------------------+
| credit_card|     65814|2.503643150999996E7|
|      wallet|     17302|  6956812.949999956|
|     voucher|      4911| 1934790.4499999983|
|  debit_card|      1289|  475037.1500000002|
+------------+----------+-------------------+



# 5 - Quantos clientes únicos realizaram pedidos em cada mês do ano?

In [24]:
# Agrupar por ano e mês da data de compra e contar clientes distintos
df_qtd_clientes_uniq = df_orders \
    .groupBy(F.year("order_purchase_timestamp").alias("year"),
             F.month("order_purchase_timestamp").alias("month")) \
    .agg(F.countDistinct("customer_id").alias("unique_customers")) \
    .orderBy("year", "month")

# Exibir o resultado
df_qtd_clientes_uniq.show()


+----+-----+----------------+
|year|month|unique_customers|
+----+-----+----------------+
|2016|    9|               3|
|2016|   10|             289|
|2016|   12|               1|
|2017|    1|             857|
|2017|    2|            1534|
|2017|    3|            2392|
|2017|    4|            2156|
|2017|    5|            3407|
|2017|    6|            2953|
|2017|    7|            3781|
|2017|    8|            4028|
|2017|    9|            3937|
|2017|   10|            4258|
|2017|   11|            6840|
|2017|   12|            4981|
|2018|    1|            6463|
|2018|    2|            6069|
|2018|    3|            6508|
|2018|    4|            6200|
|2018|    5|            6163|
+----+-----+----------------+
only showing top 20 rows

