122 changes: 56 additions & 66 deletions src/midst_toolkit/models/clavaddpm/clustering.py
@@ -18,8 +18,8 @@
from midst_toolkit.models.clavaddpm.enumerations import (
ClusteringMethod,
Configs,
DataAndKeyNormalizationType,
GroupLengthsProbDicts,
KeyScalingType,
RelationOrder,
Tables,
)
@@ -163,6 +163,7 @@ def _pair_clustering(
num_clusters: int,
parent_scale: float,
key_scale: float,
data_and_key_normalization: DataAndKeyNormalizationType = DataAndKeyNormalizationType.MINMAX,
clustering_method: ClusteringMethod = ClusteringMethod.KMEANS,
) -> tuple[pd.DataFrame, pd.DataFrame, dict[int, dict[int, float]]]:
"""
@@ -175,11 +176,12 @@ def _pair_clustering(
parent_name: Name of the parent table.
child_name: Name of the child table.
num_clusters: Number of clusters.
parent_scale: Scaling factor applied to the parent table, provided by the config.
It will be applied to the features to weight their importance during clustering.
key_scale: Scaling factor applied to the foreign key values that link
the child table to the parent table. This will weight how much influence
the parent-child relationship has in the clustering algorithm.
parent_scale: Scaling factor applied to the parent table, provided by the config. It will be applied to the
features to weight their importance during clustering.
key_scale: Scaling factor applied to the foreign key values that link the child table to the parent table.
This will weight how much influence the parent-child relationship has in the clustering algorithm.
data_and_key_normalization: Type of normalization for the child and parent data and keys. Default is
``DataAndKeyNormalizationType.MINMAX``.
clustering_method: Method of clustering. Default is ClusteringMethod.KMEANS.

Returns:
@@ -226,6 +228,7 @@ def _pair_clustering(
parent_primary_key,
parent_scale,
key_scale,
data_and_key_normalization,
)

cluster_labels = _get_cluster_labels(cluster_data, clustering_method, num_clusters)
@@ -333,35 +336,42 @@ def _merge_parent_data_with_child_data(
return merged_parent_data


def _get_min_max_and_quantile_for_numerical_columns(
def get_normalized_numerical_columns(
child_numerical_data: np.ndarray,
parent_numerical_data: np.ndarray,
parent_scale: float,
) -> tuple[np.ndarray, np.ndarray]:
normalization_method: DataAndKeyNormalizationType = DataAndKeyNormalizationType.MINMAX,
) -> np.ndarray:
"""
Get the min-max and quantile values for the numerical columns in both the
child and parent data.
The child and parent table numerical data are merged and then normalized together according to the normalization
scheme specified by ``normalization_method``. After normalization, the parent numerical data is scaled by the
``parent_scale`` factor.

Args:
child_numerical_data: Numpy array of the child numerical data.
parent_numerical_data: Numpy array of the parent numerical data.
parent_scale: Scaling factor applied to the parent data.
child_numerical_data: Numpy array of the child table numerical data.
parent_numerical_data: Numpy array of the parent table numerical data.
parent_scale: Scaling factor applied to the parent data AFTER normalization.
normalization_method: The approach to be used to normalize the combined data. Defaults to
``DataAndKeyNormalizationType.MINMAX``.

Returns:
A tuple with two numpy arrays, one with the min-max values and one with the quantile
values for the numerical columns.
A numpy array containing the merged child and parent table numerical data (in that order), normalized using
the specified strategy, with the parent data scaled by the provided ``parent_scale``.
"""
joint_matrix = np.concatenate([child_numerical_data, parent_numerical_data], axis=1)
matrix_p_index = child_numerical_data.shape[1]
parent_start_index = child_numerical_data.shape[1]

# Perform quantile normalization using QuantileTransformer
numerical_quantile = _quantile_normalize_sklearn(joint_matrix)
numerical_min_max = _min_max_normalize_sklearn(joint_matrix)
if normalization_method == DataAndKeyNormalizationType.MINMAX:
normalized_data = _min_max_normalize_sklearn(joint_matrix)
elif normalization_method == DataAndKeyNormalizationType.QUANTILE:
normalized_data = _quantile_normalize_sklearn(joint_matrix)
else:
raise ValueError(f"Unrecognized Normalization Method: {normalization_method}")

numerical_quantile[:, matrix_p_index:] = parent_scale * numerical_quantile[:, matrix_p_index:]
numerical_min_max[:, matrix_p_index:] = parent_scale * numerical_min_max[:, matrix_p_index:]
# Scale the parent data using the parent scale value
normalized_data[:, parent_start_index:] = parent_scale * normalized_data[:, parent_start_index:]

return numerical_min_max, numerical_quantile
return normalized_data
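
A minimal usage sketch (illustrative only, not part of the diff; the toy arrays are my own): the child columns come back normalized to [-1, 1], while the parent columns are additionally multiplied by ``parent_scale``.

import numpy as np

child = np.array([[0.0, 1.0], [2.0, 3.0], [4.0, 5.0]])
parent = np.array([[10.0], [20.0], [30.0]])

normalized = get_normalized_numerical_columns(
    child, parent, parent_scale=2.0, normalization_method=DataAndKeyNormalizationType.MINMAX
)
# Child columns [0, 2, 4] and [1, 3, 5] both map to [-1, 0, 1]; the parent
# column [10, 20, 30] maps to [-1, 0, 1] and is then scaled to [-2, 0, 2].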


def _one_hot_encode_categorical_columns(
@@ -422,28 +432,28 @@ def _prepare_cluster_data(
parent_primary_key: str,
parent_scale: float,
key_scale: float,
key_scaling_type: KeyScalingType = KeyScalingType.MINMAX,
data_and_key_normalization: DataAndKeyNormalizationType = DataAndKeyNormalizationType.MINMAX,
) -> np.ndarray:
"""
Prepare the data for the clustering algorithm, which comprises of merging the parent
and child data, splitting the data into categorical and numerical columns, and
normalizing the data.
Prepare the data for the clustering algorithm, which comprises merging the parent and child data, splitting
the data into categorical and numerical columns, and normalizing the data.

Args:
child_data: Numpy array of the child data.
parent_data: Numpy array of the parent data.
child_domain: Dictionary of the domain of the child table. The domain dictionary
holds metadata about the columns of each one of the tables.
parent_domain: Dictionary of the domain of the parent table. The domain dictionary
holds metadata about the columns of each one of the tables.
child_domain: Dictionary of the domain of the child table. The domain dictionary holds metadata about the
columns of each one of the tables.
parent_domain: Dictionary of the domain of the parent table. The domain dictionary holds metadata about the
columns of each one of the tables.
all_child_columns: List of all child columns.
all_parent_columns: List of all parent columns.
parent_primary_key: Name of the parent primary key.
parent_scale: Scaling factor applied to the parent table, provided by the config.
It will be applied to the features to weight their importance during clustering.
key_scale: Scaling factor applied to the tables' keys. This will weight how much influence
the parent-child relationship has in the clustering algorithm.
key_scaling_type: Type of scaling for the tables' keys. Default is KeyScalingType.MINMAX.
parent_scale: Scaling factor applied to the parent table, provided by the config. It will be applied to the
features to weight their importance during clustering.
key_scale: Scaling factor applied to the tables' keys. This will weight how much influence the parent-child
relationship has in the clustering algorithm.
data_and_key_normalization: Type of normalization for the child and parent data and keys. Default is
``DataAndKeyNormalizationType.MINMAX``.

Returns:
Numpy array of the data prepared for the clustering algorithm.
@@ -475,21 +485,21 @@ def _prepare_cluster_data(
parent_numerical_data = merged_data[:, parent_numerical_columns]
parent_categorical_data = merged_data[:, parent_categorical_columns]

numerical_min_max, numerical_quantile = _get_min_max_and_quantile_for_numerical_columns(
numerical_normalized = get_normalized_numerical_columns(
child_numerical_data,
parent_numerical_data,
parent_scale,
data_and_key_normalization,
)

reshaped_parent_data = merged_data[:, parent_primary_key_index].reshape(-1, 1)
if key_scaling_type == KeyScalingType.MINMAX:
key_normalized = _min_max_normalize_sklearn(reshaped_parent_data)
numerical_normalized = numerical_min_max
elif key_scaling_type == KeyScalingType.QUANTILE:
key_normalized = _quantile_normalize_sklearn(reshaped_parent_data)
numerical_normalized = numerical_quantile
# Normalizing the parent table primary key data.
reshaped_parent_primary_key_data = merged_data[:, parent_primary_key_index].reshape(-1, 1)
if data_and_key_normalization == DataAndKeyNormalizationType.MINMAX:
key_normalized = _min_max_normalize_sklearn(reshaped_parent_primary_key_data)
elif data_and_key_normalization == DataAndKeyNormalizationType.QUANTILE:
key_normalized = _quantile_normalize_sklearn(reshaped_parent_primary_key_data)
else:
raise ValueError(f"Unsupported foreign key scaling type: {key_scaling_type}")
raise ValueError(f"Unsupported data and key normalization type: {data_and_key_normalization}")

key_scaled = key_scale * key_normalized
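
A small sketch (my own illustration, assuming MINMAX normalization) of how the key column is first normalized to [-1, 1] and then weighted by ``key_scale``, so larger values of ``key_scale`` make the parent-child link dominate the clustering distance:

import numpy as np

foreign_keys = np.array([[0.0], [1.0], [2.0], [3.0]])
key_normalized = _min_max_normalize_sklearn(foreign_keys)  # [[-1.0], [-0.333...], [0.333...], [1.0]]
key_scaled = 10.0 * key_normalized  # with key_scale = 10.0, the key column spans [-10, 10]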

@@ -727,9 +737,6 @@ def _parse_numpy_number_as_int(number: np.number) -> int:
raise ValueError(f"Number is not a number: {item}")


# TODO: Refactor the functions below to be a single one with a "method" parameter.


def _quantile_normalize_sklearn(matrix: np.ndarray) -> np.ndarray:
"""
Quantile normalize the input matrix using Sklearn's QuantileTransformer.
@@ -745,15 +752,7 @@ def _quantile_normalize_sklearn(matrix: np.ndarray) -> np.ndarray:
random_state=42, # TODO: do we really need to hardcode the random state?
) # Change output_distribution as needed

normalized_data = np.empty((matrix.shape[0], 0))

# Apply QuantileTransformer to each column and concatenate the results
Collaborator Author: Literally no idea why this was being done per column...

for col in range(matrix.shape[1]):
column = matrix[:, col].reshape(-1, 1)
transformed_column = transformer.fit_transform(column)
normalized_data = np.concatenate((normalized_data, transformed_column), axis=1)

return normalized_data
return transformer.fit_transform(matrix)


def _min_max_normalize_sklearn(matrix: np.ndarray) -> np.ndarray:
@@ -767,16 +766,7 @@ def _min_max_normalize_sklearn(matrix: np.ndarray) -> np.ndarray:
Numpy array of the normalized data.
"""
scaler = MinMaxScaler(feature_range=(-1, 1))

normalized_data = np.empty((matrix.shape[0], 0))

# Apply MinMaxScaler to each column and concatenate the results
Collaborator Author: Literally no idea why this was being done per column...

for col in range(matrix.shape[1]):
column = matrix[:, col].reshape(-1, 1)
transformed_column = scaler.fit_transform(column)
normalized_data = np.concatenate((normalized_data, transformed_column), axis=1)

return normalized_data
return scaler.fit_transform(matrix)
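
The per-column loop was redundant because both MinMaxScaler and QuantileTransformer already fit and transform each feature independently. A quick equivalence check (my own sketch, not in the PR):

import numpy as np
from sklearn.preprocessing import MinMaxScaler

rng = np.random.default_rng(0)
matrix = rng.normal(size=(100, 4))

whole = MinMaxScaler(feature_range=(-1, 1)).fit_transform(matrix)
per_column = np.concatenate(
    [MinMaxScaler(feature_range=(-1, 1)).fit_transform(matrix[:, [i]]) for i in range(matrix.shape[1])],
    axis=1,
)
assert np.allclose(whole, per_column)  # column-wise scaling: loop and whole-matrix calls agree

The same holds for QuantileTransformer, which computes its quantiles per feature.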


def _aggregate_and_sample(
2 changes: 1 addition & 1 deletion src/midst_toolkit/models/clavaddpm/dataset.py
@@ -114,8 +114,8 @@ def _load_datasets(cls, directory: Path, dataset_name: str) -> ArrayDict:
splits = [k.value for k in list(DataSplit) if directory.joinpath(f"y_{k.value}.npy").exists()]
if not len(splits) > 0:
raise ValueError("Splits to be loaded is empty!")

datasets: ArrayDict = {}

for split in splits:
dataset = np.load(directory / f"{dataset_name}_{split}.npy", allow_pickle=True)
assert isinstance(dataset, np.ndarray), "Dataset must be of type Numpy Array"
6 changes: 3 additions & 3 deletions src/midst_toolkit/models/clavaddpm/enumerations.py
@@ -16,7 +16,7 @@


class ClusteringMethod(Enum):
"""Possioble clustering methods for multi-table training."""
"""Possible clustering methods for multi-table training."""

KMEANS = "kmeans"
GMM = "gmm"
@@ -102,8 +102,8 @@ class TargetType(Enum):
LONG = "long"


class KeyScalingType(Enum):
"""Possible types of scaling for the foreign key."""
class DataAndKeyNormalizationType(Enum):
"""Possible types of normalization for data and primary keys when clustering."""

MINMAX = "minmax"
QUANTILE = "quantile"
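
A tiny illustrative sketch (mine, not part of the PR): config-provided strings map onto the enum through its values.

from midst_toolkit.models.clavaddpm.enumerations import DataAndKeyNormalizationType

# Look up the enum member from a config string.
norm = DataAndKeyNormalizationType("quantile")
assert norm is DataAndKeyNormalizationType.QUANTILE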
File renamed without changes.
83 changes: 83 additions & 0 deletions tests/unit/models/clavaddpm/test_clustering.py
@@ -0,0 +1,83 @@
import numpy as np

from midst_toolkit.common.random import set_all_random_seeds, unset_all_random_seeds
from midst_toolkit.models.clavaddpm.clustering import (
_min_max_normalize_sklearn,
_quantile_normalize_sklearn,
get_normalized_numerical_columns,
)
from midst_toolkit.models.clavaddpm.enumerations import DataAndKeyNormalizationType


def test_quantile_normalize_sklearn() -> None:
set_all_random_seeds(42)
data_to_normalize = np.random.randint(0, 3, (5, 5))
normalized_data = _quantile_normalize_sklearn(data_to_normalize)
assert np.allclose(
normalized_data,
np.array(
[
[5.19933758, -5.19933758, 5.19933758, 5.19933758, -5.19933758],
[-5.19933758, 5.19933758, 0.0, 5.19933758, 5.19933758],
[5.19933758, 5.19933758, -5.19933758, 5.19933758, 0.31863936],
[-5.19933758, 0.0, 0.0, -5.19933758, 0.31863936],
[-5.19933758, -5.19933758, 0.0, -5.19933758, -5.19933758],
]
),
atol=1e-5,
)
unset_all_random_seeds()


def test_min_max_normalize_sklearn() -> None:
set_all_random_seeds(42)
data_to_normalize = np.random.randint(0, 3, (5, 5))
normalized_data = _min_max_normalize_sklearn(data_to_normalize)
assert np.allclose(
normalized_data,
np.array(
[
[1.0, -1.0, 1.0, 1.0, -1.0],
[-1.0, 1.0, 0.0, 1.0, 1.0],
[1.0, 1.0, -1.0, 1.0, 0.0],
[-1.0, 0.0, 0.0, -1.0, 0.0],
[-1.0, -1.0, 0.0, -1.0, -1.0],
]
),
atol=1e-8,
)
unset_all_random_seeds()


def test_get_normalized_numerical_columns() -> None:
set_all_random_seeds(42)
child_data = np.random.randint(0, 3, (3, 3))
parent_data = np.random.randint(0, 3, (3, 3))
scale = 2.0
normalization_type = DataAndKeyNormalizationType.MINMAX

normalized_data = get_normalized_numerical_columns(child_data, parent_data, scale, normalization_type)
assert np.allclose(
normalized_data,
np.array(
[[-1.0, -1.0, 1.0, 2.0, 2.0, 2.0], [-1.0, -1.0, -1.0, -2.0, 2.0, -2.0], [-1.0, 1.0, 1.0, -2.0, -2.0, -2.0]]
),
atol=1e-6,
)

normalization_type = DataAndKeyNormalizationType.QUANTILE
normalized_data = get_normalized_numerical_columns(child_data, parent_data, scale, normalization_type)

assert np.allclose(
normalized_data,
np.array(
[
[-5.19933758, -5.19933758, 5.19933758, 2 * 5.19933758, 2 * 5.19933758, 2 * 5.19933758],
[-5.19933758, -5.19933758, -5.19933758, 2 * -5.19933758, 2 * 5.19933758, 2 * -5.19933758],
[-5.19933758, 5.19933758, 5.19933758, 2 * -5.19933758, 2 * -5.19933758, 2 * -5.19933758],
]
),
atol=1e-5,
)

unset_all_random_seeds()