![Top <](./images/watsonxdata.png "watsonxdata")

# Data Ingestion through Spark
This notebook demonstrates how Spark can connect to watsonx.data and ingest data. The system includes a local Spark engine that is used to access watsonx.data. The engine is minimally configured, but it is sufficient to demonstrate the steps needed to connect to watsonx.data and access the data that resides in its catalogs.

## Copy Spark Libraries
The Spark libraries used by this notebook must be extracted to the local file system for the Spark calls to work properly.

In [1]:
%system tar -xf /spark/spark.tgz -C /usr/local

[]

## Environment Variables 
We need to set several environment variables so that PySpark can find the Spark installation (`SPARK_HOME`) and the Spark executables (`PATH`).

In [2]:
%env SPARK_HOME=/usr/local/spark
%env PYSPARK_DRIVER_PYTHON=jupyter
%env PYSPARK_DRIVER_PYTHON_OPTS=notebook
%env PATH=/usr/local/bin:/usr/local/sbin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/spark/bin:/root/bin

env: SPARK_HOME=/usr/local/spark
env: PYSPARK_DRIVER_PYTHON=jupyter
env: PYSPARK_DRIVER_PYTHON_OPTS=notebook
env: PATH=/usr/local/bin:/usr/local/sbin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/spark/bin:/root/bin
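The `%env` magics only work inside IPython. If the same setup were needed from a plain Python script (an assumption about how you might reuse this notebook), a minimal sketch using `os.environ` could look like this. Unlike the `%env PATH=...` line, which replaces `PATH` entirely, the hypothetical `apply_spark_env` helper only appends the Spark `bin` directory.

```python
import os

# Values copied from the %env cell above; adjust them for your installation.
SPARK_ENV = {
    "SPARK_HOME": "/usr/local/spark",
    "PYSPARK_DRIVER_PYTHON": "jupyter",
    "PYSPARK_DRIVER_PYTHON_OPTS": "notebook",
}

def apply_spark_env(env=SPARK_ENV, spark_bin="/usr/local/spark/bin"):
    """Set the Spark variables and append spark_bin to PATH if it is not already there."""
    for name, value in env.items():
        os.environ[name] = value
    current = os.environ.get("PATH", "")
    if spark_bin not in current.split(os.pathsep):
        # Append rather than replace, so the rest of the existing PATH is preserved
        os.environ["PATH"] = f"{current}{os.pathsep}{spark_bin}" if current else spark_bin
    return {name: os.environ[name] for name in env}

applied = apply_spark_env()
print(applied)
```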


## System Variables
In addition to the environment variables, we need to set some Python variables that are used throughout the scripts:
* minio_host - The hostname of the MinIO server
* minio_port - The port that the MinIO server is using
* hive_host  - The hostname of the Thrift/Hive Metastore server
* hive_port  - The port that the Thrift/Hive Metastore server is using

Note that these hostnames and ports are for an internal connection within the watsonx.data development server. They will be different if you are connecting externally.

In [5]:
minio_host    = "watsonxdata"
minio_port    = "9000"
hive_host     = "watsonxdata"
hive_port     = "8380"
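These four values are combined into the endpoint strings used in the Spark configuration later in this notebook (`http://{minio_host}:{minio_port}` for S3A and `thrift://{hive_host}:{hive_port}` for the Hive Metastore). A small helper, hypothetical and not part of any library, makes that mapping explicit and easy to check:

```python
def service_endpoints(minio_host, minio_port, hive_host, hive_port):
    """Build the endpoint strings that the Spark configuration cell uses."""
    return {
        # MinIO (S3-compatible) endpoint, used for spark.hadoop.fs.s3a.endpoint
        "s3a_endpoint": f"http://{minio_host}:{minio_port}",
        # Hive Metastore Thrift URI, used for spark.hive.metastore.uris and the catalog uri
        "metastore_uri": f"thrift://{hive_host}:{hive_port}",
    }

endpoints = service_endpoints("watsonxdata", "9000", "watsonxdata", "8380")
print(endpoints)
```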

## Minio - Object Storage CLI
To use the MinIO CLI (`mc`), we must first register the MinIO server we want to connect to. Before doing that, we need to extract the MinIO credentials along with some other credentials. The passwords for all of the services are stored in the `/certs/passwords` file on this server.

The following code will extract all of the passwords and userids that are required for the MinIO and Spark connections.

In [9]:
hive_id           = None
hive_password     = None
minio_access_key  = None
minio_secret_key  = None
keystore_password = None
cert_file         = "/certs/lh-ssl-ts.jks"   # Java truststore used for the SSL connection to the Hive Metastore

# Each line of /certs/passwords is expected to look like: <service> <userid> <password>
try:
    with open('/certs/passwords') as fd:
        certs = fd.readlines()
    for line in certs:
        args = line.split()
        if len(args) >= 3:
            system   = args[0].strip()
            user     = args[1].strip()
            password = args[2].strip()
            if system == "Minio":
                minio_access_key = user
                minio_secret_key = password
            elif system == "Thrift":
                hive_id = user
                hive_password = password
            elif system == "Keystore":
                keystore_password = password
except OSError as e:
    # FileNotFoundError and PermissionError are both subclasses of OSError
    print(f"Password file /certs/passwords could not be read: {e}")
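The loop above assumes each line holds a service name, a user ID, and a password separated by whitespace. As a sketch under that same assumption, the logic can be factored into a pure function (`parse_passwords` is a hypothetical name) that can be tested without the real file. The sample lines below are made up, and the result is stored in `demo` so it does not overwrite the real credential variables.

```python
def parse_passwords(lines):
    """Map each service name to a (userid, password) tuple.

    Assumes the format implied by the loop above: '<service> <userid> <password>'
    separated by whitespace; lines with fewer than three fields are ignored.
    A later line for the same service overwrites an earlier one.
    """
    entries = {}
    for line in lines:
        fields = line.split()
        if len(fields) >= 3:
            entries[fields[0]] = (fields[1], fields[2])
    return entries

# Made-up sample lines; the real file is /certs/passwords
sample_lines = [
    "Minio    example-access-key  example-secret-key\n",
    "Thrift   example-user        example-password\n",
    "Keystore example-ks-user     example-ks-password\n",
    "\n",
]
demo = parse_passwords(sample_lines)
print(demo["Minio"])
```

With the real file, the same function can be called on the open file object inside a `with open('/certs/passwords') as fd:` block, since iterating a text file yields its lines.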

### MinIO - Object Storage System Alias
Before running any commands against the MinIO server, we must create an alias that records the server URL together with the access key and secret key.

In [7]:
%system mc alias set watsonxdata http://{minio_host}:{minio_port} {minio_access_key} {minio_secret_key}

['Added `watsonxdata` successfully.']

### List Buckets
The `mc` command provides a number of subcommands that allow us to manage buckets and the files within them. The following command lists the buckets so we can check whether `staging-bucket` exists. This bucket is used for all of the Spark examples.

In [10]:
%system mc ls watsonxdata

['[2024-12-17 15:12:13 EST]     0B hive-bucket/',
 '[2024-12-17 15:12:13 EST]     0B iceberg-bucket/',
 '[2025-01-15 23:57:45 EST]     0B staging-bucket/',
 '[2024-12-17 15:12:14 EST]     0B wxd-milvus/',
 '[2024-12-17 15:12:13 EST]     0B wxd-system/']

If the staging bucket already exists, we will delete the bucket and its contents.
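The existence check can be made explicit by scanning the listing lines. This is a minimal sketch that assumes the `mc ls` output format shown above (bucket name as the last field, ending in `/`) and assumes that in IPython the result of `%system mc ls watsonxdata` can be captured as a list of lines, as the cell outputs suggest. `bucket_exists` is a hypothetical helper, not part of `mc`.

```python
def bucket_exists(listing, bucket):
    """Return True if `bucket` appears in `mc ls ALIAS` output lines.

    Each line is assumed to end with the bucket name followed by '/',
    for example '[2025-01-15 23:57:45 EST]     0B staging-bucket/'.
    """
    for line in listing:
        fields = line.split()
        if fields and fields[-1].rstrip("/") == bucket:
            return True
    return False

# Two lines copied from the listing above
listing = [
    "[2024-12-17 15:12:13 EST]     0B hive-bucket/",
    "[2025-01-15 23:57:45 EST]     0B staging-bucket/",
]

if bucket_exists(listing, "staging-bucket"):
    # In the notebook, `%system mc rb --force watsonxdata/staging-bucket` should remove
    # the bucket together with its contents (confirm with `mc rb --help` first).
    print("staging-bucket exists")
```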

### Create a Bucket
At this point we will create the staging bucket that we are going to use to hold our data.

In [51]:
#%system mc mb watsonxdata/staging-bucket

### Load Data
Next we will load the data from the local `/notebooks/staging-bucket` directory into the bucket. Note that we need to use the full name of the bucket (`watsonxdata/staging-bucket`). The `mc` command allows us to select which files to place into a bucket, or to copy an entire directory recursively. In this case we only select the JSON files.

In [52]:
#%system mc cp /notebooks/staging-bucket/*.json watsonxdata/staging-bucket/
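Before copying, it can help to preview which local files the `*.json` wildcard will select. A minimal sketch with `pathlib` follows; the helper name is hypothetical, and the demonstration uses a throwaway directory rather than `/notebooks/staging-bucket`.

```python
import tempfile
from pathlib import Path

def files_to_upload(source_dir, pattern="*.json"):
    """List (roughly) the regular files that source_dir/<pattern> would match."""
    return sorted(p.name for p in Path(source_dir).glob(pattern) if p.is_file())

# Demonstration on a temporary directory; in the notebook you would pass
# "/notebooks/staging-bucket" instead.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("tickets.json", "contacts.json", "notes.csv"):
        Path(tmp, name).write_text("[]")
    selected = files_to_upload(tmp)
print(selected)  # ['contacts.json', 'tickets.json']
```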

We can double-check that our files are there with the `mc tree` command and its `--files` option.

In [8]:
%system mc tree --files watsonxdata/staging-bucket/

['watsonxdata/staging-bucket/',
 '├─ config_udfs.json',
 '├─ configs.json',
 '├─ contacts.json',
 '├─ tickets.json',
 '└─ time_entries.json']
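The same check can be automated by comparing the tree listing with the five files that the ingestion steps below expect. This sketch assumes a flat bucket (no subdirectories), so every file line starts with `├─` or `└─`; `files_in_tree` and `EXPECTED_FILES` are hypothetical names.

```python
EXPECTED_FILES = {"config_udfs.json", "configs.json", "contacts.json",
                  "tickets.json", "time_entries.json"}

def files_in_tree(tree_lines):
    """Extract file names from flat `mc tree --files` output lines such as '├─ tickets.json'."""
    names = set()
    for line in tree_lines:
        stripped = line.strip()
        for prefix in ("├─", "└─"):
            if stripped.startswith(prefix):
                names.add(stripped[len(prefix):].strip())
    return names

# Output copied from the cell above
tree_output = [
    "watsonxdata/staging-bucket/",
    "├─ config_udfs.json",
    "├─ configs.json",
    "├─ contacts.json",
    "├─ tickets.json",
    "└─ time_entries.json",
]
missing = EXPECTED_FILES - files_in_tree(tree_output)
print("missing:", sorted(missing))  # missing: []
```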

# Spark Initialization

The next set of Python instructions will initialize the Spark connection. Once the connection is established to the engine, we need to update a number of values to provide credentials and a URL to the Hive and MinIO services.

### Initialize the Spark Connection
Initialize the settings for the Spark service.

In [11]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import warnings
warnings.filterwarnings('ignore')

spark = SparkSession.builder.appName('sparky').getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
conf = sc.getConf()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/23 01:59:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/23 01:59:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/01/23 01:59:14 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


### Watsonx.data Configuration Information
Once the configuration object is available, we need to update the values corresponding to our MinIO and Hive settings.

In [12]:
_ = conf.set("spark.sql.debug.maxToStringFields",                    "100")
_ = conf.set("fs.s3a.path.style.access",                             "true")
_ = conf.set("fs.s3a.impl",                                          "org.apache.hadoop.fs.s3a.S3AFileSystem")
_ = conf.set("fs.s3a.connection.ssl.enabled",                        "true")
_ = conf.set("spark.driver.extraJavaOptions",                        "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true")

_ = conf.set("spark.sql.catalogImplementation",                      "hive")
_ = conf.set("spark.sql.extensions",                                 "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
_ = conf.set("spark.sql.iceberg.vectorization.enabled",              "false")

_ = conf.set("spark.sql.defaultCatalog",                             "iceberg_data")
_ = conf.set("spark.sql.catalog.iceberg_data",                       "org.apache.iceberg.spark.SparkCatalog")
_ = conf.set("spark.sql.catalog.iceberg_data.type",                  "hive")
_ = conf.set("spark.sql.catalog.iceberg_data.uri",                   f"thrift://{hive_host}:{hive_port}")

_ = conf.set("spark.hive.metastore.client.auth.mode",                "PLAIN")
_ = conf.set("spark.hive.metastore.client.plain.username",           hive_id)
_ = conf.set("spark.hive.metastore.client.plain.password",           hive_password)

_ = conf.set("spark.hive.metastore.use.SSL",                         "true")
_ = conf.set("spark.hive.metastore.truststore.type",                 "jks")
_ = conf.set("spark.hive.metastore.truststore.path",                 cert_file)
_ = conf.set("spark.hive.metastore.truststore.password",             keystore_password)
_ = conf.set("spark.hive.metastore.uris",                            f"thrift://{hive_host}:{hive_port}")

_ = conf.set("spark.hadoop.fs.s3a.endpoint",                         f"http://{minio_host}:{minio_port}")
_ = conf.set("spark.hadoop.fs.s3a.access.key",                       minio_access_key)
_ = conf.set("spark.hadoop.fs.s3a.secret.key",                       minio_secret_key)
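The password extraction cell leaves a credential as `None` when its entry is missing, and `SparkConf.set` appears to convert values with `str()`, so a missing password could silently be sent as the string `"None"`. A guard like the hypothetical `missing_settings` helper below, run before building the configuration (for example on a dict of `hive_id`, `hive_password`, `minio_access_key`, `minio_secret_key`, and `keystore_password`), makes that failure visible. The sample values here are made up.

```python
def missing_settings(settings):
    """Return the names of settings that are still None (e.g. a password not found)."""
    return [name for name, value in settings.items() if value is None]

# Hypothetical example: the Keystore entry was not found in /certs/passwords
settings = {
    "hive_id": "example-user",
    "hive_password": "example-password",
    "minio_access_key": "example-access-key",
    "minio_secret_key": "example-secret-key",
    "keystore_password": None,
}
problems = missing_settings(settings)
if problems:
    print("Missing credentials:", ", ".join(problems))
```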

### Restart Spark with new Configuration
To make the configuration changes take effect, we need to stop the Spark context and recreate the session with the new configuration.

In [13]:
sc.stop()

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
conf = sc.getConf()

# Data Ingestion to the Staging Area
Steps involved:
1. Read the JSON file into a DataFrame using a predefined schema
2. Load the DataFrame into the staging table
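Conceptually, reading a multiline JSON file with an explicit all-`StringType` schema keeps only the schema's fields, fills absent fields with null, and returns scalar values as strings. The pure-Python sketch below approximates that behavior for illustration only; it is not how Spark implements its reader, and the sample record is hypothetical (its values are borrowed from the `config_udfs` output below).

```python
import json

def project_records(json_text, field_names):
    """Approximate a multiline JSON read with an all-StringType schema.

    Fields missing from a record (or JSON nulls) become None, fields not in
    the schema are dropped, and non-string values are turned into compact JSON text.
    """
    records = json.loads(json_text)
    if isinstance(records, dict):  # a single top-level object is treated as one row
        records = [records]
    rows = []
    for record in records:
        row = {}
        for name in field_names:
            value = record.get(name)
            if value is None or isinstance(value, str):
                row[name] = value
            else:
                row[name] = json.dumps(value, separators=(",", ":"))
        rows.append(row)
    return rows

sample = '[{"identifier": 16707, "return_type": "CwConfig", "extra": 1}]'
rows = project_records(sample, ["identifier", "return_type", "key", "value"])
print(rows)
# [{'identifier': '16707', 'return_type': 'CwConfig', 'key': None, 'value': None}]
```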

### 1/5. Ingest data from file "config_udfs.json" to table "iceberg_data.cb_di.config_udfs_stg"

In [16]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

# Define the schema for the known fields (all read as strings)
schema = StructType([
    StructField("identifier", StringType(), True),
    StructField("return_type", StringType(), True),
    StructField("key", StringType(), True),
    StructField("value", StringType(), True)
])

# Load the JSON file; multiline=true allows a record to span several lines (e.g. a JSON array)
json_file_path = "staging-bucket/config_udfs.json"  # Replace with your file path
json_df = spark.read.option("multiline", "true").schema(schema).json(json_file_path)

# Select and cast the columns to match the Iceberg staging table schema
data_to_insert = json_df.select(
    col("identifier").cast("int"),
    col("return_type"),
    col("key"),
    col("value")
)

data_to_insert.show()


+----------+-----------+----+-----+
|identifier|return_type| key|value|
+----------+-----------+----+-----+
|     16707|   CwConfig|null| null|
|     16719|   CwConfig|null| null|
|     16717|   CwConfig|null| null|
|     16702|   CwConfig|null| null|
|     16718|   CwConfig|null| null|
|     11464|   CwConfig|null| null|
|     11386|   CwConfig|null| null|
|     11388|   CwConfig|null| null|
|     11394|   CwConfig|null| null|
|     11402|   CwConfig|null| null|
|     11410|   CwConfig|null| null|
|     11416|   CwConfig|null| null|
|     11406|   CwConfig|null| null|
|     11422|   CwConfig|null| null|
|     11428|   CwConfig|null| null|
|     11414|   CwConfig|null| null|
|     11418|   CwConfig|null| null|
|     11432|   CwConfig|null| null|
|     11438|   CwConfig|null| null|
|     11436|   CwConfig|null| null|
+----------+-----------+----+-----+
only showing top 20 rows



In [18]:
# Insert directly into the Iceberg table
data_to_insert.writeTo("iceberg_data.cb_di.config_udfs_stg").append()

                                                                                

### 2/5. Ingest data from file "tickets.json" to table "iceberg_data.cb_di.tickets_stg".

In [73]:
from pyspark.sql.types import StructType, StructField, StringType

# Define the schema with StringType for all columns
schema = StructType([
    StructField("identifier", StringType(), True),
    StructField("link", StringType(), True),
    StructField("integration_type", StringType(), True),
    StructField("board_id", StringType(), True),
    StructField("board_name", StringType(), True),
    StructField("status_id", StringType(), True),
    StructField("status_name", StringType(), True),
    StructField("actual_hours", StringType(), True),
    StructField("concept", StringType(), True),
    StructField("updated_date", StringType(), True),
    StructField("updated_by", StringType(), True),
    StructField("integration_id", StringType(), True),
    StructField("parent_name", StringType(), True),
    StructField("company_id", StringType(), True),
    StructField("created_date", StringType(), True),
    StructField("created_by", StringType(), True),
    StructField("return_type", StringType(), True),
    StructField("type", StringType(), True),
    StructField("sub_type", StringType(), True),
    StructField("item", StringType(), True),
    StructField("ticket_owner", StringType(), True),
    StructField("sla", StringType(), True),
    StructField("agreement", StringType(), True),
    StructField("predecessor", StringType(), True),
    StructField("estimated_start_date", StringType(), True),
    StructField("due_date", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("impact", StringType(), True),
    StructField("priority", StringType(), True),
    StructField("sla_status", StringType(), True),
    StructField("budget_hours", StringType(), True),
    StructField("opportunity", StringType(), True),
    StructField("source", StringType(), True),
    StructField("vcio", StringType(), True),
    StructField("account_tech", StringType(), True),
    StructField("assigned_by", StringType(), True),
    StructField("closed_by", StringType(), True),
    StructField("closed_date", StringType(), True)
])

# Load JSON file with the schema
json_file_path = "staging-bucket/tickets.json"  # Replace with your file path
json_df = spark.read.option("multiline", "true").schema(schema).json(json_file_path)

# Display schema and data
json_df.printSchema()
json_df.show(truncate=False)



root
 |-- identifier: string (nullable = true)
 |-- link: string (nullable = true)
 |-- integration_type: string (nullable = true)
 |-- board_id: string (nullable = true)
 |-- board_name: string (nullable = true)
 |-- status_id: string (nullable = true)
 |-- status_name: string (nullable = true)
 |-- actual_hours: string (nullable = true)
 |-- concept: string (nullable = true)
 |-- updated_date: string (nullable = true)
 |-- updated_by: string (nullable = true)
 |-- integration_id: string (nullable = true)
 |-- parent_name: string (nullable = true)
 |-- company_id: string (nullable = true)
 |-- created_date: string (nullable = true)
 |-- created_by: string (nullable = true)
 |-- return_type: string (nullable = true)
 |-- type: string (nullable = true)
 |-- sub_type: string (nullable = true)
 |-- item: string (nullable = true)
 |-- ticket_owner: string (nullable = true)
 |-- sla: string (nullable = true)
 |-- agreement: string (nullable = true)
 |-- predecessor: string (nullable = true)

In [74]:
# Insert directly into the Iceberg table
json_df.writeTo("iceberg_data.cb_di.tickets_stg").append()

### 3/5. Ingest data from file "time_entries.json" to table "iceberg_data.cb_di.time_entries_stg".

In [77]:
from pyspark.sql.types import StructType, StructField, StringType


# Define schema with all columns as StringType
schema = StructType([
    StructField("identifier", StringType(), True),
    StructField("user", StringType(), True),
    StructField("hours", StringType(), True),
    StructField("date", StringType(), True),
    StructField("billable", StringType(), True),
    StructField("work_role", StringType(), True),
    StructField("status", StringType(), True),
    StructField("location", StringType(), True),
    StructField("business_unit", StringType(), True),
    StructField("agreement", StringType(), True),
    StructField("agreement_type", StringType(), True),
    StructField("invoice_number", StringType(), True),
    StructField("start_time", StringType(), True),
    StructField("end_time", StringType(), True),
    StructField("return_type", StringType(), True)
])

# Path to the JSON file
json_file_path = "staging-bucket/time_entries.json"  # Replace with your actual path

# Read JSON file using the defined schema
json_df = spark.read.option("multiline", "true").schema(schema).json(json_file_path)

# Show the data
json_df.show(truncate=False)


+----------+---------------------------------------+-------------------+----+--------+---------+------+--------+-------------+---------+--------------+--------------+----------+--------+-----------+
|identifier|user                                   |hours              |date|billable|work_role|status|location|business_unit|agreement|agreement_type|invoice_number|start_time|end_time|return_type|
+----------+---------------------------------------+-------------------+----+--------+---------+------+--------+-------------+---------+--------------+--------------+----------+--------+-----------+
|1160693   |csalter                                |0.31666666666666665|null|        |         |      |        |             |         |              |              |null      |null    |CwTicket   |
|1160715   |asheikh                                |0.5                |null|        |         |      |        |             |         |              |              |null      |null    |CwTicket   |
|1160

In [79]:
# Insert directly into the Iceberg table
json_df.writeTo("iceberg_data.cb_di.time_entries_stg").append()

### 4/5. Ingest data from file "configs.json" to table "iceberg_data.cb_di.configs_stg".

In [16]:
from pyspark.sql.types import StructType, StructField, StringType


# Define schema with all columns as StringType (VARCHAR equivalent)
schema = StructType([
    StructField("identifier", StringType(), True),
    StructField("link", StringType(), True),
    StructField("integration_type", StringType(), True),
    StructField("updated_date", StringType(), True),
    StructField("updated_by", StringType(), True),
    StructField("integration_id", StringType(), True),
    StructField("parent_name", StringType(), True),
    StructField("company_id", StringType(), True),
    StructField("created_date", StringType(), True),
    StructField("created_by", StringType(), True),
    StructField("return_type", StringType(), True),
    StructField("business_unit_id", StringType(), True),
    StructField("business_unit", StringType(), True),
    StructField("location_id", StringType(), True),
    StructField("location", StringType(), True),
    StructField("config_type", StringType(), True),
    StructField("status", StringType(), True),
    StructField("SLA", StringType(), True),
    StructField("install_date", StringType(), True),
    StructField("installed_by", StringType(), True),
    StructField("purchase_date", StringType(), True),
    StructField("expiration_date", StringType(), True),
    StructField("vendor", StringType(), True),
    StructField("manufacturer", StringType(), True),
    StructField("part_number", StringType(), True),
    StructField("model_number", StringType(), True),
    StructField("serial_number", StringType(), True),
    StructField("tag_number", StringType(), True),
    StructField("bill_customer", StringType(), True),
    StructField("needs_renewal", StringType(), True),
    StructField("contact", StringType(), True),
    StructField("site", StringType(), True),
    StructField("site_address", StringType(), True),
    StructField("notes", StringType(), True),
    StructField("vendor_notes", StringType(), True)
])

# Load JSON data
json_file_path = "staging-bucket/configs.json"  # Replace with actual path
json_df = spark.read.option("multiline", "true").schema(schema).json(json_file_path)

# Optional: Show the data
json_df.show(truncate=False)




+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------+----------------+--------------------+------------+--------------+------------+----------+--------------------+------------+-----------+----------------+-------------+-----------+--------+-----------+---------+----+------------+------------+-------------+---------------+------+------------+-----------+----------------+---------------+----------+-------------+-------------+-------+------------------------------------+------------+----------------------------------------+----------------------------------------------------------------------------------+
|identifier|link                                                                                                                                                 |integration_type|updated_date        |updated_by  |integration_id|parent_name |company_id|created_date        |created

In [82]:
# Insert directly into the Iceberg table
json_df.writeTo("iceberg_data.cb_di.configs_stg").append()

### 5/5. Ingest data from file "contacts.json" to table "iceberg_data.cb_di.contacts_stg"

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

# Define the schema with StringType for all columns
schema = StructType([
    StructField("id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("link", StringType(), True),
    StructField("integration_type", StringType(), True),
    StructField("business_unit_id", StringType(), True),
    StructField("location_id", StringType(), True),
    StructField("identifier", StringType(), True),
    StructField("updated_date", StringType(), True),
    StructField("updated_by", StringType(), True),
    StructField("integration_id", StringType(), True),
    StructField("parent_name", StringType(), True),
    StructField("company_id", StringType(), True),
    StructField("created_date", StringType(), True),
    StructField("created_by", StringType(), True),
    StructField("return_type", StringType(), True),
    StructField("full_name", StringType(), True),
    StructField("company_location", StringType(), True),
    StructField("department", StringType(), True),
    StructField("site", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone_number", StringType(), True),
    StructField("facebook", StringType(), True),
    StructField("twitter", StringType(), True),
    StructField("linkedin", StringType(), True)
])

# Load JSON file with the schema
json_file_path = "staging-bucket/contacts.json"  # Replace with your file path
json_df = spark.read.option("multiline", "true").schema(schema).json(json_file_path)

# Display schema and data
json_df.printSchema()
json_df.show(truncate=False)


In [None]:
# Insert directly into the Iceberg table
json_df.writeTo("iceberg_data.cb_di.contacts_stg").append()

# Data Load from STG to Data Store tables

### 1/5 Data load from "iceberg_data.cb_di.config_udfs_stg" to "iceberg_data.cb_di.config_udfs"

In [21]:
from pyspark.sql import functions as F

# Load the source DataFrame (assuming the source table is named 'config_udfs_stg')
config_udfs_stg_df = spark.table("iceberg_data.cb_di.config_udfs_stg")

# Apply type conversions for each column
converted_config_udfs_df = config_udfs_stg_df.select(
    F.col("identifier").cast("int").alias("identifier"),  # Convert to INT
    F.col("key").cast("string").alias("key"),  # Convert to STRING
    F.col("value").cast("string").alias("value"),  # Convert to STRING
    F.col("return_type").cast("string").alias("return_type")  # Convert to STRING
)

# Insert the transformed data into the target Iceberg table 'iceberg_data.cb_di.config_udfs'
converted_config_udfs_df.write.format("iceberg").mode("append").insertInto("iceberg_data.cb_di.config_udfs")

# ---------------- Validation Section ----------------

# Load the target table for validation
target_config_udfs_df = spark.table("iceberg_data.cb_di.config_udfs")

# 1. Display a sample of records
print("Sample records from the config_udfs target table:")
target_config_udfs_df.show(10)

# 2. Count and compare the number of records
source_count = config_udfs_stg_df.count()
target_count = target_config_udfs_df.count()
print(f"Source record count: {source_count}, Target record count: {target_count}")


Sample records from the config_udfs target table:
+----------+----+-----+-----------+
|identifier| key|value|return_type|
+----------+----+-----+-----------+
|     16707|null| null|   CwConfig|
|     16719|null| null|   CwConfig|
|     16717|null| null|   CwConfig|
|     16702|null| null|   CwConfig|
|     16718|null| null|   CwConfig|
|     11464|null| null|   CwConfig|
|     11386|null| null|   CwConfig|
|     11388|null| null|   CwConfig|
|     11394|null| null|   CwConfig|
|     11402|null| null|   CwConfig|
+----------+----+-----+-----------+
only showing top 10 rows

Source record count: 50, Target record count: 50
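The count comparison is only meaningful on the first load into an empty table: this cell writes with `mode("append")`, so re-running it adds the same rows again and the target count will exceed the source count. The tickets and time entries loads below use `mode("overwrite")` instead. A toy model of the two modes (plain Python lists, not Spark) shows the effect:

```python
def simulate_load(target_rows, source_rows, mode):
    """Toy model of the write modes used in this notebook (not Spark itself)."""
    if mode == "append":
        return target_rows + source_rows  # existing rows are kept
    if mode == "overwrite":
        return list(source_rows)          # existing rows are replaced
    raise ValueError(f"unsupported mode: {mode}")

source = list(range(50))   # 50 staged rows, matching the count reported above
target = []
for _ in range(2):         # run the same append-mode load twice
    target = simulate_load(target, source, "append")
print(len(source), len(target))  # 50 100
```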


### 2/5 Data load from "iceberg_data.cb_di.tickets_stg" to "iceberg_data.cb_di.tickets"

In [20]:
from pyspark.sql import functions as F

# Load the source DataFrame (tickets_stg)
ticket_stg_df = spark.table("iceberg_data.cb_di.tickets_stg")

# Cast the staged string columns to the target types; the columns from "item" onward are written as NULL
transformed_df = ticket_stg_df.select(
    # Ensure the identifier is treated as a BIGINT
    F.col("identifier").cast("long").alias("identifier"),
    
    # Cast to the string (VARCHAR) and INT types of the target schema
    F.col("link").cast("string").alias("link"),
    F.col("integration_type").cast("string").alias("integration_type"),
    F.col("board_id").cast("int").alias("board_id"),
    F.col("board_name").cast("string").alias("board_name"),
    F.col("status_id").cast("int").alias("status_id"),
    F.col("status_name").cast("string").alias("status_name"),
    
    # Cast actual_hours as FLOAT
    F.col("actual_hours").cast("float").alias("actual_hours"),
    
    # Cast concept as string
    F.col("concept").cast("string").alias("concept"),
    
    # Convert updated_date to TIMESTAMP as per the format specified
    F.expr("CAST(REPLACE(REPLACE(updated_date, 'T', ' '), 'Z', ' +00:00') AS TIMESTAMP)").alias("updated_date"),
    
    # Cast other string fields, set remaining to NULL
    F.col("updated_by").cast("string").alias("updated_by"),
    F.col("integration_id").cast("int").alias("integration_id"),
    F.col("parent_name").cast("string").alias("parent_name"),
    F.col("company_id").cast("int").alias("company_id"),
    
    # Convert created_date to TIMESTAMP
    F.expr("CAST(REPLACE(REPLACE(created_date, 'T', ' '), 'Z', ' +00:00') AS TIMESTAMP)").alias("created_date"),
    
    # Cast created_by as string
    F.col("created_by").cast("string").alias("created_by"),
    
    # Return_type, Type, Sub_type as string
    F.col("return_type").cast("string").alias("return_type"),
    F.col("type").cast("string").alias("type"),
    F.col("sub_type").cast("string").alias("sub_type"),
    
    # These columns exist in tickets_stg but are written as typed NULLs here
    F.lit(None).cast("string").alias("item"),  # Setting as NULL
    F.lit(None).cast("string").alias("ticket_owner"),  # Setting as NULL
    F.lit(None).cast("string").alias("sla"),  # Setting as NULL
    F.lit(None).cast("string").alias("agreement"),  # Setting as NULL
    F.lit(None).cast("string").alias("predecessor"),  # Setting as NULL
    
    # Cast dates, or set NULL if not available
    F.lit(None).cast("date").alias("estimated_start_date"),  # Setting as NULL
    F.lit(None).cast("date").alias("due_date"),  # Setting as NULL
    
    # Cast duration as BIGINT (if not available, set NULL)
    F.lit(None).cast("long").alias("duration"),  # Setting as NULL
    
    # Impact, priority, sla_status as string, setting NULL where needed
    F.lit(None).cast("string").alias("impact"),  # Setting as NULL
    F.lit(None).cast("string").alias("priority"),  # Setting as NULL
    F.lit(None).cast("string").alias("sla_status"),  # Setting as NULL
    
    # Budget hours as FLOAT (set NULL)
    F.lit(None).cast("float").alias("budget_hours"),  # Setting as NULL
    
    # Opportunity, source, vcio, account_tech, assigned_by, closed_by as string
    F.lit(None).cast("string").alias("opportunity"),  # Setting as NULL
    F.lit(None).cast("string").alias("source"),  # Setting as NULL
    F.lit(None).cast("string").alias("vcio"),  # Setting as NULL
    F.lit(None).cast("string").alias("account_tech"),  # Setting as NULL
    F.lit(None).cast("string").alias("assigned_by"),  # Setting as NULL
    F.lit(None).cast("string").alias("closed_by"),  # Setting as NULL
    
    # Convert closed_date to TIMESTAMP (set NULL if missing)
    F.lit(None).cast("timestamp").alias("closed_date")  # Setting as NULL
)
# Write the transformed data to the target Iceberg table 'iceberg_data.cb_di.tickets' in overwrite mode.
# insertInto resolves columns by position, so the select() order must match the table definition.
transformed_df.write.format("iceberg").mode("overwrite").insertInto("iceberg_data.cb_di.tickets")
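The `CAST(REPLACE(REPLACE(..., 'T', ' '), 'Z', ' +00:00') AS TIMESTAMP)` expressions rewrite ISO-8601 strings such as `2024-11-05T14:30:00Z` (a hypothetical value) into a form with an explicit UTC offset before casting. The same idea in plain Python, useful for spot-checking a few staged values, might look like this. `parse_api_timestamp` is a hypothetical helper and, unlike Spark's default (non-ANSI) cast, it raises `ValueError` on malformed input instead of returning null.

```python
from datetime import datetime, timezone

def parse_api_timestamp(value):
    """Parse an ISO-8601 string such as '2024-11-05T14:30:00Z' into an aware UTC datetime.

    Like the Spark expression above, a trailing 'Z' is replaced by an explicit
    +00:00 offset. None or an empty string returns None.
    """
    if not value:
        return None
    normalized = value[:-1] + "+00:00" if value.endswith("Z") else value
    return datetime.fromisoformat(normalized).astimezone(timezone.utc)

ts = parse_api_timestamp("2024-11-05T14:30:00Z")
print(ts.isoformat())  # 2024-11-05T14:30:00+00:00
```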



### 3/5 Data load from "iceberg_data.cb_di.time_entries_stg" to "iceberg_data.cb_di.time_entries"

In [21]:
from pyspark.sql import functions as F

# Load the source DataFrame (assuming the source table is named 'time_entries_stg')
time_entries_stg_df = spark.table("iceberg_data.cb_di.time_entries_stg")

# Apply type conversions for each column
converted_time_entries_df = time_entries_stg_df.select(
    F.col("identifier").cast("int").alias("identifier"),  # Convert to INT
    F.col("user").cast("string").alias("user"),  # Convert to VARCHAR(255)
    F.col("hours").cast("double").alias("hours"),  # Convert to DOUBLE
    F.expr("CAST(REPLACE(REPLACE(date, 'T', ' '), 'Z', ' +00:00') AS TIMESTAMP)").alias("date"),  # Convert to TIMESTAMP
    F.col("billable").cast("boolean").alias("billable"),  # Convert to BOOLEAN
    F.col("work_role").cast("string").alias("work_role"),  # Convert to VARCHAR(255)
    F.col("status").cast("string").alias("status"),  # Convert to VARCHAR(255)
    F.col("location").cast("string").alias("location"),  # Convert to VARCHAR(255)
    F.col("business_unit").cast("string").alias("business_unit"),  # Convert to VARCHAR(255)
    F.col("agreement").cast("string").alias("agreement"),  # Convert to VARCHAR(255)
    F.col("agreement_type").cast("string").alias("agreement_type"),  # Convert to VARCHAR(255)
    F.col("invoice_number").cast("string").alias("invoice_number"),  # Convert to VARCHAR(255)
    F.col("start_time").cast("string").alias("start_time"),  # Convert to VARCHAR(255)
    F.col("end_time").cast("string").alias("end_time"),  # Convert to VARCHAR(255)
    F.col("return_type").cast("string").alias("return_type")  # Convert to VARCHAR(255)
)

# Insert the transformed data into the target Iceberg table 'iceberg_data.cb_di.time_entries'
converted_time_entries_df.write.format("iceberg").mode("overwrite").insertInto("iceberg_data.cb_di.time_entries")

# ---------------- Validation Section ----------------

# Load the target table for validation
target_time_entries_df = spark.table("iceberg_data.cb_di.time_entries")

# 1. Display a sample of records
print("Sample records from the time_entries target table:")
target_time_entries_df.show(10)

# 2. Count and compare the number of records
source_count = time_entries_stg_df.count()
target_count = target_time_entries_df.count()
print(f"Source record count: {source_count}, Target record count: {target_count}")

# 3. Check schema consistency
print("Schema of the target time_entries table:")
target_time_entries_df.printSchema()

Sample records from the time_entries target table:
+----------+--------------------+-------------------+----+--------+---------+------+--------+-------------+---------+--------------+--------------+----------+--------+-----------+
|identifier|                user|              hours|date|billable|work_role|status|location|business_unit|agreement|agreement_type|invoice_number|start_time|end_time|return_type|
+----------+--------------------+-------------------+----+--------+---------+------+--------+-------------+---------+--------------+--------------+----------+--------+-----------+
|   1160693|             csalter|0.31666666666666665|null|    null|         |      |        |             |         |              |              |      null|    null|   CwTicket|
|   1160715|             asheikh|                0.5|null|    null|         |      |        |             |         |              |              |      null|    null|   CwTicket|
|   1160197|Leah Katsos lkats...|                
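In the staging output, `billable` is an empty string, and after `.cast("boolean")` it shows up as `null`. The sketch below approximates the string-to-boolean rules of Spark's default (non-ANSI) cast as we understand them: trimmed, case-insensitive `t`/`true`/`y`/`yes`/`1` and `f`/`false`/`n`/`no`/`0`, with anything else becoming null. It is an illustration, not Spark's code; verify the accepted spellings against your Spark version before relying on them.

```python
TRUE_STRINGS = {"t", "true", "y", "yes", "1"}
FALSE_STRINGS = {"f", "false", "n", "no", "0"}

def cast_to_boolean(value):
    """Approximate Spark's non-ANSI CAST(string AS BOOLEAN)."""
    if value is None:
        return None
    text = value.strip().lower()
    if text in TRUE_STRINGS:
        return True
    if text in FALSE_STRINGS:
        return False
    return None  # unrecognized strings (including "") become null

results = [cast_to_boolean(v) for v in ["", "true", "No", "maybe", None]]
print(results)  # [None, True, False, None, None]
```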

### 4/5 Data load from "iceberg_data.cb_di.configs_stg" to "iceberg_data.cb_di.configs"

In [22]:
from pyspark.sql import functions as F

# Load the source DataFrame (assuming the source table is named 'configs_stg')
configs_stg_df = spark.table("iceberg_data.cb_di.configs_stg")

# Apply type conversions for each column
converted_configs_df = configs_stg_df.select(
    F.col("identifier").cast("int").alias("identifier"),
    F.col("link").cast("string").alias("link"),
    F.col("integration_type").cast("string").alias("integration_type"),
    F.expr("CAST(REPLACE(REPLACE(updated_date, 'T', ' '), 'Z', ' +00:00') AS TIMESTAMP)").alias("updated_date"),
    F.col("updated_by").cast("string").alias("updated_by"),
    F.col("integration_id").cast("int").alias("integration_id"),
    F.col("parent_name").cast("string").alias("parent_name"),
    F.col("company_id").cast("int").alias("company_id"),
    F.expr("CAST(REPLACE(REPLACE(created_date, 'T', ' '), 'Z', ' +00:00') AS TIMESTAMP)").alias("created_date"),
    F.col("created_by").cast("string").alias("created_by"),
    F.col("return_type").cast("string").alias("return_type"),
    F.col("business_unit_id").cast("int").alias("business_unit_id"),
    F.col("business_unit").cast("string").alias("business_unit"),
    F.col("location_id").cast("int").alias("location_id"),
    F.col("location").cast("string").alias("location"),
    F.col("config_type").cast("string").alias("config_type"),
    F.col("status").cast("string").alias("status"),
    F.col("SLA").cast("string").alias("SLA"),
    F.col("install_date").cast("string").alias("install_date"),
    F.col("installed_by").cast("string").alias("installed_by"),
    F.col("purchase_date").cast("date").alias("purchase_date"),
    F.col("expiration_date").cast("date").alias("expiration_date"),
    F.col("vendor").cast("string").alias("vendor"),
    F.col("manufacturer").cast("string").alias("manufacturer"),
    F.col("part_number").cast("string").alias("part_number"),
    F.col("model_number").cast("string").alias("model_number"),
    F.col("serial_number").cast("string").alias("serial_number"),
    F.col("tag_number").cast("string").alias("tag_number"),
    F.col("bill_customer").cast("boolean").alias("bill_customer"),
    F.col("needs_renewal").cast("boolean").alias("needs_renewal"),
    F.col("contact").cast("string").alias("contact"),
    F.col("site").cast("string").alias("site"),
    F.col("site_address").cast("string").alias("site_address"),
    F.col("notes").cast("string").alias("notes"),
    F.col("vendor_notes").cast("string").alias("vendor_notes")
)

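# Overwrite the target Iceberg table 'iceberg_data.cb_di.configs' with the transformed data.
# insertInto() matches columns by position, not by name, so the select() order above must match the target table's column order.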
converted_configs_df.write.format("iceberg").mode("overwrite").insertInto("iceberg_data.cb_di.configs")

# ---------------- Validation Section ----------------

# Load the target table for validation
target_configs_df = spark.table("iceberg_data.cb_di.configs")

# 1. Display a sample of records
print("Sample records from the configs target table:")
target_configs_df.show(10)

# 2. Count and compare the number of records
source_count = configs_stg_df.count()
target_count = target_configs_df.count()
print(f"Source record count: {source_count}, Target record count: {target_count}")


Sample records from the configs target table:
+----------+--------------------+----------------+-------------------+------------+--------------+------------+----------+-------------------+------------+-----------+----------------+-------------+-----------+--------+-----------+---------+----+------------+------------+-------------+---------------+------+------------+-----------+----------------+---------------+----------+-------------+-------------+-------+--------------------+------------+--------------------+------------+
|identifier|                link|integration_type|       updated_date|  updated_by|integration_id| parent_name|company_id|       created_date|  created_by|return_type|business_unit_id|business_unit|location_id|location|config_type|   status| sla|install_date|installed_by|purchase_date|expiration_date|vendor|manufacturer|part_number|    model_number|  serial_number|tag_number|bill_customer|needs_renewal|contact|                site|site_address|               notes|ve
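The `updated_date` and `created_date` expressions above rewrite an ISO-8601 string such as `2023-05-01T12:34:56Z` (a hypothetical sample value) into `2023-05-01 12:34:56 +00:00` before casting it to `TIMESTAMP`. The sketch below mirrors the two `REPLACE` calls in plain Python so you can inspect the intermediate string without a Spark session. It uses `datetime.strptime` only to show that the rewritten string carries an explicit UTC offset. This is not how Spark itself parses timestamps, and the format string assumes whole seconds with no fractional part.

```python
from datetime import datetime


def normalize_iso_utc(value: str) -> str:
    """Mirror REPLACE(REPLACE(value, 'T', ' '), 'Z', ' +00:00') from the Spark SQL expression."""
    # str.replace, like SQL REPLACE, substitutes every occurrence
    return value.replace("T", " ").replace("Z", " +00:00")


def parse_normalized(value: str) -> datetime:
    """Parse the normalized string into a timezone-aware datetime.

    Python 3.7+ accepts an offset with a colon ('+00:00') for %z.
    Assumes no fractional seconds; '12:34:56.123' would not match %S.
    """
    return datetime.strptime(normalize_iso_utc(value), "%Y-%m-%d %H:%M:%S %z")


sample = "2023-05-01T12:34:56Z"  # hypothetical value in the staging table's format
print(normalize_iso_utc(sample))              # 2023-05-01 12:34:56 +00:00
print(parse_normalized(sample).isoformat())   # 2023-05-01T12:34:56+00:00
```

On the Spark side, the cast produces an instant in time, and `show()` renders it in the session time zone (`spark.sql.session.timeZone`). The displayed values may therefore differ from the raw strings even when the conversion is correct.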

### 5/5 Data load from "iceberg_data.cb_di.contacts_stg" to "iceberg_data.cb_di.contacts"

In [24]:
from pyspark.sql import functions as F

# Load the source DataFrame (assuming the source table is named 'contacts_stg')
contacts_stg_df = spark.table("iceberg_data.cb_di.contacts_stg")

# Apply type conversions for each column
converted_contacts_df = contacts_stg_df.select(
    F.col("id").cast("long").alias("id"),  # Convert to BIGINT
    F.col("title").cast("string").alias("title"),  # Convert to VARCHAR
    F.col("link").cast("string").alias("link"),  # Convert to VARCHAR
    F.col("integration_type").cast("string").alias("integration_type"),  # Convert to VARCHAR
    F.col("business_unit_id").cast("int").alias("business_unit_id"),  # Convert to INT
    F.col("location_id").cast("int").alias("location_id"),  # Convert to INT
    F.col("identifier").cast("int").alias("identifier"),  # Convert to INT
    F.expr("CAST(REPLACE(REPLACE(updated_date, 'T', ' '), 'Z', ' +00:00') AS TIMESTAMP)").alias("updated_date"),  # Convert to TIMESTAMP
    F.col("updated_by").cast("string").alias("updated_by"),  # Convert to VARCHAR
    F.col("integration_id").cast("int").alias("integration_id"),  # Convert to INT
    F.col("parent_name").cast("string").alias("parent_name"),  # Convert to VARCHAR
    F.col("company_id").cast("int").alias("company_id"),  # Convert to INT
    F.expr("CAST(REPLACE(REPLACE(created_date, 'T', ' '), 'Z', ' +00:00') AS TIMESTAMP)").alias("created_date"),  # Convert to TIMESTAMP
    F.col("created_by").cast("string").alias("created_by"),  # Convert to VARCHAR
    F.col("return_type").cast("string").alias("return_type"),  # Convert to VARCHAR
    F.col("full_name").cast("string").alias("full_name"),  # Convert to VARCHAR
    F.col("company_location").cast("string").alias("company_location"),  # Convert to VARCHAR
    F.col("department").cast("string").alias("department"),  # Convert to VARCHAR
    F.col("site").cast("string").alias("site"),  # Convert to VARCHAR
    F.col("email").cast("string").alias("email"),  # Convert to VARCHAR
    F.col("phone_number").cast("string").alias("phone_number"),  # Convert to VARCHAR
    F.col("facebook").cast("string").alias("facebook"),  # Convert to VARCHAR
    F.col("twitter").cast("string").alias("twitter"),  # Convert to VARCHAR
    F.col("linkedin").cast("string").alias("linkedin")  # Convert to VARCHAR
)

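# Overwrite the target Iceberg table 'iceberg_data.cb_di.contacts' with the transformed data.
# insertInto() matches columns by position, not by name, so the select() order above must match the target table's column order.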
converted_contacts_df.write.format("iceberg").mode("overwrite").insertInto("iceberg_data.cb_di.contacts")

# ---------------- Validation Section ----------------

# Load the target table for validation
target_contacts_df = spark.table("iceberg_data.cb_di.contacts")

# 1. Display a sample of records
print("Sample records from the contacts target table:")
target_contacts_df.show(10)

# 2. Count and compare the number of records
source_count = contacts_stg_df.count()
target_count = target_contacts_df.count()
print(f"Source record count: {source_count}, Target record count: {target_count}")


Sample records from the contacts target table:
+----+-----+--------------------+----------------+----------------+-----------+----------+-------------------+------------+--------------+-----------+----------+-------------------+-----------+-----------+----------------+----------------+----------+-----+-----+------------+--------+-------+--------+
|  id|title|                link|integration_type|business_unit_id|location_id|identifier|       updated_date|  updated_by|integration_id|parent_name|company_id|       created_date| created_by|return_type|       full_name|company_location|department| site|email|phone_number|facebook|twitter|linkedin|
+----+-----+--------------------+----------------+----------------+-----------+----------+-------------------+------------+--------------+-----------+----------+-------------------+-----------+-----------+----------------+----------------+----------+-----+-----+------------+--------+-------+--------+
|null|     |https://staging.c...|     ConnectWi
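Each of the five loads repeats one `F.col(...).cast(...).alias(...)` line per column. One way to keep the column list and its target types in one place is to generate SQL expressions from a mapping and pass them to `DataFrame.selectExpr`. This sketch is an assumption and not part of the original notebook. `CONTACTS_TYPES` lists only a few of the contacts columns, and `build_cast_exprs` is a hypothetical helper. Python dictionaries preserve insertion order, so the generated expressions follow the mapping's order, which must be the target table's column order for a positional `insertInto`. Column names are wrapped in backticks, Spark SQL's identifier quoting, so names such as `date` or `user` are not misread.

```python
from typing import Dict, List

# Hypothetical subset of the contacts column mapping: column name -> Spark SQL type.
# Keep the entries in the target table's column order.
CONTACTS_TYPES: Dict[str, str] = {
    "id": "BIGINT",
    "title": "STRING",
    "identifier": "INT",
    "updated_date": "TIMESTAMP",
    "full_name": "STRING",
}

# Same ISO-8601 rewrite the notebook applies before casting date strings to TIMESTAMP
TIMESTAMP_TEMPLATE = (
    "CAST(REPLACE(REPLACE(`{col}`, 'T', ' '), 'Z', ' +00:00') AS TIMESTAMP) AS `{col}`"
)


def build_cast_exprs(types: Dict[str, str]) -> List[str]:
    """Return one SQL expression per column, suitable for DataFrame.selectExpr(*exprs)."""
    exprs = []
    for col, sql_type in types.items():
        if sql_type == "TIMESTAMP":
            exprs.append(TIMESTAMP_TEMPLATE.format(col=col))
        else:
            exprs.append(f"CAST(`{col}` AS {sql_type}) AS `{col}`")
    return exprs


for expr in build_cast_exprs(CONTACTS_TYPES):
    print(expr)

# Hypothetical usage with the DataFrames from the cell above:
# converted_contacts_df = contacts_stg_df.selectExpr(*build_cast_exprs(CONTACTS_TYPES))
```

The trade-off is readability. The explicit `F.col` chains in the notebook show every conversion inline, while a mapping makes the column order and types easier to review and reuse across the five tables.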

# Data Ingestion is complete. 