In [0]:
# Access keys config
# S3A credentials for this Spark session, read from the standard AWS
# environment variables instead of being hardcoded: secrets committed in a
# notebook leak through version control and exported copies. If running on
# Databricks, a secret scope or instance profile is preferable.
# NOTE(review): these settings configure the s3a:// filesystem while the cells
# below use s3:// URIs; Databricks serves both through the S3A connector, but
# confirm this on any other runtime.
import os

spark.conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
spark.conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
spark.conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
# SimpleAWSCredentialsProvider authenticates with the access/secret key pair set above.
spark.conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

#### Creating Schemas

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType

# Explicit schemas for the bronze CSV inputs: column name, Spark type, nullable flag.
# With a user-supplied schema and the default enforceSchema=true, Spark applies
# these fields to the CSV columns by position and ignores the header names, so
# the field order here must match the column order in each file.
# NOTE(review): money columns (payment_value, price, freight_value) use FloatType
# (32-bit); DoubleType or DecimalType would avoid precision loss in aggregates.
# NOTE(review): Spark file sources typically read every column as nullable, so the
# `False` flags may document intent rather than enforce it — confirm on your runtime.

payments_schema = (
    StructType()
    .add("order_id", StringType(), False)
    .add("payment_sequential", IntegerType(), True)
    .add("payment_type", StringType(), True)
    .add("payment_installments", IntegerType(), True)
    .add("payment_value", FloatType(), True)
)

# "product_name_lenght" keeps its original (misspelled) column name as-is.
products_schema = (
    StructType()
    .add("product_id", StringType(), False)
    .add("product_category_name", StringType(), True)
    .add("product_name_lenght", IntegerType(), True)
    .add("product_description_length", IntegerType(), True)
    .add("product_photos_qty", IntegerType(), True)
    .add("product_weight_g", IntegerType(), True)
    .add("product_length_cm", IntegerType(), True)
    .add("product_height_cm", IntegerType(), True)
    .add("product_width_cm", IntegerType(), True)
)

sellers_schema = (
    StructType()
    .add("seller_id", StringType(), False)
    .add("seller_zip_code_prefix", IntegerType(), True)
    .add("seller_city", StringType(), True)
    .add("seller_state", StringType(), True)
)

orders_schema = (
    StructType()
    .add("order_id", StringType(), False)
    .add("customer_id", StringType(), True)
    .add("order_status", StringType(), True)
    .add("order_purchase_timestamp", TimestampType(), True)
    .add("order_delivered_carrier_date", TimestampType(), True)
    .add("order_delivered_customer_date", TimestampType(), True)
    .add("order_estimated_delivery_date", TimestampType(), True)
)

order_items_schema = (
    StructType()
    .add("order_id", StringType(), False)
    .add("order_item_id", IntegerType(), True)
    .add("product_id", StringType(), True)
    .add("seller_id", StringType(), True)
    .add("shipping_limit_date", TimestampType(), True)
    .add("price", FloatType(), True)
    .add("freight_value", FloatType(), True)
)

customers_schema = (
    StructType()
    .add("customer_id", StringType(), False)
    .add("customer_unique_id", StringType(), True)
    .add("customer_zip_code_prefix", IntegerType(), True)
    .add("customer_city", StringType(), True)
    .add("customer_state", StringType(), True)
)


#### Ingesting Data

In [0]:
# Load each bronze-layer CSV with its explicit schema; the first line of every
# file is treated as a header row.
_BRONZE_PATH = "s3://ecommerce-sales-project/bronze"


def _read_bronze_csv(table_name, schema):
    """Read ``<bronze path>/<table_name>.csv`` with the given schema.

    Args:
        table_name: file name under the bronze prefix, without ``.csv``.
        schema: StructType applied to the file's columns.

    Returns:
        A Spark DataFrame over that CSV file.
    """
    source_path = f"{_BRONZE_PATH}/{table_name}.csv"
    return spark.read.schema(schema).option("header", "true").csv(source_path)


payments_df = _read_bronze_csv("payments", payments_schema)
products_df = _read_bronze_csv("products", products_schema)
sellers_df = _read_bronze_csv("sellers", sellers_schema)
orders_df = _read_bronze_csv("orders", orders_schema)
order_items_df = _read_bronze_csv("order_items", order_items_schema)
customers_df = _read_bronze_csv("customers", customers_schema)

#### Data Transformation

In [0]:
# transforming payments_df
# Keep every payment column and append an audit column with the processing time.
# current_timestamp() is fixed once per executed query, not when this cell runs,
# so the .show() preview and the later Parquet write record different times.
from pyspark.sql.functions import current_timestamp

payments_processed_df = (
    payments_df
    .withColumn("payment_processing_time", current_timestamp())
)

In [0]:
payments_processed_df.show(n=5)  # preview; strings longer than 20 chars are truncated

+--------------------+------------------+------------+--------------------+-------------+-----------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_processing_time|
+--------------------+------------------+------------+--------------------+-------------+-----------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|   2024-07-02 14:18:...|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|   2024-07-02 14:18:...|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|   2024-07-02 14:18:...|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|   2024-07-02 14:18:...|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|   2024-07-02 14:18:...|
+--------------------+------------------+------------+--------------------+-------------+-----------------------+

In [0]:
# transforming products_df
# Keep the product key, category and physical-dimension columns, upper-case the
# category name, then expose it as `product_category`.
# `col` and `upper` are imported here so the cell runs on a fresh kernel.
from pyspark.sql.functions import col, upper

products_processed_df = (
    products_df
    .select(
        "product_id",
        "product_category_name",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    )
    .withColumn("product_category_name", upper(col("product_category_name")))
    .withColumnRenamed("product_category_name", "product_category")
)

In [0]:
products_processed_df.show(n=5)  # preview of the first 5 processed product rows

