In [None]:
wxd_hms_endpoint='thrift://<YOUR WXD ENDPOINT>'
wxd_hms_username='ibmlhapikey'
wxd_hms_password='****'
source_bucket_endpoint='s3.us-south.cloud-object-storage.appdomain.cloud'
source_bucket_access_key='****'
source_bucket_secret_key='****'
wxd_bucket_endpoint='s3.us-south.cloud-object-storage.appdomain.cloud'
wxd_bucket_access_key='****'
wxd_bucket_secret_key='****'
db_host_name = 'jdbc:db2://*HOSTNAME*:*PORTNUMBER*/*DBNAME*'
db_user_name='******'
db_password='****'

In [2]:
conf=spark.sparkContext.getConf()
spark.stop()

from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession

conf.setAll([
    ("spark.sql.catalogImplementation", "hive"),
    ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
    ("spark.sql.iceberg.vectorization.enabled", "false"),
    ("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.lakehouse.type", "hive"),
    ("spark.sql.catalog.lakehouse.uri", wxd_hms_endpoint),
    ("spark.hive.metastore.client.auth.mode", "PLAIN"),
    ("spark.hive.metastore.client.plain.username", wxd_hms_username),
    ("spark.hive.metastore.client.plain.password", wxd_hms_password),
    ("spark.hive.metastore.use.SSL", "true"),
    ("spark.hive.metastore.truststore.type", "JKS"),
    ("spark.hive.metastore.truststore.path", "file:///opt/ibm/jdk/lib/security/cacerts"),
    ("spark.hive.metastore.truststore.password", "changeit"),
    ("spark.hadoop.fs.s3a.bucket.sparkdb2-target.endpoint", source_bucket_endpoint),
    ("spark.hadoop.fs.s3a.bucket.sparkdb2-target.key", source_bucket_access_key),
    ("spark.hadoop.fs.s3a.bucket.sparkdb2-target.secret.key", source_bucket_secret_key),
    ("spark.hadoop.fs.s3a.bucket.sparkdb2-target.endpoint", wxd_bucket_endpoint),
    ("spark.hadoop.fs.s3a.bucket.sparkdb2-target.access.key", wxd_bucket_access_key),
    ("spark.hadoop.fs.s3a.bucket.sparkdb2-target.secret.key", wxd_bucket_secret_key),
    # Add Db2 configurations here
    ("spark.sql.catalog.db2catalog.type", "jdbc"),  # JDBC catalog type for Db2
    ("spark.sql.catalog.db2catalog.jdbc.url", db_host_name),  # JDBC URL for Db2
    ("spark.sql.catalog.db2catalog.jdbc.user", db_user_name),  # Username for Db2
    ("spark.sql.catalog.db2catalog.jdbc.password", db_password),  # Password for Db2
    ("spark.sql.catalog.db2catalog.jdbc.sslConnection", "true"),  # Enable SSL for JDBC connection
    ("spark.sql.catalog.db2catalog.jdbc.sslTrustStoreLocation", "s3a://*****"),  # Path to the truststore file
    ("spark.sql.catalog.db2catalog.jdbc.sslTrustStorePassword", "*****"),  # Password for the truststore
    # Additional Db2 JDBC properties can be added here if needed
])


spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
def list_databases(spark):
    # list the database under lakehouse catalog
    spark.sql("show databases from lakehouse").show() 
list_databases(spark)

+-------------------+
|          namespace|
+-------------------+
|`abyte-hive-schema`|
|   bronze_deltalake|
|          bronze_ec|
|   bronze_ecommerce|
|        bronze_hudi|
|  bronze_sparkdelta|
|bronze_sparkwithdb2|
|        bronze_test|
|               data|
|            default|
|             demodb|
|            demodb1|
|           demodb_1|
|     gold_deltalake|
|            gold_ec|
|     gold_ecommerce|
|          gold_hudi|
|    gold_sparkdelta|
|  gold_sparkwithdb2|
|          gold_test|
+-------------------+
only showing top 20 rows



