# 1. Carga de Librerias

In [1]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.window import Window

# 2. Opciones y Parametros Globales

In [2]:
spark = SparkSession.builder.appName('WS_DE').getOrCreate()

# 3. Carga de bases de datos

In [3]:
#Schema para base de datos sales_df
schema_sales = StructType([
    StructField('product_id', StringType(), True),
    StructField('store_id', StringType(), True),
    StructField('date', StringType(), True),
    StructField('sales', DoubleType(), True),
    StructField('revenue', DoubleType(), True),
    StructField('stock', DoubleType(), True),
    StructField('price', DoubleType(), True),
    StructField('promo_type_1', StringType(), True),
    StructField('promo_bin_1', StringType(), True),
    StructField('promo_type_2', StringType(), True),
    StructField('promo_bin_2', StringType(), True),
    StructField('promo_discount_2', StringType(), True),
    StructField('promo_discount_type_2', StringType(), True)
])
#Schema para base de datos prod_df
schema_prod = StructType([
    StructField('product_id', StringType(), True),
    StructField('product_length', DoubleType(), True),
    StructField('product_depth', DoubleType(), True),
    StructField('product_width', DoubleType(), True),
    StructField('cluster_id', StringType(), True),
    StructField('hierarchy1_id', StringType(), True),
    StructField('hierarchy2_id', StringType(), True),
    StructField('hierarchy3_id', StringType(), True),
    StructField('hierarchy4_id', StringType(), True),
    StructField('hierarchy5_id', StringType(), True),
])
#Schema para base de datos stores_df
schema_stores = StructType([
    StructField('store_id', StringType(), True),
    StructField('storetype_id', StringType(), True),
    StructField('store_size', IntegerType(), True),
    StructField('city_id', StringType(), True)
])


In [4]:
sales_df = spark.read.csv(r'C:\Users\germa\Desktop\Sumz\WS_DE\WS_SumzDE\sales.csv\sales.csv', sep=',', header=True, schema=schema_sales)
prod_df = spark.read.csv(r'C:\Users\germa\Desktop\Sumz\WS_DE\WS_SumzDE\product_hierarchy.csv', sep=',', header=True, schema=schema_prod)
stores_df = spark.read.csv(r'C:\Users\germa\Desktop\Sumz\WS_DE\WS_SumzDE\store_cities.csv', sep=',', header = True, schema=schema_stores)


In [5]:
sales_df.createOrReplaceTempView("sales")
prod_df.createOrReplaceTempView("prods")
stores_df.createOrReplaceTempView("stores")

# 4. EDA bases de datos

In [6]:
query = "SELECT * FROM sales LIMIT 10"
spark.sql(query).show()

+----------+--------+----------+-----+-------+-----+-----+------------+-----------+------------+-----------+----------------+---------------------+
|product_id|store_id|      date|sales|revenue|stock|price|promo_type_1|promo_bin_1|promo_type_2|promo_bin_2|promo_discount_2|promo_discount_type_2|
+----------+--------+----------+-----+-------+-----+-----+------------+-----------+------------+-----------+----------------+---------------------+
|     P0001|   S0002|2017-01-02|  0.0|    0.0|  8.0| 6.25|        PR14|       null|        PR03|       null|            null|                 null|
|     P0001|   S0012|2017-01-02|  1.0|    5.3|  0.0| 6.25|        PR14|       null|        PR03|       null|            null|                 null|
|     P0001|   S0013|2017-01-02|  2.0|  10.59|  0.0| 6.25|        PR14|       null|        PR03|       null|            null|                 null|
|     P0001|   S0023|2017-01-02|  0.0|    0.0|  6.0| 6.25|        PR14|       null|        PR03|       null|    

In [7]:
query = "SELECT * FROM prods LIMIT 10"
spark.sql(query).show()

+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+
|product_id|product_length|product_depth|product_width|cluster_id|hierarchy1_id|hierarchy2_id|hierarchy3_id|hierarchy4_id|hierarchy5_id|
+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+
|     P0000|           5.0|         20.0|         12.0|      null|          H00|        H0004|      H000401|    H00040105|  H0004010534|
|     P0001|          13.5|         22.0|         20.0| cluster_5|          H01|        H0105|      H010501|    H01050100|  H0105010006|
|     P0002|          22.0|         40.0|         22.0| cluster_0|          H03|        H0315|      H031508|    H03150800|  H0315080028|
|     P0004|           2.0|         13.0|          4.0| cluster_3|          H03|        H0314|      H031405|    H03140500|  H0314050003|
|     P0005|          16.0|         30.0|

