In [1]:
import os

In [2]:
%pwd

'c:\\Work\\Tasks\\Repo\\uc_api\\research'

In [3]:
# Move the working directory one level up from the notebook's folder; the
# %pwd outputs around this cell show `...\uc_api\research` -> `...\uc_api`.
# Presumably needed so the `src.uc_api` imports and the relative config
# paths used in later cells resolve -- confirm against the repo layout.
# NOTE(review): the path is relative, so this cell is not idempotent --
# running it a second time in the same kernel moves up one more level.
os.chdir("../")

In [4]:
%pwd

'c:\\Work\\Tasks\\Repo\\uc_api'

In [5]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataIngestionConfig:
    """Immutable bundle of settings consumed by the data-ingestion stage.

    The dataclass is frozen, so fields cannot be reassigned once an instance
    is built. The annotations are documentation only: nothing here validates
    or converts the values that are passed in.
    """

    # Common Configurations
    root_dir: Path  # stage directory; the download and the processed dataset land here
    unzip_dir: Path  # directory the downloaded archive is extracted into
    local_data_file: Path  # local path of the downloaded archive

    # Common Parameters
    data_split_type: list
    data_split_ratio: float  # used as `test_size` of the first train/test split

    # Parameters
    # Kaggle dataset identifier or a download URL -- a plain string handed to
    # the Kaggle API / urlretrieve, not a filesystem path.
    source_URL: str
    outsource_file: Path  # file name the Kaggle download produces inside root_dir
    feature_column_name: str
    label_column_name: str

In [6]:
from src.uc_api.constants import *
from src.uc_api.utils.common import read_yaml, create_directories

In [7]:
class ConfigurationManager:
    """Reads the config and params YAML files and builds per-stage configs."""

    def __init__(
        self,
        config_filepath=CONFIG_FILE_PATH,
        params_filepath=PARAMS_FILE_PATH,
    ):
        """Load both YAML files and make sure the artifacts root exists.

        Args:
            config_filepath: location of the configuration YAML file.
            params_filepath: location of the parameters YAML file.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Assemble the settings for the data-ingestion stage.

        Creates the stage's root directory as a side effect.

        Returns:
            DataIngestionConfig filled from the `data_ingestion` section of the
            config file plus the `CommonArguments` and `SentimentArguments`
            sections of the params file.
        """
        ingestion_section = self.config.data_ingestion  # paths and download source
        common_args = self.params.CommonArguments  # shared across use cases
        sentiment_args = self.params.SentimentArguments  # sentiment use case only

        create_directories([ingestion_section.root_dir])

        field_values = {
            # Data Ingestion configurations
            "root_dir": ingestion_section.root_dir,
            "unzip_dir": ingestion_section.unzip_dir,
            "local_data_file": ingestion_section.local_data_file,
            # Common Arguments Parameters
            "data_split_type": common_args.data_split_type,
            "data_split_ratio": common_args.data_split_ratio,
            # Download source and use case specific parameters
            "source_URL": ingestion_section.source_URL,
            "outsource_file": ingestion_section.outsource_file,
            "feature_column_name": sentiment_args.feature_column_name,
            "label_column_name": sentiment_args.label_column_name,
        }
        return DataIngestionConfig(**field_values)

In [8]:
import zipfile
import urllib.request as request
from datasets import *
from src.uc_api.logging import logger
from src.uc_api.utils.common import get_size

[2024-06-10 10:06:50: INFO:config: PyTorch version 2.3.1 available.]


In [9]:
import kaggle

