In [1]:
# /**
#  * @file train.ipynb
#  * @author Samay Pashine (samay@iiti.ac.in)
#  * @modified Samay Pashine (samay@iiti.ac.in)
#  * @brief Train the neural network model to predict yield on crop outputs, soil and climate basis.
#  * @version 2.0
#  * @date 2021-11-12
#  * @copyright Copyright (c) 2021
#  */

# Importing necessary libraries.
import os
import gc
from tqdm import tqdm
import pandas as pd
import numpy as np
import pyarrow.ipc as ipc
from datetime import datetime
import matplotlib.pyplot as plt
from tensorflow import config
from tensorflow import keras
from tensorflow.keras import Model, optimizers
from tensorflow.keras.layers import Dense, Concatenate
from tensorflow.keras.layers import LSTM
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.preprocessing import MinMaxScaler
from constants import *

In [2]:
def memory_growth_GPU():
    """ Enabling memory growth in GPU (if present) for training the model. """
    try:
        physicalDevices = config.experimental.list_physical_devices('GPU')
        config.experimental.set_memory_growth(physicalDevices[0], True)
    except:
        print("[ERR]. Could not enable the memory growth in GPU. Switching to CPU for training.")

In [3]:
def read_feather_in_chunks(filepath):
    """Function to read feather file in chunks instead of all at once.

    Args:
        filepath (str): Path of final_input feather file.

    Yields:
        data_df [pandas.DataFrame]: return pandas Dataframe from the feather file.
    """
    with ipc.RecordBatchFileReader(filepath) as reader:
        for batch_index in range(reader.num_record_batches):
            if batch_index == 0:
                batch = reader.get_batch(batch_index).to_pandas(use_threads=True, timestamp_as_object=True, )
            else:
                new_batch = reader.get_batch(batch_index).to_pandas(use_threads=True, timestamp_as_object=True, )
                data_df = pd.concat([batch, new_batch], ignore_index=True)
                batch = data_df
            
            # Instead of taking just one batch with 65,000 rows (approx.), 
            # we let the loop iterate over batches until it triggers the condition below.
            if (batch_index + 1) % 12 == 0:
                batch = pd.DataFrame()
                yield data_df

