In [4]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import zscore

# Data exploration

In [1]:
def get_general_properties(data: pd.DataFrame):
    """
    Summarize the overall shape and quality of a dataset.

    Parameters:
    data (pd.DataFrame): The dataset to inspect.

    Returns:
    dict: Row count, column count, column names, per-column missing-value
    counts, number of fully duplicated rows, and per-column dtypes.
    """
    row_count, column_count = data.shape

    properties = {}
    properties["Number of Rows"] = row_count
    properties["Number of Columns"] = column_count
    properties["Columns"] = list(data.columns)
    # Missing values are counted per column, duplicates per whole row.
    properties["Missing Values"] = data.isnull().sum().to_dict()
    properties["Duplicated Rows"] = data.duplicated().sum()
    properties["Data Types"] = data.dtypes.to_dict()
    return properties


def describe_numerical(data: pd.DataFrame):
    """
    Compute summary statistics for the numerical columns of a dataset.

    Only columns of dtype int64 or float64 are considered.

    Parameters:
    data (pd.DataFrame): The dataset to describe.

    Returns:
    dict: Mapping of column name to its `describe()` statistics.
    """
    numeric_dtypes = ["int64", "float64"]
    selected_columns = data.select_dtypes(include=numeric_dtypes).columns
    summary = data[selected_columns].describe()
    return summary.to_dict()


def save_catalog_as_tsv(catalog: dict, file_path: str):
    """
    Write a data catalog to disk as a two-column, tab-separated file.

    Nested dictionaries are expanded into one row per entry (labelled
    "<key> - <sub_key>"), lists are joined into a single comma-separated
    string, and any other value is written as-is.

    Parameters:
    catalog (dict): The dictionary containing dataset properties.
    file_path (str): The path where the TSV file will be saved.
    """

    def _flatten(entries):
        # Yield one (property, value) pair per output row.
        for name, content in entries.items():
            if isinstance(content, dict):
                for inner_name, inner_content in content.items():
                    yield (f"{name} - {inner_name}", inner_content)
            elif isinstance(content, list):
                yield (name, ", ".join(str(item) for item in content))
            else:
                yield (name, content)

    rows = list(_flatten(catalog))
    table = pd.DataFrame(rows, columns=["Property", "Value"])
    table.to_csv(file_path, sep="\t", index=False)


def describe_categorical(data: pd.DataFrame):
    """
    Count the occurrences of each value in every categorical column.

    Columns of dtype object or category are treated as categorical.

    Parameters:
    data (pd.DataFrame): The dataset to describe.

    Returns:
    dict: Mapping of column name to a {value: count} dictionary.
    """
    categorical_columns = data.select_dtypes(include=["object", "category"]).columns

    frequencies = {}
    for name in categorical_columns:
        counts = data[name].value_counts()
        frequencies[name] = counts.to_dict()
    return frequencies


def get_correlation_matrix(data: pd.DataFrame):
    """
    Build the pairwise correlation matrix of the numerical columns.

    Only columns of dtype int64 or float64 are included.

    Parameters:
    data (pd.DataFrame): The dataset to analyse.

    Returns:
    pd.DataFrame: Square correlation matrix indexed by column name.
    """
    numeric_dtypes = ["int64", "float64"]
    selected_columns = data.select_dtypes(include=numeric_dtypes).columns
    numeric_frame = data[selected_columns]
    return numeric_frame.corr()


