In [14]:
import numpy as np
from sklearn.decomposition import PCA
from sklearn.utils.extmath import randomized_svd
from utils.typing import Tensor, N, D

seed = 42
np.random.seed(seed)

In [15]:
n_sample_types = 3
d = 10
n_components = 1

data_types = np.random.rand(n_sample_types, d)
sample_counts = np.random.randint(1, 10, size=n_sample_types)
data_repeated = np.repeat(data_types, sample_counts, axis=0)

In [16]:
pca = PCA(n_components=n_components, svd_solver="randomized", random_state=seed)
pca = pca.fit(data_repeated)
pca_transformed = pca.transform(data_types)

In [17]:
class WPCA:
    def __init__(self, n_components=2):
        self.n_components = n_components

    def fit(self, X: Tensor[N, D], weights: Tensor[N]) -> None:
        weights = weights / np.sum(weights)
        self.mean_ = np.average(X, axis=0, weights=weights)
        _, _, Vt = randomized_svd(
            np.sqrt(weights)[:, np.newaxis] * (X - self.mean_),
            n_components=self.n_components,
            random_state=42
        )

        self.components_ = Vt[:self.n_components]
        return self

    def transform(self, X: Tensor[N, D]) -> Tensor[N, D]:
        return (X - self.mean_) @ self.components_.T

In [18]:
weights = sample_counts / np.sum(sample_counts)
wpca = WPCA(n_components=1)
wpca.fit(data_types, weights)
wpca_transformed = wpca.transform(data_types)


# match positive and negative signs
for i in range(n_components):
    if pca_transformed[0, i] > 0 and wpca_transformed[0, i] < 0 or pca_transformed[0, i] < 0 and wpca_transformed[0, i] > 0:
        print(f"Flipping sign of component {i} to match PCA")
        wpca_transformed[:, i] *= -1

Flipping sign of component 0 to match PCA


In [19]:
np.allclose(
    pca_transformed,
    wpca_transformed,
    atol=1e-6
)

True