In [4]:
if __name__ == "__main__":
    """  This is the driver code which initializes all the variable, trains the model and save the outputs. """
    
    # Calling the function to switch processing to GPU (if present).
    memory_growth_GPU()
    
    # Initializing variables.
    EPOCHS = 100
    LEARNING_RATE = 1e-9
    BATCH_SIZE = 128
    ES_PATIENCE = 3
    VAL_SPLIT = 0.2
    SEQUENCE = 1
    flag = 1
    
    for input_file in os.listdir(os.path.join(input_dir, final_inputs_dir)):
        """ Loop to iterate through all the input files in the directory for training.
        """
        batch_num = 1
        
        # Condition to check if the graph directory for the input_file exists. If not, then create one.
        if not os.path.isdir(os.path.join(output_dir, graphs_dir, input_file[:-8] + '_S-' + str(SEQUENCE))):
            try:
                print("[INFO]. Graph directory for the input file \'{}\' does not exists. Creating the directory.".format(input_file))
                os.makedirs(os.path.join(output_dir, graphs_dir, input_file[:-8] + '_S-' + str(SEQUENCE)))
                print("[INFO]. Directory created successfully.")
            except:
                print("[WARNING]. Directory for the input file \'{}\' already exists.".format(input_file))
        
        for batch in read_feather_in_chunks(os.path.join(input_dir, final_inputs_dir, input_file)):
            """ Loop to iterate through batches in the input feather files. """
            
            # Condition to check if the 'saving model' directory for the input_file exists. If not, then create one.
            if not os.path.isdir(os.path.join(output_dir, saved_models_dir, input_file[:-8] + '_S-' + str(SEQUENCE))):
                try:
                    print("[INFO]. Saving model directory for the input file \'{}\' does not exists. Creating the directory.".format(input_file))
                    os.makedirs(os.path.join(output_dir, saved_models_dir, input_file[:-8] + '_S-' + str(SEQUENCE), str(batch_num) + '_batch'))
                    print("[INFO]. Directory created successfully.")
                except:
                    print("[WARNING]. Directory for the input file \'{}\' already exists.".format(input_file))

            # Loop to calculate the tasmax, tasmin and precipitation_flux in the batch.
            print("[INFO]. Pre-Processing Batch-{} Inputs.".format(batch_num))
            for i in tqdm(range(len(batch))):
                batch.iloc[i, 25] += batch.iloc[i, 11]
                batch.iloc[i, 26] += batch.iloc[i, 11]    
                if batch.iloc[i, 10] != np.inf:
                    batch.iloc[i, 27] = (1 + batch.iloc[i, 10] / 100) * batch.iloc[i, 27]
            
            # Final formatting of the dataframe before traning.
            batch = batch.drop(columns=['index', 'time', 'lat', 'lon', 'index_x', 'index_y', 'spatial_ref', 'W', 'T'])
            batch.gravel = batch.gravel.astype(int)
            batch.clay = batch.clay.astype(int)
            batch.silt = batch.silt.astype(int)
            batch.sand = batch.sand.astype(int)
            batch.awc = batch.awc.astype(int)
            batch.cec_soil = batch.cec_soil.astype(int)
            batch.texture_class = batch.texture_class.astype(int)
            batch.CO2 = batch.CO2.astype(int)
            batch['plant-day'] = batch['plant-day'].astype(int)
            batch['maturity-day'] = batch['maturity-day'].astype(int)

            # Dividing the dataframe in static and dynamic dataframe on the basis of features.
            static_data_input = batch[['plant-day', 'maturity-day', 'CO2', 'N', 'A', 'texture_class', 'soil_ph',
                                        'soil_caco3', 'cec_soil', 'oc', 'awc', 'sand', 'silt', 'clay', 'gravel']]
            static_data_label = batch[['yield_mai']]
            weather_array_1 = batch[['tasmax', 'tasmin', 'pr', 'gdd']]
            
            # Scaling static and dynamic data to assist in the training.
            scaler = MinMaxScaler(feature_range=(0.01, 1))
            scaled_static_data = scaler.fit_transform(static_data_input)
            scaled_static_label = scaler.fit_transform(static_data_label)
            scaled_dynamic_data = scaler.fit_transform(weather_array_1)
            
            # Clear the memory buffer and deleting un-necessary variables.
            gc.collect()
            del batch, static_data_input, static_data_label, weather_array_1
            
            # Splitting the static and dynamic dataframe in training and testing set.
            test_size = 0.2
            fract = 1 - test_size

            static_X_train = scaled_static_data[:int(len(scaled_static_data) * fract)]
            static_X_test = scaled_static_data[int(len(scaled_static_data) * fract):]

            static_Y_train = scaled_static_label[:int(len(scaled_static_label) * fract)]
            static_Y_test = scaled_static_label[int(len(scaled_static_label) * fract):]

            dynamic_X_train = scaled_dynamic_data[:int(len(scaled_dynamic_data) * fract)]
            dynamic_X_test = scaled_dynamic_data[int(len(scaled_dynamic_data) * fract):]

            # Clear the memory buffer and deleting un-necessary variables.
            gc.collect()
            del scaled_static_data, scaled_static_label, scaled_dynamic_data

            # Defining the neural network for training the model.
            if flag == 1:
                dynamic_input = keras.Input(shape = (dynamic_X_train.shape[1], 1), dtype='float32')
                inner_lstm1 = LSTM(200, return_sequences=True)(dynamic_input)
                inner_lstm2 = LSTM(200, return_sequences=True)(inner_lstm1)
                lstm_out = LSTM(200, return_sequences=False)(inner_lstm2)

                static_input = keras.Input(shape = (static_X_train.shape[1]))
                inner_stat1 = Dense(200, activation='selu')(static_input)
                inner_stat1 = Dense(200, activation='selu')(inner_stat1)
                inner_stat2 = Dense(200, activation='selu')(inner_stat1)     

                x = Concatenate()([lstm_out, inner_stat2])

                x = Dense(200, activation='selu')(x)
                x = Dense(200, activation='selu')(x)
                x = Dense(200, activation='selu')(x)

                dynamic_output = Dense(1, activation = 'selu')(x)

                model = Model(inputs = [dynamic_input, static_input], outputs = [dynamic_output])

                model.compile(loss = keras.metrics.mean_squared_error,
                            optimizer = optimizers.Adam(learning_rate = LEARNING_RATE),
                            metrics = [keras.metrics.RootMeanSquaredError(name = 'rmse'), 'mae'])

                logs = "./logs/" + datetime.now().strftime("%Y%m%d-%H%M%S")
                es = EarlyStopping(monitor = 'val_loss', mode = 'min', verbose = 1, patience = 3)
                tboard_callback = keras.callbacks.TensorBoard(log_dir = logs, histogram_freq = 1, profile_batch = '500,520')

            else:
                try:
                    model = keras.models.load_model(os.path.join(output_dir, saved_models_dir, input_file[:-8] + '_S-' + str(SEQUENCE), str(batch_num-1)+'_batch'))
                except:
                    print("[INFO]. Input File has been completed. Moving onto the new input file.")
                    model = keras.models.load_model(prev_model)

                es = EarlyStopping(monitor = 'val_loss', mode = 'min', verbose = 1, patience = ES_PATIENCE)
                tboard_callback = keras.callbacks.TensorBoard(log_dir = logs, histogram_freq = 1, profile_batch = '500,520')

            # Training the mode on the dataset.
            history = model.fit(x = [dynamic_X_train, static_X_train], y = static_Y_train, validation_split = VAL_SPLIT, epochs = EPOCHS, callbacks = [tboard_callback, es], batch_size = BATCH_SIZE)

            # Plottting the loss graph and saving it in graph directory.
            plt.plot(history.history['loss'])
            plt.plot(history.history['val_loss'])
            plt.title('model loss')
            plt.ylabel('loss')
            plt.xlabel('epoch')
            plt.legend(['train', 'test'], loc='upper left')
            plt.savefig(os.path.join(output_dir, graphs_dir, input_file[:-8] + '_S-' + str(SEQUENCE), "Batch-{}_loss_stats.jpg".format(batch_num)))
            plt.clf()

            # Saving the model after each epoch in corresponding directory..
            model.save(os.path.join(output_dir, saved_models_dir, input_file[:-8] + '_S-' + str(SEQUENCE), str(batch_num)+'_batch'))
            
            # Condition to handle when one input_file is completed and about to switch to another one.
            if batch_num == 12:
                prev_model = os.path.join(output_dir, saved_models_dir, input_file[:-8] + '_S-' + str(SEQUENCE), str(batch_num) + '_batch')
            
            # Clearing the memory buffer and incrementing the variables.
            gc.collect()
            flag += 1
            batch_num += 1
    
        SEQUENCE += 1

