In [1]:
import os

In [2]:
os.chdir("../")

In [3]:
%pwd

'/Users/macbookpro/Documents/Documents - Macbook’s MacBook Pro/career/career_chief_rep'

# Configure config.yaml which holds information about where our data will be stored

In [None]:
# Configuration related to data transformation
data_transformation:
  # Directory where data transformation results and artifacts are stored
  root_dir: artifacts/data_transformation
  
  # Path to the ingested data file that will be used for validation
  data_source_file: artifacts/data_ingestion/gsearch_jobs.csv

  # Path to data validation status
  data_validation: artifacts/data_validation/status.txt

# Setup Entity

In [4]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataTransformationConfig:
    """
    Configuration for the data transformation process.
    
    This configuration class captures the necessary paths and directories 
    required for the transformation of data post-ingestion and pre-model training.
    
    Attributes:
    - root_dir: Directory where data transformation results and artifacts are stored.
    - data_source_file: Path to the file where the ingested data is stored that needs to be transformed.
    """
    
    root_dir: Path  # Directory for storing transformation results and related artifacts
    data_source_file: Path  # Path to the ingested data file for transformation
    data_validation: Path # Path to the validated output file

# Configure the Configuration Manager

In [5]:
from src.career_chief.constants import *
from src.career_chief.utils.common import read_yaml, create_directories
from src.career_chief import logger
from src.career_chief.entity.config_entity import (DataIngestionConfig, DataValidationConfig)

class ConfigurationManager:
    """
    ConfigurationManager manages configurations needed for the data pipeline.

    The class reads configuration, parameter, and schema settings from specified files
    and provides a set of methods to access these settings. It also takes care of
    creating necessary directories defined in the configurations.

    Attributes:
    - config (dict): Configuration settings.
    - params (dict): Parameters for the pipeline.
    - schema (dict): Schema information.
    """
    
    def __init__(self, 
                 config_filepath = CONFIG_FILE_PATH, 
                 params_filepath = PARAMS_FILE_PATH, 
                 schema_filepath = SCHEMA_FILE_PATH) -> None:
        """
        Initialize ConfigurationManager with configurations, parameters, and schema.

        Args:
        - config_filepath (Path): Path to the configuration file.
        - params_filepath (Path): Path to the parameters file.
        - schema_filepath (Path): Path to the schema file.

        Creates:
        - Directories specified in the configuration.
        """
        self.config = self._read_config_file(config_filepath, "config")
        self.params = self._read_config_file(params_filepath, "params")
        self.schema = self._read_config_file(schema_filepath, "schema")

        # Create the directory for storing artifacts if it doesn't exist
        create_directories([self.config.artifacts_root])

    def _read_config_file(self, filepath: str, config_name: str) -> dict:
        """
        Read a configuration file and return its content.

        Args:
        - filepath (str): Path to the configuration file.
        - config_name (str): Name of the configuration (for logging purposes).

        Returns:
        - dict: Configuration settings.

        Raises:
        - Exception: If there's an error reading the file.
        """
        try:
            return read_yaml(filepath)
        except Exception as e:
            logger.error(f"Error reading {config_name} file: {filepath}. Error: {e}")
            raise

    
    def get_data_transformation_config(self) -> DataTransformationConfig:
        """
        Extract and return data transformation configurations as a DataTransformationConfig object.

        This method fetches settings related to data transformation, like directories and file paths,
        and returns them as a DataTransformationConfig object.

        Returns:
        - DataTransformationConfig: Object containing data transformation configuration settings.

        Raises:
        - AttributeError: If the 'data_transformation' attribute does not exist in the config file.
        """
        try:
            config = self.config.data_transformation
            
            # Ensure the root directory for data transformation exists
            create_directories([config.root_dir])

            # Construct and return the DataTransformationConfig object
            return DataTransformationConfig(
                root_dir=Path(config.root_dir),
                data_source_file=Path(config.data_source_file),
                data_validation=Path(config.data_validation),
            )

        except AttributeError as e:
            # Log the error and re-raise the exception for handling by the caller
            logger.error("The 'data_transformation' attribute does not exist in the config file.")
            raise e

# Build Component

