# Curate CardHolder data (09)

In [1]:
import os

from pyspark.sql import SparkSession

# Credentials for the local MinIO object store come from the standard AWS
# environment variables; a missing variable raises KeyError right here.
accessKey = os.environ['AWS_ACCESS_KEY_ID']
secretKey = os.environ['AWS_SECRET_ACCESS_KEY']

# Credentials for the watsonx developer setup are read from the environment as
# well, so that no secret is committed together with the notebook.
#   LH_S3_ACCESS_KEY / LH_S3_SECRET_KEY : object-store keys for ibm-lh-presto-svc:9000
#   LH_PASSWORD                         : password of the `ibmlhadmin` REST user
# When the LH_S3_* variables are unset the AWS_* values above are reused, which
# keeps the local ("hiverest") setup runnable without extra configuration.
lhAccessKey = os.environ.get('LH_S3_ACCESS_KEY', accessKey)
lhSecretKey = os.environ.get('LH_S3_SECRET_KEY', secretKey)
# NOTE(review): "password" looks like the developer-edition placeholder and is
# kept as the default only for backward compatibility - override via LH_PASSWORD.
lhPassword = os.environ.get('LH_PASSWORD', 'password')

# Identical JVM options are applied to driver and executors so both trust the
# certificates stored in the mounted truststore.
# NOTE(review): "changeit" is presumably the stock JDK truststore password - confirm.
sslJavaOptions = (
    "-Djavax.net.ssl.trustStore=/data-transfer/truststore.jks "
    "-Djavax.net.ssl.trustStorePassword=changeit"
)