In [4]:
def create_databases(spark):
    # Base S3 location
    base_s3_path = "s3a://sparkdb2-target/"
    
    # Databases to be created with corresponding paths
    databases = {
        "bronze_ecommerce": f"{base_s3_path}bronze_ecom/",
        "silver_ecommerce": f"{base_s3_path}silver_ecom/",
        "gold_ecommerce": f"{base_s3_path}gold_ecom/"
    }

    # Loop through each database and create it if not exists
    for db_name, db_location in databases.items():
        try:
            spark.sql(f"CREATE DATABASE IF NOT EXISTS lakehouse.{db_name} LOCATION '{db_location}'")
            print(f"Database {db_name} created at {db_location}")
        except Exception as e:
            print(f"Error creating database {db_name} at {db_location}: {e}")

# Example usage:
create_databases(spark)

Database bronze_ecommerce created at s3a://sparkdb2-target/bronze_ecom/
Database silver_ecommerce created at s3a://sparkdb2-target/silver_ecom/
Database gold_ecommerce created at s3a://sparkdb2-target/gold_ecom/


In [None]:
def migrate_db2_to_bronze(spark):
    schema_name = "ecommerce"
    # Assuming you have a list of table names in the BANKING schema
    table_names = ["HIST_SALES"]  # Add more table names as needed

    # Specify the base S3 path for the Iceberg tables
    base_s3_path = "s3a://sparkdb2-target/bronze_ecom/"
    
    for table_name in table_names:
        full_table_name = f"{schema_name}.{table_name}"

        # Define the Iceberg table name with the bronze prefix
        iceberg_table_name = f"lakehouse.bronze_ecommerce.bronze_{table_name.lower()}"  # Adding "bronze_" prefix
        s3_table_path = f"{base_s3_path}bronze_{table_name.lower()}"  # S3 path must match the Iceberg table name

        # Read data from DB2 Cloud into DataFrame
        df = spark.read.format("jdbc") \
            .option("url", db_host_name) \
            .option("user", db_user_name) \
            .option("password", db_password) \
            .option("dbtable", full_table_name) \
            .option("sslConnection", "true") \
            .load()

        # Log the paths for debugging
        print(f"Creating Iceberg table: {iceberg_table_name} at S3 path: {s3_table_path}")

        # Create the Iceberg table in the Bronze layer with the specified S3 path
        try:
            spark.sql(f"""
                CREATE TABLE IF NOT EXISTS {iceberg_table_name} 
                USING iceberg 
                LOCATION '{s3_table_path}' 
                AS SELECT * FROM VALUES (1) WHERE FALSE
            """)
            print(f"Iceberg table {iceberg_table_name} created successfully.")
        except Exception as e:
            print(f"Error creating Iceberg table: {e}")

        # Write data to the Iceberg table
        try:
            df.write.format("iceberg").mode("overwrite").saveAsTable(iceberg_table_name)
            print(f"Data written to Iceberg table {iceberg_table_name} successfully.")
        except Exception as e:
            print(f"Error writing data to Iceberg table: {e}")

        # Describe the Iceberg table
        spark.sql(f'DESCRIBE TABLE {iceberg_table_name}').show(10)

        # Query the Iceberg table
        spark.sql(f'SELECT * FROM {iceberg_table_name}').show()

        print(f"Migration of table {iceberg_table_name} from DB2 to Watson Data Iceberg completed successfully.")

    print("All tables migration completed successfully.")

# Example usage
migrate_db2_to_bronze(spark)


Creating Iceberg table: lakehouse.bronze_ecommerce.bronze_hist_sales at S3 path: s3a://sparkdb2-target/bronze_ecom/bronze_hist_sales
Iceberg table lakehouse.bronze_ecommerce.bronze_hist_sales created successfully.