def detect_outliers(data: pd.DataFrame, column: str, z_thresh: float = 3):
    """
    Detects outliers in a specified column using the Z-score method.

    Missing values in the column are ignored when computing the mean and
    standard deviation and are never reported as outliers.

    Parameters:
    data (pd.DataFrame): The input DataFrame.
    column (str): Name of the numerical column to inspect.
    z_thresh (float): Absolute Z-score above which a row is an outlier.

    Returns:
    pd.DataFrame: The rows of `data` whose Z-score magnitude in `column`
    exceeds `z_thresh`.
    """
    # Score the full-length column (instead of a dropna() copy) so the boolean
    # mask always has one entry per row of `data`; a shorter mask makes
    # `data.loc[...]` raise as soon as the column contains a missing value.
    values = data[column].astype(float).to_numpy()
    z_scores = zscore(values, nan_policy="omit")

    # NaN scores compare as False, so rows with missing values are excluded.
    is_outlier = abs(z_scores) > z_thresh
    return data.loc[is_outlier]

# Data preprocessing

In [5]:
def impute_missing_numerical(data: pd.DataFrame, strategy='mean'):
    """
    Impute missing numerical values using specified strategy.

    The input DataFrame is modified in place and also returned. Only columns
    of dtype float64 or int64 are considered. Columns without a single
    observed value are left untouched for statistic-based strategies, because
    there is nothing to derive a fill value from.

    Parameters:
    data (pd.DataFrame): The input DataFrame containing numerical columns.
    strategy (str): Imputation strategy, one of 'mean', 'median'.

    Returns:
    pd.DataFrame: DataFrame with missing values imputed.
    """
    numerical_cols = list(data.select_dtypes(include=['float64', 'int64']).columns)

    if strategy != 'constant':
        # SimpleImputer drops all-missing columns from its output for
        # statistic-based strategies, which would make the assignment below
        # fail with a shape mismatch.
        numerical_cols = [col for col in numerical_cols if data[col].notna().any()]

    # sklearn rejects input with zero features, so return early when there is
    # nothing to impute.
    if not numerical_cols:
        return data

    # Imported lazily so the module can be loaded without scikit-learn.
    from sklearn.impute import SimpleImputer

    imputer = SimpleImputer(strategy=strategy)
    data[numerical_cols] = imputer.fit_transform(data[numerical_cols])

    return data

def standardize_numerical(data: pd.DataFrame):
    """
    Standardize numerical features in the DataFrame.

    Columns of dtype float64 or int64 are rescaled with scikit-learn's
    StandardScaler. The input DataFrame is modified in place and also
    returned. A DataFrame without such columns is returned unchanged.

    Parameters:
    data (pd.DataFrame): The input DataFrame containing numerical columns.

    Returns:
    pd.DataFrame: DataFrame with standardized numerical features.
    """
    numerical_cols = data.select_dtypes(include=['float64', 'int64']).columns

    # sklearn rejects input with zero features, so there is nothing to scale.
    if numerical_cols.empty:
        return data

    # Imported lazily so the module can be loaded without scikit-learn.
    from sklearn.preprocessing import StandardScaler

    scaler = StandardScaler()
    data[numerical_cols] = scaler.fit_transform(data[numerical_cols])

    return data

def normalize_numerical(data: pd.DataFrame):
    """
    Normalize numerical features in the DataFrame.

    Columns of dtype float64 or int64 are rescaled with scikit-learn's
    MinMaxScaler. The input DataFrame is modified in place and also returned.
    A DataFrame without such columns is returned unchanged.

    Parameters:
    data (pd.DataFrame): The input DataFrame containing numerical columns.

    Returns:
    pd.DataFrame: DataFrame with normalized numerical features.
    """
    numerical_cols = data.select_dtypes(include=['float64', 'int64']).columns

    # sklearn rejects input with zero features, so there is nothing to scale.
    if numerical_cols.empty:
        return data

    # Imported lazily so the module can be loaded without scikit-learn.
    from sklearn.preprocessing import MinMaxScaler

    scaler = MinMaxScaler()
    data[numerical_cols] = scaler.fit_transform(data[numerical_cols])

    return data
    
    #############################################
################## ENCODING ##########################
    #############################################
