## Imports

In [40]:
import os
import numpy as np
from scipy.stats import chi2_contingency

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal

import matplotlib
matplotlib.rcParams['text.usetex'] = False

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## PQMass

In [41]:
# Function to compute the chi2 statistic for the PQM test
def get_pqm_chi2(x_samples, y_samples, num_refs=100):
    """
    Compute the chi2 statistic for the PQM test.

    Parameters
    ----------
    x_samples : np.ndarray
        Samples from the posterior.
    y_samples : np.ndarray
        True samples
    num_refs : int
        Number of reference samples to use.

    Returns
    -------
    float
        Chi2_pqm statistic.
    """
    # Randomly select reference samples from y_samples
    refs = np.random.choice(len(y_samples), num_refs, replace=False)
    refs = y_samples[refs]

    # Initialize counts for x_samples and y_samples
    counts_x = np.zeros(num_refs, dtype="int")
    counts_y = np.zeros(num_refs, dtype="int")

    # Count occurrences for x_samples
    for x in x_samples:
        d = np.linalg.norm(x.reshape(1, -1) - refs, axis=-1)
        idx = np.argmin(d)
        counts_x[idx] += 1

    # Count occurrences for y_samples
    for y in y_samples:
        d = np.linalg.norm(y.reshape(1, -1) - refs, axis=-1)
        idx = np.argmin(d)
        counts_y[idx] += 1

    # Compute chi2 statistic using contingency table
    chi2_stat, _, _, _ = chi2_contingency(np.array([counts_x, counts_y]))
    return chi2_stat

## Classes & Networks

In [42]:
# Actor network class to represent the policy
# Gives a probability distribution for the actions
class Actor(nn.Module):
    def __init__(self, state_size):
        super(Actor, self).__init__()
        self.linear1 = nn.Linear(state_size, 128)
        self.linear2 = nn.Linear(128, 256)
        # Output layer for mean and variance
        self.mean_layer = nn.Linear(256, state_size)
        self.variance_layer = nn.Linear(256, state_size)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        mean = self.mean_layer(output)
        variance = F.softplus(self.variance_layer(output))  # Using softplus to ensure variance is positive
        return mean, variance

# Critic network class to represent the value function
# Estimates the expected reward
class Critic(nn.Module):
    def __init__(self, state_size):
        super(Critic, self).__init__()
        self.linear1 = nn.Linear(state_size, 128)
        self.linear2 = nn.Linear(128, 256)
        self.linear3 = nn.Linear(256, 1)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        value = self.linear3(output)
        return value


In [138]:
# Custom environment class to simulate the problem setting without gym
class CustomEnv:
    def __init__(self, function, sample_size, num_refs=100, linspace=None):
        """
        Initialize the custom environment.

        Parameters
        ----------
        function : callable
            The function to sample from.
        sample_size : int
            The number of samples to generate.
        num_refs : int
            Number of reference samples for PQM.
        range : (Int, Int, Int)
            The linspace parameters used to
            generate true samples used in PQMass.
        """
        self.function = function
        self.sample_size = sample_size
        self.num_refs = num_refs
        self.linspace = (-1, 1, 1000) if linspace is None else linspace

        # Generate true samples to use in PQMass
        self.true_samples = self.function(np.linspace(self.linspace[0], self.linspace[1], self.linspace[2]).reshape(-1, 1))
        self.reset()

    def reset(self):
        """
        Reset the environment and return the initial state.

        Returns
        -------
        np.ndarray
            The initial state (randomly generated samples).
        """
        self.state = np.random.uniform(self.linspace[0], self.linspace[1], (self.sample_size, 1))
        return self.state

    def step(self, action):
        """
        Take an action and return the next state, reward, and done flag.

        Parameters
        ----------
        action : np.ndarray
            The action to take (new set of points).

        Returns
        -------
        tuple
            The next state, reward, and done flag.
        """
        # Concatenate the actions into a single array
        self.state = action  # Update the state
        reward = get_pqm_chi2(self.function(self.state), self.true_samples, self.num_refs)

        # Optimal reward is <= num_refs-1 (chi2 degrees of freedom - 1)
        # Set done flag if optimal reward is reached
        done = reward <= self.num_refs - 1

        return self.state, reward, done

    def get_pqm(self):
        return get_pqm_chi2(self.state, self.true_samples, self.num_refs)

## Training loop

In [114]:
# Function to compute returns for the critic
def compute_returns(next_value, rewards, masks, gamma=0.99):
    """
    Compute the discounted returns.

    Parameters
    ----------
    next_value : torch.Tensor
        The value of the next state.
    rewards : list
        List of rewards.
    masks : list
        List of masks indicating episode end.
    gamma : float
        Discount factor.

    Returns
    -------
    list
        List of discounted returns.
    """
    R = next_value
    returns = []
    for step in reversed(range(len(rewards))):
        R = rewards[step] + gamma * R * masks[step]
        returns.insert(0, R)
    return returns

  # Function to train the actor-critic model
