In [1]:
import os

In [2]:
# Move one directory up so the relative paths used below (config, artifacts) resolve
# from the parent folder - presumably the project root; confirm against the repo layout.
os.chdir("../")
# IPython magic: display the current working directory to confirm the change
%pwd

'/Users/macbookpro/Documents/predict_publications/publications_prediction'

# 1. Setup Config.yaml

In [None]:
# Configuration related to data transformation
data_transformation:
  # Directory where data transformation results and artifacts are stored
  root_dir: artifacts/data_transformation
  
  # Path to the ingested data file that will be transformed
  data_source_file: artifacts/data_ingestion/train_data.csv

  # Path to data validation status
  data_validation: artifacts/initial_data_validation/status.txt

# 2. Update Schema (Not required at this stage)

# 3. Setup Params (Not required at this stage)

# 4. Setup Entity

In [13]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataTransformationConfig:
    """
    Configuration for the data transformation process.
    
    This configuration class captures the necessary paths and directories 
    required for the transformation of data post-ingestion and pre-model training.
    Instances are frozen, so fields cannot be reassigned after construction.
    
    Attributes:
    - root_dir: Directory where data transformation results and artifacts are stored.
    - data_source_file: Path to the file where the ingested data is stored that needs to be transformed.
    - data_validation: Path to the data validation status file that is checked before the transformation runs.
    """
    
    root_dir: Path  # Directory for storing transformation results and related artifacts
    data_source_file: Path  # Path to the ingested data file for transformation
    data_validation: Path  # Path to the data validation status file (not the validated data itself)


# 5. Setup Configuration Manager

In [14]:
from predicting_publications.constants import *
from predicting_publications.utils.common import read_yaml, create_directories
from predicting_publications import logger
from predicting_publications.entity.config_entity import (DataIngestionConfig, 
                                                          DataValidationConfig)