class DataIngestion:
    """Downloads, extracts and splits the raw dataset described by a config."""

    def __init__(self, config: DataIngestionConfig):
        """Keep the ingestion settings used by every step below."""
        self.config = config

    def download_file(self, source=None):
        """Fetch the dataset archive unless it is already on disk.

        Nothing is downloaded when `config.local_data_file` already exists;
        the size of the existing file is logged instead.

        Args:
            source: `"kaggle"` downloads through the Kaggle API, passing
                `config.source_URL` as the dataset argument, and then renames
                `config.outsource_file` (inside `config.root_dir`) to
                `config.local_data_file`. Any other value, including the
                default `None`, retrieves `config.source_URL` as a plain URL.

        Returns:
            None
        """
        if os.path.exists(self.config.local_data_file):
            logger.info(
                f"File already exists of size: {get_size(Path(self.config.local_data_file))}"
            )
            return

        if source == "kaggle":
            # Download the archive from Kaggle, keeping it zipped.
            kaggle.api.authenticate()
            kaggle.api.dataset_download_files(
                self.config.source_URL, self.config.root_dir, unzip=False
            )
            # Move the downloaded file to the configured archive path.
            os.rename(
                os.path.join(self.config.root_dir, self.config.outsource_file),
                self.config.local_data_file,
            )
            logger.info(f"Dataset downloaded from {source}")
        else:
            # Download the archive straight from the URL.
            filename, headers = request.urlretrieve(
                url=self.config.source_URL, filename=self.config.local_data_file
            )
            logger.info(f"{filename} download! with following info: \n{headers}")

    def extract_zip_file(self):
        """Extract `config.local_data_file` into `config.unzip_dir`.

        The target directory is created when missing.

        Returns:
            None
        """
        unzip_path = self.config.unzip_dir
        os.makedirs(unzip_path, exist_ok=True)
        with zipfile.ZipFile(self.config.local_data_file, "r") as zip_ref:
            zip_ref.extractall(unzip_path)

    def convert_csv_to_train_and_test_datasets(self, seed: int = 42):
        """Build train/validation/test splits from the extracted train CSV.

        Picks a CSV file in `config.unzip_dir` whose name contains "train",
        holds out `config.data_split_ratio` of its rows, halves the held-out
        part into validation and test, keeps only the feature and label
        columns, and saves the result under `<config.root_dir>/processed`.

        Args:
            seed: seed used for both splits, so every split is reproducible
                across runs.

        Returns:
            None

        Raises:
            FileNotFoundError: no matching CSV file exists in `config.unzip_dir`.
            Exception: any other failure is logged with its traceback and
                re-raised unchanged.
        """
        try:
            suffix = "csv"
            # os.listdir returns entries in arbitrary order; sort so the same
            # file is chosen every run when several names match.
            candidates = sorted(
                filename
                for filename in os.listdir(self.config.unzip_dir)
                if filename.endswith(suffix) and "train" in filename
            )
            if not candidates:
                raise FileNotFoundError(
                    f"No '*train*.{suffix}' file found in {self.config.unzip_dir}"
                )
            selected_file = candidates[0]

            # Splitting the dataset in train and test dataset
            train_test_dataset = load_dataset(
                suffix,
                data_files=os.path.join(self.config.unzip_dir, selected_file),
                encoding='unicode_escape',
            )["train"].train_test_split(test_size=self.config.data_split_ratio, seed=seed)

            # Splitting the held-out part in validation and test dataset; seeded
            # like the first split so membership does not change between runs.
            test_valid_dataset = train_test_dataset["test"].train_test_split(
                test_size=0.5, seed=seed
            )

            # Dataset split dictionary
            dataset = DatasetDict({
                "train": train_test_dataset["train"],
                "validation": test_valid_dataset["train"],
                "test": test_valid_dataset["test"],
            })

            # Only the feature and label columns are kept in every split.
            columns_to_keep = [self.config.feature_column_name, self.config.label_column_name]

            def filter_columns(example):
                return {key: example[key] for key in columns_to_keep}

            # Drop every original column and re-add just the ones to keep.
            for split_type in dataset.keys():
                dataset[split_type] = dataset[split_type].map(
                    filter_columns, remove_columns=dataset[split_type].column_names
                )

            # Save the processed splits
            dataset.save_to_disk(os.path.join(self.config.root_dir, "processed"))
            print(dataset)
            logger.info("Successfully created arrow dataset")
        except Exception:
            # Log at error level with the traceback, then let the caller see
            # the original exception.
            logger.exception("Error creating arrow dataset")
            raise

In [10]:
# Run the data-ingestion stage end to end: load the configuration, download
# the archive from Kaggle, extract it, then build and save the dataset splits.
# Any exception propagates unchanged with its original traceback.
config = ConfigurationManager()
data_ingestion_config = config.get_data_ingestion_config()
data_ingestion = DataIngestion(config=data_ingestion_config)
data_ingestion.download_file(source="kaggle")
data_ingestion.extract_zip_file()
data_ingestion.convert_csv_to_train_and_test_datasets()

[2024-06-10 10:06:51: INFO:common: yaml file: config\config.yaml loaded successfully]
[2024-06-10 10:06:51: INFO:common: yaml file: params.yaml loaded successfully]
[2024-06-10 10:06:51: INFO:common: created directory at: artifacts]
[2024-06-10 10:06:51: INFO:common: created directory at: artifacts/data_ingestion]
Dataset URL: https://www.kaggle.com/datasets/abhi8923shriv/sentiment-analysis-dataset
[2024-06-10 10:06:59: INFO:2524217308: Dataset downloaded from kaggle]


Generating train split: 0 examples [00:00, ? examples/s]

Map:   0%|          | 0/21984 [00:00<?, ? examples/s]

Map:   0%|          | 0/2748 [00:00<?, ? examples/s]

Map:   0%|          | 0/2749 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/21984 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/2748 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/2749 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['text', 'sentiment'],
        num_rows: 21984
    })
    validation: Dataset({
        features: ['text', 'sentiment'],
        num_rows: 2748
    })
    test: Dataset({
        features: ['text', 'sentiment'],
        num_rows: 2749
    })
})
[2024-06-10 10:07:06: INFO:2524217308: Successfully created arrow dataset]