In [6]:
'''
Checklist

1. Check data validation status
2. Read data
3. Remove noise specific to technical resumes such as code snippets and special characters, and emojis
4. Normalize by standardizing technical terminologies and acronyms
5. Remove stop words and irrelevant words such as "@" in twitter mentions, or urls, pronouns, prepositions and conjunctions, etc
6. Lemmatization, here we want the same token for different word forms, e.g. wolves and wolf, or talks and talk
7. Stemming, here we remove and replace suffixes to get the root form of the word
8. Tokenize the cleaned and normalized text to prepare it for further NLP and ML processing. tokenizer = AutoTokenizer.from_pretrained("dslim/bert-large-NER ")
'''

import pandas as pd
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer, PorterStemmer
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from sklearn.model_selection import train_test_split
from datasets import Dataset
from tqdm.auto import tqdm  # Optional, for progress bar support

from src.career_chief import logger
from src.career_chief.entity.config_entity import DataTransformationConfig

  from .autonotebook import tqdm as notebook_tqdm


[2024-03-06 06:13:37,921: 58: datasets: INFO: config:  PyTorch version 2.1.2 available.]


In [19]:
class DataTransformation:
    """
    Processes technical resume data for NLP tasks, performing cleaning, normalization,
    and preparation steps such as stop word removal, lemmatization, stemming, and tokenization.
    """
    
    def __init__(self, config: DataTransformationConfig):
        self.config = config
        self.df = self._load_data()
        self._download_nltk_resources()
        self.stop_words = self._initialize_stop_words()
        self._handle_missing_values()
        self.nlp_pipeline = self._initialize_nlp_pipeline()

    def _download_nltk_resources(self):
        """Download necessary NLTK resources."""
        nltk.download('stopwords')
        nltk.download('wordnet')
        nltk.download('omw-1.4')
        logger.info("NLTK resources downloaded successfully.")

    def _initialize_stop_words(self):
        """Initialize the list of stop words from NLTK."""
        logger.info("Stop words initialized.")
        return set(stopwords.words('english'))

    def _load_data(self):
        """Read data from the configured source."""
        try:
            return pd.read_csv(self.config.data_source_file)
        except Exception as e:
            logger.error(f"Failed to load data: {e}")
            raise

    def _handle_missing_values(self):
        """Remove records with missing values in critical columns only."""
        critical_columns = ['description']  # List critical columns here
        initial_shape = self.df.shape
        self.df.dropna(subset=critical_columns, inplace=True)
        final_shape = self.df.shape
        logger.info(f"Missing values handled in critical columns. Rows before: {initial_shape[0]}, after: {final_shape[0]}.")
        print(self.df)


    def remove_noise(self, df):
        """Remove noise such as code snippets, special characters, and emojis."""
        # Example regex to remove special characters and emojis
        df['cleaned_text'] = df['description'].apply(lambda x: re.sub(r'[^a-zA-Z\s]', '', str(x)))
        logger.info("Noise removed from the text.")
        return df

    def normalize_technical_terms(self, df):
        """Normalize technical terminologies and acronyms."""
        normalization_dict = {
            'AI': 'Artificial Intelligence',
            'ML': 'Machine Learning',
            # Add more terms and their normalized forms here
        }
        def normalize_text(text):
            for term, normal_form in normalization_dict.items():
                text = re.sub(r'\b{}\b'.format(term), normal_form, text, flags=re.IGNORECASE)
            return text
        df['cleaned_text'] = df['cleaned_text'].apply(normalize_text)
        logger.info("Technical terminologies and acronyms normalized.")
        return df

    def remove_stop_words(self, df):
        """Remove stop words and irrelevant words."""
        df['cleaned_text'] = df['cleaned_text'].apply(lambda x: ' '.join([word for word in str(x).split() if word.lower() not in self.stop_words]))
        logger.info("Stop words removed.")
        return df
    
    def lemmatize_text(self, df):
        """Lemmatize the text to get base forms of words."""
        lemmatizer = WordNetLemmatizer()
        df['cleaned_text'] = df['cleaned_text'].apply(lambda x: ' '.join([lemmatizer.lemmatize(word) for word in str(x).split()]))
        logger.info("Text lemmatized.")
        return df


    def stem_text(self, df):
        """Stem the text to get root forms of words."""
        stemmer = PorterStemmer()
        df['cleaned_text'] = df['cleaned_text'].apply(lambda x: ' '.join([stemmer.stem(word) for word in str(x).split()]))
        logger.info("Text stemmed.")
        return df
    
    def tokenize_text(self, df):
        """Tokenize the cleaned and normalized text."""
        # The tokenizer expects a list of texts, hence using `.tolist()`
        tokenizer = AutoTokenizer.from_pretrained("dslim/bert-large-NER")
        df['tokens'] = df['cleaned_text'].apply(lambda x: tokenizer.tokenize(str(x)))
        logger.info("Text tokenized.")
        return df

    def _initialize_nlp_pipeline(self):
        """Initialize the NLP pipeline for Named Entity Recognition (NER)."""
        tokenizer = AutoTokenizer.from_pretrained("dslim/bert-large-NER")
        model = AutoModelForTokenClassification.from_pretrained("dslim/bert-large-NER")
        logger.info("NER pipeline initialized.")
        return pipeline("ner", model=model, tokenizer=tokenizer)


    def apply_ner(self, df):
        """
        Applies Named Entity Recognition (NER) to the 'cleaned_text' column in batches,
        with improved error handling and efficiency.
        """
        batch_size = 100  # Adjust batch size based on memory and performance
        total_batches = (len(df) + batch_size - 1) // batch_size  # Calculate total number of batches

        # Initialize an empty list to store NER results
        ner_results = []

        # Process DataFrame in batches
        for batch_num in tqdm(range(total_batches), desc="Applying NER"):
            start_idx = batch_num * batch_size
            end_idx = min((batch_num + 1) * batch_size, len(df))
            batch_texts = df.iloc[start_idx:end_idx]['cleaned_text'].tolist()

            try:
                # Apply NER pipeline to the entire list of texts in the batch
                batch_ner_results = [self.nlp_pipeline(text) for text in batch_texts]
                ner_results.extend(batch_ner_results)
            except Exception as e:
                logger.error(f"NER application failed for batch {batch_num} (indices {start_idx}-{end_idx}): {e}")
                # Append None or a specific error indicator for each item in the failed batch
                ner_results.extend([None] * (end_idx - start_idx))

        # Assign the NER results back to the DataFrame
        df['ner_results'] = ner_results
        logger.info("Named Entity Recognition applied in batches.")
        return df

    
    def transform_data(self, df):
        """
        Applies all preprocessing steps to a given DataFrame.

        Args:
            df (pd.DataFrame): The DataFrame to be transformed.

        Returns:
            Dataset: A Dataset object containing the transformed data.
        """
        df = self.remove_noise(df)
        df = self.normalize_technical_terms(df)
        df = self.remove_stop_words(df)
        df = self.lemmatize_text(df)
        df = self.stem_text(df)
        df = self.tokenize_text(df)
        df = self.apply_ner(df)
        return Dataset.from_pandas(df)
    
    def split_data(self, test_size=0.2, val_size=0.1, random_state=None):
        """
        Splits the DataFrame into training, testing, and validation sets.

        Args:
            test_size (float): Fraction of the dataset to be used as test set.
            val_size (float): Fraction of the dataset to be used as validation set.
            random_state (int): Seed for random splitting for reproducibility.

        Returns:
            tuple: Three DataFrames for training, testing, and validation sets.
        """
        train_val_df, test_df = train_test_split(self.df, test_size=test_size, random_state=random_state)
        adjusted_val_size = val_size / (1 - test_size)
        train_df, val_df = train_test_split(train_val_df, test_size=adjusted_val_size, random_state=random_state)
        return train_df, test_df, val_df
    

    def transform_train_test_val_data(self):
        """
        Applies all preprocessing steps to a given DataFrame.

        Args:
            df (pd.DataFrame): The DataFrame to be transformed.

        Returns:
            Dataset: A Dataset object containing the transformed data.
        """
        train_df, test_df, val_df = self.split_data()
        train_ds = self.transform_data(train_df)
        test_ds = self.transform_data(test_df)
        val_ds = self.transform_data(val_df)
        return train_ds, test_ds, val_ds


