- Preparar el fichero `orders_data.parquet` de modo que pueda ser usado para contruir un 'forecasting model'.  
- Limpiar la dataset para que cumpla los requerimientos del equipo de Data y Machine Learning.  
- Guardar el archivo actualizado (limpio) como `orders_data_clean.parquet`

  
![](https://community.cloud.databricks.com/files/EP_3/1.png)

Como ingeniero de datos de una empresa de comercio electrónico llamada Voltmart, un equipo de aprendizaje automático le ha pedido que limpie los datos que contienen información sobre los pedidos realizados el año pasado. Tienen previsto utilizar estos datos depurados para crear un modelo de previsión de la demanda (Forecasting Model). Para ello, han compartido sus requisitos sobre el formato de tabla de salida deseado.

Un analista ha compartido un archivo parquet llamado `orders_data.parquet` para que usted los limpie y los preprocese.

A continuación puede ver el esquema del conjunto de datos junto con los requisitos de limpieza de los perezosos analistas de datos:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [0]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('Caso_1')
    .getOrCreate()
)

In [0]:
df = spark.read.parquet('/FileStore/Caso_1/orders_data.parquet')
df.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


### Respuestas:

##### 1. Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date 

In [0]:
# Muestro el esquema del df original para comprobar que order_date es timestamp
df.printSchema()

root
 |-- order_date: timestamp (nullable = true)
 |-- order_id: long (nullable = true)
 |-- product: string (nullable = true)
 |-- product_id: double (nullable = true)
 |-- category: string (nullable = true)
 |-- purchase_address: string (nullable = true)
 |-- quantity_ordered: long (nullable = true)
 |-- price_each: double (nullable = true)
 |-- cost_price: double (nullable = true)
 |-- turnover: double (nullable = true)
 |-- margin: double (nullable = true)



In [0]:
from pyspark.sql.functions import hour, to_date

# Creo un dataframe con el archivo
df = spark.read.parquet('/FileStore/Caso_1/orders_data.parquet')

# Uso el df creado antes para filtrar en la sección de la hora del timestamp de los valores de la columna "order_date" los estén entre 0 y 5 que sería entre las 12am y las 5am. Una vez filtrado, creo una columna llamada "order_date" también que tiene el contenido en formato date de la columna order_date del df original. Con estos cambios creo el df resultante "df_filtrado"
df_filtrado = df.filter(~((hour('order_date') >= 0) & (hour('order_date') <= 5))) \
    .withColumn('order_date_formateado', to_date('order_date'))

# Muestro el schema para que se vea que la columna order_date es de tipo "date" e imprimo las 5 primeras filas
df_filtrado.printSchema()
df_filtrado.show(5)  

root
 |-- order_date: timestamp (nullable = true)
 |-- order_id: long (nullable = true)
 |-- product: string (nullable = true)
 |-- product_id: double (nullable = true)
 |-- category: string (nullable = true)
 |-- purchase_address: string (nullable = true)
 |-- quantity_ordered: long (nullable = true)
 |-- price_each: double (nullable = true)
 |-- cost_price: double (nullable = true)
 |-- turnover: double (nullable = true)
 |-- margin: double (nullable = true)
 |-- order_date_formateado: date (nullable = true)

+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+---------------------+
|         order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|cost_price|turnover| margin|order_date_formateado|
+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------

##### 2. New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm

In [0]:
from pyspark.sql.functions import hour, when
from pyspark.sql import DataFrame

# Creo una función que obtiene un dataframe como argumento y que devuelve un dataframe con una columna "periodo_de_tiempo" que tiene los valores Morning, Afternoon y Evening dependiendo de la hora que detecte en valor de la columna order_date
def agregar_periodo_de_tiempo(df: DataFrame) -> DataFrame:
    return df.withColumn(
        'periodo_de_tiempo',
        when((hour('order_date') >= 5) & (hour('order_date') < 12), 'Morning')
        .when((hour('order_date') >= 12) & (hour('order_date') < 18), 'Afternoon')
        .otherwise('Evening')
    )

#Creo un dataframe nuevo en base al que me devuelve la función que he creado arriba y después lo muestro (primeras 20 filas por defecto y sólo las columnas order_date y periodo_de_tiempo)
df_filtrado = agregar_periodo_de_tiempo(df_filtrado)
df_filtrado.select("order_date", "periodo_de_tiempo").show()

+-------------------+-----------------+
|         order_date|periodo_de_tiempo|
+-------------------+-----------------+
|2023-01-22 21:25:00|          Evening|
|2023-01-28 14:15:00|        Afternoon|
|2023-01-17 13:33:00|        Afternoon|
|2023-01-05 20:33:00|          Evening|
|2023-01-25 11:59:00|          Morning|
|2023-01-29 20:22:00|          Evening|
|2023-01-26 12:16:00|        Afternoon|
|2023-01-05 12:04:00|        Afternoon|
|2023-01-01 10:30:00|          Morning|
|2023-01-22 21:20:00|          Evening|
|2023-01-07 11:29:00|          Morning|
|2023-01-31 10:12:00|          Morning|
|2023-01-09 18:57:00|          Evening|
|2023-01-25 19:19:00|          Evening|
|2023-01-03 21:54:00|          Evening|
|2023-01-05 17:20:00|        Afternoon|
|2023-01-10 11:20:00|          Morning|
|2023-01-24 08:13:00|          Morning|
|2023-01-30 09:28:00|          Morning|
|2023-01-08 11:51:00|          Morning|
+-------------------+-----------------+
only showing top 20 rows



