# Additional Example


In [18]:
import autora
import numpy as np
from typing import Iterable, Literal, Optional

from sklearn.metrics import DistanceMetric
from sklearn.preprocessing import StandardScaler
import torch 
from torch.autograd import Variable
from torch import nn

In [19]:
from autora.experimentalist.pooler.poppernet import (
    PopperNet,
    class_to_onehot,
    plot_popper_diagnostics,
)
from autora.variable import ValueType, VariableCollection


def falsification_sampler(
    X,
    model,
    X_train: np.ndarray,
    Y_train: np.ndarray,
    metadata: VariableCollection,
    n: Optional[int] = None,
    training_epochs: int = 1000,
    training_lr: float = 1e-3,
    plot: bool = False,
):
    """
    A Sampler that generates samples for independent variables with the objective of maximizing the
    (approximated) loss of the model. The samples are generated by first training a neural network
    to approximate the loss of a model for all patterns in the training data.
    Once trained, the network is then provided with the candidate samples and the samples with
    the highest loss are selected.

    Args:
        X: The candidate samples of conditions to be evaluated.
        model: Scikit-learn model, could be either a classification or regression model
        X_train: Conditions that the model was trained on
        Y_train: Observations that the model was trained to predict
        metadata: Meta-data about the dependent and independent variables
        n: Number of samples to return
        training_epochs: Number of epochs to train the popper network for approximating the
        error of the model
        training_lr: Learning rate for training the popper network
        plot: Print out the prediction of the popper network as well as its training loss

    Returns: Samples with the highest loss

    """

    # format input

    X_train = np.array(X_train)
    if len(X_train.shape) == 1:
        X_train = X_train.reshape(-1, 1)

    # get target pattern for popper net
    model_predict = getattr(model, "predict_proba", None)
    if callable(model_predict) is False:
        model_predict = getattr(model, "predict", None)

    if callable(model_predict) is False or model_predict is None:
        raise Exception("Model must have `predict` or `predict_proba` method.")

    model_prediction = model_predict(X_train)
    if isinstance(model_prediction, np.ndarray) is False:
        try:
            model_prediction = np.array(model_prediction)
        except Exception:
            raise Exception("Model prediction must be convertable to numpy array.")
    if model_prediction.ndim == 1:
        model_prediction = model_prediction.reshape(-1, 1)

    return get_samples_from_model_prediction(
        X,
        model_prediction,
        X_train,
        Y_train,
        metadata,
        n,
        training_epochs,
        training_lr,
        plot,
    )


def get_samples_from_model_prediction(
    X,
    Y_predicted,
    X_train: np.ndarray,
    Y_train: np.ndarray,
    metadata: Optional[VariableCollection] = None,
    n: Optional[int] = None,
    training_epochs: int = 1000,
    training_lr: float = 1e-3,
    plot: bool = False,
):
    """
    A Sampler that generates samples for independent variables with the objective of maximizing the
    (approximated) loss of the model. The samples are generated by first training a neural network
    to approximate the loss of a model for all patterns in the training data. The model
    loss is computed based on the predictions of the model for its training data. Once trained,
    the network is then provided with the candidate samples and the samples with the highest loss
    are selected.

    Args:
        X: The candidate samples to be evaluated.
        Y_predicted: Prediction obtained from the model for the set of conditions X_train
        X_train: Conditions that the model was trained on
        Y_train: Observations that the model was trained to predict
        metadata: Meta-data about the dependent and independent variables
        n: Number of samples to return
        training_epochs: Number of epochs to train the popper network for approximating the
        error of the model
        training_lr: Learning rate for training the popper network
        plot: Print out the prediction of the popper network as well as its training loss

    Returns: Samples with the highest loss

    """
    X, scores = get_scored_samples_from_model_prediction(
        X,
        Y_predicted,
        X_train,
        Y_train,
        metadata,
        n,
        training_epochs,
        training_lr,
        plot,
    )

    return X


