In [1]:
%run /home/jovyan/work/operations/spark_db_connection.ipynb import create_spark_session
%run /home/jovyan/work/operations/db_operations.ipynb import DataOperations
%run /home/jovyan/work/etl/src/load_data.ipynb import LoadData
%run /home/jovyan/work/etl/src/transform_data_types.ipynb import ResultsDataType
%run /home/jovyan/work/etl/src/data_extraction.ipynb import DataExtraction
%run /home/jovyan/work/model/src/transform_data.ipynb import TransformData
%run /home/jovyan/work/model/src/preprocess_data.ipynb import PreProcess

In [2]:
import logging
from typing import Annotated, Tuple, List
from pyspark.sql import SparkSession, DataFrame

In [3]:
class ETL:
    """
    Class for the ETL process (extract -> transform -> load).

    The class keeps no instance state: every step receives what it needs as
    arguments and delegates the actual work to helper classes brought into the
    notebook namespace by the ``%run`` cells (DataExtraction, LoadData,
    ResultsDataType, TransformData, PreProcess, DataOperations).

    Every step logs a short error message and re-raises the original exception
    unchanged, so callers see the real exception type and traceback.
    """

    def extract_data(self,
                     spark: SparkSession,
                     path: str = "/home/jovyan/work/dataset/results.csv",
                     table_name: str = "results"
                     ) -> DataFrame:
        """
        Saves the raw CSV into the database and reads it back as a DataFrame.

        Args:
            spark: Active SparkSession
            path: Path to the CSV file storing the raw data
            table_name: Name of the database table the raw data is written to
                and then read back from
        Returns:
            DataFrame: Data loaded from ``table_name`` in the database
        Raises:
            Exception: Any error raised by DataExtraction or LoadData is logged
                and re-raised unchanged.
        """
        try:
            extractor = DataExtraction(spark)
            extractor.save_to_database(path=path,
                                       table_name=table_name)
            # The returned frame comes from the database table, not directly
            # from the CSV file.
            loader = LoadData(spark)
            return loader.load_from_database(table_name=table_name)
        except Exception as e:
            logging.error("Error while extracting data: %s", e)
            # Bare `raise` keeps the original traceback intact.
            raise

    def transform_data(self, df: DataFrame) -> Tuple[
        Annotated[DataFrame, "cleaned_df"],
        Annotated[DataFrame, "train"],
        Annotated[DataFrame, "val"],
        Annotated[DataFrame, "test"]]:
        """
        Cleans the extracted data and splits it into train/val/test sets.

        Steps, applied in this order:
            1. ResultsDataType.define_dtype
            2. TransformData.check_empty_fields
            3. TransformData.convert_date_into_years
            4. TransformData.filter_data
            5. TransformData.string_into_numeric, driven by the categorical /
               numeric / target column lists from
               TransformData.describe_features_types
            6. PreProcess.divide_data (produces the three splits)
            7. PreProcess.standardize_data (applied to the three splits)

        Args:
            df: Extracted DataFrame to transform
        Returns:
            Tuple[
            Annotated[DataFrame, "cleaned_df"],
            Annotated[DataFrame, "train"],
            Annotated[DataFrame, "val"],
            Annotated[DataFrame, "test"]]:
                - cleaned_df: Output of step 5 (cleaned, not split or
                  standardized)
                - train: Training split as returned by standardize_data
                - val: Validation split as returned by standardize_data
                - test: Test split as returned by standardize_data
        Raises:
            Exception: Any error raised by the helper classes is logged and
                re-raised unchanged.
        """
        try:
            typed_df = ResultsDataType().define_dtype(df)

            transformer = TransformData()
            no_empty_df = transformer.check_empty_fields(typed_df)
            years_df = transformer.convert_date_into_years(no_empty_df)
            filtered_df = transformer.filter_data(years_df)
            cat_feat, num_feat, targ = transformer.describe_features_types()
            cleaned_df = transformer.string_into_numeric(df=filtered_df,
                                                         categorical_features=cat_feat,
                                                         numeric_features=num_feat,
                                                         targets=targ)

            preprocessor = PreProcess()
            train_df, val_df, test_df = preprocessor.divide_data(cleaned_df)
            # The target columns are passed so standardize_data knows which
            # columns are targets (handling is up to PreProcess).
            train, val, test = preprocessor.standardize_data(train_df=train_df,
                                                             val_df=val_df,
                                                             test_df=test_df,
                                                             targets=targ)

            return cleaned_df, train, val, test
        except Exception as e:
            logging.error("Error in data transformation: %s", e)
            raise

    def load_data(self,
                  spark: SparkSession,
                  df: DataFrame,
                  table_name: str = "cleaned_data"
                  ) -> None:
        """
        Saves a DataFrame into a database table.

        Args:
            spark: Active SparkSession
            df: Data to save
            table_name: Name of the table to store the data in
        Returns:
            None
        Raises:
            Exception: Any error raised by DataOperations.save_data is logged
                and re-raised unchanged.
        """
        try:
            DataOperations(spark).save_data(df=df,
                                            table_name=table_name)
        except Exception as e:
            logging.error("Error in data loading: %s", e)
            raise


In [None]:
def etl_pipeline(spark: SparkSession,
                 data_path: str = "/home/jovyan/work/dataset/results.csv",
                 table_name: str = "results",
                 cleaned_table_name: str = "cleaned_data",
                 train_table_name: str = "train",
                 val_table_name: str = "val",
                 test_table_name: str = "test") -> None:
    """
    Executes the full ETL process.

    This pipeline performs the following steps:
    1. **Data Extraction**: Saves the raw CSV at ``data_path`` into the database
       table ``table_name`` and reads it back as a DataFrame.
    2. **Data Transformation**: Casts data types, cleans and encodes the data,
       then splits it into train/val/test sets prepared for model training.
    3. **Data Loading**: Saves the cleaned data and each of the three splits
       into their own database tables.

    Args:
        spark: Active SparkSession
        data_path: Path to the CSV file containing the raw data to extract
        table_name: Name of the table in the PostgreSQL database where the raw
            data is stored
        cleaned_table_name: Table for the cleaned (unsplit) data
        train_table_name: Table for the training split
        val_table_name: Table for the validation split
        test_table_name: Table for the test split
    Raises:
        Exception: Any error from a pipeline step is logged and re-raised
            unchanged.
    """
    try:
        logging.info("Started ETL process")
        etl = ETL()
        raw_df = etl.extract_data(spark, data_path, table_name)
        cleaned_df, train, val, test = etl.transform_data(raw_df)

        etl.load_data(spark, cleaned_df, cleaned_table_name)
        for split_df, split_table in ((train, train_table_name),
                                      (val, val_table_name),
                                      (test, test_table_name)):
            etl.load_data(spark, split_df, split_table)

        logging.info("Successfully finished ETL process \n")
    except Exception as e:
        logging.error("Error in ETL pipeline: %s", e)
        # Bare `raise` keeps the original traceback intact.
        raise