In [137]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("session").getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "true") #Otimização dinâmicas

# **Convertendo os arquivos de CSV para Parquet**

In [4]:
#Importando os arquivos em CSV com a função nativa do PySpark
olist_customers = spark.read.option("header", True).csv("olist_customers_dataset.csv")
olist_orders = spark.read.option("header", True).csv("olist_orders_dataset.csv")
olist_order_reviews = spark.read.option("header", True).csv("olist_order_reviews_dataset.csv")
olist_order_items = spark.read.option("header", True).csv("olist_order_items_dataset.csv")
olist_products = spark.read.option("header", True).csv("olist_products_dataset.csv")

In [5]:
#Convertendo os dfs para Parquet
olist_customers.write.mode("overwrite").parquet("olist_customers_dataset.parquet")
olist_customers.write.mode("overwrite").parquet("olist_customers_dataset.parquet")
olist_orders.write.mode("overwrite").parquet("olist_orders_dataset.parquet")
olist_order_reviews.write.mode("overwrite").parquet("olist_order_reviews_dataset.parquet")
olist_order_items.write.mode("overwrite").parquet("olist_order_items_dataset.parquet")
olist_products.write.mode("overwrite").parquet("olist_products_dataset.parquet")

In [None]:
#Importando novamente os dfs, mas com a versão em Parquet
olist_customers = spark.read.option("header", True).parquet("olist_customers_dataset.parquet") \
    .select('customer_id', 'customer_unique_id')
olist_orders = spark.read.option("header", True).parquet("olist_orders_dataset.parquet") \
    .select('customer_id', 'order_id', 'order_status', 'order_purchase_timestamp')
olist_order_reviews = spark.read.option("header", True).parquet("olist_order_reviews_dataset.parquet") \
    .select('order_id', 'review_score')
olist_order_items = spark.read.option("header", True).parquet("olist_order_items_dataset.parquet") \
    .select('order_id', 'product_id')
olist_products = spark.read.option("header", True).parquet("olist_products_dataset.parquet")

# **Verificando a volumetria de pedidos e produtos de cada pedido**

In [138]:
#Estrutura base para os cálculos
produtos = olist_order_items.groupBy("order_id").agg(
    F.count("product_id").alias("total"),
    F.countDistinct("product_id").alias("distinto")
)

#Visão total de pedidos
volumetria = produtos.groupBy("total").agg(F.count("*").alias("volumetria_pedidos"))
total_pedidos = volumetria.agg({"volumetria_pedidos":"sum"}).collect()[0][0]

volumetria = volumetria.withColumn(
    "volumetria_pedidos_perc",
    round((F.col("volumetria_pedidos") / F.lit(total_pedidos)) * 100, 2)
)
volumetria.orderBy("total").show(5)

#Visão total distintos
volumetria_distintos = produtos.groupBy("distinto").agg(F.count("*").alias("volumetria_distintos"))
total_distintos = volumetria_distintos.agg({"volumetria_distintos":"sum"}).collect()[0][0]

volumetria_distintos = volumetria_distintos.withColumn(
    "volumetria_distintos_perc",
    round((F.col("volumetria_distintos") / F.lit(total_distintos)) *100, 2)
)
volumetria_distintos.orderBy("distinto").show(5)

+-----+------------------+-----------------------+
|total|volumetria_pedidos|volumetria_pedidos_perc|
+-----+------------------+-----------------------+
|    1|             88863|                  90.06|
|    2|              7516|                   7.62|
|    3|              1322|                   1.34|
|    4|               505|                   0.51|
|    5|               204|                   0.21|
+-----+------------------+-----------------------+
only showing top 5 rows

+--------+--------------------+-------------------------+
|distinto|volumetria_distintos|volumetria_distintos_perc|
+--------+--------------------+-------------------------+
|       1|               95430|                    96.72|
|       2|                2846|                     2.88|
|       3|                 298|                      0.3|
|       4|                  70|                     0.07|
|       5|                   8|                     0.01|
+--------+--------------------+---------------------

**Versão em SQL**

In [132]:
#Query base para as consultas
olist_order_items.createOrReplaceTempView("order_items")

spark.sql("""
    CREATE OR REPLACE TEMP VIEW produtos AS
    SELECT 
        order_id,
        COUNT(product_id) AS total,
        COUNT(DISTINCT product_id) AS distinto
    FROM order_items
    GROUP BY order_id
""")

