<a href="https://colab.research.google.com/github/Nicolas1545/Estat-stica-com-Python/blob/main/Brazilian_E_Commerce.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark



In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("MeuProjetoEscalavel") \
    .getOrCreate()


In [5]:
dados_vendas = [
    ("iPhone 15", "Eletrônicos", 5000, "SP"),
    ("Samsung S24", "Eletrônicos", 4500, "RJ"),
    ("Cadeira Gamer", "Móveis", 1200, "SP"),
    ("Teclado Mecânico", "Eletrônicos", 300, "MG"),
    ("Mesa de Escritório", "Móveis", 800, "RJ")
]

colunas = ["Produto", "Categoria", "Preço", "Estado"]

df = spark.createDataFrame(dados_vendas, schema=colunas)

df.show()

df.printSchema()

+------------------+-----------+-----+------+
|           Produto|  Categoria|Preço|Estado|
+------------------+-----------+-----+------+
|         iPhone 15|Eletrônicos| 5000|    SP|
|       Samsung S24|Eletrônicos| 4500|    RJ|
|     Cadeira Gamer|     Móveis| 1200|    SP|
|  Teclado Mecânico|Eletrônicos|  300|    MG|
|Mesa de Escritório|     Móveis|  800|    RJ|
+------------------+-----------+-----+------+

root
 |-- Produto: string (nullable = true)
 |-- Categoria: string (nullable = true)
 |-- Preço: long (nullable = true)
 |-- Estado: string (nullable = true)



In [6]:
from pyspark.sql import functions as F

resultado = df.filter(df.Estado == "SP") \
              .groupBy("Categoria") \
              .agg(F.sum("Preço").alias("Total_Faturado"))
resultado.show()

+-----------+--------------+
|  Categoria|Total_Faturado|
+-----------+--------------+
|Eletrônicos|          5000|
|     Móveis|          1200|
+-----------+--------------+



In [7]:
df = df.withColumnRenamed("Preço", "preco")
df.select("preco").show()

+-----+
|preco|
+-----+
| 5000|
| 4500|
| 1200|
|  300|
|  800|
+-----+



In [10]:
from google.colab import files
uploaded = files.upload()

Saving olist_orders_dataset.csv to olist_orders_dataset.csv
Saving olist_products_dataset.csv to olist_products_dataset.csv
Saving olist_sellers_dataset.csv to olist_sellers_dataset.csv
Saving product_category_name_translation.csv to product_category_name_translation.csv
Saving olist_geolocation_dataset.csv to olist_geolocation_dataset.csv
Saving olist_order_items_dataset.csv to olist_order_items_dataset.csv
Saving olist_order_payments_dataset.csv to olist_order_payments_dataset.csv
Saving olist_order_reviews_dataset.csv to olist_order_reviews_dataset.csv
Saving olist_customers_dataset.csv to olist_customers_dataset.csv


In [11]:
df_orders = spark.read.csv("olist_orders_dataset.csv", header=True, inferSchema=True)
df_items = spark.read.csv("olist_order_items_dataset.csv", header=True, inferSchema=True)

df_orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



SINTAXE:
df_esquerda.join(df_direita, "coluna_em_comum", "tipo_de_join")

In [12]:
from pyspark.sql import functions as F

df_completo = df_orders.join(df_items, "order_id", "inner")

df_completo.groupBy("order_status") \
            .agg(F.sum("price").alias("faturamento_total")) \
            .orderBy(F.desc("faturamento_total")) \
            .show()

+------------+--------------------+
|order_status|   faturamento_total|
+------------+--------------------+
|   delivered|1.3221498110006876E7|
|     shipped|  150727.44000000018|
|    canceled|   95235.27000000014|
|    invoiced|   61526.37000000008|
|  processing|    60439.2200000001|
| unavailable|             2007.69|
|    approved|  209.60000000000002|
+------------+--------------------+



In [13]:
df_vendas_detalhado = df_orders.join(df_items, on="order_id", how="inner")

df_vendas_detalhado.select("order_id", "order_status", "product_id", "price").show(5)

+--------------------+------------+--------------------+-----+
|            order_id|order_status|          product_id|price|
+--------------------+------------+--------------------+-----+
|00010242fe8c5a6d1...|   delivered|4244733e06e7ecb49...| 58.9|
|00018f77f2f0320c5...|   delivered|e5f2d52b802189ee6...|239.9|
|000229ec398224ef6...|   delivered|c777355d18b72b67a...|199.0|
|00024acbcdf0a6daa...|   delivered|7634da152a4610f15...|12.99|
|00042b26cf59d7ce6...|   delivered|ac6c3623068f30de0...|199.9|
+--------------------+------------+--------------------+-----+
only showing top 5 rows


In [14]:
df_vendas_detalhado.count()

112650

In [15]:
df_orders.count()

99441

In [16]:
df_products = spark.read.csv("olist_products_dataset.csv", header=True, inferSchema=True)

In [17]:
df_completo = df_orders.join(df_items, on="order_id", how="inner") \
                        .join(df_products, on="product_id", how="inner")

df_completo.select("order_id", "product_category_name", "price").show(5)

+--------------------+---------------------+-----+
|            order_id|product_category_name|price|
+--------------------+---------------------+-----+
|00010242fe8c5a6d1...|           cool_stuff| 58.9|
|00018f77f2f0320c5...|             pet_shop|239.9|
|000229ec398224ef6...|     moveis_decoracao|199.0|
|00024acbcdf0a6daa...|           perfumaria|12.99|
|00042b26cf59d7ce6...|   ferramentas_jardim|199.9|
+--------------------+---------------------+-----+
only showing top 5 rows