class ConfigurationManager:
    """
    ConfigurationManager manages configurations needed for the data pipeline.

    The class reads configuration, parameter, and schema settings from specified files
    and provides one getter per pipeline stage. It also takes care of creating the
    directories each stage needs.

    Attributes:
    - config: Configuration settings.
    - params: Parameters for the pipeline.
    - schema: Initial schema information.
    - feature_schema: Feature-engineered schema information.
    - feature_schema_filepath: Backward-compatible alias of `feature_schema`. Despite
      its name it holds the loaded file content, not a path.
    """

    def __init__(self,
                 config_filepath = CONFIG_FILE_PATH,
                 params_filepath = PARAMS_FILE_PATH,
                 schema_filepath = SCHEMA_FILE_PATH,
                 feature_schema_filepath = FEATURE_SCHEMA_FILE_PATH) -> None:
        """
        Initialize ConfigurationManager with configurations, parameters, and schemas.

        Args:
        - config_filepath: Path to the configuration file.
        - params_filepath: Path to the parameters file.
        - schema_filepath: Path to the initial schema file.
        - feature_schema_filepath: Path to the feature-engineered schema file.

        Creates:
        - The artifacts root directory specified in the configuration.
        """
        self.config = self._read_config_file(config_filepath, "config")
        self.params = self._read_config_file(params_filepath, "params")
        self.schema = self._read_config_file(schema_filepath, "initial_schema")
        self.feature_schema = self._read_config_file(feature_schema_filepath, "feature_engineered_schema")
        # Kept for existing callers: the original attribute name suggests a path,
        # but it has always held the loaded schema content.
        self.feature_schema_filepath = self.feature_schema

        # Create the directory for storing artifacts if it doesn't exist
        create_directories([self.config.artifacts_root])

    def _read_config_file(self, filepath: str, config_name: str) -> dict:
        """
        Read a configuration file and return its content.

        Args:
        - filepath: Path to the configuration file.
        - config_name (str): Name of the configuration (for logging purposes).

        Returns:
        - The settings exactly as returned by `read_yaml`.

        Raises:
        - Exception: Any error raised by `read_yaml` is logged and re-raised unchanged.
        """
        try:
            return read_yaml(filepath)
        except Exception as e:
            logger.error(f"Error reading {config_name} file: {filepath}. Error: {e}")
            raise

    def _get_section(self, section_name: str):
        """
        Return a top-level section of the main configuration.

        Args:
        - section_name (str): Name of the section, e.g. "data_ingestion".

        Raises:
        - AttributeError: If the section does not exist in the config file.
        """
        try:
            return getattr(self.config, section_name)
        except AttributeError:
            logger.error(f"The '{section_name}' attribute does not exist in the config file.")
            raise

    def _get_setting(self, section, section_name: str, key: str):
        """
        Return a single setting from an already-resolved configuration section.

        Looking keys up one at a time lets the log name the key that is actually
        missing instead of blaming the whole section.

        Args:
        - section: The section object returned by `_get_section`.
        - section_name (str): Name of the section (for logging purposes).
        - key (str): Name of the setting to read.

        Raises:
        - AttributeError: If the setting does not exist in the section.
        """
        try:
            return getattr(section, key)
        except AttributeError:
            logger.error(
                f"The '{key}' attribute does not exist in the '{section_name}' section of the config file."
            )
            raise

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """
        Extract and return data ingestion configurations as a DataIngestionConfig object.

        All settings are read before the root directory is created, so an
        incomplete configuration fails without touching the file system.

        Returns:
        - DataIngestionConfig: Object containing data ingestion configuration settings.

        Raises:
        - AttributeError: If the 'data_ingestion' section or one of its settings is missing.
        """
        section_name = "data_ingestion"
        config = self._get_section(section_name)
        root_dir = self._get_setting(config, section_name, "root_dir")
        local_data_file = self._get_setting(config, section_name, "local_data_file")

        # Create the root directory for data ingestion if it doesn't already exist
        create_directories([root_dir])

        return DataIngestionConfig(
            root_dir=Path(root_dir),
            local_data_file=Path(local_data_file),
        )

    def get_data_validation_config(self) -> DataValidationConfig:
        """
        Extract and return data validation configurations as a DataValidationConfig object.

        This method fetches the data validation settings together with the column
        schema and returns them as a DataValidationConfig object.

        Returns:
        - DataValidationConfig: Object containing data validation configuration settings.

        Raises:
        - AttributeError: If the 'data_validation' section, one of its settings, or the
          'columns' entry of the schema is missing.
        """
        section_name = "data_validation"
        config = self._get_section(section_name)

        # The column schema lives in the schema file, not in the main config
        try:
            schema = self.schema.columns
        except AttributeError:
            logger.error("The 'columns' attribute does not exist in the schema file.")
            raise

        root_dir = self._get_setting(config, section_name, "root_dir")
        data_source_file = self._get_setting(config, section_name, "data_source_file")
        status_file = self._get_setting(config, section_name, "status_file")

        # Ensure the parent directory for the status file exists
        create_directories([os.path.dirname(status_file)])

        return DataValidationConfig(
            root_dir=Path(root_dir),
            data_source_file=Path(data_source_file),
            status_file=Path(status_file),
            initial_schema=schema
        )

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """
        Extract and return data transformation configurations as a DataTransformationConfig object.

        This method fetches settings related to data transformation, like directories and file paths,
        and returns them as a DataTransformationConfig object.

        Returns:
        - DataTransformationConfig: Object containing data transformation configuration settings.

        Raises:
        - AttributeError: If the 'data_transformation' section or one of its settings is missing.
        """
        section_name = "data_transformation"
        config = self._get_section(section_name)
        root_dir = self._get_setting(config, section_name, "root_dir")
        data_source_file = self._get_setting(config, section_name, "data_source_file")
        data_validation = self._get_setting(config, section_name, "data_validation")

        # Ensure the root directory for data transformation exists
        create_directories([root_dir])

        return DataTransformationConfig(
            root_dir=Path(root_dir),
            data_source_file=Path(data_source_file),
            data_validation=Path(data_validation),
        )



# 6. Create Component

In [15]:
from predicting_publications import logger
import pandas as pd
from sklearn.model_selection import train_test_split
from pathlib import Path

