Ref:
1. An introduction to ConvLSTM
    https://medium.com/neuronio/an-introduction-to-convlstm-55c9025563a7
2. How to work with Time Distributed data in a neural network
    https://medium.com/smileinnovation/how-to-work-with-time-distributed-data-in-a-neural-network-b8b39aa4ce00
3. A Visual Guide to Recurrent Layers in Keras
    https://amitness.com/2020/04/recurrent-layers-keras/
4. Coursera: Keras functional API (multiple inputs and outputs)
    https://www.coursera.org/lecture/customising-models-tensorflow2/multiple-inputs-and-outputs-XVZYB
5. Keras: the Functional API guide
    https://www.tensorflow.org/guide/keras/functional
6. Advanced Keras — Constructing Complex Custom Losses and Metrics
    https://towardsdatascience.com/advanced-keras-constructing-complex-custom-losses-and-metrics-c07ca130a618
7. tensorflow: save and load model
    https://www.tensorflow.org/tutorials/keras/save_and_load
8. tensorflow: training and evaluation
    https://www.tensorflow.org/guide/keras/train_and_evaluate
9. Write custom callback
    https://www.tensorflow.org/guide/keras/custom_callback/

In [1]:
from Model import TimeCNN, ConvLSTM, factor, input_window, predict_window
import cv2 
import numpy as np
import pandas as pd
import re
import os
import glob
import time
import datetime
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import math, square
from tensorflow.keras import Input, models, optimizers, Model, metrics
from tensorflow.keras.backend import function
from tensorflow.keras.utils import plot_model
from tensorflow.keras.layers import Concatenate, Multiply
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard, Callback
#from tensorflow.keras.layers import Conv2D, ConvLSTM2D, BatchNormalization, MaxPooling3D, TimeDistributed, Flatten, Dense, Concatenate, Multiply, Add 
#from tensorflow import concat, split, math, square, constant
#from tensorflow.keras.backend import squeeze, reshape

# Force channels-first ordering.
# `backend` below belongs to the standalone `keras` package, whereas every layer
# imported in this notebook comes from `tensorflow.keras`. With standalone Keras
# releases (the ones printing "Using TensorFlow backend.") the two packages keep
# separate image-data-format settings, so the format is set on both.
from keras import backend
backend.set_image_data_format('channels_first')
tf.keras.backend.set_image_data_format('channels_first')

Using TensorFlow backend.


## Define size of input ports and output ports 

In [2]:
# Sliding-window configuration: `timesteps` past frames are used to predict
# `future` frame(s) ahead.
# NOTE(review): earlier comments disagreed on the time unit ("3 days for 1 day"
# vs "3 hr for 1 hr", and `future` carried a "#*24" remark) -- confirm against
# the cadence of the data files.

# data size params
file_len = len(glob.glob("./FeatMap/AM/*.npy"))  # number of .npy files under ./FeatMap/AM
timesteps, future, stride = 3, 1, 1
batch_size = None

# Per-frame shape of each input port, written as (channels, 200, 155).
input_ports = dict(
    atm=(2, 200, 155),    # channels: 'Pressure-Corrected AirMass', 'Cosine Incidence Angle'
    rain=(2, 200, 155),   # channels: 'RH', 'Precp'
    wind=(3, 200, 155),   # channels: 'WS', 'WD_cos', 'WD_sin'
    cloud=(1, 200, 155),  # channels: 'StaImg'
    # an 'air' port (air pollution, 2 channels) was drafted but is disabled
)

# Shape of each auxiliary map: 'ETR from SOLPOS' and the '1h-unit matrix'.
target = dict(
    etr=(1, 200, 155),
    hour=(1, 200, 155),
)

## Load input ports and output ports

In [3]:
# Load the ETR array and flag the samples to keep: a sample is kept when the
# total over its last three axes is non-zero.
etr = np.load("./etr_2017.npy")
_etr_totals = etr.sum(axis=-1).sum(axis=-1).sum(axis=-1)
sample_mask = _etr_totals != 0.0

In [4]:
def _load_masked(path):
    """Load a ``.npy`` array and keep only the entries selected by ``sample_mask``."""
    return np.load(path)[sample_mask]


atm = _load_masked("./atm_2017.npy")
rain = _load_masked("./rain_2017.npy")
wind = _load_masked("./wind_2017.npy")
cloud = _load_masked("./cloud_2017.npy")
irr = _load_masked("./irr_2017.npy")
hour = _load_masked("./hour_2017.npy")
shine = _load_masked("./shine_2017.npy")
# `etr` was loaded unfiltered when the mask was built; it is rebound here, so
# re-run the cell that loads `etr` before re-running this one.
etr = etr[sample_mask]

In [5]:
# Every array must hold the same number of samples (axis 0) as `rain`.
# The message names the offending array so a mismatch is easy to trace.
_n_samples = rain.shape[0]
for _name, _arr in (
    ("atm", atm),
    ("wind", wind),
    ("cloud", cloud),
    ("irr", irr),
    ("hour", hour),
    ("shine", shine),
    ("etr", etr),
):
    assert _arr.shape[0] == _n_samples, (
        f"{_name} has {_arr.shape[0]} samples, expected {_n_samples} (same as rain)"
    )

