In [None]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as spio
import glob

from tensorflow import keras
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Conv2DTranspose, Concatenate, ConvLSTM2D, Activation, Dense, Dropout, Convolution2D, MaxPooling2D, AveragePooling2D, Flatten, BatchNormalization
from matplotlib import image, pyplot
from sklearn.metrics import mean_squared_error
from scipy import stats
from scipy.stats.stats import pearsonr
from sklearn.model_selection import train_test_split

tf.keras.backend.set_floatx('float32')

In [None]:
a=1138
h=1838

folder='.../f_c01/*.jpg'
filenames = glob.glob(folder) 
filenames.sort() 

In [None]:
train_val_img, test_img = train_test_split(filenames, test_size=50)

In [None]:
train_img, val_img = train_test_split(train_val_img, test_size=30)

In [None]:
im_samples = 128
im_size = 32
im_rep = 25
def read_images(fnames):
  for f in fnames:
    im = image.imread(f)
    conc = int(f.split('\\')[-1][1:3])
    for _ in range(im_samples):
      ix = np.random.randint(low=a, high=h-im_size)
      iy = np.random.randint(low=a, high=h-im_size)
      yield (np.array(im[iy:iy+im_size, ix:ix+im_size, :]) / 128.)-1., np.array([conc]) # np.array([conc / 50.])

def samples_count(fnames):
  return len(fnames) * im_samples

In [None]:
samples_count(train_img)

In [None]:
samples_count(val_img)

In [None]:
samples_count(test_img)

In [None]:
train_dataset = tf.data.Dataset.from_generator(
    lambda: read_images(train_img),
    output_signature=(tf.TensorSpec((im_size, im_size, 3), dtype="float32"), tf.TensorSpec((1), dtype="float32"))
)
val_dataset = tf.data.Dataset.from_generator(
    lambda: read_images(val_img),
    output_signature=(tf.TensorSpec((im_size, im_size, 3), dtype="float32"), tf.TensorSpec((1), dtype="float32"))
)
test_dataset = tf.data.Dataset.from_generator(
    lambda: read_images(test_img),
    output_signature=(tf.TensorSpec((im_size, im_size, 3), dtype="float32"), tf.TensorSpec((1), dtype="float32"))
)

# Main Section

In [None]:
def new_loss(y_true, y_pred):
  y_true = y_true
  y_pred = y_pred
  return keras.losses.mean_absolute_error(y_true, y_pred) + keras.losses.mean_squared_error(y_true, y_pred)

def loss_class(y_true, y_pred):
  y_true_idx = tf.cast(y_true, tf.int64)
  return tf.keras.losses.sparse_categorical_crossentropy(y_true_idx, y_pred, from_logits=True)

In [None]:
inp = keras.layers.Input(shape=(im_size, im_size, 3))  

x = keras.layers.Conv2D(48, (9, 9), padding="valid")(inp)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.Activation(activation='relu')(x)
x = keras.layers.AveragePooling2D(pool_size=(9, 9), strides=(1,1), padding='valid')(x)
x = keras.layers.Dropout(0.1)(x) 

x = keras.layers.Conv2D(96, (9, 9), padding="valid")(x)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.Activation(activation='relu')(x)
x = keras.layers.Dropout(0.1)(x)

x = keras.layers.ConvLSTM1D(48, kernel_size=7, return_sequences=True)(x)
x = keras.layers.Conv2D(48, (2, 2), activation="relu", padding="valid")(x)

x = keras.layers.Flatten()(x)
x = keras.layers.Dense(150, activation='relu')(x)
x = keras.layers.Dense(50, activation='relu')(x)
x = keras.layers.Dense(1, activation = 'linear')(x)

model3 = keras.Model(inp, x)
model3.compile(loss=new_loss, optimizer=keras.optimizers.Adam(learning_rate=0.00001, beta_1=0.7, beta_2=0.9, decay=0.00005), metrics=[]) 

In [None]:
model3.summary()

