# Import libraries

In [1]:
from typing import List, Dict, Optional, Any
from sqlalchemy import create_engine, text
from sqlalchemy.sql import quoted_name
from dotenv import load_dotenv
import os
import stat
import glob
import shutil

import requests
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    sum as _sum, min as _min, max as _max, avg as _avg, sha2, col, when, coalesce, lit, concat_ws, first, countDistinct
)
from pyspark.sql.types import DataType, DateType, TimestampType, NumericType, DecimalType
import traceback
import gc

# Helper functions

In [2]:
load_dotenv(".env")

DB_HOST_SOURCE = os.getenv("DB_HOST_SOURCE")
DB_USER_SOURCE = os.getenv("DB_USER_SOURCE")
DB_PASS_SOURCE = os.getenv("DB_PASS_SOURCE")
DB_PORT_SOURCE = os.getenv("DB_PORT_SOURCE")

DB_HOST_TARGET = os.getenv("DB_HOST_TARGET")
DB_USER_TARGET = os.getenv("DB_USER_TARGET")
DB_PASS_TARGET = os.getenv("DB_PASS_TARGET")
DB_PORT_TARGET = os.getenv("DB_PORT_TARGET")

DB_NAME_SOURCE = os.getenv("DB_NAME_SOURCE")
DB_NAME_STG = os.getenv("DB_NAME_STG")
DB_NAME_LOG = os.getenv("DB_NAME_LOG")
DB_NAME_WH = os.getenv("DB_NAME_WH")

API_PATH = os.getenv("API_PATH")

# Create URL link for each database connection
def source_engine():
    DB_URL = f"jdbc:postgresql://{DB_HOST_SOURCE}:{DB_PORT_SOURCE}/{DB_NAME_SOURCE}"
    return DB_URL, DB_USER_SOURCE, DB_PASS_SOURCE, DB_NAME_SOURCE

def stg_engine():
    DB_URL = f"jdbc:postgresql://{DB_HOST_TARGET}:{DB_PORT_TARGET}/{DB_NAME_STG}"
    return DB_URL, DB_USER_TARGET, DB_PASS_TARGET, DB_NAME_STG

def log_engine():
    DB_URL = f"jdbc:postgresql://{DB_HOST_TARGET}:{DB_PORT_TARGET}/{DB_NAME_LOG}"
    return DB_URL, DB_USER_TARGET, DB_PASS_TARGET, DB_NAME_LOG

def wh_engine():
    DB_URL = f"jdbc:postgresql://{DB_HOST_TARGET}:{DB_PORT_TARGET}/{DB_NAME_WH}"
    return DB_URL, DB_USER_TARGET, DB_PASS_TARGET, DB_NAME_WH

def wh_engine_sqlalchemy():
    return create_engine(f"postgresql://{DB_USER_TARGET}:{DB_PASS_TARGET}@{DB_HOST_TARGET}:{DB_PORT_TARGET}/{DB_NAME_WH}")

def load_log(spark: SparkSession, log_msg):
    DB_URL, DB_USER, DB_PASS, DB_NAME = log_engine()
    table_name = "etl_log"

    # set config
    connection_properties = {
        "user": DB_USER,
        "password": DB_PASS,
        "driver": "org.postgresql.Driver" # set driver postgres
    }

    log_msg.write.jdbc(url = DB_URL,
                      table = table_name,
                      mode = "append",
                      properties = connection_properties)

# Define spark session

