# Discovering a parsimonious neural network - melting temperature laws #

<i>Saaketh Desai</i>, and <i>Alejandro Strachan</i>, School of Materials Engineering, Purdue University <br>

This notebook describes the procedure to train a parsimonious neural network, i.e., a network designed to reproduce the training and testing datasets in the simplest, most interpretable manner possible. We use Keras to train neural networks and the DEAP package for genetic algorithms. The outline of this notebook is:

1. Read datasets and split into training and testing sets
2. Create a generic model
3. Define the objective function for the genetic algorithm
4. Set up the genetic algorithm and save the results

In [None]:
!jupyter notebook --version
!python --version
import tensorflow as tf
print(tf.__version__)
import sklearn
print(sklearn.__version__)
import keras
print(keras.__version__)
import pandas as pd
print(pd.__version__)
# 5.7.8
# Python 3.7.7
# 1.13.1
# 0.24.1
# 2.2.4
# 0.24.2

In [1]:
import sys
import os
import random

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

from matplotlib import pyplot as plt

import tensorflow as tf
#Import Keras layers to build custom neural networks
import keras
from keras import backend as K
from keras import initializers
from keras.layers import Dense, Input, Activation, multiply
from keras.models import Sequential, Model, load_model
from keras.layers.merge import add, concatenate
# DEAP Distributed Evolutionary Algorithms in Python Genetic Algorithm (GA)
# https://github.com/DEAP/deap
# https://deap.readthedocs.io/en/master/examples/index.html
#Import modules from the ‘deap’ package
from deap import base, creator, tools, algorithms  
from multiprocessing import Pool

Using TensorFlow backend.


## Step 1: Read training and testing data
We read in a CSV file containing fundamental quantities such as bulk modulus, shear modulus, and density, along with the experimental melting temperature. We then compute the effective sound speed ($v_m$), use it to compute the effective temperatures $\theta_0, \theta_1, \theta_2, \theta_3$, and form the normalized inputs $\theta_1', \theta_2', \theta_3'$. Finally, we use the `train_test_split()` function from scikit-learn to split the data into training and testing sets.
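
For reference, the quantities named above can be written out explicitly. The expressions below are read off from the code cells that follow, with $\rho$ the density, $G$ and $K$ the shear and bulk moduli, $m$ the mean atomic mass, and $a$ the cube root of the volume per atom. The numeric unit-conversion prefactors that appear in the code are omitted here.

$$
v_s = \sqrt{\frac{G}{\rho}}, \qquad
v_p = \sqrt{\frac{K + \tfrac{4}{3} G}{\rho}}, \qquad
v_m = \left[ \frac{3}{v_p^{-3} + 2 v_s^{-3}} \right]^{1/3}
$$

$$
\theta_0 = \frac{\hbar v_m}{k_B a}, \qquad
\theta_1 = \frac{\hbar^2}{m a^2 k_B}, \qquad
\theta_2 = \frac{a^3 G}{k_B}, \qquad
\theta_3 = \frac{a^3 K}{k_B}
$$

$$
\theta_i' = \frac{\theta_i}{\theta_0} \quad (i = 1, 2, 3), \qquad
T_m' = \frac{T_m}{\theta_0}
$$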

In [3]:
df = pd.read_csv("./data/Combined_data_v3.csv")
print (df.shape)

(218, 15)


In [4]:
h = 6.62607015*1e-34
k = 1.380649*1e-23
Na = 6.0221407*1e23
pi = np.pi
hbar = 1.054571817*1e-34
# Compute the shear (vs), longitudinal (vp), and effective (vm) sound speeds,
# then the Debye temperature from vm and the volume per atom
vs = np.sqrt(df['G_VRH']/df['density']) #from Zack
vp = np.sqrt((df['K_VRH'] + (4/3)*df['G_VRH'])/df['density']) #from Zack
vm = ( 3/( (1/vp)**3 + 2*(1/vs)**3 ) )**(1/3) #from JP Poirier paper

df['debye_temp'] = 10**13*(h/k)*(3/(4*pi*df['volume_per_atom']))**(1/3)*vm

df['a'] = (df['volume_per_atom'])**(1/3)

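# Short aliases used in the theta definitions below
a = df['a']            # cube root of the volume per atom
m = df['mean_mass']    # mean atomic mass
G = df['G_VRH']        # shear modulus (VRH average)
K = df['K_VRH']        # bulk modulus (VRH average); note this rebinds the Keras backend alias K imported earlier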

In [5]:
# Effective temperatures; the numeric prefactors fold in the physical constants and unit conversions
theta0 = (1.054571817/1.380649)*100*vm/a #hbar*vm/(k*a)
theta1 = (1.054571817**2*6.0221407/1.380649)*10*(1/(m*a**2)) #hbar**2/(m*a**2*k)
theta2 = (1/1.380649)*100*(a**3*G) #a**3*G/k
theta3 = (1/1.380649)*100*(a**3*K) #a**3*K/k
# Normalize theta1..theta3 by theta0 to obtain dimensionless inputs
theta1_prime = theta1/theta0
theta2_prime = theta2/theta0
theta3_prime = theta3/theta0
# Column of ones, used as the fourth (bias) input
ones = np.ones(len(theta1_prime))
# Normalize the melting temperature by theta0
Tm_prime = df['Tm']/theta0

#Create input/output arrays
inputs = np.array([theta1_prime, theta2_prime, theta3_prime, ones], dtype='float') # rows: theta1', theta2', theta3', ones
inputs = inputs.T # transpose to shape (n_samples, 4)
outputs = np.array(Tm_prime).reshape(-1, 1) # column vector of shape (n_samples, 1)

print (inputs.shape, outputs.shape)
#Split into train/test sets
train_inputs, test_inputs, train_outputs, test_outputs = train_test_split(inputs, outputs, test_size=0.2, random_state=0)
print (train_inputs.shape, train_outputs.shape)
print (test_inputs.shape, test_outputs.shape)

(218, 4) (218, 1)
(174, 4) (174, 1)
(44, 4) (44, 1)


In [6]:
# Dictionaries that decode gene values into activation functions and weights
act_dict = {0: 'linear', 1: 'squared', 2: 'inverse', 3: 'multiply', 4: 'tanh'}
np.random.seed(300000)
weight_dict = {0: 0, 1: 1, 2: np.random.uniform(-1,1,1)[0]} # 2: random initial value for a trainable weight
nact_terms = 4 # number of activation genes
nweight_terms = 13 # number of weight genes

## Step 2: Create a generic model

We will now create a generic model whose activations and weights will be optimized to discover parsimonious neural networks (PNNs).
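
To make the encoding concrete, here is a minimal sketch that decodes a 17-gene list into a readable description of the generic 3-3-1 network. The helper `describe_individual` and the `WEIGHT_STATE` labels are hypothetical names introduced only for this illustration. The gene order (4 activation genes, then 3 weights for each of the hidden nodes a1, a2, a3, then 4 output weights) is assumed from the order in which `create_model` consumes `trainable_list`. The example gene list is the one used later in this notebook to seed the population.

```python
# Gene encoding taken from the notebook: 4 activation genes followed by 13 weight genes
ACT_DICT = {0: "linear", 1: "squared", 2: "inverse", 3: "multiply", 4: "tanh"}
WEIGHT_STATE = {0: "fixed at 0", 1: "fixed at 1", 2: "trainable"}  # hypothetical labels
N_ACT = 4

def describe_individual(individual):
    """Hypothetical helper: decode a 17-gene list into a readable description."""
    acts = [ACT_DICT[g] for g in individual[:N_ACT]]
    w = [WEIGHT_STATE[g] for g in individual[N_ACT:]]
    return {
        "a1": {"activation": acts[0], "weights": w[0:3]},
        "a2": {"activation": acts[1], "weights": w[3:6]},
        "a3": {"activation": acts[2], "weights": w[6:9]},
        "output": {"activation": acts[3], "weights": w[9:13]},
    }

example = [0, 2, 0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 2, 2, 0, 2]
description = describe_individual(example)
for node, spec in description.items():
    print(node, spec["activation"], spec["weights"])
```

Reading an individual this way shows at a glance which connections are switched off, which are pinned to 1, and which are left for training.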

In [7]:
# Custom activation functions used in the network
def squared_act(x):
    return x*x

def inverse_act(x):
    return 1/x

In [8]:
def create_node(input1, input2, input3, name, trainable1, trainable2, trainable3, act, x, idx):
    base = name
    n1 = base + "1"
    n2 = base + "2"
    n3 = base + "3"
    #Each connection is a Dense layer with 1 input and 1 output
    an1 = Dense(1, activation = 'linear', use_bias = False, name=n1, trainable=trainable1) (input1) # connection from input 1 to this node
    an2 = Dense(1, activation = 'linear', use_bias = False, name=n2, trainable=trainable2) (input2) # connection from input 2 to this node
    an3 = Dense(1, activation = 'linear', use_bias = False, name=n3, trainable=trainable3) (input3) # connection from input 3 to this node
    
    node_list = [an1, an2, an3] # weighted inputs arriving at this node
    if (act == "multiply"):    # multiply activation: take the product of the incoming connections
        non_zero_list = []
        zero_list = []
        for i, j in enumerate(node_list):   # skip zero-weight connections so they do not force the product to zero
            if (x[idx+i] == 1 or x[idx+i] == 2): #For a multiply activation, multiply non-zero nodes
                non_zero_list.append(j)
            else:
                zero_list.append(j)
        if ( len(non_zero_list) == 0 ):
            non_zero_list = node_list
            an = multiply(non_zero_list)
        elif ( len(non_zero_list) == 1 ):   # elif, so the all-zero case above is not processed a second time
            anx = non_zero_list[0]
            an = add([anx, zero_list[0], zero_list[1]])
        else:
            an = multiply(non_zero_list)
    else:
        an = add(node_list)   #Add each connection
        if (act == "squared"):  # apply the custom squared or inverse activation, otherwise a built-in Keras activation
            an = Activation(squared_act) (an)
        elif (act == "inverse"):
            an = Activation(inverse_act) (an) # Apply activation
        else:
            an = Activation(act) (an)
    return an

In [15]:
# Build the generic network encoded by the individual x
def create_model(x):
    #initializer = keras.initializers.RandomUniform(minval=-0.001, maxval=0.001, seed=0)
    bias_initial = keras.initializers.Zeros()

    trainable_list = []
    for i in range(nweight_terms):
        if (x[i+nact_terms] == 2):
            trainable_list.append(True)
        else:
            trainable_list.append(False)

    input1 = Input(shape=(1,))  # each input is a separate scalar tensor
    input2 = Input(shape=(1,))
    input3 = Input(shape=(1,))
    input4 = Input(shape=(1,))
    # Create nodes a1, a2, a3 in the first hidden layer
    a1 = create_node(input1, input2, input3, "a1", trainable_list[0], trainable_list[1],   # each node combines the three inputs through its own connections
                     trainable_list[2], act_dict[x[0]], x, 0+nact_terms)
    a2 = create_node(input1, input2, input3, "a2", trainable_list[3], trainable_list[4], 
                     trainable_list[5], act_dict[x[1]], x, 3+nact_terms)
    a3 = create_node(input1, input2, input3, "a3", trainable_list[6], trainable_list[7], 
                     trainable_list[8], act_dict[x[2]], x, 6+nact_terms)

    an1 = Dense(1, activation = 'linear', use_bias = False, name='output1', trainable=trainable_list[9]) (a1)   # connection from hidden node a1 to the output node
    an2 = Dense(1, activation = 'linear', use_bias = False, name='output2', trainable=trainable_list[10]) (a2)
    an3 = Dense(1, activation = 'linear', use_bias = False, name='output3', trainable=trainable_list[11]) (a3)
    # Setup connections for output layer
    an4 = Dense(1, activation = 'linear', use_bias = False, name='output4', trainable=trainable_list[12]) (input4)

    act = act_dict[x[3]]
    node_list = [an1, an2, an3, an4]
    if (act == "multiply"):
        non_zero_list = []
        zero_list = []
        for i, j in enumerate(node_list):   # as in create_node: skip zero-weight connections so they do not force the product to zero
            if (x[nact_terms+9+i] == 1 or x[nact_terms+9+i] == 2): # output-layer weight genes start at offset 9 after the activation genes
                non_zero_list.append(j)
            else:
                zero_list.append(j)
        if ( len(non_zero_list) == 0 ):
            non_zero_list = node_list
            an = multiply(non_zero_list)
        elif ( len(non_zero_list) == 1 ):
            anx = non_zero_list[0]
            an = add([anx, zero_list[0], zero_list[1], zero_list[2]])
        else:
            an = multiply(non_zero_list)
    else:
        an = add(node_list)
        if (act == "squared"):                    # same activation handling as in create_node
            an = Activation(squared_act) (an)
        elif (act == "inverse"):
            an = Activation(inverse_act) (an)
        else:
            an = Activation(act) (an)
    output = an
    # Define model with 3 inputs, 1 bias, and 1 output
    model = Model(inputs=[input1, input2, input3, input4], outputs=[output])
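    # Keras' own Adam optimizer; tf.train.AdamOptimizer exists only in TensorFlow 1.x and raises an AttributeError under TensorFlow 2.x
    optimizer = keras.optimizers.Adam(lr=1e-3)
    model.compile(loss='mse', optimizer=optimizer)  # compile the Keras model with an MSE loss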
    
    layer_list = []
    for i in range(len(model.layers)):
        name = model.layers[i].name
        if ( ("activation" in name) or ("input" in name) or ("add" in name) or ("multiply" in name) ): # skip activation, input, add, and multiply layers, which hold no weights
            continue
        else:
            layer_list.append(i)
    
    for i in range(len(layer_list)):
        model.layers[layer_list[i]].set_weights( [ np.array( [[ weight_dict[x[nact_terms+i]] ]] ) ] )
        #model.layers[layer_list[i]].set_weights( [ np.array( [[ weights_list[i] ]] ) ] )

    #model.summary()
    #Set some model weights
    return model, trainable_list

In [10]:
# Loss history and Keras callbacks

losses = []
class PrintEpNum(keras.callbacks.Callback): # Callback that prints the epoch number and loss after each epoch
    def on_epoch_end(self, epoch, logs):
        sys.stdout.flush()
        sys.stdout.write("Current Epoch: " + str(epoch+1) + ' Loss: ' + str(logs.get('loss')) + '                     \r')
        losses.append(logs.get('loss'))
#EarlyStopping criterion to prevent overfitting
def train(model, train_inputs, train_outputs, verbose=False):
    mae_es= keras.callbacks.EarlyStopping(monitor='val_loss', patience=1000,
                                          min_delta=1e-5, verbose=1, mode='auto', restore_best_weights=True)

    terminate = keras.callbacks.TerminateOnNaN()
    # train model 
    EPOCHS = 10000 # Number of EPOCHS
    history = model.fit([train_inputs[:,0], train_inputs[:,1], train_inputs[:,2], train_inputs[:,3]], train_outputs[:,0],
                        epochs=EPOCHS,
                        shuffle=False, batch_size=len(train_inputs), verbose = False, callbacks=[mae_es, terminate],
                        validation_split=0.2)
    
    if verbose:
        plt.figure()
        plt.xlabel('Epoch')
        plt.ylabel('Mean Sq Error')
        plt.plot(history.epoch, np.array(history.history['loss']),label='Training loss')
        plt.legend()
        plt.show()
    return history

In [11]:
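# Weight penalty: the gene value itself (0 for a zero weight, 1 for a unit weight, 2 for a trainable weight)
def f3(w):
    return w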

## Step 3: Define the objective function for the genetic algorithm
The objective function consists of three parts: 
1. The mean squared error of the model on the test set 
2. A penalty term for non-linear activation functions
3. A penalty term for weights that are not fixed, simple values such as 0 or 1
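
The three parts combine as $\log_{10}(\mathrm{MSE}_{\mathrm{test}}) + p\,(\sum_i g_i^2 + \sum_j w_j)$, where $g_i$ are the activation genes, $w_j$ are the weight genes, and $p = 0.1$ in this notebook. The sketch below reproduces only that arithmetic, without building or training a model. The function name `pnn_objective` and the MSE value of 0.01 are made up for illustration.

```python
import math

N_ACT = 4  # number of activation genes, as in the notebook

def pnn_objective(mse_test, individual, p=0.1):
    """Sketch of the objective: log10(test MSE) plus weighted complexity penalties."""
    act_penalty = sum(g ** 2 for g in individual[:N_ACT])  # squared activation genes
    weight_penalty = sum(individual[N_ACT:])                # 0, 1, or 2 per weight gene
    return math.log10(mse_test) + p * (act_penalty + weight_penalty)

individual = [0, 2, 0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 2, 2, 0, 2]
obj = pnn_objective(mse_test=0.01, individual=individual)  # 0.01 is a made-up MSE
print(obj)
```

Because the activation gene is squared, a linear activation (gene 0) costs nothing while a tanh activation (gene 4) costs 16, so the search is steered toward simple functions unless a complex one lowers the error substantially.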

In [12]:
# Objective function linking the neural network to the genetic algorithm:
# it builds the model encoded by an individual via create_model(individual), trains it, and scores it
def objective_function(individual):
    new_model, trainable = create_model(individual)
    #print ("Trainable: ", trainable)
    valid_flag = True
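    stringlist = []
    new_model.summary(print_fn=lambda x: stringlist.append(x)) #Determine # of trainable weights
    ntrainable = 0
    for string in stringlist:
        if ("Trainable params" in string):
            # Parse the whole count after the colon; int(string[-1]) kept only the last digit
            ntrainable = int(string.split(":")[1].replace(",", "").split()[0])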

    if (ntrainable > 0):  # Train and evaluate model
        train(new_model, train_inputs, train_outputs, verbose=False)

    mse_train = new_model.evaluate([train_inputs[:, 0], train_inputs[:, 1], train_inputs[:, 2], train_inputs[:, 3]], 
                                   train_outputs, verbose=0)
    mse_test = new_model.evaluate([test_inputs[:, 0], test_inputs[:, 1], test_inputs[:, 2], test_inputs[:, 3]], 
                                  test_outputs, verbose=0)

    if (np.isnan(mse_train) or np.isnan(mse_test) or np.isinf(mse_train) or np.isinf(mse_test)):
        valid_flag = False

    weights = new_model.get_weights()
    weight_list = []
    for weight in weights:
        weight_list.append(weight[0][0])
    weight_list = np.array(weight_list)  # Collect final weights of model

    #handle nan weights
    if (np.isnan(weight_list).any()):
        valid_flag = False

    if (valid_flag):
        print (weight_list)
    else:
        mse_test = 1e50

    actfunc_term = [i**2 for i in individual[:nact_terms]]
    weights = individual[nact_terms:]
    weight_term = 0
    for j in range(nweight_terms):
        weight_term += f3(weights[j])
        
    mse_test_term = np.log10(mse_test)

    p = 0.1
    obj = mse_test_term + p*(np.sum(actfunc_term) + weight_term)  # objective = MSE term + p * (activation function term + weight term)
    print ("Individual: ", individual, flush=True)
    print ("Objective function: ", mse_test, np.sum(actfunc_term), weight_term, obj, flush=True)

    keras.backend.clear_session()
    tf.compat.v1.reset_default_graph()  # the tf.reset_default_graph alias is absent from the TensorFlow 2.x namespace
    return (obj,)

## Step 4: Set up the genetic algorithm and save the results
Each network is expressed as an individual of 17 genes: 4 genes select the activation functions and 13 genes set the weights. We thus define an individual to be a custom container, which is repeated to create a population. For details on this, please refer to the DEAP guide on setting up a genetic algorithm, which can be found [here](https://deap.readthedocs.io/en/master/).
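
As a rough illustration of what such an individual looks like, the sketch below draws random 17-gene lists with the standard library only. It is a stand-in for the DEAP-based initializer defined in the next cell, not a replacement for it. The function name `random_individual` and the seed are assumptions made for this example, while the gene ranges (0 to 4 for activations, 0 to 2 for weights) match the values registered in this notebook.

```python
import random

N_ACT, N_WEIGHT = 4, 13      # gene counts used in the notebook
MAX_ACT, MAX_WEIGHT = 4, 2   # largest gene value for each gene type

def random_individual(rng):
    """Illustrative stand-in for the DEAP-based initializer: a plain list of 17 genes."""
    acts = [rng.randint(0, MAX_ACT) for _ in range(N_ACT)]          # randint is inclusive
    weights = [rng.randint(0, MAX_WEIGHT) for _ in range(N_WEIGHT)]
    return acts + weights

rng = random.Random(0)  # arbitrary seed, only to make the sketch reproducible
population = [random_individual(rng) for _ in range(5)]
for ind in population:
    print(ind)
```

In the notebook itself the same idea is wrapped in `creator.Individual`, so that each list also carries a fitness attribute.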

In [13]:
################### DEAP #####################
#create fitness class and individual class
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)
# Create the toolbox that holds the GA operators
toolbox = base.Toolbox()
#pool = Pool(1)
#toolbox.register("attr_int", random.randint, 0, 3)
# Custom repeat function that builds tailor-made individuals:
# activation genes are drawn from [0, max1], weight genes from [0, max2]
def custom_initRepeat(container, func, max1, max2, n):
    func_list = []
    for i in range(n):
        if (i < nact_terms):
            func_list.append(func(0, max1))
        else:
            func_list.append(func(0, max2))
    return container(func_list[i] for i in range(n))

#gen = initRepeat(list, random.randint, 3, 7, 4)
toolbox.register("create_individual", custom_initRepeat, creator.Individual, random.randint, # generator for a single individual
                 max1=4, max2=2, n=nact_terms+nweight_terms)
toolbox.register("population", tools.initRepeat, list, toolbox.create_individual) # a population is a list of individuals
# Custom mutation: each gene is resampled with probability indpb, within its own range
def custom_mutation(individual, max1, max2, indpb):
    size = len(individual)
    for i in range(size):
        if random.random() < indpb:
            if (i < nact_terms):     # activation gene
                individual[i] = random.randint(0, max1)
            else:                    # weight gene
                individual[i] = random.randint(0, max2)
    return individual,

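We use the two-point crossover method for mating two individuals, and perform a random mutation using the custom mutation function. We then define the population and the statistics that we wish to log in the output of the code; the stats object decides which quantities are saved to the logbook. The cell below uses a small population of 5 individuals evolved for 3 generations so that it finishes quickly; a full search would use a larger population, for example 200.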
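
The two operators can be sketched in plain Python as follows. This is an illustration of the idea, not DEAP's implementation: the function names are hypothetical, the seed is arbitrary, and unlike the notebook's `custom_mutation`, which edits the individual in place, the `mutate` function here returns a copy.

```python
import random

def two_point_crossover(parent1, parent2, rng):
    """Swap the gene segment between two random cut points."""
    size = min(len(parent1), len(parent2))
    lo, hi = sorted(rng.sample(range(1, size), 2))  # two distinct cut points
    child1 = parent1[:lo] + parent2[lo:hi] + parent1[hi:]
    child2 = parent2[:lo] + parent1[lo:hi] + parent2[hi:]
    return child1, child2

def mutate(individual, n_act, max_act, max_weight, indpb, rng):
    """Resample each gene with probability indpb, within the range of its gene type."""
    mutated = list(individual)
    for i in range(len(mutated)):
        if rng.random() < indpb:
            upper = max_act if i < n_act else max_weight
            mutated[i] = rng.randint(0, upper)
    return mutated

rng = random.Random(1)  # arbitrary seed
p1 = [0] * 17           # toy parents, chosen so swapped segments are easy to see
p2 = [1] * 17
c1, c2 = two_point_crossover(p1, p2, rng)
m1 = mutate(c1, n_act=4, max_act=4, max_weight=2, indpb=0.3, rng=rng)
print(c1, c2, m1, sep="\n")
```

With all-zero and all-one parents, the swapped segment shows up as a contiguous run of ones in the first child and zeros in the second.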

In [14]:
cxpb = 0.5 # crossover probability
mutpb = 0.3 # mutation probability
ngens = 3 # number of generations to evolve

# Register DEAP's built-in operators together with the custom ones defined above
toolbox.register("mate", tools.cxTwoPoint)  # crossover
#toolbox.register("mutate", tools.mutUniformInt, low=0, up=3, indpb=mutpb)
toolbox.register("mutate", custom_mutation, max1=4, max2=2, indpb=mutpb) # mutation
toolbox.register("select", tools.selTournament, tournsize=5) # selection
toolbox.register("evaluate", objective_function)  # fitness evaluation

random.seed(100000)
population = toolbox.population(n=5) # initial population of 5 individuals
interesting_individual = [0, 2, 0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 2, 2, 0, 2] # known individual used to seed the population
for i in range(len(interesting_individual)):
    population[0][i] = interesting_individual[i]

hof = tools.HallOfFame(1)
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean)
stats.register("min", np.min)
stats.register("max", np.max)
# Run DEAP's simple evolutionary algorithm with the registered operators,
# using the population, cxpb, mutpb, and ngens defined above
pop, logbook = algorithms.eaSimple(population, toolbox, cxpb, mutpb, ngens, stats=stats, halloffame=hof, verbose=True)

# Plotting of the data and of the evolution of the equations is in the eval_melting notebook
#(https://proxy.nanohub.org/weber/1919263/NXUGNZSrkpFDc2XN/26/notebooks/data/pnn/eval_melting.ipynb)
