In [1]:
import logging
from datetime import datetime
from typing import List, Optional, Tuple, Dict

from pyspark.sql import SparkSession, DataFrame, Window # type: ignore
from pyspark.sql.functions import (col, to_timestamp, year, month, regexp_replace, when, # type: ignore
                                  date_format, to_date, sequence, explode, lit, min as min_,
                                  max as max_, count, last, row_number, concat, current_timestamp, rank) # type: ignore
import pyspark.sql.functions as F # type: ignore

from pyspark.sql.window import Window # type: ignore
from pyspark.sql.types import StringType, DoubleType, TimestampType, DateType # type: ignore

# Root logging configuration: INFO level, "time - level - message" lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("InsertBronzeToSilver")

# Catalog and namespace names for the bronze (input) and silver (output) layers.
SOURCE_CATALOG = "datalake"
SOURCE_NAMESPACE = ".".join((SOURCE_CATALOG, "bronze"))
TARGET_CATALOG = "datalake"
TARGET_NAMESPACE = ".".join((TARGET_CATALOG, "silver"))

In [2]:
# Name of the column that holds each row's calendar date.
DATE_COLUMN = "date"

# Columns that get "," / "%" characters stripped and are cast to double.
FINANCIAL_COLUMNS = [
    "price",
    "open",
    "high",
    "low",
    "volume",
]

# First and last day (yyyy-MM-dd) of the continuous daily date range.
START_DATE = "1995-01-05"
END_DATE = "2025-03-07"

