In [None]:
%pip install --user bokeh pandas-bokeh
%pip install --user matplotlib
%pip install --user seaborn

In [None]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Reuse the already-running SparkContext when this cell is executed again:
# constructing SparkContext(...) a second time in the same kernel raises
# ValueError, whereas getOrCreate returns the existing context. The master
# setting only takes effect when a new context actually has to be created.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

In [None]:
from pyspark.sql.functions import *

In [None]:
# import findspark
# findspark.init()
import pyspark
from pyspark.sql import SparkSession
# Build the session with Hive support enabled, or fetch it if one already exists.
spark = (
    SparkSession.builder
    .appName("QueryHiveTable")
    .config("spark.sql.catalogImplementation", "hive")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
import pandas_bokeh
from bokeh.plotting import *
from bokeh.io import *
from bokeh.palettes import Category10 as colors
from bokeh.models import *

import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# Switch the session's current database to `obligatorio`; the queries below
# reference their tables without a database prefix.
spark.sql("USE obligatorio")

Preguntas

1) Top 10 conjuntos de productos que más se venden juntos.

In [None]:
# Top 10 product pairs that appear together in the same order.
#
# The self-join keeps each unordered pair once (fo2.product_id < fo.product_id),
# so (A, B) and (B, A) are not counted twice and a product is never paired with itself.
#
# The product ids are added as secondary sort keys so that ties in pair_count are
# broken deterministically; without them LIMIT 10 may pick different pairs on each
# evaluation. The result is cached because it is evaluated several times below
# (show + pandas conversions) and would otherwise re-run the self-join each time.
set_products = spark.sql("""
    SELECT fo.product_id AS product_1, fo2.product_id AS product_2, COUNT(*) AS pair_count
    FROM fact_order fo
    JOIN fact_order fo2 ON fo2.order_id = fo.order_id AND fo2.product_id < fo.product_id
    GROUP BY fo.product_id, fo2.product_id
    ORDER BY pair_count DESC, fo.product_id, fo2.product_id
    LIMIT 10
""").cache()

In [None]:
# Print the result without truncating long cell values (e.g. product ids).
set_products.show(truncate=False)

Mapa de calor

In [None]:
# Collect the query result (at most 10 rows, per the LIMIT) into a pandas DataFrame for plotting.
set_products_df = set_products.toPandas()

In [None]:
# Reshape the pair list into a product_1 x product_2 matrix of pair counts;
# combinations that are not in the result are filled with 0.
heatmap_data = pd.pivot_table(
    set_products_df,
    values='pair_count',
    index='product_1',
    columns='product_2',
    fill_value=0,
)

fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(
    heatmap_data,
    annot=True,          # write the count inside every cell
    fmt='g',             # general number format for the annotations
    cmap='YlGnBu',
    cbar_kws={'label': 'Pair Count'},
    ax=ax,
)
ax.set_title('Top 10 conjuntos de productos que más se venden juntos')
plt.show()

Scatter Plot

In [None]:
# Collect the pair counts into pandas for the scatter plot.
# NOTE(review): this repeats the conversion already done for the heat map and
# evaluates `set_products` again; presumably kept so this section can be run on its own.
set_products_df = set_products.toPandas()

In [None]:
# One marker per product pair; both marker size and colour encode pair_count.
fig, ax = plt.subplots(figsize=(10, 6))
sns.scatterplot(
    data=set_products_df,
    x='product_1',
    y='product_2',
    hue='pair_count',
    size='pair_count',
    sizes=(20, 200),
    palette='coolwarm',
    ax=ax,
)
# Rotate the x tick labels so they do not overlap.
plt.xticks(rotation=90)
ax.set_title('Top 10 conjuntos de productos que más se venden juntos')
plt.show()

2) Top 5 productos más vendidos por ciudad.

In [None]:
# Top 5 products per customer city, ranked by number of fact_order rows.
#
# Changes with respect to the previous version of this query:
#   * product_id is a secondary sort key inside ROW_NUMBER so that products with
#     the same count always receive the same rank (the top 5 is reproducible).
#   * the trailing ';' was removed; Spark 2.x parsers are believed to reject a
#     statement terminator inside spark.sql(...) (not verified here), and it is
#     never required.
ranked_products = spark.sql("""
    WITH ranked_products AS (
        SELECT dc.customer_city AS city,
               fo.product_id,
               COUNT(fo.product_id) AS product_count,
               ROW_NUMBER() OVER (
                   PARTITION BY dc.customer_city
                   ORDER BY COUNT(fo.product_id) DESC, fo.product_id
               ) AS rank
        FROM fact_order fo
        JOIN dim_customers dc ON dc.customer_id = fo.customer_id
        GROUP BY dc.customer_city, fo.product_id
    )
    SELECT city, product_id, product_count, rank
    FROM ranked_products
    WHERE rank <= 5
    ORDER BY city, rank
""")

In [None]:
# Print up to 500 rows of the ranking, without truncating cell values.
ranked_products.show(500, truncate=False)

3) ¿Qué días del mes se vende más?

In [None]:
# DataFrame handle on the `fact_order` table, used by the DataFrame-API queries below.
fact_order = spark.table("fact_order")

In [None]:
# Total items sold per day of the month (taken from the purchase timestamp).
# `sum` here is pyspark.sql.functions.sum, brought in by the wildcard import,
# not the Python builtin.
days_sales = (
    fact_order
    .groupBy(dayofmonth(col("order_purchase_timestamp")).alias("day_of_month"))
    .agg(sum("total_items").alias("total_products_sold"))
)

