In [None]:
import sys
import logging
from typing import Tuple, List, TypedDict, cast
from dataclasses import dataclass
import pandas as pd
from joblib import Memory
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder, FunctionTransformer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.base import BaseEstimator
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression

In [None]:
def load_data() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Read the train and test CSV files from the local ``./data`` directory.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: The train frame followed by the test frame.
    """
    # The generator is consumed left to right, so train.csv is read before test.csv.
    train_frame, test_frame = (
        pd.read_csv(csv_path) for csv_path in ('./data/train.csv', './data/test.csv')
    )
    return train_frame, test_frame

In [None]:
def setup_logger(name: str | None = None) -> logging.Logger:
    """A function to setup the logger

    Args:
        name (str | None, optional): Logger name. Defaults to None.

    Returns:
        logging.Logger: Configured logger
    """

    file_handler = logging.FileHandler("logs.log", mode="a")
    stream_handler = logging.StreamHandler(stream=sys.stdout)

    logger = logging.getLogger(name or __name__)
    if logger.hasHandlers():
        logger.handlers.clear()

    logging.basicConfig(
        level=logging.WARNING,
        format="%(levelname)s %(name)s %(asctime)s | %(message)s",
        datefmt="%H:%M:%S",
        handlers=[file_handler, stream_handler],
    )

    
    logger.setLevel(logging.INFO)

    return logger

# Module-level logger; the helper functions defined in later cells log through it.
logger = setup_logger("Main")
logger.info("Logger setup complete")

In [None]:
class ColumnTypes(TypedDict):
    """
    A typed dictionary that groups the dataset's column names by role.

    This is deliberately a plain ``TypedDict`` and not a dataclass: values
    are read with subscript access (``column_types["ids"]``) and calling
    ``ColumnTypes(...)`` produces an ordinary ``dict``.

    Keys:
    - ids: List[str]
    - labels: List[str]
    - numerical: List[str]
    - categorical: List[str]
    - targets: List[str]
    """

    ids: List[str]
    """An array of the columns that are used as the ID of the dataset."""
    labels: List[str]
    """An array of the columns that are used as the labels of the dataset.
    Label encoding is to be used when there is a sequential correlation between the labels."""

    numerical: List[str]
    """An array of the columns that are numerical in nature."""
    categorical: List[str]
    """An array of the columns that are categorical in nature."""
    targets: List[str]
    """An array of the columns that are the target of the dataset."""

def take_column_types_subset(column_types: ColumnTypes, columns: List[str], keep_ids_targets: bool = False) -> ColumnTypes:
    """Restrict a ColumnTypes mapping to the requested columns.

    Each group in the result lists the entries of ``columns`` that belong to
    that group, in the order they appear in ``columns``.

    Args:
        column_types (ColumnTypes): The full column grouping to filter.
        columns (List[str]): A list of columns to be taken from the ColumnTypes object.
        keep_ids_targets (bool, optional): When it is exactly ``False`` the
            ``ids`` and ``targets`` groups are filtered like the others;
            otherwise both are taken over unchanged from ``column_types``.
            Defaults to False.

    Returns:
        ColumnTypes: A ColumnTypes object that contains the subset of columns.
    """
    if keep_ids_targets is False:
        kept_ids = [name for name in columns if name in column_types["ids"]]
        kept_targets = [name for name in columns if name in column_types["targets"]]
    else:
        # The original list objects are handed over, not copies.
        kept_ids = column_types["ids"]
        kept_targets = column_types["targets"]

    kept_labels = [name for name in columns if name in column_types["labels"]]
    kept_numerical = [name for name in columns if name in column_types["numerical"]]
    kept_categorical = [name for name in columns if name in column_types["categorical"]]

    return ColumnTypes(
        ids=kept_ids,
        labels=kept_labels,
        numerical=kept_numerical,
        categorical=kept_categorical,
        targets=kept_targets,
    )



def get_columns_types() -> ColumnTypes:
    """Build the hard-coded grouping of the dataset's columns by role.

    Returns:
        ColumnTypes: ColumnTypes object that contains the columns of the dataset.
    """
    numerical_columns = ["cap-diameter", "stem-height", "stem-width"]
    categorical_columns = [
        "cap-shape",
        "cap-surface",
        "cap-color",
        "does-bruise-or-bleed",
        "gill-attachment",
        "gill-color",
        "stem-root",
        "stem-surface",
        "stem-color",
        "veil-type",
        "veil-color",
        "has-ring",
        "ring-type",
        "spore-print-color",
        "habitat",
        "season",
    ]

    # Keyword order fixes the key order of the resulting dict.
    return ColumnTypes(
        ids=["id"],
        targets=["class"],
        labels=["gill-spacing"],
        numerical=numerical_columns,
        categorical=categorical_columns,
    )

@dataclass
class ColumnSet:
    """A group of feature columns that enters or leaves a feature configuration as a unit."""

    is_optional: bool
    """A boolean value to determine if the column list is optional."""
    columns: List[str]
    """An array of the columns that are to be used in the dataset."""


def get_column_sets() -> List[ColumnSet]:
    """A function that defines the possible column combinations for the dataset.
    It will create a list of ColumnSet objects that contain the possible column combinations.

    The stem measurements form the only mandatory set; every other set is
    optional.

    Returns:
        List[ColumnSet]: A list of ColumnSet objects that contain the possible column combinations.
    """
    return [
        ColumnSet(is_optional=True, columns=["does-bruise-or-bleed"]),
        # Column names are hyphenated everywhere else in this notebook; the
        # underscore spellings used here before matched no declared column.
        ColumnSet(is_optional=False, columns=["stem-height", "stem-width"]),
        ColumnSet(
            is_optional=True,
            columns=["cap-diameter", "cap-shape", "cap-surface", "cap-color"],
        ),
        ColumnSet(
            is_optional=True, columns=["gill-spacing", "gill-attachment", "gill-color"]
        ),
        ColumnSet(
            is_optional=True, columns=["stem-root", "stem-surface", "stem-color"]
        ),
        ColumnSet(is_optional=True, columns=["veil-type", "veil-color"]),
        ColumnSet(is_optional=True, columns=["has-ring", "ring-type"]),
        ColumnSet(is_optional=True, columns=["spore-print-color", "habitat", "season"]),
    ]

def get_possible_columns_configs(column_sets : List[ColumnSet]) -> List[List[str]]:
    """A function that returns the possible column configurations for the dataset.

    Every configuration starts with the columns of all mandatory sets and
    adds the columns of one non-empty combination of the optional sets.
    When there is at least one mandatory column, the mandatory-only
    configuration is listed first.

    Args:
        column_sets (List[ColumnSet]): The column sets to combine.

    Returns:
        List[List[str]]: A list of lists that contain the possible column configurations.
    """
    optional_columns_sets = [
        column_set for column_set in column_sets if column_set.is_optional
    ]
    mandatory_columns_sets = [
        column_set for column_set in column_sets if not column_set.is_optional
    ]

    n_optional = len(optional_columns_sets)
    # The number of configurations doubles with every optional set.
    if n_optional > 10:
        logger.warning(
            f"The number of optional columns sets is too high (more than 10) - {n_optional}"
        )
    else:
        logger.info(
            f"The number of optional columns sets is {n_optional}"
        )

    mandatory_columns: List[str] = [
        column
        for mandatory_set in mandatory_columns_sets
        for column in mandatory_set.columns
    ]

    possible_columns_configs: List[List[str]] = []
    if len(mandatory_columns) > 0:
        possible_columns_configs.append(mandatory_columns)

    # Each integer in [1, 2**n_optional - 1] is a bit mask: bit j set means
    # optional set j is part of the configuration.
    for mask in range(1, 2 ** n_optional):
        # A flat list of column names (it was annotated as a list of lists before).
        columns_config: List[str] = [*mandatory_columns]
        for position, optional_set in enumerate(optional_columns_sets):
            if mask & (1 << position):
                columns_config.extend(optional_set.columns)

        possible_columns_configs.append(columns_config)

    return possible_columns_configs


In [None]:
def encode_label(X: pd.DataFrame, columns: List[str], name:str) -> pd.DataFrame:
    """Replace each listed column with integer codes from a fresh LabelEncoder.

    A new encoder is fitted on every call, so the codes depend only on the
    values present in ``X`` at that moment.

    Args:
        X (pd.DataFrame): Input frame. It is not modified.
        columns (List[str]): Columns to encode.
        name (str): Pipeline name, used only in the log messages.

    Returns:
        pd.DataFrame: A copy of ``X`` with the listed columns encoded.
    """
    logger.info(f"Pipeline {name}: Encoding labels...")
    # Work on a copy so the caller's frame is left untouched.
    X = X.copy()
    for column in columns:
        # fit_transform returns a positional array. Assign it directly: wrapping
        # it in a new Series would give it a 0..n-1 index, and pandas would
        # then align on that index and mis-assign rows whenever X's index
        # differs (for example on a cross-validation fold).
        X[column] = LabelEncoder().fit_transform(X[column])

    logger.info(f"Pipeline {name}: Encoding labels complete")
    return X

def clean_categorical(
    X: pd.DataFrame,
    columns: List[str],
    name: str,
    outliers_frequency_limit: float = 0.01,
) -> pd.DataFrame:
    """Collapse rare categories and fill missing values in categorical columns.

    For each listed column, every value whose count is below
    ``outliers_frequency_limit`` times the number of non-missing values is
    replaced by ``"gibberish"``, missing values become ``"na"``, and the
    column is converted to the pandas ``category`` dtype.

    The rare values are recomputed from ``X`` on every call.

    Args:
        X (pd.DataFrame): Input frame. It is not modified.
        columns (List[str]): Categorical columns to clean.
        name (str): Pipeline name, used only in the log messages.
        outliers_frequency_limit (float, optional): Relative frequency below
            which a value counts as an outlier. Defaults to 0.01.

    Returns:
        pd.DataFrame: A copy of ``X`` with the listed columns cleaned.
    """
    logger.info(f"Pipeline {name}: Cleaning categorical data...")
    logger.info(
        f"Outliers frequency limit is {outliers_frequency_limit}"
    )

    # Work on a copy so the caller's frame is left untouched.
    X = X.copy()
    for column in columns:
        # value_counts() skips missing values, so the sum is the non-missing
        # row count. Using the Series directly avoids relying on the name
        # that to_frame() gives the counts column.
        counts = X[column].value_counts()
        outliers = counts[counts < counts.sum() * outliers_frequency_limit].index.to_list()
        logger.info(f"Outliers for column '{column}' are {outliers}")

        # Cast to object first so "gibberish" and "na" can be written even if
        # the column already has a categorical dtype without those categories.
        values = X[column].astype(object)
        X[column] = (
            values.mask(values.isin(outliers), "gibberish")
            .fillna("na")
            .astype("category")
        )

    logger.info(f"Pipeline {name}: Cleaning categorical data complete")
    return X

def clean_numerical(X: pd.DataFrame, columns: List[str], name:str) -> pd.DataFrame:
    """Fill missing values in numerical columns with the column mean.

    The mean is computed from the values present in ``X`` on every call.

    Args:
        X (pd.DataFrame): Input frame. It is not modified.
        columns (List[str]): Numerical columns to clean.
        name (str): Pipeline name, used only in the log messages.

    Returns:
        pd.DataFrame: A copy of ``X`` with NaNs in the listed columns filled.
    """
    logger.info(f"Pipeline {name}: Cleaning numerical data...")
    # Work on a copy so the caller's frame is left untouched.
    X = X.copy()
    for column in columns:
        n_nans = X[column].isna().sum()
        if n_nans > 0:
            # Compute the mean once and reuse it for the log line and the fill.
            column_mean = X[column].mean()
            logger.info(f"Column '{column}' has {n_nans} NaNs. Filling with mean {column_mean}")
            X[column] = X[column].fillna(column_mean)

    logger.info(f"Pipeline {name}: Cleaning numerical data complete")
    return X


def create_X_data_pipeline(
    name: str, column_types: ColumnTypes, model: BaseEstimator | None = None
) -> Pipeline:
    """Assemble the feature pipeline: cleaning, then scaling/encoding, then an optional model.

    Args:
        name (str): Pipeline name; passed to the cleaning functions and used
            as the prefix of the model step name.
        column_types (ColumnTypes): Column grouping; its ``numerical``,
            ``categorical`` and ``labels`` entries select the columns each
            step works on.
        model (BaseEstimator | None, optional): Estimator appended as the
            final step when given. Defaults to None.

    Returns:
        Pipeline: The assembled pipeline, caching under ``./cache/x``.
    """
    numerical_columns = column_types["numerical"]
    categorical_columns = column_types["categorical"]

    def _cleaning_step(func, step_columns):
        # Every cleaning function takes the same two keyword arguments.
        return FunctionTransformer(func, kw_args={"name": name, "columns": step_columns})

    cleaning = Pipeline(
        steps=[
            ("cleaner_numerical", _cleaning_step(clean_numerical, numerical_columns)),
            ("cleaner_categorical", _cleaning_step(clean_categorical, categorical_columns)),
            ("label_encoder", _cleaning_step(encode_label, column_types["labels"])),
        ]
    )

    one_hot = OneHotEncoder(drop=None, sparse_output=False, handle_unknown="error")
    preprocessing = ColumnTransformer(
        transformers=[
            ("preprocessor_numerical", StandardScaler(), numerical_columns),
            ("preprocessor_categorical", one_hot, categorical_columns),
        ],
        # Columns not listed above are forwarded unchanged.
        remainder="passthrough",
    )

    steps = [("cleaning", cleaning), ("preprocessing", preprocessing)]
    if model is not None:
        steps.append((f"{name}_model", model))

    return Pipeline(steps=steps, memory=Memory(location="./cache/x", verbose=0))

def transform_target(X: pd.DataFrame, name: str) -> pd.Series:
    """Turn the ``class`` column into a binary float target.

    Args:
        X (pd.DataFrame): Frame containing a ``class`` column.
        name (str): Pipeline name, used only in the log messages.

    Returns:
        pd.Series: 1.0 where ``class`` equals ``"e"`` and 0.0 elsewhere,
            indexed like ``X`` and named ``"eadible"``.
    """
    logger.info(f"Pipeline {name}: Transforming target...")
    # Vectorised comparison; the Series name is kept exactly as before
    # because it is visible to whoever consumes the result.
    y = (X["class"] == "e").astype(float).rename("eadible")
    logger.info(f"Pipeline {name}: Transforming target complete")
    return y

def create_y_data_pipeline(name: str, column_types: ColumnTypes) -> Pipeline:
    """Build the single-step pipeline that derives the target via ``transform_target``.

    Args:
        name (str): Pipeline name, forwarded to ``transform_target``.
        column_types (ColumnTypes): Accepted for symmetry with the feature
            pipeline; the body does not read it.

    Returns:
        Pipeline: A pipeline with one ``"preprocessing"`` step, caching
            under ``./cache/y``.
    """
    target_step = (
        "preprocessing",
        FunctionTransformer(transform_target, kw_args={"name": name}),
    )
    target_cache = Memory(location="./cache/y", verbose=0)
    return Pipeline(steps=[target_step], memory=target_cache)

In [None]:
# Load both CSV splits; `train` feeds the cross-validation cell further down.
train, test = load_data()

In [None]:
def get_cv_accuracy(data: pd.DataFrame, raw_column_set: List[str], pipeline: Pipeline) -> float:
    """Score a pipeline with 5-fold cross-validated accuracy.

    Args:
        data (pd.DataFrame): Frame holding the feature columns and the target
            column. A copy is used, so the argument itself is not touched.
        raw_column_set (List[str]): Feature columns to select from ``data``.
        pipeline (Pipeline): Estimator pipeline to evaluate.

    Returns:
        float: Mean accuracy over the five folds.
    """
    frame = data.copy()
    features = frame[raw_column_set]
    # The target is derived from the full frame, not from the feature subset.
    target_pipeline = create_y_data_pipeline("y", get_columns_types())
    target = target_pipeline.fit_transform(frame)

    fold_scores = cross_val_score(pipeline, features, target, cv=5, scoring="accuracy")
    return fold_scores.mean()


In [None]:
# pipeline_y = create_y_data_pipeline(
#     name="dev_y",
#     column_types=column_types,
# )
# raw_transformed = pipeline_X.fit_transform(train.head(10000)[raw_column_set])
# df_transformed = pd.DataFrame(data=raw_transformed, columns=pipeline_X.named_steps["preprocessing"].get_feature_names_out())
# raw_y = pipeline_y.fit_transform(train.head(10000))

In [None]:
# Fixed seed so the random forest, and with it the reported CV score, is the
# same on every Restart & Run All.
SEED = 42

# Feature columns evaluated in this experiment.
raw_column_set = [
    "stem-height",
    "stem-width",
    "cap-diameter",
    "cap-shape",
    "cap-surface",
    "gill-color",
    "stem-root",
    "stem-surface",
    "stem-color",
    "veil-type",
    "veil-color",
]
# Keep the id/target groups intact while narrowing the feature groups.
column_types = take_column_types_subset(get_columns_types(), raw_column_set, keep_ids_targets=True)
pipeline_X = create_X_data_pipeline(
    name="dev_x",
    column_types=column_types,
    model=RandomForestClassifier(random_state=SEED),
)

cv_score = get_cv_accuracy(train, raw_column_set, pipeline_X)
display(f"CV score: {cv_score}")