## Imports

In [115]:
import os

import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shapely as sh
import tensorflow as tf
from scipy.io import loadmat
from shapely.geometry import Point
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Conv2DTranspose, BatchNormalization, Activation, \
    Concatenate
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.metrics import Accuracy, RootMeanSquaredError, MeanAbsoluteError
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD

## Data loading and preprocessing

In [106]:
# Root folder of the training set. Set the TRACNET_TRAIN_DIR environment
# variable to point the notebook at another location; without it the original
# location is used, so existing runs are unaffected.
TRAIN_DATA_DIR = os.environ.get(
    'TRACNET_TRAIN_DIR',
    '/home/alexrichard/LRZ Sync+Share/ML in Physics/data/train/trainData104',
)

# One sub-folder per kind of .mat file consumed by `datasets`.
dspl_path = os.path.join(TRAIN_DATA_DIR, 'dspl')
dsplRadial_path = os.path.join(TRAIN_DATA_DIR, 'dsplRadial')
trac_path = os.path.join(TRAIN_DATA_DIR, 'trac')
tracRadial_path = os.path.join(TRAIN_DATA_DIR, 'tracRadial')

In [110]:
def _load_mat_records(directory):
    """Load every regular file in ``directory`` with ``loadmat``.

    Returns a 1-D object array holding one dict per file. The loader's
    bookkeeping entries (``__header__``, ``__version__``, ``__globals__``) are
    removed and the file name is stored under ``'name'``.
    """
    records = []
    # os.listdir yields names in arbitrary order; sorting makes the result
    # reproducible and keeps equally named files at the same index in every
    # directory loaded through this helper.
    for filename in sorted(os.listdir(directory)):
        file_path = os.path.join(directory, filename)
        if not os.path.isfile(file_path):
            continue  # skip sub-directories and other non-file entries
        record = loadmat(file_path)
        for meta_key in ('__header__', '__version__', '__globals__'):
            record.pop(meta_key, None)
        record['name'] = filename
        records.append(record)
    # Build the array once instead of re-allocating it with np.append per file.
    return np.array(records, dtype=object)


def datasets(dspl_path, dsplRadial_path, trac_path, tracRadial_path):
    """Load the .mat files of the four data directories.

    Parameters
    ----------
    dspl_path, dsplRadial_path, trac_path, tracRadial_path : str
        Directories that are scanned (non-recursively) for files.

    Returns
    -------
    tuple of numpy.ndarray
        ``(samples, dspl_radials, targets, trac_radials)``; each is a 1-D
        object array of dicts as produced by ``loadmat`` plus a ``'name'``
        entry, ordered by file name.
    """
    samples = _load_mat_records(dspl_path)
    dspl_radials = _load_mat_records(dsplRadial_path)
    targets = _load_mat_records(trac_path)
    trac_radials = _load_mat_records(tracRadial_path)

    return samples, dspl_radials, targets, trac_radials

## Build model

In [116]:
def conv_block(input, num_filters, snd_batch_normalization=True):
    """Apply two 3x3 'same'-padded convolutions, each followed by ReLU.

    Batch normalization always follows the first convolution; it follows the
    second one only when ``snd_batch_normalization`` is truthy. Both
    convolutions use the same RandomNormal(0, 0.01) kernel initializer and
    zero biases. The output shape of each convolution is printed.
    """
    shared_init = RandomNormal(mean=0.0, stddev=0.01)

    features = input
    # First stage always normalizes, second stage only on request.
    for use_batch_norm in (True, snd_batch_normalization):
        features = Conv2D(
            filters=num_filters,
            kernel_size=(3, 3),
            padding='same',
            kernel_initializer=shared_init,
            bias_initializer='zeros',
        )(features)
        print(features.shape)
        if use_batch_norm:
            features = BatchNormalization()(features)
        features = Activation("relu")(features)

    return features

In [117]:
def encoder_block(input, num_filters):
    """Run one contracting step: conv_block, then 2x2 max pooling.

    Returns ``(features, pooled)``: the conv_block output (built without the
    second batch normalization) and its 2x2, stride-2, 'same'-padded
    max-pooled version. Both shapes are printed.
    """
    features = conv_block(input, num_filters, snd_batch_normalization=False)
    print(features.shape)

    downsample = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')
    pooled = downsample(features)
    print(pooled.shape)

    return features, pooled

