# 🎯 **Step 0: Importing Required Libraries**
---

In [119]:
import pandas as pd
import numpy as np
import psycopg2
import logging
import yaml
import os 

from datetime import date
from sqlalchemy import create_engine
from logging.handlers import RotatingFileHandler
from typing import Dict, Optional, Tuple, Any

# 🎯 **Step 1: Setting Up a Custom Logger for ETL Pipeline**
---

In [120]:
class Logger:
    """
    Logging helper that configures a named logger for the ETL pipeline.

    The configured logger writes to a size-rotated file (``etl_pipeline.log``)
    and to the console. The log directory is ``<base>/logs`` where ``<base>``
    is the parent of the directory containing this file, or the current
    working directory when ``__file__`` is undefined (e.g. inside a notebook).

    Instantiating this class again with the same name reconfigures the same
    underlying ``logging.Logger``; the previous handlers are closed and
    replaced so that file handles do not accumulate.

    Methods:
        __init__():
            Resolve the log directory and configure the logger.

        _setup_logging():
            Attach a rotating file handler and a console handler.

        get_logger() -> logging.Logger:
            Return the configured logger.
    """

    def __init__(self, name: str = __name__, log_level: int = logging.DEBUG) -> None:
        """
        Initialize the Logger instance.

        Parameters:
            name (str): The name of the logger. Defaults to the module's name.
            log_level (int): Level set on the logger itself. Defaults to DEBUG.

        Returns:
            None

        Raises:
            OSError: If the log directory or log file cannot be created.
        """

        self.logger = logging.getLogger(name)
        self.logger.setLevel(log_level)

        try:
            base_dir = os.path.dirname(os.path.dirname(__file__))
        except NameError:
            # __file__ is not defined in interactive/notebook sessions.
            base_dir = os.getcwd()

        self.log_dir = os.path.join(base_dir, 'logs')
        self._setup_logging()

    def _setup_logging(self) -> None:
        """
        Attach file and console handlers with a shared format.

        - Creates the log directory if it does not exist.
        - Closes and removes any handlers already attached to the logger.
        - Adds a rotating file handler (5 MB per file, 5 backups, DEBUG level).
        - Adds a console stream handler (INFO level).
        - Disables propagation to ancestor loggers.

        Returns:
            None
        """

        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.log_dir, exist_ok=True)

        # Close handlers before detaching them: merely clearing the list would
        # leave the previous log file stream open every time a Logger with the
        # same name is created.
        for stale_handler in list(self.logger.handlers):
            self.logger.removeHandler(stale_handler)
            stale_handler.close()

        file_handler = RotatingFileHandler(
            filename=os.path.join(self.log_dir, 'etl_pipeline.log'),
            maxBytes=5 * 1024 * 1024,  # 5 MB
            backupCount=5,
        )
        file_handler.setLevel(logging.DEBUG)

        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        for handler in (file_handler, console_handler):
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        # Prevent records from also being emitted by ancestor loggers' handlers.
        self.logger.propagate = False

    def get_logger(self) -> logging.Logger:
        """
        Get the configured logger instance.

        Returns:
            logging.Logger: The logger configured with file and console handlers.
        """

        return self.logger

In [121]:
# if __name__ == "__main__":
#     logger = Logger(__name__).get_logger()
#     logger.debug("Ini adalah pesan debug.")
#     logger.info("Ini adalah pesan info.")
#     logger.warning("Ini adalah pesan peringatan.")
#     logger.error("Ini adalah pesan kesalahan.")
#     logger.critical("Ini adalah pesan kritikal.")

# 🎯 **Step 2: Loading Configs and Setting Up Database Access**
---

In [122]:
class DatabaseConnection:
    """
    Manage PostgreSQL access through both psycopg2 and SQLAlchemy.

    Connection settings are read from the ``postgresql`` section of a YAML
    file (keys used: ``host``, ``database``, ``user``, ``password``).
    The class can be used as a context manager; leaving the ``with`` block
    releases the resources opened by ``connect()``.

    Methods:
        __init__(config_path: str) -> None:
            Store the configuration path and create the logger.

        load_db_config() -> dict:
            Load the YAML configuration file.

        connect() -> tuple:
            Open a psycopg2 connection and create a SQLAlchemy engine.

        close() -> None:
            Close the psycopg2 connection and dispose the SQLAlchemy engine.

        __enter__() -> tuple:
            Call ``connect()`` and return its result.

        __exit__(exc_type, exc_val, exc_tb) -> None:
            Call ``close()``.
    """

    def __init__(self, config_path: str) -> None:
        """
        Initialize the DatabaseConnection instance.

        Parameters:
            config_path (str): The path to the YAML configuration file.

        Returns:
            None
        """

        self.config_path = config_path
        self.connection = None
        self.engine = None
        self.logger = Logger(__name__).get_logger()

    def load_db_config(self) -> dict:
        """
        Load the database configuration from the YAML file.

        Returns:
            dict: The parsed content of the configuration file.

        Raises:
            Exception: Re-raised after logging if the file cannot be read or parsed.
        """

        try:
            with open(self.config_path, 'r') as file:
                return yaml.safe_load(file)
        except Exception as e:
            self.logger.error(f"Failed to load database configuration: {e}")
            raise

    def connect(self) -> tuple:
        """
        Open a psycopg2 connection and create a SQLAlchemy engine.

        Returns:
            tuple: ``(psycopg2 connection, SQLAlchemy engine)``.

        Raises:
            Exception: Re-raised after logging if the configuration cannot be
                loaded or the connection attempt fails.
        """

        # Local import keeps this block self-contained (stdlib only).
        from urllib.parse import quote

        try:
            config = self.load_db_config()
            db_config = config['postgresql']

            # Raw DB-API connection, used for statements executed via cursors.
            self.connection = psycopg2.connect(
                host=db_config['host'],
                database=db_config['database'],
                user=db_config['user'],
                password=db_config['password']
            )
            self.logger.info("Database connection successfully opened.")

            # Percent-encode the credentials: characters such as '@', ':' or
            # '/' inside a user name or password would otherwise be parsed as
            # URL delimiters and yield a wrong host or password.
            user = quote(str(db_config['user']), safe='')
            password = quote(str(db_config['password']), safe='')
            conn_string = f"postgresql+psycopg2://{user}:{password}@{db_config['host']}/{db_config['database']}"
            self.engine = create_engine(conn_string)
            return self.connection, self.engine
        except Exception as e:
            self.logger.error(f"Failed to connect to database: {e}")
            raise

    def close(self) -> None:
        """
        Close the psycopg2 connection and dispose the SQLAlchemy engine.

        Safe to call repeatedly and when nothing was opened. After the call
        both ``connection`` and ``engine`` are reset to ``None``.

        Returns:
            None

        Raises:
            Exception: Re-raised after logging if closing the connection fails.
        """

        try:
            if self.connection is not None:
                self.connection.close()
                self.logger.info("Database connection closed.")
        except Exception as e:
            self.logger.error(f"Failed to close database connection: {e}")
            raise
        finally:
            self.connection = None
            # The engine keeps its own connection pool; dispose it so pooled
            # connections are released together with the raw connection.
            if self.engine is not None:
                self.engine.dispose()
                self.engine = None

    def __enter__(self) -> tuple:
        """
        Support for context manager entry. Calls `connect()`.

        Returns:
            tuple: The result of the `connect()` method.
        """

        return self.connect()

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Support for context manager exit. Calls `close()`.

        Parameters:
            exc_type: Exception type, if raised.
            exc_val: Exception value, if raised.
            exc_tb: Traceback, if raised.

        Returns:
            None
        """

        self.close()

In [123]:
# if __name__ == "__main__":
#     db = DatabaseConnection('./config.yaml')
#     conn, engine = db.connect()
#     db.close()

# 🎯 **Step 3: Checking and Cleaning Data Quality**
---

In [124]:
class DataQuality:
    """
    Data quality checks and cleaning operations for a collection of datasets.

    The datasets are held in ``self.datasets`` (name -> DataFrame). Several
    methods modify those DataFrames in place or rebind the dictionary entry,
    so callers should read results back from ``self.datasets``.

    Methods:
        __init__(datasets: Dict[str, pd.DataFrame]) -> None:
            Store the datasets and create the logger.

        check_duplicates() -> pd.DataFrame:
            Return the fully duplicated rows of every dataset.

        check_null_values() -> pd.DataFrame:
            Return a per-column summary of null values.

        handle_duplicates(duplicates_df: pd.DataFrame, subset='url') -> None:
            Drop duplicate rows from the datasets listed in `duplicates_df`.

        impute_value(df: pd.DataFrame, column_name: str, method: str, dataset_name: str) -> None:
            Fill missing values of one column using the selected method.

        handle_null_values(null_values_df: pd.DataFrame, drop_null_threshold: float = 20.0) -> None:
            Drop or impute columns based on their share of null values.
    """

    def __init__(self, datasets: Dict[str, pd.DataFrame]) -> None:
        """
        Initialize the DataQuality class with datasets.

        Parameters:
            datasets (Dict[str, pd.DataFrame]): Dictionary mapping dataset names to DataFrames.
        """

        self.datasets = datasets
        self.logger = Logger(__name__).get_logger()

    def check_duplicates(self) -> pd.DataFrame:
        """
        Collect rows that are exact duplicates of an earlier row.

        For each duplicated row the output carries the dataset name in
        'Dataset' and, in 'Jumlah Duplikasi', how many rows of the dataset
        share exactly the same values.

        Returns:
            pd.DataFrame: All duplicated rows across datasets ('Dataset' is the
            first column). When nothing is duplicated, an empty frame that only
            has the 'Dataset' column.
        """

        collected = []

        for name, df in self.datasets.items():
            duplicate_mask = df.duplicated(keep='first')
            duplicate_count = duplicate_mask.sum()

            if duplicate_count > 0:
                self.logger.info(f"Dataset '{name}' has {duplicate_count} duplicate rows.")

                duplicates_df = df[duplicate_mask].copy()
                duplicates_df['Dataset'] = name

                # dropna=False keeps rows containing NaN in their group.
                # duplicated() treats NaN values as equal, so without it the
                # count of such rows would be missing or wrong.
                group_sizes = df.groupby(list(df.columns), dropna=False)[df.columns[0]].transform('size')
                # Positional assignment: duplicates_df has the masked rows in
                # the same order, and this also works with a non-unique index.
                duplicates_df['Jumlah Duplikasi'] = group_sizes[duplicate_mask].to_numpy()

                ordered = ['Dataset'] + [col for col in duplicates_df.columns if col != 'Dataset']
                collected.append(duplicates_df[ordered])
            else:
                self.logger.info(f"Dataset '{name}' has no duplicates.")

        if not collected:
            return pd.DataFrame(columns=['Dataset'])
        return pd.concat(collected, ignore_index=True)

    def check_null_values(self) -> pd.DataFrame:
        """
        Summarize null values per column for each dataset.

        Note: columns whose name contains 'date' or 'timestamp' are converted
        to datetime in place before counting (see `_convert_to_datetime`), so
        unparseable values in those columns are counted as nulls.

        Returns:
            pd.DataFrame: One row per column that has nulls, with the columns
            'Dataset', 'Kolom', 'Tipe Data', 'Jumlah Null Values',
            'Persentase Null Values (%)' and 'Apakah Tipe Data Date?'.
            An empty DataFrame when no dataset has nulls.
        """

        summaries = []
        for name, df in self.datasets.items():
            df = self._convert_to_datetime(df)
            null_values = df.isnull().sum()
            null_columns = null_values[null_values > 0]

            if null_columns.empty:
                self.logger.info(f"Dataset '{name}' has no null values.")
                continue

            self.logger.info(f"Dataset '{name}' has null values.")
            column_types = df.dtypes[null_columns.index]
            is_date_type = column_types.apply(pd.api.types.is_datetime64_any_dtype)
            summaries.append(pd.DataFrame({
                'Dataset': name,
                'Kolom': null_columns.index,
                'Tipe Data': column_types.values,
                'Jumlah Null Values': null_columns.values,
                'Persentase Null Values (%)': (null_columns.values / len(df)) * 100,
                'Apakah Tipe Data Date?': is_date_type.values
            }))

        if not summaries:
            return pd.DataFrame()
        return pd.concat(summaries, ignore_index=True)

    def _convert_to_datetime(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Convert columns with 'date' or 'timestamp' in their name to datetime.

        The conversion happens in place on `df`; values that cannot be parsed
        become NaT (`errors='coerce'`).

        Parameters:
            df (pd.DataFrame): Input DataFrame (modified in place).

        Returns:
            pd.DataFrame: The same DataFrame object with converted columns.
        """

        date_columns = [col for col in df.columns if 'date' in col.lower() or 'timestamp' in col.lower()]
        for col in date_columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
        return df

    def handle_duplicates(self, duplicates_df: pd.DataFrame, subset: Optional[Any] = 'url') -> None:
        """
        Remove duplicate rows from every dataset named in `duplicates_df`.

        The cleaned frame replaces the entry in `self.datasets`; the original
        DataFrame object is left untouched.

        Parameters:
            duplicates_df (pd.DataFrame): Output of `check_duplicates()`.
            subset (str | list[str] | None): Column(s) that identify a
                duplicate. Defaults to 'url'. When None, or when any of the
                columns is absent from a dataset, exact duplicate rows are
                dropped instead.

        Returns:
            None
        """

        # check_duplicates() always returns a 'Dataset' column, so an empty
        # frame (not a missing column) is what signals "nothing to do".
        if duplicates_df.empty or 'Dataset' not in duplicates_df.columns:
            self.logger.info("There are no duplicates to deal with.")
            return

        for dataset_name in duplicates_df['Dataset'].unique():
            if dataset_name not in self.datasets:
                continue
            df = self.datasets[dataset_name]

            key_columns = subset
            if key_columns is not None:
                wanted = [key_columns] if isinstance(key_columns, str) else list(key_columns)
                missing = [col for col in wanted if col not in df.columns]
                if missing:
                    self.logger.warning(
                        f"Dataset '{dataset_name}' has no column(s) {missing}; "
                        "dropping exact duplicate rows instead."
                    )
                    key_columns = None

            self.logger.info(f"Removing duplicates from dataset '{dataset_name}'.")
            self.datasets[dataset_name] = df.drop_duplicates(subset=key_columns, keep='first')

    def impute_value(self, df: pd.DataFrame, column_name: str, method: str, dataset_name: str) -> None:
        """
        Impute missing values in a specified column, in place.

        Parameters:
            df (pd.DataFrame): DataFrame to process (modified in place).
            column_name (str): Column to impute.
            method (str): Imputation method ('mean', 'median', 'mode', 'interpolate').
                'interpolate' additionally floors datetime columns to whole seconds.
            dataset_name (str): Name of the dataset for logging.

        Returns:
            None

        Raises:
            ValueError: If the method is not recognized.
        """

        column = df[column_name]

        if method == 'mean':
            df[column_name] = column.fillna(column.mean())
        elif method == 'median':
            df[column_name] = column.fillna(column.median())
        elif method == 'mode':
            modes = column.mode()
            if modes.empty:
                # A column without any non-null value has no mode to fill with.
                self.logger.warning(
                    f"Column '{column_name}' of dataset '{dataset_name}' has no non-null values; "
                    "mode imputation skipped."
                )
                return
            df[column_name] = column.fillna(modes.iloc[0])
        elif method == 'interpolate':
            filled = column.interpolate()
            # The .dt accessor only exists on datetime-like columns; numeric
            # columns are interpolated without flooring.
            if pd.api.types.is_datetime64_any_dtype(filled):
                filled = filled.dt.floor('s')
            df[column_name] = filled
        else:
            self.logger.error(f"The imputation method '{method}' is not recognized.")
            raise ValueError(f"Imputation method '{method}' is not recognized.")
        self.logger.info(f"Imputation is performed on column '{column_name}' of dataset '{dataset_name}' using method '{method}'.")

    def handle_null_values(self, null_values_df: pd.DataFrame, drop_null_threshold: float = 20.0) -> None:
        """
        Drop or impute columns listed in a null-value summary.

        A column whose null percentage exceeds `drop_null_threshold` is dropped
        in place. Otherwise it is imputed: numeric columns with 'median' (or
        'mean' when the skewness is exactly 0), datetime columns with
        'interpolate', and all other columns with 'mode'.

        Parameters:
            null_values_df (pd.DataFrame): Summary of null values (output of `check_null_values()`).
            drop_null_threshold (float): Threshold percentage for dropping columns.

        Returns:
            None
        """

        for _, row in null_values_df.iterrows():
            dataset_name = row['Dataset']
            column_name = row['Kolom']
            null_percentage = row['Persentase Null Values (%)']

            df = self.datasets.get(dataset_name)
            if df is None:
                self.logger.error(f"Dataset '{dataset_name}' not found.")
                continue

            if null_percentage > drop_null_threshold:
                if column_name in df.columns:
                    df.drop(column_name, axis=1, inplace=True)
                    self.logger.info(f"Column '{column_name}' in dataset '{dataset_name}' has been dropped because the null percentage ({null_percentage}%) exceeds the threshold.")
                else:
                    self.logger.error(f"Column '{column_name}' is not found in dataset '{dataset_name}' and cannot be dropped.")
                continue

            if column_name not in df.columns:
                self.logger.error(f"Column '{column_name}' does not exist in dataset '{dataset_name}', continue to next column.")
                continue

            if pd.api.types.is_numeric_dtype(df[column_name]):
                skewness_value = df[column_name].dropna().skew()
                # NOTE(review): the mean is only chosen for a skewness of
                # exactly 0, so in practice the median is almost always used;
                # confirm whether a tolerance was intended.
                method = 'median' if skewness_value != 0 else 'mean'
            elif pd.api.types.is_datetime64_any_dtype(df[column_name]):
                method = 'interpolate'
            else:
                method = 'mode'
            self.impute_value(df, column_name, method, dataset_name)

In [125]:
# if __name__ == "__main__":
#     # Contoh data
#     data = {
#         'id': [1, 2, 3, 4, 5],
#         'name': ['Alice', 'Bob', None, 'Dave', 'Eve'],
#         'age': [25, 30, 35, None, 40],
#         'signup_date': ['2022-01-01', '2022-02-01', '2022-03-01', '2022-04-01', '2022-05-01']
#     }
#     df = pd.DataFrame(data)
#     datasets = {'example': df}

#     dq = DataQuality(datasets)
#     duplicates = dq.check_duplicates()
#     nulls = dq.check_null_values()
#     dq.handle_duplicates(duplicates)
#     dq.handle_null_values(nulls)
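#     # NOTE: DataQuality does not define validate_data_types_and_ranges();
#     # add that method (or remove this call) before uncommenting this cell.
#     validation = dq.validate_data_types_and_ranges(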
#         expected_types={'id': 'int64', 'name': 'object', 'age': 'float64', 'signup_date': 'datetime64[ns]'},
#         value_ranges={'age': (18, 100)}
#     )
#     print("Duplicates:\n", duplicates)
#     print("\nNull Values:\n", nulls)
#     print("\nValidation Results:\n", validation)

# 🎯 **Step 4: Extracting and Saving Data to Staging**
---

In [126]:
class DataExtractor:
    """
    Extract article data from the data warehouse.

    The extractor opens a database connection through `DatabaseConnection`
    (configured from a YAML file) and reads a joined view of the fact and
    dimension tables into a DataFrame. Persisting the result to a staging
    file is left to the caller.

    Methods:
        __init__(config_path: str) -> None:
            Store the configuration path and create the logger and connection helper.

        extract_data() -> Optional[pd.DataFrame]:
            Query the warehouse and return the rows as a DataFrame.
    """

    def __init__(self, config_path: str) -> None:
        """
        Initialize the DataExtractor instance.

        Parameters:
            config_path (str): Path to the YAML configuration file.

        Returns:
            None
        """

        self.config_path = config_path
        self.logger = Logger(__name__).get_logger()
        self.db_connection = DatabaseConnection(config_path)

    def extract_data(self) -> Optional[pd.DataFrame]:
        """
        Read the distinct article rows from the data warehouse.

        Joins `fact_news` with `dim_article`, `dim_category` and `dim_date`
        and returns the columns `title`, `category`, `publication_date`,
        `url` and `summary`. The database connection is always closed before
        returning.

        Returns:
            Optional[pd.DataFrame]: The extracted rows, or None if connecting
            or querying fails (the error is logged, not raised).
        """

        try:
            # Only the SQLAlchemy engine is needed for pandas.read_sql.
            _, engine = self.db_connection.connect()
            
            query = """
                SELECT DISTINCT
                    a.title, 
                    c.name AS category, 
                    d.date AS publication_date, 
                    a.url, 
                    a.description AS summary
                FROM fact_news fn
                JOIN dim_article a ON fn.news_id = a.news_id
                JOIN dim_category c ON fn.category_id = c.category_id
                JOIN dim_date d ON fn.date_id = d.date_id;
            """
            data = pd.read_sql(query, engine)
            self.logger.info("Data successfully extracted from data warehouse.")
            return data
            
        except Exception as e:
            self.logger.error(f"Failed to extract data: {e}")
            return None
        finally:
            if self.db_connection:
                self.db_connection.close()

In [127]:
# import os
# os.makedirs('./extracted', exist_ok=True)

In [128]:
# if __name__ == "__main__":
#     config_path = './config.yaml'
#     extractor = DataExtractor(config_path)
#     data = extractor.extract_data()

#     if data is not None:
#         base_path = './'
#         staging_file = f"{base_path}extracted/staging_data.parquet"
#         data.to_parquet(staging_file, index=False)
#         extractor.logger.info(f"Data berhasil disimpan ke staging area ({staging_file}).")
#         display(data.head())
#     else:
#         extractor.logger.error("Tidak ada data yang diekstrak.")

In [129]:
# len(data)

# 🎯 **Step 5: Transforming and Validating Data**
---

In [130]:
class DataTransformer:
    """
    A class for transforming and processing data loaded from a staging file.

    This class provides methods to load configuration, read data from a staging area (parquet file),
    and perform data transformation including data quality checks (duplicates, null values, data types, etc.).

    Methods:
        __init__(config_path: str) -> None:
            Initializes the DataTransformer instance with a configuration path.
            
        load_config() -> dict:
            Loads the configuration from the YAML file.
        
        process_data(staging_file: str) -> pd.DataFrame | None:
            Reads data from the given staging file (parquet).
        
        transform_data(data: pd.DataFrame) -> pd.DataFrame | None:
            Transforms the data by performing various quality checks and validation.
    """

    def __init__(self, config_path: str) -> None:
        """
        Initialize the DataTransformer instance.

        Parameters:
            config_path (str): Path to the YAML configuration file.

        Returns:
            None
        """

        self.config_path = config_path
        self.logger = Logger(__name__).get_logger()
        self.dq = None

    def process_data(self, staging_file: str) -> pd.DataFrame | None:
        """
        Reads data from the given staging file (parquet).

        Parameters:
            staging_file (str): The path to the parquet file containing the staging data.

        Returns:
            pd.DataFrame | None: Returns the loaded data as a DataFrame, or None if loading fails.

        Raises:
            Exception: If there is an error during the reading process.
        """

        try:
            data = pd.read_parquet(staging_file)
            self.logger.info("Data was successfully read from the staging area.")
            return data
        except Exception as e:
            self.logger.error(f"Failed to process data from staging area: {e}")
            return None

    def transform_data(self, data: pd.DataFrame) -> pd.DataFrame | None:
        """
        Transforms the data by performing various quality checks and validation.

        This includes checking for duplicates, null values, and validating data types and ranges.

        Parameters:
            data (pd.DataFrame): The data to be transformed.

        Returns:
            pd.DataFrame | None: Returns the transformed data as a DataFrame, or None if transformation fails.

        Raises:
            Exception: If there is an error during the transformation process.
        """
        
        try:
            self.dq = DataQuality({'bbc_df': data})

            duplicates_df = self.dq.check_duplicates()
            self.dq.handle_duplicates(duplicates_df)

            null_values_df = self.dq.check_null_values()
            self.dq.handle_null_values(null_values_df, drop_null_threshold=25)

            data['publication_date'] = pd.to_datetime(data['publication_date'], errors='coerce')

            data['title'] = data['title'].str.replace(r'[^\w\s]', '', regex=True)
            data['summary'] = data['summary'].str.replace(r'[^\w\s]', '', regex=True)

            data['processed_date'] = pd.Timestamp.now()

            return data
        except Exception as e:
            self.logger.error(f"Failed to transform data: {e}")
            return None

In [131]:
# import os
# os.makedirs('./transformed', exist_ok=True)

In [132]:
# if __name__ == "__main__":
#     config_path = './config.yaml'
#     transformer = DataTransformer(config_path)
#     base_path = './'
#     staging_file = f"{base_path}extracted/staging_data.parquet"
#     data = transformer.process_data(staging_file)

#     if data is not None:
#         transformed_data = transformer.transform_data(data)

#         if transformed_data is not None:
#             # Simpan data yang telah di-transformasi
#             transformed_file = f"{base_path}transformed/transformed_data.parquet"
#             transformed_data.to_parquet(transformed_file, index=False)
#             transformer.logger.info(f"Data berhasil di-transformasi dan disimpan ke {transformed_file}.")
#             display(transformed_data.head())
#         else:
#             transformer.logger.error("Data tidak berhasil di-transformasi.")
#     else:
#         transformer.logger.error("Data tidak berhasil diproses dari staging area.")

# 🎯 **Step 6: Loading Clean Data to the Database**
---

In [133]:
class DataLoader:
    """
    A class for loading data into dimension and fact tables in a database.

    This class provides methods to load data into various dimension tables (e.g., `dim_date`, `dim_category`, 
    `dim_source_category`, `dim_tag`, and `dim_article`) as well as a fact table (`fact_news`). It also includes
    a method for parsing date strings.

    Methods:
        __init__(config_path: str) -> None:
            Initializes the DataLoader instance with a configuration file path.

        load_to_dimension_tables(data: pd.DataFrame) -> bool:
            Loads the provided data into various dimension tables in the database.

        load_to_fact_table(data: pd.DataFrame) -> bool:
            Loads the provided data into the fact table in the database.
    """

    def __init__(self, config_path: str) -> None:
        """
        Initialize the DataLoader instance.

        Parameters:
            config_path (str): Path to the YAML configuration file.

        Returns:
            None
        """

        self.config_path = config_path
        self.logger = Logger(__name__).get_logger()
        self.db_connection = DatabaseConnection(config_path)
        
    def process_data(self, staging_file: str) -> pd.DataFrame | None:
        """
        Reads data from the given staging file (parquet).

        Parameters:
            staging_file (str): The path to the parquet file containing the staging data.

        Returns:
            pd.DataFrame | None: Returns the loaded data as a DataFrame, or None if loading fails.

        Raises:
            Exception: If there is an error during the reading process.
        """

        try:
            data = pd.read_parquet(staging_file)
            self.logger.info("Data was successfully read from the transformed area.")
            return data
        except Exception as e:
            self.logger.error(f"Failed to process data from transformed area: {e}")
            return None

    def load_to_new_table(self, data: pd.DataFrame) -> bool:
        """
        Loads the provided data into a new table in the database.

        This method creates a new table named `dim_cleaned` if it does not already exist,
        and then loads the data into that table.
        
        Parameters:
            data (pd.DataFrame): The data to be loaded into the database.

        Returns:
            bool: Returns `True` if the data was successfully loaded, `False` otherwise.

        Raises:
            Exception: If there is an error during the table creation or data loading process.
        """
        
        try:
            conn, engine = self.db_connection.connect()
            
            create_table_query = """
                CREATE TABLE IF NOT EXISTS dim_cleaned (
                    cleaned_id SERIAL PRIMARY KEY,
                    title VARCHAR(255) NOT NULL,
                    category VARCHAR(100) NOT NULL,
                    publication_date TIMESTAMP NOT NULL,
                    url VARCHAR(255) NOT NULL,
                    summary TEXT NOT NULL,
                    processed_date TIMESTAMP NOT NULL
                );
            """

            with conn.cursor() as cursor:
                cursor.execute(create_table_query)
                conn.commit()
                self.logger.info("Table dim_cleaned was successfully created or already exists.")

            data.to_sql('dim_cleaned', engine, if_exists='append', index=False)
            self.logger.info("Data successfully loaded into dim_cleaned table.")
            return True

        except Exception as e:
            self.logger.error(f"Failed to create table dim_cleaned: {e}")
            return False
        finally:
            if self.db_connection:
                self.db_connection.close()

In [134]:
# if __name__ == "__main__":
#     config_path = './config.yaml'
#     loader = DataLoader(config_path)
#     base_path = './'
#     transformed_file = f"{base_path}transformed/transformed_data.parquet"
#     data = loader.process_data(transformed_file)
    
#     if data is not None:
#         succes = loader.load_to_new_table(data)

#         if succes:
#             loader.logger.info("Data berhasil dimuat ke tabel dim_cleaned.")    
#         else:
#             loader.logger.error("Data tidak berhasil dimuat ke tabel dim_cleaned.")
#     else:
#         loader.logger.error("Data tidak berhasil diproses dari staging area.")

# 🎯 **Step 7: Running the Full ETL Process**
---

In [None]:
class DataPipeline:
    """
    A class for managing the ETL (Extract, Transform, Load) pipeline.

    This class orchestrates the entire ETL process by coordinating the extraction, transformation,
    and loading of data. It uses the `DataExtractor`, `DataTransformer`, and `DataLoader` classes
    to perform each step of the pipeline. The pipeline is configured using a YAML file.

    Intermediate results are written as parquet files: the extracted data goes to the
    directory named by the `staging_directory` config key (default './extracted') and the
    transformed data to the one named by `transformed_directory` (default './transformed').

    Methods:
        __init__(config_path: str) -> None:
            Initializes the ETL pipeline with a configuration path.

        _load_config() -> dict:
            Loads configuration from the YAML file.

        _save_parquet(data, directory_key, default_directory, filename) -> str:
            Writes an intermediate DataFrame to a configured directory.

        run() -> bool:
            Executes the entire ETL pipeline, returning True if successful, False otherwise.
    """

    def __init__(self, config_path: str) -> None:
        """
        Initialize the ETL pipeline with configuration path.

        Parameters:
            config_path (str): Path to the YAML configuration file.

        Returns:
            None

        Raises:
            Exception: Re-raised from `_load_config` if the configuration file
                cannot be read or parsed.
        """

        self.config_path = config_path
        # The config is loaded first so an unreadable file fails fast, before
        # any of the pipeline components are constructed.
        self.config = self._load_config()
        self.extractor = DataExtractor(config_path)
        self.transformer = DataTransformer(config_path)
        self.loader = DataLoader(config_path)
        # The pipeline reuses the loader's logger for its own messages.
        self.logger = self.loader.logger

    def _load_config(self) -> dict:
        """
        Load the configuration from the YAML file.

        Returns:
            dict: Configuration settings loaded from the YAML file. An empty
                YAML document yields an empty dict so that later `.get()`
                lookups fall back to their defaults.
        Raises:
            Exception: If the configuration file cannot be read or parsed.
        """

        try:
            with open(self.config_path, 'r') as f:
                config = yaml.safe_load(f)
        except Exception as e:
            # __init__ calls this method before self.loader / self.logger are
            # assigned, so fall back to a module logger instead of raising an
            # AttributeError that would hide the real failure.
            logger = getattr(self, 'logger', None) or logging.getLogger(__name__)
            logger.error(f"Failed to load configuration: {e}")
            raise

        # yaml.safe_load returns None for an empty document.
        if config is None:
            return {}
        return config

    def _save_parquet(self, data: 'pd.DataFrame', directory_key: str,
                      default_directory: str, filename: str) -> str:
        """
        Write an intermediate DataFrame to a parquet file.

        Parameters:
            data (pd.DataFrame): The data to persist.
            directory_key (str): Config key that holds the target directory.
            default_directory (str): Directory used when the key is absent.
            filename (str): Name of the parquet file inside the directory.

        Returns:
            str: Path of the file that was written.
        """

        target_dir = self.config.get(directory_key, default_directory)
        os.makedirs(target_dir, exist_ok=True)
        target_file = os.path.join(target_dir, filename)
        data.to_parquet(target_file, index=False)
        return target_file

    def run(self) -> bool:
        """
        Execute the entire ETL pipeline.

        Each step is checked in turn; the first step that reports a failure
        (a `None` result or a falsy load status) is logged and stops the run.

        Returns:
            bool: True if all steps succeeded, False otherwise.
        """

        try:
            # --- EXTRACT ---
            extracted_data = self.extractor.extract_data()
            if extracted_data is None:
                self.logger.error("Extraction failed.")
                return False

            staging_file = self._save_parquet(
                extracted_data, 'staging_directory', './extracted', 'staging_data.parquet'
            )
            self.logger.info(f"Data saved to staging: {staging_file}")

            # --- TRANSFORM ---
            staging_data = self.transformer.process_data(staging_file)
            if staging_data is None:
                self.logger.error("Failed to load staging data.")
                return False

            transformed_data = self.transformer.transform_data(staging_data)
            if transformed_data is None:
                self.logger.error("Transformation failed.")
                return False

            transformed_file = self._save_parquet(
                transformed_data, 'transformed_directory', './transformed',
                'transformed_data.parquet'
            )
            self.logger.info(f"Transformed data saved: {transformed_file}")

            # --- LOAD ---
            loaded_data = self.loader.process_data(transformed_file)
            if loaded_data is None:
                self.logger.error("Failed to load transformed data.")
                return False

            load_success = self.loader.load_to_new_table(loaded_data)
            if not load_success:
                self.logger.error("Database loading failed.")
                return False

            self.logger.info("ETL pipeline completed successfully.")
            return True

        except Exception as e:
            # Top-level boundary: any unexpected error from a step is reported
            # as a failed run rather than propagated to the caller.
            self.logger.error(f"Pipeline failed: {str(e)}")
            return False

In [136]:
# Build the PostgreSQL connection settings as YAML text. The leading and
# trailing empty entries reproduce the blank first line and final newline.
# The password is intentionally left blank here.
yaml_content = "\n".join([
    "",
    "postgresql:",
    "  host: localhost",
    "  database: Assignment10-DWHModelling",
    "  user: postgres",
    "  password:",
    "",
])

# Write (or overwrite) the configuration file in the working directory.
with open('./config.yaml', 'w') as config_file:
    config_file.write(yaml_content)

In [137]:
# Entry point: build the pipeline from the YAML config and report the outcome
# of the run through the pipeline's own logger.
if __name__ == "__main__":
    config_path = './config.yaml'
    pipeline = DataPipeline(config_path)

    succeeded = pipeline.run()
    if not succeeded:
        pipeline.logger.error("ETL pipeline encountered errors.")
    else:
        pipeline.logger.info("All ETL steps completed successfully.")

2025-05-13 13:02:24,969 - __main__ - INFO - Database connection successfully opened.
2025-05-13 13:02:25,015 - __main__ - INFO - Data successfully extracted from data warehouse.
2025-05-13 13:02:25,016 - __main__ - INFO - Database connection closed.
2025-05-13 13:02:25,022 - __main__ - INFO - Data saved to staging: ./extracted\staging_data.parquet
2025-05-13 13:02:25,033 - __main__ - INFO - Data was successfully read from the staging area.
2025-05-13 13:02:25,036 - __main__ - INFO - Dataset 'bbc_df' has no duplicates.
2025-05-13 13:02:25,039 - __main__ - INFO - Dataset 'bbc_df' has no null values.
2025-05-13 13:02:25,048 - __main__ - INFO - Transformed data saved: ./transformed\transformed_data.parquet
2025-05-13 13:02:25,068 - __main__ - INFO - Data was successfully read from the transformed area.
2025-05-13 13:02:25,093 - __main__ - INFO - Database connection successfully opened.
2025-05-13 13:02:25,097 - __main__ - INFO - Table dim_cleaned was successfully created or already exists.