- Preparar el fichero `orders_data.parquet` de modo que pueda ser usado para construir un 'forecasting model'.  
- Limpiar la dataset para que cumpla los requerimientos del equipo de Data y Machine Learning.  
- Guardar el archivo actualizado (limpio) como `orders_data_clean.parquet`

  
dbfs:/FileStore/shared_uploads/mario.martin@tajamar365.com/orders_data.parquet

Como ingeniero de datos de una empresa de comercio electrónico llamada Voltmart, un equipo de aprendizaje automático le ha pedido que limpie los datos que contienen información sobre los pedidos realizados el año pasado. Tienen previsto utilizar estos datos depurados para crear un modelo de previsión de la demanda (Forecasting Model). Para ello, han compartido sus requisitos sobre el formato de tabla de salida deseado.

Un analista ha compartido un archivo parquet llamado `orders_data.parquet` para que usted los limpie y los preprocese.

A continuación puede ver el esquema del conjunto de datos junto con los requisitos de limpieza de los perezosos analistas de datos:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, lit, date_format, hour, lower, substring_index
)
from pyspark.sql.types import StringType

In [0]:
# Cargar el archivo parquet
file_path = "dbfs:/FileStore/shared_uploads/mario.martin@tajamar365.com/orders_data.parquet"
df = spark.read.parquet(file_path)
df.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


### Respuestas:

##### 1. Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date 

In [0]:
df = df.withColumn("hour", hour(col("order_date"))) \
       .filter((col("hour") >= 5)) \
       .drop("hour")
df = df.withColumn("order_date", to_date(col("order_date")))  # Convertir a formato de fecha
df.select("order_date").show(truncate=False)

+----------+
|order_date|
+----------+
|2023-01-22|
|2023-01-28|
|2023-01-17|
|2023-01-05|
|2023-01-25|
|2023-01-29|
|2023-01-26|
|2023-01-05|
|2023-01-01|
|2023-01-22|
|2023-01-07|
|2023-01-31|
|2023-01-09|
|2023-01-25|
|2023-01-03|
|2023-01-05|
|2023-01-10|
|2023-01-24|
|2023-01-30|
|2023-01-08|
+----------+
only showing top 20 rows



##### 2. New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm

In [0]:
df = df.withColumn(
    "time_of_day",
    when((hour(col("order_date")) >= 5) & (hour(col("order_date")) < 12), "morning")
    .when((hour(col("order_date")) >= 12) & (hour(col("order_date")) < 18), "afternoon")
    .otherwise("evening")
)

print("Datos después de agregar la columna 'time_of_day':")
df.select("order_date", "time_of_day").show(truncate=False)

morning_orders = df.filter(col("time_of_day") == "morning")
print("Pedidos realizados en la mañana (5am-12pm):")
morning_orders.show(50, truncate=False)
# Se observa como no existe ninguno en morning, ya que están eliminados previamente
# Si volvemos a ejecutar la celda donde se carga el archivo entero, y luego ejecutamos el morning_orders.show, observaremos como si muestra pedidos por la mañana

Datos después de agregar la columna 'time_of_day':
+----------+-----------+
|order_date|time_of_day|
+----------+-----------+
|2023-01-22|evening    |
|2023-01-28|evening    |
|2023-01-17|evening    |
|2023-01-05|evening    |
|2023-01-25|evening    |
|2023-01-29|evening    |
|2023-01-26|evening    |
|2023-01-05|evening    |
|2023-01-01|evening    |
|2023-01-22|evening    |
|2023-01-07|evening    |
|2023-01-31|evening    |
|2023-01-09|evening    |
|2023-01-25|evening    |
|2023-01-03|evening    |
|2023-01-05|evening    |
|2023-01-10|evening    |
|2023-01-24|evening    |
|2023-01-30|evening    |
|2023-01-08|evening    |
+----------+-----------+
only showing top 20 rows

Pedidos realizados en la mañana (5am-12pm):
+----------+--------+-------+----------+--------+----------------+----------------+----------+----------+--------+------+-----------+
|order_date|order_id|product|product_id|category|purchase_address|quantity_ordered|price_each|cost_price|turnover|margin|time_of_day|
+----------

##### 3. Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase

###Datos de productos sin hacer ningun cambio

In [0]:
df.select("product").distinct().show(truncate=False)

+--------------------------+
|product                   |
+--------------------------+
|Wired Headphones          |
|Macbook Pro Laptop        |
|Apple Airpods Headphones  |
|iPhone                    |
|Lightning Charging Cable  |
|Bose SoundSport Headphones|
|USB-C Charging Cable      |
|AAA Batteries (4-pack)    |
|20in Monitor              |
|27in FHD Monitor          |
|Vareebadd Phone           |
|34in Ultrawide Monitor    |
|LG Dryer                  |
|AA Batteries (4-pack)     |
|Google Phone              |
|Flatscreen TV             |
|LG Washing Machine        |
|27in 4K Gaming Monitor    |
|ThinkPad Laptop           |
+--------------------------+



##### Observamos que en los datos normales, existen productos que son TV, a continuación los eliminaremos:

In [0]:
df = df.filter(~lower(col("product")).contains("tv"))  # Elimina las filas que contienen la palabra tv
df = df.withColumn("product", lower(col("product")))  # Hace un lower a todas los registros en productos para que se conviertan a minúsculas.
print("Datos una vez eliminadas las TV y con todos los productos en LowerCase")
df.select("product").distinct().show(truncate=False)

# Podemos observar que no existe ninguna TV en los productos