##### 3. Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase

In [0]:
from pyspark.sql.functions import lower, col

# Creo una función que toma un df como argumento y devuelve otro en el que no hay filas con el string "tv" dentro de la columna "product".
# Esto lo condigue cambiando la columna "product" por sí misma pero con los valores en minúsculas y después filtrando las que contiene el string "tv"
def quitar_tv(df: DataFrame) -> DataFrame:
    return df.withColumn('product', lower(col('product'))) \
        .filter(~col('product').contains('tv')) 

# Asigno el df que devuelve la función a uno nuevo que luego muestro por pantalla (sólo la columna product)
df_sin_tv = quitar_tv(df_filtrado)
df_sin_tv.select("product").show()

+--------------------+
|             product|
+--------------------+
|              iphone|
|lightning chargin...|
|    wired headphones|
|    27in fhd monitor|
|    wired headphones|
|aaa batteries (4-...|
|27in 4k gaming mo...|
|usb-c charging cable|
|bose soundsport h...|
|apple airpods hea...|
|apple airpods hea...|
|  macbook pro laptop|
|aaa batteries (4-...|
|    27in fhd monitor|
|    27in fhd monitor|
|     vareebadd phone|
|apple airpods hea...|
|usb-c charging cable|
|aaa batteries (4-...|
|usb-c charging cable|
+--------------------+
only showing top 20 rows



##### 4. Ensure all values are lowercase

In [0]:
# Creo un dataframe nuevo partiendo del anterior al que he crado una columna que susituye a la de categoría ya que al tener el mismo nombre pyspark lo hace así por defecto, pero esta nueva columna tiene todos sus valores en minúsculas.
df_category_minusculas = df_sin_tv.withColumn('category', lower(col('category')))

# Muestro sólo la columna categoría
df_category_minusculas.select("category").show()

+------------+
|    category|
+------------+
|   vêtements|
|alimentation|
|   vêtements|
|      sports|
|électronique|
|alimentation|
|   vêtements|
|   vêtements|
|électronique|
|électronique|
|   vêtements|
|   vêtements|
|   vêtements|
|   vêtements|
|alimentation|
|alimentation|
|alimentation|
|      sports|
|électronique|
|alimentation|
+------------+
only showing top 20 rows



##### 5. New column containing: the State that the purchase was ordered from

In [0]:
from pyspark.sql.functions import regexp_extract

# Creo un df nuevo con una columna nueva partiendo del df anterior. Esta columna se crea obteniendo las dos primeras letras mayúsculas que se encuentren al revisar el string de "purchase_adress" de manera inversa, es decir, de derecha a izquierda.
df_con_estados = df_category_minusculas.withColumn('estado', 
    regexp_extract(col('purchase_address'), r'.*,\s+([A-Z]{2}).*', 1)
)

# Una vez terminado, muestro el nuevo df (sólo las columnas purchase_adress y estado sin abreviar)
df_con_estados.select('purchase_address', 'estado').show(truncate=False)

+---------------------------------------+------+
|purchase_address                       |estado|
+---------------------------------------+------+
|944 Walnut St, Boston, MA 02215        |MA    |
|185 Maple St, Portland, OR 97035       |OR    |
|538 Adams St, San Francisco, CA 94016  |CA    |
|738 10th St, Los Angeles, CA 90001     |CA    |
|387 10th St, Austin, TX 73301          |TX    |
|775 Willow St, San Francisco, CA 94016 |CA    |
|979 Park St, Los Angeles, CA 90001     |CA    |
|181 6th St, San Francisco, CA 94016    |CA    |
|867 Willow St, Los Angeles, CA 90001   |CA    |
|657 Johnson St, San Francisco, CA 94016|CA    |
|492 Walnut St, San Francisco, CA 94016 |CA    |
|322 6th St, San Francisco, CA 94016    |CA    |
|618 7th St, Los Angeles, CA 90001      |CA    |
|512 Wilson St, San Francisco, CA 94016 |CA    |
|440 Cedar St, Portland, OR 97035       |OR    |
|471 Center St, Los Angeles, CA 90001   |CA    |
|414 Walnut St, Boston, MA 02215        |MA    |
|220 9th St, Los Ang

##### 6. Guardar archivo final limpio con nombre `orders_data_clean.parquet` 

In [0]:
# Escribo el contenido del df obtenido en la celda anterior al parquet con el nombre solicitado
df_con_estados.write.format("parquet").save("/FileStore/Caso_1/output/orders_data_clean.parquet")

##### 7. Exportar archivo limpio en formato CSV 

In [0]:
# Leo el parquet y creo un dataframe con el contenido
df = spark.read.parquet("/FileStore/Caso_1/orders_data_clean.parquet")

# Creo un csv a partir del df que he obtenido y lo configuro para que use comas como delimitador y que tenga índices, entre otras.
df.write.option("header", "true") \
       .option("delimiter", ",") \
       .mode("overwrite") \
       .format("csv") \
       .save("/FileStore/Caso_1/output/orders_data_clean.csv")