In [3]:
def create_spark_session(app_name: str) -> SparkSession:
    """Build (or reuse) a Hive-enabled Spark session and log its key settings.

    Args:
        app_name: Application name handed to the session builder.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    logger.info("Initializing Spark Session...")

    builder = SparkSession.builder.appName(app_name).enableHiveSupport()
    # Same datetime rebase mode ("CORRECTED") for both Parquet and Avro writes.
    for file_format in ("parquet", "avro"):
        builder = builder.config(
            f"spark.sql.{file_format}.datetimeRebaseModeInWrite", "CORRECTED"
        )
    spark = builder.getOrCreate()

    # Arrow-based PySpark conversion is explicitly switched off for this session.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

    logger.info(f"Spark Session initialized successfully. Spark version: {spark.version}")

    # Report the settings that decide where data is read from and written to.
    spark_conf = spark.sparkContext.getConf()
    catalog_prefix = f"spark.sql.catalog.{TARGET_CATALOG}"
    report = [
        ("Source Namespace", SOURCE_NAMESPACE),
        ("Target Namespace", TARGET_NAMESPACE),
        ("S3 Endpoint (Hadoop)", spark_conf.get("spark.hadoop.fs.s3a.endpoint", "N/A")),
        ("Catalog Type", spark_conf.get(f"{catalog_prefix}.type", "N/A")),
        ("Catalog URI", spark_conf.get(f"{catalog_prefix}.uri", "N/A")),
        ("Catalog Warehouse", spark_conf.get(f"{catalog_prefix}.warehouse", "N/A")),
    ]
    for label, value in report:
        logger.info(f"{label}: {value}")

    return spark

In [4]:
def create_namespace_if_not_exists(spark: SparkSession, namespace: str):
    """Ensure ``namespace`` exists, creating it when it is missing.

    Best-effort: a failure is logged at ERROR level and is not re-raised.

    Args:
        spark: Active Spark session used to run the DDL statement.
        namespace: Fully qualified namespace name (e.g. "catalog.schema").
    """
    try:
        logger.info(f"Checking/Creating namespace: {namespace}")
        ddl = f"CREATE NAMESPACE IF NOT EXISTS {namespace}"
        spark.sql(ddl)
    except Exception as exc:
        logger.error(f"Failed to create namespace {namespace}: {str(exc)}")
    else:
        logger.info(f"Namespace {namespace} ensured.")

In [5]:
def list_tables(spark: SparkSession, namespace: str) -> List[str]:
    """List the names of all tables in a namespace.

    Args:
        spark: Active Spark session.
        namespace: Namespace to inspect with ``SHOW TABLES IN``.

    Returns:
        The table names, or an empty list when the namespace has no tables
        or when listing fails (the failure is logged, not raised).
    """
    try:
        logger.info(f"Listing tables in namespace: {namespace}")

        # Collect once and test the result: count() followed by collect()
        # would execute the SHOW TABLES statement twice.
        rows = spark.sql(f"SHOW TABLES IN {namespace}").select("tableName").collect()

        if not rows:
            logger.warning(f"No tables found in namespace {namespace}")
            return []

        table_names = [row.tableName for row in rows]
        logger.info(f"Found {len(table_names)} tables: {', '.join(table_names)}")
        return table_names

    except Exception as e:
        # Best-effort: callers treat a failed listing like an empty namespace.
        # The traceback is logged so the cause is not lost.
        logger.error(f"Failed to list tables in {namespace}: {str(e)}", exc_info=True)
        return []

In [6]:
def segment_based_forward_fill(df, date_col="date", partition_cols=None):
    """
    Forward fill null values within each (year, month) segment.

    ``year`` and ``month`` are (re)derived from the DATE_COLUMN column. Every
    other column except DATE_COLUMN, ``source_file`` and ``inserted`` is then
    replaced by its most recent non-null value within the same (year, month),
    ordered by date. Values are not carried across month boundaries, so nulls
    that precede the first non-null row of a month stay null.

    Parameters:
    - df: Input PySpark DataFrame (e.g., after left join with date range).
    - date_col: Currently ignored; the module-level DATE_COLUMN is always used.
      Kept so existing call sites keep working.
    - partition_cols: Currently ignored; the window is always partitioned by
      ["year", "month"]. Kept so existing call sites keep working.

    Returns:
    - DataFrame with forward-filled values plus ``year`` and ``month`` columns.
    """
    df = df.withColumn("year", year(col(DATE_COLUMN))).withColumn("month", month(col(DATE_COLUMN)))

    segment_cols = ["year", "month"]

    # Running window: all rows of the segment up to and including the current one.
    window_spec = (
        Window.partitionBy(*segment_cols)
        .orderBy(DATE_COLUMN)
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    # Keys and bookkeeping columns are left untouched.
    not_filled = {DATE_COLUMN, *segment_cols, "source_file", "inserted"}
    columns_to_fill = [c for c in df.columns if c not in not_filled]
    if not columns_to_fill:
        return df

    # One projection for all columns instead of one withColumn() per column,
    # which would add a projection to the plan for every filled column.
    return df.withColumns(
        {c: last(c, ignorenulls=True).over(window_spec) for c in columns_to_fill}
    )

In [7]:
def create_date_range_df(spark: SparkSession) -> DataFrame:
    """Return a single-column DataFrame with one row per day from START_DATE to END_DATE.

    The column is named after DATE_COLUMN. The row count is logged, which
    triggers a Spark job of its own.
    """
    logger.info(f"Creating date range from {START_DATE} to {END_DATE}...")

    calendar_sql = f"""
        SELECT explode(sequence(
            to_date('{START_DATE}'), 
            to_date('{END_DATE}'), 
            interval 1 day
        )) as {DATE_COLUMN}
    """
    calendar_df = spark.sql(calendar_sql)

    n_dates = calendar_df.count()
    logger.info(f"Created continuous date range with {n_dates} dates")
    return calendar_df

In [8]:
def process_with_forward_fill(spark: SparkSession, df: DataFrame, table_name: str) -> DataFrame:
    """Expand ``df`` to a continuous daily date range and forward fill the gaps.

    Args:
        spark: Active Spark session.
        df: Cleaned table data containing a DATE_COLUMN column.
        table_name: Table name, used for logging only.

    Returns:
        One row per date in the generated range, with missing values forward
        filled by ``segment_based_forward_fill``.
    """
    logger.info(f"Processing {table_name} with forward fill...")

    # 1. Get the full date range DataFrame
    date_range_df = create_date_range_df(spark)

    # 2. Left join so every date of the range is present, with nulls
    #    where the source has no row for that date
    logger.info("Joining with full date range...")
    full_df = date_range_df.join(df, on=DATE_COLUMN, how="left")

    # 3. Forward fill. No global sort is needed first: the fill's window
    #    defines its own ordering by date. The date column is passed by
    #    keyword (the table name used to be passed in this position).
    result_df = segment_based_forward_fill(full_df, date_col=DATE_COLUMN)

    # 4. Show sample data (this triggers execution of the plan)
    logger.info("Sample data after processing:")
    result_df.orderBy(DATE_COLUMN).limit(3).show(truncate=False)

    return result_df

In [9]:
def clean_and_cast_columns(df: DataFrame, date_formats: Optional[List[str]] = None) -> DataFrame:
    """Normalize the date column to DateType and the financial columns to double.

    Args:
        df: Input DataFrame; columns that are absent are skipped.
        date_formats: Datetime patterns tried in order when the date column is
            not already a date/timestamp; the first pattern that parses wins.
            Defaults to ["MM-dd-yyyy"].

    Returns:
        DataFrame with DATE_COLUMN as a date and every present column of
        FINANCIAL_COLUMNS stripped of "," and "%" and cast to double.
    """
    logger.info("Cleaning and casting columns to appropriate types...")

    # Ensure date column is date type
    if DATE_COLUMN in df.columns:
        current_type = df.schema[DATE_COLUMN].dataType
        logger.info(f"Date column type: {str(current_type)}")

        if isinstance(current_type, TimestampType):
            # A timestamp would only match the daily date range at exactly
            # midnight, so truncate it to a date.
            logger.info("Converting date column to date type")
            df = df.withColumn(DATE_COLUMN, col(DATE_COLUMN).cast("date"))
        elif not isinstance(current_type, DateType):
            logger.info("Converting date column to date type")

            # Try each configured format; coalesce keeps the first successful parse
            formats = list(date_formats) if date_formats else ["MM-dd-yyyy"]
            parsed = [to_timestamp(col(DATE_COLUMN), fmt) for fmt in formats]
            df = df.withColumn(DATE_COLUMN, F.coalesce(*parsed).cast("date"))

    # Process financial columns
    for column in FINANCIAL_COLUMNS:
        if column not in df.columns:
            continue

        # Strip thousands separators and percent signs in a single pass
        cleaned = regexp_replace(col(column).cast("string"), "[,%]", "")
        logger.info(f"Cleaned string values in column '{column}'")

        # Values that are still not numeric become null in the cast
        df = df.withColumn(column, cleaned.cast("double"))
        logger.info(f"Converted column '{column}' to double type")

    return df

In [10]:
def transform_data(spark: SparkSession, df: DataFrame, table_name: str) -> DataFrame:
    """Run the bronze-to-silver transformation for a single table.

    Steps: keep the latest inserted row per date, clean and cast columns, drop
    rows before START_DATE, then expand to a continuous date range with
    forward fill.

    Args:
        spark: Active Spark session.
        df: Raw table data; must contain DATE_COLUMN and ``inserted`` columns.
        table_name: Table name, used for logging.

    Returns:
        The transformed DataFrame.
    """
    logger.info(f"Transforming data for table {table_name}")

    # 1. Deduplicate: keep exactly one row per date, the most recently inserted.
    #    row_number() is used rather than rank(), because rank() gives every row
    #    tied on `inserted` the value 1 and would let duplicates through. Among
    #    tied rows the surviving one is arbitrary.
    latest_first = Window.partitionBy(DATE_COLUMN).orderBy(col("inserted").desc())
    df = (
        df.withColumn("rnk", row_number().over(latest_first))
        .filter(col("rnk") == 1)
        .drop("rnk")
    )

    # 2. Clean and cast columns
    df = clean_and_cast_columns(df)

    # 3. Filter to include only data from START_DATE onward
    logger.info(f"Filtering data from {START_DATE} onward")
    df = df.filter(col(DATE_COLUMN) >= START_DATE)

    # 4. Process with continuous date range and forward fill
    df = process_with_forward_fill(spark, df, table_name)

    return df

In [11]:
def write_to_iceberg(df: DataFrame, table_name: str, partition_by: List[str]):
    """Overwrite an Iceberg table with the contents of ``df``.

    Args:
        df: Data to persist.
        table_name: Fully qualified target table name.
        partition_by: Column names passed to the writer's ``partitionBy``.

    Raises:
        Exception: Any write failure is logged and then re-raised.
    """
    logger.info(f"Writing to Iceberg table: {table_name}")
    logger.info(f"Partitioning by: {partition_by}")

    # Replace any incoming 'inserted' value with the current timestamp;
    # drop + add places the column last.
    stamped_df = df.drop(col('inserted')).withColumn("inserted", current_timestamp())

    try:
        (
            stamped_df.write.format("iceberg")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .partitionBy(*partition_by)
            .saveAsTable(table_name)
        )
    except Exception as exc:
        logger.error(f"Failed to write to table {table_name}: {str(exc)}")
        raise
    else:
        logger.info(f"Successfully wrote to {table_name}")

In [12]:
def process_table(spark: SparkSession, table_name: str):
    """Process a single table from bronze to silver.

    Reads ``SOURCE_NAMESPACE.table_name``, transforms it and overwrites
    ``TARGET_NAMESPACE.table_name`` partitioned by year and month. An empty
    source table is skipped with a warning.

    Args:
        spark: Active Spark session.
        table_name: Unqualified table name shared by source and target.

    Raises:
        Exception: Any failure is logged with its traceback and re-raised.
    """
    logger.info(f"=== Processing table: {table_name} ===")
    start_time = datetime.now()

    try:
        # 1. Read from bronze
        source_table = f"{SOURCE_NAMESPACE}.{table_name}"
        logger.info(f"Reading from {source_table}")

        df = spark.table(source_table)
        # DataFrame.isEmpty() does the check on the JVM side; df.rdd.isEmpty()
        # first converts the DataFrame into a Python RDD.
        if df.isEmpty():
            logger.warning(f"No data found in {source_table}. Skipping.")
            return

        # 2. Display schema
        logger.info("Table schema:")
        df.printSchema()

        # 3. Transform data
        df_transformed = transform_data(spark, df, table_name)

        # 4. Write to silver
        target_table = f"{TARGET_NAMESPACE}.{table_name}"
        write_to_iceberg(df_transformed, target_table, ["year", "month"])

        # 5. Log success
        duration = datetime.now() - start_time
        logger.info(f"=== Successfully processed {table_name} in {duration} ===")

    except Exception as e:
        logger.error(f"Error processing {table_name}: {str(e)}", exc_info=True)
        raise

In [13]:
# Driver cell: run the bronze -> silver job for every table in the source namespace.
# Bound before the try so the finally clause is safe even when session
# creation itself fails (otherwise a NameError would mask the real error).
spark = None
try:
    # Initialize Spark
    spark = create_spark_session("Bronze_to_Silver_ETL")

    # Ensure target namespace exists
    create_namespace_if_not_exists(spark, TARGET_NAMESPACE)

    # List bronze tables to process
    bronze_tables = list_tables(spark, SOURCE_NAMESPACE)

    # Process each table; one failing table does not stop the others
    processed_count = 0
    failed_count = 0

    for table_name in bronze_tables:
        try:
            process_table(spark, table_name)
            processed_count += 1
        except Exception:
            # process_table has already logged the error with its traceback
            logger.error(f"Failed to process table {table_name}")
            failed_count += 1

    # Summarize results
    logger.info("=== Job Summary ===")
    logger.info(f"Total tables found: {len(bronze_tables)}")
    logger.info(f"Successfully processed: {processed_count}")
    logger.info(f"Failed: {failed_count}")

except Exception as e:
    logger.critical(f"Job failed with error: {str(e)}")
    raise
finally:
    if spark is not None:
        spark.stop()

2025-05-24 17:47:32,598 - INFO - Initializing Spark Session...
2025-05-24 17:47:33,492 - INFO - Spark Session initialized successfully. Spark version: 3.5.5
2025-05-24 17:47:33,520 - INFO - Source Namespace: datalake.bronze
2025-05-24 17:47:33,521 - INFO - Target Namespace: datalake.silver
2025-05-24 17:47:33,522 - INFO - S3 Endpoint (Hadoop): http://minio:9000
2025-05-24 17:47:33,522 - INFO - Catalog Type: rest
2025-05-24 17:47:33,523 - INFO - Catalog URI: http://nessie:19120/iceberg
2025-05-24 17:47:33,523 - INFO - Catalog Warehouse: N/A
2025-05-24 17:47:33,524 - INFO - Checking/Creating namespace: datalake.silver
2025-05-24 17:47:35,095 - INFO - Namespace datalake.silver ensured.
2025-05-24 17:47:35,097 - INFO - Listing tables in namespace: datalake.bronze
2025-05-24 17:47:38,054 - INFO - Found 19 tables: dow_jones, gold, inflation, interest, msci_world, nasdaq100, oil, russell2000, s_p500, us_10_year_bond, us_2_year_bond, us_3_month_bond, us_5_year_bond, us_dollar, usd_vnd, vn_10_y

root
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- price: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- source_file: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- inserted: timestamp (nullable = true)



2025-05-24 17:47:41,193 - INFO - Cleaned string values in column 'open'
2025-05-24 17:47:41,209 - INFO - Converted column 'open' to double type
2025-05-24 17:47:41,235 - INFO - Cleaned string values in column 'high'
2025-05-24 17:47:41,252 - INFO - Converted column 'high' to double type
2025-05-24 17:47:41,276 - INFO - Cleaned string values in column 'low'
2025-05-24 17:47:41,292 - INFO - Converted column 'low' to double type
2025-05-24 17:47:41,318 - INFO - Cleaned string values in column 'volume'
2025-05-24 17:47:41,332 - INFO - Converted column 'volume' to double type
2025-05-24 17:47:41,333 - INFO - Filtering data from 1995-01-05 onward
2025-05-24 17:47:41,351 - INFO - Processing dow_jones with forward fill...
2025-05-24 17:47:41,352 - INFO - Creating date range from 1995-01-05 to 2025-03-07...
2025-05-24 17:47:41,656 - INFO - Created continuous date range with 11020 dates
2025-05-24 17:47:41,658 - INFO - Joining with full date range...
2025-05-24 17:47:41,911 - INFO - Sample data 

+----------+------+-------+-------+-------+-------+---------------------------------------------+----+-----+--------------------------+
|date      |open  |high   |low    |price  |volume |source_file                                  |year|month|inserted                  |
+----------+------+-------+-------+-------+-------+---------------------------------------------+----+-----+--------------------------+
|1995-01-05|3857.6|3860.68|3843.19|3850.92|2.581E7|file:///src/data/raw/Dow%20Jones/DowJones.csv|1995|1    |2025-05-24 10:14:39.409379|
|1995-01-06|3850.9|3887.26|3841.84|3867.41|3.024E7|file:///src/data/raw/Dow%20Jones/DowJones.csv|1995|1    |2025-05-24 10:14:39.409379|
|1995-01-07|3850.9|3887.26|3841.84|3867.41|3.024E7|NULL                                         |1995|1    |NULL                      |
+----------+------+-------+-------+-------+-------+---------------------------------------------+----+-----+--------------------------+



2025-05-24 17:47:58,195 - INFO - Successfully wrote to datalake.silver.dow_jones
2025-05-24 17:47:58,197 - INFO - === Successfully processed dow_jones in 0:00:20.140675 ===
2025-05-24 17:47:58,199 - INFO - === Processing table: gold ===
2025-05-24 17:47:58,200 - INFO - Reading from datalake.bronze.gold
2025-05-24 17:47:58,486 - INFO - Table schema:
2025-05-24 17:47:58,489 - INFO - Transforming data for table gold
2025-05-24 17:47:58,531 - INFO - Cleaning and casting columns to appropriate types...
2025-05-24 17:47:58,534 - INFO - Date column type: StringType()
2025-05-24 17:47:58,534 - INFO - Converting date column to date type
2025-05-24 17:47:58,576 - INFO - Cleaned string values in column 'price'
2025-05-24 17:47:58,583 - INFO - Converted column 'price' to double type
2025-05-24 17:47:58,599 - INFO - Cleaned string values in column 'open'
2025-05-24 17:47:58,607 - INFO - Converted column 'open' to double type
2025-05-24 17:47:58,626 - INFO - Cleaned string values in column 'high'
20

root
 |-- date: string (nullable = true)
 |-- price: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- source_file: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- inserted: timestamp (nullable = true)



2025-05-24 17:47:58,786 - INFO - Created continuous date range with 11020 dates
2025-05-24 17:47:58,787 - INFO - Joining with full date range...
2025-05-24 17:47:58,914 - INFO - Sample data after processing:

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f5279a99040, pid=65994, tid=66025
#
# JRE version: OpenJDK Runtime Environment (11.0.26+4) (build 11.0.26+4-post-Debian-1deb11u1)
# Java VM: OpenJDK 64-Bit Server VM (11.0.26+4-post-Debian-1deb11u1, mixed mode, sharing, tiered, compressed oops, g1 gc, linux-amd64)
# Problematic frame:
# V  [libjvm.so+0xc35040]  ProtectionDomainEntry::object_no_keepalive()+0x0
#
# Core dump will be written. Default location: Core dumps may be processed with "/usr/share/apport/apport -p%p -s%s -c%c -d%d -P%P -u%u -g%g -- %E" (or dumping to /src/notebooks/Bao/core.65994)
#
# An error report file with more information is saved as:
# /src/notebooks/Bao/hs_err_pid65994.log
#
# If you would like to submit a bug report, please visit:
#   https://bugs.debian.org/openjdk-11
#


2025-05-24 17:48:00,960 - INFO - Error while receiving.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
2025-05-24 17:48:00,965 - INFO - Closing down clientserver connection
2025-05-24 17:48:00,968 - ERROR - Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py

ConnectionRefusedError: [Errno 111] Connection refused

2025-05-24 17:48:01,751 - INFO - Closing down clientserver connection
2025-05-24 17:48:01,754 - INFO - Closing down clientserver connection
2025-05-24 17:48:01,756 - INFO - Closing down clientserver connection
2025-05-24 17:48:01,758 - INFO - Closing down clientserver connection
2025-05-24 17:48:01,760 - INFO - Closing down clientserver connection
2025-05-24 17:48:01,762 - INFO - Closing down clientserver connection
