In [None]:
import tensorflow as tf
import mixturemapping as mm
import numpy as np
import plotly.express as px
import pandas as pd

In [None]:
mixN = 3 #dimension of Gaussian Mixture
inputMixM = 2 #dimension of single Gaussian in the input mixture
outputMixM = 3 #dimension of a single Gaussian in the output mixture
sampleSize = 200
batchSize = 50
dataType = tf.float32

In [None]:
def __getSamples(mapFunc,
                 mixN,
                 inputMixM,
                 outputMixM,
                 sampleSize,
                 ):

    totalSamplesN = 100

    # compute an array of weights of the different mixture components
    w = np.random.uniform(low=0, high=1, size=(totalSamplesN, mixN))
    w = w / np.expand_dims(np.sum(w, axis=1), 1)

    # fill the initial values of the distributions
    trainFeatures = {
        "InputMean": np.random.uniform(high=20.0, size=(totalSamplesN, 1, inputMixM)) + np.random.uniform(high=10, size=(totalSamplesN, mixN, inputMixM)),
        "InputStdDev": np.random.uniform(high=0.5, size=(totalSamplesN, 1, inputMixM)) + np.random.uniform(high=0.1, size=(totalSamplesN, mixN, inputMixM)),
        "InputWeights": w,
    }

    # create a mapped set of training and validation samples
    TrainMeanSamplesArray = []
    TrainStdDevSamplesArray = []
    TrainWeightsSamplesArray = []
    TrainSamplesArray = []

    for i in range(totalSamplesN):

        mix = mm.utils.getSkleanGM(
            trainFeatures["InputWeights"][i], trainFeatures["InputMean"][i], trainFeatures["InputStdDev"][i])

        # copy the input to make the right length
        TrainMeanSamplesArray.append(np.transpose(
            [trainFeatures["InputMean"][i] for x in range(sampleSize)]))
        TrainStdDevSamplesArray.append(np.transpose(
            [trainFeatures["InputStdDev"][i] for x in range(sampleSize)]))
        TrainWeightsSamplesArray.append(np.transpose(
            [trainFeatures["InputWeights"][i] for x in range(sampleSize)]))

        TrainSamplesArray.append(np.transpose(
            [mapFunc(x) for x in mix.sample(sampleSize)[0]]))

    trainFeatures["TrainSamples"] = np.reshape(np.transpose(
        TrainSamplesArray, [0, 2, 1]), [sampleSize*totalSamplesN, outputMixM])

    trainFeatures["TrainMean"] = np.reshape(np.transpose(TrainMeanSamplesArray, [
                                            0, 3, 2, 1]), [sampleSize*totalSamplesN, mixN, inputMixM])
    trainFeatures["TrainStdDev"] = np.reshape(np.transpose(TrainStdDevSamplesArray, [
        0, 3, 2, 1]), [sampleSize*totalSamplesN, mixN, inputMixM])
    trainFeatures["TrainWeights"] = np.reshape(np.transpose(
        TrainWeightsSamplesArray, [0, 2, 1]), [sampleSize*totalSamplesN, mixN])

    trainFeatures["GroupedSamples"] = np.array(TrainSamplesArray)
    return trainFeatures


def getSimpleLinearA(mixN,
                     inputMixM,
                     outputMixM,
                     sampleSize,
                     ):
    def mapFunc(x):
        """
        we assume x to be a single sample vec, which is transformed to a point in the output distribution
        """
        return np.array([x[0]*0.9 + .9, -x[1]*0.9 - 0.3, -x[0]*0.5 + .2]) + np.random.normal(scale=.1, size=3)
    return {
        "trainFeatures": __getSamples(mapFunc,
                                      mixN,
                                      inputMixM,
                                      outputMixM,
                                      sampleSize,
                                      ),
                                      "testFeatures": {
                                          "mapping_kernel": [[0.9, 0, -0.5], [0.0, -0.9, 0]],
                                          "mapping_bias": [0.9, -0.3, 0.2],
                                          "cov_std": [.1, 0.1, 0.1],
                                      }

    }

In [None]:
example_data = getSimpleLinearA(
    mixN,
    inputMixM,
    outputMixM,
    sampleSize
)
trainFeatures = example_data["trainFeatures"]

In [None]:
tf.keras.backend.clear_session()