In [None]:
# Drop days whose total is NULL and list the best-selling days first.
most_sold_day = (
    days_sales
    .where(col("total_products_sold").isNotNull())
    .sort(col("total_products_sold").desc())
)

In [None]:
# Print up to 31 rows: a month has at most 31 distinct days.
most_sold_day.show(31)

Gráfico de barras

In [None]:
# Collect the per-day totals into a pandas DataFrame for plotting.
most_sold_day_pd = most_sold_day.toPandas()

In [None]:
# Send Bokeh output to the notebook and draw the bar chart through the
# pandas-bokeh accessor. A plain DataFrame.plot(...) goes through pandas'
# configured plotting backend (matplotlib unless the `plotting.backend` option
# is changed, which the visible cells do not do), so it would bypass Bokeh and
# make the output_notebook() call pointless.
pandas_bokeh.output_notebook()

most_sold_day_pd.plot_bokeh(
    kind='bar',
    x='day_of_month',
    y='total_products_sold',
    title='¿Qué días del mes se vende más?',
    legend=False,
)

Diagrama de líneas

In [None]:
# Line chart of total items sold for each day of the month.
sns.set(style="whitegrid")

fig, ax = plt.subplots(figsize=(12, 6))
sns.lineplot(
    data=most_sold_day_pd,
    x="day_of_month",
    y="total_products_sold",
    color="b",
    marker="o",
    ax=ax,
)

# Put one tick on every day present in the data.
plt.xticks(most_sold_day_pd["day_of_month"], rotation=45)
ax.set_xlabel("Día del mes", fontsize=12)
ax.set_ylabel("Total ventas", fontsize=12)
ax.set_title("¿Qué días del mes se vende más?", fontsize=16)

ax.grid(color='gray', linestyle='--', linewidth=0.5)

plt.show()

4) ¿Qué proporción de pedidos recibe cada calificación en las reseñas (de 1 a 5)?

In [None]:
# Denominator for the percentages: number of rows produced by joining
# fact_order with dim_reviews on order_id. COUNT(*) without GROUP BY always
# yields exactly one row, so the first row holds the total.
total_reviews = spark.sql("""
                            SELECT COUNT(*) AS total_reviews
                            FROM fact_order fo
                            JOIN dim_reviews dr ON dr.order_id = fo.order_id
                          """).first()["total_reviews"]

In [None]:
# Number of joined fact_order/dim_reviews rows for each review score.
# Uses the same join as the `total_reviews` query, so the counts add up to that total.
review_counts = spark.sql("""
                            SELECT review_score, COUNT(*) AS review_count
                            FROM fact_order fo
                            JOIN dim_reviews dr ON dr.order_id = fo.order_id
                            GROUP BY review_score
                          """)

In [None]:
# Share of each score as a percentage of all reviews, rounded to two decimals.
# `round` is pyspark.sql.functions.round (from the wildcard import), not the builtin.
review_percentages = review_counts.withColumn(
    "percentage",
    round(col("review_count") / total_reviews * 100, 2),
)

In [None]:
# Print the score / count / percentage table.
review_percentages.show()

Gráfico circular

In [None]:
# Collect the per-score percentages into a pandas DataFrame for plotting.
review_percentages_pd = review_percentages.toPandas()

In [None]:
pandas_bokeh.output_notebook()

# Pie chart with one wedge per review score, sized by its percentage.
# NOTE(review): DataFrame.plot goes through pandas' configured plotting backend,
# while output_notebook() above only configures Bokeh output — confirm which
# backend is intended for this chart.
review_percentages_pd.plot(
    kind="pie",
    y="percentage",
    labels=review_percentages_pd["review_score"],
    legend=False,
    title="¿Qué proporción de pedidos recibe cada calificación en las reseñas (de 1 a 5)?",
)

Gráfico de barras

In [None]:
# Bar chart of the percentage of reviews per score, drawn directly with Bokeh.
#
# The x_range below is categorical and its factors are the strings '1'..'5', so
# the bar positions must be strings too: numeric x values on a factor range are
# interpreted as raw axis coordinates and the bars land off their category.
# Rows without a score cannot be placed on the axis and are dropped.
scored_pd = review_percentages_pd.dropna(subset=['review_score'])
review_scores = scored_pd['review_score'].astype(int).astype(str)
percentages = scored_pd['percentage']

p = figure(
    title="¿Qué proporción de pedidos recibe cada calificación en las reseñas (de 1 a 5)?",
    x_axis_label='Review Score',
    y_axis_label='Percentage',
    x_range=[str(x) for x in range(1, 6)],
)

# Category10 only defines palettes of 3 to 10 colours; clamp the lookup so that
# fewer than three distinct scores does not raise KeyError, then keep exactly
# one colour per bar.
bar_colors = colors[max(3, min(len(review_scores), 10))][:len(review_scores)]
p.vbar(x=review_scores, top=percentages, width=0.8, color=bar_colors)

output_notebook()
show(p)

5) ¿Cuál es el tiempo promedio entre la aprobación de un pedido y la entrega al cliente?

In [None]:
# DataFrame handle on the `fact_order` table (same assignment as in question 3).
fact_order = spark.table("fact_order")

In [None]:
# Global aggregate over fact_order: row count plus the mean of the precomputed
# approval-to-delivery difference, rounded to two decimals.
# NOTE(review): the "(days)" unit comes from the column alias only — confirm the
# unit of diff_order_delivered_customer_vs_order_approved_at in the table definition.
avg_time = fact_order.groupBy().agg(
    count("*").alias("total_orders"),
    round(
        avg("diff_order_delivered_customer_vs_order_approved_at"),
        2,
    ).alias("average_time (days)"),
)

In [None]:
# Print the single-row result (row count and average time).
avg_time.show()