Citation: Richardson G, Knudby A, Chen W, Sawada M, Lovitt J, He L, et al. (2023) Dense neural network outperforms other machine learning models for scaling-up lichen cover maps in Eastern Canada. PLoS ONE 18(11): e0292839. https://doi.org/10.1371/journal.pone.0292839

In [None]:
import os, shutil, datetime, csv,math,scipy,time
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from scipy.stats import pearsonr
#displaying data
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.pyplot import imshow,figure
import matplotlib.image as mpimg
plt.rcParams['figure.dpi'] = 150 
%matplotlib inline
#TF imports
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv2D, Dense, Flatten, Dropout
from tensorflow.keras.models import Model, Sequential, load_model
from tensorflow.keras.optimizers import Adam
from keras import backend as K 
# Scratch file that the ModelCheckpoint callback overwrites with the best weights of the current run
temp_weights=r'D:\Training_Data_Creation\01-logs\temp.h5'
# Report the TF version and the GPUs TF can see, then turn on memory growth
print(f"Tensorflow ver. {tf.__version__}")
physical_device = tf.config.experimental.list_physical_devices('GPU')
print(f'Device found : {physical_device}')
try:
    # Memory growth is applied to every detected GPU (TF requires the same
    # setting on all of them); with no GPU the loop simply does nothing.
    for gpu in physical_device:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    # TF raises RuntimeError when the GPUs have already been initialised,
    # e.g. when this cell is executed a second time in the same kernel.
    print(f'Could not enable memory growth: {err}')

## Variables and Functions

In [None]:
import ipynb.fs 
#Importing metric functions
from .defs.Thesis_Functions import dtime,calculate_metrics,time_and_metrics
#Importing plotting functions
from .defs.Thesis_Functions import make_scatter_from_results,plot_hist_save
#Importing NN functions
from .defs.Thesis_Functions import get_model_mae,save_nn_for_log,nnDset_to_Results

In [None]:
# Pointer files read by txt_to_list, one per data split
# (train / validation / test, plus a second test file).
(
    Train_txt,
    val_txt,
    Test_txt,
    Testp_txt,
) = (
    r'D:\Training_Data_Creation\Pointer_files\MidpixTrain_ts3-mega40.txt',
    r'D:\Training_Data_Creation\Pointer_files\MidpixVal_ts3-mega40.txt',
    r'D:\Training_Data_Creation\Pointer_files\MidpixTest_ts3-mega40.txt',
    r'D:\Training_Data_Creation\Pointer_files\MidpixTestP40_Mega_ts3-mega50.txt',
)

# Seed passed to the shuffle step of the training dataset
SEED = 42

#Reads a pointer file into a list of 4-field records
def txt_to_list(filename):
    '''Parse a whitespace-delimited pointer file into a list of 4-tuples.

    Every non-blank line is split on whitespace and its first four fields
    are kept as a tuple of strings; any further fields are ignored.
    Blank or whitespace-only lines are skipped.

    Args:
        filename: path of the text file to read.

    Returns:
        A list of ``(field0, field1, field2, field3)`` string tuples, in
        file order.

    Raises:
        IndexError: if a non-blank line has fewer than four fields.
    '''
    joint = []
    with open(filename, 'r') as File:
        # Stream the file instead of loading every line into memory first
        for line_no, line in enumerate(File, start=1):
            words = line.split() #Splitting lines in words using whitespace as separator
            if not words:
                continue #blank line, e.g. an extra newline at the end of the file
            if len(words) < 4:
                raise IndexError('{}: line {} has {} field(s), expected at least 4'.format(
                    filename, line_no, len(words)))
            joint.append((words[0], words[1], words[2], words[3]))
    return joint
#Loads one 16-bit PNG as a float tensor
def load_img(img):
    '''Read a PNG file and return it as a float32 tensor.

    The file is decoded as a 3-channel uint16 PNG, cast to float32 and
    divided by 65535 (the largest uint16 value).

    Args:
        img: path of the PNG file, as accepted by ``tf.io.read_file``.

    Returns:
        The decoded image as a float32 tensor.
    '''
    raw_bytes = tf.io.read_file(img)
    decoded = tf.image.decode_png(raw_bytes, channels=3, dtype=tf.dtypes.uint16)
    return tf.cast(decoded, tf.float32) / 65535

