In [0]:
dbutils.fs.mkdirs("dbfs:/FileStore/caso1")

Out[28]: True

- Preparar el fichero `orders_data.parquet` de modo que pueda ser usado para contruir un 'forecasting model'.  
- Limpiar la dataset para que cumpla los requerimientos del equipo de Data y Machine Learning.  
- Guardar el archivo actualizado (limpio) como `orders_data_clean.parquet`

  
![](/files/caso1/1.png)

Como ingeniero de datos de una empresa de comercio electrónico llamada Voltmart, un equipo de aprendizaje automático le ha pedido que limpie los datos que contienen información sobre los pedidos realizados el año pasado. Tienen previsto utilizar estos datos depurados para crear un modelo de previsión de la demanda (Forecasting Model). Para ello, han compartido sus requisitos sobre el formato de tabla de salida deseado.

Un analista ha compartido un archivo parquet llamado `orders_data.parquet` para que usted los limpie y los preprocese.

A continuación puede ver el esquema del conjunto de datos junto con los requisitos de limpieza de los perezosos analistas de datos:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [0]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

In [0]:
df = spark.read.parquet('dbfs:/FileStore/caso1/orders_data.parquet')
df.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


### Respuestas:

##### 1. Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date 

In [0]:
hour = F.hour(F.col("order_date"))

df = df.where((hour > 5))

df.where((hour >= 0) & (hour <= 5)).count()

Out[31]: 0

Esta salida nos indica que hemos eliminado todos los pedidos desde las 00 horas hasta las 05 horas, ya que al filtrar por esas condiciones y contar las filas, nos sale un valor de 0.

Para ello, simplemente hemos indicado que el dataframe debe cumplir los requisitos contrarios, es decir, una hora mayor que 5, o menor que cero (esa parte la hemos omitido ya que no existen horas menores a 0)

##### 2. New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm

In [0]:

df = df.where((hour >= 5) & (hour < 12)).withColumn("day_time", F.lit("morning"))
df.where((hour >= 12) & (hour < 18)).withColumn("day_time", F.lit("afternoon"))
df.where((hour >= 18) & (hour <= 23)).withColumn("day_time", F.lit("evening"))

df.select(F.col("order_date"), F.col("day_time")).display()

order_date,day_time
2023-01-25T11:59:00.000+0000,morning
2023-01-01T10:30:00.000+0000,morning
2023-01-07T11:29:00.000+0000,morning
2023-01-31T10:12:00.000+0000,morning
2023-01-10T11:20:00.000+0000,morning
2023-01-24T08:13:00.000+0000,morning
2023-01-30T09:28:00.000+0000,morning
2023-01-08T11:51:00.000+0000,morning
2023-01-29T10:40:00.000+0000,morning
2023-01-03T09:46:00.000+0000,morning


La salida nos muestra que todos los pedidos han sido entregados por la mañana

##### 3. Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase

In [0]:
df = df.where(~F.lower((F.col("product"))).contains("tv"))

df.where(F.lower((F.col("product"))).contains("tv")).count()

Out[38]: 0

La salida nos muestra que, gracias a nuestro filtro de coger aquellas filas que NO contengan tv en la colunmna producto, existen 0 coincidencias cuando buscamos la palabra "tv" en la columna productos

##### 4. Ensure all values are lowercase

In [0]:
for column in df.columns:
    df = df.withColumn(column, F.lower(F.col(column)))

df.show()

+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+--------+
|         order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|cost_price|turnover| margin|day_time|
+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+--------+
|2023-01-25 11:59:00|  141238|    wired headphones|9.692680938163e12|électronique|387 10th st, aust...|               1|     11.99|     5.995|   11.99|  5.995| morning|
|2023-01-01 10:30:00|  141242|bose soundsport h...|1.508418177978e12|électronique|867 willow st, lo...|               1|     99.99|    49.995|   99.99| 49.995| morning|
|2023-01-07 11:29:00|  141244|apple airpods hea...|4.332898830865e12|   vêtements|492 walnut st, sa...|               1|     150.0|      97.5|   150.0|   5

