diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29..0b3db93 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,4 @@ +- bump: patch + changes: + added: + - RandomForestClassifier and LogisticRegression models for categorical variable imputation. diff --git a/microimpute/models/imputer.py b/microimpute/models/imputer.py index 2a12bc3..1a6f88c 100644 --- a/microimpute/models/imputer.py +++ b/microimpute/models/imputer.py @@ -19,6 +19,20 @@ from microimpute.config import RANDOM_STATE, VALIDATE_CONFIG +class _ConstantValueModel: + """Simple model that always returns a constant value.""" + + def __init__(self, constant_value, variable_name: str): + self.constant_value = constant_value + self.variable_name = variable_name + + def predict(self, X: pd.DataFrame, **kwargs) -> pd.Series: + """Return the constant value for all rows.""" + return pd.Series( + self.constant_value, index=X.index, name=self.variable_name + ) + + class VariableTypeDetector: """Utility class for detecting and categorizing variable types.""" @@ -91,418 +105,141 @@ def categorize_variable( class DummyVariableProcessor: - """Handles conversion between original variables and dummy variables.""" + """Handles conversion of categorical predictors to dummy variables.""" def __init__(self, logger: logging.Logger): self.logger = logger - self.dummy_info = { - "original_dtypes": {}, - "column_mapping": {}, - "original_categories": {}, - } + self.dummy_mapping = {} # Maps original column to dummy columns - def preprocess_variables( + def preprocess_predictors( self, data: pd.DataFrame, predictors: List[str], imputed_variables: List[str], - ) -> Tuple[pd.DataFrame, List[str], List[str], Dict]: + ) -> Tuple[pd.DataFrame, List[str]]: """ - Process all variables, converting categoricals to dummies as needed. + Process only predictor variables, converting categoricals to dummies. + Imputation targets remain in original form. 
Returns: - Tuple of (processed_data, updated_predictors, updated_imputed_variables, imputed_vars_dummy_info) + Tuple of (processed_data, updated_predictors) """ - data = data[predictors + imputed_variables].copy() + # Start with a copy containing all needed columns + all_columns = list(set(predictors + imputed_variables)) + data = data[all_columns].copy() detector = VariableTypeDetector() - # Categorize all columns - column_categories = {} - for col in data.columns: + # Identify categorical predictors only (not targets) + categorical_predictors = [] + for col in predictors: # Only check predictors + if col not in data.columns: + continue var_type, categories = detector.categorize_variable( data[col], col, self.logger ) - column_categories[col] = (var_type, categories, data[col].dtype) - - # Process variables according to their types - bool_columns = [ - col - for col, (vtype, _, _) in column_categories.items() - if vtype == "bool" - ] - if bool_columns: - self._process_boolean_columns( - data, bool_columns, column_categories - ) - - categorical_columns = [ - col - for col, (vtype, _, _) in column_categories.items() - if vtype in ["categorical", "numeric_categorical"] - ] - - if categorical_columns: - data, predictors, imputed_variables = ( - self._process_categorical_columns( - data, - categorical_columns, - column_categories, - predictors, - imputed_variables, + if var_type in ["categorical", "numeric_categorical"]: + categorical_predictors.append(col) + self.logger.info( + f"Will create dummy variables for predictor '{col}' ({var_type})" ) - ) - - imputed_vars_dummy_info = self._filter_imputed_vars_info( - imputed_variables - ) - return data, predictors, imputed_variables, imputed_vars_dummy_info + # Process categorical predictors + updated_predictors = predictors.copy() - def _process_boolean_columns( - self, - data: pd.DataFrame, - bool_columns: List[str], - column_categories: Dict, - ) -> None: - """Process boolean columns by converting to float.""" - 
self.logger.info( - f"Converting {len(bool_columns)} boolean columns: {bool_columns}" - ) - - for col in bool_columns: - _, _, original_dtype = column_categories[col] - self.dummy_info["original_dtypes"][col] = ("bool", original_dtype) - self.dummy_info["column_mapping"][col] = [col] - data[col] = data[col].astype("float64") - - def _process_categorical_columns( - self, - data: pd.DataFrame, - categorical_columns: List[str], - column_categories: Dict, - predictors: List[str], - imputed_variables: List[str], - ) -> Tuple[pd.DataFrame, List[str], List[str]]: - """Process categorical columns by creating dummy variables.""" - for col in categorical_columns: - var_type, categories, original_dtype = column_categories[col] - self.dummy_info["original_dtypes"][col] = ( - ( - "numeric categorical" - if var_type == "numeric_categorical" - else "categorical" - ), - original_dtype, + if categorical_predictors: + # Create dummy variables for categorical predictors only + dummy_df = pd.get_dummies( + data[categorical_predictors], + columns=categorical_predictors, + dtype="float64", + drop_first=True, # Standard practice for predictors ) - if categories: - self.dummy_info["original_categories"][col] = categories - - if var_type == "numeric_categorical": - data[col] = data[col].astype("float64").astype("category") - - # Create dummy variables - dummy_data = pd.get_dummies( - data[categorical_columns], - columns=categorical_columns, - dtype="float64", - drop_first=True, - ) - - self.logger.debug( - f"Created {dummy_data.shape[1]} dummy variables from {len(categorical_columns)} categorical columns" - ) - # Create column mappings - for orig_col in categorical_columns: - related_dummies = [ - col - for col in dummy_data.columns - if col.startswith(f"{orig_col}_") - ] - - if not related_dummies: - self._handle_single_category_variable( - data, orig_col, column_categories[orig_col] + # Track mapping for each original column + for orig_col in categorical_predictors: + dummy_cols = [ + 
col + for col in dummy_df.columns + if col.startswith(f"{orig_col}_") + ] + self.dummy_mapping[orig_col] = dummy_cols + + # Update predictor list + updated_predictors.remove(orig_col) + updated_predictors.extend(dummy_cols) + + self.logger.debug( + f"Created {len(dummy_cols)} dummy variables for '{orig_col}'" ) - self.dummy_info["column_mapping"][orig_col] = [orig_col] - else: - self.dummy_info["column_mapping"][orig_col] = related_dummies - - # Combine data - numeric_data = data.drop( - columns=[col for col in categorical_columns if col in data.columns] - ) - data = pd.concat([numeric_data, dummy_data], axis=1) - - # Update predictor and imputed_variables lists with dummy columns' names - predictors, imputed_variables = self._update_variable_lists( - predictors, imputed_variables, data.columns - ) - for col in data.columns: - data[col] = data[col].astype("float64") + # Drop original categorical columns and add dummies + data = data.drop(columns=categorical_predictors) + data = pd.concat([data, dummy_df], axis=1) - return data, predictors, imputed_variables - - def _handle_single_category_variable( - self, data: pd.DataFrame, col: str, col_info: Tuple[str, List, Any] - ) -> None: - """Handle variables with only a single category.""" - var_type, categories, _ = col_info - - if var_type == "numeric_categorical": - self.logger.info( - f"Keeping numeric categorical '{col}' as numeric column" - ) - if categories: - data[col] = categories[0] - else: - self.logger.info( - f"Converting single-value categorical '{col}' to numeric encoding (1.0)" - ) - data[col] = 1.0 - - def _update_variable_lists( - self, - predictors: List[str], - imputed_variables: List[str], - data_columns: pd.Index, - ) -> Tuple[List[str], List[str]]: - """Update predictor and imputed variable lists with dummy columns.""" - new_predictors = predictors.copy() - new_imputed_variables = imputed_variables.copy() - - for col, dummy_cols in self.dummy_info["column_mapping"].items(): - if len(dummy_cols) > 0 
and all( - dc in data_columns for dc in dummy_cols - ): - if col in new_predictors: - new_predictors.remove(col) - new_predictors.extend(dummy_cols) - elif col in new_imputed_variables: - new_imputed_variables.remove(col) - new_imputed_variables.extend(dummy_cols) - - return new_predictors, new_imputed_variables - - def _filter_imputed_vars_info(self, imputed_variables: List[str]) -> Dict: - """Create dummy info specific to imputed variables.""" - imputed_vars_dummy_info = { - "original_dtypes": {}, - "column_mapping": {}, - "original_categories": {}, - } - - for col in self.dummy_info["column_mapping"]: - dummy_cols = self.dummy_info["column_mapping"][col] - if any(dc in imputed_variables for dc in dummy_cols): - imputed_vars_dummy_info["column_mapping"][col] = dummy_cols - imputed_vars_dummy_info["original_dtypes"][col] = ( - self.dummy_info["original_dtypes"][col] + # Convert boolean predictors to float (but keep as single column) + for col in predictors: + if col in data.columns: + var_type, _ = detector.categorize_variable( + data[col], col, self.logger ) - if col in self.dummy_info["original_categories"]: - imputed_vars_dummy_info["original_categories"][col] = ( - self.dummy_info["original_categories"][col] + if var_type == "bool": + data[col] = data[col].astype("float64") + self.logger.debug( + f"Converted boolean predictor '{col}' to float64" ) - return imputed_vars_dummy_info + return data, updated_predictors - def reverse_dummy_encoding( + def apply_dummy_encoding_to_test( self, - imputations: Union[Dict[float, pd.DataFrame], pd.DataFrame], - dummy_info: Dict[str, Any], - ) -> Union[Dict[float, pd.DataFrame], pd.DataFrame]: - """Convert dummy variables back to original categorical format.""" - if isinstance(imputations, dict): - processed_imputations = {} - for quantile, df in imputations.items(): - processed_imputations[quantile] = ( - self._process_single_dataframe(df.copy(), dummy_info) - ) - else: - processed_imputations = 
self._process_single_dataframe( - imputations.copy(), dummy_info - ) - - return processed_imputations - - def _process_single_dataframe( - self, df: pd.DataFrame, dummy_info: Dict[str, Any] - ) -> pd.DataFrame: - """Process a single quantile DataFrame.""" - for orig_col, dummy_cols in dummy_info.get( - "column_mapping", {} - ).items(): - if orig_col not in dummy_info.get("original_dtypes", {}): - continue - - dtype_info = dummy_info["original_dtypes"][orig_col] - if not isinstance(dtype_info, tuple) or len(dtype_info) != 2: - self.logger.warning( - f"Unexpected dtype format for {orig_col}: {dtype_info}" + data: pd.DataFrame, + predictors: List[str], + ) -> Tuple[pd.DataFrame, List[str]]: + """Apply same dummy encoding to test data based on training mapping.""" + detector = VariableTypeDetector() + data = data.copy() + updated_predictors = predictors.copy() + + # Apply dummy encoding based on stored mapping + for orig_col, dummy_cols in self.dummy_mapping.items(): + if orig_col in predictors and orig_col in data.columns: + # Create dummies for this column + dummy_df = pd.get_dummies( + data[[orig_col]], + columns=[orig_col], + dtype="float64", + drop_first=False, # Don't drop first, we'll handle missing manually ) - continue - dtype_category, original_pandas_dtype = dtype_info + # Ensure we have the exact dummy columns from training + for dummy_col in dummy_cols: + if dummy_col not in dummy_df.columns: + dummy_df[dummy_col] = 0.0 # Missing category gets 0 - if dtype_category == "bool" and orig_col in df.columns: - df[orig_col] = self._reverse_boolean( - df[orig_col], original_pandas_dtype - ) - elif dtype_category in ["categorical", "numeric_categorical"]: - df = self._reverse_categorical( - df, - orig_col, - dummy_cols, - dummy_info, - dtype_category, - original_pandas_dtype, - ) - - return df + # Keep only the dummy columns from training + dummy_df = dummy_df[dummy_cols] - def _reverse_boolean( - self, series: pd.Series, original_dtype: Any - ) -> pd.Series: - 
"""Convert float back to boolean.""" - threshold = 0.5 - bool_series = series > threshold - return bool_series.astype(original_dtype) + # Update data + data = data.drop(columns=[orig_col]) + data = pd.concat([data, dummy_df], axis=1) - def _reverse_categorical( - self, - df: pd.DataFrame, - orig_col: str, - dummy_cols: List[str], - dummy_info: Dict, - dtype_category: str, - original_dtype: Any, - ) -> pd.DataFrame: - """Convert dummy variables back to categorical.""" - available_dummies = [col for col in dummy_cols if col in df.columns] - - if not available_dummies: - return self._handle_single_category_reverse( - df, orig_col, dummy_cols, dummy_info, original_dtype - ) + # Update predictor list + updated_predictors.remove(orig_col) + updated_predictors.extend(dummy_cols) - categories = dummy_info["original_categories"][orig_col] - reference_category = self._find_reference_category( - orig_col, available_dummies, categories - ) - - # Convert dummies back to categorical - df[orig_col] = self._dummies_to_categorical( - df[available_dummies], orig_col, categories, reference_category - ) - - # Convert to original dtype if needed - if original_dtype != "object": - try: - df[orig_col] = df[orig_col].astype(original_dtype) - except (ValueError, TypeError) as e: - self.logger.warning( - f"Could not convert {orig_col} to {original_dtype}: {e}" + # Convert boolean predictors to float + for col in predictors: + if col in data.columns: + var_type, _ = detector.categorize_variable( + data[col], col, self.logger ) + if var_type == "bool": + data[col] = data[col].astype("float64") - # Drop dummy columns - df = df.drop(columns=available_dummies) - - return df - - def _handle_single_category_reverse( - self, - df: pd.DataFrame, - orig_col: str, - dummy_cols: List[str], - dummy_info: Dict, - original_dtype: Any, - ) -> pd.DataFrame: - """Handle reversal for single-category variables.""" - if ( - orig_col in df.columns - and len(dummy_cols) == 1 - and dummy_cols[0] == orig_col - ): - 
categories = dummy_info["original_categories"][orig_col] - df[orig_col] = categories[0] - - if original_dtype != "object": - try: - df[orig_col] = df[orig_col].astype(original_dtype) - except (ValueError, TypeError) as e: - self.logger.warning( - f"Could not convert {orig_col} to original dtype: {e}" - ) - - return df - - def _find_reference_category( - self, - orig_col: str, - available_dummies: List[str], - original_categories: List, - ) -> Any: - """Find the reference category that was dropped during dummy encoding.""" - dummy_categories = [] - for dummy_col in available_dummies: - category_part = dummy_col.replace(f"{orig_col}_", "", 1) - try: - if category_part.replace(".", "").replace("-", "").isdigit(): - dummy_categories.append(float(category_part)) - else: - dummy_categories.append(category_part) - except: - dummy_categories.append(category_part) - - for cat in original_categories: - if cat not in dummy_categories: - return cat - - return original_categories[0] if original_categories else None - - def _dummies_to_categorical( - self, - dummy_df: pd.DataFrame, - orig_col: str, - categories: List, - reference_category: Any, - ) -> pd.Series: - """Convert dummy columns to categorical values.""" - category_mapping = { - f"{orig_col}_{cat}": cat - for cat in categories - if f"{orig_col}_{cat}" in dummy_df.columns - } - - # Find max dummy value per row - max_idx = dummy_df.idxmax(axis=1) - max_values = dummy_df.max(axis=1) - - # Initialize with reference category - result = pd.Series(reference_category, index=dummy_df.index) - - # Assign to dummy categories where confidence > threshold - threshold = 0.5 - high_confidence_mask = max_values >= threshold - if high_confidence_mask.any(): - result.loc[high_confidence_mask] = max_idx[ - high_confidence_mask - ].map(category_mapping) + return data, updated_predictors - nan_mask = result.isna() - if nan_mask.any(): - result.loc[nan_mask] = reference_category - self.logger.warning( - f"Some values could not be mapped for 
{orig_col}, using reference category" - ) - - self.logger.info( - f"Assigned {high_confidence_mask.sum()} observations to dummy categories, " - f"{(~high_confidence_mask).sum()} to reference category '{reference_category}'" - ) - - return result + # Note: Old reverse encoding methods removed as we now handle categorical + # targets directly through classification models class Imputer(ABC): @@ -523,6 +260,16 @@ def __init__( self.imputed_variables: Optional[List[str]] = None self.imputed_vars_dummy_info: Optional[Dict[str, Any]] = None self.original_predictors: Optional[List[str]] = None + self.categorical_targets: Dict[str, Dict] = ( + {} + ) # {var_name: {"type": "categorical", "categories": [...]}} + self.boolean_targets: Dict[str, Dict] = ( + {} + ) # {var_name: {"type": "boolean", "dtype": ...}} + self.numeric_targets: List[str] = [] # [var_name, ...] + self.constant_targets: Dict[str, Dict] = ( + {} + ) # {var_name: {"value": constant, "dtype": ...}} self.seed = seed self.logger = logging.getLogger(__name__) @@ -561,6 +308,60 @@ def _validate_data(self, data: pd.DataFrame, columns: List[str]) -> None: f"Data contains {missing_count} missing values" ) + def identify_target_types( + self, data: pd.DataFrame, imputed_variables: List[str] + ) -> None: + """Identify and track variable types for imputation targets. + + Args: + data: DataFrame containing the data. + imputed_variables: List of variables to be imputed. + """ + detector = VariableTypeDetector() + + for var in imputed_variables: + if var not in data.columns: + continue + + # First check if the variable has a constant value + unique_values = data[var].dropna().unique() + if len(unique_values) == 1: + constant_val = unique_values[0] + self.constant_targets[var] = { + "value": constant_val, + "dtype": data[var].dtype, + } + self.logger.warning( + f"Target variable '{var}' has constant value {constant_val}. " + f"All imputations will use this constant value." 
+ ) + continue + + var_type, categories = detector.categorize_variable( + data[var], var, self.logger + ) + + if var_type == "bool": + self.boolean_targets[var] = { + "type": "boolean", + "dtype": data[var].dtype, + } + self.logger.info(f"Identified boolean target: {var}") + + elif var_type in ["categorical", "numeric_categorical"]: + self.categorical_targets[var] = { + "type": var_type, + "categories": categories, + "dtype": data[var].dtype, + } + self.logger.info( + f"Identified categorical target: {var} with {len(categories) if categories else 0} categories" + ) + + else: + self.numeric_targets.append(var) + self.logger.debug(f"Identified numeric target: {var}") + @validate_call(config=VALIDATE_CONFIG) def preprocess_data_types( self, @@ -568,29 +369,36 @@ def preprocess_data_types( predictors: List[str], imputed_variables: List[str], ) -> Tuple[pd.DataFrame, List[str], List[str], Dict[str, Any]]: - """Ensure all predictor columns are numeric. Transform boolean and categorical variables if necessary. + """Preprocess predictors only - convert categorical predictors to dummies. + Imputation targets remain in original form for classification. Args: data: DataFrame containing the data. - predictors: List of column names to ensure are numeric. - imputed_variables: List of column names to ensure are numeric. + predictors: List of predictor column names. + imputed_variables: List of variables to impute (kept in original form). Returns: - Tuple of (data, predictors, imputed_variables, dummy_info) + Tuple of (processed_data, updated_predictors, imputed_variables, empty_dict) Raises: - ValueError: If any column cannot be converted to numeric. + ValueError: If any column cannot be processed. 
""" try: processor = DummyVariableProcessor(self.logger) - return processor.preprocess_variables( - data, predictors, imputed_variables + processed_data, updated_predictors = ( + processor.preprocess_predictors( + data, predictors, imputed_variables + ) ) + # Store the processor for later use in test data + self.dummy_processor = processor + + # Return empty dict as we no longer need dummy info for targets + return processed_data, updated_predictors, imputed_variables, {} + except Exception as e: - self.logger.error( - f"Error during donor data preprocessing: {str(e)}" - ) + self.logger.error(f"Error during data preprocessing: {str(e)}") raise RuntimeError("Failed to preprocess data types") from e @validate_call(config=VALIDATE_CONFIG) @@ -657,6 +465,9 @@ def fit( if weights is not None and (weights <= 0).any(): raise ValueError("Weights must be positive") + # Identify target types BEFORE preprocessing + self.identify_target_types(X_train, imputed_variables) + X_train, predictors, imputed_variables, imputed_vars_dummy_info = ( self.preprocess_data_types(X_train, predictors, imputed_variables) ) @@ -682,6 +493,10 @@ def fit( self.predictors, self.imputed_variables, self.original_predictors, + categorical_targets=self.categorical_targets, + boolean_targets=self.boolean_targets, + numeric_targets=self.numeric_targets, + constant_targets=self.constant_targets, **kwargs, ) return fitted_model @@ -815,70 +630,46 @@ def preprocess_data_types( self, data: pd.DataFrame, predictors: List[str], - ) -> pd.DataFrame: - """Ensure all predictor columns are numeric. Transform booleand and categorical variables if necessary. + dummy_processor: Optional[DummyVariableProcessor] = None, + ) -> Tuple[pd.DataFrame, List[str]]: + """Apply dummy encoding to test data predictors based on training mapping. Args: - data: DataFrame containing the data. - predictors: List of column names to ensure are numeric. + data: DataFrame containing the test data. 
+ predictors: List of original predictor column names. + dummy_processor: Processor with training mappings (if available). Returns: - data: DataFrame with specified variables converted to numeric types. + Tuple of (processed_data, updated_predictors) Raises: ValueError: If any column cannot be converted to numeric. """ try: - processor = DummyVariableProcessor(self.logger) - processed_data, _, _, _ = processor.preprocess_variables( - data, predictors, [] - ) - return processed_data + if dummy_processor and hasattr(dummy_processor, "dummy_mapping"): + # Use existing processor with training mappings + return dummy_processor.apply_dummy_encoding_to_test( + data, predictors + ) + else: + # Fallback: create new processor (shouldn't happen normally) + processor = DummyVariableProcessor(self.logger) + # This will only encode predictors in test data + return processor.preprocess_predictors(data, predictors, []) except Exception as e: self.logger.error( - f"Error during receiver data preprocessing: {str(e)}" + f"Error during test data preprocessing: {str(e)}" ) raise RuntimeError("Failed to preprocess data types") from e - @validate_call(config=VALIDATE_CONFIG) - def postprocess_imputations( - self, - imputations: Union[Dict[float, pd.DataFrame], pd.DataFrame], - dummy_info: Dict[str, Any], - ) -> Union[Dict[float, pd.DataFrame], pd.DataFrame]: - """Convert imputed bool and categorical dummy variables back to original data types. - - This function reverses the encoding applied by preprocess_data, - converting dummy variables back to their original boolean or categorical forms. - For numeric categorical variables, values are rounded to the nearest valid category. 
- - Args: - imputations: Dictionary mapping quantiles to DataFrames of imputed values - dummy_info: Dictionary containing information about dummy variable mappings - and original data types - - Returns: - Dictionary mapping quantiles to DataFrames with original data types restored or a single DataFrame if only one quantile is provided. - - Raises: - RuntimeError: If conversion back to original types fails - """ - try: - processor = DummyVariableProcessor(self.logger) - return processor.reverse_dummy_encoding(imputations, dummy_info) - except Exception as e: - self.logger.error( - f"Error when postprocessing imputations: {str(e)}" - ) - raise RuntimeError( - f"Failed to post-process imputations: {str(e)}" - ) from e + # Note: postprocess_imputations removed - categorical targets now handled directly by classification @validate_call(config=VALIDATE_CONFIG) def predict( self, X_test: pd.DataFrame, quantiles: Optional[List[float]] = None, + return_probs: bool = False, **kwargs: Any, ) -> Dict[float, pd.DataFrame]: """Predict imputed values at specified quantiles. @@ -888,10 +679,12 @@ def predict( Args: X_test: DataFrame containing the test data. quantiles: List of quantiles to predict. If None, uses random quantile. + return_probs: If True, also return probability distributions for categorical/boolean variables. **kwargs: Additional model-specific parameters. Returns: Dictionary mapping quantiles to imputed values. + If return_probs=True, includes 'probabilities' key with probability distributions. Raises: ValueError: If input data is invalid. 
@@ -904,28 +697,29 @@ def predict( f"Invalid quantiles: {str(quantile_error)}" ) from quantile_error - X_test = self.preprocess_data_types(X_test, self.original_predictors) + # Get dummy processor from parent imputer if available + dummy_processor = getattr(self, "dummy_processor", None) + X_test, updated_predictors = self.preprocess_data_types( + X_test, self.original_predictors, dummy_processor + ) - for col in self.predictors: - if col not in X_test.columns: - self.logger.info( - f"Predictor '{col}' not found in test data columns. \n" - "Will create a dummy variable with 0.0 values for this column." - ) - X_test[col] = np.zeros(len(X_test), dtype="float64") + # Note: Missing dummy categories are already handled in apply_dummy_encoding_to_test + # Missing actual predictors will raise an error during preprocessing # Defer actual imputations to subclass with all parameters - imputations = self._predict(X_test, quantiles, **kwargs) - if self.imputed_vars_dummy_info is not None: - imputations = self.postprocess_imputations( - imputations, self.imputed_vars_dummy_info - ) + imputations = self._predict( + X_test, quantiles, return_probs=return_probs, **kwargs + ) + # No more postprocessing - categorical targets handled directly return imputations @abstractmethod @validate_call(config=VALIDATE_CONFIG) def _predict( - self, X_test: pd.DataFrame, quantiles: Optional[List[float]] = None + self, + X_test: pd.DataFrame, + quantiles: Optional[List[float]] = None, + return_probs: bool = False, ) -> Dict[float, pd.DataFrame]: """Predict imputed values at specified quantiles. 
diff --git a/microimpute/models/matching.py b/microimpute/models/matching.py index 6ec0dc8..7e9c698 100644 --- a/microimpute/models/matching.py +++ b/microimpute/models/matching.py @@ -35,6 +35,10 @@ def __init__( seed: int, imputed_vars_dummy_info: Optional[Dict[str, Any]] = None, original_predictors: Optional[List[str]] = None, + categorical_targets: Optional[Dict[str, Dict]] = None, + boolean_targets: Optional[Dict[str, Dict]] = None, + constant_targets: Optional[Dict[str, Dict]] = None, + dummy_processor: Optional[Any] = None, log_level: Optional[str] = "WARNING", hyperparameters: Optional[Dict[str, Any]] = None, ) -> None: @@ -50,6 +54,9 @@ def __init__( about dummy variables for imputed variables. original_predictors: Optional list of original predictor names before dummy encoding. + categorical_targets: Dictionary of categorical target info. + boolean_targets: Dictionary of boolean target info. + dummy_processor: Processor for handling dummy encoding in test data. hyperparameters: Optional dictionary of hyperparameters for the matching function, specified after tunning. """ @@ -64,19 +71,27 @@ def __init__( self.matching_hotdeck = matching_hotdeck self.donor_data = donor_data self.hyperparameters = hyperparameters + self.categorical_targets = categorical_targets or {} + self.boolean_targets = boolean_targets or {} + self.dummy_processor = dummy_processor @validate_call(config=VALIDATE_CONFIG) def _predict( - self, X_test: pd.DataFrame, quantiles: Optional[List[float]] = None + self, + X_test: pd.DataFrame, + quantiles: Optional[List[float]] = None, + return_probs: bool = False, ) -> Dict[float, pd.DataFrame]: """Predict imputed values using the matching model. Args: X_test: DataFrame containing the recipient data. quantiles: List of quantiles to predict. + return_probs: If True, return one-hot probability vectors for matched categories. Returns: Dictionary mapping quantiles to imputed values. 
+ If return_probs=True, includes 'probabilities' key with one-hot encodings. Raises: ValueError: If model is not properly set up or @@ -128,10 +143,12 @@ def _predict( f"{len(self.donor_data)} donor records). Using chunking approach." ) return self._predict_chunked( - X_test_copy, quantiles, chunk_size + X_test_copy, quantiles, chunk_size, return_probs ) else: - return self._predict_single(X_test_copy, quantiles) + return self._predict_single( + X_test_copy, quantiles, return_probs + ) except ValueError as e: raise e @@ -143,6 +160,7 @@ def _predict_single( self, X_test_copy: pd.DataFrame, quantiles: Optional[List[float]] = None, + return_probs: bool = False, ) -> Dict[float, pd.DataFrame]: """Perform matching on the full dataset without chunking.""" try: @@ -168,13 +186,16 @@ def _predict_single( ) raise RuntimeError("Hot deck matching failed") from matching_error - return self._process_matching_results(fused0, X_test_copy, quantiles) + return self._process_matching_results( + fused0, X_test_copy, quantiles, return_probs + ) def _predict_chunked( self, X_test_copy: pd.DataFrame, quantiles: Optional[List[float]], chunk_size: int, + return_probs: bool = False, ) -> Dict[float, pd.DataFrame]: """Perform matching using chunking for large datasets.""" all_results = [] @@ -231,16 +252,65 @@ def _predict_chunked( combined_results = combined_results.loc[X_test_copy.index] return self._process_matching_results( - combined_results, X_test_copy, quantiles + combined_results, X_test_copy, quantiles, return_probs ) else: raise RuntimeError("No chunk results were produced") + def _generate_one_hot_probabilities( + self, + variable: str, + matched_values: np.ndarray, + index: pd.Index, + categorical_targets: Dict, + boolean_targets: Dict, + ) -> Optional[pd.DataFrame]: + """Generate one-hot probability matrix for categorical/boolean variables. 
+ + Args: + variable: Name of the variable + matched_values: Array of matched category values + index: Index for the output DataFrame + categorical_targets: Dictionary of categorical target info + boolean_targets: Dictionary of boolean target info + + Returns: + DataFrame with one-hot encoded probabilities or None if not categorical + """ + if ( + variable not in categorical_targets + and variable not in boolean_targets + ): + return None + + # Determine categories + if variable in boolean_targets: + categories = [False, True] + else: + categories = categorical_targets[variable].get("categories", []) + + if not categories: + return None + + # Create probability matrix (one-hot encoding) + prob_df = pd.DataFrame( + 0.0, index=index, columns=[f"prob_{cat}" for cat in categories] + ) + + # Set 1.0 for matched category + for idx, val in enumerate(matched_values): + col_name = f"prob_{val}" + if col_name in prob_df.columns: + prob_df.iloc[idx, prob_df.columns.get_loc(col_name)] = 1.0 + + return prob_df + def _process_matching_results( self, fused0: pd.DataFrame, X_test_copy: pd.DataFrame, quantiles: Optional[List[float]], + return_probs: bool = False, ) -> Dict[float, pd.DataFrame]: """Process matching results into the expected output format.""" try: @@ -271,6 +341,11 @@ def _process_matching_results( # Create output dictionary with results imputations: Dict[float, pd.DataFrame] = {} + prob_results = {} if return_probs else None + + # Get target type information if available + categorical_targets = getattr(self, "categorical_targets", {}) + boolean_targets = getattr(self, "boolean_targets", {}) try: if quantiles: @@ -286,7 +361,24 @@ def _process_matching_results( ) imputed_df[variable] = fused0[variable].values + # Generate one-hot probabilities if requested + if return_probs and prob_results is not None: + prob_df = self._generate_one_hot_probabilities( + variable, + fused0[variable].values, + X_test_copy.index, + categorical_targets, + boolean_targets, + ) + if 
prob_df is not None: + prob_results[variable] = prob_df + imputations[q] = imputed_df + + # Add probabilities to results if requested + if return_probs and prob_results: + imputations["probabilities"] = prob_results + return imputations else: # If no quantiles specified, use a default one @@ -298,8 +390,28 @@ def _process_matching_results( for variable in self.imputed_variables: self.logger.info(f"Imputing variable {variable}") imputed_df[variable] = fused0[variable].values + + # Generate one-hot probabilities if requested + if return_probs and prob_results is not None: + prob_df = self._generate_one_hot_probabilities( + variable, + fused0[variable].values, + X_test_copy.index, + categorical_targets, + boolean_targets, + ) + if prob_df is not None: + prob_results[variable] = prob_df + imputations[q_default] = imputed_df + # Add probabilities to results if requested + if return_probs and prob_results: + return { + "imputations": imputations[q_default], + "probabilities": prob_results, + } + return imputations[q_default] except Exception as output_error: self.logger.error( @@ -352,6 +464,10 @@ def _fit( predictors: List[str], imputed_variables: List[str], original_predictors: Optional[List[str]] = None, + categorical_targets: Optional[Dict[str, Dict]] = None, + boolean_targets: Optional[Dict[str, Dict]] = None, + numeric_targets: Optional[List[str]] = None, + constant_targets: Optional[Dict[str, Dict]] = None, tune_hyperparameters: bool = False, **matching_kwargs: Any, ) -> MatchingResults: @@ -415,6 +531,9 @@ def _fit( imputed_variables=imputed_variables, imputed_vars_dummy_info=self.imputed_vars_dummy_info, original_predictors=self.original_predictors, + categorical_targets=categorical_targets, + boolean_targets=boolean_targets, + dummy_processor=getattr(self, "dummy_processor", None), seed=self.seed, log_level=self.log_level, hyperparameters=matching_kwargs, diff --git a/microimpute/models/ols.py b/microimpute/models/ols.py index 5ab7342..d202925 100644 --- 
class _LogisticRegressionModel:
    """Internal class to handle classification for categorical/boolean targets.

    Wraps a scikit-learn ``LogisticRegression`` together with the label
    encoding needed to translate between the original category values and
    the integer codes the classifier is trained on.
    """

    def __init__(self, seed: int, logger):
        self.seed = seed
        self.logger = logger
        self.classifier = None  # fitted LogisticRegression, set by fit()
        self.output_column = None  # name of the target column
        self.var_type = None  # "boolean" or the categorical type string
        self.categories = None  # ordered category values; position == code
        self.label_map = None  # category value -> integer code (non-boolean)

    def fit(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        var_type: str,
        categories: Optional[List] = None,
        **lr_kwargs: Any,
    ) -> None:
        """Fit logistic regression for a categorical/boolean target.

        Note: y should be the ORIGINAL categorical/boolean column,
        not dummy encoded.

        Args:
            X: Predictor columns.
            y: Target column in its original (non-dummy) form.
            var_type: ``"boolean"`` for boolean targets; any other value is
                treated as categorical.
            categories: Ordered category values. When omitted or empty the
                unique values of ``y`` are used.
            **lr_kwargs: Optional overrides; only ``penalty``, ``C``,
                ``max_iter`` and ``solver`` are forwarded to the classifier.
        """
        self.output_column = y.name
        self.var_type = var_type

        if var_type == "boolean":
            # For boolean, convert to 0/1 but keep as single target
            y_encoded = y.astype(int)
            self.categories = [False, True]
        else:
            # Explicit None/len checks so array-like category containers do
            # not hit an ambiguous truth-value error.
            if categories is not None and len(categories) > 0:
                self.categories = list(categories)
            else:
                self.categories = y.unique().tolist()
            self.label_map = {cat: i for i, cat in enumerate(self.categories)}
            # Map through object dtype so a pandas Categorical target yields
            # plain numeric codes rather than another Categorical.
            y_encoded = y.astype(object).map(self.label_map)

            unmapped = int(y_encoded.isna().sum())
            if unmapped:
                self.logger.warning(
                    f"Found {unmapped} unmapped values in {self.output_column}"
                )
                y_encoded = y_encoded.fillna(0)  # Default to first category
            # NaN handling above leaves float codes; keep the labels integral
            # so classes_ are clean integer positions into self.categories.
            y_encoded = y_encoded.astype(int)

        default_solver = "lbfgs" if len(self.categories) <= 2 else "saga"
        # ``multi_class`` is intentionally not passed: it is deprecated in
        # recent scikit-learn releases, and the default already uses a single
        # sigmoid for binary data and a multinomial model otherwise.
        classifier_params = {
            "penalty": lr_kwargs.get("penalty", "l2"),
            "C": lr_kwargs.get("C", 1.0),
            "max_iter": lr_kwargs.get("max_iter", 1000),
            "solver": lr_kwargs.get("solver", default_solver),
            "random_state": self.seed,
        }

        self.classifier = LogisticRegression(**classifier_params)
        self.classifier.fit(X, y_encoded)

    def predict(
        self,
        X: pd.DataFrame,
        return_probs: bool = False,
        quantile: float = 0.5,
    ) -> Union[pd.Series, pd.DataFrame]:
        """Predict classes or probabilities.

        Args:
            X: Input features.
            return_probs: If True, return probability distributions.
            quantile: Accepted for interface compatibility; currently unused
                (the standard class prediction is returned).

        Returns:
            With ``return_probs=True``: a DataFrame indexed like ``X`` with
            one ``prob_<category>`` column per known category, in category
            order; categories the classifier never saw during fitting get
            probability 0.0. Otherwise: a Series of predicted category
            values named after the target column.
        """
        if return_probs:
            probs = self.classifier.predict_proba(X)
            # predict_proba has one column per class seen while fitting
            # (classes_), which can be fewer than the declared categories.
            # Label the columns by the classes actually present, then expand
            # to the full category list.
            observed_columns = [
                f"prob_{self.categories[int(code)]}"
                for code in self.classifier.classes_
            ]
            prob_df = pd.DataFrame(
                probs, columns=observed_columns, index=X.index
            )
            return prob_df.reindex(
                columns=[f"prob_{cat}" for cat in self.categories],
                fill_value=0.0,
            )

        y_pred = self.classifier.predict(X)

        if self.var_type == "boolean":
            predictions = pd.Series(y_pred.astype(bool), index=X.index)
        else:
            # Map integer codes back to the original category values
            reverse_map = {i: cat for cat, i in self.label_map.items()}
            predictions = pd.Series(y_pred, index=X.index).map(reverse_map)

        predictions.name = self.output_column
        return predictions
+ + Args: + model: The model (_LogisticRegressionModel, _OLSModel, or _ConstantValueModel) + variable: Name of the variable to predict + X_test: Test data DataFrame + quantile: Quantile to predict + random_sample: Whether to use random sampling + return_probs: Whether to return probabilities + prob_results: Dictionary to store probability results + + Returns: + Series of predicted values + """ + # Import here to avoid circular import + from microimpute.models.imputer import _ConstantValueModel + + if isinstance(model, _ConstantValueModel): + # Constant model - just return the constant value + return model.predict(X_test) + elif isinstance(model, _LogisticRegressionModel): + # Classification for categorical/boolean targets + if return_probs and prob_results is not None: + # Get probabilities + probs = model.predict( + X_test[self.predictors], return_probs=True + ) + prob_results[variable] = probs + + # Get class predictions + imputed_values = model.predict( + X_test[self.predictors], return_probs=False, quantile=quantile + ) + else: + # Regression for numeric targets + X_test_with_const = sm.add_constant(X_test[self.predictors]) + mean_preds = model.predict(X_test_with_const) + se = np.sqrt(model.scale) + imputed_values = self._predict_quantile( + mean_preds=mean_preds, + se=se, + mean_quantile=quantile, + random_sample=random_sample, + ) + + return imputed_values + def __init__( self, - models: Dict[str, "OLS"], + models: Dict[ + str, Any + ], # Can be _OLSModel, _LogisticRegressionModel, or _ConstantValueModel predictors: List[str], imputed_variables: List[str], seed: int, imputed_vars_dummy_info: Optional[Dict[str, str]] = None, original_predictors: Optional[List[str]] = None, + categorical_targets: Optional[Dict[str, Dict]] = None, + boolean_targets: Optional[Dict[str, Dict]] = None, + constant_targets: Optional[Dict[str, Dict]] = None, + dummy_processor: Optional[Any] = None, log_level: Optional[str] = "WARNING", ) -> None: """Initialize the OLS results. 
Args: - model: Fitted OLS model. + models: Dictionary of fitted models (OLS or LogisticRegression) for each variable. predictors: List of predictor variable names. imputed_variables: List of imputed variable names. seed: Random seed for reproducibility. @@ -38,6 +225,9 @@ def __init__( about dummy variables for imputed variables. original_predictors: Optional list of original predictor variable names before dummy encoding. + categorical_targets: Dictionary of categorical target info. + boolean_targets: Dictionary of boolean target info. + dummy_processor: Processor for handling dummy encoding in test data. """ super().__init__( predictors, @@ -48,6 +238,10 @@ def __init__( log_level, ) self.models = models + self.categorical_targets = categorical_targets or {} + self.boolean_targets = boolean_targets or {} + self.constant_targets = constant_targets or {} + self.dummy_processor = dummy_processor @validate_call(config=VALIDATE_CONFIG) def _predict( @@ -55,6 +249,7 @@ def _predict( X_test: pd.DataFrame, quantiles: Optional[List[float]] = None, random_quantile_sample: Optional[bool] = False, + return_probs: bool = False, ) -> Dict[float, pd.DataFrame]: """Predict values at specified quantiles using the OLS model. @@ -62,9 +257,11 @@ def _predict( X_test: DataFrame containing the test data. quantiles: List of quantiles to predict. random_quantile_sample: If True, use random quantile sampling for prediction. + return_probs: If True, return probability distributions for categorical variables. Returns: Dictionary mapping quantiles to predicted values. + If return_probs=True, includes 'probabilities' key. Raises: RuntimeError: If prediction fails. 
@@ -72,8 +269,7 @@ def _predict( try: # Create output dictionary with results imputations: Dict[float, pd.DataFrame] = {} - - X_test_with_const = sm.add_constant(X_test[self.predictors]) + prob_results = {} if return_probs else None if quantiles: if random_quantile_sample: @@ -87,15 +283,21 @@ def _predict( imputed_df = pd.DataFrame() for variable in self.imputed_variables: model = self.models[variable] - mean_preds = model.predict(X_test_with_const) - se = np.sqrt(model.scale) - imputed_df[variable] = self._predict_quantile( - mean_preds=mean_preds, - se=se, - mean_quantile=q, - random_sample=random_quantile_sample, + imputed_df[variable] = self._predict_variable( + model, + variable, + X_test, + q, + random_quantile_sample, + return_probs, + prob_results, ) imputations[q] = pd.DataFrame(imputed_df) + + # Add probabilities to results if requested + if return_probs and prob_results: + imputations["probabilities"] = prob_results + return imputations else: q_default = 0.5 @@ -103,13 +305,14 @@ def _predict( for variable in self.imputed_variables: self.logger.info(f"Imputing variable {variable}") model = self.models[variable] - mean_preds = model.predict(X_test_with_const) - se = np.sqrt(model.scale) - imputed_df[variable] = self._predict_quantile( - mean_preds=mean_preds, - se=se, - mean_quantile=q_default, - random_sample=random_quantile_sample, + imputed_df[variable] = self._predict_variable( + model, + variable, + X_test, + q_default, + random_quantile_sample, + return_probs, + prob_results, ) imputations[q_default] = pd.DataFrame(imputed_df) return imputations[q_default] @@ -210,6 +413,11 @@ def _fit( predictors: List[str], imputed_variables: List[str], original_predictors: Optional[List[str]] = None, + categorical_targets: Optional[Dict[str, Dict]] = None, + boolean_targets: Optional[Dict[str, Dict]] = None, + numeric_targets: Optional[List[str]] = None, + constant_targets: Optional[Dict[str, Dict]] = None, + **kwargs: Any, ) -> OLSResults: """Fit the OLS model 
to the training data. @@ -230,20 +438,72 @@ def _fit( ) self.models = {} - X_with_const = sm.add_constant(X_train[predictors]) + + # Import the constant model from base imputer + from microimpute.models.imputer import _ConstantValueModel + for variable in imputed_variables: + # Handle constant targets separately + if variable in (constant_targets or {}): + constant_val = constant_targets[variable]["value"] + model = _ConstantValueModel(constant_val, variable) + self.models[variable] = model + self.logger.info( + f"Using constant value {constant_val} for variable {variable}" + ) + continue + Y = X_train[variable] - model = sm.OLS(Y, X_with_const).fit() - self.logger.info( - f"OLS model fitted successfully for the imputed variable {variable}, R-squared: {model.rsquared:.4f}" - ) + + # Choose appropriate model based on variable type + if variable in (categorical_targets or {}): + # Use logistic regression for categorical targets + model = _LogisticRegressionModel( + seed=self.seed, logger=self.logger + ) + model.fit( + X_train[predictors], + Y, + var_type=categorical_targets[variable]["type"], + categories=categorical_targets[variable].get( + "categories" + ), + **kwargs, + ) + self.logger.info( + f"Logistic regression fitted for categorical variable {variable}" + ) + elif variable in (boolean_targets or {}): + # Use logistic regression for boolean targets + model = _LogisticRegressionModel( + seed=self.seed, logger=self.logger + ) + model.fit( + X_train[predictors], Y, var_type="boolean", **kwargs + ) + self.logger.info( + f"Logistic regression fitted for boolean variable {variable}" + ) + else: + # Use OLS for numeric targets + model = _OLSModel(seed=self.seed, logger=self.logger) + model.fit(X_train[predictors], Y, **kwargs) + self.logger.info( + f"OLS regression fitted for numeric variable {variable}" + ) + self.models[variable] = model + return OLSResults( models=self.models, predictors=predictors, imputed_variables=imputed_variables, 
class _RandomForestClassifierModel:
    """Internal class to handle classification for categorical/boolean targets.

    Wraps a scikit-learn ``RandomForestClassifier`` together with the label
    encoding needed to translate between the original category values and
    the integer codes the classifier is trained on.
    """

    def __init__(self, seed: int, logger):
        self.seed = seed
        self.logger = logger
        self.classifier = None  # fitted RandomForestClassifier, set by fit()
        self.output_column = None  # name of the target column
        self.var_type = None  # "boolean" or the categorical type string
        self.categories = None  # ordered category values; position == code
        self.label_map = None  # category value -> integer code (non-boolean)

    def fit(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        var_type: str,
        categories: Optional[List] = None,
        **rf_kwargs: Any,
    ) -> None:
        """Fit classifier for a categorical/boolean target.

        Note: y should be the ORIGINAL categorical/boolean column,
        not dummy encoded.

        Args:
            X: Predictor columns.
            y: Target column in its original (non-dummy) form.
            var_type: ``"boolean"`` for boolean targets; any other value is
                treated as categorical.
            categories: Ordered category values. When omitted or empty the
                unique values of ``y`` are used.
            **rf_kwargs: Optional overrides; only ``n_estimators``,
                ``max_depth``, ``min_samples_split``, ``min_samples_leaf``
                and ``max_features`` are forwarded to the classifier.
        """
        self.output_column = y.name
        self.var_type = var_type

        if var_type == "boolean":
            # For boolean, convert to 0/1 but keep as single target
            y_encoded = y.astype(int)
            self.categories = [False, True]
        else:
            # Explicit None/len checks so array-like category containers do
            # not hit an ambiguous truth-value error.
            if categories is not None and len(categories) > 0:
                self.categories = list(categories)
            else:
                self.categories = y.unique().tolist()
            self.label_map = {cat: i for i, cat in enumerate(self.categories)}
            # Map through object dtype so a pandas Categorical target yields
            # plain numeric codes rather than another Categorical.
            y_encoded = y.astype(object).map(self.label_map)

            unmapped = int(y_encoded.isna().sum())
            if unmapped:
                self.logger.warning(
                    f"Found {unmapped} unmapped values in {self.output_column}"
                )
                y_encoded = y_encoded.fillna(0)  # Default to first category
            # NaN handling above leaves float codes; keep the labels integral
            # so classes_ are clean integer positions into self.categories.
            y_encoded = y_encoded.astype(int)

        # Extract relevant RF parameters from kwargs
        classifier_params = {
            "n_estimators": rf_kwargs.get("n_estimators", 100),
            "max_depth": rf_kwargs.get("max_depth", None),
            "min_samples_split": rf_kwargs.get("min_samples_split", 2),
            "min_samples_leaf": rf_kwargs.get("min_samples_leaf", 1),
            "max_features": rf_kwargs.get("max_features", "sqrt"),
            "random_state": self.seed,
        }

        self.classifier = RandomForestClassifier(**classifier_params)
        self.classifier.fit(X, y_encoded)

    def predict(
        self, X: pd.DataFrame, return_probs: bool = False
    ) -> "pd.Series | pd.DataFrame":
        """Predict classes or probabilities.

        Args:
            X: Input features.
            return_probs: If True, return probability distributions.

        Returns:
            With ``return_probs=True``: a DataFrame indexed like ``X`` with
            one ``prob_<category>`` column per known category, in category
            order; categories the classifier never saw during fitting get
            probability 0.0. Otherwise: a Series of predicted category
            values named after the target column.
        """
        if return_probs:
            probs = self.classifier.predict_proba(X)
            # predict_proba has one column per class seen while fitting
            # (classes_), which can be fewer than the declared categories.
            # Label the columns by the classes actually present, then expand
            # to the full category list.
            observed_columns = [
                f"prob_{self.categories[int(code)]}"
                for code in self.classifier.classes_
            ]
            prob_df = pd.DataFrame(
                probs, columns=observed_columns, index=X.index
            )
            return prob_df.reindex(
                columns=[f"prob_{cat}" for cat in self.categories],
                fill_value=0.0,
            )

        y_pred = self.classifier.predict(X)

        if self.var_type == "boolean":
            predictions = pd.Series(y_pred.astype(bool), index=X.index)
        else:
            # Map integer codes back to the original category values
            reverse_map = {i: cat for cat, i in self.label_map.items()}
            predictions = pd.Series(y_pred, index=X.index).map(reverse_map)

        predictions.name = self.output_column
        return predictions
logic.""" @@ -103,18 +190,24 @@ class QRFResults(ImputerResults): def __init__( self, - models: Dict[str, _QRFModel], + models: Dict[ + str, Any + ], # Can be _QRFModel, _RandomForestClassifierModel, or _ConstantValueModel predictors: List[str], imputed_variables: List[str], seed: int, imputed_vars_dummy_info: Optional[Dict[str, str]] = None, original_predictors: Optional[List[str]] = None, + categorical_targets: Optional[Dict[str, Dict]] = None, + boolean_targets: Optional[Dict[str, Dict]] = None, + constant_targets: Optional[Dict[str, Dict]] = None, + dummy_processor: Optional[Any] = None, log_level: Optional[str] = "WARNING", ) -> None: """Initialize the QRF results. Args: - models: Dictionary of fitted QRF models for each variable. + models: Dictionary of fitted models (QRF or RF classifier) for each variable. predictors: List of column names used as predictors. imputed_variables: List of column names to be imputed. seed: Random seed for reproducibility. @@ -122,6 +215,9 @@ def __init__( about dummy variables for imputed variables. original_predictors: Optional list of original predictor variable names before dummy encoding. + categorical_targets: Dictionary of categorical target info. + boolean_targets: Dictionary of boolean target info. + dummy_processor: Processor for handling dummy encoding in test data. """ super().__init__( predictors, @@ -132,6 +228,10 @@ def __init__( log_level, ) self.models = models + self.categorical_targets = categorical_targets or {} + self.boolean_targets = boolean_targets or {} + self.constant_targets = constant_targets or {} + self.dummy_processor = dummy_processor @validate_call(config=VALIDATE_CONFIG) def _predict( @@ -139,6 +239,7 @@ def _predict( X_test: pd.DataFrame, quantiles: Optional[List[float]] = None, mean_quantile: Optional[float] = 0.5, + return_probs: bool = False, ) -> Dict[float, pd.DataFrame]: """Predict values at specified quantiles using the QRF model. 
@@ -148,9 +249,11 @@ def _predict( center of the beta distribution from which to sample when imputing each data point). mean_quantile: The mean quantile to used for prediction if quantiles are not provided. + return_probs: If True, return probability distributions for categorical variables. Returns: Dictionary mapping quantiles to predicted values. + If return_probs=True, includes 'probabilities' key. Raises: RuntimeError: If prediction fails. @@ -158,6 +261,7 @@ def _predict( try: # Create output dictionary with results imputations: Dict[float, pd.DataFrame] = {} + prob_results = {} if return_probs else None # Convert single mean_quantile to a list if quantiles not provided quantiles_to_process = quantiles if quantiles else [mean_quantile] @@ -175,6 +279,12 @@ def _predict( imputed_df = pd.DataFrame() # Create a copy of X_test that we'll augment with imputed values X_test_augmented = X_test.copy() + self.logger.debug( + f"X_test columns at start of _predict: {X_test_augmented.columns.tolist()}" + ) + + # Track dummy columns created from imputed categorical variables + imputed_dummy_cols = set() for i, variable in enumerate(self.imputed_variables): var_start_time = time.time() @@ -190,31 +300,82 @@ def _predict( var_predictors = _get_sequential_predictors( self.predictors, self.imputed_variables, i ) + self.logger.debug( + f"var_predictors for {variable}: {var_predictors}" + ) + self.logger.debug( + f"Available columns in X_test_augmented: {X_test_augmented.columns.tolist()}" + ) # Ensure we have all needed columns in X_test_augmented missing_cols = set(var_predictors) - set( X_test_augmented.columns ) if missing_cols: - self.logger.warning( - f"Missing columns for {variable}: {missing_cols}. " - "Using available columns only." 
- ) - var_predictors = [ - col - for col in var_predictors - if col in X_test_augmented.columns - ] + # Check if these are dummy columns from previously imputed categorical variables + imputed_missing = missing_cols & imputed_dummy_cols + + if imputed_missing: + self.logger.debug( + f"Adding zero-filled columns for missing categories " + f"from imputed variables: {imputed_missing}" + ) + # Add zeros for dummy columns from imputed categoricals + for col in imputed_missing: + X_test_augmented[col] = 0.0 + + # Any other missing columns will cause an error when we try to select them, + # which is the desired behavior to alert the user of missing predictors + + # Import constant model + from microimpute.models.imputer import _ConstantValueModel # Predict using the appropriate predictor set - imputed_values = model.predict( - X_test_augmented[var_predictors], mean_quantile=q - ) + if isinstance(model, _ConstantValueModel): + # Constant model - just return the constant value + imputed_values = model.predict(X_test_augmented) + elif isinstance(model, _RandomForestClassifierModel): + # Classification for categorical/boolean targets + if return_probs and prob_results is not None: + # Get probabilities + probs = model.predict( + X_test_augmented[var_predictors], + return_probs=True, + ) + prob_results[variable] = probs + + # Get class predictions + imputed_values = model.predict( + X_test_augmented[var_predictors], + return_probs=False, + ) + else: + # Regression for numeric targets + imputed_values = model.predict( + X_test_augmented[var_predictors], mean_quantile=q + ) + imputed_df[variable] = imputed_values # Add the imputed values to X_test_augmented for subsequent variables X_test_augmented[variable] = imputed_values + # If this is a categorical variable, track its dummy columns + # for future sequential imputation steps + if variable in self.categorical_targets: + # Track which dummy columns would be created for this variable + # using drop_first=True convention + 
def _fit_model(
    self,
    model: Any,
    X: pd.DataFrame,
    y: pd.Series,
    variable: str,
    **kwargs,
) -> None:
    """Fit the model with appropriate parameters based on variable type.

    Args:
        model: Model instance to fit. A ``_RandomForestClassifierModel`` is
            fitted with the target's type information; any other model is
            fitted with ``model.fit(X, y, **kwargs)``.
        X: Predictor columns for this variable.
        y: Target column in its original form.
        variable: Name of the target variable, used to look up its type.
        **kwargs: Extra parameters forwarded to ``model.fit``.

    Raises:
        ValueError: If ``model`` is a classifier but ``variable`` is not
            registered as a categorical or boolean target.
    """
    # ``getattr(..., {})`` alone still yields None when the attribute exists
    # but was stored as None, which would make ``variable in ...`` raise.
    categorical_targets = getattr(self, "categorical_targets", None) or {}
    boolean_targets = getattr(self, "boolean_targets", None) or {}

    if not isinstance(model, _RandomForestClassifierModel):
        # Regular QRF fit
        model.fit(X, y, **kwargs)
        return

    if variable in categorical_targets:
        target_info = categorical_targets[variable]
        model.fit(
            X,
            y,
            var_type=target_info["type"],
            categories=target_info.get("categories"),
            **kwargs,
        )
    elif variable in boolean_targets:
        model.fit(X, y, var_type="boolean", **kwargs)
    else:
        # Fail here rather than leaving an unfitted classifier that would
        # only break later, at prediction time, with an opaque error.
        raise ValueError(
            f"Cannot fit classifier for variable '{variable}': it is not "
            "registered as a categorical or boolean target"
        )
model, + X_train[current_predictors], + X_train[variable], + variable, + **qrf_kwargs, ) try: - model.fit( - X_train[current_predictors], - X_train[variable], - **qrf_kwargs, - ) # Log post-imputation information var_time = time.time() - var_start_time @@ -403,10 +637,18 @@ def _fit( ) # Get model complexity metrics if available - if hasattr(model.qrf, "n_estimators"): + if hasattr(model, "qrf") and hasattr( + model.qrf, "n_estimators" + ): self.logger.info( f" Model complexity: {model.qrf.n_estimators} trees" ) + elif hasattr(model, "classifier") and hasattr( + model.classifier, "n_estimators" + ): + self.logger.info( + f" Model complexity: {model.classifier.n_estimators} trees (classifier)" + ) self.models[variable] = model @@ -433,6 +675,12 @@ def _fit( imputed_variables=imputed_variables, imputed_vars_dummy_info=self.imputed_vars_dummy_info, original_predictors=self.original_predictors, + categorical_targets=categorical_targets, + boolean_targets=boolean_targets, + constant_targets=constant_targets, + dummy_processor=getattr( + self, "dummy_processor", None + ), seed=self.seed, ), qrf_kwargs, @@ -482,6 +730,7 @@ def _fit( imputed_variables, batch_variables, qrf_kwargs, + constant_targets, ) # Memory cleanup after each batch @@ -492,10 +741,24 @@ def _fit( ) else: # Process all variables sequentially + # Import constant model + from microimpute.models.imputer import _ConstantValueModel + # Initialize and fit a QRF model for each variable for i, variable in enumerate(imputed_variables): var_start_time = time.time() + # Handle constant targets + if variable in (constant_targets or {}): + constant_val = constant_targets[variable]["value"] + self.models[variable] = _ConstantValueModel( + constant_val, variable + ) + self.logger.info( + f"Using constant value {constant_val} for variable {variable}" + ) + continue + # Build predictor set: original predictors + previously imputed variables current_predictors = _get_sequential_predictors( predictors, imputed_variables, 
i @@ -513,12 +776,14 @@ def _fit( ) # Create and fit model - model = _QRFModel(seed=self.seed, logger=self.logger) + model = self._create_model_for_variable(variable) try: - model.fit( + self._fit_model( + model, X_train[current_predictors], X_train[variable], + variable, **qrf_kwargs, ) @@ -529,10 +794,18 @@ def _fit( ) # Get model complexity metrics if available - if hasattr(model.qrf, "n_estimators"): + if hasattr(model, "qrf") and hasattr( + model.qrf, "n_estimators" + ): self.logger.info( f" Model complexity: {model.qrf.n_estimators} trees" ) + elif hasattr(model, "classifier") and hasattr( + model.classifier, "n_estimators" + ): + self.logger.info( + f" Model complexity: {model.classifier.n_estimators} trees (classifier)" + ) self.models[variable] = model @@ -566,6 +839,10 @@ def _fit( imputed_variables=imputed_variables, imputed_vars_dummy_info=self.imputed_vars_dummy_info, original_predictors=self.original_predictors, + categorical_targets=categorical_targets, + boolean_targets=boolean_targets, + constant_targets=constant_targets, + dummy_processor=getattr(self, "dummy_processor", None), seed=self.seed, log_level=self.log_level, ) @@ -580,6 +857,7 @@ def _fit_variable_batch( imputed_variables: List[str], batch_variables: List[str], qrf_kwargs: Dict[str, Any], + constant_targets: Optional[Dict[str, Dict]] = None, ) -> None: """Fit models for a batch of variables. 
@@ -590,10 +868,24 @@ def _fit_variable_batch( batch_variables: Variables in current batch qrf_kwargs: QRF model parameters """ + # Import constant model + from microimpute.models.imputer import _ConstantValueModel + for variable in batch_variables: var_start_time = time.time() i = imputed_variables.index(variable) + # Handle constant targets + if variable in (constant_targets or {}): + constant_val = constant_targets[variable]["value"] + self.models[variable] = _ConstantValueModel( + constant_val, variable + ) + self.logger.info( + f"Using constant value {constant_val} for variable {variable}" + ) + continue + # Build predictor set: original predictors + previously imputed variables current_predictors = _get_sequential_predictors( predictors, imputed_variables, i @@ -612,12 +904,14 @@ def _fit_variable_batch( # Create and fit model # Note: X_train is already preprocessed by base class - model = _QRFModel(seed=self.seed, logger=self.logger) + model = self._create_model_for_variable(variable) try: - model.fit( + self._fit_model( + model, X_train[current_predictors], X_train[variable], + variable, **qrf_kwargs, ) @@ -708,10 +1002,12 @@ def objective(trial: optuna.Trial) -> float: # Create and fit QRF model with trial parameters # Note: X_train_augmented is already preprocessed by base class - model = _QRFModel(seed=self.seed, logger=self.logger) - model.fit( + model = self._create_model_for_variable(var) + self._fit_model( + model, X_train_augmented[current_predictors], X_train[var], + var, **params, ) diff --git a/microimpute/models/quantreg.py b/microimpute/models/quantreg.py index c85c4a4..1d701e9 100644 --- a/microimpute/models/quantreg.py +++ b/microimpute/models/quantreg.py @@ -30,6 +30,8 @@ def __init__( original_predictors: Optional[List[str]] = None, log_level: Optional[str] = "WARNING", quantiles_specified: bool = False, + boolean_targets: Optional[Dict[str, Dict]] = None, + constant_targets: Optional[Dict[str, Dict]] = None, ) -> None: """Initialize the 
QuantReg results. @@ -43,6 +45,7 @@ def __init__( original_predictors: Optional list of original predictor variable names before dummy encoding. quantiles_specified: Whether quantiles were explicitly specified during fit. + boolean_targets: Dictionary of boolean target info for conversion back to bool. """ super().__init__( predictors, @@ -54,6 +57,8 @@ def __init__( ) self.models = models self.quantiles_specified = quantiles_specified + self.boolean_targets = boolean_targets or {} + self.constant_targets = constant_targets or {} @validate_call(config=VALIDATE_CONFIG) def _predict( @@ -61,6 +66,7 @@ def _predict( X_test: pd.DataFrame, quantiles: Optional[List[float]] = None, random_quantile_sample: Optional[bool] = False, + return_probs: bool = False, ) -> Dict[float, pd.DataFrame]: """Predict values at specified quantiles using the Quantile Regression model. @@ -69,6 +75,7 @@ def _predict( quantiles: List of quantiles to predict. If None, uses the quantiles from training. random_quantile_sample: If True, use random quantile sampling for prediction. + return_probs: Ignored for QuantReg (included for API consistency). Returns: Dictionary mapping quantiles to predicted values. @@ -77,6 +84,11 @@ def _predict( ValueError: If a requested quantile was not fitted during training. RuntimeError: If prediction fails. """ + # Log warning if return_probs is used with QuantReg + if return_probs: + self.logger.warning( + "return_probs parameter will be ignored by QuantReg, as QuantReg only supports numeric targets." + ) try: # Create output dictionary with results imputations: Dict[float, pd.DataFrame] = {} @@ -93,21 +105,43 @@ def _predict( imputed_df = pd.DataFrame() self.logger.info(f"Predicting with model for q={q}") for variable in self.imputed_variables: - try: - if q not in self.models[variable]: - error_msg = f"Model for quantile {q} not fitted. 
Available quantiles: {list(self.models.keys())}" - self.logger.error(error_msg) - raise ValueError(error_msg) - except Exception as quantile_error: - self.logger.error( - f"Error accessing quantiles: {str(quantile_error)}" - ) - raise RuntimeError( - f"Failed to access {q} quantile for prediction" - ) from quantile_error + # Import constant model + from microimpute.models.imputer import ( + _ConstantValueModel, + ) + + # Check if this is a constant target + # For constant targets, use any available quantile since value is the same + if variable in self.constant_targets: + # Get the constant model from any quantile (they're all the same) + available_q = list(self.models[variable].keys())[0] + model = self.models[variable][available_q] + predictions = model.predict(X_test) + else: + # Regular variable - check quantile exists + try: + if q not in self.models[variable]: + error_msg = f"Model for quantile {q} not fitted. Available quantiles: {list(self.models[variable].keys())}" + self.logger.error(error_msg) + raise ValueError(error_msg) + except Exception as quantile_error: + self.logger.error( + f"Error accessing quantiles: {str(quantile_error)}" + ) + raise RuntimeError( + f"Failed to access {q} quantile for prediction" + ) from quantile_error - model = self.models[variable][q] - imputed_df[variable] = model.predict(X_test_with_const) + model = self.models[variable][q] + if isinstance(model, _ConstantValueModel): + # This shouldn't happen as we handle constant targets above + predictions = model.predict(X_test) + else: + predictions = model.predict(X_test_with_const) + # Convert to boolean if this was a boolean target + if variable in self.boolean_targets: + predictions = predictions > 0.5 + imputed_df[variable] = predictions imputations[q] = imputed_df else: quantiles = list(self.models[self.imputed_variables[0]].keys()) @@ -122,10 +156,32 @@ def _predict( for q in quantiles: imputed_df = pd.DataFrame() for variable in self.imputed_variables: - model = 
self.models[variable][q] - imputed_df[variable] = model.predict( - X_test_with_const + # Import constant model + from microimpute.models.imputer import ( + _ConstantValueModel, ) + + # Check if this is a constant target + if variable in self.constant_targets: + # Get the constant model from any quantile + available_q = list( + self.models[variable].keys() + )[0] + model = self.models[variable][available_q] + predictions = model.predict(X_test) + else: + model = self.models[variable][q] + if isinstance(model, _ConstantValueModel): + # Constant model - just return the constant value + predictions = model.predict(X_test) + else: + predictions = model.predict( + X_test_with_const + ) + # Convert to boolean if this was a boolean target + if variable in self.boolean_targets: + predictions = predictions > 0.5 + imputed_df[variable] = predictions random_q_imputations[q] = imputed_df # Create a final dataframe to hold the random quantile imputed values @@ -158,10 +214,32 @@ def _predict( self.logger.info(f"Predicting with model for q={q}") imputed_df = pd.DataFrame() for variable in self.imputed_variables: - model = self.models[variable][q] - imputed_df[variable] = model.predict( - X_test_with_const + # Import constant model + from microimpute.models.imputer import ( + _ConstantValueModel, ) + + # Check if this is a constant target + if variable in self.constant_targets: + # Get the constant model from any quantile + available_q = list( + self.models[variable].keys() + )[0] + model = self.models[variable][available_q] + predictions = model.predict(X_test) + else: + model = self.models[variable][q] + if isinstance(model, _ConstantValueModel): + # Constant model - just return the constant value + predictions = model.predict(X_test) + else: + predictions = model.predict( + X_test_with_const + ) + # Convert to boolean if this was a boolean target + if variable in self.boolean_targets: + predictions = predictions > 0.5 + imputed_df[variable] = predictions imputations[q] = 
imputed_df self.logger.info( @@ -210,6 +288,10 @@ def _fit( predictors: List[str], imputed_variables: List[str], original_predictors: Optional[List[str]] = None, + categorical_targets: Optional[Dict[str, Dict]] = None, + boolean_targets: Optional[Dict[str, Dict]] = None, + numeric_targets: Optional[List[str]] = None, + constant_targets: Optional[Dict[str, Dict]] = None, quantiles: Optional[List[float]] = None, ) -> QuantRegResults: """Fit the Quantile Regression model to the training data. @@ -227,6 +309,25 @@ def _fit( ValueError: If any quantile is outside the [0, 1] range. RuntimeError: If model fitting fails. """ + # Check for unsupported categorical targets + if categorical_targets: + unsupported = list(categorical_targets.keys()) + error_msg = ( + f"QuantReg does not support categorical imputation targets: {unsupported}. " + f"Use QRF, OLS, or Matching models instead for categorical variables. " + f"QuantReg can only handle numeric and boolean targets." + ) + self.logger.error(error_msg) + raise ValueError(error_msg) + + # Warn about boolean targets being treated as numeric + if boolean_targets: + boolean_vars = list(boolean_targets.keys()) + self.logger.warning( + f"Boolean targets will be treated as numeric [0,1]: {boolean_vars}. " + f"Values will be thresholded at 0.5 during prediction." 
+ ) + try: for variable in imputed_variables: self.models[variable] = {} @@ -247,11 +348,28 @@ def _fit( f"Prepared training data with {len(X_train)} samples, {len(predictors)} predictors" ) + # Import constant model + from microimpute.models.imputer import _ConstantValueModel + if quantiles: for q in quantiles: self.logger.info(f"Fitting quantile regression for q={q}") for variable in imputed_variables: + # Handle constant targets + if variable in (constant_targets or {}): + constant_val = constant_targets[variable]["value"] + self.models[variable][q] = _ConstantValueModel( + constant_val, variable + ) + self.logger.info( + f"Using constant value {constant_val} for variable {variable}" + ) + continue + Y = X_train[variable] + # Convert boolean to numeric for regression + if variable in (boolean_targets or {}): + Y = Y.astype(float) self.models[variable][q] = sm.QuantReg( Y, X_with_const ).fit(q=q) @@ -264,7 +382,21 @@ def _fit( ) for variable in imputed_variables: self.logger.info(f"Imputing variable {variable}") + # Handle constant targets + if variable in (constant_targets or {}): + constant_val = constant_targets[variable]["value"] + self.models[variable][q] = _ConstantValueModel( + constant_val, variable + ) + self.logger.info( + f"Using constant value {constant_val} for variable {variable}" + ) + continue + Y = X_train[variable] + # Convert boolean to numeric for regression + if variable in (boolean_targets or {}): + Y = Y.astype(float) self.models[variable][q] = sm.QuantReg( Y, X_with_const ).fit(q=q) @@ -280,6 +412,8 @@ def _fit( seed=self.seed, log_level=self.log_level, quantiles_specified=(quantiles is not None), + boolean_targets=boolean_targets, + constant_targets=constant_targets, ) except Exception as e: self.logger.error(f"Error fitting QuantReg model: {str(e)}") diff --git a/tests/test_models/test_imputers.py b/tests/test_models/test_imputers.py index 43440c1..035a12d 100644 --- a/tests/test_models/test_imputers.py +++ 
b/tests/test_models/test_imputers.py @@ -68,11 +68,13 @@ def data_with_edge_cases() -> pd.DataFrame: # Define all imputer model classes to test ALL_IMPUTER_MODELS = [OLS, QuantReg, QRF] +CATEGORICAL_MODELS = [OLS, QRF] try: from microimpute.models.matching import Matching ALL_IMPUTER_MODELS.append(Matching) + CATEGORICAL_MODELS.append(Matching) except ImportError: pass @@ -206,19 +208,46 @@ def test_boolean_variables(model_class: Type[Imputer]) -> None: @pytest.mark.parametrize( "model_class", ALL_IMPUTER_MODELS, ids=lambda cls: cls.__name__ ) -def test_imputation_categorical_bool_targets( +def test_imputation_bool_targets( model_class: Type[Imputer], ) -> None: - """Test imputing categorical and boolean target variables.""" + """Test imputing boolean target variables.""" diabetes = load_diabetes() df = pd.DataFrame(diabetes.data, columns=diabetes.feature_names) - # Add random boolean and categorical targets + # Add random boolean targets df["bool"] = np.random.choice([True, False], size=len(df)) + + predictors = ["age", "sex", "bmi", "bp"] + imputed_variables = ["bool", "s1"] + + X_train, X_test = preprocess_data(df) + + model = model_class() + fitted_model = model.fit(X_train, predictors, imputed_variables) + predictions = fitted_model.predict(X_test) + + # Default behavior returns DataFrame directly + assert isinstance(predictions, pd.DataFrame) + assert predictions["bool"].dtype == "bool" + assert not predictions["s1"].isna().any() + + +@pytest.mark.parametrize( + "model_class", CATEGORICAL_MODELS, ids=lambda cls: cls.__name__ +) +def test_imputation_categorical_targets( + model_class: Type[Imputer], +) -> None: + """Test imputing categorical target variables.""" + diabetes = load_diabetes() + df = pd.DataFrame(diabetes.data, columns=diabetes.feature_names) + + # Add random categorical targets df["categorical"] = np.random.choice(["one", "two", "three"], size=len(df)) predictors = ["age", "sex", "bmi", "bp"] - imputed_variables = ["categorical", "bool"] + 
imputed_variables = ["categorical"] X_train, X_test = preprocess_data(df) @@ -229,7 +258,38 @@ def test_imputation_categorical_bool_targets( # Default behavior returns DataFrame directly assert isinstance(predictions, pd.DataFrame) assert predictions["categorical"].dtype == "object" - assert predictions["bool"].dtype == "bool" + + # Test probability predictions for models that support it + if model_class.__name__ in ["OLS", "QRF", "Matching"]: + # Get predictions with probabilities using quantiles + # (this ensures consistent return format across models) + predictions_with_probs = fitted_model.predict( + X_test, quantiles=[0.5], return_probs=True + ) + assert isinstance(predictions_with_probs, dict) + assert 0.5 in predictions_with_probs + assert "probabilities" in predictions_with_probs + + # Check that we still get the categorical predictions + assert isinstance(predictions_with_probs[0.5], pd.DataFrame) + assert predictions_with_probs[0.5]["categorical"].dtype == "object" + + # Check probability format + probs = predictions_with_probs["probabilities"]["categorical"] + assert isinstance(probs, pd.DataFrame) + + # Should have columns for each category + expected_cols = ["prob_one", "prob_two", "prob_three"] + for col in expected_cols: + assert col in probs.columns + + # Probabilities should sum to 1 for each row (within tolerance) + row_sums = probs.sum(axis=1) + assert np.allclose(row_sums, 1.0, atol=1e-6) + + # All probabilities should be between 0 and 1 + assert (probs >= 0.0).all().all() + assert (probs <= 1.0).all().all() # === Edge Cases and Error Handling === @@ -326,6 +386,33 @@ def test_constant_predictor(model_class: Type[Imputer]) -> None: assert not predictions[0.5]["y"].isna().any() +@pytest.mark.parametrize( + "model_class", ALL_IMPUTER_MODELS, ids=lambda cls: cls.__name__ +) +def test_constant_target(model_class: Type[Imputer]) -> None: + """Test models with a constant target variable.""" + np.random.seed(42) + + data = pd.DataFrame( + { + "x1": 
np.random.randn(100), + "x2": np.random.randn(100), + "y": np.ones(100) * 100, # Constant target + } + ) + + X_train, X_test = preprocess_data(data) + + model = model_class() + fitted_model = model.fit(X_train, ["x1", "x2"], ["y"]) + + predictions = fitted_model.predict(X_test, quantiles=[0.1, 0.5, 0.9]) + + # All predictions should be close to 100 (the constant value) + for q in [0.1, 0.5, 0.9]: + assert np.allclose(predictions[q]["y"].values, 100.0, rtol=0.1) + + @pytest.mark.parametrize( "model_class", ALL_IMPUTER_MODELS, ids=lambda cls: cls.__name__ ) diff --git a/tests/test_models/test_ols.py b/tests/test_models/test_ols.py index eaa7793..e7ab456 100644 --- a/tests/test_models/test_ols.py +++ b/tests/test_models/test_ols.py @@ -145,30 +145,6 @@ def test_ols_perfect_collinearity() -> None: assert not predictions[0.5]["y"].isna().any() -def test_ols_constant_target() -> None: - """Test OLS with a constant target variable.""" - np.random.seed(42) - - data = pd.DataFrame( - { - "x1": np.random.randn(100), - "x2": np.random.randn(100), - "y": np.ones(100), # Constant target - } - ) - - X_train, X_test = preprocess_data(data) - - model = OLS() - fitted_model = model.fit(X_train, ["x1", "x2"], ["y"]) - - predictions = fitted_model.predict(X_test, quantiles=[0.1, 0.5, 0.9]) - - # All predictions should be close to 1 (the constant value) - for q in [0.1, 0.5, 0.9]: - assert np.allclose(predictions[q]["y"].values, 1.0, rtol=0.1) - - # === Cross-Validation Test === diff --git a/tests/test_models/test_qrf.py b/tests/test_models/test_qrf.py index a067a38..9b69ce0 100644 --- a/tests/test_models/test_qrf.py +++ b/tests/test_models/test_qrf.py @@ -621,7 +621,7 @@ def test_qrf_error_handling() -> None: try: predictions = fitted_model.predict(test_data) except Exception as e: - assert "preprocess data" in str(e).lower() + assert "none of" in str(e).lower() and "are in the" in str(e).lower() # === Internal Model Tests === diff --git a/tests/test_quantile_comparison.py 
b/tests/test_quantile_comparison.py index 863d7f6..8d2d6e6 100644 --- a/tests/test_quantile_comparison.py +++ b/tests/test_quantile_comparison.py @@ -242,8 +242,8 @@ def test_perfect_predictions() -> None: X_test = pd.DataFrame( { - "x": [10, 20, 30], - "y": [10, 20, 30], # Perfect match + "x": [12, 25, 100], + "y": [12, 25, 100], # Perfect match } )