In [1]:
import numpy as np
from matplotlib import pyplot as plt, animation

from sklearn.datasets import make_blobs

from skactiveml.utils import MISSING_LABEL, labeled_indices, unlabeled_indices
from skactiveml.visualization import plot_utilities, plot_decision_boundary
from skactiveml.visualization._misc import mesh
from skactiveml.base import *
  
from skactiveml.classifier import ParzenWindowClassifier

from skactiveml.base import SingleAnnotatorPoolQueryStrategy
from sklearn.metrics import pairwise_distances

In [7]:
class CoreSet(SingleAnnotatorPoolQueryStrategy):
    """ Core Set Selection

    This class implement various core-set based query strategies, i.e., the
    standard greedy algorithm for k-center problem [1], the robust k-center
    algorithm [1].

    Parameters
    ----------
    method: {'greedy', 'robust'}, default='greedy'
        The method to solve the k-center problem, k-center-greedy and robust
        k-center are possible
    missing_label: scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label
    random_state: int or np.random.RandomState
        The random state to use

    References
    ----------
    [1] O. Sener und S. Savarese, „ACTIVE LEARNING FOR CONVOLUTIONAL NEURAL 
    NETWORKS: A CORE-SET APPROACH“, 2018.
    """

    def __init__(
            self, method='greedy', missing_label=MISSING_LABEL, random_state=None
    ):
        super().__init__(
            missing_label=missing_label, random_state=random_state
        )

        self.method = method

    def query(
            self,
            X,
            y,
            candidates=None,
            batch_size=1,
            return_utilities=False,
            **kwargs
    ):

        """ Query the next instances to be labeled

         Parameters
         ----------
         **kwargs
         X: array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e. including the labeled and
            unlabeled samples
         y: array-like of shape (n_samples, )
            Labels of the training data set (possibly including unlabeles ones
            indicated by self.missing_label)
         candidates: None or array-like of shape (n_candidates), dtype = int or
            array-like of shape (n_candidates, n_features),
            optional (default=None)
            If candidates is None, the unlabeled samples from (X,y) are considered
            as candidates
         batch_size: int, optional(default=1)
            The number of samples to be selectes in one AL cycle.
         return_utilities: bool, optional(default=False)
            If True, also return the utilites based on the query strategy

         Returns
         ----------
         query_indices: numpy.ndarry of shape (batch_size, )
            The query_indices indicate for which candidate sample a label is
            to queried, e.g., `query_indices[0]` indicates the first selected
            sample.
         utilities: numpy.ndarray of shape (n_samples, )
            The distance between each data point and its nearest center after
            each selected sample of the batch
         """

        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )

        X_cand, mapping = self._transform_candidates(candidates, X, y)
        """
        X_cand unlabeled samples
        mapping: indices of the original array
        """

        if self.method == 'greedy':
            if mapping is not None:
                query_indices, utilities = k_greedy_center(X, y, batch_size, self.random_state_, self.missing_label, mapping)
            else:
                X_with_cand = np.concatenate((X, X_cand), axis=0)
                n_new_cand = X_cand.shape[0]
                y_cand = np.full(shape=n_new_cand, fill_value=self.missing_label)
                y_with_cand = np.concatenate((y, y_cand), axis=None)
                mapping = np.arange(X.shape[0], X.shape[0] + n_new_cand)
                query_indices, utilities = k_greedy_center(X_with_cand, y_with_cand, self.random_state_, self.missing_label, mapping, n_new_cand)

        if return_utilities:
            return query_indices, utilities
        else:
            return query_indices

def k_greedy_center(X, y, batch_size, random_state, missing_label=MISSING_LABEL, mapping=None, n_new_cand=None):
    """
     An active learning method that greedily forms a batch to minimize
     the maximum distance to a cluster center among all unlabeled
     datapoints.

     Parameters:
     ----------
     X: array-like of shape (n_samples, n_features)
        Training data set, usually complete, i.e. including the labeled and
        unlabeled samples
     selected_samples: np.ndarray of shape (n_selected_samples, )
        index of datapoints already selectes
     batch_size: int, optional(default=1)
        The number of samples to be selected in one AL cycle.
        
     Return:
     ----------
     new_samples: numpy.ndarry of shape (batch_size, )
         The query_indices indicate for which candidate sample a label is
         to queried from the candidates
     utilities: numpy.ndarray of shape (batch_size, n_samples)
         The distance between each data point and its nearest center that used
         for selecting the next sample.
        """
    # read the labeled aka selected samples from the y vector
    selected_samples = labeled_indices(y, missing_label=missing_label)
    if mapping is None:
        mapping = unlabeled_indices(y, missing_label=missing_label)
    # initialize the utilities matrix with
    utilities = np.empty(shape=(batch_size, X.shape[0]))

    query_indices = np.array([], dtype=int)

    for i in range(batch_size):
        utilities[i] = update_distances(X, selected_samples, mapping)

        # select index
        idx = np.nanargmax(utilities[i])

        if len(selected_samples) == 0:
            idx = random_state.choice(mapping)
            # because np.nanargmax always return the first occurrence is returned

        query_indices = np.append(query_indices, [idx])
        selected_samples = np.append(selected_samples, [idx])

    if n_new_cand is not None:
        utilities = utilities[:, mapping]

    return query_indices, utilities

def update_distances(X, cluster_centers, mapping):
    """
    Update min distances by given cluster centers.

    Parameters:
    ----------
    X: array-like of shape (n_samples, n_features)
        Training data set, usually complete, i.e. including the labeled and
        unlabeled samples
    cluster_centers: indices of cluster centers

    Return:
    ---------
    dist: numpy.ndarray of shape (1, n_samples)
        - if there aren't any cluster centers existed, the default distance
            will be 0
        - if there are some cluster center existed, the return will be the
            distance between each data point and its nearest center after
            each selected sample of the batch
        """
    dist = np.empty(shape=(1, X.shape[0]))

    if len(cluster_centers) > 0:
        cluster_center_feature = X[cluster_centers]
        dist_matrix = pairwise_distances(X, cluster_center_feature)
        dist = np.min(dist_matrix, axis=1).reshape(1, -1)

    result_dist = np.full((1, X.shape[0]), np.nan)
    result_dist[0, mapping] = dist[0, mapping]
    result_dist[0, cluster_centers] = np.nan

    return result_dist

In [3]:
random_state = np.random.RandomState(0)
X, y_true = make_blobs(n_samples=10, n_features=2,
                       centers=[[0, 1], [-3, .5], [-1, -1], [2, 1], [1, -.5]],
                       cluster_std=.7, random_state=random_state)
y_true = y_true % 2
y = np.full(shape=y_true.shape, fill_value=MISSING_LABEL)

In [4]:
qs = CoreSet(method='greedy', random_state=42)

In [5]:
X_cand, _ = make_blobs(n_samples=5, n_features=2,
                       centers=[[0, 1], [-3, .5], [-1, -1], [2, 1], [1, -.5]],
                       cluster_std=.7, random_state=np.random.RandomState(1))
X_with_cand = np.concatenate((X, X_cand), axis=0)
n_new_cand = X_cand.shape[0]
y_cand = np.full(shape=n_new_cand, fill_value=np.nan)
y_with_cand = np.concatenate((y, y_cand), axis=None)
mapping = np.arange(X.shape[0], X.shape[0] + n_new_cand)


In [8]:

query_idx, utilities = qs.query(X=X, y=y, batch_size=2, return_utilities=True, candidates=X_cand)
print(utilities)

TypeError: 'missing_label' has type '<class 'numpy.ndarray'>', but must be a either a number, a string, np.nan, or None.