### Mount the Azure Storage (Blob / ADLS) container

In [None]:
# Mount the Azure Blob Storage container at `mountPoint` (skipped if already mounted).
# NOTE(review): credentials are blank here; prefer reading them from a Databricks
# secret scope (dbutils.secrets.get) rather than hard-coding them in the notebook.
storageAccountName = ""
storageAccountAccessKey = ""  # only used by the account-key alternative described below
sasToken = ""
blobContainerName = "olistmarketplace"
mountPoint = "/mnt/olistmarketplaceim/"

# dbutils.fs.mounts() may report mount points without a trailing slash, so both
# sides are normalised before comparing; otherwise "/mnt/olistmarketplaceim/" never
# matches and every re-run attempts (and fails) to mount again.
if not any(mount.mountPoint.rstrip("/") == mountPoint.rstrip("/")
           for mount in dbutils.fs.mounts()):
  try:
    dbutils.fs.mount(
      source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
      mount_point = mountPoint,
      # Authenticates with a container-scoped SAS token. To authenticate with the
      # storage account key instead, use the config key
      # 'fs.azure.account.key.<account>.blob.core.windows.net' with storageAccountAccessKey.
      extra_configs = {'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
    )
    print("mount succeeded!")
  except Exception as e:
    # Best-effort: report and continue; later reads under mountPoint fail if the mount is absent.
    print("mount exception", e)

In [None]:
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import lit

from datetime import datetime

### Preparing for the Silver layer

- Keep only the rows from the most recent load (`DT_CARGA`) of each record
- Select only the relevant data

### Olist Customer

In [None]:
# Read bronze customers and keep only the rows from the latest load (max DT_CARGA)
# of each customer_id; DT_CARGA is then dropped. Rows whose DT_CARGA is null are
# filtered out by the equality check.
# (The CSV-only `header` option was removed: the parquet reader ignores it.)
df_customers = spark.read.parquet("/mnt/olistmarketplaceim/bronze/customers/")
w = Window.partitionBy('customer_id')
df_customers = (
    df_customers
    .withColumn('maxCarga', F.max('DT_CARGA').over(w))
    .where(F.col('DT_CARGA') == F.col('maxCarga'))
    .drop('maxCarga', 'DT_CARGA')
)

### Olist Geolocation

In [None]:
# Read bronze geolocation and keep, for each geolocation_state (not each zip prefix),
# all rows from that state's latest load (max DT_CARGA); DT_CARGA is then dropped.
# Rows whose DT_CARGA is null are filtered out by the equality check.
# (The CSV-only `header` option was removed: the parquet reader ignores it.)
df_geolocation = spark.read.parquet("/mnt/olistmarketplaceim/bronze/geolocation/")
w = Window.partitionBy('geolocation_state')
df_geolocation = (
    df_geolocation
    .withColumn('maxCarga', F.max('DT_CARGA').over(w))
    .where(F.col('DT_CARGA') == F.col('maxCarga'))
    .drop('maxCarga', 'DT_CARGA')
)

### Olist Items

In [None]:
# Read bronze order items and keep, for each order_id, every item row from that
# order's latest load (max DT_CARGA), so multi-item orders keep all their items;
# DT_CARGA is then dropped. Rows with a null DT_CARGA are filtered out.
# (The CSV-only `header` option was removed: the parquet reader ignores it.)
df_items = spark.read.parquet("/mnt/olistmarketplaceim/bronze/items/")
w = Window.partitionBy('order_id')
df_items = (
    df_items
    .withColumn('maxCarga', F.max('DT_CARGA').over(w))
    .where(F.col('DT_CARGA') == F.col('maxCarga'))
    .drop('maxCarga', 'DT_CARGA')
)

### Olist Products

In [None]:
# Read bronze products and keep only the rows from the latest load (max DT_CARGA)
# of each product_id; DT_CARGA is then dropped. Rows with a null DT_CARGA are
# filtered out by the equality check.
# (The CSV-only `header` option was removed: the parquet reader ignores it.)
df_products = spark.read.parquet("/mnt/olistmarketplaceim/bronze/products/")
w = Window.partitionBy('product_id')
df_products = (
    df_products
    .withColumn('maxCarga', F.max('DT_CARGA').over(w))
    .where(F.col('DT_CARGA') == F.col('maxCarga'))
    .drop('maxCarga', 'DT_CARGA')
)


### Olist Orders

In [None]:
# Read bronze orders and keep only the rows from the latest load (max DT_CARGA)
# of each order_id; DT_CARGA is then dropped. Rows with a null DT_CARGA are
# filtered out by the equality check.
# (The CSV-only `header` option was removed: the parquet reader ignores it.)
df_orders = spark.read.parquet("/mnt/olistmarketplaceim/bronze/orders/")
w = Window.partitionBy('order_id')
df_orders = (
    df_orders
    .withColumn('maxCarga', F.max('DT_CARGA').over(w))
    .where(F.col('DT_CARGA') == F.col('maxCarga'))
    .drop('maxCarga', 'DT_CARGA')
)

### Olist Payments

In [None]:
# Read bronze payments and keep, for each order_id, every payment row from that
# order's latest load (max DT_CARGA), so orders paid in several parts keep all
# their payments; DT_CARGA is then dropped. Rows with a null DT_CARGA are filtered out.
# (The CSV-only `header` option was removed: the parquet reader ignores it.)
df_payments = spark.read.parquet("/mnt/olistmarketplaceim/bronze/payments/")
w = Window.partitionBy('order_id')
df_payments = (
    df_payments
    .withColumn('maxCarga', F.max('DT_CARGA').over(w))
    .where(F.col('DT_CARGA') == F.col('maxCarga'))
    .drop('maxCarga', 'DT_CARGA')
)

### Olist Sellers

In [None]:
# Read bronze sellers and keep only the rows from the latest load (max DT_CARGA)
# of each seller_id; DT_CARGA is then dropped. Rows with a null DT_CARGA are
# filtered out by the equality check.
# (The CSV-only `header` option was removed: the parquet reader ignores it.)
df_sellers = spark.read.parquet("/mnt/olistmarketplaceim/bronze/sellers/")
w = Window.partitionBy('seller_id')
df_sellers = (
    df_sellers
    .withColumn('maxCarga', F.max('DT_CARGA').over(w))
    .where(F.col('DT_CARGA') == F.col('maxCarga'))
    .drop('maxCarga', 'DT_CARGA')
)

### Olist Reviews

In [None]:
# Read bronze reviews and keep only the rows from the latest load (max DT_CARGA)
# of each review_id; DT_CARGA is then dropped. Rows with a null DT_CARGA are
# filtered out by the equality check.
# (The CSV-only `header` option was removed: the parquet reader ignores it.)
df_reviews = spark.read.parquet("/mnt/olistmarketplaceim/bronze/reviews/")
w = Window.partitionBy('review_id')
df_reviews = (
    df_reviews
    .withColumn('maxCarga', F.max('DT_CARGA').over(w))
    .where(F.col('DT_CARGA') == F.col('maxCarga'))
    .drop('maxCarga', 'DT_CARGA')
)

### Bronze to Silver

In [None]:
# Build the one-big-table (OBT) by joining every cleaned table.
# - All joins are inner joins (Spark's default), so an order only survives if it
#   has a matching payment, item, review, customer, product and seller.
# - Payments and items are both joined on order_id, so an order with P payments
#   and I items yields P * I rows; aggregate payment_value / price with care.
df_big = (
    df_orders
    .join(df_payments, ["order_id"])
    .join(df_items, ["order_id"])
    .join(df_reviews, ["order_id"])
    .join(df_customers, ["customer_id"])
    .join(df_products, ["product_id"])
    .join(df_sellers, ["seller_id"])
)

In [None]:
# Cast the OBT columns to their silver-layer types.
# Column names are kept exactly as they appear in the data (including the
# "lenght" spelling of the product columns). Columns not listed keep their
# existing type.
# NOTE(review): casting *_zip_code_prefix to integer drops leading zeros
# (e.g. "01037" -> 1037); confirm downstream joins on zip prefix account for this.
silver_column_types = {
    "order_purchase_timestamp": TimestampType(),
    "order_approved_at": TimestampType(),
    "order_delivered_carrier_date": TimestampType(),
    "order_delivered_customer_date": TimestampType(),
    "order_estimated_delivery_date": TimestampType(),
    "payment_installments": IntegerType(),
    "payment_value": DoubleType(),
    "shipping_limit_date": TimestampType(),
    "price": DoubleType(),
    "freight_value": DoubleType(),
    "review_score": IntegerType(),
    "review_creation_date": TimestampType(),
    "review_answer_timestamp": TimestampType(),
    "customer_zip_code_prefix": IntegerType(),
    "product_name_lenght": IntegerType(),
    "product_description_lenght": IntegerType(),
    "product_photos_qty": IntegerType(),
    "product_weight_g": IntegerType(),
    "product_length_cm": IntegerType(),
    "product_height_cm": IntegerType(),
    "product_width_cm": IntegerType(),
    "seller_zip_code_prefix": IntegerType(),
}

# Fail fast (as the original chained withColumn calls did) if an expected column is absent.
missing_columns = sorted(set(silver_column_types) - set(df_big.columns))
if missing_columns:
    raise ValueError(f"Columns missing from df_big: {missing_columns}")

# A single select instead of ~20 chained withColumn calls keeps the logical plan flat
# (each withColumn adds a projection). Iterating df_big.columns preserves column order.
df_big_silver = df_big.select(*[
    F.col(name).cast(silver_column_types[name]).alias(name)
    if name in silver_column_types
    else F.col(name)
    for name in df_big.columns
])

In [None]:
# Print the schema of df_big_silver to check the column types after casting.
df_big_silver.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- payment_sequential: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- order_item_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_titl

In [None]:
# Capture the load time once as a string like '2024-01-31T14:05:09' (no fractional
# seconds). datetime.now() is naive, i.e. the driver's local clock.
current_dateTime = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

In [None]:
# Stamp both silver tables with the same load timestamp (DT_CARGA), parsed from
# the current_dateTime string into a Spark timestamp.
load_timestamp_col = lit(current_dateTime).cast(TimestampType())
df_big_silver = df_big_silver.withColumn('DT_CARGA', load_timestamp_col)
df_geolocation = df_geolocation.withColumn('DT_CARGA', load_timestamp_col)

In [None]:
# Persist the one-big-table to the silver layer as a Delta table.
# NOTE(review): no save mode is given, so Spark's default ("errorifexists") applies
# and this cell fails on re-run once the path holds data; choose "overwrite" or
# "append" if reprocessing is intended.
df_big_silver.write.save(path="/mnt/olistmarketplaceim/silver/olist_obt/", format="delta")

In [None]:
# Persist the geolocation table to the silver layer as a Delta table.
# NOTE(review): no save mode is given, so Spark's default ("errorifexists") applies
# and this cell fails on re-run once the path holds data; choose "overwrite" or
# "append" if reprocessing is intended.
df_geolocation.write.save(path="/mnt/olistmarketplaceim/silver/geolocation/", format="delta")