Simplemente estamos iterando todas las columnas para cambiar sus valores a minúsculas, y en el resultado se evidencia que se ha realizado correctamente

##### 5. New column containing: the State that the purchase was ordered from

In [0]:
state = F.upper(F.split(F.trim(F.split(F.col("purchase_address"), ",")[2]), " ")[0])

df = df.withColumn("state", state)

df.select(F.col("state"), F.col("purchase_address")).show(truncate=False)

+-----+----------------------------------------+
|state|purchase_address                        |
+-----+----------------------------------------+
|TX   |387 10th st, austin, tx 73301           |
|CA   |867 willow st, los angeles, ca 90001    |
|CA   |492 walnut st, san francisco, ca 94016  |
|CA   |322 6th st, san francisco, ca 94016     |
|CA   |471 center st, los angeles, ca 90001    |
|MA   |414 walnut st, boston, ma 02215         |
|CA   |220 9th st, los angeles, ca 90001       |
|WA   |238 sunset st, seattle, wa 98101        |
|OR   |675 washington st, portland, or 97035   |
|NY   |937 highland st, new york city, ny 10001|
|CA   |649 sunset st, los angeles, ca 90001    |
|NY   |611 elm st, new york city, ny 10001     |
|MA   |90 13th st, boston, ma 02215            |
|CA   |8 jackson st, los angeles, ca 90001     |
|CA   |386 elm st, san francisco, ca 94016     |
|CA   |789 washington st, los angeles, ca 90001|
|GA   |534 elm st, atlanta, ga 30301           |
|CA   |4 1st st, los

Hemos extraído la información del estado mediante splits por "," y teniendo en cuenta los espacios en blanco, para así crear una nueva columna llamada "state", donde vemos que efectivamente se está cogiendo correctamente cada estado de cada dirección

##### 6. Guardar archivo final limpio con nombre `orders_data_clean.parquet` 

In [0]:
df.write.parquet("dbfs:/FileStore/caso1/orders_data_clean.parquet")

for file_info in dbutils.fs.ls("dbfs:/FileStore/caso1/"):
    print(file_info)

FileInfo(path='dbfs:/FileStore/caso1/1.png', name='1.png', size=159457, modificationTime=1732781732000)
FileInfo(path='dbfs:/FileStore/caso1/orders_data.parquet', name='orders_data.parquet', size=6871073, modificationTime=1732781733000)
FileInfo(path='dbfs:/FileStore/caso1/orders_data_clean.parquet/', name='orders_data_clean.parquet/', size=0, modificationTime=0)


Hacemos un ls a la carpeta en la que hemos guardado el archivo para ver que, efectivamente, se ha guardado el archivo .parquet

##### 7. Exportar archivo limpio en formato CSV 

In [0]:
df.write.csv("dbfs:/FileStore/caso1/orders_data_clean.csv")

for file_info in dbutils.fs.ls("dbfs:/FileStore/caso1/"):
    print(file_info)

FileInfo(path='dbfs:/FileStore/caso1/1.png', name='1.png', size=159457, modificationTime=1732781732000)
FileInfo(path='dbfs:/FileStore/caso1/orders_data.parquet', name='orders_data.parquet', size=6871073, modificationTime=1732781733000)
FileInfo(path='dbfs:/FileStore/caso1/orders_data_clean.csv/', name='orders_data_clean.csv/', size=0, modificationTime=0)
FileInfo(path='dbfs:/FileStore/caso1/orders_data_clean.parquet/', name='orders_data_clean.parquet/', size=0, modificationTime=0)


Por último, listamos de nuevo el directorio y vemos que tenemos ambos ficheros guardados correctamente y en su formato correspondiente.

También, podríamos verlo en la pestaña de catalog en Databricks:

  
![](/files/caso1/02.png)