In [None]:
def decoder_block(input, skip_features, num_filters):
    """Run one expanding step: upsample, merge the skip connection, convolve.

    ``input`` goes through a stride-2 3x3 transposed convolution (its output
    shape is printed), is concatenated with ``skip_features`` and then passed
    through a conv_block with both batch normalizations enabled.
    """
    upsample = Conv2DTranspose(
        filters=num_filters,
        kernel_size=(3, 3),
        strides=(2, 2),
        kernel_initializer=RandomNormal(mean=0.0, stddev=0.01),
        bias_initializer='zeros',
        padding='same',
    )
    upsampled = upsample(input)
    print(upsampled.shape)

    merged = Concatenate()([upsampled, skip_features])
    return conv_block(merged, num_filters, snd_batch_normalization=True)

In [118]:
def build_tracnet(input_shape, batch_size):
    """Assemble the 'Tracnet' encoder/decoder model.

    Three encoder blocks (32, 64, 128 filters) feed a 256-filter conv_block,
    followed by three decoder blocks (128, 64, 32 filters) that each consume
    the skip output of the matching encoder level. A final 3x3 convolution
    with 2 filters and no activation produces the output.
    """
    inputs = Input(shape=input_shape, batch_size=batch_size)
    print(inputs.shape)

    # Contracting path: remember each level's pre-pooling features.
    skip_stack = []
    x = inputs
    for width in (32, 64, 128):
        skip, x = encoder_block(x, width)
        skip_stack.append(skip)

    # Bottleneck.
    x = conv_block(x, 256)

    # Expanding path: deepest skip connection is consumed first.
    for width in (128, 64, 32):
        x = decoder_block(x, skip_stack.pop(), width)

    # Output head.
    head_init = RandomNormal(mean=0.0, stddev=0.01)
    head = Conv2D(
        filters=2,
        kernel_size=(3, 3),
        padding='same',
        kernel_initializer=head_init,
        bias_initializer='zeros',
    )
    outputs = head(x)

    return Model(inputs, outputs, name='Tracnet')

## Training

In [120]:
def _save_history_plot(history, metric, label, filename):
    """Plot the train and validation curves of one logged quantity and save the figure."""
    fig, ax = plt.subplots()
    ax.plot(history.history[metric])
    ax.plot(history.history['val_' + metric])
    ax.set_title('model ' + label)
    ax.set_ylabel(label)
    ax.set_xlabel('epoch')
    ax.legend(['train', 'val'], loc='upper left')
    fig.savefig(filename)
    plt.close(fig)


def train_tracnet(dspl_train, trc_train, dspl_test, trc_test, input_shape=(104, 104, 2), epochs=100,
                  batch_size=32):
    """Build, train and save the Tracnet model.

    Parameters
    ----------
    dspl_train, trc_train : training inputs and targets passed to ``model.fit``.
    dspl_test, trc_test : inputs and targets used as validation data.
    input_shape : shape of one input sample (without the batch dimension).
    epochs : number of training epochs.
    batch_size : batch size used both for the model's Input layer and for
        ``fit``; defaults to the previously hard-coded 32.

    Side effects
    ------------
    Prints the model summary, writes ``accuracy_1.png``, ``loss_1.png`` and
    ``rmse_1.png`` to the working directory and saves the model (including
    the optimizer state) to ``model_1.tf``. ``lr_step_decay`` must already be
    defined when this function is called.
    """
    model = build_tracnet(input_shape, batch_size)
    model.compile(
        optimizer=SGD(momentum=0.9),
        loss=MeanSquaredError(),
        # NOTE(review): Accuracy counts exact matches between prediction and
        # target, which is rarely informative for continuous targets; it is
        # kept so the logged history and the accuracy plot stay available.
        metrics=[Accuracy(), RootMeanSquaredError(), MeanAbsoluteError()]
    )
    model.summary()
    history = model.fit(
        dspl_train,
        trc_train,
        validation_data=(dspl_test, trc_test),
        batch_size=batch_size,
        epochs=epochs,
        # the scheduler sets the learning rate at the start of every epoch
        callbacks=[LearningRateScheduler(lr_step_decay, verbose=2)],
        verbose=2
    )

    # One figure per logged quantity: (history key, axis label, output file).
    for metric, label, filename in (
        ('accuracy', 'accuracy', 'accuracy_1.png'),
        ('loss', 'loss', 'loss_1.png'),
        ('root_mean_squared_error', 'rmse', 'rmse_1.png'),
    ):
        _save_history_plot(history, metric, label, filename)

    # The path previously contained literal quote characters ("'model_1.tf'"),
    # which ended up in the name on disk.
    model.save('model_1.tf', save_format='tf', include_optimizer=True)