In [3]:
spark = SparkSession.builder \
    .appName("ETL_Pipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Profiling Functions

In [10]:
class ProfileData():
    @staticmethod
    def table_profiler(spark: SparkSession, table: str, source_type: str = "db", **kwargs) -> List[Dict[str, Any]]:
        """
        Profile a table from database, CSV, or API, returning column statistics including data type,
        null percentage, duplicates, and max/mode.
    
        Args:
            spark: SparkSession object
            table: Name of the table or data source identifier (e.g., table name, file name, or API endpoint)
            source_type: Source type ('db', 'csv', or 'api') (default: 'db')
            **kwargs: Additional parameters:
                - For 'db': db_url, connection_properties
                - For 'csv': path (base directory for CSV files)
                - For 'api': api_url, params (optional query parameters)
    
        Returns:
            List of dictionaries containing profiling results for each column
        """
        results = []
        try:
            # Load data based on source type
            if source_type == "db":
                db_url = kwargs.get("db_url")
                connection_properties = kwargs.get("connection_properties")
                if not db_url or not connection_properties:
                    raise ValueError("db_url and connection_properties required for db source")
                df = spark.read.jdbc(url=db_url, table=table, properties=connection_properties)
            
            elif source_type == "csv":
                path = kwargs.get("path", "data/")
                df = spark.read.csv(f"{path}{table}", header=True)
            
            elif source_type == "api":
                api_url = kwargs.get("api_url")
                params = kwargs.get("params", {})
                if not api_url:
                    raise ValueError("api_url required for api source")
                response = requests.get(url=api_url, params=params)
                if response.status_code != 200:
                    raise Exception(f"API request fail with status code: {response.status_code}")
                json_data = response.json()
                if not json_data:
                    print(f"No data from API for {table}")
                    return results
                df = spark.createDataFrame(pd.DataFrame(json_data))
            
            else:
                raise ValueError(f"Unsupported source_type: {source_type}")
    
            # Get basic stats
            num_rows = df.count()
            num_cols = len(df.columns)
            
            # Profile each column
            for column in df.columns:
                try:
                    # Ensure column name is a string
                    column = str(column)
                    
                    col_type = df.schema[column].dataType
                    col_type_str = str(col_type.simpleString())
                    
                    # Null percentage
                    null_count = df.filter((df[column].isNull()) | (df[column]=="")).count()
                    null_pct = (null_count / num_rows * 100) if num_rows > 0 else 0.0
                    
                    # Duplicate count
                    distinct_count = df.select(countDistinct(col(column))).first()[0]
                    duplicate_count = num_rows - distinct_count if num_rows > 0 else 0
                    
                    # Min and max for numeric/date or mode for others
                    if isinstance(col_type, (DateType, TimestampType, NumericType)):
                        max_val = df.select(_max(col(column))).first()[0]
                        max_val = str(max_val) if max_val is not None else None
                        
                        min_val = df.select(_min(col(column))).first()[0]
                        min_val = str(min_val) if min_val is not None else None

                        mode_val = None
                    else:
                        mode_val = df.groupBy(col(column)).count().orderBy(col("count").desc()).first()
                        mode_val = str(mode_val[column]) if mode_val is not None else None
                        
                        max_val = None
                        min_val = None

                    # Possible data value
                    distinct_value = [row[column] for row in df.select(col(column)).distinct().collect()]
                    
                    # Append results
                    results.append({
                        "PIC": "Biyan",
                        "date": datetime.now(),
                        "table_name": table,
                        "column_name": column,
                        "num_rows": num_rows,
                        "num_columns": num_cols,
                        "data_type": col_type_str,
                        "null_percentage": null_pct,
                        "duplicate_count": duplicate_count,
                        "min": min_val,
                        "max": max_val,
                        "mode": mode_val,
                        "distinct_value": distinct_value
                    })
                
                except Exception as e:
                    print(f"Error profiling column {column} in table {table}: {str(e)}\n{traceback.format_exc()}")
                    continue
        
        except Exception as e:
            print(f"Error profiling table {table} from {source_type}: {str(e)}\n{traceback.format_exc()}")
        
        return results
    
    @classmethod
    def from_database(cls, spark: SparkSession) -> pd.DataFrame:
        """
        Profile all tables in a PostgreSQL database, including column count, row count,
        and per-column statistics (data type, null percentage, duplicates, max/mode).
    
        Args:
            spark: SparkSession object
    
        Returns:
            Pandas DataFrame with profiling results
        """
        # Initialize results list
        results = []
        
        try:
            # Get database connection details
            db_url, db_user, db_pass, db_name = source_engine()
            connection_properties = {
                "user": db_user,
                "password": db_pass,
                "driver": "org.postgresql.Driver"
            }
        
            # Get list of tables
            tables_query = f"(SELECT table_name FROM information_schema.tables WHERE table_schema = 'public') AS tables"
            tables_df = spark.read.jdbc(url=db_url, table=tables_query, properties=connection_properties)
            tables = [row.table_name for row in tables_df.collect()]
    
            for table in tables:
                # Profile each table
                res = cls.table_profiler(
                    spark=spark,
                    table=table,
                    source_type='db',
                    db_url=db_url,
                    connection_properties=connection_properties
                )
                results = results + res
    
            # Convert to pandas DataFrame
            result_df = pd.DataFrame(results)
            
        except Exception as e:
            print(f"Error profiling database: {str(e)}\n{traceback.format_exc()}")
            result_df = pd.DataFrame()
    
        finally:
            return result_df
    
    @classmethod
    def from_csv(cls, spark: SparkSession) -> pd.DataFrame:
        """
        Profile all CSV files in a directory, returning column statistics as a DataFrame.

        Args:
            spark: SparkSession object
        
        Returns:
            Pandas DataFrame with profiling results
        """
        path = "data/"
        results = []
        
        try:
            for file_name in os.listdir(path):
                fullpath = os.path.join(path, file_name)

                _, file_extension = os.path.splitext(file_name)
                    
                if file_extension.lower() == '.csv':                  
                    # Start profiling table
                    res = cls.table_profiler(
                        spark=spark,
                        table=file_name,
                        source_type="csv",
                        path=path
                    )
                    results = results + res
    
            result_df = pd.DataFrame(results)
            
        except Exception as e:
            print(f"Error profiling csv: {str(e)}\n{traceback.format_exc()}")
            result_df = pd.DataFrame()
            
        finally:
            return result_df
    
    @classmethod
    def from_api(cls, spark: SparkSession, api_url: str, params: Dict) -> pd.DataFrame:
        """
        Profile data from an API endpoint, returning column statistics as a DataFrame.
        
        Args:
            spark: SparkSession object
            api_url: API endpoint URL
            params: Dictionary of API query parameters
        
        Returns:
            Pandas DataFrame with profiling results        
        """
        try:
            # Start profiling table
            res = cls.table_profiler(
                spark=spark,
                table=api_url,
                source_type="api",
                api_url=api_url,
                params=params
            )
    
            result_df = pd.DataFrame(res)
            
        except Exception as e:
            print(f"Error profiling api: {str(e)}\n{traceback.format_exc()}")
            result_df = pd.DataFrame()
            
        finally:
            return result_df

# Extract Functions

In [5]:
class Extract():
    @staticmethod
    def from_database(spark: SparkSession, table_to_extract: List, source_type: str = "source", write_log: bool = True) -> Optional[Dict]:
        current_timestamp = datetime.now()
        df_dict = {}
        
        try:
            # Database connection
            if source_type == "source":
                source_db_url, source_db_user, source_db_pass, source_db_name = source_engine()
                step = "staging"
            elif source_type == "staging":
                source_db_url, source_db_user, source_db_pass, source_db_name = stg_engine()
                step = "warehouse"
            else:
                raise ValueError("Unknown source type")
                return None
            
            # Connection properties
            connection_properties = {
                "user": source_db_user,
                "password": source_db_pass,
                "driver": "org.postgresql.Driver"
            }

            # Loop and extract table from table_to_extract
            for table_name in table_to_extract:
                # Read from postgres
                df = spark.read.jdbc(
                    url=source_db_url,
                    table=table_name,
                    properties=connection_properties
                )

                if df.count() == 0:
                    raise ValueError(f"{table_name} is empty")

                df_dict[table_name] = df
                
                # Log extract success
                if write_log:
                    log_msg = spark.createDataFrame(
                        [(step, f"{source_type}_extraction", "success", source_db_name, table_name, current_timestamp)],
                        ['step', 'process', 'status', 'source', 'table_name', 'etl_date']
                    )
                    load_log(spark, log_msg)
            # return dictionary of spark dataframe
            return df_dict
        
        except Exception as e:
            # Log failure
            if write_log:
                log_msg = spark.createDataFrame(
                    [(step, f"{source_type}_extraction", "fail", source_db_name, "", current_timestamp, str(e))],
                    ['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg']
                )
                load_log(spark, log_msg) 
            return None

    @staticmethod
    def from_csv(spark: SparkSession, path = "data/", step="staging", write_log: bool = True) -> Optional[DataFrame]:
        current_timestamp = datetime.now()
        df_dict = {}
        try:
            for file_name in os.listdir(path):
                fullpath = os.path.join(path, file_name)
            
                table_name, file_extension = os.path.splitext(file_name)
                    
                if file_extension.lower() == '.csv': 
                    df = spark.read.csv(f"{path}{file_name}", header=True, inferSchema=True)
                    if df.count() == 0:
                        raise ValueError(f"{table_name} is empty")
                    
                    df_dict[table_name] = df
                    
                    if write_log:
                        log_msg = spark.createDataFrame(
                            [(step, "extraction", "success", f"{path}{file_name}", file_name, current_timestamp)],
                            ['step', 'process', 'status', 'source', 'table_name', 'etl_date']
                        )
                        load_log(spark, log_msg)
                    
            return df_dict
        
        except Exception as e:
            if write_log:
                log_msg = spark.createDataFrame(
                    [(step, "extraction", "fail", f"{path}{file_name}", file_name, current_timestamp, str(e))],
                    ['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg']
                )
                load_log(spark, log_msg)
            
            print(f"Error extracting CSV {file_name}: {str(e)}")
            return None
        
    @staticmethod
    def from_api(spark: SparkSession, start_date: str, end_date: str, write_log: bool = True) -> Optional[DataFrame]:
        """
        Extract data from an API and convert to Spark DataFrame with logging.
        
        Args:
            spark: SparkSession object
            ds: Date string for API query
            write_log: Whether to log the extraction process (default: False)
        
        Returns:
            Spark DataFrame or None if extraction fails
        """
        current_timestamp = datetime.now()
        try:
            response = requests.get(
                url=API_PATH,
                params={"start_date": start_date, "end_date": end_date}
            )
            
            if response.status_code != 200:
                raise Exception(f"API request fail with status code: {response.status_code}")
            
            json_data = response.json()
            if not json_data:
                if write_log:
                    log_msg = spark.createDataFrame(
                        [("staging", "extraction", "skipped", "api", "milestone", current_timestamp, "No new data in API")],
                        ['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg']
                    )
                    load_log(spark, log_msg)
                return None
            
            # Convert JSON to Spark DataFrame
            df = spark.createDataFrame(json_data)
            
            if write_log:
                log_msg = spark.createDataFrame(
                    [("staging", "extraction", "success", "api", "milestone", current_timestamp)],
                    ['step', 'process', 'status', 'source', 'table_name', 'etl_date']
                )
                load_log(spark, log_msg)
            
            return df
        
        except Exception as e:
            if write_log:
                log_msg = spark.createDataFrame(
                    [("staging", "extraction", "fail", "api", "data", current_timestamp, str(e))],
                    ['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg']
                )
                load_log(spark, log_msg)
            
            print(f"Error extracting API data: {str(e)}")
            return None

# Load Functions

In [24]:
class Load():
    @staticmethod
    def to_staging(spark: SparkSession, df_dict):
        process = "load_to_staging"
        try:
            # Define current timestamp for logging
            current_timestamp = datetime.now()

            # Define connection properties
            db_url, db_user, db_pass, db_name = stg_engine()
            properties = {
                "user": db_user,
                "password": db_pass
            }
    
            for table_name, spark_df in df_dict.items():
                # Check if any dataframe row count = 0
                if spark_df.count() == 0:
                    raise ValueError(f"Dataframe in table: {table_name} is empty")

                # load data
                spark_df.write.jdbc(url = db_url,
                            table = table_name,
                            mode = "overwrite",
                            properties = properties)
            
            # Structure log message
            error_msg = ""
            status = "success"
                
        except Exception as e:
            # Structure log message
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            Fail to perform {process} for table '{table_name}'.
            
            Full Traceback:
            {tb_str}
            """
            status = "fail"
            
        finally:
            # log message
            log_msg = spark.sparkContext\
                .parallelize([("staging", process, status, "source transformation result", "", current_timestamp, error_msg)])\
                .toDF(["step", "process", "status", "source", "table_name", "etl_date", "error_msg"])
         
            load_log(spark, log_msg)

    @staticmethod
    def to_warehouse(spark: SparkSession, df_dict):
        current_timestamp = datetime.now()
        db_url, db_user, db_pass, db_name = wh_engine()
        properties = {
            "user": db_user,
            "password": db_pass
        }
        
        for table_name, df in df_dict.items():
            if table_name == "startup_event":
                wh_table_name = f"fct_{table_name}"
            else:
                wh_table_name = f"dim_{table_name}"
                
            try:
                # truncate table with sqlalchemy
                conn = wh_engine_sqlalchemy()
        
                with conn.connect() as connection:
                    # Execute the TRUNCATE TABLE command
                    quoted_table_name = quoted_name(wh_table_name, quote=True)
                    connection.execute(text(f"TRUNCATE TABLE {quoted_table_name} RESTART IDENTITY CASCADE"))
                    connection.commit()
                conn.dispose()
                
            except Exception as e:
                log_msg = spark.sparkContext\
                    .parallelize([("warehouse", "load", "fail", "validation passed tables", table_name, current_timestamp, str(e))])\
                    .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
                load_log(spark, log_msg)
            
            try:
                # load data               
                df.write.jdbc(url = db_url,
                            table = wh_table_name,
                            mode = "append",
                            properties = properties)
                
                #log message
                log_msg = spark.sparkContext\
                    .parallelize([("warehouse", "load", "success", "validation passed tables", table_name, current_timestamp)])\
                    .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
                
            except Exception as e:
                
                # log message
                log_msg = spark.sparkContext\
                    .parallelize([("warehouse", "load", "fail", "validation passed tables", table_name, current_timestamp, str(e))])\
                    .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
            
            finally:
                load_log(spark, log_msg)
            
 

# Transform Functions

In [18]:
class Transform():
    @staticmethod
    def _hashing(
        df: DataFrame,
        hash_cols: List[str],
        hash_output_colname: str
    ) -> DataFrame:
        # Create UUID using sha256 hash
        # Ensure all hash_cols are cast to string for consistent hashing
        hash_expressions = [col(c).cast("string") for c in hash_cols]
        df = df.withColumn(hash_output_colname, sha2(concat_ws("||", *hash_expressions), 256))
        return df

    @staticmethod
    def _common_transformations(
        src_df: DataFrame,
        nk_mapping: Optional[Dict[str, str]] = None,
        type_mapping: Optional[Dict[str, Union[str, DataType]]] = None,
        literals: Optional[Dict[str, str]] = None,
        drop_cols: Optional[List[str]] = None,
        hash_cols: Optional[List[str]] = None,
        hash_output_colname: Optional[str] = None,
        fk_df: Optional[DataFrame] = None,
        fk_col: Optional[Dict[str, str]] = None,
        select_colname: Optional[List[str]] = None
    ) -> Optional[DataFrame]:
        """Applies renames, type changes, literals, drops, hash creation, foreign key joins, and column selection."""
        # Rename columns
        if nk_mapping:
            for original_col, new_cols in nk_mapping.items():
                if original_col in src_df.columns:
                    # Handle both single string and list of strings
                    if isinstance(new_cols, str):
                        new_cols = [new_cols]
                    
                    for new_col in new_cols:
                        src_df = src_df.withColumn(new_col, col(original_col))
    
        # Modify column data type
        if type_mapping:
            for colname, datatype in type_mapping.items():
                if colname in src_df.columns:
                    src_df = src_df.withColumn(colname, col(colname).cast(datatype))
                        
        # Create literal column
        if literals:
            for colname, literal in literals.items():
                src_df = src_df.withColumn(colname, lit(literal))
    
        # Drop columns
        if drop_cols:
            drop_cols = [c for c in drop_cols if c in src_df.columns]
            src_df = src_df.drop(*drop_cols)
    
        # Create UUID using sha256 hash
        if hash_cols and hash_output_colname:
            hash_cols = [c for c in hash_cols if c in src_df.columns]
            if hash_cols:
                hash_expressions = [coalesce(col(c).cast("string"), lit("null")) for c in hash_cols]
                src_df = src_df.withColumn(hash_output_colname, sha2(concat_ws("||", *hash_expressions), 256))
    
        # Create foreign key
        if fk_df and fk_col:
            for lookup_key, lookup_val in fk_col.items():
                if lookup_key in src_df.columns and lookup_val in fk_df.columns:
                    fk_df_subset = fk_df.select(lookup_key, lookup_val)
                    src_df = src_df.join(fk_df_subset, lookup_key, "left")
    
        # Select columns
        if select_colname:
            select_colname = [c for c in select_colname if c in src_df.columns]
            src_df = src_df.select(*select_colname)
    
        return src_df

    @staticmethod
    def _get_first_key(d, val):
        return next((key for key, value in d.items() if value == val), None)
        
    @classmethod
    def staging(cls, spark: SparkSession, df_dict: Dict[str, DataFrame]) -> Optional[Dict]:
        # Create completely independent DataFrames copy:
        dim = {
            key: df.select("*")
            for key, df in df_dict.items()
        }
        
        # Define current_timestamp for logging
        current_timestamp = datetime.now()
       
        # ----------------------------------------------------------------- #
        # Transform company into dim_company
    
        # office_id -> office_nk int
        # object_id -> object_nk
        # ensure latitude and longitude to be DecimalType(9,6)
        table_name = "company"
        hash_cols = ["office_nk", "object_nk", "description", "region",
                    "address1", "address2", "city", "zip_code",
                    "state_code", "country_code", "latitude", "longitude"]
        try:
            dim["company"] = cls._common_transformations(
                src_df=dim["company"],
                nk_mapping={
                    "office_id": "office_nk", 
                    "object_id": "object_nk"
                },
                type_mapping={
                    "latitude": DecimalType(9,6),
                    "longitude": DecimalType(9,6)
                },
                hash_cols=hash_cols,
                hash_output_colname="company_id",
                select_colname=["company_id"] + hash_cols
            )
            
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
            
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
            
        # ----------------------------------------------------------------- #
        # Transform acquisition into dim_acquisition

        # acquisition_id -> acquisition_nk
        # acquiring_object_id -> acquiring_object_nk
        # acquired_object_id -> acquired_object_nk
        
        table_name = "acquisition"
        hash_cols =["acquisition_nk", "acquiring_object_nk", "acquired_object_nk", 
                    "term_code", "price_amount", "price_currency_code"]
        try:
            dim["acquisition"] = cls._common_transformations(
                src_df=dim["acquisition"],
                nk_mapping={
                    "acquisition_id": "acquisition_nk", 
                    "acquiring_object_id": "acquiring_object_nk",
                    "acquired_object_id": "acquired_object_nk"
                },
                hash_cols=hash_cols,
                hash_output_colname="acquisition_id",
                select_colname=["acquisition_id"] + hash_cols
            )
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
            
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
            
        # ----------------------------------------------------------------- #
        # Transform ipos into dim_ipos

        # ipo_id -> ipo_nk
        # object_id -> object_nk
        table_name = "ipos"
        hash_cols = ["ipo_nk", "valuation_amount", 
                    "valuation_currency_code", "raised_amount",
                     "raised_currency_code", "stock_symbol"]
        try:
            dim["ipos"] = cls._common_transformations(
                src_df=dim["ipos"],
                nk_mapping={
                    "ipo_id": "ipo_nk", 
                    "object_id": "object_nk"
                },
                hash_cols=hash_cols,
                hash_output_colname="ipos_id",
                select_colname=["ipos_id"] + hash_cols
            )
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
            
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)        
            return None
            
        # ----------------------------------------------------------------- #
        # Transform funds into dim_funds

        # fund_id -> fund_nk
        # object_id -> object_nk
        table_name = "funds"
        hash_cols = ["fund_nk", "name", 
                    "raised_amount", "raised_currency_code"]
        try:
            dim["funds"] = cls._common_transformations(
                src_df=dim["funds"],
                nk_mapping={
                    "fund_id": "fund_nk", 
                    "object_id": "object_nk"
                },
                hash_cols=hash_cols,
                hash_output_colname="funds_id",
                select_colname=["funds_id"] + hash_cols
            )
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
            
        # ----------------------------------------------------------------- #
        # Transform funding_rounds into dim_funding_rounds

        # funding_round_id -> funding_round_nk
        # object_id -> object_nk
        table_name = "funding_rounds"
        hash_cols=["funding_round_nk", "funding_round_type", 
                    "funding_round_code", "raised_amount_usd", "raised_amount",
                    "raised_currency_code", "pre_money_valuation_usd", "pre_money_valuation",
                    "pre_money_currency_code", "post_money_valuation_usd", "post_money_valuation", 
                    "post_money_currency_code", "participants", "is_first_round", "is_last_round",
                    "created_by"]
        try:
            dim["funding_rounds"] = cls._common_transformations(
                src_df=dim["funding_rounds"],
                nk_mapping={
                    "funding_round_id": "funding_round_nk", 
                    "object_id": "funded_object_nk"
                },
                hash_cols=hash_cols,
                hash_output_colname="funding_rounds_id",
                select_colname=["funding_rounds_id"] + hash_cols
            )
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
            
        # ----------------------------------------------------------------- #
        # Transform investments into dim_investments

        # funding_round_id -> funding_round_nk
        # object_id -> object_nk
        # Create foreign key using funding_rounds_id from funding_rounds
        table_name = "investments"
        hash_cols=["funding_rounds_id", "investment_nk", "funding_round_nk",
                    "funded_object_nk", "investor_object_nk"]
        try: 
            dim["investments"] = cls._common_transformations(
                src_df=dim["investments"],
                nk_mapping={
                    "investment_id": "investment_nk", 
                    "funding_round_id": "funding_round_nk",
                    "funded_object_id": "funded_object_nk",
                    "investor_object_id": "investor_object_nk"
                },
                fk_df=dim["funding_rounds"],
                fk_col={"funding_round_nk":"funding_rounds_id"},
                hash_cols=hash_cols,
                hash_output_colname="investments_id",
                select_colname=["investments_id"] + hash_cols
            )
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
            
        # ----------------------------------------------------------------- #
        # Transform people into dim_people

        # people_id -> people_nk
        # object_id -> object_nk
        table_name = "people"
        hash_cols=["people_nk", "first_name", 
                   "last_name", "birthplace", "affiliation_name"]
        try:
            dim["people"] = cls._common_transformations(
                src_df=dim["people"],
                nk_mapping={
                    "people_id": "not_used",
                    "object_id": "people_nk", 
                },
                hash_cols=hash_cols,
                hash_output_colname="people_id",
                select_colname=["people_id"] + hash_cols
            )
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
            
        # ----------------------------------------------------------------- #
        # Transform relationships into dim_relationships

        # relationship_id -> relationship_nk
        # person_object_id -> person_object_nk
        # relationship_object_id -> relationship_object_nk
        # ensure data type is correct for
        ## start_at (timestamp), end_at (timestamp), is_past (bool) and sequence (integer)
        # Create foreign key: people_id -> dim_people, company_id -> company
        table_name = "relationships"
        hash_cols=["people_id", "company_id",
                    "relationship_nk", "person_object_nk", 
                    "relationship_object_nk", 
                    "start_at", "end_at", "is_past",
                    "sequence", "title"]
        try:
            dim["relationships"] = cls._common_transformations(
                src_df=dim["relationships"],
                nk_mapping={
                    "relationship_id": "relationship_nk", 
                    "person_object_id": "people_nk",
                    "relationship_object_id": "object_nk"
                },
                type_mapping={
                    "start_at":"timestamp",
                    "end_at":"timestamp",
                    "is_past":"boolean",
                    "sequence":"int"
                },
                fk_df=dim["people"],
                fk_col={"people_nk": "object_nk"}
            )
            dim["relationships"] = cls._common_transformations(
                src_df=dim["relationships"],
                fk_df=dim["company"],
                fk_col={"object_nk":"company_id"}
            )
            dim["relationships"] = cls._common_transformations(
                src_df=dim["relationships"],
                nk_mapping={
                    "people_nk": "person_object_nk", 
                    "object_nk": "relationship_object_nk"
                },                
                hash_cols=hash_cols,
                hash_output_colname="relationships_id",
                select_colname=["relationships_id"] + hash_cols
            )
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
            
        # ----------------------------------------------------------------- #
        # Transform milestones into dim_milestones

        # milestone_id -> milestone_nk
        # object_id -> object_nk
        table_name = "milestones"
        hash_cols=["milestone_nk", "description", "milestone_code"]
        
        try:
            dim["milestones"] = cls._common_transformations(
                src_df=dim["milestones"],
                nk_mapping={
                    "milestone_id": "milestone_nk", 
                },
                hash_cols=hash_cols,
                hash_output_colname="milestones_id",
                select_colname=["milestones_id"] + hash_cols
            )
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
            
        # ----------------------------------------------------------------- #
        # Transform event_type into dim_event_type
        ## unchanged
        
        # ----------------------------------------------------------------- #
        # Transform date into dim_date
        table_name = "date"
        try:
            dim["date"] = cls._common_transformations(
                src_df=dim["date"],
                type_mapping={
                    "date_id": "int"
                }
            )
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
        
        # ----------------------------------------------------------------- #
        # Define common variable that will be used
        event_type_map = {row.event_type_id: row.event_type 
                          for row in dim["event_type"].select("event_type_id", "event_type").collect()}
        common_colname = ["company_id", "event_type_id",
                          "event_datetime", "event_date",
                          "source_url", "source_description"]
        
        # Transform data into fct_startup_event
        table_name = "startup_event"
        try:
            # ----------------------------------------------------------------- #
            # Event: Acquiring
            fct_acquiring = cls._common_transformations(
                src_df=df_dict["acquisition"].select("*"),
                nk_mapping={
                    "acquisition_id":"acquisition_nk",
                    "acquiring_object_id":"object_nk",
                    "acquired_at":"event_datetime",
                    "acquired_at":"event_date"
                },
                type_mapping={
                    "event_date":"timestamp",
                    "event_date":"date"
                },
                literals={
                    "event_type_id":cls._get_first_key(
                        event_type_map, "Acquiring"
                    )
                },
                drop_cols=["acquisition_id"],
                fk_df=dim["acquisition"],
                fk_col={"acquisition_nk": "acquisition_id"}
            )
            fct_acquiring_final = cls._common_transformations(
                src_df=fct_acquiring,
                fk_df=dim["company"],
                fk_col={"object_nk": "company_id"},
                select_colname=["acquisition_id", "acquisition_nk"] + common_colname
            )
            
            # ----------------------------------------------------------------- #
            # Event: Acquired 
            fct_acquired = cls._common_transformations(
                src_df=df_dict["acquisition"].select("*"),
                nk_mapping={
                    "acquisition_id":"acquisition_nk",
                    "acquired_object_id":"object_nk",
                    "acquired_at":"event_datetime",
                    "acquired_at":"event_date"
                },
                type_mapping={
                    "event_datetime":"timestamp",
                    "event_date":"date"
                },
                literals={
                    "event_type_id":cls._get_first_key(
                        event_type_map, "Acquired"
                    )
                },
                drop_cols=["acquisition_id"],
                fk_df=dim["acquisition"],
                fk_col={"acquisition_nk": "acquisition_id"}
            )
            fct_acquired_final = cls._common_transformations(
                src_df=fct_acquired,
                fk_df=dim["company"],
                fk_col={"object_nk": "company_id"},
                select_colname=["acquisition_id", "acquisition_nk"] + common_colname
            )
            
            # ----------------------------------------------------------------- #
            # Event: IPO 
            fct_ipos = cls._common_transformations(
                src_df=df_dict["ipos"].select("*"),
                nk_mapping={
                    "ipo_id":"ipo_nk",
                    "object_id":"object_nk",
                    "public_at":"event_datetime",
                    "public_at":"event_date"
                },
                type_mapping={
                    "event_date":"timestamp",
                    "event_date":"date"
                },
                literals={
                    "event_type_id":cls._get_first_key(
                        event_type_map, "IPO"
                    )
                },
                fk_df=dim["ipos"],
                fk_col={"ipo_nk": "ipos_id"}
            )
            fct_ipos_final = cls._common_transformations(
                src_df=fct_ipos,
                fk_df=dim["company"],
                fk_col={"object_nk": "company_id"},
                select_colname=["ipos_id", "ipo_nk"] + common_colname
            )  

            # ----------------------------------------------------------------- #
            # Event: Funds 
            fct_funds = cls._common_transformations(
                src_df=df_dict["funds"].select("*"),
                nk_mapping={
                    "fund_id":"fund_nk",
                    "object_id":"object_nk",
                    "funded_at":"event_datetime",
                    "funded_at":"event_date"
                },
                type_mapping={
                    "event_datetime":"timestamp",
                    "event_date":"date"
                },
                literals={
                    "event_type_id":cls._get_first_key(
                        event_type_map, "Funds"
                    )
                },
                fk_df=dim["funds"],
                fk_col={"fund_nk": "funds_id"}
            )
            fct_funds_final = cls._common_transformations(
                src_df=fct_funds,
                fk_df=dim["company"],
                fk_col={"object_nk": "company_id"},
                select_colname=["funds_id", "fund_nk"] + common_colname
            )
            
            # ----------------------------------------------------------------- #
            # Event: Received Funding, Made Investment and Funding Rounds
            fct_received_funding = cls._common_transformations(
                src_df=df_dict["investments"].select("*"),
                nk_mapping={
                    "investment_id":"investment_nk",
                    "funding_round_id":"funding_round_nk",
                    "funded_object_id":"object_nk"
                },
                literals={
                    "event_type_id":cls._get_first_key(
                        event_type_map, "Received Funding"
                    )
                },
                fk_df=dim["company"],
                fk_col={"object_nk": "company_id"},
                select_colname=["investment_nk", "funding_round_nk", "company_id", "event_type_id"]
            )
            fct_made_investment = cls._common_transformations(
                src_df=df_dict["investments"].select("*"),
                nk_mapping={
                    "investment_id":"investment_nk",
                    "funding_round_id":"funding_round_nk",
                    "investor_object_id":"object_nk"
                },
                literals={
                    "event_type_id":cls._get_first_key(
                        event_type_map, "Made Investment"
                    )
                },
                fk_df=dim["company"],
                fk_col={"object_nk": "company_id"},
                select_colname=["investment_nk","funding_round_nk", "company_id", "event_type_id"]
            )
            fct_funding_rounds = cls._common_transformations(
                src_df=df_dict["funding_rounds"].select("*"),
                nk_mapping={
                    "funding_round_id":"funding_round_nk",
                    "funded_at":["event_datetime", "event_date"]
                },
                type_mapping={
                    "event_datetime":"timestamp",
                    "event_date":"date"
                },
                fk_df=dim["funding_rounds"],
                fk_col={"funding_round_nk": "funding_rounds_id"},
                select_colname=["funding_rounds_id", "funding_round_nk", "event_datetime",
                                "event_date", "source_url", "source_description"]
            )
            fct_received_funding_final = fct_received_funding.join(
                fct_funding_rounds,"funding_round_nk","left"
            )
            fct_made_investment_final = fct_made_investment.join(
                fct_funding_rounds,"funding_round_nk","left"
            )
            fct_funding_rounds_final = fct_received_funding_final\
                                           .unionByName(fct_made_investment_final, allowMissingColumns=True)\
                                           .select(["funding_rounds_id", "funding_round_nk", "investment_nk"] + common_colname)
            
            # ----------------------------------------------------------------- #
            # Event: Milestone 
            fct_milestones = cls._common_transformations(
                src_df=df_dict["milestones"].select("*"),
                nk_mapping={
                    "milestone_id":"milestone_nk",
                    "object_id":"object_nk",
                    "milestone_at":"event_datetime",
                    "milestone_at":"event_date"
                },
                type_mapping={
                    "event_datetime":"timestamp",
                    "event_date":"date"
                },
                literals={
                    "event_type_id":cls._get_first_key(
                        event_type_map, "Milestone"
                    )
                },
                fk_df=dim["milestones"],
                fk_col={"milestone_nk": "milestones_id"}
            )
            fct_milestones_final = cls._common_transformations(
                src_df=fct_milestones,
                fk_df=dim["company"],
                fk_col={"object_nk": "company_id"},
                select_colname=["milestones_id", "milestone_nk"] + common_colname
            )   
            
            # ----------------------------------------------------------------- #
            # Union All to become fct_startup_event
            
            # Union with unionByName
            dim["startup_event"] = fct_acquiring_final
            for df in [fct_acquired_final, fct_ipos_final, fct_funds_final, fct_funding_rounds_final, fct_milestones_final]:
                dim["startup_event"] = dim["startup_event"].unionByName(df, allowMissingColumns=True)
            
            # Add startup_event_id
            dim["startup_event"] = cls._common_transformations(
                src_df=dim["startup_event"],
                hash_cols=["company_id", "acquisition_id", "ipos_id",
                           "funds_id", "funding_rounds_id", "milestones_id",
                           "event_type_id", "object_nk", "acquisition_nk",
                           "ipo_nk", "fund_nk", "investment_nk",
                           "funding_round_nk", "event_datetime",
                           "event_date", "source_url", "source_description"
                          ],
                hash_output_colname="startup_event_id"
            )

            # Add stable_id to help upsert later by hashing all natural key
            dim["startup_event"] = cls._common_transformations(
                src_df=dim["startup_event"],
                hash_cols=["object_nk", "acquisition_nk", "ipo_nk", 
                           "fund_nk", "funding_round_nk", "investment_nk",
                           "milestone_nk"],
                hash_output_colname="stable_id",
                select_colname=[
                    "startup_event_id", "stable_id", "acquisition_id", "ipos_id",  
                    "funds_id", "funding_rounds_id", "milestones_id"
                ] + common_colname
            )
            
            # Add created_at and updated_at
            for table_name, _ in dim.items():
                dim[table_name] = dim[table_name]\
                                     .withColumn("created_at", lit(datetime.now()))\
                                     .withColumn("updated_at", lit(datetime.now()))
                        
            # Log success transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "success",
                               "staging extraction result",
                               table_name,
                               current_timestamp)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date'])
            load_log(spark, log_msg)
            
        except Exception as e:
            # Capture full traceback information
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform transform operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            # Log fail transformation
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "transform_staging",
                               "fail",
                               "staging extraction result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return None
        # ----------------------------------------------------------------- #
        # Return back the dim dictionary
        return dim

# Validation Function

In [19]:
class Validate():    
    @staticmethod
    def transform_staging(
        spark: SparkSession,
        df_dict: Dict[str, DataFrame],
    ) -> Optional[bool]:
        try:
            current_timestamp = datetime.now()
            
            table_name = "startup_event"
            df = df_dict[table_name]
            
            col_to_check = "company_id"
            
            # Use first() to find any null - stops as soon as one is found
            try:
                first_null = df.filter(col(col_to_check).isNull()).first()
                validation_passed = first_null is None
            except:
                null_count = df.filter(col(col_to_check).isNull()).limit(1).count()
                validation_passed = null_count == 0
                
            if validation_passed:
                # Log success validation
                log_msg = spark.sparkContext\
                    .parallelize([("warehouse",
                                   "validate_transformation",
                                   "success",
                                   "staging_transformation_result",
                                   table_name,
                                   current_timestamp)])\
                    .toDF(['step',
                           'process',
                           'status',
                           'source',
                           'table_name',
                           'etl_date'])
                load_log(spark, log_msg)
            else:
                # Log fail validation
                error_msg = f"Found null values in company_id column of {table_name}"
                log_msg = spark.sparkContext\
                    .parallelize([("warehouse",
                                   "validate_transformation",
                                   "fail",
                                   "staging_transformation_result",
                                   table_name,
                                   current_timestamp,
                                   error_msg)])\
                    .toDF(['step',
                           'process',
                           'status',
                           'source',
                           'table_name',
                           'etl_date',
                           'error_msg'])
                load_log(spark, log_msg)
                
        except Exception as e:
            tb_str = traceback.format_exc()
            error_msg = f"""
            fail to perform validation operation for table '{table_name}'.
            
            Error Details:
            - Error Type: {type(e).__name__}  
            - Error Message: {str(e)}
            - Table: {table_name}
            
            Full Traceback:
            {tb_str}
            """
            log_msg = spark.sparkContext\
                .parallelize([("warehouse",
                               "validate_transformation",
                               "fail",
                               "staging_transformation_result",
                               table_name,
                               current_timestamp,
                               error_msg)])\
                .toDF(['step',
                       'process',
                       'status',
                       'source',
                       'table_name',
                       'etl_date',
                       'error_msg'])
            load_log(spark, log_msg)
            return False
            
        return validation_passed

# ETL Pipeline functions

In [23]:
# class ETLPipeline():
#     @staticmethod
#     def start(spark: SparkSession):

# ----------------------------------------------------------------- #
# EXTRACT FROM SOURCE
# 1. Extract from source database
db_table_to_extract = [
    "company",
    "acquisition",
    "ipos",
    "funds",
    "funding_rounds",
    "investments",
]
src_db_dict = Extract.from_database(
    spark=spark,
    table_to_extract=db_table_to_extract, 
    source_type="source"
)  
# 2. Extract from source csv                 
src_csv_dict = Extract.from_csv(
    spark=spark
)
        
# 3. Extract from source API
src_api_dict = {}
src_api_dict["milestones"] = Extract.from_api(
    spark=spark, 
    start_date="2005-01-01", 
    end_date="2011-01-01"
)

src_dict = {**src_db_dict, **src_csv_dict, **src_api_dict}

# ----------------------------------------------------------------- #
# LOAD TO STAGING
# 5. Load to staging database
Load.to_staging(
    spark=spark, 
    df_dict=src_dict
)

# ----------------------------------------------------------------- #
# EXTRACT FROM STAGING
# 6. Extract from staging database
stg_db_tbl = [
    "company",
    "acquisition",
    "ipos",
    "funds",
    "funding_rounds",
    "investments",
    "people",
    "relationships",
    "date",
    "milestones",
    "event_type"
]
src_dict = {}
src_dict = Extract.from_database(
    spark=spark,
    table_to_extract=stg_db_tbl, 
    source_type="staging"
)
# ----------------------------------------------------------------- #
# TRANSFORM STAGING
# 7. Transform staging database
src_dict = Transform.staging(
    spark=spark, 
    df_dict=src_dict
)

gc.collect()

# ----------------------------------------------------------------- #
# VALIDATE TRANSFORMATION
# 8. Validate data transformation from staging
validation_passed = Validate.transform_staging(
    spark=spark,
    df_dict=src_dict
)

if not validation_passed:
    print("Validation process fail. There is null in fact table")
else:
    # ----------------------------------------------------------------- #
    # LOAD TO WAREHOUSE
    # 9. Load data to warehouse
    Load.to_warehouse(
        spark=spark,
        df_dict=src_dict)

In [17]:
src_dict["ipos"].toPandas().head()

Unnamed: 0,ipo_id,object_id,valuation_amount,valuation_currency_code,raised_amount,raised_currency_code,public_at,stock_symbol,source_url,source_description,created_at,updated_at
0,1,c:1654,0.0,USD,0.0,USD,1980-12-19,NASDAQ:AAPL,,,2008-02-09 05:17:45,2012-04-12 04:02:59
1,2,c:1242,0.0,USD,0.0,,1986-03-13,NASDAQ:MSFT,,,2008-02-09 05:25:18,2010-12-11 12:39:46
2,3,c:342,0.0,USD,0.0,,1969-06-09,NYSE:DIS,,,2008-02-09 05:40:32,2010-12-23 08:58:16
3,4,c:59,0.0,USD,0.0,,2004-08-25,NASDAQ:GOOG,,,2008-02-10 22:51:24,2011-08-01 20:47:08
4,5,c:317,100000000000.0,USD,0.0,,1997-05-01,NASDAQ:AMZN,,,2008-02-10 23:28:09,2011-08-01 21:11:22


In [157]:
tes_ipos = src_dict["ipos"].select("*")
tes_ipos.filter(col("ipo_id")=="1377").select("source_description").show()

+------------------+
|source_description|
+------------------+
+------------------+



In [73]:
for k, v in src_dict.items():
    print(k)
    display(v.limit(5).toPandas().head(5))

company


Unnamed: 0,office_id,object_id,description,region,address1,address2,city,zip_code,state_code,country_code,latitude,longitude,created_at,updated_at,hash_id
0,28,c:29,,SF Bay,701 First Avenue,,Sunnyvale,94089,CA,USA,37.418531,-122.025485,2025-08-06 02:59:38.519871,2025-08-06 02:59:38.531407,16e2c15c2fb4005a1215fd1e7cf393741fd319e9ebba60...
1,31,c:32,Corporate Headquarters,Buckinghamshire,"Wethered House, Pound Lane",,Buckinghamshire,SL7 2AF,,GBR,0.0,0.0,2025-08-06 02:59:38.519871,2025-08-06 02:59:38.531407,bcfbc4563d195715cd9b1d9fc8834007b5b197d08fe8cb...
2,34,c:35,Headquarters,Los Angeles,888 East Walnut Street,,Pasadena,91101,CA,USA,34.149471,-118.132747,2025-08-06 02:59:38.519871,2025-08-06 02:59:38.531407,9f4e4003d0314ee4de3d93a413844cf3b6f6c09d57117d...
3,53,c:54,,SF Bay,"400 Montgomery St, Suite 900",,San Francisco,94104,CA,USA,37.793148,-122.402567,2025-08-06 02:59:38.519871,2025-08-06 02:59:38.531407,85c1d42e27cfdb4858ee8147905f0a7dc48c1a3b69febb...
4,65,c:66,Livestream HQ,New York,"111 8th Avenue, #1509",,New York,10011,NY,USA,40.726155,-73.995625,2025-08-06 02:59:38.519871,2025-08-06 02:59:38.531407,1aeb053acef70902bd19deeefcedd3078161f8e2ad6ae8...


acquisition


Unnamed: 0,acquisition_id,acquiring_object_id,acquired_object_id,term_code,price_amount,price_currency_code,acquired_at,source_url,source_description,created_at,updated_at,hash_id
0,148,c:2050,c:2108,cash,0.0,USD,2007-08-01,http://www.dating-weblog.com/50226711/avid_lif...,Ashley Madison acquired by Avid Life Media,2025-08-06 02:59:38.539496,2025-08-06 02:59:38.547236,141879dcbb6b99982989840b8c3cfc57eea59ee7b8c660...
1,833,c:16208,c:16257,,0.0,USD,2007-03-02,http://venturebeat.com/2007/03/02/leadis-panel...,"Leadis, panel display chip company, buys music...",2025-08-06 02:59:38.539496,2025-08-06 02:59:38.547236,006e11b66252dcbf1116cac2673486d59bc7e1b6e7acf9...
2,1238,c:7493,c:4369,,0.0,USD,2009-06-23,http://www.techcrunch.com/2009/06/23/posterous...,Posterous Acquires Fellow Y Combinator Alum Sl...,2025-08-06 02:59:38.539496,2025-08-06 02:59:38.547236,074d4339e2d6dfd0a961879331881cfccd09e4edee7c03...
3,1342,c:26071,c:26070,,0.0,USD,2009-03-16,http://www.businesswire.com/portal/site/google...,ManTech Completes the Acquisition of DDK Techn...,2025-08-06 02:59:38.539496,2025-08-06 02:59:38.547236,7851628491e2affc771c2fc2b75b0c0acd90518f80ec85...
4,1580,c:26919,c:26918,,7350000.0,USD,2008-06-09,http://findarticles.com/p/articles/mi_m0EIN/is...,"INX Acquires AccessFlow, Inc., a Leading VMwar...",2025-08-06 02:59:38.539496,2025-08-06 02:59:38.547236,42aead5d1c9d9b2cee6cd354d473d9f6f924f7fd55dcd7...


ipos


Unnamed: 0,ipo_id,object_id,valuation_amount,valuation_currency_code,raised_amount,raised_currency_code,public_at,stock_symbol,source_url,source_description,created_at,updated_at,hash_id
0,296,c:39190,0.0,USD,0.0,,NaT,NASDAQ:EDGW,,,2025-08-06 02:59:38.555792,2025-08-06 02:59:38.566955,d6c9609f60468b7b01cf4125c0f4e6d41c2ee99bb16efe...
1,467,c:29567,0.0,USD,0.0,USD,1986-10-03,NASDAQ:FISV,,,2025-08-06 02:59:38.555792,2025-08-06 02:59:38.566955,b519525ecccfdb213b1a61038ce1eb0674a0e5c209743c...
2,675,c:9389,0.0,USD,0.0,,NaT,NASDAQ:ZAGG,,,2025-08-06 02:59:38.555792,2025-08-06 02:59:38.566955,45da9425eb8f1668703bd3065561333dad216c78685d71...
3,691,c:82539,0.0,USD,0.0,,NaT,NASDAQ:AVNW,,,2025-08-06 02:59:38.555792,2025-08-06 02:59:38.566955,7aff283acac07901a4149648d4eb0f275654b7e1b2de17...
4,829,c:39921,0.0,USD,0.0,USD,2000-09-29,NYSE:UMC,,,2025-08-06 02:59:38.555792,2025-08-06 02:59:38.566955,003972c78fd391f911d6533809c2668732f5285067cbde...


funds


Unnamed: 0,fund_id,object_id,name,funded_at,raised_amount,raised_currency_code,source_url,source_description,created_at,updated_at,hash_id
0,296,f:1913,Health-Care Fund IV,2010-08-18,180000000.0,USD,https://www.fis.dowjones.com/article.aspx?Prod...,Prospect Venture Partners Secures $180M Toward...,2025-08-06 02:59:38.576227,2025-08-06 02:59:38.588152,ce98df44e9a3cae5c40c2fe9faf0c7e37d94d323c9dfdd...
1,467,f:7448,GCP III,2011-05-20,160000000.0,GBP,http://www.finsmes.com/2011/05/growth-capital-...,Growth Capital Partners Closes Â£160M Third Lo...,2025-08-06 02:59:38.576227,2025-08-06 02:59:38.588152,aeb68c8929eb6fc76e4601bc0334957695a5b8d8b7e191...
2,691,f:8768,EC3H,2011-06-02,1500000.0,USD,http://www.nebraskaglobal.com/News,Nebraska Global News,2025-08-06 02:59:38.576227,2025-08-06 02:59:38.588152,2ceb080c1bccdcb94d0d525c76e2296b060e2339a5d3de...
3,675,f:4850,Fund III,2010-08-02,73500000.0,USD,http://www.sec.gov/Archives/edgar/data/1497490...,SEC,2025-08-06 02:59:38.576227,2025-08-06 02:59:38.588152,6cce5e03e4f0902c48c981860a6d6a8976ea0789e72b48...
4,829,f:1140,Tandem Fund II,2012-06-14,32000000.0,USD,http://techcrunch.com/2012/06/14/tandem-raises...,"Tandem Raises $32M Fund From Verifone CEO, Pla...",2025-08-06 02:59:38.576227,2025-08-06 02:59:38.588152,e929a589c54bfcc6858aaa4580154d662cb9b1bd43c785...


funding_rounds


Unnamed: 0,funding_round_id,object_id,funded_at,funding_round_type,funding_round_code,raised_amount_usd,raised_amount,raised_currency_code,pre_money_valuation_usd,pre_money_valuation,...,post_money_currency_code,participants,is_first_round,is_last_round,source_url,source_description,created_by,created_at,updated_at,hash_id
0,28,c:34,2007-02-01,series-b,b,5500000.0,5500000.0,USD,0.0,0.0,...,USD,6,False,False,http://venturebeat.com/2007/03/06/widget-compa...,"Widget company, Clearspring, says it leads mar...",initial-importer,2025-08-06 02:59:38.599559,2025-08-06 02:59:38.611979,63e8afc119470f7b2ae44d262b2d13828439af25ad5aec...
1,31,c:35,2007-06-01,series-a,a,5000000.0,5000000.0,USD,0.0,0.0,...,USD,4,False,False,http://www.techcrunch.com/2007/06/13/openads-o...,,initial-importer,2025-08-06 02:59:38.599559,2025-08-06 02:59:38.611979,1bb839d19ea5eca80632e5189e8106ca3884a42227fccb...
2,34,c:40,2007-08-01,series-c+,c,25000000.0,25000000.0,USD,0.0,0.0,...,,5,False,False,http://newteevee.com/2007/06/14/veoh-goes-for-...,Veoh Goes for the Big Money,initial-importer,2025-08-06 02:59:38.599559,2025-08-06 02:59:38.611979,dc303232407768cd02bfffb903590b5916e4235d03e337...
3,53,c:75,2005-04-01,angel,seed,0.0,0.0,,0.0,0.0,...,,4,False,False,http://www.techcrunch.com/?p=23,Profile: Del.icio.us,initial-importer,2025-08-06 02:59:38.599559,2025-08-06 02:59:38.611979,1bb9667c029116d88cc99e119ceb00b603697bb15c2082...
4,65,c:84,2006-06-01,series-a,a,5000000.0,5000000.0,USD,0.0,0.0,...,USD,2,False,False,http://www.techcrunch.com/2006/08/02/kleiner-p...,,initial-importer,2025-08-06 02:59:38.599559,2025-08-06 02:59:38.611979,be19521611e2f64289c25e5410975f7dbd8d429e795320...


investments


Unnamed: 0,investment_id,funding_round_id,funded_object_id,investor_object_id,created_at,updated_at,hash_id
0,148,86,c:25732,f:72,2025-08-06 02:59:38.621590,2025-08-06 02:59:38.629096,9264c23c055c90c4e9900af1649e876cb4e7906d8529e9...
1,833,512,c:630,f:51,2025-08-06 02:59:38.621590,2025-08-06 02:59:38.629096,98ad0c5b6931a3256418160c9a7fbc88f43b2f8b06fa61...
2,463,276,c:326,f:11,2025-08-06 02:59:38.621590,2025-08-06 02:59:38.629096,800585bc517f56d2442c102cc7358c49ba0c83dec75b68...
3,471,278,c:302,f:89,2025-08-06 02:59:38.621590,2025-08-06 02:59:38.629096,6551ac561a259a78d17d7a7d13dc5ddf6d66640ec2b0d8...
4,496,291,c:325,f:1287,2025-08-06 02:59:38.621590,2025-08-06 02:59:38.629096,ad9e2b2aaa14bfb4224f04f3fb8de374aae0299aa82dab...


date


Unnamed: 0,date_id,date_actual,day_suffix,day_name,day_of_year,week_of_month,week_of_year,week_of_year_iso,month_actual,month_name,...,last_day_of_month,first_day_of_quarter,last_day_of_quarter,first_day_of_year,last_day_of_year,mmyyyy,mmddyyyy,weekend_indr,created_at,updated_at
0,19500101,1950-01-01,1st,Sunday,1,1,52,1950-W52,1,January,...,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1011950,weekend,2025-08-06 03:49:03.952004,2025-08-06 03:49:03.964572
1,19500102,1950-01-02,2nd,Monday,2,1,1,1950-W01,1,January,...,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1021950,weekday,2025-08-06 03:49:03.952004,2025-08-06 03:49:03.964572
2,19500103,1950-01-03,3rd,Tuesday,3,1,1,1950-W01,1,January,...,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1031950,weekday,2025-08-06 03:49:03.952004,2025-08-06 03:49:03.964572
3,19500104,1950-01-04,4th,Wednesday,4,1,1,1950-W01,1,January,...,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1041950,weekday,2025-08-06 03:49:03.952004,2025-08-06 03:49:03.964572
4,19500105,1950-01-05,5th,Thursday,5,1,1,1950-W01,1,January,...,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1051950,weekday,2025-08-06 03:49:03.952004,2025-08-06 03:49:03.964572


people


Unnamed: 0,people_id,object_id,first_name,last_name,birthplace,affiliation_name,hash_id,created_at,updated_at
0,28,p:32,Ken,Howery,,The Founder's Fund,7bc45caa0807c532b07a56b463b4f496976ca0b80c9f54...,2025-08-06 02:59:38.658130,2025-08-06 02:59:38.665566
1,31,p:35,George,Zachary,,Millennial Media,049b5f2e183e1522852f203b2330de5b4c5e04c052126f...,2025-08-06 02:59:38.658130,2025-08-06 02:59:38.665566
2,34,p:40,Andy,Gavin,,Flektor,6489369ef88475e67b508ec93721e187739103ecb604b2...,2025-08-06 02:59:38.658130,2025-08-06 02:59:38.665566
3,53,p:63,Jared,Friedman,,Scribd,8e338e0dbc395cb75087f680f964fcbb36b40dea9e0880...,2025-08-06 02:59:38.658130,2025-08-06 02:59:38.665566
4,65,p:75,Pierre,Omidyar,,Omidyar Network,7fc4d066bca8f2cbcc827a0f1838bc5a2d222b5510f189...,2025-08-06 02:59:38.658130,2025-08-06 02:59:38.665566


relationships


Unnamed: 0,relationship_id,person_object_id,relationship_object_id,start_at,end_at,is_past,sequence,title,created_at,updated_at,hash_id
0,28,p:31,c:9,NaT,NaT,False,3,Board Member,2025-08-06 02:59:38.676302,2025-08-06 02:59:38.684417,2b431ff0f3403880e4863c8a516f21222d40f5ae909bb9...
1,31,p:33,f:6,NaT,NaT,False,5,Partner,2025-08-06 02:59:38.676302,2025-08-06 02:59:38.684417,91674b2266a873073be51c643585e3ec47659f1e1cf888...
2,34,p:35,c:9,NaT,NaT,False,2,Board Member,2025-08-06 02:59:38.676302,2025-08-06 02:59:38.684417,3c62d7409bbfe836d25a18481a2b0cc5ec43e60937e3ce...
3,53,p:52,c:14,NaT,NaT,False,1,Chief Technology Officer,2025-08-06 02:59:38.676302,2025-08-06 02:59:38.684417,78fefff8c1396f83e24735f52427e08d1aa904f0e6b99d...
4,65,p:64,c:15,NaT,NaT,False,1,COO/Co-Founder,2025-08-06 02:59:38.676302,2025-08-06 02:59:38.684417,24b9f7102a5e5e0725316389d4b67dfc75a3f5fc22a996...


milestones


Unnamed: 0,created_at,description,milestone_at,milestone_code,milestone_id,object_id,source_description,source_url,updated_at,hash_id
0,2025-08-06 02:59:38.693422,The African Virtual School Maths Book is avail...,2009-12-13,other,2453,c:36287,,http://www.amazon.com/African-Virtual-School-M...,2025-08-06 02:59:38.701004,25972407eb7c70e2d9ec566b59a25a86cb831256ae5e33...
1,2025-08-06 02:59:38.693422,SaaS business intelligence with TBlox,2008-01-01,other,3506,c:43798,Control Your TBlox Managed Business Activities,http://www.insfocus.com/TBlox/,2025-08-06 02:59:38.701004,9304413f9dc8b2a3cb88bef6b8a9c0dc0859e359b2e5e0...
2,2025-08-06 02:59:38.693422,Awarded Microsoft Australia Online Services Pa...,2010-09-03,other,6721,c:27524,DiData takes out Microsoft enterprise partner ...,"http://www.crn.com.au/News/230958,didata-takes...",2025-08-06 02:59:38.701004,1b95c2e31e3b7038a3d8f3f5994849847bf186bfe57258...
3,2025-08-06 02:59:38.693422,"SourceForge, Inc. Changes its Name to Geeknet,...",2009-11-04,other,2509,c:4303,,http://geek.net/press/sourceforge-inc-changes-...,2025-08-06 02:59:38.701004,c9cb5202411f13eee3fc39c53e2b30521ebca91c0b28a5...
4,2025-08-06 02:59:38.693422,"HONG KONG, Dec. 18 /PRNewswire-Asia/ -- Mulabo...",2009-12-18,other,2529,c:6336,Muecs Ltd. Launches an Online Collaboration To...,http://www.prnewswire.com/news-releases/muecs-...,2025-08-06 02:59:38.701004,64671c594f0aa50647a8050ba50c6a021c7b17dfa3863f...


In [20]:
# ubahh csv date
pd.options.display.max_columns = 25
date_df = pd.read_csv("data/date.csv")
date_df = date_df.set_index("date_id")
# date_df.to_csv("data/date.csv")

In [21]:
date_df

Unnamed: 0_level_0,date_actual,day_suffix,day_name,day_of_year,week_of_month,week_of_year,week_of_year_iso,month_actual,month_name,month_name_abbreviated,quarter_actual,quarter_name,year_actual,first_day_of_week,last_day_of_week,first_day_of_month,last_day_of_month,first_day_of_quarter,last_day_of_quarter,first_day_of_year,last_day_of_year,mmyyyy,mmddyyyy,weekend_indr
date_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1
19500101,1950-01-01,1st,Sunday,1,1,52,1950-W52,1,January,Jan,1,First,1950,1949-12-26,1950-01-01,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1011950,weekend
19500102,1950-01-02,2nd,Monday,2,1,1,1950-W01,1,January,Jan,1,First,1950,1950-01-02,1950-01-08,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1021950,weekday
19500103,1950-01-03,3rd,Tuesday,3,1,1,1950-W01,1,January,Jan,1,First,1950,1950-01-02,1950-01-08,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1031950,weekday
19500104,1950-01-04,4th,Wednesday,4,1,1,1950-W01,1,January,Jan,1,First,1950,1950-01-02,1950-01-08,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1041950,weekday
19500105,1950-01-05,5th,Thursday,5,1,1,1950-W01,1,January,Jan,1,First,1950,1950-01-02,1950-01-08,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,11950,1051950,weekday
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
20771227,2077-12-27,27th,Monday,361,4,52,2077-W52,12,December,Dec,4,Fourth,2077,2077-12-27,2078-01-02,2077-12-01,2077-12-31,2077-10-01,2077-12-31,2077-01-01,2077-12-31,122077,12272077,weekday
20771228,2077-12-28,28th,Tuesday,362,4,52,2077-W52,12,December,Dec,4,Fourth,2077,2077-12-27,2078-01-02,2077-12-01,2077-12-31,2077-10-01,2077-12-31,2077-01-01,2077-12-31,122077,12282077,weekday
20771229,2077-12-29,29th,Wednesday,363,5,52,2077-W52,12,December,Dec,4,Fourth,2077,2077-12-27,2078-01-02,2077-12-01,2077-12-31,2077-10-01,2077-12-31,2077-01-01,2077-12-31,122077,12292077,weekday
20771230,2077-12-30,30th,Thursday,364,5,52,2077-W52,12,December,Dec,4,Fourth,2077,2077-12-27,2078-01-02,2077-12-01,2077-12-31,2077-10-01,2077-12-31,2077-01-01,2077-12-31,122077,12302077,weekday


In [22]:
date_df = date_df.reset_index().drop_duplicates("date_id", keep="first").set_index("date_id")

In [23]:
date_df.to_csv("data/date.csv")

In [22]:
date_df["year_actual"]

0        1997
1        1997
2        1997
3        1997
4        1997
         ... 
29580    2077
29581    2077
29582    2077
29583    2077
29584    2077
Name: year_actual, Length: 29585, dtype: int64

In [26]:
date_df = pd.read_csv("data/date.csv")
date_df[date_df["date_id"]==20120323]


Unnamed: 0,date_id,date_actual,day_suffix,day_name,day_of_year,week_of_month,week_of_year,week_of_year_iso,month_actual,month_name,month_name_abbreviated,quarter_actual,quarter_name,year_actual,first_day_of_week,last_day_of_week,first_day_of_month,last_day_of_month,first_day_of_quarter,last_day_of_quarter,first_day_of_year,last_day_of_year,mmyyyy,mmddyyyy,weekend_indr
5560,20120323,2012-03-23,23rd,Friday,83,4,12,2012-W12,3,March,Mar,1,First,2012,2012-03-19,2012-03-25,2012-03-01,2012-03-31,2012-01-01,2012-03-31,2012-01-01,2012-12-31,32012,3232012,weekday


In [11]:
new_date_df.info()

NameError: name 'new_date_df' is not defined

In [10]:
import pandas as pd
from datetime import datetime, timedelta
import calendar

def generate_date_dimension_1980():
    # Create date range
    start_date = datetime(1950, 1, 1)
    end_date = datetime(1979, 12, 31)
    date_range = pd.date_range(start=start_date, end=end_date, freq='D')
    
    # Initialize list to store all rows
    rows = []
    
    for date in date_range:
        # Basic date info
        date_id = int(date.strftime('%Y%m%d'))
        date_actual = date.strftime('%Y-%m-%d')
        
        # Day suffix (1st, 2nd, 3rd, etc.)
        day = date.day
        if 10 <= day % 100 <= 20:
            suffix = 'th'
        else:
            suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(day % 10, 'th')
        day_suffix = f"{day}{suffix}"
        
        # Day name
        day_name = date.strftime('%A')
        
        # Day of year
        day_of_year = date.timetuple().tm_yday
        
        # Week calculations
        # Week of month (1-based)
        first_day_of_month = date.replace(day=1)
        week_of_month = ((date.day - 1) // 7) + 1
        
        # Week of year (ISO and regular)
        week_of_year = date.isocalendar()[1]
        week_of_year_iso = f"{date.year}-W{week_of_year:02d}"
        
        # Month info
        month_actual = date.month
        month_name = date.strftime('%B')
        month_name_abbreviated = date.strftime('%b')
        
        # Quarter info
        quarter_actual = (date.month - 1) // 3 + 1
        quarter_names = {1: 'First', 2: 'Second', 3: 'Third', 4: 'Fourth'}
        quarter_name = quarter_names[quarter_actual]
        
        # Year
        year_actual = date.year
        
        # Week boundaries (Monday to Sunday)
        days_since_monday = date.weekday()
        first_day_of_week = (date - timedelta(days=days_since_monday)).strftime('%Y-%m-%d')
        last_day_of_week = (date + timedelta(days=6-days_since_monday)).strftime('%Y-%m-%d')
        
        # Month boundaries
        first_day_of_month = date.replace(day=1).strftime('%Y-%m-%d')
        last_day_of_month = date.replace(day=calendar.monthrange(date.year, date.month)[1]).strftime('%Y-%m-%d')
        
        # Quarter boundaries
        quarter_start_month = (quarter_actual - 1) * 3 + 1
        first_day_of_quarter = datetime(date.year, quarter_start_month, 1).strftime('%Y-%m-%d')
        
        if quarter_actual == 4:
            last_day_of_quarter = datetime(date.year, 12, 31).strftime('%Y-%m-%d')
        else:
            next_quarter_start = datetime(date.year, quarter_start_month + 3, 1)
            last_day_of_quarter = (next_quarter_start - timedelta(days=1)).strftime('%Y-%m-%d')
        
        # Year boundaries
        first_day_of_year = f"{date.year}-01-01"
        last_day_of_year = f"{date.year}-12-31"
        
        # Special formats
        mmyyyy = f"{date.month:02d}{date.year}"  # Fixed: added zero-padding for month
        mmddyyyy = f"{date.month:02d}{date.day:02d}{date.year}"
        
        # Weekend indicator
        weekend_indr = 'weekend' if date.weekday() >= 5 else 'weekday'
        
        # Create row
        row = {
            'date_id': date_id,
            'date_actual': date_actual,
            'day_suffix': day_suffix,
            'day_name': day_name,
            'day_of_year': str(day_of_year),
            'week_of_month': str(week_of_month),
            'week_of_year': str(week_of_year),
            'week_of_year_iso': week_of_year_iso,
            'month_actual': str(month_actual),
            'month_name': month_name,
            'month_name_abbreviated': month_name_abbreviated,
            'quarter_actual': str(quarter_actual),
            'quarter_name': quarter_name,
            'year_actual': str(year_actual),
            'first_day_of_week': first_day_of_week,
            'last_day_of_week': last_day_of_week,
            'first_day_of_month': first_day_of_month,
            'last_day_of_month': last_day_of_month,
            'first_day_of_quarter': first_day_of_quarter,
            'last_day_of_quarter': last_day_of_quarter,
            'first_day_of_year': first_day_of_year,
            'last_day_of_year': last_day_of_year,
            'mmyyyy': mmyyyy,
            'mmddyyyy': mmddyyyy,
            'weekend_indr': weekend_indr
        }
        
        rows.append(row)
    
    # Create DataFrame
    df = pd.DataFrame(rows)
    return df

# Generate the date dimension for 1980-1996 (fixed function call)
date_dim_1980 = generate_date_dimension_1980()

# Display first few rows to verify
date_dim_1980

Unnamed: 0,date_id,date_actual,day_suffix,day_name,day_of_year,week_of_month,week_of_year,week_of_year_iso,month_actual,month_name,...,last_day_of_week,first_day_of_month,last_day_of_month,first_day_of_quarter,last_day_of_quarter,first_day_of_year,last_day_of_year,mmyyyy,mmddyyyy,weekend_indr
0,19500101,1950-01-01,1st,Sunday,1,1,52,1950-W52,1,January,...,1950-01-01,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,011950,01011950,weekend
1,19500102,1950-01-02,2nd,Monday,2,1,1,1950-W01,1,January,...,1950-01-08,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,011950,01021950,weekday
2,19500103,1950-01-03,3rd,Tuesday,3,1,1,1950-W01,1,January,...,1950-01-08,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,011950,01031950,weekday
3,19500104,1950-01-04,4th,Wednesday,4,1,1,1950-W01,1,January,...,1950-01-08,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,011950,01041950,weekday
4,19500105,1950-01-05,5th,Thursday,5,1,1,1950-W01,1,January,...,1950-01-08,1950-01-01,1950-01-31,1950-01-01,1950-03-31,1950-01-01,1950-12-31,011950,01051950,weekday
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10952,19791227,1979-12-27,27th,Thursday,361,4,52,1979-W52,12,December,...,1979-12-30,1979-12-01,1979-12-31,1979-10-01,1979-12-31,1979-01-01,1979-12-31,121979,12271979,weekday
10953,19791228,1979-12-28,28th,Friday,362,4,52,1979-W52,12,December,...,1979-12-30,1979-12-01,1979-12-31,1979-10-01,1979-12-31,1979-01-01,1979-12-31,121979,12281979,weekday
10954,19791229,1979-12-29,29th,Saturday,363,5,52,1979-W52,12,December,...,1979-12-30,1979-12-01,1979-12-31,1979-10-01,1979-12-31,1979-01-01,1979-12-31,121979,12291979,weekend
10955,19791230,1979-12-30,30th,Sunday,364,5,52,1979-W52,12,December,...,1979-12-30,1979-12-01,1979-12-31,1979-10-01,1979-12-31,1979-01-01,1979-12-31,121979,12301979,weekend


In [11]:
date_df = pd.read_csv("data/date.csv")

In [12]:
date_df_new = pd.concat([date_df, date_dim_1980], ignore_index=True).sort_values("date_id").reset_index(drop=True).set_index("date_id")
date_df_new.to_csv("data/date.csv")

# Tes transform function

# Profiling data

## Spark session

In [5]:
profile_data_spark = SparkSession \
                    .builder \
                    .appName("profile_data") \
                    .getOrCreate()

In [6]:
profiling_result_path = "profiling_result/"

## Source Database

In [7]:
profile_db_df = ProfileData.from_database(profile_data_spark)

In [8]:
profile_db_df.table_name.unique()

array(['company', 'acquisition', 'funding_rounds', 'funds', 'investments',
       'ipos'], dtype=object)

In [None]:
profile_db_df.to_csv(f"{profiling_result_path}profile_db.csv")

In [None]:
company = profile_db_df.query("table_name == 'company'")
company

In [None]:
profile_db_df.query("table_name == 'acquisition'")

In [None]:
profile_db_df.query("table_name == 'funding_rounds'")

In [None]:
profile_db_df.query("table_name == 'funds'")

In [None]:
profile_db_df.query("table_name == 'investments'")

In [None]:
profile_db_df.query("table_name == 'ipos'")

## CSV

- date dan datetime masih dalam bentuk string

In [11]:
profile_csv_df = ProfileData.from_csv(profile_data_spark)

In [13]:
profile_csv_df.to_csv(f"{profiling_result_path}profile_csv.csv")

profile_csv_df

In [None]:
relationships = profile_csv_df.query("table_name == 'relationships.csv'")
relationships[["column_name", "data_type", "mode", "distinct_value"]]

In [None]:
relationship = profile_csv_df.query("table_name == 'relationships.csv' and column_name == 'person_object_id'")

In [None]:
relationship

In [None]:
rel = relationship["distinct_value"].values[0]
rel_prefix = {r[0] for r in rel}
rel_prefix

In [None]:
people = profile_csv_df.query("table_name == 'people.csv'")
people

## API

- data date dan datetime masih berbentuk string
- start_date = 2005-01-01
- end_date = 2011-01-01

In [None]:
params = {
    "start_date":"2005-01-01",
    "end_date":"2011-01-01"
}

profile_api_df = ProfileData.from_api(profile_data_spark, 
                                      api_url=API_PATH, 
                                      params=params)

In [None]:
profile_api_df.to_csv(f"{profiling_result_path}profile_api.csv")

In [None]:
profile_api_df[["column_name", "data_type", "distinct_value"]]

In [None]:
profile_api_df.query("column_name == 'description'")["distinct_value"].values[0]

In [None]:
profile_data_spark.stop()

# Run ETL Pipeline

# Try Extraction process

In [None]:
spark = SparkSession \
    .builder \
    .appName("ETL pipeline") \
    .getOrCreate()

## Source database

In [9]:
# List tables to extract from database
tables = ['company', 'acquisition', 'funding_rounds', 'funds', 'investments',
          'ipos']

# Initialize dictionary to store multiple tables
df_dict = {}

# Start loop to extract each tables and store in df_dict in the form of spark dataframe
for tab in tables:
    df = Extract.from_database(spark, 
                              table_name = tab, 
                              source_engine = source_engine, 
                              write_log = False)
    df_dict[tab] = df

TypeError: Extract.from_database() got an unexpected keyword argument 'initial_load'

In [None]:
df_db_company = df_dict["company"].toPandas()
df_db_company

In [None]:
df = Extract._extract_postgres(spark, "company", write_log=True)
df.show(5)

In [None]:
df.select(max("created_at").cast("string").alias("max_created_at")).first()[0]

In [None]:
API_PATH

In [None]:
ds = '2008-06-09'
try:
    response = requests.get(
        url=API_PATH,
        params={"start_date": ds, "end_date": ds},
    )

    if response.status_code != 200:
        print(f"fail to fetch data from API. Status code: {response.status_code}")

    json_data = response.json()
    if not json_data:
        print("No new data in Dellstore API. Skipped...")

except Exception as e:
    print(e)

In [None]:
json_df = pd.DataFrame(json_data)
json_df

# Unused code

In [14]:
tes_csv = pd.read_csv("data/event_type.csv")
tes_csv

Unnamed: 0.1,Unnamed: 0,event_type_id,event_type,created_at,updated_at
0,0,22d603c6a09a14c27e2881d3e9dc77dac499318b07b620...,Acquired,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
1,1,70d236289ca8544f3b0853a9ba7c5af7dbf7d28b11afe2...,Acquiring,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
2,2,54a22fae449c3a25678fdf6fc62c628b0fc67f61d6cfe9...,IPO,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
3,3,03f90270ba8c153f9b5888366ef04fdcf783f47876be54...,Received Funding,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
4,4,723ab14024a6e4f082338c3fc749a04dd72616bf7371f3...,Made Investment,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
5,5,3004682ecde79e43db18fe9274598e43529a9e3fc24b72...,Funds,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
6,6,9e86248cf2351e388065b80307b7ac00a2fe5ed922d93d...,Milestone,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446


In [16]:
tes_csv = tes_csv.drop(columns="Unnamed: 0").set_index("event_type_id")
tes_csv

Unnamed: 0_level_0,event_type,created_at,updated_at
event_type_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
22d603c6a09a14c27e2881d3e9dc77dac499318b07b620a135b6ebd97b4ae817,Acquired,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
70d236289ca8544f3b0853a9ba7c5af7dbf7d28b11afe29691d0027c53b95fdc,Acquiring,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
54a22fae449c3a25678fdf6fc62c628b0fc67f61d6cfe9d8e738c60116d04d3f,IPO,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
03f90270ba8c153f9b5888366ef04fdcf783f47876be546dedcd021a5f3e02a2,Received Funding,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
723ab14024a6e4f082338c3fc749a04dd72616bf7371f31b8912b9dea12c1b42,Made Investment,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
3004682ecde79e43db18fe9274598e43529a9e3fc24b726bb27d235e9c81e577,Funds,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446
9e86248cf2351e388065b80307b7ac00a2fe5ed922d93d1316c241cdd9c61146,Milestone,2025-08-07 08:35:32.123446,2025-08-07 08:35:32.123446


In [17]:
tes_csv.to_csv("data/event_type.csv")

In [10]:
# import pandas as pd
# import hashlib
# from datetime import datetime

# def sha256_hash(*cols):
#     concatenated = ''.join(str(col) if col is not None else '' for col in cols) + '||'
#     concatenated = concatenated.rstrip('||')
#     return hashlib.sha256(concatenated.encode()).hexdigest()

# event_types = [
#     'Acquired',
#     'Acquiring',
#     'IPO',
#     'Received Funding',
#     'Made Investment',
#     'Funds',
#     'Milestone'
# ]

# current_time = datetime.now()

# data = {
#     'event_type_id': [sha256_hash(event) for event in event_types],
#     'event_type': event_types,
#     'created_at': [current_time] * len(event_types),
#     'updated_at': [current_time] * len(event_types)
# }

# event_type_df = pd.DataFrame(data)

# try:
#     if event_type_df.shape[0] == 0:
#         raise ValueError("DataFrame is empty")
#     print("DataFrame created successfully:")
#     print(event_type_df)

# except Exception as e:
#     print(f"Exception caught: {e}")

DataFrame created successfully:
                                       event_type_id        event_type  \
0  22d603c6a09a14c27e2881d3e9dc77dac499318b07b620...          Acquired   
1  70d236289ca8544f3b0853a9ba7c5af7dbf7d28b11afe2...         Acquiring   
2  54a22fae449c3a25678fdf6fc62c628b0fc67f61d6cfe9...               IPO   
3  03f90270ba8c153f9b5888366ef04fdcf783f47876be54...  Received Funding   
4  723ab14024a6e4f082338c3fc749a04dd72616bf7371f3...   Made Investment   
5  3004682ecde79e43db18fe9274598e43529a9e3fc24b72...             Funds   
6  9e86248cf2351e388065b80307b7ac00a2fe5ed922d93d...         Milestone   

                  created_at                 updated_at  
0 2025-08-07 08:35:32.123446 2025-08-07 08:35:32.123446  
1 2025-08-07 08:35:32.123446 2025-08-07 08:35:32.123446  
2 2025-08-07 08:35:32.123446 2025-08-07 08:35:32.123446  
3 2025-08-07 08:35:32.123446 2025-08-07 08:35:32.123446  
4 2025-08-07 08:35:32.123446 2025-08-07 08:35:32.123446  
5 2025-08-07 08:35:32.12344

initial_load belum didelete!!!!

In [None]:
# @classmethod
#     def source(cls, spark: SparkSession, df_dict: Dict[str, DataFrame]) -> Optional[Dict]:
#         # Create completely independent DataFrames copy:
#         src = {
#             key: df.select("*")
#             for key, df in df_dict.items()
#         }

#         # Define current_timestamp for logging
#         current_timestamp = datetime.now()  
        
#         # Hash configurations for each table
#         hash_configs = {
#             "company": [
#                 "office_id",
#                 "object_id",
#                 "description",
#                 "region",
#                 "address1",
#                 "address2",
#                 "city",
#                 "zip_code",
#                 "state_code",
#                 "country_code",
#                 "latitude",
#                 "longitude",
#                 "created_at",
#                 "updated_at"
#             ],
            
#             "acquisition": [
#                 "acquisition_id",
#                 "acquiring_object_id",
#                 "acquired_object_id",
#                 "term_code",
#                 "price_amount",
#                 "price_currency_code",
#                 "acquired_at",
#                 "source_url",
#                 "source_description",
#                 "created_at",
#                 "updated_at",
#             ],
            
#             "ipos": [
#                 "ipo_id",
#                 "object_id",
#                 "valuation_amount",
#                 "valuation_currency_code",
#                 "raised_amount",
#                 "raised_currency_code",
#                 "public_at",
#                 "stock_symbol",
#                 "source_url",
#                 "source_description",
#                 "created_at",
#                 "updated_at",
#             ],
            
#             "funds": [
#                 "fund_id",
#                 "object_id",
#                 "name",
#                 "funded_at",
#                 "raised_amount",
#                 "raised_currency_code",
#                 "source_url",
#                 "source_description",
#                 "created_at",
#                 "updated_at",
#             ],
            
#             "funding_rounds": [
#                 "funding_round_id",
#                 "object_id",
#                 "funded_at",
#                 "funding_round_type",
#                 "funding_round_code",
#                 "raised_amount_usd",
#                 "raised_amount",
#                 "raised_currency_code",
#                 "pre_money_valuation_usd",
#                 "pre_money_valuation",
#                 "pre_money_currency_code",
#                 "post_money_valuation_usd",
#                 "post_money_valuation",
#                 "post_money_currency_code",
#                 "participants",
#                 "is_first_round",
#                 "is_last_round",
#                 "source_url",
#                 "source_description",
#                 "created_by",
#                 "created_at",
#                 "updated_at",
#             ],
            
#             "investments": [
#                 "investment_id",
#                 "funding_round_id",
#                 "funded_object_id",
#                 "investor_object_id",
#                 "created_at",
#                 "updated_at",
#             ],
            
#             "people": [
#                 "people_id",
#                 "object_id",
#                 "first_name",
#                 "last_name",
#                 "birthplace",
#                 "affiliation_name",
#             ],
            
#             "relationships": [
#                 "relationship_id",
#                 "person_object_id",
#                 "relationship_object_id",
#                 "start_at",
#                 "end_at",
#                 "is_past",
#                 "sequence",
#                 "title",
#                 "created_at",
#                 "updated_at",
#             ],
            
#             "milestones": [
#                 "milestone_id",
#                 "description",
#                 "milestone_at",
#                 "milestone_code",
#                 "object_id",
#                 "source_description",
#                 "source_url",
#                 "created_at",
#                 "updated_at",
#             ],
#         }
#         # Add hashing to all tables
#         # No hashing for date table and event_type table
#         for table_name, hash_cols in hash_configs.items():
#             try:
#                 if table_name in src:
#                     src[table_name] = cls._hashing(
#                         df=src[table_name],
#                         hash_cols=hash_cols,
#                         hash_output_colname="hash_id"
#                     )
#             except Exception as e:
#                 # Capture full traceback information
#                 tb_str = traceback.format_exc()
#                 error_msg = f"""
#                 fail to perform hashing operation for table '{table_name}'.
                
#                 Error Details:
#                 - Error Type: {type(e).__name__}
#                 - Error Message: {str(e)}
#                 - Table: {table_name}
#                 - Initial Load: {initial_load}
                
#                 Full Traceback:
#                 {tb_str}
#                 """
            
#                  # Log fail transformation
#                 log_msg = spark.sparkContext\
#                     .parallelize([("staging",
#                                    "transform_source",
#                                    "fail",
#                                    "source extraction result",
#                                    table_name,
#                                    current_timestamp,
#                                    error_msg)])\
#                     .toDF(['step',
#                            'process',
#                            'status',
#                            'source',
#                            'table_name',
#                            'etl_date',
#                            'error_msg'])
#                 load_log(spark, log_msg)
#                 return None

#         # ----------------------------------------------------------------- #
#         # Add created_at and updated_at
#         key_columns = {
#             "company": "office_id",
#             "acquisition": "acquisition_id",
#             "ipos": "ipo_id",
#             "funds": "fund_id",
#             "funding_rounds": "funding_round_id",
#             "investments": "investment_id",
#             "people": "people_id",
#             "relationships": "relationship_id",
#             "milestones": "milestone_id",
#         } 
        
#         for table_name, _ in src.items():
#             src[table_name] = src[table_name]\
#                                  .withColumn("created_at", lit(datetime.now()))\
#                                  .withColumn("updated_at", lit(datetime.now()))
            
#             # Create logic for upsert
#             if not initial_load:
#                 if table_name == "date":
#                     continue
#                 try:
#                     # Extract staging data
#                     stg_df = Extract.from_database(
#                         spark=spark,
#                         table_name=table_name,
#                         source_engine=stg_engine,
#                         write_log=False,
#                         initial_load=initial_load
#                     )
                    
#                     if stg_df is None or stg_df.count() == 0:
#                         # Capture full traceback information
#                         tb_str = traceback.format_exc()
#                         error_msg = f"""
#                         fail to perform upsert operation for table '{table_name}'.
                        
#                         Error Details:
#                         - Error Type: {type(e).__name__}
#                         - Error Message: {str(e)}
#                         - Table: {table_name}
#                         - Initial Load: {initial_load}
                        
#                         Full Traceback:
#                         {tb_str}
#                         """
#                         # Log fail transformation
#                         current_timestamp = datetime.now() if not initial_load else datetime(1111, 11, 11)
#                         log_msg = spark.sparkContext\
#                             .parallelize([("staging",
#                                            "transform_source",
#                                            "fail",
#                                            "source extraction result",
#                                            table_name,
#                                            current_timestamp,
#                                            error_msg)])\
#                             .toDF(['step',
#                                    'process',
#                                    'status',
#                                    'source',
#                                    'table_name',
#                                    'etl_date',
#                                    'error_msg'])
#                         load_log(spark, log_msg)
#                         return None
                        
#                     # Get join key and hash column
#                     join_key = key_columns[table_name]
#                     pk_col = "hash_id"
                    
#                     # Validate required columns exist
#                     if join_key not in src[table_name].columns:
#                         raise ValueError(f"Join key '{join_key}' not found in source DataFrame for table {table_name}")
#                     if pk_col not in src[table_name].columns:
#                         raise ValueError(f"Hash column '{pk_col}' not found in source DataFrame for table {table_name}")
                    
#                     # Create aliases for DataFrames
#                     src_df_alias = src[table_name].alias("src")
#                     stg_df_alias = stg_df.alias("stg")
                    
#                     # Perform inner join to find matching records
#                     joined_df = src_df_alias.join(
#                         stg_df_alias,
#                         col(f"src.{join_key}") == col(f"stg.{join_key}"),
#                         "inner"
#                     )
                    
#                     # Define update condition based on hash comparison
#                     update_condition = col(f"src.{pk_col}") != col(f"stg.{pk_col}")
                    
#                     # Build select expressions for update logic - use staging schema as reference
#                     select_expressions = []
#                     current_timestamp_lit = lit(datetime.now())
                    
#                     for column in stg_df.columns:
#                         if column == "updated_at":
#                             # Update timestamp only if record has changed
#                             expr = when(update_condition, current_timestamp_lit).otherwise(col(f"stg.{column}"))
#                         elif column == "created_at":
#                             # Keep original created_at from staging
#                             expr = col(f"stg.{column}")
#                         else:
#                             # Use source data if updated, otherwise keep staging data
#                             expr = when(update_condition, col(f"src.{column}")).otherwise(col(f"stg.{column}"))
                        
#                         select_expressions.append(expr.alias(column))
                    
#                     # Create updated DataFrame
#                     updated_df = joined_df.select(*select_expressions)
                    
#                     # Handle inserts (records in source but not in staging)
#                     insert_df = src[table_name].join(
#                         stg_df.select(join_key),
#                         join_key,
#                         "leftanti"
#                     )
                    
                    
#                     # Select columns in same order as staging
#                     insert_df = insert_df.select(*stg_df.columns)
                    
#                     # Combine updated and inserted records
#                     if insert_df.count() == 0:
#                         upserted_df = updated_df
#                     elif updated_df.count() == 0:
#                         upserted_df = insert_df
#                     else:
#                         upserted_df = updated_df.unionByName(insert_df)
                    
#                     src[table_name] = upserted_df
                    
#                     # Log success transforming
#                     log_msg = spark.sparkContext\
#                         .parallelize([("staging",
#                                        "transform_source",
#                                        "success",
#                                        "source",
#                                        table_name,
#                                        current_timestamp)])\
#                         .toDF(['step',
#                                'process',
#                                'status',
#                                'source',
#                                'table_name',
#                                'etl_date'])
#                     load_log(spark, log_msg)
                    
#                 except Exception as e:
#                     # Capture full traceback information
#                     tb_str = traceback.format_exc()
#                     error_msg = f"""
#                     fail to perform upsert operation for table '{table_name}'.
                    
#                     Error Details:
#                     - Error Type: {type(e).__name__}
#                     - Error Message: {str(e)}
#                     - Table: {table_name}
#                     - Initial Load: {initial_load}
                    
#                     Full Traceback:
#                     {tb_str}
#                     """
#                     # Log fail transformation
#                     current_timestamp = datetime.now() if not initial_load else datetime(1111, 11, 11)
#                     log_msg = spark.sparkContext\
#                         .parallelize([("staging",
#                                        "transform_source",
#                                        "fail",
#                                        "source extraction result",
#                                        table_name,
#                                        current_timestamp,
#                                        error_msg)])\
#                         .toDF(['step',
#                                'process',
#                                'status',
#                                'source',
#                                'table_name',
#                                'etl_date',
#                                'error_msg'])
#                     load_log(spark, log_msg)
#                     return None

#         # ----------------------------------------------------------------- #
#         # Return back the src dictionary
#         return src

In [None]:
   # @staticmethod
   #  def to_temp(spark: SparkSession, df, table_name, path="/temp"):
   #      current_timestamp = datetime.now()
       
   #      try:
   #          # load data to provided csv path
   #          full_path = os.path.join(path, table_name)
   #          os.chmod(os.path.join(os.getcwd(), path), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
   #          os.makedirs(full_path, exist_ok=True)
            
   #          # Write to single CSV file
   #          df.coalesce(1).write.mode("overwrite").csv(full_path, header=True)
            
   #          # Find the actual CSV file (Spark creates part-*.csv files)
   #          csv_files = glob.glob(f"{full_path}/part-*.csv")
            
   #          # Rename to desired filename
   #          final_csv_path = f"{full_path}.csv"
   #          os.rename(csv_files[0], final_csv_path)
            
   #          # Clean up temporary directory
   #          shutil.rmtree(full_path)
            
   #          print(f"CSV file written to: {final_csv_path}")

            
   #          #log message
   #          log_msg = spark.sparkContext\
   #              .parallelize([("warehouse", "load", "success", "staging transformation result", table_name, current_timestamp)])\
   #              .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
            
   #      except Exception as e:
   #          # log message
   #          log_msg = spark.sparkContext\
   #              .parallelize([("warehouse", "load", "fail", "staging transformation result", table_name, current_timestamp, str(e))])\
   #              .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
            
   #      finally:
   #          load_log(spark, log_msg) 