def get_scored_samples_from_model_prediction(
    X,
    Y_predicted,
    X_train: np.ndarray,
    Y_train: np.ndarray,
    metadata: Optional[VariableCollection] = None,
    n: Optional[int] = None,
    training_epochs: int = 1000,
    training_lr: float = 1e-3,
    plot: bool = False,
):
    """
    A Sampler that generates samples for independent variables with the objective of maximizing the
    (approximated) loss of the model. The samples are generated by first training a neural network
    to approximate the loss of a model for all patterns in the training data. The model
    loss is computed based on the predictions of the model for its training data. Once trained,
    the network is then provided with the candidate samples and the samples with the highest loss
    are selected.

    Args:
        X: The candidate samples to be evaluated.
        Y_predicted: Prediction obtained from the model for the set of conditions X_train
        X_train: Conditions that the model was trained on
        Y_train: Observations that the model was trained to predict
        metadata: Meta-data about the dependent and independent variables
        n: Number of samples to return
        training_epochs: Number of epochs to train the popper network for approximating the
        error of the model
        training_lr: Learning rate for training the popper network
        plot: Print out the prediction of the popper network as well as its training loss

    Returns:
        X: Samples with the highest loss
        scores: Normalized falsification scores for the samples

    """

    X = np.array(X)
    if len(X.shape) == 1:
        X = X.reshape(-1, 1)

    X_train = np.array(X_train)
    if len(X_train.shape) == 1:
        X_train = X_train.reshape(-1, 1)

    Y_train = np.array(Y_train)
    if len(Y_train.shape) == 1:
        Y_train = Y_train.reshape(-1, 1)

    if n is None:
        n = X.shape[0]

    if metadata is not None:
        if metadata.dependent_variables[0].type == ValueType.CLASS:
            # find all unique values in Y_train
            num_classes = len(np.unique(Y_train))
            Y_train = class_to_onehot(Y_train, n_classes=num_classes)

    # create list of IV limits
    iv_limit_list = list()
    if metadata is not None:
        ivs = metadata.independent_variables
        for iv in ivs:
            if hasattr(iv, "value_range"):
                value_range = cast(Tuple, iv.value_range)
                lower_bound = value_range[0]
                upper_bound = value_range[1]
                iv_limit_list.append(([lower_bound, upper_bound]))
    else:
        for col in range(X.shape[1]):
            min = np.min(X[:, col])
            max = np.max(X[:, col])
            iv_limit_list.append(([min, max]))

    # get input pattern for popper net
    popper_input = Variable(torch.from_numpy(X_train), requires_grad=False).float()

    # get dimensions of input and output
    n_input = X_train.shape[1]
    n_output = 1  # only predicting one MSE

    if isinstance(Y_predicted, np.ndarray) is False:
        try:
            Y_predicted = np.array(Y_predicted)
        except Exception:
            raise Exception("Model prediction must be convertable to numpy array.")
    if Y_predicted.ndim == 1:
        Y_predicted = Y_predicted.reshape(-1, 1)

    criterion = nn.MSELoss()
    model_loss = (Y_predicted - Y_train) ** 2
    model_loss = np.mean(model_loss, axis=1)

    # standardize the loss
    scaler = StandardScaler()
    model_loss = scaler.fit_transform(model_loss.reshape(-1, 1)).flatten()

    model_loss = torch.from_numpy(model_loss).float()
    popper_target = Variable(model_loss, requires_grad=False)

    # create the network
    popper_net = PopperNet(n_input, n_output)

    # reformat input in case it is 1D
    if len(popper_input.shape) == 1:
        popper_input = popper_input.flatten()
        popper_input = popper_input.reshape(-1, 1)

    # define the optimizer
    popper_optimizer = torch.optim.Adam(popper_net.parameters(), lr=training_lr)

    # train the network
    losses = []
    for epoch in range(training_epochs):
        popper_prediction = popper_net(popper_input)
        loss = criterion(popper_prediction, popper_target.reshape(-1, 1))
        popper_optimizer.zero_grad()
        loss.backward()
        popper_optimizer.step()
        losses.append(loss.item())

    if plot:
        popper_input_full = np.linspace(
            iv_limit_list[0][0], iv_limit_list[0][1], 1000
        ).reshape(-1, 1)
        popper_input_full = Variable(
            torch.from_numpy(popper_input_full), requires_grad=False
        ).float()
        popper_prediction = popper_net(popper_input_full)
        plot_popper_diagnostics(
            losses,
            popper_input,
            popper_input_full,
            popper_prediction,
            popper_target,
            Y_predicted,
            Y_train,
        )

    # now that the popper network is trained we can assign losses to all data points to be evaluated
    popper_input = Variable(torch.from_numpy(X)).float()
    Y = popper_net(popper_input).detach().numpy().flatten()
    scaler = StandardScaler()
    score = scaler.fit_transform(Y.reshape(-1, 1)).flatten()

    # order rows in Y from highest to lowest
    sorted_X = X[np.argsort(score)[::-1]]
    sorted_score = score[np.argsort(score)[::-1]]

    return sorted_X[:n], sorted_score[:n]

In [20]:
AllowedMetrics = Literal[
    "euclidean",
    "manhattan",
    "chebyshev",
    "minkowski",
    "wminkowski",
    "seuclidean",
    "mahalanobis",
    "haversine",
    "hamming",
    "canberra",
    "braycurtis",
    "matching",
    "jaccard",
    "dice",
    "kulsinski",
    "rogerstanimoto",
    "russellrao",
    "sokalmichener",
    "sokalsneath",
    "yule",
]


