In [1]:
import os

In [2]:
%pwd

'd:\\recommendation-engine\\research'

In [3]:
os.chdir("../")

In [4]:
%pwd

'd:\\recommendation-engine'

In [5]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataIngestionConfig:
    root_dir: Path
    source: str
    local_data_file: Path
    unzip_dir: Path

In [6]:
from src.hybrid_recommender.constants import *
from src.hybrid_recommender.utils.common import read_yaml, create_directories


In [7]:
class ConfigurationManager:
    def __init__(self):
        config_filepath = CONFIG_FILE_PATH

        self.config = read_yaml(config_filepath)

        create_directories([self.config.artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        config = self.config.data_ingestion

        create_directories([config.root_dir])

        data_ingestion_config = DataIngestionConfig(
            root_dir=config.root_dir,
            source=config.source,
            local_data_file=config.local_data_file,
            unzip_dir=config.unzip_dir 
        )

        return data_ingestion_config


In [8]:
import os
import urllib.request as request
import zipfile
from src.hybrid_recommender import logger
from src.hybrid_recommender.utils.common import get_size
from kaggle.api.kaggle_api_extended import KaggleApi
import pandas as pd


In [9]:
class DataIngestion:
    def __init__(self, config: DataIngestionConfig):
        self.config = config
        self.api = KaggleApi()
        self.api.authenticate()

    def download_file(self):
        if not os.path.exists(self.config.local_data_file):
            logger.info(f"Downloading dataset from Kaggle: {self.config.source}")
            self.api.dataset_download_files(
                self.config.source,
                path=self.config.root_dir,
                unzip=False
            )
            temp_zip = os.path.join(self.config.root_dir, f"{self.config.source.split('/')[-1]}.zip")
            if os.path.exists(temp_zip):
                os.rename(temp_zip, self.config.local_data_file)
            
            logger.info(f"Downloaded dataset to: {self.config.local_data_file}")
        else:
            logger.info(f"File already exists of size: {get_size(Path(self.config.local_data_file))}")



    def extract_zip_file(self):
        """
        zip_file_path: str
        Extracts the zip file into the data directory
        Function returns None
        """
        unzip_path = self.config.unzip_dir
        os.makedirs(unzip_path, exist_ok=True)
        with zipfile.ZipFile(self.config.local_data_file, 'r') as zip_ref:
            zip_ref.extractall(unzip_path)
        logger.info(f"Extracted data to: {unzip_path}")
  
    def merge_data(self) -> pd.DataFrame:
        """
        Merges data from Books, Ratings, and Users CSV files into a single DataFrame
        with proper type handling and data cleaning
        
        Returns:
            pd.DataFrame: Cleaned and merged dataset
        """
        try:
            
            books_columns = [
                'ISBN', 'Book-Title', 'Book-Author', 
                'Year-Of-Publication', 'Image-URL-S', 
                'Image-URL-M', 'Image-URL-L'
            ]
            
            # Define dtype specifications for each file
            books_dtypes = {
                'ISBN': 'str',
                'Book-Title': 'str',
                'Book-Author': 'str',
                'Year-Of-Publication': 'Int32',  # Nullable integer type
                'Image-URL-S': 'str',
                'Image-URL-M': 'str',
                'Image-URL-L': 'str'
            }
            
            ratings_dtypes = {
                'User-ID': 'Int32',
                'ISBN': 'str',
                'Book-Rating': 'Int8'  # Ratings are typically small integers
            }
            
            users_dtypes = {
                'User-ID': 'Int32',
                'Location': 'str',
                'Age': 'Int8'  # Age can be nullable
            }
            
            # Read files with specified dtypes
            books_path = os.path.join(self.config.unzip_dir, "Books.csv")
            ratings_path = os.path.join(self.config.unzip_dir, "Ratings.csv")
            users_path = os.path.join(self.config.unzip_dir, "Users.csv")
            
            logger.info("Reading data files with specified dtypes...")
            books_df = pd.read_csv(
                books_path,
                usecols=books_columns,
                dtype=books_dtypes,
                encoding='latin1',
                on_bad_lines='warn'
            ).drop(columns=['Publisher'], errors='ignore')
            ratings_df = pd.read_csv(
                ratings_path,
                dtype=ratings_dtypes,
                encoding='latin1'
            )
            users_df = pd.read_csv(
                users_path,
                dtype=users_dtypes,
                encoding='latin1'
            )
            
            # Data cleaning
            logger.info("Cleaning data...")
            
            # Clean Year-Of-Publication (handle invalid years)
            current_year = pd.Timestamp.now().year
            books_df['Year-Of-Publication'] = books_df['Year-Of-Publication'].clip(
                lower=1800,  # Reasonable minimum for books
                upper=current_year
            )
            
            # Clean Age (handle invalid ages)
            users_df['Age'] = users_df['Age'].clip(
                lower=5,     # Reasonable minimum age
                upper=120    # Reasonable maximum age
            )
            
            # Clean ISBNs (remove any whitespace)
            books_df['ISBN'] = books_df['ISBN'].str.strip()
            ratings_df['ISBN'] = ratings_df['ISBN'].str.strip()
            
            # Merge data
            logger.info("Merging datasets...")
            book_ratings = pd.merge(
                ratings_df,
                books_df,
                on='ISBN',
                how='left',
                validate='many_to_one'
            )
            
            final_df = pd.merge(
                book_ratings,
                users_df,
                on='User-ID',
                how='left',
                validate='many_to_one'
            )
            
            # Additional cleaning after merge
            final_df = final_df.dropna(subset=['Book-Title', 'Book-Rating'], how='all')
            
            logger.info(f"Final merged dataset shape: {final_df.shape}")
            
            # Save merged data
            merged_path = os.path.join(self.config.root_dir, "merged_data.csv")
            final_df.to_csv(merged_path, index=False)
            logger.info(f"Saved cleaned merged data to {merged_path}")
            
            return final_df
            
        except Exception as e:
            logger.error(f"Error in merge_data: {str(e)}", exc_info=True)
            raise e

In [10]:
try:
    config = ConfigurationManager()
    data_ingestion_config = config.get_data_ingestion_config()
    data_ingestion = DataIngestion(config=data_ingestion_config)
    data_ingestion.download_file()
    data_ingestion.extract_zip_file()
    data_ingestion.merge_data()
except Exception as e:
    raise e

[2025-06-16 16:14:25,467: INFO: common: yaml file: config\config.yaml loaded successfully]
[2025-06-16 16:14:25,467: INFO: common: created directory at: artifacts]
[2025-06-16 16:14:25,467: INFO: common: created directory at: artifacts/data_ingestion]
[2025-06-16 16:14:25,467: INFO: 1823720442: Downloading dataset from Kaggle: arashnic/book-recommendation-dataset]
Dataset URL: https://www.kaggle.com/datasets/arashnic/book-recommendation-dataset


ChunkedEncodingError: ('Connection broken: IncompleteRead(10196436 bytes read, 15304352 more expected)', IncompleteRead(10196436 bytes read, 15304352 more expected))