In [18]:
import os
import sys
import pandas as pd
import configparser
import logging

from typing import Tuple
from pathlib import Path
from sklearn.model_selection import train_test_split

In [2]:
IMAGE_DIR = 'images'
RESULTS_DIR = 'results'
DATA_DIR = 'data'

logger = logging.getLogger(__name__)
logging.basicConfig(
    filename = 'test_log.log', 
    filemode = 'w+',
    level=logging.INFO, 
    format='%(asctime)s - %(name)s - %(levelname)s: %(message)s', 
    datefmt='%Y-%m-%d:%H:%M:%S'
)

# config = ConfigParser()

In [29]:
import pandas as pd
import numpy as np

pd.set_option('expand_frame_repr', False)
pd.set_option('display.max_columns', 999)

class Dataset:
    def __init__(self, data_path: str):
        self.data_path = data_path
        self.data = None

    def load_data(self, verbose: bool = False, **kwargs) -> None:
        """
        Loads data from the specified path. 
        """
        try:
            self.data = pd.read_csv(self.data_path, **kwargs)
            dataset_name = self.data_path.split("/")[-1].split(".")[0]
            if verbose:
                logger.info(f'Loading Dataset: {dataset_name}')
            
        except FileNotFoundError:
            print(f'Dataset: {dataset_name} not found in location: {data_path}')
         
        logger.info("Data loaded successfully")
        logger.info(f"Number of Rows: {len(self.data)} | Number of Features: {len(self.data.columns)}")

    def summary_statistics(self, target_col: str, normalize_counts = True) -> None:
        """Provides summary statistics of all columns"""
        if self.data is None or self.data.empty:
            raise ValueError("Data not loaded. Please load the data first using the load_data method") 
        else:
            target_class_dist = (
                self
                .data[target_col]
                .value_counts(normalize=normalize_counts)
                .sort_index()
                .reset_index()
                .style
                .format({'proportion': '{:,.2%}'})
                .to_string()
                
            )
            # return target_class_dist
            summary_df = self.data.describe(
                include='all', 
                percentiles = [0.01, 0.25, 0.5, 0.75, 0.99]
            ).round(2)

            logger.info('Target Column: {} | Class Distribution: {}'.format(target_col, target_class_dist))
            logger.info('Data Summary: {}'.format(summary_df))
            

    def check_missing_values(self):
        """Checks for any missing values in the dataset."""
        has_nulls = self.data.isnull().values.any()
        if not has_nulls:
            logger.warning("Warning Missing Values Detected")

    def check_outliers(self):
        """
        Detects outliers in each column of the DataFrame using the IQR method.
        
        Args:
        data (pd.DataFrame): The input DataFrame.
        
        Returns:
        dict: A dictionary where keys are column names and values are lists of indices of outliers.
        """
        if self.data is not None or not self.data.empty:
            numeric_cols = self.data.select_dtypes(include=[np.number]).columns
            outliers_dict = {}
        
            for col in numeric_cols:
                Q1 = self.data[col].quantile(0.25)
                Q3 = self.data[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                outlier_indices = self.data.index[(self.data[col] < lower_bound) | (self.data[col] > upper_bound)].tolist()
                
                if outlier_indices:
                    outliers_dict[col] = outlier_indices
        
            logger.info(f"Outliers Detected in columns: {list(outliers_dict.keys())}")
            return outliers_dict
        else:
            raise ValueError("Data not loaded. Please load the data first using the load_data method.")

    def cast_datatypes(self, column_type_map):
        """Casts datatypes to the appropriate formats based on the data content"""
        if self.data is not None or not self.data.empty:
            pass
        else:
            raise ValueError("Data not loaded. Please load the data first using the load_data method")


    def create_train_test_split(self, target_col: str, test_size: float, seed: int) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:

        has_nulls = self.data.isnull().values.any()
        if not has_nulls:
            logger.warning("Warning Missing Values Detected")

        X = self.data.loc[:, self.data.columns != target_col]
        y = self.data[target_col]

        X_TRAIN, X_TEST, y_train, y_test = train_test_split(X, y, random_state = seed, test_size = test_size)
        logger.info(f"Train Set Size: {len(X_TRAIN)} | Test Set Size: {len(X_TEST)}")

        return X_TRAIN, X_TEST, y_train, y_test

wine_data = Dataset('data/winequality-white.csv')
wine_data.load_data(delimiter=';')
# wine_data.data
wine_data.summary_statistics('quality')
wine_data.check_missing_values()
outliers_dict = wine_data.check_outliers()

X_TRAIN, X_TEST, y_train, y_test = wine_data.create_train_test_split('quality', 0.3, 42)

In [48]:
pd.read_csv(Path('data', 'winequality-white.csv'))

Unnamed: 0,"fixed acidity;""volatile acidity"";""citric acid"";""residual sugar"";""chlorides"";""free sulfur dioxide"";""total sulfur dioxide"";""density"";""pH"";""sulphates"";""alcohol"";""quality"""
0,7;0.27;0.36;20.7;0.045;45;170;1.001;3;0.45;8.8;6
1,6.3;0.3;0.34;1.6;0.049;14;132;0.994;3.3;0.49;9...
2,8.1;0.28;0.4;6.9;0.05;30;97;0.9951;3.26;0.44;1...
3,7.2;0.23;0.32;8.5;0.058;47;186;0.9956;3.19;0.4...
4,7.2;0.23;0.32;8.5;0.058;47;186;0.9956;3.19;0.4...
...,...
4893,6.2;0.21;0.29;1.6;0.039;24;92;0.99114;3.27;0.5...
4894,6.6;0.32;0.36;8;0.047;57;168;0.9949;3.15;0.46;...
4895,6.5;0.24;0.19;1.2;0.041;30;111;0.99254;2.99;0....
4896,5.5;0.29;0.3;1.1;0.022;20;110;0.98869;3.34;0.3...


In [23]:


def preprocess_data(df: pd.DataFrame, target_col: str, test_size: float, seed: int, verbose: bool = False) -> pd.DataFrame:
    if verbose:
        print(f"Number of Rows: {len(df)} | Number of Features: {len(df.columns)}")

    has_nulls = df.isnull().values.any()
    if not has_nulls:
        logger.warning("Warning Missing Values Detected")

    X, y = df.loc[:, df.columns != target_col], df[target_col]
    
    X_TRAIN, X_TEST, y_train, y_test = train_test_split(X, y, random_state = seed, test_size = test_size)
    if verbose:
        print(f"Train Set Size: {len(X_TRAIN)} | Test Set Size: {len(X_TEST)}")
    
    return X_TRAIN, X_TEST, y_train, y_test


X_TRAIN, X_TEST, y_train, y_test = preprocess_data(df, 'quality', 0.3, 42, verbose = True)

Number of Rows: 4898 | Number of Features: 12
Train Set Size: 3428 | Test Set Size: 1470


In [17]:
def build_pipeline(model, numeric_features, categorical_features, scale_features=False, normalize_features=False, scale_target=False, normalize_target=False):
    # Check scaling and normalization flags
    if scale_features and normalize_features:
        raise ValueError("Both scale_features and normalize_features cannot be True at the same time.")
    if scale_target and normalize_target:
        raise ValueError("Both scale_target and normalize_target cannot be True at the same time.")
    
    # Preprocessing for numeric features
    numeric_transformer = []
    if scale_features:
        numeric_transformer.append(('scaler', StandardScaler()))
    elif normalize_features:
        numeric_transformer.append(('scaler', MinMaxScaler()))
        
    numeric_transformer = Pipeline(steps=numeric_transformer)
    
    # Preprocessing for categorical features
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])
    
    # Combine preprocessing steps
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])
    
    # Create pipeline
    steps = [('preprocessor', preprocessor), ('model', model)]
    pipeline = Pipeline(steps=steps)
    
    # Optionally scale the target
    if scale_target or normalize_target:
        target_scaler = TargetScaler(method='standard' if scale_target else 'minmax')
        pipeline = Pipeline(steps=[('target_scaler', target_scaler), ('pipeline', pipeline)])
    
    return pipeline

quality
3     0.4%
4     3.3%
5    29.7%
6    44.9%
7    18.0%
8     3.6%
9     0.1%
Name: proportion, dtype: object