def summed_dissimilarity_sampler(
    X: np.ndarray,
    X_ref: np.ndarray,
    n: int = 1,
    metric: AllowedMetrics = "euclidean",
    integration: str = "min",
) -> np.ndarray:
    """
    This dissimilarity samples re-arranges the pool of IV conditions according to their
    dissimilarity with respect to a reference pool X_ref. The default dissimilarity is calculated
    as the average of the pairwise distances between the conditions in X and X_ref.

    Args:
        X: pool of IV conditions to evaluate dissimilarity
        X_ref: reference pool of IV conditions
        n: number of samples to select
        metric (str): dissimilarity measure. Options: 'euclidean', 'manhattan', 'chebyshev',
            'minkowski', 'wminkowski', 'seuclidean', 'mahalanobis', 'haversine',
            'hamming', 'canberra', 'braycurtis', 'matching', 'jaccard', 'dice',
            'kulsinski', 'rogerstanimoto', 'russellrao', 'sokalmichener',
            'sokalsneath', 'yule'. See [sklearn.metrics.DistanceMetric][] for more details.

    Returns:
        Sampled pool
    """

    new_X, distance_scores = compute_dissimilarity(X, X_ref, n, metric, integration)

    return new_X


def compute_dissimilarity(
    X: np.ndarray,
    X_ref: np.ndarray,
    n: int = 1,
    metric: AllowedMetrics = "euclidean",
    integration: str = "min",
) -> np.ndarray:
    """
    This dissimilarity samples re-arranges the pool of IV conditions according to their
    dissimilarity with respect to a reference pool X_ref. The default dissimilarity is calculated
    as the average of the pairwise distances between the conditions in X and X_ref.

    Args:
        X: pool of IV conditions to evaluate dissimilarity
        X_ref: reference pool of IV conditions
        n: number of samples to select
        metric (str): dissimilarity measure. Options: 'euclidean', 'manhattan', 'chebyshev',
            'minkowski', 'wminkowski', 'seuclidean', 'mahalanobis', 'haversine',
            'hamming', 'canberra', 'braycurtis', 'matching', 'jaccard', 'dice',
            'kulsinski', 'rogerstanimoto', 'russellrao', 'sokalmichener',
            'sokalsneath', 'yule'. See [sklearn.metrics.DistanceMetric][] for more details.
        integration: Distance integration method used to compute the overall dissimilarity score
        for a given data point. Options: 'sum', 'prod', 'mean', 'min', 'max'.

    Returns:
        Sampled pool
    """

    if isinstance(X, Iterable):
        X = np.array(list(X))

    if isinstance(X_ref, Iterable):
        X_ref = np.array(list(X_ref))

    if X.ndim == 1:
        X = X.reshape(-1, 1)

    if X_ref.ndim == 1:
        X_ref = X_ref.reshape(-1, 1)

    if X.shape[1] != X_ref.shape[1]:
        raise ValueError(
            f"X and X_ref must have the same number of columns.\n"
            f"X has {X.shape[1]} columns, while X_ref has {X_ref.shape[1]} columns."
        )

    if X.shape[0] < n:
        raise ValueError(
            f"X must have at least {n} rows matching the number of requested samples."
        )

    dist = DistanceMetric.get_metric(metric)

    distances = dist.pairwise(X_ref, X)

    if integration == "sum":
        integrated_distance = np.sum(distances, axis=0)
    elif integration == "mean":
        integrated_distance = np.mean(distances, axis=0)
    elif integration == "max":
        integrated_distance = np.max(distances, axis=0)
    elif integration == "min":
        integrated_distance = np.min(distances, axis=0)
    elif integration == "prod":
        integrated_distance = np.prod(distances, axis=0)
    else:
        raise ValueError(f"Integration method {integration} not supported.")

    # normalize the distances
    scaler = StandardScaler()
    score = scaler.fit_transform(integrated_distance.reshape(-1, 1)).flatten()

    # order rows in Y from highest to lowest
    sorted_X = X[np.argsort(integrated_distance)[::-1]]
    sorted_score = score[np.argsort(score)[::-1]]

    return sorted_X[:n], sorted_score[:n]

In [53]:
def adjust_distribution(p, temperature):
        # temperature cannot be 0
        #If the temperature is very low (close to 0), then the sampling will become almost deterministic, picking the event with the highest probability.
        #If the temperature is very high, then the sampling will be closer to uniform, with all events having roughly equal probability.
        
        p = p / np.sum(p)  # Normalizing the initial distribution
        p = np.log(p) / temperature  
        p = np.exp(p)  
        p = p / np.sum(p) # Normalizing the final distribution
        return p


    
