In [22]:
import logging
import os
import sys

from delta.tables import DeltaTable
from minio import Minio
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, last, desc, dense_rank, to_date, date_format, lit

from utils.bronze import Bronze
from utils.common import get_spark_session, get_minio_client

# Owner of the buckets/databases used throughout this notebook.
username = 'kayden'

# Spark session named "<username>_ETL"; the helper comes from utils.common.
spark = get_spark_session(
    f"{username}_ETL",
    username,
)

log_path = f"s3a://{username}/logging"

# NOTE(review): these log4j appender keys are written to the Hadoop
# configuration object, not loaded as a log4j properties file -- confirm that
# they actually influence where Spark logs are written.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("log4j.appender.file", "org.apache.log4j.FileAppender")
hadoop_conf.set("log4j.appender.file.File", log_path)
hadoop_conf.set("log4j.appender.file.layout", "org.apache.log4j.PatternLayout")
hadoop_conf.set("log4j.appender.file.layout.ConversionPattern", "%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n")

# mc = get_minio_client(username)
MINIO_ENDPOINT = "minio:9000/"
MINIO_ACCESS_KEY = username
# Prefer a secret supplied through the environment so it does not have to live
# in the notebook; the previous literal is kept only as a fallback so existing
# local setups keep working.
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "password")

mc = Minio(
    endpoint=MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False,
)

In [11]:
# Project the bronze Shopee sales table into the silver column layout:
# rename a few columns, cast total_order_price to FLOAT as sales_amount, and
# derive calendar parts (day, month, year, quarter, weekday) from order_date.
spark.sql(f"USE bronze_{username}")
table = f"bronze_{username}.shopee_sales"
query = f"""
    SELECT 
        order_number,
        order_date,
        ship_date,
        ordered_quantity as order_quantity,
        unit_price,
        unit_discount,
        CAST(total_order_price AS FLOAT) as sales_amount,
        payment_date,
        product_key,
        product_name,
        category_name as product_category,
        unit_price as price,
        weight,
        DAY(order_date) AS day,
        MONTH(order_date) AS month,
        YEAR(order_date) as year,
        QUARTER(order_date) as quarter,
        DATE_FORMAT(order_date, 'EEEE') AS day_of_week,
        DAYOFWEEK(order_date) AS day_of_week_number,
        shop_discount_code,
        shopee_discount_code,
        tracking_code,
        shipping_company,
        buyer_name as customer_name
    FROM {table}
"""

# Note: `price` is selected from the same source column as `unit_price`.
sql_transformed_df = spark.sql(query)

# Show the resulting column names and types.
sql_transformed_df.printSchema()