def one_hot_encode_categorical(data: pd.DataFrame, columns=None):
    """
    Expand categorical columns into one indicator column per category.

    Parameters:
    data (pd.DataFrame): The input DataFrame containing categorical columns.
    columns (list): Column names to encode. When None, every column of dtype
        object or category is encoded.

    Returns:
    pd.DataFrame: A new DataFrame in which the selected columns are replaced
    by their indicator columns.
    """
    selected = (
        data.select_dtypes(include=['object', 'category']).columns
        if columns is None
        else columns
    )
    encoded = pd.get_dummies(data, columns=selected)
    return encoded

def label_encode_categorical(data: pd.DataFrame, columns=None):
    """
    Replace categorical values with integer labels, column by column.

    The input DataFrame is modified in place and also returned.

    Parameters:
    data (pd.DataFrame): The input DataFrame containing categorical columns.
    columns (list): Column names to encode. When None, every column of dtype
        object or category is encoded.

    Returns:
    pd.DataFrame: DataFrame with label encoded categorical variables.
    """
    from sklearn.preprocessing import LabelEncoder

    selected = columns
    if selected is None:
        selected = data.select_dtypes(include=['object', 'category']).columns

    # One encoder instance is re-fitted for each column in turn.
    encoder = LabelEncoder()
    for name in selected:
        encoded_values = encoder.fit_transform(data[name])
        data[name] = encoded_values

    return data


# Visualisation

In [3]:
def plot_target_distribution(data: pd.DataFrame, target: str):
    """
    Draw a count plot of the target variable and display it.

    Parameters:
    data (pd.DataFrame): The dataset containing the target column.
    target (str): Name of the target column.
    """
    plt.figure(figsize=(8, 5))
    # countplot draws on, and returns, the axes of the figure created above.
    axes = sns.countplot(x=target, data=data, palette="viridis")
    axes.set_title(f"Distribution of Target Variable: {target}")
    axes.set_xlabel(target)
    axes.set_ylabel("Count")
    plt.show()


def plot_numerical_distributions(data: pd.DataFrame, numerical_vars: list):
    """
    Draw one histogram per numerical variable and display the figure.

    Parameters:
    data (pd.DataFrame): The dataset containing the variables.
    numerical_vars (list): Names of the numerical columns to plot.
    """
    histogram_style = {
        "bins": 20,
        "figsize": (15, 10),
        "color": "teal",
        "edgecolor": "black",
    }
    selected = data[numerical_vars]
    selected.hist(**histogram_style)
    plt.suptitle("Distributions of Numerical Variables", size=16)
    plt.show()


def plot_categorical_distributions(data: pd.DataFrame, categorical_vars: list):
    """
    Draw a horizontal count plot for each categorical variable.

    Each variable gets its own figure, with categories ordered from most to
    least frequent.

    Parameters:
    data (pd.DataFrame): The dataset containing the variables.
    categorical_vars (list): Names of the categorical columns to plot.
    """
    for name in categorical_vars:
        plt.figure(figsize=(8, 5))
        # value_counts() sorts descending, so its index gives the bar order.
        by_frequency = data[name].value_counts().index
        axes = sns.countplot(y=name, data=data, palette="crest", order=by_frequency)
        axes.set_title(f"Distribution of {name}")
        axes.set_xlabel("Count")
        axes.set_ylabel(name)
        plt.show()


def plot_pairwise_relationships(
    data: pd.DataFrame, numerical_vars: list, target: str = None
):
    """
    Draw a pair plot of the numerical variables and display it.

    Parameters:
    data (pd.DataFrame): The dataset containing the variables.
    numerical_vars (list): Names of the numerical columns to plot.
    target (str): Optional column used to colour the points; when given it
        is added to the plotted columns.
    """
    # The hue column has to be part of the frame handed to pairplot.
    hue_columns = [target] if target else []
    plotted = data[numerical_vars + hue_columns]
    sns.pairplot(plotted, hue=target, palette="coolwarm")
    plt.suptitle("Pairwise Relationships", size=16)
    plt.show()


