In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_date, year, lit,
    when, trim, regexp_replace
)
from pyspark.sql.types import (
    IntegerType, DoubleType, StringType, DateType
)

In [0]:
# Load the bronze orders table as the starting DataFrame for the cleaning steps below.
ped = spark.table("classes.bronze.pedidos")

In [0]:
# Source (Portuguese) column name -> English column name used from here on.
column_mapping = dict(
    nr_pedido="order_id",
    nr_linha_pedido="order_line_id",
    data_pedido="order_date",
    nome_vendedor="salesperson_name",
    estado="state",
    cidade="city",
    nome_produto="product_name",
    cod_produto="product_code",
    grupo_produto="product_group",
    qtde_produtos="product_quantity",
    preco_unitario="unit_price",
    valor_imposto="tax_amount",
    valor_total_linha="total_line_value",
    status_pedido="order_status",
)

# Rename one column at a time; withColumnRenamed leaves the frame untouched
# when the source column is absent, so a missing column does not raise here.
for source_name, target_name in column_mapping.items():
    ped = ped.withColumnRenamed(source_name, target_name)

In [0]:
# Print the column names and types after the rename so the result can be checked by eye.
ped.printSchema()

In [0]:
from pyspark.sql.functions import col, round

# Round tax_amount to two decimal places (Spark's round, imported in this cell,
# not the Python builtin).
tax_rounded = round(col("tax_amount"), 2)
ped = ped.withColumn("tax_amount", tax_rounded)

# Preview the frame after rounding.
display(ped)

In [0]:
from pyspark.sql.functions import col, round, to_date, date_format, trim, initcap, year, month


In [0]:

# Keep a single row per (order_id, order_line_id) pair. Spark does not
# guarantee which of the duplicated rows is the one retained.
dedup_keys = ["order_id", "order_line_id"]
ped = ped.dropDuplicates(dedup_keys)

In [0]:

# Free-text columns to normalise: capitalise the first letter of each word,
# then strip leading/trailing whitespace.
cols_texto = ["salesperson_name", "city", "product_name", "order_status"]
for text_col in cols_texto:
    normalised = trim(initcap(col(text_col)))
    ped = ped.withColumn(text_col, normalised)

In [0]:
# state is only stripped of surrounding whitespace; its casing is left as-is.
state_trimmed = trim(col("state"))
ped = ped.withColumn("state", state_trimmed)

In [0]:

# Replace nulls with 0 in the quantity and amount columns only.
# NOTE(review): na.fill skips subset columns whose type does not match the
# fill value — confirm these columns are numeric in the bronze schema.
zero_fill_cols = ["product_quantity", "unit_price", "tax_amount", "total_line_value"]
ped = ped.na.fill(0, subset=zero_fill_cols)


In [0]:

# Round tax_amount to two decimals and parse order_date with the dd-MM-yyyy pattern.
# NOTE(review): assumes order_date arrives as a string in that layout — confirm
# against the bronze schema.
ped = (
    ped
    .withColumn("tax_amount", round(col("tax_amount"), 2))
    .withColumn("order_date", to_date(col("order_date"), "dd-MM-yyyy"))
)

# Preview the cleaned frame.
display(ped)

In [0]:

# Persist the cleaned orders to the silver layer, partitioned by order year/month.
#
# The partition columns are not part of the renamed bronze schema, so they are
# derived here from order_date (parsed to a date in the previous cell); without
# them partitionBy fails because the columns cannot be found. They are added
# inside the write chain rather than by reassigning `ped`, so re-running this
# cell has no effect on the in-memory frame.
(
    ped
    .withColumn("order_year", year(col("order_date")))
    .withColumn("order_month", month(col("order_date")))
    .write
    .mode("overwrite")
    .partitionBy("order_year", "order_month")
    .saveAsTable("classes.silver.pedidos")
)
    