class DataTransformation:
    """
    Handles the transformation of the ingested dataset, generating temporal features, 
    aggregating the data, and splitting it into training and validation sets.
    """

    def __init__(self, config: "DataTransformationConfig"):
        """
        Initializes the DataTransformation component by reading the data source file 
        specified in the config.

        Args:
        - config (DataTransformationConfig): Configuration settings for data transformation.

        Attributes:
        - df (pd.DataFrame): The data to be transformed.

        Raises:
        - FileNotFoundError: If the data source file does not exist (logged, then re-raised).
        """
        self.config = config
        try:
            self.df = pd.read_csv(self.config.data_source_file)
        except FileNotFoundError:
            logger.error(f"File not found: {self.config.data_source_file}")
            raise

    def generate_temporal_features_and_aggregate(self):
        """
        Generate temporal features and aggregate the dataset.

        Adds 'hour', 'day', 'dayofweek' and 'month' columns to `self.df`, then stores
        in `self.grouped_data` one row per (timestamp, lon, lat) group holding the
        mean of every metric column plus 'publication_count', the number of source
        rows in that group.
        """
        # Convert the 'timestamp' column to a datetime format if it's not already.
        # Non-datetime values are interpreted as seconds since the Unix epoch.
        if not pd.api.types.is_datetime64_any_dtype(self.df['timestamp']):
            self.df['timestamp'] = pd.to_datetime(self.df['timestamp'], unit='s')

        # Generating temporal features
        timestamp_parts = self.df['timestamp'].dt
        self.df['hour'] = timestamp_parts.hour
        self.df['day'] = timestamp_parts.day
        self.df['dayofweek'] = timestamp_parts.dayofweek
        self.df['month'] = timestamp_parts.month

        # Every metric column is averaged within its group
        metric_columns = [
            'likescount',
            'commentscount',
            'symbols_cnt',
            'words_cnt',
            'hashtags_cnt',
            'mentions_cnt',
            'links_cnt',
            'emoji_cnt',
        ]
        agg_columns = {column: 'mean' for column in metric_columns}

        # The temporal features are derived from 'timestamp', so including them in
        # the keys keeps them as columns without changing which rows form a group.
        group_keys = ['timestamp', 'lon', 'lat', 'hour', 'day', 'dayofweek', 'month']

        logger.info("Grouping data by timestamp, lon, lat, hour, day, day of week, and month")
        grouped = self.df.groupby(group_keys)
        aggregated = grouped.agg(agg_columns)

        logger.info("Setting publication count grouped by timestamp, lon, and lat")
        # Count rows with the same grouper and assign by index, so each count is
        # guaranteed to land on its own group rather than relying on row order.
        aggregated['publication_count'] = grouped.size()

        self.grouped_data = aggregated.reset_index()

    def split_data_into_train_and_test(self):
        """
        Split the aggregated data into training and test sets.

        Stores the result in `self.X_train`, `self.X_val`, `self.y_train` and
        `self.y_val` using an 80/20 split with a fixed random seed.
        """
        # Drop 'timestamp' as it's strongly correlated with other time features and may cause data leakage
        X = self.grouped_data.drop(['publication_count', 'timestamp'], axis=1)
        y = self.grouped_data['publication_count']

        # Split the data into training and validation sets and set them as class attributes
        logger.info("Splitting data into train and test values")
        self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(X, y, test_size=0.2, random_state=42)
        logger.info(f"Training data shape: {self.X_train.shape}, Validation data shape: {self.X_val.shape}")
        print(f"Training data shape: {self.X_train.shape}, Validation data shape: {self.X_val.shape}")

    def _save_datasets(self, train_filename: str, test_filename: str):
        """
        Save the train and test datasets to the output path specified in the configuration.

        Args:
        - train_filename (str): Name of the file to save the training data.
        - test_filename (str): Name of the file to save the test data.

        Raises:
        - Exception: Any error raised while writing is logged and re-raised unchanged.
        """
        output_dir = Path(self.config.root_dir)
        train_output_path = output_dir / train_filename
        test_output_path = output_dir / test_filename

        try:
            # Make sure the target directory exists even if nothing created it earlier
            output_dir.mkdir(parents=True, exist_ok=True)

            # Save training data (features and target side by side)
            train_data = pd.concat([self.X_train, self.y_train], axis=1)
            train_data.to_csv(train_output_path, index=False)
            logger.info(f"Training Data saved successfully to {train_output_path}")

            # Save test data (features and target side by side)
            test_data = pd.concat([self.X_val, self.y_val], axis=1)
            test_data.to_csv(test_output_path, index=False)
            logger.info(f"Test Data saved successfully to {test_output_path}")

        except Exception as e:
            logger.error(f"Error while saving the datasets: {e}")
            raise

    def orchestrate_transformation(self, train_filename: str = "train_data.csv", test_filename: str = "test_data.csv"):
        """
        Orchestrates the data transformation process by:
        1. Generating temporal features and aggregating the data.
        2. Splitting data into training and test sets.
        3. Saving the training and test datasets.

        Args:
        - train_filename (str): Name of the file to save the training data. Default is "train_data.csv".
        - test_filename (str): Name of the file to save the test data. Default is "test_data.csv".
        """
        self.generate_temporal_features_and_aggregate()
        self.split_data_into_train_and_test()
        self._save_datasets(train_filename, test_filename)