#Visão total de pedidos
volumetria = spark.sql("""
    SELECT 
        total,
        COUNT(*) AS volumetria_pedidos,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS volumetria_pedidos_perc
    FROM produtos
    GROUP BY total
    ORDER BY total
""")
volumetria.show(5, truncate=False)

#Visão total distintos
volumetria_distintos = spark.sql("""
    SELECT
        distinto,
        COUNT(*) AS volumetria_distinto,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS volumetria_distintos_perc
    FROM produtos
    GROUP BY distinto
    ORDER BY distinto
""")
volumetria_distintos.show(5, truncate=False)

+-----+------------------+-----------------------+
|total|volumetria_pedidos|volumetria_pedidos_perc|
+-----+------------------+-----------------------+
|1    |88863             |90.06                  |
|2    |7516              |7.62                   |
|3    |1322              |1.34                   |
|4    |505               |0.51                   |
|5    |204               |0.21                   |
+-----+------------------+-----------------------+
only showing top 5 rows

+--------+-------------------+-------------------------+
|distinto|volumetria_distinto|volumetria_distintos_perc|
+--------+-------------------+-------------------------+
|1       |95430              |96.72                    |
|2       |2846               |2.88                     |
|3       |298                |0.30                     |
|4       |70                 |0.07                     |
|5       |8                  |0.01                     |
+--------+-------------------+-------------------------+
onl

In [None]:
#Query base para as consultas
olist_order_items.createOrReplaceTempView("order_items")

spark.sql("""
    CREATE OR REPLACE TEMP VIEW produtos AS
    SELECT 
        order_id,
        COUNT(product_id) AS total,
        COUNT(DISTINCT product_id) AS distinto
    FROM order_items
    GROUP BY order_id
""")

#Visão total de pedidos
volumetria = spark.sql("""
    SELECT 
        total,
        COUNT(*) AS volumetria_pedidos,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS volumetria_pedidos_perc
    FROM produtos
    GROUP BY total
    ORDER BY total
""")
volumetria.show(5, truncate=False)

#Visão total distintos
volumetria_distintos = spark.sql("""
    SELECT
        distinto,
        COUNT(*) AS volumetria_distinto,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS volumetria_distintos_perc
    FROM produtos
    GROUP BY distinto
    ORDER BY distinto
""")
volumetria_distintos.show(5, truncate=False)

+-----+------------------+-----------------------+
|total|volumetria_pedidos|volumetria_pedidos_perc|
+-----+------------------+-----------------------+
|1    |88863             |90.06                  |
|2    |7516              |7.62                   |
|3    |1322              |1.34                   |
|4    |505               |0.51                   |
|5    |204               |0.21                   |
+-----+------------------+-----------------------+
only showing top 5 rows

+--------+-------------------+-------------------------+
|distinto|volumetria_distinto|volumetria_distintos_perc|
+--------+-------------------+-------------------------+
|1       |95430              |96.72                    |
|2       |2846               |2.88                     |
|3       |298                |0.30                     |
|4       |70                 |0.07                     |
|5       |8                  |0.01                     |
+--------+-------------------+-------------------------+
onl

In [None]:
lista_de_pedidos_com_unico_produto = produtos.filter(F.col("distinto") == 1).select("order_id")

# **Verificando a volumetria de notas para cada pedido**

In [None]:
#Opção para um dataset pequeno
lista_ids = [row['order_id'] for row in lista_de_pedidos_com_unico_produto.collect()]
df_produtos = olist_orders.filter(F.col("order_id").isin(lista_ids))

In [52]:
#Opção para um dataset grande
df_produtos = olist_orders.join(lista_de_pedidos_com_unico_produto, on="order_id", how="inner")
df_produtos.count(), olist_orders.count()

(95430, 99441)

In [None]:
df_produtos_entregues = df_produtos.filter(F.col("order_status") == "delivered").select("customer_id", "order_id")
df_produtos_entregues.count(), df_produtos.count(), olist_orders.count()

(93281, 95430, 99441)

In [54]:
df_produtos_avaliacoes = df_produtos_entregues.join(olist_customers, on="customer_id", how="inner") \
                                              .join(olist_order_reviews, on="order_id", how="left")
df_produtos_avaliacoes = df_produtos_avaliacoes.drop("customer_id")
df_produtos_avaliacoes.show(5)