## Model Training and Testing

In [None]:
def apply_symmetry(image,lab):
    '''Apply one randomly chosen symmetry transform to an image.

    An integer is drawn uniformly from 0..6 (``maxval`` is exclusive) and
    selects one of: identity, transpose, rotation by 90/180/270 degrees,
    left-right flip, up-down flip. The label is passed through unchanged.

    Args:
        image: image tensor to transform.
        lab: label belonging to the image.

    Returns:
        A ``(transformed_image, lab)`` tuple.
    '''
    choice = tf.random.uniform(shape=(), minval=0, maxval=7, dtype=tf.int32)
    # Branch k is the transform used when the drawn integer equals k
    transforms = [
        lambda: image,
        lambda: tf.image.transpose(image),
        lambda: tf.image.rot90(image, k=1, name=None),
        lambda: tf.image.rot90(image, k=2, name=None),
        lambda: tf.image.rot90(image, k=3, name=None),
        lambda: tf.image.flip_left_right(image),
        lambda: tf.image.flip_up_down(image),
    ]
    # Any other index falls back to the untouched image
    augmented = tf.switch_case(choice, branch_fns=transforms, default=lambda: image)
    return augmented, lab
def parse_image(joint):
    '''Turn one pointer-file record into an (image stack, label) pair.

    Fields 1, 2 and 3 of the record are loaded with ``load_img`` and joined
    along axis 2; field 0 is converted to an integer and divided by 100.

    Args:
        joint: one record with the label in position 0 and three image
            paths in positions 1-3.

    Returns:
        A ``(combo, value)`` tuple: the stacked image and the scaled label.
    '''
    first = load_img(joint[1])
    second = load_img(joint[2])
    third = load_img(joint[3])
    first_two = tf.experimental.numpy.append(first, second, axis=2)
    combo = tf.experimental.numpy.append(first_two, third, axis=2)
    # NOTE(review): joint[0] is a tensor when this runs inside Dataset.map, so
    # int() presumably relies on AutoGraph's handling of the builtin - confirm.
    value = int(joint[0])/100
    return combo, value

In [None]:
# Read the pointer files of the three splits into lists of records
train_joint = txt_to_list(Train_txt)
val_joint = txt_to_list(val_txt)
test_joint = txt_to_list(Test_txt)

BUFFER_SIZE = 20000
BATCH_SIZE = 500
# See if you can get batch size of 500

# Training pipeline: load -> random symmetry -> shuffle -> repeat -> batch
train_dataset = (
    tf.data.Dataset.from_tensor_slices(train_joint)
    .map(parse_image)
    .map(apply_symmetry)
    .shuffle(buffer_size=BUFFER_SIZE, seed=SEED)
    .repeat()
    .batch(BATCH_SIZE)
)

# Validation and test pipelines: no augmentation, the whole split in one batch
val_dataset = (
    tf.data.Dataset.from_tensor_slices(val_joint)
    .map(parse_image)
    .batch(len(val_joint))
)
test_dataset = (
    tf.data.Dataset.from_tensor_slices(test_joint)
    .map(parse_image)
    .batch(len(test_joint))
)

print(train_dataset, val_dataset, test_dataset, sep='\n')



## Model Tuning
#### This section is for finding the optimal hyperparameters

In [None]:
# Tag used in model names and in the log folder path
NN_name = 'CNN3size23'
def CNN_model(lr, u, u2, u3, u4):
    '''Build and compile the CNN regressor (the last tested; good for more data).

    The network takes a 3x3 input with 9 channels, applies two 3x3
    convolutions with ``u`` filters (the first with 'same' padding, the
    second with 'valid' padding), flattens, then applies two dense layers of
    ``u2`` units, one of ``u3`` units and a single linear output unit.

    Args:
        lr: learning rate given to the Adam optimizer.
        u: number of filters in both convolution layers.
        u2: units in each of the first two dense layers.
        u3: units in the third dense layer.
        u4: accepted but not used by this architecture.

    Returns:
        The compiled model, with mean absolute error as the loss and
        MAE and MSE as tracked metrics.
    '''
    stack = [
        Conv2D(u, (3, 3), activation='relu', padding='same', input_shape=(3, 3, 9)),
        Conv2D(u, (3, 3), activation='relu', padding='valid'),
        Flatten(),
        Dense(u2, activation='relu'),
        Dense(u2, activation='relu'),
        Dense(u3, activation='relu'),
        Dense(1, activation='linear'),
    ]
    model = Sequential(stack)
    tracked = [tf.keras.metrics.MeanAbsoluteError(), tf.keras.metrics.MeanSquaredError()]
    model.compile(
        loss="MeanAbsoluteError",
        optimizer=keras.optimizers.Adam(learning_rate=lr),
        metrics=tracked,
    )
    return model