2021-11-13 00:37:27.271300: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-11-13 00:37:27.281331: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-11-13 00:37:27.281835: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero


[INFO]. Pre-Processing Batch-1 Inputs.


100%|██████████| 786432/786432 [1:13:28<00:00, 178.39it/s]
2021-11-13 01:51:02.234080: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-11-13 01:51:02.240578: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-11-13 01:51:02.241352: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-11-13 01:51:02.241976: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there

Epoch 1/100


2021-11-13 01:51:19.939920: I tensorflow/stream_executor/cuda/cuda_dnn.cc:366] Loaded cuDNN version 8300
Could not load symbol cublasGetSmCountTarget from libcublas.so.11. Error: /usr/local/cuda-11.2/lib64/libcublas.so.11: undefined symbol: cublasGetSmCountTarget


 505/3933 [==>...........................] - ETA: 1:39 - loss: 0.4200 - rmse: 0.6480 - mae: 0.5486

2021-11-13 01:51:36.422577: I tensorflow/core/profiler/lib/profiler_session.cc:110] Profiler session initializing.
2021-11-13 01:51:36.422625: I tensorflow/core/profiler/lib/profiler_session.cc:125] Profiler session started.
2021-11-13 01:51:36.422710: E tensorflow/core/profiler/internal/gpu/cupti_error_manager.cc:133] cuptiGetTimestamp: ignored due to a previous error.
2021-11-13 01:51:36.422728: E tensorflow/core/profiler/internal/gpu/cupti_error_manager.cc:184] cuptiSubscribe: ignored due to a previous error.
2021-11-13 01:51:36.422738: E tensorflow/core/profiler/internal/gpu/cupti_error_manager.cc:457] cuptiGetResultString: ignored due to a previous error.
2021-11-13 01:51:36.422747: E tensorflow/core/profiler/internal/gpu/cupti_tracer.cc:1682] function cupti_interface_->Subscribe( &subscriber_, (CUpti_CallbackFunc)ApiCallback, this)failed with error 


 519/3933 [==>...........................] - ETA: 1:39 - loss: 0.4201 - rmse: 0.6482 - mae: 0.5488

2021-11-13 01:51:37.067614: I tensorflow/core/profiler/lib/profiler_session.cc:67] Profiler session collecting data.
2021-11-13 01:51:37.067900: E tensorflow/core/profiler/internal/gpu/cupti_error_manager.cc:140] cuptiFinalize: ignored due to a previous error.
2021-11-13 01:51:37.067929: E tensorflow/core/profiler/internal/gpu/cupti_error_manager.cc:457] cuptiGetResultString: ignored due to a previous error.
2021-11-13 01:51:37.067939: E tensorflow/core/profiler/internal/gpu/cupti_tracer.cc:1773] function cupti_interface_->Finalize()failed with error 
2021-11-13 01:51:37.195767: E tensorflow/core/profiler/internal/gpu/cupti_error_manager.cc:133] cuptiGetTimestamp: ignored due to a previous error.
2021-11-13 01:51:37.195806: E tensorflow/core/profiler/internal/gpu/cupti_error_manager.cc:133] cuptiGetTimestamp: ignored due to a previous error.
2021-11-13 01:51:37.195814: I tensorflow/core/profiler/internal/gpu/cupti_collector.cc:526]  GpuTracer has collected 0 callback api events and 0 a

 520/3933 [==>...........................] - ETA: 1:41 - loss: 0.4201 - rmse: 0.6481 - mae: 0.5487

2021-11-13 01:51:37.270257: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: ./logs/20211113-015107/plugins/profile/2021_11_13_01_51_37

2021-11-13 01:51:37.280271: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to ./logs/20211113-015107/plugins/profile/2021_11_13_01_51_37/samay.trace.json.gz
2021-11-13 01:51:37.393705: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: ./logs/20211113-015107/plugins/profile/2021_11_13_01_51_37

2021-11-13 01:51:37.403326: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to ./logs/20211113-015107/plugins/profile/2021_11_13_01_51_37/samay.memory_profile.json.gz
2021-11-13 01:51:37.404955: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: ./logs/20211113-015107/plugins/profile/2021_11_13_01_51_37
Dumped tool data for xplane.pb to ./logs/20211113-01510

Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100