### Plan
- [X] Set up the environment
- [X] Load local CSV files
- [ ] Upload data to an S3 bucket
- [ ] Read data from the S3 bucket
- [ ] Write data to S3 in Parquet format with partitioning
- [ ] Perform upserts to S3 in Parquet format
- [X] Write data to S3 in Delta Table format
- [ ] Perform upserts to S3 in Delta Table format
- [ ] Load local JSON files
- [ ] Write JSON data to S3 in Delta Table format
- [X] Practice PySpark transformations and adhere to style guides
- [ ] Practice Repartition and Coalesce

### Set up the environment

In [1]:
import logging

# Configure the root logger to emit INFO-and-above records (basicConfig is a
# no-op if the root logger already has handlers).
logging.basicConfig(level=logging.INFO)

# Named logger used by the cells below for status and error messages.
logger = logging.getLogger(name=__name__)

In [10]:
from pyspark.sql import SparkSession

# SparkSession definition: local mode on all cores, S3A filesystem support and
# the Delta Lake SQL extension + catalog.
APP_NAME = 'Brazilian-Ecommerce'

# Builder settings, applied in insertion order.
_spark_conf = {
    # Extra JARs pulled by Maven coordinates: hadoop-aws (S3A) and delta-spark.
    'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.2,io.delta:delta-spark_2.12:3.3.0',
    'spark.hadoop.fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    # This will check .aws/credentials
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain',
    'spark.sql.extensions': 'io.delta.sql.DeltaSparkSessionExtension',
    'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.delta.catalog.DeltaCatalog',
}

session_builder = SparkSession.builder.appName(APP_NAME).master("local[*]")
for conf_key, conf_value in _spark_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
# NOTE(review): getOrCreate returns an already-running session if one exists in this
# kernel; static settings such as spark.jars.packages are then not re-applied.
spark = session_builder.getOrCreate()

sc = spark.sparkContext

In [109]:
# Terminate Spark Session.
# `spark.stop` without parentheses only evaluates the bound method and stops nothing;
# it has to be called. Stopping here, mid-notebook, would make every later cell fail
# on Restart & Run All, so it is opt-in: flip STOP_SPARK to True when you are done.
STOP_SPARK = False
if STOP_SPARK:
    spark.stop()

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x107913010>>

### Load local CSV files


In [11]:
input_folder = '../../storage/raw'


def read_raw_csv(file_name, folder=input_folder):
    """Read one raw CSV file with a header row and an inferred schema.

    Args:
        file_name: Name of the CSV file inside ``folder``.
        folder: Directory holding the raw CSV files (defaults to ``input_folder``).

    Returns:
        The Spark DataFrame produced by ``spark.read.csv``.
    """
    # inferSchema makes Spark scan the file once up front, so each file is read exactly once below.
    return spark.read.csv(f'{folder}/{file_name}', inferSchema=True, header=True)


df_customers = read_raw_csv('olist_customers_dataset.csv')
df_geolocation = read_raw_csv('olist_geolocation_dataset.csv')
df_order_items = read_raw_csv('olist_order_items_dataset.csv')
df_order_payments = read_raw_csv('olist_order_payments_dataset.csv')
df_order_reviews = read_raw_csv('olist_order_reviews_dataset.csv')
df_orders = read_raw_csv('olist_orders_dataset.csv')
df_products = read_raw_csv('olist_products_dataset.csv')
df_sellers = read_raw_csv('olist_sellers_dataset.csv')
df_product_category = read_raw_csv('product_category_name_translation.csv')

                                                                                

### Write data to S3 in Delta Table format

In [5]:
bucket = "gb-delta-tables-test"  # S3 bucket under which the Delta tables are written

In [6]:
# Root of all Delta tables in the bucket; each table lives in its own sub-folder.
base_path = f"s3a://{bucket}/delta"

try:
    (df_customers.write
        .format("delta")
        .mode("overwrite")
        .save(f"{base_path}/customers")
    )
    print("Successfully wrote to S3!")