# 7. Pipeline

In [17]:
from predicting_publications import logger
from pathlib import Path

class DataTransformationPipeline:
    """
    This pipeline handles the data transformation steps.

    After the data validation stage, it's essential to transform the data before moving on 
    to model training. This class orchestrates the transformation by generating temporal 
    features, aggregating data, and splitting it into training and test sets.

    Attributes:
        STAGE_NAME (str): The name of this pipeline stage.
        VALIDATION_SUCCESS_MARKER (str): Text that must appear in the data validation
            status file for the transformation to run.
    """

    STAGE_NAME = "Data Transformation Pipeline"
    VALIDATION_SUCCESS_MARKER = "Overall Validation Status: All validations passed."

    def __init__(self):
        """
        Initializes the pipeline with a configuration manager.
        """
        self.config_manager = ConfigurationManager()

    def run_data_transformation(self):
        """
        Run the data transformation steps.

        This method fetches the transformation configuration, builds the
        DataTransformation component and runs it.

        Raises:
        - Exception: Any failure is logged and re-raised, so the caller never
          reports the stage as completed after a failed transformation.
        """
        try:
            logger.info("Fetching data transformation configuration...")
            data_transformation_config = self.config_manager.get_data_transformation_config()

            logger.info("Initializing data transformation process...")
            data_transformation = DataTransformation(config=data_transformation_config)

            logger.info("Executing data transformation...")
            data_transformation.orchestrate_transformation()

            logger.info("Data Transformation Pipeline completed successfully.")

        except Exception as e:
            logger.error(f"Error encountered during the data transformation: {e}")
            # Propagate the failure instead of swallowing it
            raise

    def run_pipeline(self):
        """
        Run the entire Data Transformation Pipeline.

        The transformation only runs when the data validation status file contains
        VALIDATION_SUCCESS_MARKER; otherwise the pipeline logs an error and returns
        without transforming anything.

        Raises:
        - Exception: Any error while reading the status file or running the
          transformation is logged and re-raised.
        """
        try:
            status_file = self.config_manager.get_data_transformation_config().data_validation
            with open(status_file, "r") as f:
                content = f.read()

            # Guard clause: never transform data that did not pass validation
            if self.VALIDATION_SUCCESS_MARKER not in content:
                logger.error("Data Transformation Pipeline aborted due to validation errors.")
                return

            logger.info("Starting the Data Transformation Pipeline.")
            logger.info(f">>>>>> Stage: {self.STAGE_NAME} started <<<<<<")
            self.run_data_transformation()
            logger.info(f">>>>>> Stage {self.STAGE_NAME} completed <<<<<< \n\nx==========x")
        except Exception as e:
            logger.error(f"Error encountered during the {self.STAGE_NAME}: {e}")
            raise

# Entry point: build the pipeline and run it when this code is executed directly
if __name__ == '__main__':
    pipeline = DataTransformationPipeline()
    pipeline.run_pipeline()


[2023-10-16 17:09:02,459: 42: predict_publications_logger: INFO: common:  yaml file: config/config.yaml loaded successfully]
[2023-10-16 17:09:02,461: 42: predict_publications_logger: INFO: common:  yaml file: params.yaml loaded successfully]
[2023-10-16 17:09:02,464: 42: predict_publications_logger: INFO: common:  yaml file: schema.yaml loaded successfully]
[2023-10-16 17:09:02,465: 42: predict_publications_logger: INFO: common:  yaml file: feature_engineered_schema.yaml loaded successfully]
[2023-10-16 17:09:02,467: 65: predict_publications_logger: INFO: common:  Created directory at: artifacts]
[2023-10-16 17:09:02,468: 65: predict_publications_logger: INFO: common:  Created directory at: artifacts/data_transformation]
[2023-10-16 17:09:02,468: 57: predict_publications_logger: INFO: 3209470565:  Starting the Data Transformation Pipeline.]
[2023-10-16 17:09:02,469: 58: predict_publications_logger: INFO: 3209470565:  >>>>>> Stage: Data Transformation Pipeline started <<<<<<]
[2023-10-