In [6]:
# Fixed seed so the train/validation split is identical on every
# Restart-Kernel -> Run-All; change it to draw a different split.
SEED = 42

data_len = atm.shape[0]
# array indexing (index as array): a random ordering of all sample indices
ind = np.random.RandomState(SEED).permutation(data_len)
# hold out 10% of the samples for validation
num_val_samples = int(data_len*0.1)

In [7]:
# training dataset
## Elements are (inputs, targets); a third element could carry sample weights:
## either a flat (1D) array with one weight per sample, or for temporal data a
## 2D array of shape (samples, sequence_length) with one weight per timestep.
#
# The training rows are every entry of the permutation `ind` except the last
# `num_val_samples`. The split point is computed explicitly because `ind[:-0]`
# would be empty when num_val_samples == 0, and the index is sliced *before*
# indexing the arrays so only the training rows are copied.
train_ind = ind[:len(ind) - num_val_samples]
train_dataset = tf.data.Dataset.from_tensor_slices(
    (
        {"atm": atm[train_ind],
         "rain": rain[train_ind],
         "wind": wind[train_ind],
         "cloud": cloud[train_ind],
         "hour_space": hour[train_ind],
         "etr_space": etr[train_ind]
        },
        {"hour_ground_pred": shine[train_ind],
         "irr_ground_pred": irr[train_ind]
        },
    )
)

In [8]:
# val dataset
# The validation rows are the last `num_val_samples` entries of the permutation
# `ind`. The split point is computed explicitly because `ind[-0:]` would select
# every sample when num_val_samples == 0, and the index is sliced *before*
# indexing the arrays so only the validation rows are copied.
val_ind = ind[len(ind) - num_val_samples:]
val_dataset = tf.data.Dataset.from_tensor_slices(
    (
        {"atm": atm[val_ind],
         "rain": rain[val_ind],
         "wind": wind[val_ind],
         "cloud": cloud[val_ind],
         "hour_space": hour[val_ind],
         "etr_space": etr[val_ind]
        },
        {"hour_ground_pred": shine[val_ind],
         "irr_ground_pred": irr[val_ind]
        },
    )
)

## Build and compile model

In [9]:
def get_compiled_model(batch_size=None, device=None):
    """Build and compile the multi-port functional model.

    Reads the notebook-level `input_ports`, `target` and `timesteps` settings and
    wires together the `TimeCNN`, `ConvLSTM` and `factor` builders imported from
    `Model`. As a side effect the architecture diagram is written to "Model.png".

    Args:
        batch_size: Batch size; used as the static batch dimension of every
            `Input` and, divided by `device`, as the size handed to `MapLoss`.
            Required, despite the `None` default kept for signature compatibility.
        device: Number of replicas the batch is split across
            (e.g. `strategy.num_replicas_in_sync`). `None` is treated as 1.

    Returns:
        A compiled `Model` whose outputs are `[pred_Irr, pred_Hour]`.

    Raises:
        TypeError: If `batch_size` is None.
    """
    if batch_size is None:
        raise TypeError(
            "get_compiled_model() needs an integer batch_size to size the masked loss"
        )
    replicas = 1 if device is None else device
    loss_size = batch_size // replicas

    # build model
    # create input ports: one un-timed "space" map per target entry, and one
    # `timesteps`-long sequence per input port
    space_inputs = [
        Input(name=f'{key}_space', shape=value, batch_size=batch_size)
        for key, value in target.items()
    ]
    port_inputs = [
        Input(name=f'{key}', shape=(timesteps,) + value, batch_size=batch_size)
        for key, value in input_ports.items()
    ]

    # TimeDistributed CNN layers
    port_FeatMaps = [TimeCNN(port) for port in port_inputs]

    # concat layer to stack the feature ports
    port_concat = Concatenate(axis=2, name='port_concat')(port_FeatMaps) # concat axis: channel

    # ConvLSTM layers
    coef_FeatMap = ConvLSTM(inputs=port_concat)

    # two branches for two output ports (order follows `target`: etr, hour)
    pred_Etr, pred_Hour = [
        factor(inputs=coef_FeatMap, raw=raw, port_len=len(input_ports))
        for raw in space_inputs
    ]

    # unit conversion from ETR[W/m2] to Irr[MJ/m2]
    ## Irr = ETR*(60*60*Hour)*10^-6
    pred_time = math.scalar_mul(3600/1000000, pred_Hour, name='unit_conversion')
    pred_Irr = Multiply(name='irr_ground_pred')([pred_Etr, pred_time])

    # connect functional api
    # A flat input list (rather than a list of lists) keeps name-based mapping of
    # dict-shaped data onto the inputs working across Keras versions.
    model = Model(inputs=port_inputs + space_inputs, outputs=[pred_Irr, pred_Hour])

    #model.summary()
    plot_model(model, "Model.png", show_shapes=True)

    # compile model
    # Build the masked loss once (the mask file is read a single time) and share
    # it between the two outputs.
    map_loss = MapLoss(loss_size=loss_size)
    model.compile(
        optimizer=optimizers.RMSprop(1e-3),
        loss={
            "irr_ground_pred": map_loss,
            # presumably the name `factor` gives the hour branch output -- confirm in Model.py
            "hour_ground_pred": map_loss
        },
        loss_weights={
            "irr_ground_pred": 1.0,
            "hour_ground_pred": 0.5 # since the hour ground truth accuracy is not high enough
        },
        metrics={
            "irr_ground_pred": [metrics.MeanAbsoluteError()],
            "hour_ground_pred": [metrics.MeanAbsoluteError()]
        },
    )

    return model