with tf.name_scope("WaferGMM"):
    inMean = tf.keras.Input(shape=(mixN, inputMixM), name="Mean", dtype=dataType)
    inStdDev = tf.keras.Input(shape=(mixN, inputMixM), name="StdDev", dtype=dataType)
    inWeight = tf.keras.Input(shape=(mixN), name="Weight", dtype=dataType)

with tf.name_scope("SampleInput"):
    inTsamples = tf.keras.Input(shape=(outputMixM), name="TrainSamples", dtype=dataType)
    
with tf.name_scope("CoreModel"):
    covALayer = mm.layers.TrainableCovMatrix(outputMixM, name="CovA")
    covA = covALayer(inMean)

    mappingLayer = mm.layers.LinearMapping(outputMixM, name="Mapping", dtype=dataType)
    newDist = mappingLayer({'x': inMean, 'stdDev': inStdDev, 'w': inWeight, 'covA': covA})

    distLayer = mm.layers.Distribution(dtype=dataType, regularize_cov_epsilon=0.95)

    dist = distLayer(newDist)

with tf.name_scope("Outputs"):
    outputMeans = dist.mean()


In [None]:
model = tf.keras.Model(inputs=[inMean, inStdDev, inWeight, inTsamples], outputs=dist)

In [None]:
model.summary()

In [None]:
mappingLayer.sampling_ON()

optimizer = tf.optimizers.Adam()

initial_learning_rate = 0.01
optimizer = tf.optimizers.Adam(learning_rate=initial_learning_rate)


In [None]:
model.compile(optimizer=optimizer, loss=distLayer.sample_loss(inTsamples))

In [None]:
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        # Stop training when `val_loss` is no longer improving
        monitor="val_loss",
        # "no longer improving" being defined as "no better than 1e-2 less"
        min_delta=1e-2,
        patience=20,
        verbose=1,
        restore_best_weights=True
    )
]

In [None]:
hist = model.fit([
    trainFeatures["TrainMean"],
    trainFeatures["TrainStdDev"],
    trainFeatures["TrainWeights"],
    trainFeatures["TrainSamples"]],
    shuffle=True, validation_split=0.1, 
    epochs=500, batch_size=100, 
    verbose=False, use_multiprocessing=False, 
    callbacks=callbacks,
    )

In [None]:
df_train = pd.DataFrame(hist.history["loss"], columns=["loss"])
df_train["type"] = "train"
df_val = pd.DataFrame(hist.history["val_loss"], columns=["loss"])
df_val["type"] = "val"

px.line(pd.concat([df_train, df_val]), y="loss", color="type", log_y=True)

In [None]:
mappingLayer.sampling_OFF()

modelSamples = tf.transpose(dist.sample(100), [1, 2, 0])
model2 = tf.keras.Model(inputs=[inMean, inStdDev, inWeight], outputs=modelSamples)

In [None]:
res = model2.predict([trainFeatures["InputMean"], trainFeatures["InputStdDev"], trainFeatures["InputWeights"]])

In [None]:
idx = 13

df_pred = pd.DataFrame(res[idx].T, columns=["x", "y", "z"])
df_pred["type"] = "pred"
df_true = pd.DataFrame(trainFeatures["GroupedSamples"][idx].T, columns=["x", "y", "z"])
df_true["type"] = "true"

px.scatter_3d(pd.concat([df_pred, df_true]), x="x", y="y", z="z", color="type")

In [None]:
delta = tf.keras.backend.eval(
    mappingLayer.kernel.mean() - example_data["testFeatures"]["mapping_kernel"]
)
print(delta)
max_delta = np.max(np.abs(delta))
print(max_delta)
assert max_delta < 0.05, "Mapping slope off"

In [None]:
delta = tf.keras.backend.eval(
    mappingLayer.bias.mean() - example_data["testFeatures"]["mapping_bias"]
)
print(delta)
max_delta = np.max(np.abs(delta))
print(max_delta)
assert max_delta < 0.05, "mapping bias off"

In [None]:
delta = tf.keras.backend.eval(
    covALayer.spread[0] - example_data["testFeatures"]["cov_std"]
)
print(delta)
max_delta = np.max(np.abs(delta))
print(max_delta)
assert max_delta < 0.05, "distribution spread off"