In [None]:
def assess_model(hparam,session_n):
    '''Train one hyperparameter combination, log it and return its MAE.

    Builds a fresh ``CNN_model``, trains it on the module-level
    ``train_dataset`` with early stopping on ``val_loss`` while
    checkpointing the best weights to ``temp_weights``, then hands the
    model to the logging/plotting helpers and records the result in the
    module-level ``models_dic``.

    Args:
        hparam: sequence ``[spe, lr, u, u2, u3, u4]`` - steps per epoch,
            learning rate and the four unit counts passed to ``CNN_model``.
        session_n: running index of this trial, embedded in the model name.

    Returns:
        The value returned by ``get_model_mae`` for this model.
    '''
    spe, lr, u, u2, u3, u4 = hparam[:6]
    # This function is called once per grid point; clear Keras' global state
    # so memory does not keep growing with every model that gets built.
    tf.keras.backend.clear_session()
    model = CNN_model(lr, u, u2, u3, u4) #defining NN with the parameter
    start = time.time()
    callbacks = [
        # Stop once val_loss has not improved for 10 epochs
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10),
        # Keep only the best weights seen so far in the scratch file
        tf.keras.callbacks.ModelCheckpoint(temp_weights, save_best_only=True, save_weights_only=True),
    ]
    # train_dataset is already batched, so no batch_size is passed to fit();
    # it repeats indefinitely, so steps_per_epoch defines the epoch length.
    model_history = model.fit(train_dataset,
                              validation_data=val_dataset,
                              validation_steps=1,
                              callbacks=callbacks,
                              steps_per_epoch=spe,
                              epochs=500)
    # presumably evaluates the checkpointed weights on val_dataset - confirm in Thesis_Functions
    MAE = get_model_mae(model, temp_weights, val_dataset)
    end = time.time()
    runtime = str("%.2f"%(end-start))
    model_name = "{}-{}mae-{}r-{}p-{}sec".format(NN_name,str(round(MAE,5)),session_n,str(hparam),runtime)
    save_nn_for_log(model, temp_weights, val_dataset, logpath, model_name) #saving the model with informative info
    plot_hist_save(model, temp_weights, val_dataset, model_history, logpath, model_name)
    print('MAE is {} in {}sec'.format(MAE,runtime))
    models_dic[model_name] = MAE
    return MAE

In [None]:
def create_best_scatter(dataset, datasetstr):
    '''Rebuild the lowest-MAE model and draw its scatter plot for a dataset.

    The model name with the smallest MAE is taken from ``models_dic`` and
    the hyperparameters stored under the smallest key of ``hp_MAE_dic`` are
    used to rebuild the network. Weights are loaded from the second entry
    that ``os.listdir`` returns for that model's log folder.

    Args:
        dataset: dataset passed to ``nnDset_to_Results`` together with the model.
        datasetstr: label for the dataset, forwarded to the plotting helper.
    '''
    best_name = min(models_dic, key=models_dic.get)
    # NOTE(review): the name and the hyperparameters come from two separate
    # lookups; presumably they describe the same model - confirm for tied MAEs.
    best_hparam = hp_MAE_dic[min(hp_MAE_dic)]
    spe, lr, u, u2, u3, u4 = (best_hparam[idx] for idx in range(6))
    model = CNN_model(lr, u, u2, u3, u4)
    model_weights_path = r"D:\Training_Data_Creation\01-logs\logs{}\{}".format(NN_name, best_name)
    # NOTE(review): assumes the weights file is always the second directory
    # entry; os.listdir does not promise an order - confirm.
    folder_entries = os.listdir(model_weights_path)
    weights_file = model_weights_path + '\\' + folder_entries[1]
    model.load_weights(weights_file)
    print('Best Scatterplot {}'.format(best_name))
    results = nnDset_to_Results(model, dataset)
    make_scatter_from_results(results, datasetstr, '01-Best {} scatters'.format(NN_name), logpath)