In [None]:
def lr_step_decay(epoch):
    """Return the learning rate for ``epoch`` under a step-decay schedule.

    The rate starts at 6e-4 and is multiplied by 0.7943 once for every 10
    completed epochs.
    """
    base_rate, decay_factor, step_length = 6e-4, 0.7943, 10.0

    completed_steps = np.floor(epoch / step_length)
    return base_rate * np.power(decay_factor, completed_steps)

## Error calculation

For each displacement field, predict the traction (stress) field and compute its normalized RMSE relative to the ground truth, for several Young's moduli, with and without added noise.

In [73]:
def calcError(path):
    """Tabulate normalized traction errors for the test cells found in ``path``.

    For every file id 18..55 for which ``MLData00<id>.mat`` exists in ``path``,
    the cell border (``brdx``, ``brdy``) and ground truth (``tracGT``) are read
    from that file. For each Young's modulus ``E`` the displacement field is
    read from ``MLData00<id>-<E>.mat``, the traction field is predicted with
    ``predictTrac`` and scored with ``errorTrac``, once for the clean field and
    once after ``add_noise``.

    Parameters
    ----------
    path : str
        Directory containing the .mat files.

    Returns
    -------
    pandas.DataFrame
        One row per file id. Columns are the file id followed by a clean and
        a noisy ("N") error per modulus. Entries are NaN where the
        displacement file of a modulus is missing.
    """
    # The last value was 400000, which contradicted the '40,000Pa' column
    # labels and the doubling pattern of the other moduli.
    y_moduli = [2500, 5000, 10000, 20000, 40000]
    noise = 0.00765
    columns = ['File ID', '2,500Pa', '2,500Pa N', '5,000Pa', '5,000Pa N', '10,000Pa', '10,000Pa N',
               '20,000Pa', '20,000Pa N', '40,000Pa', '40,000Pa N']

    errors = []
    for nr in range(18, 56):
        # Build every file path from the unmodified directory; `path` itself
        # must not be overwritten or later lookups would be wrong.
        trac_file_path = os.path.join(path, f'MLData00{nr}.mat')
        if not os.path.isfile(trac_file_path):
            continue
        trac_file = loadmat(trac_file_path)
        brdx = trac_file['brdx']
        brdy = trac_file['brdy']
        tracGT = trac_file['tracGT']

        row = [nr]
        for modulus in y_moduli:
            dspl_file_path = os.path.join(path, f'MLData00{nr}-{modulus}.mat')
            if not os.path.isfile(dspl_file_path):
                # keep the remaining values under their own column headers
                row.extend([np.nan, np.nan])
                continue
            dspl = loadmat(dspl_file_path)['dspl']

            # errorTrac is given the in-memory fields; the cell outline goes
            # in through the brdx/brdy keywords.
            # NOTE(review): confirm errorTrac accepts this form -- a version
            # that only takes two file paths cannot be used here.
            trac = predictTrac(dspl, modulus)
            row.append(errorTrac(trac, tracGT, brdx=brdx, brdy=brdy))

            # same field with Gaussian noise added
            dspl_noisy = add_noise(dspl, noise)
            trac_noisy = predictTrac(dspl_noisy, modulus)
            row.append(errorTrac(trac_noisy, tracGT, brdx=brdx, brdy=brdy))
        errors.append(row)

    # No explicit index: the number of rows depends on which files exist.
    return pd.DataFrame(errors, columns=columns)

`errorTrac` is called by `calcError` to actually perform the error calculation given a predicted stress field and the ground truth.

