In [None]:
import mate
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

seed = 8

In [None]:
def kld(p: np.ndarray, q: np.ndarray, EPS: float = np.finfo(float).eps) -> float:
    p += EPS
    q += EPS
    p /= sum(p)
    q /= sum(q)
    idx_keep = (p > 0) & (q > 0) & (~np.isnan(p)) & (~np.isnan(q))
    p = p[idx_keep]
    q = q[idx_keep]
    return sum(p * np.log2(p / q))


def evaluate(output, log_interval):
    def f(x):
        idx = x // log_interval
        return evaluate_single(output["dists"][idx], output["estimators"][idx])
    n_steps = (len(output["dists"])-1) * log_interval
    interact(f, x=widgets.IntSlider(min=0, max=n_steps, step=log_interval, value=n_steps))


def evaluate_single(dists, estimators, figsize=(20,5), linewidth=2.5, eps=1e-3):
    # -- difference in means/variance
    IDs = list(range(len(dists)))
    mn_true = {ID: dist.mean for ID, dist in zip(IDs, dists)}
    var_true = {ID: dist.variance for ID, dist in zip(IDs, dists)}
    mn_ests = [{ID: est.mean(ID=ID) for ID in IDs} for est in estimators]
    # var_ests = [{ID: est.variance(ID=ID) for ID in IDs} for est in estimators]
    
    # -- KLD of full distributions
    n_samples = 10000
    x_test = np.linspace(eps, 1-eps, num=n_samples)
    q_true = {ID: dist.pdf(x_test) for ID, dist in zip(IDs, dists)}
    p_ests = [{ID: est.pdf(x_test, ID=ID) for ID in IDs} for est in estimators]
    kld_ests = [{ID: kld(p_est[ID], q_true[ID]) for ID in IDs} for p_est in p_ests]
    
    # -- plot results
    maxys = []
    fig, axs = plt.subplots(nrows=1, ncols=len(dists), figsize=figsize, sharey=True)
    for j in range(len(dists)):
        axs[j].plot(x_test, q_true[j], linewidth=linewidth, label="True Distribution")
        axs[j].fill_between(x_test, q_true[j], alpha=0.5)
        axs[j].set_title("Agent {}".format(j))
        maxys.append(max(q_true[j]))
        for i, est in enumerate(estimators):
            axs[j].plot(x_test, p_ests[i][j], linewidth=linewidth, label=est.name)
            
    # -- set y lim
    axs[0].set_ylim(0, 1.2*np.mean(maxys))
    axs[0].get_yaxis().set_ticklabels([])
    plt.legend()
    plt.show()

In [None]:
EPS = 1e-8

class TrustSampler:
    def __init__(
        self,
        temporal_correlation=None,
        spatial_correlation=None,
        platform_correlation=None,
    ):
        self.c_temporal = temporal_correlation
        self.c_spatial = spatial_correlation
        self.c_platfom = platform_correlation
        self._last_trust = None

    def sample(self, agents, dists, noise_strength=0.05):
        ID = np.array([agent.ID for agent in agents])
        trust_vs =  np.array([dist.rvs(n=1)[0] for dist in dists])
        
        # apply temporal correlations
        if self.c_temporal:
            if self._last_trust is not None:
                white_noise = noise_strength * np.random.randn(len(dists))
                trust_vs = (trust_vs + self.c_temporal * self._last_trust) / (1+self.c_temporal) + white_noise

        # apply spatial correlations
        if self.c_spatial:
            raise

        # apply platform correlations
        if self.c_platfom:
            raise
        
        # store for later
        trust_vs = np.clip(trust_vs, 0.0+EPS, 1.0-EPS)
        self._last_trust = trust_vs

        trusts = mate.measurement.TrustArray(trust=trust_vs, ID=ID)
        return trusts

## 1. Uncorrelated Stationary

The trust of each agent is provided as a sample from an underlying Beta distribution per agent. There are no correlations - each measurement is independent and each agent is independent.

In [None]:
def init_beta_agents(n_agents=8, t_prior=0.5):
    agents = [
        mate.state.Agent(
            ID=i,
            pose=None,
            fov=None,
            trust_prior=t_prior
        ) for i in range(n_agents)
    ]
    distributions = [
        mate.distribution.Beta(
            alpha=np.random.randint(low=1, high=10),
            beta=np.random.randint(low=1, high=10)
        ) for _ in range(n_agents)
    ]
    return agents, distributions


def init_estimators(agents, t0=0.0):
    estimators = [
        mate.estimate.MultiTrustEstimatorWrapper(
            t0=t0,
            estimator_base=mate.estimate.VotingTrustEstimator,
            p0=0.5,
            time_window=None
        ),
        mate.estimate.MultiTrustEstimatorWrapper(
            t0=t0,
            estimator_base=mate.estimate.MaximumLikelihoodTrustEstimator,
            distribution={"type": "Beta", "alpha": 1.0, "beta": 1.0},
            update_rate=1,
            time_window=30,
            forgetting=0.10,
            max_variance=None,

        ),
        mate.estimate.DirectLinearKalmanTrustEstimator(
            t0=t0,
            process_noise=0.20,
        )
    ]
    for est in estimators:
        for agent in agents:
            est.add_agent(ID=agent.ID)
    return estimators