+--------------------+--------------------+------------+
|            order_id|  customer_unique_id|review_score|
+--------------------+--------------------+------------+
|98c78522be2bccf4c...|62aad23fbe0be06a5...|           5|
|bec5a824282dde8cd...|5570e312bd641ceba...|           5|
|c5f30bdd01bc931c0...|c09049ee8be260854...|           5|
|8dede9c6014be218c...|b08fab27d47a1eb6d...|           5|
|b23f8178f3b6555a7...|86c7ca0abbc14188b...|           5|
+--------------------+--------------------+------------+
only showing top 5 rows



In [None]:
#Forma simples de verificar vazios
df_produtos_avaliacoes.select([
    F.count(F.hen(F.col(c).isNull(), c)).alias(c + '_nulos') 
    for c in df_produtos_avaliacoes.columns
]).show()


+--------------+------------------------+------------------+
|order_id_nulos|customer_unique_id_nulos|review_score_nulos|
+--------------+------------------------+------------------+
|             0|                       0|                 0|
+--------------+------------------------+------------------+



In [None]:
#Forma mais complexa de verifica vazios
df_produtos_avaliacoes_nulos = df_produtos_avaliacoes

for c in df_produtos_avaliacoes.columns:
    df_produtos_avaliacoes_nulos = df_produtos_avaliacoes_nulos.withColumn(
        f"{c}_isnull", F.ol(c).isNull()
    )

isnull_cols = [f"{c}_isnull" for c in df_produtos_avaliacoes.columns]

df_produtos_avaliacoes_nulos.groupBy(isnull_cols).count().show(truncate=False)

+---------------+-------------------------+-------------------+-----+
|order_id_isnull|customer_unique_id_isnull|review_score_isnull|count|
+---------------+-------------------------+-------------------+-----+
|false          |false                    |false              |93163|
+---------------+-------------------------+-------------------+-----+



In [61]:
df_produtos_avaliacoes = df_produtos_avaliacoes.dropna(subset=["review_score"], how="any")

In [64]:
colunas = {
    'customer_unique_id':'user_id',
    'order_id':'product_id',
    'review_score':'rating'
}

for antigo, novo in colunas.items():
    df_produtos_avaliacoes = df_produtos_avaliacoes \
                                .withColumnRenamed(antigo, novo)

In [None]:
avaliacoes_por_user_id = df_produtos_avaliacoes.groupBy("user_id").agg(F.count("*").alias("qtd_avaliacoes"))
usuarios_por_qtd_avaliacoes = avaliacoes_por_user_id.groupBy("qtd_avaliacoes").agg(F.count("*").alias("qtd_usuarios"))
window = Window.partitionBy()
usuarios_por_qtd_avaliacoes = usuarios_por_qtd_avaliacoes.withColumn(
    "qtd_usuarios_perct", round((F.col("qtd_usuarios") / sum("qtd_usuarios").over(window)) *100, 2))
usuarios_por_qtd_avaliacoes.orderBy(F.col("qtd_avaliacoes")).show()

+--------------+------------+------------------+
|qtd_avaliacoes|qtd_usuarios|qtd_usuarios_perct|
+--------------+------------+------------------+
|             1|       87323|             97.11|
|             2|        2154|               2.4|
|             3|         310|              0.34|
|             4|         101|              0.11|
|             5|          22|              0.02|
|             6|          11|              0.01|
|             7|           1|               0.0|
|            15|           1|               0.0|
+--------------+------------+------------------+



In [None]:
window = Window.partitionBy()
usuarios = df_produtos_avaliacoes.groupBy("user_id").agg(F.count("product_id").alias("pedidos"),
                                                         F.count("rating").alias("avaliacoes"),
                                                         round(F.mean("rating"), 0).alias("avaliacoes_avg"))

usuarios_df = usuarios.groupBy("pedidos").agg(F.count("*").alias("volumetria_usuarios"))
usuarios_df = usuarios_df.withColumn(
    "volumetria_usuarios_perc", round((F.col("volumetria_usuarios") / F.sum("volumetria_usuarios").over(window)) *100, 2))
usuarios_df.orderBy("pedidos").show()

usuarios_df = usuarios.groupBy("avaliacoes_avg").agg(F.count("*").alias("volumetria_usuarios"))
usuarios_df = usuarios_df.withColumn(
    "volumetria_usuarios_perc", round(F.col("volumetria_usuarios") / F.sum("volumetria_usuarios").over(window) *100, 2))
usuarios_df.orderBy("avaliacoes_avg").show()