24/05/31 08:08:32 WARN DeltaLog: Change in the table id detected while updating snapshot. 
Previous snapshot = Snapshot(path=s3a://kayden/bronze/shopee_sales/_delta_log, version=13, metadata=Metadata(bae77174-2003-4b8c-90c2-fccbe889fdf2,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"order_number","type":"string","nullable":true,"metadata":{}},{"name":"order_date","type":"timestamp","nullable":true,"metadata":{}},{"name":"order_status","type":"string","nullable":true,"metadata":{}},{"name":"tracking_code","type":"string","nullable":true,"metadata":{}},{"name":"shipping_company","type":"string","nullable":true,"metadata":{}},{"name":"delivery_method","type":"string","nullable":true,"metadata":{}},{"name":"order_type","type":"string","nullable":true,"metadata":{}},{"name":"expected_delivery_date","type":"timestamp","nullable":true,"metadata":{}},{"name":"ship_date","type":"string","nullable":true,"metadata":{}},{"name":"product_key","type":"string","nullable":true,"me

root
 |-- order_number: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- order_quantity: integer (nullable = true)
 |-- unit_price: float (nullable = true)
 |-- unit_discount: float (nullable = true)
 |-- sales_amount: float (nullable = true)
 |-- payment_date: date (nullable = true)
 |-- product_key: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- price: float (nullable = true)
 |-- weight: double (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- day_of_week_number: integer (nullable = true)
 |-- shop_discount_code: string (nullable = true)
 |-- shopee_discount_code: string (nullable = true)
 |-- tracking_code: string (nullable = true)
 |-- shipping_company: string (nullable = true)
 |-- c

In [12]:
from IPython.core.display import HTML
# Inject CSS so <pre> output keeps its whitespace and long table rows are not
# wrapped in the notebook output area.
display(HTML("<style>pre { white-space: pre !important; }</style>"))
# sql_transformed_df.toPandas().head(5)
# Preview the first 10 rows of the projected bronze data.
sql_transformed_df.show(10)

24/05/31 08:09:17 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+----------+----------+--------------+----------+-------------+------------+------------+-----------+--------------------+----------------+---------+------+---+-----+----+-------+-----------+------------------+------------------+--------------------+-------------+----------------+-------------------+
|        order_number|order_date| ship_date|order_quantity|unit_price|unit_discount|sales_amount|payment_date|product_key|        product_name|product_category|    price|weight|day|month|year|quarter|day_of_week|day_of_week_number|shop_discount_code|shopee_discount_code|tracking_code|shipping_company|      customer_name|
+--------------------+----------+----------+--------------+----------+-------------+------------+------------+-----------+--------------------+----------------+---------+------+---+-----+----+-------+-----------+------------------+------------------+--------------------+-------------+----------------+-------------------+
|SO20210424/SHOPPE-23|2021-05-0

In [14]:
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import col, when, last, desc, dense_rank, to_date, date_format
# Rows are only usable when all three key columns are present.
key_columns_present = (
    col("order_number").isNotNull()
    & col("product_key").isNotNull()
    & col("order_date").isNotNull()
)

# Window used to back-fill a missing product_name from rows of the same product.
name_history = Window.partitionBy("product_key").orderBy("order_date")
# Window used to keep the most recent order_date per (order_number, product_key).
latest_first = Window.partitionBy("order_number", "product_key").orderBy(desc("order_date"))

filled_product_name = when(
    col("product_name").isNull(),
    last("product_name", ignorenulls=True).over(name_history),
).otherwise(col("product_name"))

cleaned_df = (
    sql_transformed_df
    .filter(key_columns_present)
    .withColumn("product_name", filled_product_name)
    .na.drop(subset=["order_number", "product_key", "order_date", "product_name"])
    .withColumn("rank", dense_rank().over(latest_first))
    .filter(col("rank") == 1)
    .drop("rank")
)
cleaned_df.show(10)

# Sanity check: collect any rows that still have a null in a core column.
null_core_columns = cleaned_df.filter(
    col("order_number").isNull()
    | col("product_key").isNull()
    | col("order_date").isNull()
    | col("product_name").isNull()
)

# Count the number of rows with null core columns
null_core_count = null_core_columns.count()

                                                                                

+--------------------+----------+----------+--------------+----------+-------------+------------+------------+-----------+--------------------+----------------+--------+------+---+-----+----+-------+-----------+------------------+------------------+--------------------+-------------+----------------+--------------------+
|        order_number|order_date| ship_date|order_quantity|unit_price|unit_discount|sales_amount|payment_date|product_key|        product_name|product_category|   price|weight|day|month|year|quarter|day_of_week|day_of_week_number|shop_discount_code|shopee_discount_code|tracking_code|shipping_company|       customer_name|
+--------------------+----------+----------+--------------+----------+-------------+------------+------------+-----------+--------------------+----------------+--------+------+---+-----+----+-------+-----------+------------------+------------------+--------------------+-------------+----------------+--------------------+
|SO20210101/SHOPPE-10|2021-01-0

                                                                                

In [15]:
# Report how many cleaned rows still have a null core column (expected: 0).
print("number of rows with null core columns:", null_core_count)

number of rows with null core columns: 0


In [16]:
# Confirm the cleaning step left the column names and types unchanged.
cleaned_df.printSchema()

root
 |-- order_number: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- order_quantity: integer (nullable = true)
 |-- unit_price: float (nullable = true)
 |-- unit_discount: float (nullable = true)
 |-- sales_amount: float (nullable = true)
 |-- payment_date: date (nullable = true)
 |-- product_key: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- price: float (nullable = true)
 |-- weight: double (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- day_of_week_number: integer (nullable = true)
 |-- shop_discount_code: string (nullable = true)
 |-- shopee_discount_code: string (nullable = true)
 |-- tracking_code: string (nullable = true)
 |-- shipping_company: string (nullable = true)
 |-- c

In [17]:
# Money/weight columns: a null or negative value is replaced with 0.
numeric_columns = ['unit_price', 'unit_discount', 'sales_amount', 'price', 'weight']
for col_name in numeric_columns:
    current_value = col(col_name)
    needs_reset = current_value.isNull() | (current_value < 0)
    cleaned_df = cleaned_df.withColumn(col_name, when(needs_reset, 0).otherwise(current_value))

# A missing or zero quantity is treated as a single item.
quantity = col('order_quantity')
cleaned_df = cleaned_df.withColumn(
    'order_quantity',
    when(quantity.isNull() | (quantity == 0), 1).otherwise(quantity),
)

# Replace null or 0 values in sales_amount with (unit_price * ordered_quantity) - unit_discount
sales = col('sales_amount')
derived_sales = (col('unit_price') * col('order_quantity')) - col('unit_discount')
cleaned_df = cleaned_df.withColumn(
    'sales_amount',
    when(sales.isNull() | (sales == 0), derived_sales).otherwise(sales),
)

# Format date columns to 'yyyy-MM-dd'
date_columns = ['order_date', 'ship_date', 'payment_date']
for col_name in date_columns:
    as_date = to_date(col(col_name), 'yyyy-MM-dd').cast('date')
    cleaned_df = cleaned_df.withColumn(col_name, as_date)

# Format order_date to 'yyyy-MM-dd'
order_date_text = date_format(col('order_date'), 'yyyy-MM-dd')
cleaned_df = cleaned_df.withColumn('order_date', order_date_text)

cleaned_df.show()



+--------------------+----------+----------+--------------+----------+-------------+------------+------------+-----------+--------------------+----------------+--------+------+---+-----+----+-------+-----------+------------------+------------------+--------------------+-------------+----------------+--------------------+
|        order_number|order_date| ship_date|order_quantity|unit_price|unit_discount|sales_amount|payment_date|product_key|        product_name|product_category|   price|weight|day|month|year|quarter|day_of_week|day_of_week_number|shop_discount_code|shopee_discount_code|tracking_code|shipping_company|       customer_name|
+--------------------+----------+----------+--------------+----------+-------------+------------+------------+-----------+--------------------+----------------+--------+------+---+-----+----+-------+-----------+------------------+------------------+--------------------+-------------+----------------+--------------------+
|SO20210101/SHOPPE-10|2021-01-0

                                                                                

In [19]:
def __ensure_data_quality(df: DataFrame, quality_issues=None) -> "tuple[bool, list]":
    """Run the null / negative / future-date checks against ``df``.

    Args:
        df: DataFrame to validate. It must contain the columns named in the
            check table below.
        quality_issues: Optional list that human-readable issue messages are
            appended to (mutated in place). A new list is created when omitted.

    Returns:
        A ``(passed, quality_issues)`` tuple. ``passed`` is True only when no
        check found an offending row. Because a non-empty tuple is always
        truthy, callers must unpack the result rather than test it directly
        in an ``if``.
    """
    if quality_issues is None:
        quality_issues = []

    # Each entry: (columns to test, predicate builder, message template).
    checks = [
        # Check for null values in critical columns
        (
            ["order_number", "order_date", "product_key", "product_name"],
            lambda name: col(name).isNull(),
            "Column '{column}' has {count} null values.",
        ),
        # Check for negative values in numerical columns
        (
            ['unit_price', 'unit_discount', 'sales_amount', 'price', 'weight'],
            lambda name: col(name) < 0,
            "Column '{column}' has {count} negative values.",
        ),
        # Check for invalid date ranges
        (
            ['order_date', 'ship_date', 'payment_date'],
            lambda name: col(name) > lit("2100-01-01"),
            "Column '{column}' has {count} invalid future dates.",
        ),
    ]

    quality_checks_passed = True
    for columns, build_predicate, template in checks:
        for column in columns:
            # One Spark count per column, as before.
            offending = df.filter(build_predicate(column)).count()
            if offending > 0:
                quality_issues.append(template.format(column=column, count=offending))
                quality_checks_passed = False

    # Add additional quality checks as needed

    return quality_checks_passed, quality_issues

def __report_quality_issues(quality_issues):
    """Print a header line followed by one line per recorded quality issue.

    Args:
        quality_issues: Iterable of issue messages to print to the console.
    """
    # Report quality issues (e.g., log to a file, print to console, etc.)
    report_lines = ["Data quality issues found:"]
    report_lines.extend(str(message) for message in quality_issues)
    print("\n".join(report_lines))

In [23]:
# __ensure_data_quality returns a (passed, issues) tuple. Testing the tuple
# itself is always truthy, so unpack it and branch on the boolean instead.
quality_ok, quality_issues = __ensure_data_quality(cleaned_df, [])
if quality_ok:
    print("passed")
else:
    # The reporter needs the collected messages as its argument.
    __report_quality_issues(quality_issues)



passed


                                                                                

In [30]:
def __write_to_silver(df: DataFrame, username):
    """Upsert ``df`` into the user's Shopee silver Delta table and register it.

    If a Delta table already exists at the silver path, rows are merged on
    (order_number, product_key); otherwise the path is written from scratch.
    The table is then registered in the ``silver_<username>`` database.
    Any exception is caught and printed rather than propagated.

    Args:
        df: DataFrame to load.
        username: Used to build the bucket path and database name.
    """
    source = 'shopee'
    silver_table = f"silver_{username}.{source}_sales_silver"
    silver_table_path = f's3a://{username}/silver/{source}_sales_silver'
    merge_condition = "tgt.order_number = src.order_number AND tgt.product_key = src.product_key"

    try:
        if not DeltaTable.isDeltaTable(spark, silver_table_path):
            # First load: nothing to merge into yet.
            df.write.format("delta").mode("overwrite").save(silver_table_path)
        else:
            target = DeltaTable.forPath(spark, silver_table_path).alias("tgt")
            upsert = target.merge(df.alias("src"), merge_condition)
            upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

        spark.sql(f"CREATE TABLE IF NOT EXISTS {silver_table} USING DELTA LOCATION '{silver_table_path}'")
        print("Data has been successfully loaded to Silver.")
    except Exception as e:
        # Handle the error appropriately
        print("Error occurred while load data to Silver:", str(e))

In [31]:
# Make sure the per-user silver database exists before registering the table in it.
spark.sql(f"CREATE DATABASE IF NOT EXISTS silver_{username}")
# Merge (or initially write) the cleaned data into the silver Delta table.
__write_to_silver(cleaned_df, username)

24/05/31 08:23:24 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`silver_kayden`.`shopee_sales_silver` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


Data has been successfully loaded to Silver.


In [35]:
# Preview the silver table. Database names are derived from `username`, as in
# the rest of the notebook, instead of being hard-coded to one user.
spark.sql(f"USE silver_{username}")
query_df = spark.sql(f"SELECT * FROM silver_{username}.shopee_sales_silver LIMIT 10")
query_df.show()

                                                                                

+--------------------+----------+----------+--------------+----------+-------------+------------+------------+-----------+--------------------+----------------+--------+------+---+-----+----+-------+-----------+------------------+------------------+--------------------+-------------+----------------+--------------------+
|        order_number|order_date| ship_date|order_quantity|unit_price|unit_discount|sales_amount|payment_date|product_key|        product_name|product_category|   price|weight|day|month|year|quarter|day_of_week|day_of_week_number|shop_discount_code|shopee_discount_code|tracking_code|shipping_company|       customer_name|
+--------------------+----------+----------+--------------+----------+-------------+------------+------------+-----------+--------------------+----------------+--------+------+---+-----+----+-------+-----------+------------------+------------------+--------------------+-------------+----------------+--------------------+
|SO20210101/SHOPPE...|2021-01-1

In [48]:
class Silver:
    """Bronze-to-Silver ETL for one sales source.

    ``run()`` creates the ``silver_<username>`` database if needed, transforms
    the bronze table for ``source``, runs data quality checks, and, when they
    pass, merges the result into the silver Delta table. When a check fails the
    collected issues are logged and nothing is written.

    Args:
        spark: Spark session used for all SQL and Delta operations.
        username: Used to build database names and the ``s3a://`` bucket path.
        source: Source system name; only ``'shopee'`` is supported.
    """

    # Set up logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
    logger = logging.getLogger(__name__)

    def __init__(self, spark, username, source):
        self.spark = spark
        self.username = username
        self.source = source
        # Messages collected by the most recent quality check.
        self.quality_issues = []

    def run(self):
        """Execute transform -> quality gate -> write for the configured source."""
        self.logger.info("Starting ETL process.")
        # Start from a clean slate so repeated runs do not re-report old issues.
        self.quality_issues = []
        self.spark.sql(f"CREATE DATABASE IF NOT EXISTS silver_{self.username}")
        transformed_df = self.transform()
        if self.__ensure_data_quality(transformed_df):
            self.__write_to_silver(transformed_df)
        else:
            self.__report_quality_issues()

    # TODO: validate the bronze table schema before transforming.

    def transform(self):
        """Return the transformed DataFrame for ``self.source``.

        Raises:
            ValueError: if ``self.source`` is not a supported source. Previously
                this returned None, which only failed later in the quality checks.
        """
        if self.source == 'shopee':
            self.logger.info("Transforming Shopee data.")
            return self.__shopee_transform()
        raise ValueError(f"Unsupported source: {self.source!r}")

    def __shopee_transform(self):
        """Project, clean and normalise the bronze Shopee sales table."""
        self.logger.info("Starting Shopee data transformation.")
        self.spark.sql(f"USE bronze_{self.username}")
        table = f"bronze_{self.username}.shopee_sales"
        query = f"""
            SELECT 
                order_number,
                order_date,
                ship_date,
                ordered_quantity as order_quantity,
                unit_price,
                unit_discount,
                CAST(total_order_price AS FLOAT) as sales_amount,
                payment_date,
                product_key,
                product_name,
                category_name as product_category,
                unit_price as price,
                weight,
                DAY(order_date) AS day,
                MONTH(order_date) AS month,
                YEAR(order_date) as year,
                QUARTER(order_date) as quarter,
                DATE_FORMAT(order_date, 'EEEE') AS day_of_week,
                DAYOFWEEK(order_date) AS day_of_week_number,
                shop_discount_code,
                shopee_discount_code,
                tracking_code,
                shipping_company,
                buyer_name as customer_name
            FROM {table}
        """

        sql_transformed_df = self.spark.sql(query)
        self.logger.info("Shopee data transformed using SQL query.")

        # Clean data: require the key columns, back-fill product_name from other
        # rows of the same product, then keep the latest order_date per
        # (order_number, product_key).
        # NOTE(review): dense_rank keeps every row tied on the latest order_date,
        # so same-date duplicates survive this step -- confirm whether
        # row_number (strict de-duplication) is wanted before the merge.
        cleaned_df = sql_transformed_df \
            .filter(col("order_number").isNotNull() & col("product_key").isNotNull() & col("order_date").isNotNull()) \
            .withColumn("product_name", when(col("product_name").isNull(), 
                                            last("product_name", ignorenulls=True)
                                            .over(Window.partitionBy("product_key").orderBy("order_date")))
                        .otherwise(col("product_name"))) \
            .na.drop(subset=["order_number", "product_key", "order_date", "product_name"]) \
            .withColumn("rank", dense_rank().over(Window.partitionBy("order_number", "product_key").orderBy(desc("order_date")))) \
            .filter(col("rank") == 1) \
            .drop("rank")
        
        # Transform data: nulls and negatives in numeric columns become 0.
        # The accumulator must be threaded through the loop; rebuilding from
        # cleaned_df on each iteration would keep only the last column's fix.
        transformed_df = cleaned_df
        numeric_columns = ['unit_price', 'unit_discount', 'sales_amount', 'price', 'weight']
        for col_name in numeric_columns:
            transformed_df = transformed_df.withColumn(col_name, when(col(col_name).isNull() | (col(col_name) < 0), 0).otherwise(col(col_name)))

        # A missing or zero quantity is treated as a single item.
        transformed_df = transformed_df.withColumn('order_quantity', when(col('order_quantity').isNull() | (col('order_quantity') == 0), 1).otherwise(col('order_quantity')))
        
        # Replace null or 0 values in sales_amount with (unit_price * ordered_quantity) - unit_discount
        transformed_df = transformed_df.withColumn('sales_amount', when((col('sales_amount').isNull()) | (col('sales_amount') == 0), (col('unit_price') * col('order_quantity')) - col('unit_discount')).otherwise(col('sales_amount')))
            
        # Format date columns to 'yyyy-MM-dd'
        date_columns = ['order_date', 'ship_date', 'payment_date']
        for col_name in date_columns:
            transformed_df = transformed_df.withColumn(col_name, to_date(col(col_name), 'yyyy-MM-dd').cast('date'))
        
        # Format order_date to 'yyyy-MM-dd'
        # NOTE(review): date_format yields a string column, so order_date is no
        # longer a date type after this step -- confirm this is intended.
        transformed_df = transformed_df.withColumn('order_date', date_format(col('order_date'), 'yyyy-MM-dd'))

        self.logger.info("Shopee data transformation completed.")
        return transformed_df
    
    def __ensure_data_quality(self, df: "DataFrame") -> bool:
        """Check ``df`` for nulls, negatives and far-future dates.

        Every failed check appends a message to ``self.quality_issues`` and
        logs it as a warning.

        Returns:
            True when all checks pass, False otherwise.
        """
        self.logger.info("Starting data quality checks.")
        quality_checks_passed = True
        
        # Check for null values in critical columns
        null_checks = ["order_number", "order_date", "product_key", "product_name"]
        for column in null_checks:
            null_count = df.filter(col(column).isNull()).count()
            if null_count > 0:
                issue = f"Column '{column}' has {null_count} null values."
                self.quality_issues.append(issue)
                self.logger.warning(issue)
                quality_checks_passed = False

        # Check for negative values in numerical columns
        negative_checks = ['unit_price', 'unit_discount', 'sales_amount', 'price', 'weight']
        for column in negative_checks:
            negative_count = df.filter(col(column) < 0).count()
            if negative_count > 0:
                issue = f"Column '{column}' has {negative_count} negative values."
                self.quality_issues.append(issue)
                self.logger.warning(issue)
                quality_checks_passed = False

        # Check for invalid date ranges
        date_checks = ['order_date', 'ship_date', 'payment_date']
        for column in date_checks:
            invalid_dates_count = df.filter(col(column) > lit("2100-01-01")).count()
            if invalid_dates_count > 0:
                issue = f"Column '{column}' has {invalid_dates_count} invalid future dates."
                self.quality_issues.append(issue)
                self.logger.warning(issue)
                quality_checks_passed = False

        # Add additional quality checks as needed

        self.logger.info("Data quality checks completed.")
        return quality_checks_passed

    def __write_to_silver(self, df: "DataFrame"):
        """Merge ``df`` into the silver Delta table, creating it on first load.

        Rows are matched on (order_number, product_key). The table is then
        registered in ``silver_<username>``. Failures are logged with a
        traceback and not re-raised.
        """
        silver_table_path = f's3a://{self.username}/silver/{self.source}_sales_silver'
        silver_table = f"silver_{self.username}.{self.source}_sales_silver"
        try: 
            self.logger.info(f"Writing data to Silver at {silver_table_path}.")
            if DeltaTable.isDeltaTable(self.spark, silver_table_path):
                delta_table = DeltaTable.forPath(self.spark, silver_table_path)
                delta_table.alias("tgt").merge(
                    df.alias("src"),
                    "tgt.order_number = src.order_number AND tgt.product_key = src.product_key"
                ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
            else:
                df.write.format("delta").mode("overwrite").save(silver_table_path)

            self.spark.sql(f"CREATE TABLE IF NOT EXISTS {silver_table} USING DELTA LOCATION '{silver_table_path}'")
            self.logger.info("Data has been successfully loaded to Silver.")
        except Exception:
            self.logger.error("Error occurred while loading data to Silver:", exc_info=True)

    def __report_quality_issues(self):
        """Log every issue collected by the last quality check."""
        # Report quality issues (e.g., log to a file, print to console, etc.)
        self.logger.info("Data quality issues found:")
        for issue in self.quality_issues:
            self.logger.error(issue)

In [49]:
# Run the full bronze -> silver pipeline for the Shopee source.
silver = Silver(spark, username, 'shopee')
silver.run()

INFO:__main__:Starting ETL process.
INFO:__main__:Transforming Shopee data.
INFO:__main__:Starting Shopee data transformation.
INFO:__main__:Shopee data transformed using SQL query.
INFO:__main__:Shopee data transformation completed.
INFO:__main__:Starting data quality checks.
INFO:__main__:Data quality checks completed.                                    
INFO:__main__:Writing data to Silver at s3a://kayden/silver/shopee_sales_silver.
INFO:__main__:Data has been successfully loaded to Silver.                      


In [53]:
# Clean-up (destructive): removes the per-user bronze and silver databases.
# Database names follow `username` like the rest of the notebook.
# Spark's DROP DATABASE defaults to RESTRICT, so it fails while a database
# still contains tables; drop the tables first (see commented lines) if needed.
spark.sql(f"DROP DATABASE IF EXISTS bronze_{username}")
spark.sql(f"DROP DATABASE IF EXISTS silver_{username}")
# spark.sql(f"DROP TABLE IF EXISTS bronze_{username}.shopee_sales")
# spark.sql(f"DROP TABLE IF EXISTS silver_{username}.shopee_sales_silver")

DataFrame[]