## Custom loss func

In [10]:
def MapLoss(loss_size=None):
    """Build a mean-squared-error loss restricted to the cells of a boolean mask.

    The mask is read from './TruthMap/BoolMask.npy' and given two leading
    singleton axes, so it broadcasts against the batch at call time instead of
    being tiled to a fixed batch size. The loss therefore works for any batch
    size, including a smaller final batch.

    Args:
        loss_size: Unused; kept so existing `MapLoss(loss_size=...)` calls keep
            working. It used to be the number of times the mask was tiled.

    Returns:
        A function `Loss(y_true, y_pred)` returning the mean of the squared
        error over the selected cells (0 when the mask selects nothing).
    """
    bool_mask = np.load('./TruthMap/BoolMask.npy')
    # add two leading axes: (...) -> (1, 1, ...), broadcastable over the batch
    mask = np.expand_dims(np.expand_dims(bool_mask, 0), 0).astype(bool)

    def Loss(y_true, y_pred):
        # computes the mean squared error between the real data and the
        # prediction, over the masked cells only
        error = tf.cast(y_true, y_pred.dtype) - y_pred
        zeros = tf.zeros_like(error)
        # Zero the error outside the mask *before* squaring, so values there
        # (including NaNs) influence neither the loss nor its gradient.
        masked_error = tf.where(mask, error, zeros)
        n_selected = math.reduce_sum(tf.where(mask, tf.ones_like(error), zeros))
        return math.divide_no_nan(math.reduce_sum(square(masked_error)), n_selected)

    return Loss

## Distributed training

In [11]:
# Create a MirroredStrategy and record how many replicas it keeps in sync.
strategy = tf.distribute.MirroredStrategy()
device = strategy.num_replicas_in_sync

# Batch size handed to the model builder, which divides it by `device`.
# NOTE(review): an earlier comment described this as "24 hr prediction as a
# batch" although the value is 8 -- confirm the intended size.
batch_size = 8

# Anything that creates variables must run inside the strategy scope;
# here that is model construction and `compile()`.
with strategy.scope():
    model = get_compiled_model(batch_size=batch_size, device=device)

INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Redu

In [13]:
epoch = 20

# Prepare a directory to store all the checkpoints (one sub-directory per run).
# The timestamp contains no ':' so the path is also valid on filesystems that
# reject that character.
now = datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')
checkpoint_dir = f"./Model_mcp/{now}"
os.makedirs(checkpoint_dir, exist_ok=True)

# Keep only the weights of the epochs that improve the validation loss.
mcp = ModelCheckpoint(
    filepath=checkpoint_dir + "/{epoch:02d}_val_loss={val_loss:.4f}.h5",
    save_best_only=True,
    save_weights_only=True, # for model.load_weights
    monitor='val_loss',
    mode='auto',
    verbose=0,
)

# tensorboard (defined for convenience; not passed to `fit` below)
viz = TensorBoard(
    log_dir="./Tensorboard",
    histogram_freq=1,  # How often to log histogram visualizations
    embeddings_freq=1,  # How often to log embedding visualizations
    update_freq="epoch",
)

# The model inputs are built with a fixed batch size, so an incomplete final
# batch is dropped rather than fed to the model.
train_batches = train_dataset.shuffle(
    buffer_size=data_len - num_val_samples,  # whole training split -> full shuffle
    reshuffle_each_iteration=True,
).batch(batch_size, drop_remainder=True)
# Validation order is irrelevant, so the validation split is not shuffled.
val_batches = val_dataset.batch(batch_size, drop_remainder=True)

# Keep the History object so the loss/metric curves can be inspected afterwards.
history = model.fit(
    train_batches,
    epochs=epoch,
    validation_data=val_batches,
    callbacks=[mcp],
    verbose=1,
)

Epoch 1/20
INFO:tensorflow:batch_all_reduce: 48 all-reduces with algorithm = nccl, num_packs = 1, agg_small_grads_max_bytes = 0 and agg_small_grads_max_group = 10
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20


Epoch 20/20


<tensorflow.python.keras.callbacks.History at 0x7fbe2d33a940>