In [8]:
query = "SELECT * FROM stores LIMIT 10"
spark.sql(query).show()

+--------+------------+----------+-------+
|store_id|storetype_id|store_size|city_id|
+--------+------------+----------+-------+
|   S0091|        ST04|        19|   C013|
|   S0012|        ST04|        28|   C005|
|   S0045|        ST04|        17|   C008|
|   S0032|        ST03|        14|   C019|
|   S0027|        ST04|        24|   C022|
|   S0088|        ST04|        20|   C009|
|   S0095|        ST02|        44|   C014|
|   S0055|        ST04|        24|   C014|
|   S0099|        ST03|        14|   C014|
|   S0078|        ST04|        19|   C036|
+--------+------------+----------+-------+



# 5. Preguntas

1. Quisiera saber cuáles fueron los productos más populares cada mes durante el año 2018. ¿Me puedes mostrar esa información?

In [12]:
query_q1 = """WITH RankedSales AS (
    SELECT 
        month,
        product_id,
        units_sold,
        ammount_sold,

        -- Rank by units sold within each month
        RANK() OVER (PARTITION BY month ORDER BY units_sold DESC) AS rank_units_sold

    FROM (
        -- Seleccion de año 2018
        SELECT 
            MONTH(date) AS month,
            product_id,
            SUM(sales) AS units_sold,
            SUM(revenue) AS ammount_sold
        FROM sales
        WHERE YEAR(date) = 2018
        GROUP BY MONTH(date), product_id
    ) AS AggregatedSales
)

SELECT
    month,
    product_id,
    units_sold
FROM RankedSales
WHERE rank_units_sold <= 5;
"""
spark.sql(query_q1).show()

+-----+----------+----------+
|month|product_id|units_sold|
+-----+----------+----------+
|    1|     P0438|   21326.0|
|    1|     P0103|   19046.0|
|    1|     P0364|    7984.0|
|    1|     P0051|    7754.0|
|    1|     P0590|    5692.0|
|    2|     P0438|   21485.0|
|    2|     P0103|   16474.0|
|    2|     P0051|    7555.0|
|    2|     P0388|    6212.0|
|    2|     P0590|    5952.0|
|    3|     P0438|   23154.0|
|    3|     P0103|   17923.0|
|    3|     P0388|   11840.0|
|    3|     P0364|   10505.0|
|    3|     P0051|    9635.0|
|    4|     P0438|   24392.0|
|    4|     P0103|   19166.0|
|    4|     P0388|   10965.0|
|    4|     P0051|    7448.0|
|    4|     P0131|    6788.0|
+-----+----------+----------+
only showing top 20 rows



2. Necesito saber en cuántas tiendas se implementaron promociones del tipo 1 en el canal 1 durante el año 2019. ¿Puedes darme ese dato?</li>

In [14]:
query_q2 = """SELECT COUNT(DISTINCT store_id) AS num_stores
FROM sales
WHERE 
    YEAR(date) = 2019 AND 
    promo_bin_1 IS NOT NULL AND 
    promo_type_1 IS NOT NULL;
"""
spark.sql(query_q2).show()

+----------+
|num_stores|
+----------+
|       144|
+----------+



3. Me gustaría saber cuál fue la ciudad que tuvo el mayor volumen de ventas en general durante todo el periodo de 2017 a 2019. ¿Tienes esa información disponible?

In [17]:
query_q3 = """ 
WITH VentasTiendas AS (
    SELECT 
        s.store_id,
        SUM(s.sales) AS total_units_sold,
        st.city_id
    FROM sales s
    LEFT JOIN stores st ON s.store_id = st.store_id
    WHERE YEAR(s.date) IN (2017, 2018, 2019)
    GROUP BY st.city_id, s.store_id
)

SELECT 
    city_id, 
    SUM(total_units_sold) AS unidades_vendidas
FROM VentasTiendas
GROUP BY city_id
ORDER BY unidades_vendidas DESC
LIMIT 5;
"""
spark.sql(query_q3).show()

+-------+------------------+
|city_id| unidades_vendidas|
+-------+------------------+
|   C014| 2573439.551999999|
|   C022| 906317.9269999999|
|   C031|        803258.172|
|   C036|375269.86100000003|
|   C024|        286316.087|
+-------+------------------+



4. ¿Podrías decirme cuál fue el promedio de stock disponible en todas las tiendas durante el año 2017? Estoy interesado en conocer ese dato.

In [13]:
#RESPUESTA
sales_df.filter(F.col('year') == 2017).select(F.round(F.mean('stock')).alias('stock_promedio_2017')).show()