def plot_correlation_matrix(data: pd.DataFrame, numerical_vars: list):
    """
    Draw an annotated heatmap of the correlations between numerical variables.

    Parameters:
    data (pd.DataFrame): The dataset containing the variables.
    numerical_vars (list): Names of the numerical columns to correlate.
    """
    plt.figure(figsize=(10, 8))
    correlations = data[numerical_vars].corr()
    axes = sns.heatmap(
        correlations,
        annot=True,
        fmt=".2f",
        cmap="coolwarm",
        square=True,
    )
    axes.set_title("Correlation Matrix")
    plt.show()


def plot_missing_data(data: pd.DataFrame):
    """
    Draw a heatmap marking which cells of the dataset are missing.

    Parameters:
    data (pd.DataFrame): The dataset to inspect.
    """
    plt.figure(figsize=(12, 6))
    missing_mask = data.isnull()
    axes = sns.heatmap(missing_mask, cbar=False, cmap="viridis", yticklabels=False)
    axes.set_title("Missing Data Heatmap")
    axes.set_xlabel("Columns")
    axes.set_ylabel("Rows")
    plt.show()


def plot_outlier_detection(data: pd.DataFrame, column: str):
    """
    Draw a boxplot of one numerical variable so outliers can be spotted.

    Parameters:
    data (pd.DataFrame): The dataset containing the variable.
    column (str): Name of the numerical column to plot.
    """
    plt.figure(figsize=(8, 5))
    axes = sns.boxplot(x=column, data=data, palette="Set2")
    axes.set_title(f"Outlier Detection for {column}")
    axes.set_xlabel(column)
    plt.show()

# Model evaluation

In [None]:
from sklearn.model_selection import train_test_split

def split_data(data: pd.DataFrame, target: str, test_size=0.2, random_state=None):
    """
    Separate features from the target and split both into train/test sets.

    Parameters:
    data (pd.DataFrame): The input DataFrame.
    target (str): Name of the target column.
    test_size (float): Proportion of the dataset to include in the test split.
    random_state (int or None): Seed for random number generation.

    Returns:
    The result of `train_test_split`, in the order
    (X_train, X_test, y_train, y_test).
    """
    features = data.drop(columns=[target])
    labels = data[target]
    partitions = train_test_split(
        features,
        labels,
        test_size=test_size,
        random_state=random_state,
    )
    return partitions

from sklearn.metrics import classification_report, confusion_matrix

def train_and_evaluate(model, X_train, X_test, y_train, y_test):
    """
    Fit a classifier on the training set and score it on the test set.

    Parameters:
    model: The machine learning model (already instantiated).
    X_train (pd.DataFrame): Training features.
    X_test (pd.DataFrame): Testing features.
    y_train (pd.Series): Training target.
    y_test (pd.Series): Testing target.

    Returns:
    dict: The text classification report under 'classification_report' and
    the confusion matrix under 'confusion_matrix'.
    """
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)

    results = {}
    results['classification_report'] = classification_report(y_test, predictions)
    results['confusion_matrix'] = confusion_matrix(y_test, predictions)
    return results

from sklearn.model_selection import cross_val_score

def cross_validate(model, X, y, cv=5, scoring='accuracy'):
    """
    Score a model with k-fold cross-validation.

    Parameters:
    model: The machine learning model (already instantiated).
    X (pd.DataFrame): Input features.
    y (pd.Series): Target variable.
    cv (int): Number of folds for cross-validation.
    scoring (str or callable): Scoring metric to evaluate the model.

    Returns:
    The per-fold scores exactly as returned by `cross_val_score`.
    """
    return cross_val_score(model, X, y, cv=cv, scoring=scoring)

    ##################
#### Feature Importance #####
    ##################
