- Preparar el fichero `orders_data.parquet` de modo que pueda ser usado para contruir un 'forecasting model'.  
- Limpiar la dataset para que cumpla los requerimientos del equipo de Data y Machine Learning.  
- Guardar el archivo actualizado (limpio) como `orders_data_clean.parquet`

  
![](https://community.cloud.databricks.com/files/EP_3/1.png)

Como ingeniero de datos de una empresa de comercio electrónico llamada Voltmart, un equipo de aprendizaje automático le ha pedido que limpie los datos que contienen información sobre los pedidos realizados el año pasado. Tienen previsto utilizar estos datos depurados para crear un modelo de previsión de la demanda (Forecasting Model). Para ello, han compartido sus requisitos sobre el formato de tabla de salida deseado.

Un analista ha compartido un archivo parquet llamado `orders_data.parquet` para que usted los limpie y los preprocese.

A continuación puede ver el esquema del conjunto de datos junto con los requisitos de limpieza de los perezosos analistas de datos:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [0]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

In [0]:
df = spark.read.parquet('dbfs:/FileStore/shared_uploads/orders_data.parquet')
df.display()

### Respuestas:

##### 1. Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date 

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

#Filtramos las filas donde la hora esté entre las 12 AM y 5 AM
df_t2 = df_t1.filter(~(F.hour('order_date') >= 0) & (F.hour('order_date') <= 5))


#Convertimos la columna order_date de timestamp a date
df_t2 = df_t1.withColumn("order_date", F.to_date(df["order_date"]))

#Mostramos los cambios
df_t2.display()

##### 2. New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm

In [0]:
from pyspark.sql import functions as F

# Crear la nueva columna "time_of_day" basándose en la hora de "order_date"
df_t1 = df.withColumn(
    'time_of_day',
    F.when(F.hour('order_date').between(5, 11), 'morning')  # De 5 AM a 11:59 AM
     .when(F.hour('order_date').between(12, 17), 'afternoon')  # De 12 PM a 5:59 PM
     .otherwise('evening')  # De 6 PM a 4:59 AM (incluso la medianoche)
)

df_t1.display()

##### 3. Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase

In [0]:
from pyspark.sql import functions as F

#Convertimos los valores de la columna "products" a minúsculas
df_t3 = df_t2.withColumn("product", F.lower("product"))

#Eliminamos los valores tv en la columna product
df_t3 = df_t3.filter(~df_t3["product"].contains("tv"))

df_t3.display()

##### 4. Ensure all values are lowercase

In [0]:
from pyspark.sql import functions as F

#Convertimos los valores de la columna "category" a minúsculas
df_t4 = df_t3.withColumn("category", F.lower("category"))

df_t4.display()

##### 5. New column containing: the State that the purchase was ordered from

In [0]:
from pyspark.sql import functions as F

# Dividimos la columna 'purchase_address' por la coma
df_t5 = df_t4.withColumn("purchase_state", F.split(F.col("purchase_address"), ",").getItem(2))

# Limpiamos espacios extra con trim() para asegurarnos de que no haya espacios al principio o al final
df_t5 = df_t5.withColumn("purchase_state", F.trim(F.col("purchase_state")))

# Ahora, extraemos el estado (que es la primera parte de la tercera columna, antes del código postal) usando 'split' nuevamente
df_t5 = df_t5.withColumn("purchase_state", F.split(F.col("purchase_state"), " ").getItem(0))

df_t5.display()

##### 6. Guardar archivo final limpio con nombre `orders_data_clean.parquet` 

In [0]:
# Guardamos el DataFrame como un solo archivo Parquet
output_path = "/FileStore/shared_uploads/orders_data_clean.parquet"

df_t5.coalesce(1).write.mode("overwrite").parquet(output_path)

##### 7. Exportar archivo limpio en formato CSV 

In [0]:
# Ruta donde se guardará el archivo CSV
csv_output_path = "/FileStore/shared_uploads/orders_data_clean.csv"

# Leemos el archivo Parquet
df_parquet = spark.read.parquet("/FileStore/shared_uploads/orders_data_clean.parquet")

# Guardamos el DataFrame como un solo archivo CSV
df_parquet.repartition(1).write.option("header", "true").mode("overwrite").csv(csv_output_path)

df_t5.display()