spark = (
    SparkSession.builder
        .appName("Jupyter")
        .master("local[*]")

        # Iceberg Spark runtime plus the AWS bundle that provides S3FileIO
        .config("spark.jars.packages",
                "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,"
                "org.apache.iceberg:iceberg-aws-bundle:1.10.1")

        # ==== Hadoop S3A file system ===
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.endpoint", "http://ibm-lh-presto-svc:9000")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.access.key", lhAccessKey)
        .config("spark.hadoop.fs.s3a.secret.key", lhSecretKey)
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

        # ==== Iceberg catalog (local) ===
        .config("spark.sql.catalog.hiverest", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.hiverest.type", "rest")
        .config("spark.sql.catalog.hiverest.uri", "http://hive-metastore:9084/iceberg")
        .config("spark.sql.catalog.hiverest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .config("spark.sql.catalog.hiverest.warehouse", "s3a://admin-bucket/iceberg/warehouse")

        # REQUIRED FOR MINIO WITH ICEBERG AWS SDK
        .config("spark.sql.catalog.hiverest.s3.endpoint", "http://minio-1:9000")
        .config("spark.sql.catalog.hiverest.s3.path-style-access", "true")
        .config("spark.sql.catalog.hiverest.s3.access-key-id", accessKey)
        .config("spark.sql.catalog.hiverest.s3.secret-access-key", secretKey)

        # ==== Iceberg catalog (watson) ===
        .config("spark.sql.catalog.watson", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.watson.type", "rest")
        .config("spark.sql.catalog.watson.uri", "https://ibm-lh-presto-svc:8180/mds/iceberg")
        .config("spark.sql.catalog.watson.rest.auth.type", "basic")
        .config("spark.sql.catalog.watson.rest.auth.basic.username", "ibmlhadmin")
        .config("spark.sql.catalog.watson.rest.auth.basic.password", lhPassword)
        .config("spark.sql.catalog.watson.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .config("spark.sql.catalog.watson.warehouse", "iceberg_data")

        # REQUIRED FOR MINIO WITH ICEBERG AWS SDK
        .config("spark.sql.catalog.watson.s3.endpoint", "http://ibm-lh-presto-svc:9000")
        .config("spark.sql.catalog.watson.s3.path-style-access", "true")
        .config("spark.sql.catalog.watson.s3.access-key-id", lhAccessKey)
        .config("spark.sql.catalog.watson.s3.secret-access-key", lhSecretKey)

        # use "hiverest" for local setup, "watson" for the watsonx developer setup
        .config("spark.sql.defaultCatalog", "hiverest")

        .config("spark.driver.extraJavaOptions", sslJavaOptions)
        .config("spark.executor.extraJavaOptions", sslJavaOptions)

        # Iceberg SQL extensions for the Spark session
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
        )

        .getOrCreate()
)

In [2]:
# Load the `sql` IPython extension; the %sql / %%sql magics used in this
# notebook are expected to come from it.
%load_ext sql
# Pass the `spark` session created above to the %sql magic - presumably this
# registers it as the connection for the following %%sql cells (confirm against
# the installed extension).
%sql spark

In [3]:
%%sql
DROP TABLE IF EXISTS customer_db.cur_person_t;
DROP TABLE IF EXISTS customer_db.cur_address_t;

CREATE TABLE customer_db.cur_person_t (
    -- curated person attributes; filled by the MERGE on person_id further down
    person_id STRING,
    first_name STRING,
    last_name STRING,
    email_address STRING,
    phone_number STRING,
    preferred_contact STRING,
    segment STRING,
    avg_transaction_amount DOUBLE,
    onboarded_date TIMESTAMP
);

CREATE TABLE customer_db.cur_address_t (
    -- one row per exploded address; address_id is a SHA-256 hash computed by the MERGE
    address_id STRING,
    person_id STRING,
    street STRING,
    zip_code STRING,
    city STRING,
    state STRING
);

CREATE TABLE IF NOT EXISTS customer_db.cur_card_t (
    -- Target of the card MERGE at the end of this notebook, which otherwise
    -- fails on a fresh catalog because nothing here creates the table.
    -- IF NOT EXISTS (and no DROP) leaves a table created elsewhere untouched.
    -- NOTE(review): expiry_date is declared STRING because the type of
    -- card_holder.card.expiry_date is not visible here - switch to DATE once confirmed.
    card_id STRING,
    person_id STRING,
    card_number STRING,
    card_type STRING,
    expiry_date STRING
);

In [4]:
# Raw card-holder table that every MERGE statement below reads from.
#
# The `spark` session built in the first cell is reused as-is. Calling
# SparkSession.builder.getOrCreate() again here was redundant, and on a kernel
# where the setup cell had been skipped it would silently create a session
# without the Iceberg catalogs instead of failing loudly.
source_table = "customer_db.raw_card_holder_t"

In [5]:
# Upsert the person attributes of every card holder into cur_person_t:
# rows whose person_id already exists are overwritten, all others are inserted.
person_merge_sql = f"""
MERGE INTO customer_db.cur_person_t t
USING (
  SELECT
    card_holder.id                AS person_id,
    card_holder.first_name        AS first_name,
    card_holder.last_name         AS last_name,
    card_holder.email_address     AS email_address,
    card_holder.phone_number      AS phone_number,
    card_holder.preferred_contact AS preferred_contact,
    card_holder.segment           AS segment,
    card_holder.avg_transaction_amount AS avg_transaction_amount,
    card_holder.onboarded_date    AS onboarded_date
  FROM {source_table}
) s
ON t.person_id = s.person_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(person_merge_sql)

DataFrame[]

In [7]:
%%sql
SELECT
    -- preview of the rows the address MERGE will write: one row per element of
    -- card_holder.addresses, keyed by a SHA-256 hash over holder id + address fields
    sha2(concat(card_holder.id, a.street, a.zip_code, a.city, a.state), 256) AS address_id,
    card_holder.id AS person_id,
    a.street,
    a.zip_code,
    a.city,
    a.state
FROM customer_db.raw_card_holder_t
LATERAL VIEW explode(card_holder.addresses) addr AS a

Field 1,Field 2,Field 3,Field 4,Field 5,Field 6
9f59b07bb42202a1a28dea9018ade1c64a83936152af0c6e7bb1bfd087b30b31,f07fd9ba-502f-4669-c088-82f8d53c9c65,Schiller Rapid,76580,New Janettmouth,Wyoming
aee27e3aed42b1ac83e6861d591b3718721c7e120403824ce337a18d9a7a074b,d79ba92a-78eb-38f1-17c0-02b285342dd3,Lonna Corners,45304,Bahringerside,Iowa


In [8]:
# Upsert one row per exploded address into cur_address_t, matching on the pair
# (person_id, address_id); address_id is the SHA-256 of holder id + address fields.
# NOTE(review): concat() yields NULL as soon as one argument is NULL, so an
# address with a missing field would get a NULL address_id that can never
# satisfy the ON clause - confirm these fields are always populated.
address_merge_sql = f"""
MERGE INTO customer_db.cur_address_t t
USING (
  SELECT
    sha2(concat(card_holder.id, a.street, a.zip_code, a.city, a.state),256) AS address_id,
    card_holder.id AS person_id,
    a.street,
    a.zip_code AS zip_code,
    a.city,
    a.state
  FROM {source_table}
  LATERAL VIEW explode(card_holder.addresses) addr AS a
) s
ON t.person_id = s.person_id AND t.address_id = s.address_id

WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(address_merge_sql)

DataFrame[]

In [18]:
%%sql
SELECT
    -- preview of the card attributes per card holder; card_id is the SHA-256
    -- of the holder id with the literal suffix 'card'
    sha2(concat(card_holder.id, 'card'), 256) AS card_id,
    card_holder.id                            AS person_id,
    card_holder.card.number                   AS card_number,
    card_holder.card.type                     AS card_type,
    card_holder.card.expiry_date              AS expiry_date
FROM customer_db.raw_card_holder_t;

Field 1,Field 2,Field 3,Field 4,Field 5
ef30a07cec4e597514537d6fdc070b6d793c194def4e6d0284ea9cf6244fa2c3,f28981cd-f542-eea6-ca7d-ba176a3eaf9c,1800-1703-6464-1734,diners_club,2027-03-15
d356191520081ba09acb303161e679fc5a43523307ddfefa026d66e53a369be4,e0c540b5-fb4b-3f42-a632-51c4f7cef373,2131-0726-3156-7595,switch,2026-07-23


In [19]:
# Upsert the card of every card holder into cur_card_t, matching on card_id
# (SHA-256 of the holder id plus the literal 'card'); holders whose `card`
# struct is NULL are skipped by the WHERE clause.
card_merge_sql = f"""
MERGE INTO customer_db.cur_card_t t
USING (
  SELECT
    sha2(concat(card_holder.id,'card'),256) AS card_id,
    card_holder.id AS person_id,
    card_holder.card.number AS card_number,
    card_holder.card.type AS card_type,
    card_holder.card.expiry_date AS expiry_date
  FROM {source_table}
  WHERE card_holder.card IS NOT NULL
) s
ON t.card_id = s.card_id

WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(card_merge_sql)

DataFrame[]