Utils

In [1]:
import logging
import os
from pyspark.sql import SparkSession


def logging_process(log_file="script/log/info.log"):
    """Configure root-logger output to ``log_file`` and return the root logger.

    The parent directory of ``log_file`` is created when it does not exist.
    ``logging.basicConfig`` does nothing when the root logger already has
    handlers, so repeated calls keep whatever configuration was set first.

    Args:
        log_file: Path of the log file. A bare filename (no directory part)
            is written to the current working directory.

    Returns:
        The root ``logging.Logger``.
    """
    log_dir = os.path.dirname(log_file)
    # os.makedirs("") raises FileNotFoundError, so only create a directory
    # when the path actually contains one.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
    return logging.getLogger()

def load_log_msg(spark: SparkSession, log_msg):
    """Append ETL log rows to the ``etl_log`` table of the pipeline database.

    Args:
        spark: Not used by this function; kept for interface compatibility.
        log_msg: DataFrame of log rows (callers build it with ``toDF``);
            written through its JDBC writer in ``append`` mode.
    """
    # NOTE(review): credentials are hard-coded here and in the other cells;
    # consider sourcing them from the environment.
    log_db_props = {
        "user": "postgres",
        "password": "cobapassword",
        "driver": "org.postgresql.Driver",
    }

    writer = log_msg.write
    writer.jdbc(
        url="jdbc:postgresql://pipeline_db:5432/etl_log",
        table="etl_log",
        mode="append",
        properties=log_db_props,
    )


def init_spark_session():
    """Return the SparkSession used by the pipeline.

    ``getOrCreate`` reuses an already-running session when one exists, so
    repeated calls share one session. The time-parser policy below is
    re-applied on every call.
    """
    builder = SparkSession.builder.appName("Exercise Data Pipeline Week_6")
    session = builder.getOrCreate()

    # Parse dates/timestamps with Spark's LEGACY time-parser policy.
    session.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

    return session

staging_extract

In [2]:
from utils.helper import logging_process, init_spark_session, load_log_msg
from datetime import datetime
import logging
import pyspark

# Configure root-logger file output as soon as this cell/module runs.
logging_process()


def extract_data(data_name: str, format_data: str) -> pyspark.sql.DataFrame:
    """Read one source dataset into a Spark DataFrame and record the outcome.

    A success or failure row is appended to the ETL log table through
    ``load_log_msg``; an error while writing that log row is logged and
    swallowed so it never hides the extraction result.

    Args:
        data_name: For ``"csv"``, the file stem read from ``data/<data_name>.csv``;
            for ``"db"``, the table read from the ``startup_investments`` database.
        format_data: ``"csv"`` or ``"db"`` (case-insensitive).

    Returns:
        The extracted DataFrame.

    Raises:
        ValueError: If ``format_data`` is not a supported format.
        Exception: Any read error is logged, recorded in the ETL log and re-raised.
    """
    spark = init_spark_session()

    # NOTE(review): source-DB credentials are hard-coded.
    jdbc_url = "jdbc:postgresql://source_db:5432/startup_investments"
    connection_properties = {
        "user": "postgres",
        "password": "cobapassword",
        "driver": "org.postgresql.Driver",
    }

    current_timestamp = datetime.now()
    log_message = None

    try:
        logging.info(f"===== Start Extracting {data_name} data =====")

        source_format = format_data.lower()
        if source_format == "csv":
            df = spark.read.csv(f"data/{data_name}.csv", header=True)
        elif source_format == "db":
            df = spark.read.jdbc(
                url=jdbc_url,
                table=data_name,
                properties=connection_properties,
            )
        else:
            raise ValueError("Format data not supported yet")

        logging.info(f"===== Finish Extracting {data_name} data =====")

        # Log success
        log_message = spark.sparkContext.parallelize([(
            "sources", "extract", "success", format_data, data_name, current_timestamp
        )]).toDF(["step", "process", "status", "source", "table_name", "etl_date"])

        return df

    except Exception as e:
        logging.error("====== Failed to Extract Data ======")
        logging.error(str(e))

        # Log failure. The process name matches the success row ("extract";
        # previously "extraction") so both outcomes of this step can be
        # queried together in the log table.
        log_message = spark.sparkContext.parallelize([(
            "sources", "extract", "failed", format_data, data_name, current_timestamp, str(e)
        )]).toDF(["step", "process", "status", "source", "table_name", "etl_date", "error_msg"])

        raise

    finally:
        # Only "no log row was built" should skip the write.
        if log_message is not None:
            try:
                load_log_msg(spark, log_message)
            except Exception as log_err:
                logging.error(f"Failed to write log to DB: {log_err}")


staging_load

In [3]:
from utils.helper import logging_process, init_spark_session, load_log_msg
import logging
import pyspark
from sqlalchemy import create_engine, text
from datetime import datetime

# Configure root-logger file output as soon as this cell/module runs.
logging_process()