In [None]:
def bronze_ingest_from_csv_temp_table(spark):
    # Load CSV data into a DataFrame
    csvDF = spark.read.option("header", True).csv("s3a://sparkdb2-target/promotions_sale.csv")
    csvDF.createOrReplaceTempView("promotions")

    # Specify the S3 path for the Iceberg table
    s3_table_path = "s3a://sparkdb2-target/bronze_ecom/bronze_promotions_sale"

    # Load temporary table into an Iceberg table with the specified S3 path
    print(f"Creating Iceberg table: lakehouse.bronze_ecommerce.bronze_promotions_sale")
    print(f"Table will be saved at S3 path: {s3_table_path}")
    
    spark.sql(f"""
        CREATE OR REPLACE TABLE lakehouse.bronze_ecommerce.bronze_promotions_sale 
        USING iceberg 
        LOCATION '{s3_table_path}' 
        AS SELECT * FROM promotions
    """)

    print("Iceberg table created successfully.")

    # Describe the table created
    print("Describing the created table:")
    spark.sql('DESCRIBE TABLE lakehouse.bronze_ecommerce.bronze_promotions_sale').show(10)
    
    # Query the table
    print("Querying the created table:")
    spark.sql('SELECT * FROM lakehouse.bronze_ecommerce.bronze_promotions_sale').show()

# Example usage
bronze_ingest_from_csv_temp_table(spark)


In [None]:
def create_enhanced_aggregated_sales_table(spark):
    # Define the schema name and table names
    schema_name = "ecommerce"
    stock_table_name = "STOCK"  # Stock table in DB2
    sales_table_name = "HIST_SALES"  # Assuming sales data is already migrated to Iceberg
    accessory_table_name = "ACCESSORIES"  # Accessory table in DB2

    # Load stock data from DB2
    try:
        stock_data = spark.read.format("jdbc") \
            .option("url", db_host_name) \
            .option("user", db_user_name) \
            .option("password", db_password) \
            .option("dbtable", f"{schema_name}.{stock_table_name}") \
            .option("sslConnection", "true") \
            .load()
        # Create a temporary view for stock data
        stock_data.createOrReplaceTempView("stock_temp")
    except Exception as e:
        print(f"Error loading stock data: {e}")
        return

    # Load accessory data from DB2
    try:
        accessory_data = spark.read.format("jdbc") \
            .option("url", db_host_name) \
            .option("user", db_user_name) \
            .option("password", db_password) \
            .option("dbtable", f"{schema_name}.{accessory_table_name}") \
            .option("sslConnection", "true") \
            .load()
        # Create a temporary view for accessory data
        accessory_data.createOrReplaceTempView("accessory_temp")
    except Exception as e:
        print(f"Error loading accessory data: {e}")
        return

    # Create a temporary view from the sales table in the Bronze layer
    sales_temp_view_query = f"""
    CREATE OR REPLACE TEMP VIEW sales_temp AS 
    SELECT * FROM lakehouse.bronze_ecommerce.bronze_{sales_table_name.lower()}
    """
    spark.sql(sales_temp_view_query)

    # Create a temporary view from the promotions table
    promotions_temp_view_query = """
    CREATE OR REPLACE TEMP VIEW promotions_temp AS 
    SELECT * FROM lakehouse.bronze_ecommerce.bronze_promotions_sale
    """
    spark.sql(promotions_temp_view_query)

    # Aggregating sales data for products and accessories
    aggregated_sales_query = """
    SELECT
        s.PRODUCT_ID,
        DATE_FORMAT(s.SALE_DATE, 'yyyy-MM') AS sale_month,
        COUNT(DISTINCT s.SALES_ID) AS total_sales_count,
        SUM(s.TOTAL_AMOUNT) AS total_sales_amount,
        AVG(s.TOTAL_AMOUNT) AS average_sales_amount,
        MAX(p.Discount_Percentage) AS Discount_Percentage,
        COUNT(st.PRODUCT_ID) AS total_stock_count,
        CASE
            WHEN COUNT(st.PRODUCT_ID) > 0 THEN 'In Stock'
            ELSE 'Out of Stock'
        END AS stock_status,
        'Product' AS item_type
    FROM sales_temp s
    LEFT JOIN promotions_temp p ON s.PRODUCT_ID = p.Product_ID 
    LEFT JOIN stock_temp st ON s.PRODUCT_ID = st.PRODUCT_ID
    WHERE s.PRODUCT_ID IS NOT NULL  -- Ensure PRODUCT_ID is not null
    GROUP BY s.PRODUCT_ID, sale_month
    
    UNION ALL
    
    SELECT
        a.ACCESSORY_ID AS PRODUCT_ID,
        DATE_FORMAT(s.SALE_DATE, 'yyyy-MM') AS sale_month,
        COUNT(DISTINCT s.SALES_ID) AS total_sales_count,
        SUM(s.TOTAL_AMOUNT) AS total_sales_amount,
        AVG(s.TOTAL_AMOUNT) AS average_sales_amount,
        MAX(p.Discount_Percentage) AS Discount_Percentage,
        COUNT(st.ACCESSORY_ID) AS total_stock_count,
        CASE
            WHEN COUNT(st.ACCESSORY_ID) > 0 THEN 'In Stock'
            ELSE 'Out of Stock'
        END AS stock_status,
        'Accessory' AS item_type
    FROM sales_temp s
    LEFT JOIN promotions_temp p ON s.ACCESSORY_ID = p.Product_ID 
    LEFT JOIN stock_temp st ON s.ACCESSORY_ID = st.ACCESSORY_ID
    LEFT JOIN accessory_temp a ON s.ACCESSORY_ID = a.ACCESSORY_ID
    WHERE s.IS_ACCESSORY = true  -- Filter for accessory sales
    AND a.ACCESSORY_ID IS NOT NULL  -- Ensure ACCESSORY_ID is not null
    GROUP BY a.ACCESSORY_ID, sale_month
    ORDER BY sale_month
    """

    # Run the combined aggregation query
    aggregated_sales_df = spark.sql(aggregated_sales_query)

    # Create the Iceberg table for aggregated sales
    aggregated_iceberg_table_name = "lakehouse.silver_ecommerce.aggregated_sales_data"
    try:
        aggregated_sales_df.write.format("iceberg").mode("overwrite").saveAsTable(aggregated_iceberg_table_name)
    except Exception as e:
        print(f"Error saving the Iceberg table: {e}")
        return

    # Describe the created Iceberg table for aggregated sales
    spark.sql(f"DESCRIBE TABLE {aggregated_iceberg_table_name}").show()

    # Query the Iceberg table to check the aggregated sales results
    spark.sql(f"SELECT * FROM {aggregated_iceberg_table_name} LIMIT 10").show()  # Limit for easier viewing

    print("Enhanced aggregation of product and accessory sales completed successfully.")

