In [2]:
import pandas as pd
import numpy as np

# Make numpy values easier to read.
np.set_printoptions(precision=3, suppress=True)

import tensorflow as tf
from tensorflow.keras import layers

# Load libriaries and functions.
from tensorflow import keras
tfk = tf.keras
tf.keras.backend.set_floatx("float32")
import tensorflow_probability as tfp
tfd = tfp.distributions
tfpl = tfp.layers 
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import Dense, InputLayer, Dropout

import matplotlib.pyplot as plt
from scipy import stats
from patsy import dmatrices # helps format the input and target variables for ML

from statistics import mean

# import specific functions from the machine learning library, "sklearn"
from sklearn.model_selection import RepeatedKFold, ShuffleSplit, train_test_split, GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error

from scipy.stats import norm, gaussian_kde
from sklearn.neighbors import KernelDensity

import warnings
warnings.filterwarnings('ignore')

In [3]:
# read data from the .csv file

df = pd.read_csv("am_measurement_data.csv")
df.head()

Unnamed: 0,print_serial,design,print_location,hardware,layout,material,color,feature_category,feature_class,feature_id,feature_category_class,material_color,part_id,print_x_mm,print_y_mm,CAD_mm,measure_mm,DFT_mm,print_dist_from_origin_mm,thermal_cure
0,KR70035C,bracket,7,1,1,EPX,black,center,dist_length,bracket_dist_mm_c,center_dist_length,EPX_black,KR70035C_7,42,48,15.0,14.969881,-0.030119,63.780875,True
1,KR70035C,bracket,7,1,1,EPX,black,height,height,bracket_height_mm,height_height,EPX_black,KR70035C_7,42,48,8.0,7.644579,-0.355421,63.780875,True
2,KR70035C,bracket,7,1,1,EPX,black,inner,dia,bracket_inner_dia_mm_a,inner_dia,EPX_black,KR70035C_7,42,48,2.5,2.404393,-0.095607,63.780875,True
3,KR70035C,bracket,7,1,1,EPX,black,inner,length,bracket_inner_length_mm_b,inner_length,EPX_black,KR70035C_7,42,48,15.0,14.928239,-0.071761,63.780875,True
4,KR70035C,bracket,7,1,1,EPX,black,outer,thick,bracket_thick_mm_a,outer_thick,EPX_black,KR70035C_7,42,48,2.0,1.995835,-0.004165,63.780875,True


In [4]:
# Define the target (y) variable and the input (x) variables
# Notice "dmatrices" one-hot-encodes categorical variables for us 

y, x = dmatrices('DFT_mm ~ CAD_mm + print_x_mm + print_y_mm + print_dist_from_origin_mm + C(material) + C(hardware) + C(feature_category) + C(feature_class)', df, return_type="dataframe")
y = y.astype(float)
x = x.astype(float)
y.head()    # target variable to be predicted

Unnamed: 0,DFT_mm
0,-0.030119
1,-0.355421
2,-0.095607
3,-0.071761
4,-0.004165


In [5]:
# set aside 20% of train and test data for evaluation
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, shuffle = True, random_state = 2022)

# Use the same function above for the validation set
# x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.30, shuffle=True, random_state= 2022) # 0.25 x 0.8 = 0.2


print("x_train shape: {}".format(x_train.shape))
print("x_test shape: {}".format(x_test.shape))
print("y_train shape: {}".format(y_train.shape))
print("y_test shape: {}".format(y_test.shape))
# print("x_val shape: {}".format(x_val.shape))
# print("y val shape: {}".format(y_val.shape))

x_train shape: (1620, 16)
x_test shape: (405, 16)
y_train shape: (1620, 1)
y_test shape: (405, 1)


In [7]:
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
sc = StandardScaler()
mc = MinMaxScaler()
x_train = sc.fit_transform(x_train).astype("float32")
# x_val = sc.transform(x_val).astype("float32")
x_test = sc.transform(x_test).astype("float32")

In [8]:

# Independent pripors over each weight (works for different distributions
def prior(kernel_size, bias_size, dtype=None, kind="normal"):
    n = kernel_size + bias_size

    def make_dist():
        z = tf.zeros([n], dtype=dtype)

        if kind == "normal":              # N(0, σ^2)
            sigma = tf.cast(1.0, dtype)
            return tfd.Independent(tfd.Normal(loc=z, scale=sigma), reinterpreted_batch_ndims=1)

        if kind == "laplace":             # Laplace(0, b)
            b = tf.cast(0.5, dtype)
            return tfd.Independent(tfd.Laplace(loc=z, scale=b), reinterpreted_batch_ndims=1)

        if kind == "studentt":            # Student-t(df, 0, scale) (heavy-tailed)
            df = tf.cast(3.0, dtype)
            s  = tf.cast(1.0, dtype)
            return tfd.Independent(tfd.StudentT(df=df, loc=z, scale=s), reinterpreted_batch_ndims=1)

        if kind == "spike_slab":          # 2-component Gaussian mixture
            # “spike” small std, “slab” large std
            pi = tf.cast(0.7, dtype)      # spike probability
            scales = tf.stack([tf.cast(0.05, dtype), tf.cast(1.0, dtype)])  # [2]
            locs   = tf.zeros([2, n], dtype=dtype)                          # [2, n]
            comps  = tfd.Independent(
                tfd.Normal(loc=locs, scale=scales[:, None]),
                reinterpreted_batch_ndims=1
            )
            mix = tfd.Categorical(probs=tf.stack([pi, 1.0 - pi]))
            return tfd.MixtureSameFamily(mixture_distribution=mix,
                                         components_distribution=comps)

        raise ValueError(f"Unknown kind: {kind}")

    prior_model = tf.keras.Sequential([
        tfp.layers.DistributionLambda(lambda t: make_dist())
    ]) 

    return prior_model


# Define variational posterior weight distribution as multivariate Gaussian.
# Note that the learnable parameters for this distribution are the means,
# variances, and covariances.
def posterior(kernel_size, bias_size, dtype=None):
    n = kernel_size + bias_size
    posterior_model = keras.Sequential(
        [
            tfp.layers.VariableLayer(
                tfp.layers.MultivariateNormalTriL.params_size(n), dtype=dtype
            ),
            tfp.layers.MultivariateNormalTriL(n),
        ]
    )
    return posterior_model

In [20]:
train_size = len(x_train)

inputs = keras.Input(shape=(16,))
inputs = layers.BatchNormalization()(inputs)

# Create hidden layers with weight uncertainty using the DenseVariational layer.

densevar = tfp.layers.DenseVariational(
    units=8,
    make_prior_fn=lambda ks, bs, dt=None: prior(ks, bs, dt, kind="normal"),
    make_posterior_fn=posterior,
    kl_weight=1/train_size,
    activation="sigmoid")(inputs)
            
# Create a probabilistic output (Normal distribution), and use the `Dense` layer
# to produce the parameters of the distribution.
# We set units=2 to learn both the mean and the variance of the Normal distribution.

distribution_params = layers.Dense(units=2)(densevar)
outputs = tfp.layers.IndependentNormal(1)(distribution_params)

model_pbnn = keras.Model(inputs=inputs, outputs=outputs)

def negative_loglikelihood(targets, estimated_distribution):
    return -estimated_distribution.log_prob(targets)

# Compile model.
model_pbnn.compile(optimizer=tf.optimizers.RMSprop(learning_rate=0.001), loss=negative_loglikelihood, metrics=[keras.metrics.RootMeanSquaredError()])


In [21]:
model_pbnn.summary()

Model: "model_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_10 (InputLayer)       [(None, 16)]              0         
                                                                 
 dense_variational_4 (Dense  (None, 8)                 9452      
 Variational)                                                    
                                                                 
 dense_4 (Dense)             (None, 2)                 18        
                                                                 
 independent_normal_4 (Inde  ((None, 1),               0         
 pendentNormal)               (None, 1))                         
                                                                 
Total params: 9470 (36.99 KB)
Trainable params: 9470 (36.99 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [22]:
import time

n_epochs_pbnn=1000

print("training started...")

# get the start time
st = time.time()

history_pbnn = model_pbnn.fit(x_train, y_train, epochs=n_epochs_pbnn, validation_split=0.2, verbose=0) 

# get the end time
et = time.time()

# get the execution time
elapsed_time = et - st

print("training finished.")

print('Execution time:', elapsed_time, 'seconds')

training started...
training finished.
Execution time: 106.91274976730347 seconds


In [1]:
print("Evaluation...")

_, rmse = model_pbnn.evaluate(x_train, y_train, verbose=0)
print(f"Train RMSE: {round(rmse, 3)}")
    
_, rmse = model_pbnn.evaluate(x_test, y_test, verbose=0)
print(f"Test RMSE: {round(rmse, 3)}")