def get_feature_importance(model, feature_names):
    """
    Get feature importance from a fitted model.

    Tree-style models are read from `feature_importances_`. Linear models are
    read from `coef_`: a single coefficient vector is returned as-is (signed),
    while a matrix with several rows (one per class or target) is reduced to
    the mean absolute coefficient of each feature so that the result always
    has one value per feature.

    Parameters:
    model: The fitted machine learning model (must have feature_importances_ or coef_ attribute).
    feature_names (list): List of feature names.

    Returns:
    pd.DataFrame: DataFrame with feature names and their importance scores,
    sorted from highest to lowest importance.

    Raises:
    ValueError: If the model exposes neither attribute.
    """
    # Local import: numpy is not imported by the cell defining this function.
    import numpy as np

    if hasattr(model, 'feature_importances_'):
        importance = model.feature_importances_
    elif hasattr(model, 'coef_'):
        coefficients = np.asarray(model.coef_)
        if coefficients.ndim > 1 and coefficients.shape[0] > 1:
            # Flattening an (n_outputs, n_features) matrix would produce more
            # values than there are feature names; aggregate per feature.
            importance = np.abs(coefficients).mean(axis=0)
        else:
            importance = coefficients.flatten()
    else:
        raise ValueError("Model does not have attribute for feature importance.")

    ranking = pd.DataFrame({'Feature': feature_names, 'Importance': importance})
    return ranking.sort_values(by='Importance', ascending=False)


# Model serving

In [None]:
from joblib import dump, load

def save_model(model, filepath):
    """
    Persist a trained model to disk with joblib and report where it went.

    Parameters:
    model: The trained machine learning model object.
    filepath (str): The file path where the model will be saved.
    """
    dump(model, filepath)
    confirmation = f"Model saved successfully at {filepath}"
    print(confirmation)

def load_model(filepath):
    """
    Read a previously saved model back from disk with joblib.

    Parameters:
    filepath (str): The file path from which to load the model.

    Returns:
    model: The loaded machine learning model object.
    """
    restored = load(filepath)
    confirmation = f"Model loaded successfully from {filepath}"
    print(confirmation)
    return restored


In [6]:
# Generic CRISP-DM Utility Functions

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Optional, Union, Dict, List
from scipy.stats import zscore

# Preprocessing
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import (
    StandardScaler, OneHotEncoder, MinMaxScaler, 
    LabelEncoder, FunctionTransformer
)
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.pipeline import Pipeline

# Modeling
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (
    classification_report, confusion_matrix, 
    roc_curve, auc, precision_recall_curve, 
    PrecisionRecallDisplay, RocCurveDisplay
)
from joblib import dump, load

ModuleNotFoundError: No module named 'imblearn'

In [7]:
# utilities.ipynb
# ======================== DATA CATALOG ========================
def generate_data_catalog(
    data: pd.DataFrame, 
    discard_reasons: Optional[Dict[str, str]] = None
) -> pd.DataFrame:
    """
    Generates a generic data catalog for any dataset.

    One row is produced per column of `data`. The documented catalog columns
    are always present, even when `data` has no columns at all.

    Parameters:
    data: Input DataFrame
    discard_reasons: Optional dict of {column: reason} for variables to discard

    Returns:
    Catalog DataFrame with columns: 
    ['Variable', 'Type', 'Missing%', 'Unique', 'Discard', 'Reason']
    """
    catalog_columns = ["Variable", "Type", "Missing%", "Unique", "Discard", "Reason"]
    # Treat "no reasons given" as an empty mapping so each lookup is uniform.
    reasons = discard_reasons or {}

    catalog = [
        {
            "Variable": col,
            "Type": data[col].dtype,
            "Missing%": round(data[col].isnull().mean() * 100, 2),
            "Unique": data[col].nunique(),
            "Discard": "Yes" if col in reasons else "No",
            "Reason": reasons.get(col, "N/A"),
        }
        for col in data.columns
    ]

    # Passing the column list keeps the schema stable for an empty catalog.
    return pd.DataFrame(catalog, columns=catalog_columns)