+--------------------+----------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+----------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|       PERFUMERY|             225|               16|               10|              14|
|3aa071139cb16b67c...|             ART|            1000|               30|               18|              20|
|96bd76ec8810374ed...|   SPORT LEISURE|             154|               18|                9|              15|
|cef67bcfe19066a93...|          BABIES|             371|               26|                4|              26|
|9dc1a7de274444849...|      HOUSEWARES|             625|               20|               17|              13|
+--------------------+----------------+----------------+-----------------+-----------------+----------------+
only showing top 5 rows

In [0]:
# transforming sellers_df
# Keep the seller key and location columns; seller_zip_code_prefix is dropped.
sellers_processed_df = sellers_df.select(
    "seller_id",
    "seller_city",
    "seller_state",
)

In [0]:
sellers_processed_df.show(n=5)  # preview of the first 5 processed seller rows

+--------------------+-----------------+------------+
|           seller_id|      seller_city|seller_state|
+--------------------+-----------------+------------+
|3442f8959a84dea7e...|         campinas|          SP|
|d1b65fc7debc3361e...|       mogi guacu|          SP|
|ce3ad9de960102d06...|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|        sao paulo|          SP|
|51a04a8a6bdcb23de...|braganca paulista|          SP|
+--------------------+-----------------+------------+
only showing top 5 rows



In [0]:
# transforming orders_df
# Project the order columns used downstream, relabel "invoiced" orders as
# "delayed" (all other statuses, including nulls, pass through unchanged), and
# rename the purchase / customer-delivery timestamp columns.
# The renamed *_date columns are still TimestampType; only their names change.
# NOTE(review): the invoiced -> delayed mapping is a business rule that is not
# explained here — confirm it is intended.
from pyspark.sql.functions import when, col

orders_processed_df = orders_df.select(
    "order_id",
    "customer_id",
    when(col("order_status") == "invoiced", "delayed")
        .otherwise(col("order_status"))
        .alias("order_status"),
    col("order_purchase_timestamp").alias("order_purchase_date"),
    "order_estimated_delivery_date",
    col("order_delivered_customer_date").alias("order_delivery_date"),
)

In [0]:
orders_processed_df.show(n=10)  # preview of the first 10 processed order rows

+--------------------+--------------------+------------+-------------------+-----------------------------+-------------------+
|            order_id|         customer_id|order_status|order_purchase_date|order_estimated_delivery_date|order_delivery_date|
+--------------------+--------------------+------------+-------------------+-----------------------------+-------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|2017-10-02 10:56:33|          2017-10-10 21:25:13|2017-10-04 19:55:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|2018-07-24 20:41:37|          2018-08-07 15:27:45|2018-07-26 14:31:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|   delivered|2018-08-08 08:38:49|          2018-08-17 18:06:29|2018-08-08 13:50:00|
|949d5b44dbf5de918...|f88197465ea7920ad...|   delivered|2017-11-18 19:28:06|          2017-12-02 00:28:42|2017-11-22 13:39:59|
|ad21c59c0840e6cb8...|8ab97904e6daea886...|   delivered|2018-02-13 21:18:39|          2018-02-16 18:17:02|2018-

In [0]:
# transforming order_items_df
# Keep the item keys and price components and add `total_price`
# (price + freight_value, rounded to 2 decimal places).
# Spark's `round` is imported under an alias so it does not shadow Python's
# built-in round() for every later cell in the session.
# Both inputs are FloatType in order_items_schema, so total_price is a 32-bit float.
from pyspark.sql.functions import col, round as spark_round

order_items_processed_df = (
    order_items_df
    .select("order_id", "order_item_id", "product_id", "seller_id", "price", "freight_value")
    .withColumn("total_price", spark_round(col("price") + col("freight_value"), 2))
)

In [0]:
order_items_processed_df.show(n=5)  # preview of the first 5 processed order-item rows

+--------------------+-------------+--------------------+--------------------+-----+-------------+-----------+
|            order_id|order_item_id|          product_id|           seller_id|price|freight_value|total_price|
+--------------------+-------------+--------------------+--------------------+-----+-------------+-----------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...| 58.9|        13.29|      72.19|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|239.9|        19.93|     259.83|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|199.0|        17.87|     216.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|12.99|        12.79|      25.78|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|199.9|        18.14|     218.04|
+--------------------+-------------+--------------------+--------------------+-----+-------------+-----------+
only showing top 5 rows

In [0]:
# transforming customers_df
# Keep every customer column; only the zip-code prefix is given a shorter name.
customers_processed_df = customers_df.withColumnRenamed(
    existing="customer_zip_code_prefix",
    new="zip_code",
)

In [0]:
customers_processed_df.show(n=5)  # preview of the first 5 processed customer rows

+--------------------+--------------------+--------+--------------------+--------------+
|         customer_id|  customer_unique_id|zip_code|       customer_city|customer_state|
+--------------------+--------------------+--------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|   13056|            campinas|            SP|
+--------------------+--------------------+--------+--------------------+--------------+
only showing top 5 rows




#### Saving processed DataFrames to the 'silver' layer (Parquet)

In [0]:
# Persist each processed DataFrame as Parquet under the silver/ prefix, replacing
# whatever a previous run wrote there (mode "overwrite"). Dict order fixes the
# write order.
_silver_outputs = {
    "payments_processed": payments_processed_df,
    "products_processed": products_processed_df,
    "sellers_processed": sellers_processed_df,
    "orders_processed": orders_processed_df,
    "order_items_processed": order_items_processed_df,
    "customers_processed": customers_processed_df,
}

for _table_name, _processed_df in _silver_outputs.items():
    _processed_df.write.mode("overwrite").parquet(f"s3://ecommerce-sales-project/silver/{_table_name}")