- Carga el CSV en Spark.

In [None]:
from pyspark.sql import SparkSession

# Create or get a Spark session
spark = SparkSession.builder.appName("Read Compressed CSV from S3").getOrCreate()

# Define the path to your S3 bucket and compressed CSV files
s3_bucket_path = "s3://mdge-e3-2024/*.csv.gz"

# Read the compressed CSV files into a DataFrame
df = spark.read.csv(s3_bucket_path, header=True, inferSchema=True)

In [5]:
# Add "year" and "month" columns based on "fecha_registro"
from pyspark.sql import functions as F
df = df.withColumn("year", F.year("fecha_registro"))
df = df.withColumn("month", F.month("fecha_registro"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Guarda el CSV como parquet en S3, particionalo por `catalogo`. (Utiliza todos los trucos que consideres).

In [7]:
# Define output directory
output_directory = "s3://mdge-e3-2024/parquet_files_partitioned/"

# Write the DataFrame to Parquet, partitioned by 'categoria', 'year', and 'month'
df.write.partitionBy("categoria", "year", "month").mode('overwrite').parquet(output_directory)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Carga el parquet en Spark.

In [8]:
# Read the Parquet files into a DataFrame
df_parquet = spark.read.parquet(output_directory)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Contesta las siguientes preguntas utilizando PySpark. Realiza el siguiente análisis **(por año)** y sobre todos los catálogos.

- ¿Cuántos catálogos diferentes tenemos?

In [9]:
from pyspark.sql.functions import countDistinct

# Count distinct catalogs
distinct_catalogs = df_parquet.agg(countDistinct("catalogo").alias("distinct_catalogs"))
distinct_catalogs.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+
|distinct_catalogs|
+-----------------+
|               12|
+-----------------+

- ¿Cuáles son los20 catálogos con más observaciones? Guarda la salida de este query en tu bucket de S3, lo necesitaremos más adelante.

In [11]:
from pyspark.sql.functions import desc

# Find top 20 catalogs by number of observations
top_20_catalogs = df_parquet.groupBy("catalogo").count().orderBy(desc("count")).limit(20)
top_20_catalogs.show()

# Save the output to S3
top_20_catalogs.write.mode('overwrite').parquet("s3://mdge-e3-2024/top_20_catalogs/")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------+
|           catalogo|   count|
+-------------------+--------+
|            basicos|72474782|
|       medicamentos|29402008|
|  electrodomesticos|12276099|
| frutas y legumbres| 7571260|
|   utiles escolares| 5160328|
|           mercados| 3292185|
|           juguetes| 2651525|
|              pacic| 1079162|
|pescados y mariscos|  789438|
|          navidenos|  428681|
|              tenis|   31626|
|        aeropuertos|     581|
+-------------------+--------+

- ¿Tenemos datos de todos los estados del país? De no ser así, ¿cuáles faltan?

In [25]:
all_states = [
    "aguascalientes", "baja california", "baja california sur", "campeche", "chiapas",
    "chihuahua", "ciudad de mexico", "distrito federal", "coahuila de zaragoza", "colima", "durango", "guanajuato",
    "guerrero", "hidalgo", "jalisco", "estado de mexico", "michoacan de ocampo", "morelos", "nayarit",
    "nuevo leon", "oaxaca", "puebla", "queretaro", "quintana roo", "san luis potosi",
    "sinaloa", "sonora", "tabasco", "tamaulipas", "tlaxcala", "veracruz", "yucatan", "zacatecas"
]

# Get distinct states from DataFrame
states_in_data = df_parquet.select("estado").distinct().rdd.flatMap(lambda x: x).collect()

# Find missing states
missing_states = [state for state in all_states if state not in states_in_data]
print("Missing States:", missing_states)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Missing States: ['distrito federal']

- ¿Cuántas observaciones tenemos por estado?

In [13]:
# Count observations by state
observations_by_state = df_parquet.groupBy("estado").count()
observations_by_state.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+--------+
|          estado|   count|
+----------------+--------+
|      tamaulipas| 3836149|
|       zacatecas| 3221774|
|      nuevo leon| 5229471|
|        campeche| 2611299|
| san luis potosi| 2605944|
|        veracruz| 4181993|
|         morelos| 1509095|
|      guanajuato| 4933886|
|          sonora| 3511149|
|        tlaxcala| 2843154|
|         nayarit|  992414|
|         sinaloa| 2264742|
|          oaxaca| 2244336|
|        guerrero| 1858948|
|    quintana roo| 4754708|
|       queretaro| 3385694|
|estado de mexico|17656040|
|          puebla| 3514964|
|         durango| 2389334|
|         jalisco| 6446410|
+----------------+--------+
only showing top 20 rows

- De cada estado obten: el número de catalogos diferentes por año, ¿ha aumentado el número de catálogos con el tiempo?

In [14]:
from pyspark.sql.functions import year

# Count distinct catalogs per state per year
catalogs_per_state_per_year = df_parquet.groupBy("estado", "year").agg(countDistinct("catalogo").alias("distinct_catalogs"))
catalogs_per_state_per_year.show()

# To check if the number of catalogs has increased over the years, we will order the results
trend_catalogs = catalogs_per_state_per_year.orderBy("estado", "year")
trend_catalogs.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+----+-----------------+
|             estado|year|distinct_catalogs|
+-------------------+----+-----------------+
|     aguascalientes|2021|               10|
|             colima|2017|               10|
|         nuevo leon|2023|               11|
|     aguascalientes|2019|               11|
|          zacatecas|2016|                9|
|          queretaro|2023|               11|
|            sinaloa|2021|                9|
|            hidalgo|2023|               11|
|       quintana roo|2016|               10|
|            jalisco|2016|                9|
|            yucatan|2016|               10|
|             puebla|2020|                8|
|baja california sur|2019|               10|
|         tamaulipas|2020|                9|
|             oaxaca|2024|                9|
|         tamaulipas|2015|               10|
|baja california sur|2023|               11|
|    san luis potosi|2015|               10|
|   ciudad de mexico|2021|               10|
|         

Utilizando Spark contesta las siguientes preguntas a partir **del catálogo que
le tocó a tu equipo**. Recuerda trabajar en el archivo con los datos particionados
de otra manera tus queries van a tardar mucho.

In [31]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Material Escolar Analysis").getOrCreate()

# Read data from the specific category partition
df_cat = spark.read.parquet("s3://mdge-e3-2024/parquet_files_partitioned/categoria=material escolar/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- ¿Cuańtas marcas diferentes tiene tu categoría?

In [32]:
from pyspark.sql.functions import countDistinct

# Count distinct brands in the category
distinct_brands = df_cat.agg(countDistinct("marca").alias("distinct_brands"))
distinct_brands.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+
|distinct_brands|
+---------------+
|           1019|
+---------------+

- ¿Cuál es la marca con mayor precio? ¿En qué estado?

In [33]:
from pyspark.sql.functions import max, struct

# Find the brand with the highest price and the state
brand_highest_price = df_cat.select("marca", "estado", "precio").groupBy("marca", "estado").agg(max("precio").alias("max_precio"))
brand_highest_price.orderBy("max_precio", ascending=False).show(1)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+----------------+----------+
|marca|          estado|max_precio|
+-----+----------------+----------+
|  s/m|estado de mexico|   2275.01|
+-----+----------------+----------+
only showing top 1 row

- ¿Cuál es la marca con menor precio en CDMX? (en aquel entonces Distrito Federal)

In [34]:
from pyspark.sql.functions import min

# Filter by CDMX (Distrito Federal) and find the brand with the lowest price
brand_lowest_price_cdmx = df_cat.filter(df_cat.estado == "ciudad de mexico").groupBy("marca").agg(min("precio").alias("min_precio"))
brand_lowest_price_cdmx.orderBy("min_precio", ascending=True).show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+----------+
|marca|min_precio|
+-----+----------+
|  s/m|      0.87|
+-----+----------+
only showing top 1 row

- ¿Cuál es la marca con mayores observaciones?

In [35]:
# Find the brand with the most observations
brand_most_observations = df_cat.groupBy("marca").count().orderBy("count", ascending=False)
brand_most_observations.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+------+
|marca| count|
+-----+------+
|  s/m|562386|
+-----+------+
only showing top 1 row

- ¿Cuáles son el top 5 de marcas con mayor precio en cada estado? ¿Son diferentes?

In [36]:
# Find the top 5 brands with the highest price in each state
top5_brands_per_state = df_cat.groupBy("estado", "marca").agg(max("precio").alias("max_precio")).orderBy("estado", "max_precio", ascending=[True, False])
top5_brands_per_state.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------------+----------+
|        estado|               marca|max_precio|
+--------------+--------------------+----------+
|aguascalientes|               casio|     619.0|
|aguascalientes|       matematicas 1|     580.0|
|aguascalientes|matematicas 3. sa...|     476.0|
|aguascalientes|espanol 3. serie ...|     476.0|
|aguascalientes|ciencias y tecnol...|     465.0|
|aguascalientes|geografia 1. seri...|     465.0|
|aguascalientes|matematicas 1. se...|     465.0|
|aguascalientes|formacion civica ...|     465.0|
|aguascalientes|historia 2. serie...|     465.0|
|aguascalientes|biologia 1. serie...|     465.0|
|aguascalientes|historia 1. serie...|     465.0|
|aguascalientes|matematicas 2. se...|     465.0|
|aguascalientes|formacion civica ...|     465.0|
|aguascalientes|historia 2. serie...|     456.0|
|aguascalientes|historia ii. seri...|     450.0|
|aguascalientes|               norma|     449.0|
|aguascalientes|formacion civica ...|     440.0|
|aguascalientes|hist

- ¿Cuáles son el top 5 de marcas con menor precio en CDMX? (en aquel entonces Distrito Federal)

In [37]:
# Filter by CDMX and find the top 5 brands with the lowest price
top5_lowest_price_cdmx = df_cat.filter(df_cat.estado == "ciudad de mexico").groupBy("marca").agg(min("precio").alias("min_precio")).orderBy("min_precio", ascending=True).limit(5)
top5_lowest_price_cdmx.show()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+
|     marca|min_precio|
+----------+----------+
|       s/m|      0.87|
|   dietrix|       2.5|
|   pelikan|      2.74|
|sony (48x)|       3.2|
|       bic|       3.3|
+----------+----------+

- ¿Cuáles son el top 5 de marcas con mayores observaciones? ¿Se parecen a las de nivel por estado?

In [38]:
# Find the top 5 brands with the most observations globally
global_top5_observation_brands = df_cat.groupBy("marca").count().orderBy("count", ascending=False).limit(5)
global_top5_observation_brands.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------+
|               marca| count|
+--------------------+------+
|                 s/m|562386|
|             crayola|462957|
|      bic. evolution|216912|
|        norma. color|123132|
|pritt. original (...|122245|
+--------------------+------+

- ¿Ha dejado de existir alguna marca durante los años que tienes? ¿Cuál? ¿Cuándo desapareció?

In [39]:
from pyspark.sql.functions import min, max

# Find if any brand has disappeared over the years
brand_years = df_cat.groupBy("marca").agg(min("year").alias("first_year"), max("year").alias("last_year"))
current_year = df_cat.select(max("year")).collect()[0][0]  # assuming this retrieves the current or latest year in the dataset
disappeared_brands = brand_years.filter(brand_years.last_year < current_year)
disappeared_brands.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+---------+
|               marca|first_year|last_year|
+--------------------+----------+---------+
|geografia. secund...|      2018|     2023|
|espanol 1. leo, e...|      2016|     2017|
|            spalding|      2015|     2015|
|        scribe. 6231|      2015|     2022|
|        zebra pencil|      2015|     2023|
|fisica. ciencias ...|      2019|     2023|
|formacion civica ...|      2015|     2018|
|secuencia matemat...|      2015|     2017|
|tenis con imagina...|      2018|     2019|
|scribe. fast & fu...|      2017|     2018|
|espanol 2. santil...|      2015|     2018|
|formacion civica ...|      2015|     2023|
|  norma. hello kitty|      2015|     2018|
|formacion civica ...|      2015|     2019|
|ciencias 2 fisica...|      2015|     2018|
|norma. grandes he...|      2015|     2016|
|matematicas 1. se...|      2018|     2023|
|ciencias y tecnol...|      2019|     2021|
|       maped. 2 in 1|      2015|     2020|
|   paper mate carmen|      2018

- Genera una gráfica de serie de tiempo por estado para la marca con mayor precio -en todos los años-, donde el eje equis es el año y el eje ye es el precio máximo.

In [41]:
from pyspark.sql.functions import max

# Extract data for visualization
brand_max_price_by_state_year = df_cat.groupBy("estado", "year", "marca").agg(max("precio").alias("max_precio")).orderBy("estado", "year")
brand_max_price_by_state_year.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+----+--------------------+----------+
|        estado|year|               marca|max_precio|
+--------------+----+--------------------+----------+
|aguascalientes|2015|norma. snoopy. pe...|      59.9|
|aguascalientes|2015|norma. i love one...|      46.8|
|aguascalientes|2015|        maped. softy|      18.8|
|aguascalientes|2015|  pelikan. colorella|    137.53|
|aguascalientes|2015|  scribe. duplicador|      69.0|
|aguascalientes|2015|             popular|      11.5|
|aguascalientes|2015|scribe. polycover...|     41.32|
|aguascalientes|2015|scribe. linea eje...|    114.63|
|aguascalientes|2015|scribe. escolar. ...|     24.78|
|aguascalientes|2015|paper mate. expre...|      24.5|
|aguascalientes|2015|        office depot|      25.0|
|aguascalientes|2015|     maped. security|      17.5|
|aguascalientes|2015|  maped. c.b. 063030|      16.0|
|aguascalientes|2015|   zebra. z grip max|      26.0|
|aguascalientes|2015|       norma. minnie|      54.9|
|aguascalientes|2015|      b

In [3]:
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…