def training_loop(actor, critic, env, n_iters, sample_size, lr=0.0001, max_steps=1000):
    """
    Train the actor-critic model.

    Parameters
    ----------
    actor : Actor
        The actor network.
    critic : Critic
        The critic network.
    env : CustomEnv
        The custom environment.
    n_iters : int
        Number of training iterations.
    sample_size : int
        Size of the sample set.
    lr : float
        Learning rate
    max_steps : int
        Maximum number of steps per episode.
    """
    optimizerA = optim.Adam(actor.parameters(), lr)
    optimizerC = optim.Adam(critic.parameters(), lr)
    for iter in range(n_iters):
        state = env.reset()
        log_probs = []
        values = []
        rewards = []
        masks = []
        entropy = 0

        for i in range(max_steps):
            state = torch.FloatTensor(state).to(device)

            # Get distribution of the actions
            mean, variance = actor(state)
            dist = Normal(mean, variance.sqrt())

            # Sample the actions
            actions = dist.sample()

            # Get expected best reward
            value = critic(state)

            next_state, reward, done = env.step(actions.cpu().numpy())

            print(f"Step {i} | Reward = {reward}")

            log_prob = dist.log_prob(actions).sum(dim=-1, keepdim=True)
            entropy += dist.entropy().sum(dim=-1).mean()

            log_probs.append(log_prob)
            values.append(value)
            rewards.append(torch.tensor([reward], dtype=torch.float, device=device))
            masks.append(torch.tensor([1-done], dtype=torch.float, device=device))

            state = next_state

            if done:
                break
        else:
            # This block executes if the loop completes without encountering a break
            print("Episode terminated without reaching the done condition.")

        next_state = torch.FloatTensor(next_state).to(device)
        next_value = critic(next_state)
        returns = compute_returns(next_value, rewards, masks)

        log_probs = torch.cat(log_probs)
        returns = torch.cat(returns).detach()
        values = torch.cat(values)

        advantage = returns - values

        actor_loss = -(log_probs * advantage.detach()).mean()
        critic_loss = advantage.pow(2).mean()

        # Update models
        optimizerA.zero_grad()
        optimizerC.zero_grad()
        actor_loss.backward()
        critic_loss.backward()
        optimizerA.step()
        optimizerC.step()

    # Save the trained models
    torch.save(actor, 'model/actor.pkl')
    torch.save(critic, 'model/critic.pkl')


## main

In [139]:
def main():
    # Define a simple function to sample from
    def simple_function(vars, doprint=False):
        mu, sigma = 0, 0.1
        res = 1/(sigma * np.sqrt(2 * np.pi)) * np.exp( - (vars - mu)**2 / (2 * sigma**2))
        if doprint: print(type(res), res)
        return res
    def simple_function2(vars, doprint=False):
        res = np.sin(5 * vars) + 0.5 * vars
        if doprint: print(type(res), res)
        return res
    
    sample_size = 100
    lr = 0.0001
    env = CustomEnv(function=simple_function, sample_size=sample_size, num_refs=100, linspace=(-5, 5, 10000))
    state_size = 1
    do_load = False
    
    # Load or create actor and critic models
    if os.path.exists('model/actor.pkl') and do_load:
        actor = torch.load('model/actor.pkl')
        print('Actor Model loaded')
    else:
        actor = Actor(state_size).to(device)
    if os.path.exists('model/critic.pkl') and do_load:
        critic = torch.load('model/critic.pkl')
        print('Critic Model loaded')
    else:
        critic = Critic(state_size).to(device)

    training_loop(actor, critic, env, n_iters=1, sample_size=sample_size, lr=lr)


In [140]:
main()

ValueError: The internally computed table of expected frequencies has a zero element at (0, 7).

## Inference

In [26]:
def inference(actor, env):
    """
    Perform inference using the trained actor model.

    Parameters
    ----------
    actor : Actor
        The trained actor model.
    env : CustomEnv
        The custom environment.
    """
    state = env.reset()

    state_tensor = torch.FloatTensor(state).to(device)

    # Get distribution of the actions from the actor
    mean, variance = actor(state_tensor)
    dist = Normal(mean, variance.sqrt())

    # Sample an action from the distribution
    action = dist.sample()
    sampled_points = action.cpu().numpy()

    # Take a step in the environment
    env.step(sampled_points)

In [90]:
from scipy.stats import norm

# Load the trained models
actor = torch.load('model/actor.pkl')

# Define a simple function to sample from
def simple_function(xs):
    res = []

    for x in xs:
        res.append(norm.pdf(x))
    return np.array(res)

sample_size = 50
env = CustomEnv(function=simple_function, sample_size=sample_size, num_refs=50, linspace=(-1, 1, 1000))

inference(actor, env)
sampled_points = env.state.reshape(1, -1)[0]
pqm = env.get_pqm()
print("Samples: ", sampled_points)
print("PQMass: ", pqm)

Samples:  [ 1.0192165  -0.7046166   0.16075183 -2.1100342   1.1247537  -0.56437665
 -2.0511155  -0.16812965  2.226415    1.0813334  -0.17472167 -1.4481682
 -0.2944311   0.28989506  1.4898683   0.7008458  -0.52827287 -1.0736123
 -0.54956603  1.0925659  -1.3257561   0.3261135  -0.39728162 -0.1515728
  0.5025027  -1.0999614  -0.03903792  0.7862698   0.95406634  0.63724816
 -1.0876268   0.98903584  0.5373073  -0.54583764  1.2409909  -0.7909513
  0.06562695 -0.17640752 -0.21591708  1.5101597  -0.08222449 -0.3518117
  0.66756463  0.14549813  1.044765    0.737229   -0.9649158  -1.6232824
  0.01011164  0.20067102]
PQMass:  477.2452820512822
