In [1]:
import tensorflow as tf
import numpy as np
import numpy.random as rng
import tensorflow.keras as keras
import tensorflow_probability as tfp

# Generate synthetic data

In [3]:
n_kcs = 20
pc_group = [0.3, 0.5, 0.8, 0.1]
n_groups = len(pc_group)
n_obs_per_kc = 50

# random assignment matrix
A = rng.multinomial(1, [1/n_groups] * n_groups, n_kcs)
G = np.argmax(A, axis=1)
print(A)
x = []
y = []
for kc in np.arange(n_kcs):
    g = G[kc]
    pc = pc_group[g]
    for i in range(n_obs_per_kc):
        x.append(kc)
        y.append(rng.binomial(1, pc))
x = np.array(x)
y = np.array(y)
X = keras.utils.to_categorical(x)
X.shape

[[0 1 0 0]
 [0 1 0 0]
 [1 0 0 0]
 [1 0 0 0]
 [1 0 0 0]
 [0 1 0 0]
 [0 1 0 0]
 [1 0 0 0]
 [0 0 0 1]
 [1 0 0 0]
 [1 0 0 0]
 [0 0 0 1]
 [0 0 1 0]
 [0 1 0 0]
 [0 0 0 1]
 [0 1 0 0]
 [0 1 0 0]
 [0 0 1 0]
 [0 0 1 0]
 [0 0 1 0]]


(1000, 20)

# Fit a simple membership model

In [20]:
# variable representing assignment probabilities
logit_P = tf.Variable(tf.random.normal((n_kcs, n_groups), mean=0, stddev=0.1), name="logit_P")

# variable representing emission probabilities
logit_O = tf.Variable(tf.random.normal((n_groups,1), mean=0, stddev=0.1), name="logit_O")

Xtf = tf.convert_to_tensor(X, dtype=tf.float32)
Ytf = tf.convert_to_tensor(y[:,np.newaxis], dtype=tf.float32)
lossf =keras.losses.BinaryCrossentropy(from_logits=True)

optimizer = keras.optimizers.Nadam(learning_rate=0.01)

epochs = 10000
for e in range(epochs):
    
    with tf.GradientTape() as t:
        
        S = tf.math.exp(logit_P) / tf.reduce_sum(logit_P, axis=1, keepdims=True)
        
        # extract emission probabilities
        logit_pc = tf.matmul(tf.matmul(X, S), logit_O)
        
        # calculate loss
        loss = lossf(Ytf, logit_pc)
    
    trainables = [logit_P, logit_O]
    grads = t.gradient(loss, trainables)
    optimizer.apply_gradients(zip(grads, trainables))
    
    if e % 1000 == 0:
        print("Loss: %8.4f" % loss.numpy())

Loss:   1.0435
Loss:   0.6481
Loss:   0.6174
Loss:   0.6011
Loss:   0.5965
Loss:   0.5852
Loss:   0.5743
Loss:   0.5666
Loss:   0.5636
Loss:   0.5625


In [21]:
lp = logit_P.numpy()
lp = np.exp(lp) / np.sum(np.exp(lp), axis=1, keepdims=True)
lp

array([[0.11367014, 0.66704434, 0.08698247, 0.132303  ],
       [0.23458944, 0.31500798, 0.22713794, 0.22326466],
       [0.1321291 , 0.6493795 , 0.08945897, 0.12903242],
       [0.24811177, 0.2621024 , 0.26495832, 0.22482753],
       [0.26800486, 0.2248898 , 0.25165382, 0.25545147],
       [0.12813126, 0.6435085 , 0.10383385, 0.12452642],
       [0.13496862, 0.63619095, 0.09894469, 0.12989576],
       [0.1200204 , 0.6447525 , 0.09857304, 0.13665406],
       [0.10909919, 0.6762711 , 0.08876687, 0.12586284],
       [0.23212045, 0.3752901 , 0.17686096, 0.21572846],
       [0.10063827, 0.69143814, 0.08350869, 0.12441489],
       [0.29794988, 0.18661135, 0.24030066, 0.27513808],
       [0.09852854, 0.7169448 , 0.0836041 , 0.10092262],
       [0.14307392, 0.6012197 , 0.11503052, 0.14067586],
       [0.22936614, 0.254871  , 0.25031465, 0.26544818],
       [0.11603247, 0.6673513 , 0.08913586, 0.12748036],
       [0.16001458, 0.5707206 , 0.11844661, 0.15081823],
       [0.09201317, 0.7167462 ,

In [22]:
# variable representing assignment probabilities
logit_P = tf.Variable(tf.random.normal((n_kcs, n_groups), mean=0, stddev=0.1), name="logit_P")

# variable representing emission probabilities
logit_O = tf.Variable(tf.random.normal((n_groups,1), mean=0, stddev=0.1), name="logit_O")

Xtf = tf.convert_to_tensor(X, dtype=tf.float32)
Ytf = tf.convert_to_tensor(y[:,np.newaxis], dtype=tf.float32)
lossf =keras.losses.BinaryCrossentropy(from_logits=True)

temperature = 0.5

optimizer = keras.optimizers.Nadam(learning_rate=0.1)

epochs = 10000
for e in range(epochs):
    
    with tf.GradientTape() as t:
        
        dist = tfp.distributions.RelaxedOneHotCategorical(temperature, logits=logit_P)
        
        # sample an assignment [n_kcs, n_groups]
        S = dist.sample()
        
        # quantize it
        #S = quantize(S)
        
        # extract emission probabilities
        logit_pc = tf.matmul(tf.matmul(X, S), logit_O)
        
        # calculate loss
        loss = lossf(Ytf, logit_pc)
    
    trainables = [logit_P, logit_O]
    grads = t.gradient(loss, trainables)
    optimizer.apply_gradients(zip(grads, trainables))
    
    if e % 1000 == 0:
        print("Loss: %8.4f" % loss.numpy())

dist = tfp.distributions.RelaxedOneHotCategorical(1e-6, logits=logit_P)
# sample an assignment [n_kcs, n_groups]
S = dist.sample()
print(S)

Loss:   0.6969
Loss:   0.5702
Loss:   0.5712
Loss:   0.5695
Loss:   0.5708
Loss:   0.5711
Loss:   0.5724
Loss:   0.5716
Loss:   0.5699
Loss:   0.5699
tf.Tensor(
[[0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]], shape=(20, 4), dtype=float32)


In [24]:
samples = []
for i in range(100):
    samples.append(dist.sample().numpy())
samples = np.array(samples)
np.mean(samples, axis=0)

array([[0.25, 0.75, 0.  , 0.  ],
       [0.  , 1.  , 0.  , 0.  ],
       [0.  , 1.  , 0.  , 0.  ],
       [0.  , 1.  , 0.  , 0.  ],
       [0.  , 1.  , 0.  , 0.  ],
       [1.  , 0.  , 0.  , 0.  ],
       [1.  , 0.  , 0.  , 0.  ],
       [0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 1.  , 0.  ],
       [0.43, 0.57, 0.  , 0.  ],
       [0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 1.  , 0.  ],
       [0.  , 0.  , 0.  , 1.  ],
       [1.  , 0.  , 0.  , 0.  ],
       [0.  , 0.  , 1.  , 0.  ],
       [0.3 , 0.7 , 0.  , 0.  ],
       [1.  , 0.  , 0.  , 0.  ],
       [0.  , 0.  , 0.  , 1.  ],
       [0.  , 0.  , 0.  , 1.  ],
       [0.  , 0.  , 0.  , 1.  ]], dtype=float32)