### 1.1 No confidence provided

In [None]:
log_interval = 10

# initialize distributions/estimators
np.random.seed(seed)
agents, dists = init_beta_agents()
estimators = init_estimators(agents)
sampler = TrustSampler(temporal_correlation=1.0)
dt = 1.0
n_steps = 100

# run trust estimation
output = {"dists": [deepcopy(dists)], "estimators": [deepcopy(estimators)]}
trusts_all = []
for i in range(n_steps):
    t = dt * i
    trusts = sampler.sample(agents=agents, dists=dists, noise_strength=0.01)
    trusts_all.append(trusts)
    for est in estimators:
        est.update(timestamp=t, trust=trusts)
    if (i+1) % log_interval == 0:
        output["dists"].append(dists)
        output["estimators"].append(deepcopy(estimators))

In [None]:
evaluate(output, log_interval)

### 1.2 Confidence provided

Confidence generated as related to the deviation of the underlying Beta distribution with some noise added

In [None]:
def cfunc(tr, dist):
    return 1 - abs(tr-dist.mean)**(1/2)
    
# initialize distributions/estimators
log_interval = 10
np.random.seed(seed)
agents, dists = init_beta_agents()
estimators = init_estimators(agents)

dt = 1.0
n_steps = 100

# run trust estimation
ID = np.array([agent.ID for agent in agents])
output = {"dists": [deepcopy(dists)], "estimators": [deepcopy(estimators)]}
for i in range(n_steps):
    t = dt * i
    trust = np.array([dist.rvs(n=1)[0] for dist in dists])
    confidence = np.asarray([cfunc(tr, dist) for tr, dist in zip(trust, dists)])
    prior = 0.5 * np.ones((len(agents),))
    trusts = mate.measurement.UncertainTrustArray(trust=trust, confidence=confidence, prior=prior, ID=ID)
    for est in estimators:
        est.update(timestamp=t, trust=trusts)
    if (i+1) % log_interval == 0:
        output["dists"].append(dists)
        output["estimators"].append(deepcopy(estimators))

In [None]:
evaluate(output, log_interval)

## 2. Uncorrelated with Smooth Dynamics

### 1.1 No confidence provided

In [None]:
from typing import Any


class MovingDistribution:
    def __init__(self, weight: float, dist_0, dist_1):
        mn = (1-weight)*dist_0.mean + weight*dist_1.mean
        var = ((1-weight)*np.sqrt(dist_0.variance) + weight*np.sqrt(dist_1.variance))**2
        self.dist = type(dist_0)(mean=mn, var=var)

    def mean(self):
        return self.dist.mean
    
    def variance(self):
        return self.dist.variance
    
    def rvs(self, n: int):
        return self.dist.rvs(n=n)
    
    def pdf(self, x):
        return self.dist.pdf(x=x)


class DistributionSum:
    def __init__(self, weight: float, dist_0, dist_1):
        self.weight = weight
        self.dist_0 = dist_0
        self.dist_1 = dist_1
        
    def mean(self):
        return (1-self.weight)*self.dist_0.mean + \
            self.weight*self.dist_1.mean
    
    def variance(self):
        raise NotImplementedError        

    def rvs(self, n: int):
        return (1-self.weight)*self.dist_0.rvs(n=n) + \
            self.weight*self.dist_1.rvs(n=n)
    
    def pdf(self, x):
        return (1-self.weight)*self.dist_0.pdf(x=x) + \
            self.weight*self.dist_1.pdf(x=x)

In [None]:
# initialize distributions/estimators
log_interval = 10
np.random.seed(seed)
agents, dists_0 = init_beta_agents()
_, dists_1 = init_beta_agents()
estimators = init_estimators(agents)

dt = 1.0
n_steps = 100

# run trust estimation
ID = np.array([agent.ID for agent in agents])
dists = [MovingDistribution(weight=0.0, dist_0=dist_0, dist_1=dist_1) for dist_0, dist_1 in zip(dists_0, dists_1)]
output = {"dists": [deepcopy(dists)], "estimators": [deepcopy(estimators)]}
for i in range(n_steps):
    w = i / (n_steps-1)
    t = dt * i
    dists = [MovingDistribution(weight=w, dist_0=dist_0, dist_1=dist_1) for dist_0, dist_1 in zip(dists_0, dists_1)]
    trust = np.array([dist.rvs(n=1)[0] for dist in dists])
    trusts = mate.measurement.TrustArray(trust=trust, ID=ID)
    for est in estimators:
        est.update(timestamp=t, trust=trusts)
    if (i+1) % log_interval == 0:
        output["dists"].append(dists)
        output["estimators"].append(deepcopy(estimators))

In [None]:
evaluate(output, log_interval)