In [None]:
bsize = 160
train_batches = train_dataset.repeat().shuffle(samples_count(train_img)).batch(bsize).prefetch(buffer_size=tf.data.AUTOTUNE)
val_batches = val_dataset.repeat().shuffle(samples_count(val_img)).batch(bsize).prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
# training the model
save_best = keras.callbacks.ModelCheckpoint(
    'best',
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
)
lr_drop = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    patience=5,
    factor=0.1,
)

history = model3.fit(
    train_batches, 
    validation_data=val_batches, 
    steps_per_epoch=samples_count(train_img) // bsize,
    validation_steps=samples_count(val_img) // bsize,
    epochs=5,
    callbacks=[lr_drop]
)


In [None]:
plt.plot(history.history['val_loss'])
plt.plot(history.history['loss'])
plt.ylim(0, 10)
plt.title('Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['val_loss', 'loss'], loc = 'upper right')
plt.show()

In [None]:
from scipy.stats import pearsonr

print('TRAIN SET');
XX = []
YY = []
for xx, yy in train_dataset:
    XX.append(xx)
    YY.append(yy)
XX = np.array(XX)
YY = np.array(YY)


pred_val = model3.predict(XX)

s1 = 128
s2 = 195
YY1 = np.reshape(YY, (s1, s2, 1), order='F')
YY2 = np.mean(YY1, axis=0)

pred_val1 = np.reshape(pred_val, (s1, s2), order='F')
pred_val2 = np.mean(pred_val1, axis=0)

print('RMSEC:', np.sqrt(mean_squared_error(YY2[:, 0], pred_val2)))

corr,_ = pearsonr(YY2[:,0],pred_val2)
print('R^2 test:', corr**2)
    
fig, ax = plt.subplots()
ax.scatter(YY2[:, 0], pred_val2[:])
ax.plot([YY2.min(), YY2.max()], [YY2.min(), YY2.max()], 'k--', lw=1)
ax.set_xlabel('Measured')
ax.set_ylabel('Predicted')
plt.show()

In [None]:
print('VALIDATION SET');
XX = []
YY = []
for xx, yy in val_dataset:
    XX.append(xx)
    YY.append(yy)
XX = np.array(XX)
YY = np.array(YY)

pred_val = model3.predict(XX)

s1 = 128
s2 = 30
YY1 = np.reshape(YY, (s1, s2, 1), order='F')
YY2 = np.mean(YY1, axis=0)

pred_val1 = np.reshape(pred_val, (s1, s2), order='F')
pred_val2 = np.mean(pred_val1, axis=0)

print('RMSECV:', np.sqrt(mean_squared_error(YY2[:, 0], pred_val2)))
corr,_ = pearsonr(YY2[:,0],pred_val2)
print('R^2 test:', corr**2)

fig, ax = plt.subplots()
ax.scatter(YY2[:, 0], pred_val2[:])
ax.plot([YY2.min(), YY2.max()], [YY2.min(), YY2.max()], 'k--', lw=1)
ax.set_xlabel('Measured')
ax.set_ylabel('Predicted')
plt.show()


In [None]:
print('TEST SET');
XXt = []
YYt = []
for xx, yy in test_dataset:
    XXt.append(xx)
    YYt.append(yy)
XXt = np.array(XXt)
YYt = np.array(YYt)

pred_test= model3.predict(XXt)

s1 = 128
s3 = 50
YYt1 = np.reshape(YYt, (s1, s3, 1), order='F')
YYt2 = np.mean(YYt1, axis=0)

pred_test1 = np.reshape(pred_test, (s1, s3), order='F')
pred_test2 = np.mean(pred_test1, axis=0)

print('RMSEP:', np.sqrt(mean_squared_error(YYt2[:, 0], pred_test2)))
corr,_ = pearsonr(YYt2[:,0],pred_test2)
print('R^2 test:', corr**2)

fig, ax = plt.subplots()
ax.scatter(YYt2[:, 0], pred_test2)
ax.plot([YYt2.min(), YYt2.max()], [YYt2.min(), YYt2.max()], 'k--', lw=1)
ax.set_xlabel('Measured')
ax.set_ylabel('Predicted')
plt.show()