+-------------------+
|stock_promedio_2017|
+-------------------+
|               17.0|
+-------------------+



5. Estoy buscando saber cuántas veces se aplicaron descuentos en el canal 2 durante el mes de diciembre de 2018. ¿Puedes ayudarme con esa información?

In [14]:
#RESPUESTA
sales_df.filter((F.col('year') == 2018) &
                (F.col('month') == 12) &
                (F.col('promo_bin_2').isNotNull())).count()

0

6. ¿Cuál fue el producto con el precio de venta más alto en cada tienda durante el tercer trimestre de 2019? Me gustaría conocer esos detalles.

In [27]:
#RESPUESTA
stores_window = Window.partitionBy('store_id')
prods_price = sales_df.filter((F.col('date') >= '2019-07-01') &
                              (F.col('date') <= '2019-09-30'))
prods_price = prods_price.withColumn('rank_price', F.rank().over(stores_window.orderBy(F.col('price').desc())))
prods_price.filter(F.col('rank_price') == 1).orderBy('store_id').select('store_id', 'product_id', 'price', 'rank_price').distinct().orderBy('store_id').show(20)

+--------+----------+------+----------+
|store_id|product_id| price|rank_price|
+--------+----------+------+----------+
|   S0001|     P0498| 299.9|         1|
|   S0002|     P0708|1599.0|         1|
|   S0003|     P0498| 299.9|         1|
|   S0004|     P0498| 299.9|         1|
|   S0005|     P0498| 299.9|         1|
|   S0006|     P0680|139.95|         1|
|   S0007|     P0391| 179.9|         1|
|   S0008|     P0498| 299.9|         1|
|   S0009|     P0680|139.95|         1|
|   S0010|     P0680|139.95|         1|
|   S0011|     P0391| 179.9|         1|
|   S0012|     P0708|1349.0|         1|
|   S0013|     P0708|1499.0|         1|
|   S0014|     P0391| 179.9|         1|
|   S0015|     P0391| 179.9|         1|
|   S0016|     P0498| 299.9|         1|
|   S0017|     P0517|199.95|         1|
|   S0018|     P0391| 179.9|         1|
|   S0019|     P0680|139.95|         1|
|   S0020|     P0474| 499.0|         1|
+--------+----------+------+----------+
only showing top 20 rows



7. Estoy interesado en saber cuántas tiendas experimentaron un aumento de ventas del 20% o más en comparación con el año anterior. ¿Tienes esos datos disponibles?

In [29]:
sales_hist = sales_df.groupBy('store_id', 'year').agg(F.round(F.sum('revenue')).alias('total_sales'))
sales_hist = sales_hist.withColumn('previous_year_sales', F.lag('total_sales').over(stores_window.orderBy('year')))
sales_hist = sales_hist.filter(F.col('previous_year_sales').isNotNull())
sales_hist = sales_hist.withColumn('change_sales_previous_year', F.round((F.col('total_sales') - F.col('previous_year_sales'))/F.col('previous_year_sales'), 4))

In [30]:
sales_hist.show(5)

+--------+----+-----------+-------------------+--------------------------+
|store_id|year|total_sales|previous_year_sales|change_sales_previous_year|
+--------+----+-----------+-------------------+--------------------------+
|   S0001|2018|   272539.0|           209593.0|                    0.3003|
|   S0001|2019|   364910.0|           272539.0|                    0.3389|
|   S0002|2018|   175935.0|           125121.0|                    0.4061|
|   S0002|2019|   193065.0|           175935.0|                    0.0974|
|   S0003|2018|    45477.0|            32672.0|                    0.3919|
+--------+----+-----------+-------------------+--------------------------+
only showing top 5 rows



In [31]:
#RESPUESTA
v1 = sales_hist.filter((F.col('year') == 2019) &
                  (F.col('change_sales_previous_year') >= 0.2)).select('store_id').count()
v2 = sales_hist.filter((F.col('change_sales_previous_year') >= 0.2)).select('store_id').distinct().count()

print(f"Numero de tiendas que aumentaron sus ventas entre el 2018 al 2019 en al menos un 20%: {v1} \nNumero de tiendas que aumentaron sus ventas en comparasion con el año anterior en al menos un 20%: {v2}")

Numero de tiendas que aumentaron sus ventas entre el 2018 al 2019 en al menos un 20%: 48 
Numero de tiendas que aumentaron sus ventas en comparasion con el año anterior en al menos un 20%: 125


8. Quisiera saber cuál fue el producto que experimentó la mayor variación de precio a lo largo del periodo de 2017 a 2019. ¿Puedes proporcionarme esa información?