+-------+-------------------+------------------------+
|pedidos|volumetria_usuarios|volumetria_usuarios_perc|
+-------+-------------------+------------------------+
|      1|              87323|                   97.11|
|      2|               2154|                     2.4|
|      3|                310|                    0.34|
|      4|                101|                    0.11|
|      5|                 22|                    0.02|
|      6|                 11|                    0.01|
|      7|                  1|                     0.0|
|     15|                  1|                     0.0|
+-------+-------------------+------------------------+

+--------------+-------------------+------------------------+
|avaliacoes_avg|volumetria_usuarios|volumetria_usuarios_perc|
+--------------+-------------------+------------------------+
|           1.0|               8155|                    9.07|
|           2.0|               2640|                    2.94|
|           3.0|             

**Versão SQL**

In [None]:
#Query base para as consultas
df_produtos_avaliacoes.createOrReplaceTempView("produtos_avaliacoes")

spark.sql("""
    CREATE OR REPLACE TEMP VIEW avaliacoes_por_user_id AS
    SELECT user_id,
        COUNT(*) AS qtd_avaliacoes
    FROM produtos_avaliacoes
    GROUP BY user_id
""")

resultado = spark.sql("""
    SELECT qtd_avaliacoes,
        COUNT(*) AS qtd_usuarios,
        ROUND(COUNT(*) *100 / SUM(COUNT(*)) OVER(), 2) AS qtd_usuarios_perct
    FROM avaliacoes_por_user_id
    GROUP BY qtd_avaliacoes
    ORDER BY qtd_avaliacoes
""")

resultado.show()

+--------------+------------+------------------+
|qtd_avaliacoes|qtd_usuarios|qtd_usuarios_perct|
+--------------+------------+------------------+
|             1|       87323|             97.11|
|             2|        2154|               2.4|
|             3|         310|              0.34|
|             4|         101|              0.11|
|             5|          22|              0.02|
|             6|          11|              0.01|
|             7|           1|               0.0|
|            15|           1|               0.0|
+--------------+------------+------------------+



In [152]:
#Query base para as consultas
df_produtos_avaliacoes.createOrReplaceTempView("usuarios")

spark.sql("""
    CREATE OR REPLACE TEMP VIEW usuarios_df AS
    SELECT user_id,
          COUNT(product_id) AS pedidos,
          COUNT(rating) AS avaliacoes,
          ROUND(MEAN(rating), 0) AS avaliacoes_avg
    FROM usuarios
    GROUP BY user_id
""")

resultado = spark.sql("""
    SELECT pedidos,
          COUNT(*) AS volumetria_usuarios,
          ROUND(COUNT(*) * 100 / SUM(COUNT(*)) OVER(), 2) AS volumetria_usuarios_perc
    FROM usuarios_df
    GROUP BY pedidos
    ORDER BY pedidos
""")
resultado.show()

resultado = spark.sql("""
    SELECT avaliacoes_avg,
          COUNT(*) AS volumetria_usuarios,
          ROUND(COUNT(*) * 100 / SUM(COUNT(*)) OVER(), 2) AS volumetria_usuarios_perc
    FROM usuarios_df
    GROUP BY avaliacoes_avg
    ORDER BY avaliacoes_avg
""")
resultado.show()

+-------+-------------------+------------------------+
|pedidos|volumetria_usuarios|volumetria_usuarios_perc|
+-------+-------------------+------------------------+
|      1|              87323|                   97.11|
|      2|               2154|                     2.4|
|      3|                310|                    0.34|
|      4|                101|                    0.11|
|      5|                 22|                    0.02|
|      6|                 11|                    0.01|
|      7|                  1|                     0.0|
|     15|                  1|                     0.0|
+-------+-------------------+------------------------+

+--------------+-------------------+------------------------+
|avaliacoes_avg|volumetria_usuarios|volumetria_usuarios_perc|
+--------------+-------------------+------------------------+
|           1.0|               8155|                    9.07|
|           2.0|               2640|                    2.94|
|           3.0|             

In [None]:
user_window = Window.partitionBy("user_id")

df_produtos_avaliacoes = df_produtos_avaliacoes.withColumn(
    "user_mean", F.avg("rating").over(user_window))
df_produtos_avaliacoes = df_produtos_avaliacoes.withColumn(
    "user_reviews_count", F.count("rating").over(user_window))
df_produtos_avaliacoes = df_produtos_avaliacoes.withColumn(
    "normalized_rating", F.col("rating") - F.col("user_mean"))