# Build Pipeline

In [20]:
from src.career_chief.entity.config_entity import DataTransformationConfig
from datasets import Dataset

class DataTransformationPipeline:
    STAGE_NAME = "Data Transformation Pipeline"

    def __init__(self):
        """
        Initializes the DataTransformationPipeline with necessary configurations
        obtained from the ConfigurationManager.
        """
        self.config_manager = ConfigurationManager()
        transformation_config = self.config_manager.get_data_transformation_config()
        self.data_transformation = DataTransformation(transformation_config)

    def run_pipeline(self):
        """
        Executes the data transformation process.
        """
        logger.info(f"{self.STAGE_NAME}: Starting data transformation process.")

        # Load and preprocess the data
        df = self.data_transformation._load_data()
        df = self.data_transformation._handle_missing_values()
        df = self.data_transformation.remove_noise(df)
        df = self.data_transformation.normalize_technical_terms(df)
        df = self.data_transformation.remove_stop_words(df)
        df = self.data_transformation.lemmatize_text(df)
        df = self.data_transformation.stem_text(df)
        df = self.data_transformation.tokenize_text(df)
        df = self.data_transformation.apply_ner(df)

        # Optionally, split the data and transform into Dataset objects
        train_df, test_df, val_df = self.data_transformation.split_data()
        train_ds = Dataset.from_pandas(train_df)
        test_ds = Dataset.from_pandas(test_df)
        val_ds = Dataset.from_pandas(val_df)

        # logger.info(f"{self.STAGE_NAME}: Data transformation process completed.")

        return train_ds, test_ds, val_ds

    def transform_new_data(self, file_path: str):
        """
        Applies the data transformation process to a new dataset.

        Args:
            file_path (str): The file path to the new dataset.
        
        Returns:
            Dataset: A Dataset object containing the transformed new data.
        """
        # Adjust the configuration temporarily for the new dataset
        original_file_path = self.data_transformation.config.data_source_file
        self.data_transformation.config.data_source_file = file_path

        logger.info(f"{self.STAGE_NAME}: Transforming new dataset from {file_path}.")
        new_df = self.data_transformation._load_data()
        new_df = self.data_transformation._handle_missing_values()
        # Apply all transformation steps as in run_pipeline
        new_df = self.data_transformation.remove_noise(new_df)
        # Continue applying all other steps...
        new_df = self.data_transformation.apply_ner(new_df)
        transformed_new_ds = Dataset.from_pandas(new_df)

        # Restore the original data source file path
        self.data_transformation.config.data_source_file = original_file_path

        return transformed_new_ds
    