In [18]:
top_categorias = df_completo.groupBy("product_category_name") \
    .agg(F.sum("price").alias("faturamento_total")) \
    .orderBy(F.desc("faturamento_total"))

top_categorias.show(5)

+---------------------+------------------+
|product_category_name| faturamento_total|
+---------------------+------------------+
|         beleza_saude| 1258681.340000017|
|   relogios_presentes|1205005.6800000127|
|      cama_mesa_banho|1036988.6800000388|
|        esporte_lazer| 988048.9700000194|
| informatica_acess...| 911954.3200000152|
+---------------------+------------------+
only showing top 5 rows


In [19]:
df_translation = spark.read.csv("product_category_name_translation.csv", header=True, inferSchema=True)

In [20]:
df_final = df_completo.join(df_translation, on="product_category_name", how="left")

df_final.select("order_id", "product_category_name", "product_category_name_english").show(5)

+--------------------+---------------------+-----------------------------+
|            order_id|product_category_name|product_category_name_english|
+--------------------+---------------------+-----------------------------+
|00010242fe8c5a6d1...|           cool_stuff|                   cool_stuff|
|00018f77f2f0320c5...|             pet_shop|                     pet_shop|
|000229ec398224ef6...|     moveis_decoracao|              furniture_decor|
|00024acbcdf0a6daa...|           perfumaria|                    perfumery|
|00042b26cf59d7ce6...|   ferramentas_jardim|                 garden_tools|
+--------------------+---------------------+-----------------------------+
only showing top 5 rows


In [21]:
from pyspark.sql import functions as F

relatorio_final = df_final.groupBy("product_category_name_english") \
    .agg(
        F.count("order_id").alias("quantidade_vendas"),
        F.round(F.sum("price"),2).alias("faturamento_total")
    ) \
    .orderBy(F.desc("faturamento_total"))

relatorio_final.show(10)

+-----------------------------+-----------------+-----------------+
|product_category_name_english|quantidade_vendas|faturamento_total|
+-----------------------------+-----------------+-----------------+
|                health_beauty|             9670|       1258681.34|
|                watches_gifts|             5991|       1205005.68|
|               bed_bath_table|            11115|       1036988.68|
|               sports_leisure|             8641|        988048.97|
|         computers_accesso...|             7827|        911954.32|
|              furniture_decor|             8334|        729762.49|
|                   cool_stuff|             3796|        635290.85|
|                   housewares|             6964|        632248.66|
|                         auto|             4235|        592720.11|
|                 garden_tools|             4347|        485256.46|
+-----------------------------+-----------------+-----------------+
only showing top 10 rows


In [22]:
from google.colab import files
uploaded = files.upload()

Saving olist_customers_dataset.csv to olist_customers_dataset (1).csv
Saving olist_orders_dataset.csv to olist_orders_dataset (1).csv


In [25]:
df_orders = spark.read.csv("olist_orders_dataset.csv", header=True, inferSchema=True)
df_customers = spark.read.csv("olist_customers_dataset.csv", header=True, inferSchema=True)

df_orders.printSchema()
df_customers.printSchema()



root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [26]:
df_rastreio_clientes = df_orders.join(df_customers, on="customer_id", how="inner")

df_rastreio_clientes.select("order_id", "customer_id", "customer_city").show(10)

+--------------------+--------------------+--------------------+
|            order_id|         customer_id|       customer_city|
+--------------------+--------------------+--------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|           sao paulo|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|           barreiras|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|          vianopolis|
|949d5b44dbf5de918...|f88197465ea7920ad...|sao goncalo do am...|
|ad21c59c0840e6cb8...|8ab97904e6daea886...|         santo andre|
|a4591c265e18cb1dc...|503740e9ca751ccdd...|        congonhinhas|
|136cce7faa42fdb2c...|ed0271e0b7da060a3...|          santa rosa|
|6514b8ad8028c9f2c...|9bdf08b4b3b52b552...|           nilopolis|
|76c6e866289321a7c...|f54a9f0e6b351c431...|        faxinalzinho|
|e69bfb5eb88e0ed6a...|31ad1d1b63eb99624...|            sorocaba|
+--------------------+--------------------+--------------------+
only showing top 10 rows


In [29]:
from pyspark.sql import functions as F

df_top_city = df_rastreio_clientes.groupBy("customer_city") \
  .agg(
        F.count("order_id").alias("quantidade_pedidos")
    ) \
    .orderBy(F.desc("quantidade_pedidos"))

df_top_city.show()

+--------------------+------------------+
|       customer_city|quantidade_pedidos|
+--------------------+------------------+
|           sao paulo|             15540|
|      rio de janeiro|              6882|
|      belo horizonte|              2773|
|            brasilia|              2131|
|            curitiba|              1521|
|            campinas|              1444|
|        porto alegre|              1379|
|            salvador|              1245|
|           guarulhos|              1189|
|sao bernardo do c...|               938|
|             niteroi|               849|
|         santo andre|               797|
|              osasco|               746|
|              santos|               713|
|             goiania|               692|
| sao jose dos campos|               691|
|           fortaleza|               654|
|            sorocaba|               633|
|              recife|               613|
|       florianopolis|               570|
+--------------------+------------