In [1]:
import os

In [2]:
%pwd

'c:\\Users\\vloke\\Documents\\My_Project\\E-learning_recommender_system\\research'

In [3]:
# Step up to the project root only while still inside `research/`, so that
# re-running this cell does not climb one directory higher each time.
if os.path.basename(os.getcwd()) == "research":
    os.chdir("../")

In [4]:
%pwd

'c:\\Users\\vloke\\Documents\\My_Project\\E-learning_recommender_system'

## 1. Update the entity


In [5]:
from dataclasses import dataclass
from pathlib import Path

@dataclass
class DataIngestionConfig:
    """Settings consumed by the data-ingestion stage.

    Groups the artifact locations, the SQL statement to execute, and the
    database connection parameters into one object.
    """

    # --- artifact locations ---
    root_dir: Path       # stage directory
    extract_data: Path   # extraction directory
    raw_data: str        # path of the CSV file the query result is written to

    # --- query ---
    sql_query: str       # SQL text handed to pandas.read_sql_query

    # --- database connection parameters (passed to psycopg2.connect) ---
    db_host: str
    db_port: str
    db_name: str
    db_user: str
    db_password: str

## 2. Update the configuration manager in src config

In [11]:
from E_learning_recommender_system.constants import CONFIG_FILE_PATH
from E_learning_recommender_system.utils.common import read_yaml,create_directories

In [12]:
class ConfigurationManger:
    """Loads the project configuration and builds per-stage config objects."""

    def __init__(self, config_filepath=CONFIG_FILE_PATH):
        """Load the configuration via ``read_yaml`` and set up the artifacts root.

        Args:
            config_filepath: Location of the configuration file; defaults
                to ``CONFIG_FILE_PATH``.
        """
        self.config = read_yaml(config_filepath)
        create_directories([self.config.artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Build the data-ingestion settings from the ``data_ingestion`` section.

        ``root_dir`` and ``extract_data`` are each handed to
        ``create_directories`` before the config object is assembled.

        Returns:
            A ``DataIngestionConfig`` populated from the loaded configuration.
        """
        section = self.config.data_ingestion

        # One call per directory, root_dir first, then extract_data.
        for attribute in ("root_dir", "extract_data"):
            create_directories([getattr(section, attribute)])

        return DataIngestionConfig(
            root_dir=section.root_dir,
            extract_data=section.extract_data,
            raw_data=section.raw_data,
            sql_query=section.sql_query,
            db_host=section.db_host,
            db_port=section.db_port,
            db_name=section.db_name,
            db_user=section.db_user,
            db_password=section.db_password,
        )

## 3. Update the components

In [13]:
import os
import pandas as pd
import psycopg2
from E_learning_recommender_system.logging import logging

In [14]:
class DataIngestion:
    """Extracts the result of the configured SQL query into a CSV file."""

    def __init__(self, config: "DataIngestionConfig"):
        """Store the ingestion settings.

        Args:
            config: Object exposing ``raw_data``, ``sql_query`` and the
                ``db_*`` connection attributes read by this class.
        """
        self.config = config

    def extracted_data_from_sql(self):
        """Run ``config.sql_query`` against PostgreSQL and save the result as CSV.

        The extraction is skipped when something already exists at
        ``config.raw_data``, so a re-run does not query the database again.

        Returns:
            None. The CSV written to ``config.raw_data`` is the only output.
        """
        output_path = self.config.raw_data

        if os.path.exists(output_path):
            # Report the path that was actually checked (raw_data), not the
            # extract_data directory.
            logging.info(f"CSV file already exists at {output_path}")
            return

        # Establish a connection to the PostgreSQL database
        connection = psycopg2.connect(
            host=self.config.db_host,
            port=self.config.db_port,
            database=self.config.db_name,
            user=self.config.db_user,
            password=self.config.db_password,
        )

        try:
            # Execute the SQL query and fetch the rows into a DataFrame
            data = pd.read_sql_query(self.config.sql_query, connection)

            # Persist the result without the DataFrame index column
            data.to_csv(output_path, index=False)

            logging.info(f"Data extracted and saved to {output_path}")
        finally:
            # Close the database connection, regardless of success or failure
            connection.close()

## 4. Update the pipeline

In [15]:
# Run the data-ingestion stage end to end: load the configuration, build the
# stage settings, then extract the query result to CSV.
# No try/except here: nothing is handled at this level, so any error surfaces
# with its original traceback.
config = ConfigurationManger()
data_ingestion_config = config.get_data_ingestion_config()
data_ingestion = DataIngestion(config=data_ingestion_config)
data_ingestion.extracted_data_from_sql()