def load_data(df_result: pyspark.sql.DataFrame, table_name: str) -> None:
    """Truncate ``table_name`` in the staging database, then append ``df_result``.

    The truncate runs through SQLAlchemy inside a committed transaction; the
    load uses Spark's JDBC writer. A success or failure row is appended to the
    ETL log table through ``load_log_msg``; an error while writing that log row
    is logged and swallowed.

    Args:
        df_result: DataFrame to write.
        table_name: Target table in the ``staging`` database. It is
            interpolated into SQL unquoted, so it must be a trusted identifier.

    Raises:
        Exception: Any truncate/write error, after it has been logged and
            recorded in the ETL log.
    """
    # Its basicConfig call does nothing once the root logger has handlers.
    logging_process()

    spark = init_spark_session()
    current_timestamp = datetime.now()
    log_message = None
    engine = None

    # DB config
    # NOTE(review): credentials are hard-coded.
    DB_HOST = "pipeline_db"
    DB_PORT = "5432"
    DB_NAME = "staging"
    DB_USER = "postgres"
    DB_PASS = "cobapassword"

    jdbc_url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"
    sqlalchemy_url = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

    connection_properties = {
        "user": DB_USER,
        "password": DB_PASS,
        "driver": "org.postgresql.Driver",
    }

    try:
        logging.info("===== Start Load data to the database =====")

        # TRUNCATE TABLE using SQLAlchemy. engine.begin() commits when the
        # block exits cleanly. With engine.connect() and SQLAlchemy 2.x
        # semantics nothing is committed, so the TRUNCATE is rolled back when
        # the connection is released and the append below lands on top of the
        # existing rows.
        engine = create_engine(sqlalchemy_url)
        with engine.begin() as connection:
            connection.execute(text(f"TRUNCATE TABLE {table_name} CASCADE"))
        logging.info(f"===== Truncated table {table_name} successfully =====")

        # Load data using PySpark
        df_result.write.jdbc(
            url=jdbc_url,
            table=table_name,
            mode="append",
            properties=connection_properties,
        )

        logging.info("===== Finish Load data to the database =====")

        # SUCCESS log
        log_message = spark.sparkContext.parallelize([(
            "targets", "load", "success", "db", table_name, current_timestamp
        )]).toDF(["step", "process", "status", "source", "table_name", "etl_date"])

    except Exception as e:
        logging.error("===== Failed Load data to the database =====")
        logging.error(str(e))

        # FAILURE log
        log_message = spark.sparkContext.parallelize([(
            "targets", "load", "failed", "db", table_name, current_timestamp, str(e)
        )]).toDF(["step", "process", "status", "source", "table_name", "etl_date", "error_msg"])

        raise

    finally:
        # A new Engine (with its own connection pool) is built per call;
        # release its pooled connections instead of leaking them.
        if engine is not None:
            engine.dispose()

        if log_message is not None:
            try:
                load_log_msg(spark, log_message)
                logging.info("ETL log inserted successfully")
            except Exception as log_err:
                logging.error(f"Failed to write log to DB: {log_err}")


staging_pipeline

In [6]:
import logging

# Initialize logging
logging_process()

if __name__ == "__main__":
    # Message matches the finish line below (was "Banking", a copy-paste leftover).
    logging.info("===== Start Investment Data Pipeline =====")

    try:
        # Extract: CSV files for people/relationships, source DB tables for the rest.
        df_people = extract_data(data_name="people", format_data="csv")
        df_relationships = extract_data(data_name="relationships", format_data="csv")
        df_acquisition = extract_data(data_name="acquisition", format_data="db")
        df_funds = extract_data(data_name="funds", format_data="db")
        df_funding_rounds = extract_data(data_name="funding_rounds", format_data="db")
        df_company = extract_data(data_name="company", format_data="db")
        df_investments = extract_data(data_name="investments", format_data="db")
        df_ipos = extract_data(data_name="ipos", format_data="db")

        # TODO: add a per-dataset transform step before loading.

        # Load each dataset into its target table.
        load_data(df_people, table_name="people")
        load_data(df_relationships, table_name="relationships")
        load_data(df_acquisition, table_name="acquisition")
        load_data(df_funds, table_name="funds")
        load_data(df_funding_rounds, table_name="funding_rounds")
        load_data(df_company, table_name="company")
        load_data(df_investments, table_name="investments")
        load_data(df_ipos, table_name="ipos")

        logging.info("===== Finish Investment Data Pipeline =====")

    except Exception as e:
        logging.error("===== Data Pipeline Failed =====")
        # logging.exception logs at ERROR level and appends the traceback.
        logging.exception(e)
        raise

In [18]:
        # NOTE(review): this cell is indented; it only runs because IPython strips
        # a cell's common leading indentation. Dedent it to run as plain Python.
        load_data(table_name="people", df_result=df_people)
        load_data(table_name="relationships", df_result=df_relationships)
        load_data(table_name="acquisition", df_result=df_acquisition)
        load_data(table_name="funds", df_result=df_funds)
        load_data(table_name="funding_rounds", df_result=df_funding_rounds)
        load_data(table_name="company", df_result=df_company)
        load_data(table_name="investments", df_result=df_investments)
        load_data(table_name="ipos", df_result=df_ipos)

In [22]:
# Re-extract only the ipos table from the source database.
df_ipos = extract_data(format_data="db", data_name="ipos")

In [23]:
# Reload only the ipos table into its target table.
load_data(table_name="ipos", df_result=df_ipos)