# Example usage
create_enhanced_aggregated_sales_table(spark)


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

def create_customer_segmentation_table(spark):
    # Define the table names in the DB2 database
    customer_table_name = "ecommerce.customers"
    historical_sales_table_name = "lakehouse.bronze_ecommerce.bronze_hist_sales"  # Iceberg table for historical sales
    promotions_table_name = "lakehouse.bronze_ecommerce.bronze_promotions_sale"  # Iceberg table for promotions
    iceberg_table_name = "lakehouse.silver_ecommerce.customer_segmentation"

    # Load customer data from DB2
    print(f"Loading customer data from {customer_table_name}...")
    customer_data = spark.read.format("jdbc") \
        .option("url", db_host_name) \
        .option("user", db_user_name) \
        .option("password", db_password) \
        .option("dbtable", customer_table_name) \
        .option("sslConnection", "true") \
        .load()
    print("Customer data loaded successfully.")

    # Load historical sales data from Iceberg table
    print("Loading historical sales data from Iceberg table...")
    sales_data = spark.sql("SELECT * FROM lakehouse.bronze_ecommerce.bronze_hist_sales")
    print("Historical sales data loaded successfully.")

    # Load promotions data from Iceberg table
    print("Loading promotions data from Iceberg table...")
    promotions_data = spark.sql("SELECT * FROM lakehouse.bronze_ecommerce.bronze_promotions_sale")
    print("Promotions data loaded successfully.")

    # Join customer data with sales data on CUSTOMER_ID
    print("Joining customer data with historical sales data...")
    joined_data = customer_data.join(sales_data, "CUSTOMER_ID", "left")
    print("Customer data joined with historical sales data successfully.")

    # Optionally, join with promotions data if relevant
    print("Joining with promotions data...")
    joined_data = joined_data.join(promotions_data, "PRODUCT_ID", "left")  # Adjust join condition as per your schema
    print("Joined with promotions data successfully.")

    # Segment customers based on their total purchases and demographic data
    print("Segmenting customers based on total purchases...")
    customer_segments = joined_data.groupBy("CUSTOMER_ID", "CUSTOMER_NAME", "LOCATION") \
        .agg(
            F.sum(col("TOTAL_AMOUNT")).alias("TOTAL_PURCHASES"),
            F.avg(col("TOTAL_AMOUNT")).alias("AVG_PURCHASE_AMOUNT"),
            F.count("*").alias("NUM_PURCHASES")
        ) \
        .withColumn(
            "SEGMENT",
            when(col("TOTAL_PURCHASES") > 1000, "High Value")
            .when((col("TOTAL_PURCHASES") >= 500) & (col("TOTAL_PURCHASES") <= 1000), "Medium Value")
            .otherwise("Low Value")
        )
    print("Customer segmentation completed successfully.")

    # Check if the segmentation table exists
    table_exists = False
    try:
        existing_segments = spark.table(iceberg_table_name)
        print(f"Customer segmentation table {iceberg_table_name} exists. Proceeding with upsert.")
        table_exists = True
    except Exception as e:
        print(f"Customer segmentation table {iceberg_table_name} does not exist. Creating a new table.")
        table_exists = False

    if table_exists:
        # Perform upsert logic: update existing records and append new ones
        print("Performing upsert operation...")
        updated_segments = existing_segments.alias("existing").join(
            customer_segments.alias("new"),
            "CUSTOMER_ID",
            "outer"
        ).select(
            F.coalesce(col("new.CUSTOMER_ID"), col("existing.CUSTOMER_ID")).alias("CUSTOMER_ID"),
            F.coalesce(col("new.CUSTOMER_NAME"), col("existing.CUSTOMER_NAME")).alias("CUSTOMER_NAME"),
            F.coalesce(col("new.LOCATION"), col("existing.LOCATION")).alias("LOCATION"),
            F.coalesce(col("new.TOTAL_PURCHASES"), col("existing.TOTAL_PURCHASES")).alias("TOTAL_PURCHASES"),
            F.coalesce(col("new.AVG_PURCHASE_AMOUNT"), col("existing.AVG_PURCHASE_AMOUNT")).alias("AVG_PURCHASE_AMOUNT"),
            F.coalesce(col("new.NUM_PURCHASES"), col("existing.NUM_PURCHASES")).alias("NUM_PURCHASES"),
            F.coalesce(col("new.SEGMENT"), col("existing.SEGMENT")).alias("SEGMENT")
        )

        # Write the updated segments back to the Iceberg table
        updated_segments.write \
            .format("iceberg") \
            .mode("overwrite") \
            .saveAsTable(iceberg_table_name)
        print("Customer segmentation data upserted successfully.")
    else:
        # If the table doesn't exist, create and insert new data
        print(f"Creating the table and inserting new data into {iceberg_table_name}...")
        customer_segments.write \
            .format("iceberg") \
            .mode("overwrite") \
            .saveAsTable(iceberg_table_name)
        print(f"Customer segmentation table {iceberg_table_name} created and data inserted.")

    # Display the data for verification
    print("Displaying the customer segmentation data for verification:")
    spark.sql(f"SELECT * FROM {iceberg_table_name}").show()

# Example usage
create_customer_segmentation_table(spark)