except Exception:
    # Log with the full traceback and re-raise: swallowing the error would let the
    # following cells run as if the customers table had been written.
    logger.exception("Error writing to S3")
    raise

                                                                                

Successfully wrote to S3!


In [8]:
# Delta table for order items. `bucket` and `base_path` come from the cells above
# instead of being redefined here, so the S3 location is configured in one place.
table_path = f"{base_path}/order_items"

# Re-read the raw CSV so this cell always writes the unmodified source data, even if
# `df_order_items` has been reassigned elsewhere in the kernel.
df_order_items = spark.read.csv(f'{input_folder}/olist_order_items_dataset.csv', inferSchema=True, header=True)

try:
    (df_order_items.write
        .format("delta")
        .mode("overwrite")
        .save(table_path)
    )
    print("Successfully wrote to S3!")
except Exception:
    # Log with the full traceback and re-raise: the merge cells below assume this
    # table exists, so a silent failure would only surface later and confusingly.
    logger.exception("Error writing to S3")
    raise

                                                                                

Successfully wrote to S3!


In [9]:
# Second order-items extract, to be upserted into the Delta table at `table_path`
# (defined in the write cell above). `bucket`, `base_path` and `table_path` are not
# redefined here so the S3 location has a single source of truth.
df_order_items_2 = spark.read.csv(f'{input_folder}/olist_order_items_dataset_2.csv', inferSchema=True, header=True)

In [None]:
# Row count of the Delta table before the upsert, queried through a temp view.
order_items_delta = spark.read.format('delta').load(table_path)
order_items_delta.createOrReplaceTempView('order_items')
spark.sql('SELECT COUNT(*) AS ct FROM order_items').show()

+--------+
|count(1)|
+--------+
|  112640|
+--------+



In [27]:
from delta.tables import DeltaTable

# Upsert df_order_items_2 into the Delta table keyed on (order_id, order_item_id):
# matched target rows get every column updated, unmatched source rows are inserted.
merge_condition = (
    'target.order_id = source.order_id'
    ' and target.order_item_id = source.order_item_id'
)

deltaTable = DeltaTable.forPath(spark, table_path)

merge_builder = deltaTable.alias('target').merge(df_order_items_2.alias('source'), merge_condition)
merge_builder = merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll()
merge_builder.execute()


                                                                                

In [28]:
# Row count of the Delta table after the upsert, via a fresh temp view.
merged_order_items = spark.read.format('delta').load(table_path)
merged_order_items.createOrReplaceTempView('order_items_2')
spark.sql('SELECT COUNT(*) AS ct FROM order_items_2').show()

+------+
|    ct|
+------+
|112650|
+------+



### Practice PySpark transformations and adhere to style guides

In [13]:
import pyspark.sql.functions as F

In [46]:
# NOTE(review): the saved output includes a `shipping_limit_day` column that no cell
# visible in this notebook creates. It is stale hidden state; re-run to refresh it.
df_order_items.show(n=20)

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|shipping_limit_day|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+------------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|        2017-09-19|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|        2017-05-03|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|        2018-01-18|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|        2018-08-15|
|00042b26cf59d7ce6...|     

In [87]:
# Keep items of products whose id starts with the given prefix, and tag each with
# the month (timestamp truncated to the first of the month) of its shipping limit.
product_prefix_filter = F.col('product_id').like('8d4f2bb7e93e6710a%')
shipping_month = F.date_trunc('month', 'shipping_limit_date')

df_order_items_filtered = (
    df_order_items
    .where(product_prefix_filter)
    .withColumn('shipping_limit_month', shipping_month)
)

In [88]:
# Total item price per (product, shipping month).
price_by_month_product = (
    df_order_items_filtered
    .groupBy('product_id', 'shipping_limit_month')
    .agg(F.sum('price').alias('total_price'))
)