In [19]:
price_hist = sales_df.groupBy('product_id', 'year').agg(F.round(F.mean('price'), 2).alias('avg_price'))
product_window = Window.partitionBy('product_id')
price_hist = price_hist.withColumn('initial_price', F.lag(F.col('avg_price'), 2).over(product_window.orderBy('year')))

In [20]:
price_hist = price_hist.withColumn('price_variation', F.abs((F.col('initial_price') - F.col('avg_price'))/F.col('initial_price')))
price_hist = price_hist.withColumn('price_variation', F.round(F.col('price_variation'), 2))
price_hist = price_hist.withColumnRenamed('avg_price', 'current_price')

In [21]:
price_hist.orderBy('product_id', 'year').show(10)

+----------+----+-------------+-------------+---------------+
|product_id|year|current_price|initial_price|price_variation|
+----------+----+-------------+-------------+---------------+
|     P0001|2017|          6.9|         null|           null|
|     P0001|2018|          8.0|         null|           null|
|     P0001|2019|        10.95|          6.9|           0.59|
|     P0002|2017|       317.91|         null|           null|
|     P0002|2018|       270.64|         null|           null|
|     P0002|2019|       273.31|       317.91|           0.14|
|     P0004|2017|          4.5|         null|           null|
|     P0004|2018|         4.77|         null|           null|
|     P0004|2019|          4.0|          4.5|           0.11|
|     P0005|2017|         33.9|         null|           null|
+----------+----+-------------+-------------+---------------+
only showing top 10 rows



In [22]:
#RESPUESTA
#Top 10 de productos con mayor variacion de precio entre el 2017 y 2019
price_hist.filter(F.col('price_variation').isNotNull()).orderBy('price_variation', ascending = False).show(10)

+----------+----+-------------+-------------+---------------+
|product_id|year|current_price|initial_price|price_variation|
+----------+----+-------------+-------------+---------------+
|     P0089|2019|        19.59|         5.37|           2.65|
|     P0061|2019|        24.25|         7.65|           2.17|
|     P0671|2019|         8.25|         3.25|           1.54|
|     P0147|2019|        12.73|         5.07|           1.51|
|     P0183|2019|        15.98|         6.42|           1.49|
|     P0282|2019|         4.65|          1.9|           1.45|
|     P0628|2019|        11.33|         4.75|           1.39|
|     P0440|2019|        35.51|        14.99|           1.37|
|     P0566|2019|         2.72|         1.15|           1.37|
|     P0214|2019|         9.83|         4.37|           1.25|
+----------+----+-------------+-------------+---------------+
only showing top 10 rows



9. ¿Sabes en qué mes y año se registró la mayor cantidad de ingresos generados en todas las tiendas? Estoy interesado en conocer ese dato importante.

In [23]:
#RESPUESTA
sales_df = sales_df.withColumn("year_month", F.concat_ws("-", F.year("date"), F.month("date")))
sales_df.groupBy('year_month').agg(F.round((F.sum('revenue'))).alias('total_sales')).orderBy('total_sales', ascending = False).show(10)

+----------+-----------+
|year_month|total_sales|
+----------+-----------+
|    2019-8|  2619133.0|
|    2019-6|  1534773.0|
|    2019-7|  1533953.0|
|    2019-9|  1520783.0|
|   2019-10|  1495227.0|
|    2018-5|  1372225.0|
|   2018-12|  1308908.0|
|    2019-5|  1284677.0|
|    2019-3|  1278992.0|
|   2018-10|  1263368.0|
+----------+-----------+
only showing top 10 rows



10. Me gustaría saber cuál fue el tamaño promedio de las tiendas en cada ciudad durante el año 2018. ¿Puedes darme esa información?

In [28]:
#RESPUESTA
stores_2018 = sales_df.filter(F.col('year') == 2018).select('store_id').distinct()
stores_2018 = stores_2018.join(stores_df, on = 'store_id', how = 'left')
stores_2018.groupBy('city_id').agg(F.round(F.mean('store_size')).alias('avg_store_size')).orderBy('avg_store_size', ascending = False).show(10)

+-------+--------------+
|city_id|avg_store_size|
+-------+--------------+
|   C004|          63.0|
|   C002|          47.0|
|   C036|          43.0|
|   C015|          42.0|
|   C007|          39.0|
|   C027|          39.0|
|   C033|          35.0|
|   C016|          35.0|
|   C017|          34.0|
|   C025|          31.0|
+-------+--------------+
only showing top 10 rows