def mixture_sampler(
    condition_pool: np.ndarray, weights: np.ndarray, temperature: int, 
    X_ref: np.ndarray, 
    X_train: np.ndarray,
    Y_train: np.ndarray, Y_predicted, num_samples: Optional[int] = None) -> np.ndarray:
    """
    Add a description of the sampler here.

    Args:
        condition_pool: pool of experimental conditions to evaluate
        num_samples: number of experimental conditions to select
        weights: array containing 4 weights -- importance of the falsification, confirmation, novelty, and familiarity (ideally, each pair of opposites? sums up to 1 or all? sum up to 1)
        temperature: how random is selection of conditions (cannot be 0; (0:1) - the choices are more deterministic than the choices made wrt
        the mixture scores; 1 - choices are made wrt to the mixture scores; (1, inf) - the choices are more random)
        X_ref, X_train, Y_train, Y_predicted: parameters required for falsification and novelty samplers
    
    Returns:
        Sampled pool of experimental conditions
    """
    
    
    falsification_ranking, falsification_scores = get_scored_samples_from_model_prediction(condition_pool, 
                                                                                           Y_predicted, X_train,
                                                                                           Y_train, n=condition_pool.shape[0])
    
    # getting rid of negative scores by introducing confirmation scores 
    confirmation_scores = -falsification_scores
    confirmation_scores[falsification_scores>0]=0
    falsification_scores[falsification_scores<0]=0
    
    # getting rid of negative scores by introducing familiarity scores 
    novelty_ranking, novelty_scores = compute_dissimilarity(condition_pool, X_ref, n=condition_pool.shape[0])
    
    familiarity_scores = -novelty_scores
    familiarity_scores[novelty_scores>0]=0
    novelty_scores[novelty_scores<0]=0
    
    # aligning the arrays based on the observations (condition pools)
    novelty_indices = np.argsort(novelty_ranking, axis=None)
    ranking_sorted = novelty_ranking[novelty_indices]
    novelty_scores_sorted = novelty_scores[novelty_indices]
    familiarity_scores_sorted = familiarity_scores[novelty_indices]

    falsification_indices = np.argsort(falsification_ranking, axis=None)
    falsification_scores_sorted = falsification_scores[falsification_indices]    
    confirmation_scores_sorted = confirmation_scores[falsification_indices] 
    
    weighted_mixture_scores = falsification_scores_sorted * weights[0] + confirmation_scores_sorted * weights[1] + novelty_scores_sorted * weights[2] + familiarity_scores_sorted * weights[3] 
    # each score is weighted by the relative importance of these different axes
    
    
    # adjust mixture scores wrt temperature
    weighted_mixture_scores_adjusted = adjust_distribution(weighted_mixture_scores, temperature)
    
    if num_samples is None:
        num_samples = condition_pool.shape[0]
    
    conditions = np.random.choice(ranking_sorted.T.squeeze(), num_samples,
              p=weighted_mixture_scores_adjusted, replace = False)
    
    return conditions


In [54]:
class MixtureExperimentalist:
    def __init__(self, weights: np.ndarray, temperature: int, 
    X_ref: np.ndarray, 
    X_train: np.ndarray,
    Y_train: np.ndarray, Y_predicted, num_samples: Optional[int] = None):
        self.weights = weights
        self.temperature = temperature
        self.X_ref = X_ref
        self.X_train = X_train
        self.Y_train = Y_train
        self.Y_predicted = Y_predicted
        self.num_samples = num_samples

    def __call__(self, condition_pool: np.ndarray, **kwargs):
        params = dict(weights = self.weights, temperature = self.temperature, X_ref = self.X_ref,
                     X_train = self.X_train, Y_train = self.Y_train, Y_predicted = self.Y_predicted,
                     num_samples = self.num_samples)
        params.update(kwargs)
        
        samples = mixture_sampler(condition_pool, **params)
        return samples
        


In [56]:
experimentalist = MixtureExperimentalist(weights = np.array([0.2,0.3,0.3,0.7]), temperature = 20, 
    X_ref = np.array([10,25,30]), 
    X_train = np.array([2,5,6]),
    Y_train = np.array([20,25,26]), Y_predicted = np.array([21,22,23]), num_samples = 2)

In [61]:
experimentalist(condition_pool = np.array([1,2,3]), num_samples = 3)

3
[[1]
 [2]
 [3]]
(3, 1)
[0.35493798 0.28306655 0.36199547]
(3,)
[1 2 3]


array([1, 3, 2])

In [52]:
mixture_sampler(
    np.array([1,2,3]), np.array([0.2,0.3,0.3,0.7]), 20, 
    np.array([10,25,30]), 
    np.array([2,5,6]),
    np.array([20,25,26]), np.array([21,22,23]), 2)

2
[[1]
 [2]
 [3]]
(3, 1)
[0.3509837  0.29130689 0.3577094 ]
(3,)
[1 2 3]


array([1, 3])