# ==================== GENERIC PREPROCESSING ====================
def build_preprocessing_pipeline(
    numerical_strategy: str = 'median',
    categorical_strategy: str = 'most_frequent',
    scaler: Union[StandardScaler, MinMaxScaler, None] = StandardScaler()
) -> ColumnTransformer:
    """
    Creates generic preprocessing pipeline for any dataset.

    Automatically detects numerical/categorical columns: numeric dtypes are
    imputed and scaled, object and category dtypes are imputed and one-hot
    encoded, and all remaining columns are passed through unchanged.

    Parameters:
    numerical_strategy: SimpleImputer strategy for numerical columns.
    categorical_strategy: SimpleImputer strategy for categorical columns.
    scaler: Scaler applied to numerical columns after imputation, or None to
        skip scaling. An unfitted copy of the given scaler is used, so the
        object passed in (including the shared default) is never modified.

    Returns:
    ColumnTransformer: The unfitted preprocessing transformer.
    """
    from sklearn.base import clone

    if not scaler:
        scaling_step = 'passthrough'
    elif hasattr(scaler, 'get_params'):
        # The default scaler is created once at definition time and shared by
        # every call; cloning keeps pipelines independent of each other.
        scaling_step = clone(scaler)
    else:
        scaling_step = scaler

    numerical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy=numerical_strategy)),
        ('scaler', scaling_step)
    ])
    
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy=categorical_strategy)),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])
    
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_transformer, make_column_selector(dtype_include=np.number)),
            # 'category' is included so pandas categoricals are encoded too,
            # matching the categorical detection used elsewhere in this file.
            ('cat', categorical_transformer, make_column_selector(dtype_include=[object, 'category']))
        ],
        remainder='passthrough'
    )
    
    return preprocessor

# ================== GENERIC VISUALIZATIONS =====================
def plot_roc_curve(
    y_true: pd.Series, 
    y_prob: np.ndarray,
    ax: Optional[plt.Axes] = None,
    title: str = 'ROC Curve'
) -> None:
    """
    Draw a ROC curve with its AUC and a dashed diagonal reference line.

    Parameters:
    y_true: True labels.
    y_prob: Scores or probabilities passed to `roc_curve`.
    ax: Axes to draw on; the current axes are used when omitted.
    title: Title set on the axes.
    """
    false_positive_rate, true_positive_rate, _ = roc_curve(y_true, y_prob)
    area_under_curve = auc(false_positive_rate, true_positive_rate)

    target_axes = ax or plt.gca()
    display = RocCurveDisplay(
        fpr=false_positive_rate,
        tpr=true_positive_rate,
        roc_auc=area_under_curve,
    )
    display.plot(ax=target_axes)
    target_axes.set_title(title)
    # Dashed diagonal from (0, 0) to (1, 1) for visual reference.
    target_axes.plot([0, 1], [0, 1], 'k--')

def plot_feature_importance(
    model: object,
    feature_names: List[str],
    top_n: int = 10,
    ax: Optional[plt.Axes] = None,
    title: str = 'Feature Importance'
) -> None:
    """
    Generic feature importance visualization.

    Draws a horizontal bar chart of the `top_n` most important features.
    Importance is read from `feature_importances_` when available, otherwise
    it is the mean absolute value of `coef_` per feature.

    Parameters:
    model: Fitted model exposing `feature_importances_` or `coef_`.
    feature_names: Feature names, one per importance value.
    top_n: Number of highest-ranked features to display.
    ax: Axes to draw on; the current axes are used when omitted.
    title: Title set on the axes.

    Raises:
    ValueError: If the model exposes neither attribute.
    """
    if hasattr(model, 'feature_importances_'):
        importance = model.feature_importances_
    elif hasattr(model, 'coef_'):
        # Promote a 1-D coefficient vector to a single row first; averaging a
        # 1-D array over axis 0 would collapse it to one scalar that is then
        # broadcast to every feature.
        coefficients = np.atleast_2d(model.coef_)
        importance = np.abs(coefficients).mean(axis=0)  # Handle multi-class
    else:
        raise ValueError("Model doesn't support feature importance")
    
    importance_df = pd.DataFrame({
        'Feature': feature_names,
        'Importance': importance
    }).sort_values('Importance', ascending=False).head(top_n)
    
    ax = ax or plt.gca()
    sns.barplot(x='Importance', y='Feature', data=importance_df, ax=ax, palette='viridis')
    ax.set_title(title)