In [89]:
# Price ratio of each item vs. its product's total price for the shipping month,
# computed by joining the per-month aggregate back onto the item rows.
#
# price_by_month_product is derived from df_order_items_filtered, so their
# product_id / shipping_limit_month columns share lineage. Referencing them through
# alias-qualified names ('pbmp.…' / 'doi.…') keeps the self-join condition
# unambiguous. The aliases are bound to new names rather than rebinding the inputs,
# so re-running this cell is idempotent.
pbmp = price_by_month_product.alias('pbmp')
doi = df_order_items_filtered.alias('doi')

join_conditions = [
    F.col('pbmp.product_id') == F.col('doi.product_id'),
    F.col('pbmp.shipping_limit_month') == F.col('doi.shipping_limit_month'),
]

(pbmp
    .join(doi, on=join_conditions, how='inner')
    .select(
        F.col('doi.order_id').alias('order_id'),
        F.col('doi.order_item_id').alias('order_item_id'),
        F.col('doi.product_id').alias('product_id'),
        F.col('doi.seller_id').alias('seller_id'),
        F.col('doi.shipping_limit_date').alias('shipping_limit_date'),
        F.col('doi.freight_value').alias('freight_value'),
        F.col('doi.price').alias('price'),
        F.col('pbmp.total_price').alias('total_price')
    )
    .withColumn('price_ratio', F.round(F.col('price') / F.col('total_price'), 4))
    .drop('total_price')
    .show()
)

+--------------------+-------------+--------------------+--------------------+-------------------+-------------+-----+-----------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|freight_value|price|price_ratio|
+--------------------+-------------+--------------------+--------------------+-------------------+-------------+-----+-----------+
|00054e8431b9d7675...|            1|8d4f2bb7e93e6710a...|7040e82f899a04d1b...|2017-12-14 12:10:31|        11.85| 19.9|     0.5559|
|2df5ebb637dfe6f39...|            1|8d4f2bb7e93e6710a...|7040e82f899a04d1b...|2018-02-07 02:10:33|        16.79| 19.9|        1.0|
|905bd749100612f28...|            1|8d4f2bb7e93e6710a...|7040e82f899a04d1b...|2017-12-04 03:33:17|         15.1| 15.9|     0.4441|
+--------------------+-------------+--------------------+--------------------+-------------------+-------------+-----+-----------+



In [20]:
# Sanity check: list any (order_id, order_item_id) pair that occurs more than once,
# most frequent first. This is the same key the Delta merge above matches on.
(df_order_items
    .groupBy('order_id', 'order_item_id')
    .agg(F.count('*').alias('ct'))
    .where(F.col('ct') > 1)
    .orderBy(F.col('ct').desc())
    .show()
)

+--------+-------------+---+
|order_id|order_item_id| ct|
+--------+-------------+---+
+--------+-------------+---+



In [102]:
from pyspark.sql import Window as W

# Same price ratio as the join-based cell, but with a window: each item's price is
# divided by the summed price of its product within the same shipping month.
month_of_shipping = F.date_trunc('month', 'shipping_limit_date')
w_month_price = W.partitionBy('product_id', month_of_shipping)

items_for_product = df_order_items.filter(F.col('product_id').like('8d4f2bb7e93e6710a%'))

(items_for_product
    .withColumn('total_price', F.sum('price').over(w_month_price))
    .withColumn('price_ratio', F.round(F.col('price') / F.col('total_price'), 4))
    .drop('total_price')
    .show()
)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+-----------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|price_ratio|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+-----------+
|00054e8431b9d7675...|            1|8d4f2bb7e93e6710a...|7040e82f899a04d1b...|2017-12-14 12:10:31| 19.9|        11.85|     0.5559|
|905bd749100612f28...|            1|8d4f2bb7e93e6710a...|7040e82f899a04d1b...|2017-12-04 03:33:17| 15.9|         15.1|     0.4441|
|2df5ebb637dfe6f39...|            1|8d4f2bb7e93e6710a...|7040e82f899a04d1b...|2018-02-07 02:10:33| 19.9|        16.79|        1.0|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+-----------+

