In [21]:
import logging
import time


In [27]:
class PipelineLogging:
    def __init__(self, pipeline_name: str, log_folder_path: str):
        self.pipeline_name = pipeline_name
        self.log_folder_path = log_folder_path
        logger = logging.getLogger(pipeline_name)
        logger.setLevel(logging.INFO)
        self.file_path = (
            f"{self.log_folder_path}/{self.pipeline_name}_{time.time()}.log"
        )
        file_handler = logging.FileHandler(self.file_path)
        file_handler.setLevel(logging.INFO)
        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(logging.INFO)
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        file_handler.setFormatter(formatter)
        stream_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.addHandler(stream_handler)
        self.logger = logger

    def get_logs(self) -> str:
        with open(self.file_path, "r") as file:
            return "".join(file.readlines())

In [35]:
pipeline_logging = PipelineLogging(pipeline_name="Chicago ETL", log_folder_path="etl_project/logs")

In [36]:
pipeline_logging.logger.info("Perform extract and load")

2024-01-22 11:54:07,765 - Chicago ETL - INFO - Perform extract and load
2024-01-22 11:54:07,765 - Chicago ETL - INFO - Perform extract and load


In [30]:
pipeline_logging.logger.info("Connecting to pgAdmin")

2024-01-22 11:36:10,573 - Chicago ETL - INFO - Connecting to pgAdmin


In [32]:
a = 65

In [33]:
pipeline_logging.logger.info(f"Connecting to pgAdmin - {a}")

2024-01-22 11:44:07,639 - Chicago Crime ETL - INFO - Connecting to pgAdmin - 65
2024-01-22 11:44:07,639 - Chicago Crime ETL - INFO - Connecting to pgAdmin - 65


In [37]:

pipeline_logging.logger.handlers.clear()

In [None]:
class MetaDataLogging:
    def __init__(self,pipeline_name: str, postgresql_client: PostgreSqlClient,
        config: dict = {},
        log_table_name: str = "pipeline_logs",
    ):
        self.pipeline_name = pipeline_name
        self.log_table_name = log_table_name
        self.postgresql_client = postgresql_client
        self.config = config
        self.metadata = MetaData()
        self.table = Table(
            self.log_table_name,
            self.metadata,
            Column("pipeline_name", String, primary_key=True),
            Column("run_id", Integer, primary_key=True),
            Column("timestamp", String, primary_key=True),
            Column("status", String, primary_key=True),
            Column("config", JSON),
            Column("logs", String),
        )
        self.run_id: int = self._get_run_id()

    def _get_run_id(self):
        """Gets the next run id. Sets run id to 1 if no run id exists."""
        self.postgresql_client.create_all_tables(metadata=self.metadata) # Create log table if it does not exist
        self._create_log_table()
        run_id = self.postgresql_client.engine.execute(
            select(func.max(self.table.c.run_id)).where(
                self.table.c.pipeline_name == self.pipeline_name
            )
        ).first()[0]
        if run_id is None:
            return 1
        else:
            return run_id + 1

    def log(self, status:str, timestamp:datetime, logs:str) -> None:
        """Writes pipeline metadata log to a database"""
        if timestamp is None:
            timestamp = datetime.now()
        insert_statement = insert(self.table).values(
            pipeline_name=self.pipeline_name,
            timestamp=timestamp,
            run_id=self.run_id,
            status=status,
            config=self.config,
            logs=logs,
        )
        self.postgresql_client.engine.execute(insert_statement)