def create_txt_and_prints():
    '''Write the grid-search summary to a text log and echo it to stdout.

    Reads the module-level ``models_dic``, ``logpath``, ``NN_name``,
    ``startmain`` and ``endmain``. The log file lists the overall runtime,
    the 10 model names with the lowest MAE, the 10 with the highest MAE and
    then every model name. The best and worst lists and the runtime are
    also printed.
    '''
    log_file = logpath + r'/01-{}Logs.txt'.format(NN_name)
    top_ten = sorted(models_dic, key=models_dic.get, reverse=False)[:10]
    bottom_ten = sorted(models_dic, key=models_dic.get, reverse=True)[:10]
    runtime_msg = "Overall runtime is " + str("%.2f"%(endmain-startmain)) + ' sec'

    # (heading, model names) pairs in the order they appear in the log
    sections = (
        ('Order goes SPE, BS, LR, Units (first layer then rest), MAE \n Best 10 Models \n', top_ten),
        ('\n Worst 10 Models \n', bottom_ten),
        ('\n All Models \n', list(models_dic)),
    )
    with open(log_file, 'w') as f:
        f.write(runtime_msg + '\n')
        for heading, names in sections:
            f.write(heading)
            f.writelines(str(name) + '\n' for name in names)

    print('Logs created, Best 10 models are:')
    for name in top_ten:
        print(name)
    print('\n Worst 10 models are:')
    for name in bottom_ten:
        print(name)
    print('\n')
    print(runtime_msg)
def display_best_history():
    '''Show the image stored in the log folder of the lowest-MAE model.

    The model name with the smallest MAE is taken from the module-level
    ``models_dic``; the first entry that ``os.listdir`` returns for that
    model's log folder is read as an image and displayed with matplotlib.
    '''
    lowest = min(models_dic, key=models_dic.get)
    model_weights_path = r"D:\Training_Data_Creation\01-logs\logs{}\{}".format(NN_name,lowest)
    files = os.listdir(model_weights_path)
    # NOTE(review): assumes the image is always the first directory entry;
    # os.listdir does not promise an order - confirm.
    imgstr = files[0]
    img = mpimg.imread(model_weights_path+'\\'+imgstr)
    plt.imshow(img)

In [None]:
# Bookkeeping for the grid search
session_n = 0               # running index of the trial, embedded in model names
startmain = time.time()     # start of the whole search, used for the runtime report
hp_MAE_dic = {}             # MAE -> hyperparameter list of the trial that produced it
hp_dic = {'hp_lr':[.001],'hp_spe':[80,90,70],
          'hp_u':[28,24,20],'hp_u2':[28,24,20],'hp_u3':[28,24,20],'hp_u4':[0]}
models_dic = {}             # model name -> MAE, filled by assess_model
logpath = r"D:\Training_Data_Creation\01-logs\logs{}".format(NN_name)

# Every combination of the candidate values, steps-per-epoch varying slowest
hp_grid = [
    [spe, lr, u, u2, u3, u4]
    for spe in hp_dic['hp_spe']
    for lr in hp_dic['hp_lr']
    for u in hp_dic['hp_u']
    for u2 in hp_dic['hp_u2']
    for u3 in hp_dic['hp_u3']
    for u4 in hp_dic['hp_u4']
]
for hparam in hp_grid:
    spe, lr, u, u2, u3, u4 = hparam
    print('--- Session {}: Testing {} spe, {} bs, {} lr and {},{},{},{} units'.format(session_n,spe,BATCH_SIZE,lr,u,u2,u3,u4))
    MAE = assess_model(hparam, session_n)
    plt.close()
    hp_MAE_dic[MAE] = hparam
    session_n += 1
endmain = time.time()

In [None]:
# Functions work on most recently assessed model
# endmain is stamped again here, so the runtime written to the log is measured up to this point
endmain = time.time()
create_txt_and_prints()
create_best_scatter(val_dataset, 'Val')

In [None]:
# Show the image stored in the log folder of the lowest-MAE model
display_best_history()

## Assess Different Models
#### For evaluating test dataset and other datasets after model selection

In [None]:
# Scatter plot of the lowest-MAE model on the test dataset;
# swap in another dataset and label to assess a different split.
create_best_scatter (test_dataset,'test')