# ==================== GENERIC MLOPS UTILS ======================
def save_artifacts(
    model: object,
    preprocessor: Optional[ColumnTransformer] = None,
    model_path: str = 'model.joblib',
    preprocessor_path: str = 'preprocessor.joblib'
) -> None:
    """
    Persist a model, and optionally its preprocessor, with joblib.

    Parameters:
    model: The model object to save.
    preprocessor: Optional preprocessor saved alongside the model.
    model_path: Destination file for the model.
    preprocessor_path: Destination file for the preprocessor.
    """
    dump(model, model_path)
    summary = f"Saved artifacts: {model_path}"

    if preprocessor:
        dump(preprocessor, preprocessor_path)
        summary += f", {preprocessor_path}"

    print(summary)

# ================== GENERIC DATA QUALITY =======================
def detect_data_issues(
    data: pd.DataFrame,
    z_threshold: float = 3,
    correlation_threshold: float = 0.9
) -> Dict[str, Union[pd.DataFrame, float]]:
    """
    Generic data quality check:
    - Missing values
    - Duplicates
    - Outliers (Z-score)
    - High correlations

    Parameters:
    data: Input DataFrame.
    z_threshold: Absolute Z-score above which a value counts as an outlier.
    correlation_threshold: Absolute correlation above which a pair of
        numerical columns is reported.

    Returns:
    Dictionary with the keys:
    'missing_values' (per-column count of missing values),
    'duplicate_rows' (number of fully duplicated rows),
    'outliers' (per numerical column count of outliers),
    'high_correlations' (absolute correlation of each reported column pair,
    sorted from highest to lowest).
    """
    report = {}
    
    # Missing values
    report['missing_values'] = data.isna().sum()
    
    # Duplicates
    report['duplicate_rows'] = data.duplicated().sum()
    
    # Outliers
    numerical_cols = data.select_dtypes(include=np.number).columns
    numeric_data = data[numerical_cols]
    # nan_policy='omit' scores the observed values and leaves NaN where data
    # is missing; the default policy would turn the whole column into NaN and
    # silently report zero outliers for any column with a missing value.
    z_scores = pd.DataFrame(
        {
            col: zscore(numeric_data[col].astype(float).to_numpy(), nan_policy='omit')
            for col in numerical_cols
        },
        index=data.index,
        columns=numerical_cols,
    )
    report['outliers'] = (np.abs(z_scores) > z_threshold).sum()
    
    # Correlations
    corr_matrix = numeric_data.corr().abs()
    # Keep only the strict upper triangle so each pair appears once and the
    # diagonal (self-correlation) is excluded.
    upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
    pair_correlations = upper.stack().sort_values(ascending=False)
    report['high_correlations'] = pair_correlations[
        pair_correlations > correlation_threshold
    ]
    
    return report

# ==================== EXAMPLE USAGE ====================
#  if __name__ == "__main__":
#     # Sample generic usage
#     data = pd.DataFrame({
#         'age': [25, 30, None, 40],
#         'gender': ['M', 'F', 'M', None],
#         'readmitted': [1, 0, 1, 0]
#     })
    
    # Generate catalog
#     catalog = generate_data_catalog(data, {'gender': 'High missingness'})
#     print("Data Catalog:")
#     print(catalog)
    
    # Detect issues
#     print("\nData Issues Report:")
#     print(detect_data_issues(data))