if __name__ == '__main__':
    # Initialize the pipeline
    pipeline = DataTransformationPipeline()

    # Execute the pipeline
    train_ds, test_ds, val_ds = pipeline.run_pipeline()

    # Here you can interact with the returned datasets
    # For demonstration, we'll simply print the sizes of these datasets
    print(f"Training dataset size: {len(train_ds)}")
    print(f"Testing dataset size: {len(test_ds)}")
    print(f"Validation dataset size: {len(val_ds)}")


[2024-03-06 06:38:26,883: 41: career_chief_logger: INFO: common:  yaml file: config/config.yaml loaded successfully]
[2024-03-06 06:38:26,887: 41: career_chief_logger: INFO: common:  yaml file: params.yaml loaded successfully]
[2024-03-06 06:38:26,890: 41: career_chief_logger: INFO: common:  yaml file: schema.yaml loaded successfully]
[2024-03-06 06:38:26,890: 64: career_chief_logger: INFO: common:  Created directory at: artifacts]
[2024-03-06 06:38:26,891: 64: career_chief_logger: INFO: common:  Created directory at: artifacts/data_transformation]
[2024-03-06 06:38:30,052: 20: career_chief_logger: INFO: 3758990984:  NLTK resources downloaded successfully.]
[2024-03-06 06:38:30,053: 24: career_chief_logger: INFO: 3758990984:  Stop words initialized.]


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/macbookpro/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/macbookpro/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     /Users/macbookpro/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


[2024-03-06 06:38:30,272: 41: career_chief_logger: INFO: 3758990984:  Missing values handled in critical columns. Rows before: 37962, after: 37962.]
       Unnamed: 0  index                                              title  \
0               0      0                                       Data Analyst   
1               1      1                                       Data Analyst   
2               2      2                          Aeronautical Data Analyst   
3               3      3   Data Analyst - Consumer Goods - Contract to Hire   
4               4      4                Data Analyst | Workforce Management   
...           ...    ...                                                ...   
37957       37957    600                     Marketing Data & BI Analyst II   
37958       37958    601                                  Lead-Data Analyst   
37959       37959    602                                  Lead-Data Analyst   
37960       37960    603                                  Lea

Some weights of the model checkpoint at dslim/bert-large-NER were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