In [74]:
def errorTrac(filepath, filepath_GT, plot=False, brdx=None, brdy=None):
    """Normalized error of a predicted traction field inside the cell border.

    Two call forms are supported:

    * ``errorTrac(filepath, filepath_GT, plot)`` -- both arguments are .mat
      file paths. The prediction file must provide ``brdx``, ``brdy`` and
      ``trac``; the ground-truth file must provide ``trac``.
    * ``errorTrac(trac, tracGT, brdx=..., brdy=...)`` -- the first two
      arguments are the fields themselves and the border is passed by keyword.

    Parameters
    ----------
    filepath, filepath_GT : str or array-like
        Prediction and ground truth (paths or in-memory fields, see above).
    plot : bool
        If true, the border polygon is drawn with geopandas and shown.
    brdx, brdy : array-like, optional
        x and y coordinates of the cell border; give both or neither.

    Returns
    -------
    float
        ``sqrt(sum((trac - tracGT)**2)) / sqrt(sum(tracGT**2))`` with both
        sums taken over the grid points inside the border polygon and over all
        vector components.

    Raises
    ------
    ValueError
        If only one of ``brdx`` / ``brdy`` is given.
    """
    if brdx is None and brdy is None:
        file = loadmat(filepath)  # prediction
        file_GT = loadmat(filepath_GT)  # ground truth
        brdx = file['brdx']
        brdy = file['brdy']
        trac = file['trac']
        tracGT = file_GT['trac']
    elif brdx is None or brdy is None:
        raise ValueError('brdx and brdy must be given together')
    else:
        trac = filepath
        tracGT = filepath_GT

    trac = np.asarray(trac, dtype=float)
    tracGT = np.asarray(tracGT, dtype=float)

    # ravel accepts the border as a row vector (as indexed before via [0])
    # as well as a column vector or a flat sequence
    border = np.column_stack((np.ravel(brdx), np.ravel(brdy)))
    polygon = sh.geometry.Polygon(border)

    # The mask is sized from the traction field it is multiplied with; the
    # fields are expected to be laid out as (rows, cols, components).
    rows, cols = trac.shape[:2]
    interior = np.zeros((rows, cols), dtype=int)
    # NOTE(review): the (row, column) index pair is used directly as the
    # point's (x, y) -- confirm this matches the border's coordinate
    # convention (axis order and index origin).
    for i in range(rows):
        for j in range(cols):
            if polygon.contains(Point(i, j)):
                interior[i, j] = 1

    # plot polygon using geopandas
    if plot:
        gpd.GeoSeries(polygon).plot()
        plt.show()

    # Discard everything outside the cell. Broadcasting masks every
    # component and leaves the caller's arrays untouched; the previous code
    # addressed components 1 and 2, which is out of range for
    # two-component fields.
    mask = interior[:, :, np.newaxis]
    trac_in = trac * mask
    tracGT_in = tracGT * mask

    # Root of the summed squared deviation, normalized by the root of the
    # summed squared ground truth (the 1/N factors of the means cancel).
    rmse = np.sqrt(np.sum((trac_in - tracGT_in) ** 2))
    rmsm = np.sqrt(np.sum(tracGT_in ** 2))
    error = rmse / rmsm

    return error

`add_noise` adds zero-mean Gaussian noise to a displacement field.

In [114]:
def add_noise(dspl, N):
    """Return a copy of ``dspl`` with independent zero-mean Gaussian noise added.

    Parameters
    ----------
    dspl : array-like
        Displacement field; any shape is accepted and every entry receives
        its own noise sample.
    N : float
        Noise level. Each entry gets noise with standard deviation
        ``N / sqrt(2)`` (presumably so that the noise magnitude of a
        two-component vector has standard deviation ``N`` -- TODO confirm).

    Returns
    -------
    numpy.ndarray
        New float array with the same shape as ``dspl``; the input is not
        modified.
    """
    dspl = np.asarray(dspl, dtype=float)
    stdev = N / np.sqrt(2)
    # Draw noise for the whole array at once. The previous version wrote to
    # components 1 and 2 only, which left component 0 at zero and is out of
    # range for two-component fields, and it assumed a square grid.
    noise = np.random.normal(loc=0.0, scale=stdev, size=dspl.shape)

    return dspl + noise