In [1]:
from pyspark.sql import SparkSession,types,functions

In [2]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('PySpark-Project')
    .getOrCreate()
)

In [3]:
# Load the raw orders dataset from Parquet.
orders_data = spark.read.parquet('orders_data.parquet')

# Preview five rows. limit() is applied before toPandas() so that only those
# rows are collected to the driver, instead of converting the whole dataset
# to pandas and then discarding everything but the head.
orders_data.limit(5).toPandas()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


In [4]:
# Print each column's name, data type and nullability for the loaded DataFrame.
orders_data.printSchema()

root
 |-- order_date: timestamp_ntz (nullable = true)
 |-- order_id: long (nullable = true)
 |-- product: string (nullable = true)
 |-- product_id: double (nullable = true)
 |-- category: string (nullable = true)
 |-- purchase_address: string (nullable = true)
 |-- quantity_ordered: long (nullable = true)
 |-- price_each: double (nullable = true)
 |-- cost_price: double (nullable = true)
 |-- turnover: double (nullable = true)
 |-- margin: double (nullable = true)



In [None]:
# Label every order with the part of the day in which it was placed, drop the
# night-time orders, and finally reduce order_date from a timestamp to a date.
# NOTE(review): this cell rebinds orders_data and changes the type of
# order_date, so re-running it without reloading the data first would operate
# on the already-cast column -- confirm before executing it twice.
order_hour = functions.hour(functions.col('order_date'))

# Hours outside every range (e.g. a null order_date) match no branch, which
# leaves time_of_day null.
time_of_day_label = (
    functions.when(order_hour.between(0, 5), 'night')
    .when(order_hour.between(6, 11), 'morning')
    .when(order_hour.between(12, 17), 'afternoon')
    .when(order_hour.between(18, 23), 'evening')
)

orders_data = (
    orders_data
    .withColumn('time_of_day', time_of_day_label)
    # With the period column in place, keep everything except night orders.
    # Rows whose time_of_day is null are removed as well, because the
    # comparison evaluates to null for them.
    .where(functions.col('time_of_day') != 'night')
    # The hour is no longer needed from here on, so the column becomes a date.
    .withColumn('order_date', functions.col('order_date').cast(types.DateType()))
)



In [None]:
# Preview the DataFrame after adding the new column (first 20 rows, with long
# cell values truncated).
orders_data.show(n=20, truncate=True)

+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|        cost_price|turnover|  margin|time_of_day|
+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|2023-01-22|  141234|              iPhone|5.638008983335E12|   Vêtements|944 Walnut St, Bo...|               1|     700.0|             231.0|   700.0|   469.0|    evening|
|2023-01-28|  141235|Lightning Chargin...|5.563319511488E12|Alimentation|185 Maple St, Por...|               1|     14.95|             7.475|   14.95|   7.475|  afternoon|
|2023-01-17|  141236|    Wired Headphones| 2.11397339522E12|   Vêtements|538 Adams St, San...|               2|     11.99|             5.995

In [7]:
# List the distinct values of the category column.
(
    orders_data
    .select(functions.col('category'))
    .dropDuplicates()
    .show()
)

+------------+
|    category|
+------------+
|Électronique|
|      Sports|
|   Vêtements|
|Alimentation|
+------------+



In [8]:
# The store no longer sells TVs, so those rows are removed. product and
# category are lower-cased first, which also makes the 'tv' substring match
# case-insensitive.
is_tv = functions.col('product').contains('tv')

orders_data = (
    orders_data
    .withColumns({
        'product': functions.lower(functions.col('product')),
        'category': functions.lower(functions.col('category')),
    })
    # The predicate is resolved here, i.e. against the lower-cased product.
    .where(~is_tv)
)

In [None]:
# Add purchase_state: the state code taken from purchase_address.
# Addresses look like "944 Walnut St, Boston, MA 02215", so the state is the
# second-to-last whitespace-separated token. Surrounding spaces are trimmed
# and the split is done on runs of whitespace, so a trailing or doubled space
# cannot shift which token is picked.
address_tokens = functions.split(
    functions.trim(functions.col('purchase_address')),
    r'\s+',
)

orders_data = orders_data.withColumn(
    'purchase_state',
    # element_at() accepts negative positions counted from the end of the
    # array, so no temporary column or size arithmetic is needed.
    functions.element_at(address_tokens, -2),
)

In [None]:
# Count the orders per state, most frequent state first.
n_states = (
    orders_data
    .groupBy('purchase_state')
    .count()
    .sort(functions.col('count').desc())
)
n_states.show()

+--------------+-----+
|purchase_state|count|
+--------------+-----+
|            CA|68292|
|            NY|22831|
|            TX|22773|
|            MA|18360|
|            GA|13624|
|            WA|13592|
|            OR| 9186|
|            ME| 2263|
+--------------+-----+



In [None]:
# Export the cleaned data as Parquet, overwriting any previous export.
orders_data.write.mode('overwrite').parquet('orders_data_clean.parquet')