Datos una vez eliminadas las TV y con todos los productos en LowerCase
+--------------------------+
|product                   |
+--------------------------+
|aa batteries (4-pack)     |
|iphone                    |
|google phone              |
|usb-c charging cable      |
|lightning charging cable  |
|thinkpad laptop           |
|apple airpods headphones  |
|20in monitor              |
|lg washing machine        |
|aaa batteries (4-pack)    |
|macbook pro laptop        |
|lg dryer                  |
|wired headphones          |
|27in 4k gaming monitor    |
|27in fhd monitor          |
|bose soundsport headphones|
|34in ultrawide monitor    |
|vareebadd phone           |
+--------------------------+



##### 4. Ensure all values are lowercase

### Datos sin hacer cambios

In [0]:
df.show(100, truncate=False)

+-------------------+--------+--------------------------+-----------------+------------+-----------------------------------------+----------------+----------+------------------+--------+--------+
|order_date         |order_id|product                   |product_id       |category    |purchase_address                         |quantity_ordered|price_each|cost_price        |turnover|margin  |
+-------------------+--------+--------------------------+-----------------+------------+-----------------------------------------+----------------+----------+------------------+--------+--------+
|2023-01-22 21:25:00|141234  |iPhone                    |5.638008983335E12|Vêtements   |944 Walnut St, Boston, MA 02215          |1               |700.0     |231.0             |700.0   |469.0   |
|2023-01-28 14:15:00|141235  |Lightning Charging Cable  |5.563319511488E12|Alimentation|185 Maple St, Portland, OR 97035         |1               |14.95     |7.475             |14.95   |7.475   |
|2023-01-17 13:33:00

##### Podemos observar como bastantes nombres en las columnas contienen letras mayúsculas, a continuación vamos a convertir todo a minúscula.

In [0]:
df = df.select([lower(col(c)).alias(c) for c in df.columns]) # Mediante un bucle for recorro todas las columnas dándole un alias de "c", para aplicar el lower a todas ellas y así no tener que ir columna a columna
print("Datos después de convertir todos los datos del parquet a minúsculas:")
df.show(100, truncate=False)

Datos después de convertir todos los datos del parquet a minúsculas:
+----------+--------+--------------------------+-----------------+------------+-----------------------------------------+----------------+----------+------------------+--------+--------+-----------+
|order_date|order_id|product                   |product_id       |category    |purchase_address                         |quantity_ordered|price_each|cost_price        |turnover|margin  |time_of_day|
+----------+--------+--------------------------+-----------------+------------+-----------------------------------------+----------------+----------+------------------+--------+--------+-----------+
|2023-01-22|141234  |iphone                    |5.638008983335e12|vêtements   |944 walnut st, boston, ma 02215          |1               |700.0     |231.0             |700.0   |469.0   |evening    |
|2023-01-28|141235  |lightning charging cable  |5.563319511488e12|alimentation|185 maple st, portland, or 97035         |1             

##### 5. New column containing: the State that the purchase was ordered from

In [0]:
# Primero definimos una nueva columna: purchase_state
df = df.withColumn(
    "purchase_state",
    # Mediante un substring en la columna purchase_address que es donde se encuentyra la dirección y donde por tanto podremos sacar el estado, cogemos el último segmento despúes de la coma, que es donde se encuentra el estado en cada dirección
    substring_index(col("purchase_address"), ", ", -1)  
    .substr(-8, 2) # Finalmente cogemos las dos últimas letras para establecer el estado
)
print("Datos después de agregar la columna 'purchase_state':")
df.select("purchase_address", "purchase_state").show(truncate=False)

Datos después de agregar la columna 'purchase_state':
+---------------------------------------+--------------+
|purchase_address                       |purchase_state|
+---------------------------------------+--------------+
|944 walnut st, boston, ma 02215        |ma            |
|185 maple st, portland, or 97035       |or            |
|538 adams st, san francisco, ca 94016  |ca            |
|738 10th st, los angeles, ca 90001     |ca            |
|387 10th st, austin, tx 73301          |tx            |
|775 willow st, san francisco, ca 94016 |ca            |
|979 park st, los angeles, ca 90001     |ca            |
|181 6th st, san francisco, ca 94016    |ca            |
|867 willow st, los angeles, ca 90001   |ca            |
|657 johnson st, san francisco, ca 94016|ca            |
|492 walnut st, san francisco, ca 94016 |ca            |
|322 6th st, san francisco, ca 94016    |ca            |
|618 7th st, los angeles, ca 90001      |ca            |
|512 wilson st, san francisco, ca 

##### 6. Guardar archivo final limpio con nombre `orders_data_clean.parquet` 

In [0]:
cleaned_file_path = "dbfs:/FileStore/shared_uploads/orders_data_clean.parquet"
df.write.mode("overwrite").parquet(cleaned_file_path)
print(f"Archivo limpio guardado como Parquet en: {cleaned_file_path}")

Archivo limpio guardado como Parquet en: dbfs:/FileStore/shared_uploads/orders_data_clean.parquet


##### 7. Exportar archivo limpio en formato CSV 

In [0]:
cleaned_csv_path = "dbfs:/FileStore/shared_uploads/orders_data_clean.csv"
df.write.mode("overwrite").option("header", "true").csv(cleaned_csv_path)
print(f"Archivo limpio exportado como CSV en: {cleaned_csv_path}")

Archivo limpio exportado como CSV en: dbfs:/FileStore/shared_uploads/orders_data_clean.csv
