In [1]:
import numpy as np
import pandas as pd
from pandas import DataFrame, Series, Timestamp
import tensorflow as tf
import tensorflow.compat.v1 as tf1
from tensorflow import Graph
from tensorflow.compat.v1 import Session, ConfigProto
from tensorflow.python.eager.context import PhysicalDevice
from typing import Dict, List, Union, Generator, Tuple, TypeVar, Type
import os
from numpy import load, ndarray
import time
from tensorflow.python.keras.engine.functional import Functional
import json

In [2]:
# Generic type variable; used below to annotate `MyPredictor.from_path`
# (`cls: Type[_T]` -> `_T`) so the classmethod is typed as returning an
# instance of whatever class it is called on.
_T = TypeVar("_T")

In [3]:
import sys
# Put the parent directory first on the module search path. Presumably this is
# what makes the project-local packages imported in the next cells
# (`data_formatters`, `expt_settings`, `libs`) importable -- TODO confirm the
# notebook's location relative to the project root.
sys.path.insert(0, '..')

In [4]:
from data_formatters.base import GenericDataFormatter, InputTypes, DataTypes
from data_formatters.sorgenia_wind import SorgeniaFormatter

In [5]:
from expt_settings.configs import ExperimentConfig
from libs.hyperparam_opt import HyperparamOptManager
from libs.tft_model import TemporalFusionTransformer
import libs.utils as utils

In [6]:
def format_outputs(prediction: Union[List, ndarray],
                   time_index: Union[List, ndarray, None] = None,
                   identifier: Union[List, ndarray, None] = None,
                   num_encoder_steps: Union[int, None] = None) -> DataFrame:
    """Flatten a batch of forecasts into one DataFrame row per sample.

    The previous version referenced ``self``, ``time`` (which at notebook level
    is the ``time`` module) and ``identifier`` without receiving any of them,
    so every call failed. Everything the function needs is now either derived
    from ``prediction`` or passed in explicitly; the extra parameters are
    optional, so ``format_outputs(prediction)`` remains a valid call.

    Args:
        prediction: 3-D array-like indexed as ``[sample, horizon_step, 0]``.
            Only index 0 of the last axis is used.
        time_index: optional 3-D array-like indexed as
            ``[sample, time_step, 0]``. Assumed to span the encoder steps
            followed by the forecast horizon -- TODO confirm against the caller.
        identifier: optional 3-D array-like indexed as
            ``[sample, time_step, 0]``; the value at time step 0 is used.
        num_encoder_steps: the forecast time is read at index
            ``num_encoder_steps - 1`` of ``time_index``. When omitted it is
            inferred as ``time_index.shape[1] - horizon``, mirroring the
            original ``time_steps - num_encoder_steps`` column count.

    Returns:
        DataFrame with columns ``forecast_time``, ``identifier`` and
        ``t+0 .. t+(horizon-1)``. ``forecast_time`` / ``identifier`` are filled
        with ``None`` when the corresponding array is not supplied.

    Raises:
        ValueError: if ``prediction`` is not 3-D, or if ``num_encoder_steps``
            does not address a valid step of ``time_index``.
    """
    # Accept plain (nested) lists as well as arrays, as the annotation promises.
    values = np.asarray(prediction)
    if values.ndim != 3:
        raise ValueError(
            'prediction must be 3-dimensional, got {} dimension(s)'.format(values.ndim))

    horizon = values.shape[1]
    flat_prediction = pd.DataFrame(
        values[:, :, 0],
        columns=['t+{}'.format(i) for i in range(horizon)])
    cols = list(flat_prediction.columns)

    if time_index is None:
        flat_prediction['forecast_time'] = None
    else:
        times = np.asarray(time_index)
        if num_encoder_steps is None:
            num_encoder_steps = times.shape[1] - horizon
        if not 1 <= num_encoder_steps <= times.shape[1]:
            raise ValueError(
                'num_encoder_steps={} is outside the {} available time steps'.format(
                    num_encoder_steps, times.shape[1]))
        # Last encoder step, i.e. the time at which the forecast is issued.
        flat_prediction['forecast_time'] = times[:, num_encoder_steps - 1, 0]

    if identifier is None:
        flat_prediction['identifier'] = None
    else:
        flat_prediction['identifier'] = np.asarray(identifier)[:, 0, 0]

    # Arrange in order
    return flat_prediction[['forecast_time', 'identifier'] + cols]

In [7]:
# Presumably the quantile levels of the model outputs (0.5 would correspond to
# the "p50" entry read in `MyPredictor.predict`) -- TODO confirm.
# NOTE(review): no other cell visible in this notebook references this name.
quantiles = [0.1, 0.5, 0.9]

In [9]:
class MyPredictor(object):
    """Prediction wrapper around a saved TemporalFusionTransformer.

    The ``predict`` / ``from_path`` pair presumably follows the interface of a
    GCP custom prediction routine (the original comments mention GCP) -- TODO
    confirm. An instance bundles the data formatter, the model parameters and
    the experiment configuration needed to rebuild the model and run inference.
    """

    # Experiment name handed to `ExperimentConfig` and stored in params['exp_name'].
    EXPERIMENT_NAME = 'sorgenia_wind'

    # Column order every row passed to `predict` must follow.
    COLUMNS = (
        'plant_name_up', 'time', 'kwh', 'dew_point_2m_C', 'temperature_2m_C', 'msl_pressure_hPa',
        'sfc_pressure_hPa', 'precipitation_1h_mm', 'wind_speed_mean_10m_1h_ms', 'wind_speed_mean_100m_1h_ms',
        'wind_dir_mean_100m_1h_d', 'wind_dir_mean_10m_1h_d', 'wind_gusts_10m_1h_ms', 'wind_gusts_100m_1h_ms',
        'wind_gusts_100m_ms', 'wind_gusts_10m_ms', 'Day sin', 'Day cos', 'Year sin', 'Year cos', 't', 'days_from_start',
        'id', 'hour', 'day', 'day_of_week', 'month', 'categorical_id', 'hours_from_start', 'categorical_day_of_week',
        'categorical_hour',
    )

    def __init__(self, formatter: SorgeniaFormatter, params: Dict, config: ExperimentConfig):
        """Store the collaborators needed by `predict`.

        Args:
            formatter: data formatter; its `format_predictions` is applied to
                the model output.
            params: model parameters passed to `TemporalFusionTransformer`.
            config: experiment configuration; `config.model_folder` is where
                the model is loaded from.
        """
        self.formatter = formatter
        self.params = params
        self.config = config
        # Kept as a per-instance list for backward compatibility with callers
        # that read (or tweak) `predictor.columns`.
        self.columns = list(self.COLUMNS)
        # Set by `predict`; the model is rebuilt on every call.
        self.model = None
        print('Done.')

    def predict(self, instances: List, **kwargs) -> List[Tuple]:
        """Run the model on raw rows and return the formatted "p50" forecasts.

        Args:
            instances: list of records (one per row) whose values follow
                `self.columns`.
            **kwargs: accepted for interface compatibility; unused.

        Returns:
            One tuple per row of the DataFrame produced by
            `formatter.format_predictions`.
        """
        # transform instances into DataFrame to accommodate tft predict method
        sample_df = pd.DataFrame.from_records(instances, columns=self.columns)
        t0: float = time.perf_counter()

        gpu_name: str = tf.test.gpu_device_name()
        if gpu_name:
            print('Default GPU Device:{}'.format(gpu_name))
            gpu: List[PhysicalDevice] = tf.config.experimental.list_physical_devices('GPU')
            tf.config.experimental.set_memory_growth(gpu[0], True)
            tf_config: ConfigProto = utils.get_default_tensorflow_config(tf_device="gpu", gpu_id=0)
        else:
            print("Please install GPU version of TF")
            tf_config = utils.get_default_tensorflow_config(tf_device="cpu")

        # Work on a per-call copy: the two keys popped below used to be removed
        # from `self.params` itself, so a second `predict` call built the model
        # from a different parameter dict than the first one.
        model_params: Dict = dict(self.params)

        tf1.reset_default_graph()
        with tf.Graph().as_default(), tf1.Session(config=tf_config) as sess:
            tf1.keras.backend.set_session(sess)
            self.model = TemporalFusionTransformer(model_params)
            # Same statement order as before: the keys are dropped only after
            # the model has been constructed from the full dict.
            model_params.pop('exp_name', None)
            model_params.pop('data_folder', None)
            self.model.load(self.config.model_folder, use_keras_loadings=True)
            output_map: Dict = self.model.predict(sample_df, return_targets=False)
            # Only the "p50" entry of the output map is formatted and returned.
            preds: DataFrame = self.formatter.format_predictions(output_map.get("p50"))

        t1: float = time.perf_counter()
        print("Time elapsed ", t1-t0)

        # convert output to a list if that's required by GCP
        return [tuple(x) for x in preds.to_numpy()]

    @classmethod
    def from_path(cls: Type[_T], model_dir: str) -> _T:
        """Build a predictor from a folder holding model checkpoints and params.

        Args:
            model_dir: folder passed to `ExperimentConfig` together with the
                experiment name.

        Returns:
            A new instance of `cls` wired with the formatter, the parameters
            returned by the hyperparameter manager and the experiment config.
        """
        config = ExperimentConfig(cls.EXPERIMENT_NAME, model_dir)
        formatter = config.make_data_formatter()
        print("Formatter data folder is ", formatter.data_folder)

        # Sets up default params
        fixed_params: Dict = formatter.get_experiment_params()
        default_params: Dict = formatter.get_default_model_params()
        model_folder = config.model_folder
        default_params["model_folder"] = model_folder

        # Sets up hyperparam manager; every default becomes a one-element
        # candidate list.
        print("*** Loading hyperparm manager ***")
        opt_manager = HyperparamOptManager({k: [default_params[k]] for k in default_params},
                                           fixed_params, model_folder)
        params: Dict = opt_manager.get_next_parameters()
        params['exp_name'] = cls.EXPERIMENT_NAME
        params['data_folder'] = config.data_csv_path
        print("Data path is ", params['data_folder'])

        # load scalers
        formatter.load_scalers()

        return cls(formatter, params, config)

In [10]:
# Build the predictor from the saved-model folder.
# NOTE(review): absolute, machine-specific Windows path -- adjust before
# running this notebook anywhere else.
predictor = MyPredictor.from_path(r'C:\Users\Lorenzo\PycharmProjects\TFT\outputs\saved_models\sorgenia_wind\fixed')

Formatter data folder is  C:\Users\Lorenzo\PycharmProjects\TFT\outputs\saved_models\sorgenia_wind\fixed
*** Loading hyperparm manager ***
Data path is  C:\Users\Lorenzo\PycharmProjects\TFT\outputs\saved_models\sorgenia_wind\fixed\data\sorgenia_wind\data\sorgenia_final.csv
Done.


In [11]:
# Load the inference sample CSV into a DataFrame and preview its first rows.
# NOTE(review): absolute, machine-specific Windows path.
sample: DataFrame = pd.read_csv(r'C:\Users\Lorenzo\PycharmProjects\TFT\outputs\data\sorgenia_wind\data\sorgenia_wind\data\sorgenia_wind_inference_sample.csv')
sample.head()

Unnamed: 0,plant_name_up,time,kwh,dew_point_2m_C,temperature_2m_C,msl_pressure_hPa,sfc_pressure_hPa,precipitation_1h_mm,wind_speed_mean_10m_1h_ms,wind_speed_mean_100m_1h_ms,...,days_from_start,id,hour,day,day_of_week,month,categorical_id,hours_from_start,categorical_day_of_week,categorical_hour
0,UP_MPNTLCDMRN_1,2020-08-06 23:00:00,-0.529162,0.866993,0.239003,-0.048342,-1.274784,-0.191301,-0.733072,-0.050166,...,1314,UP_MPNTLCDMRN_1,1.661345,6,0.000959,8,0,2.726227,3,23
1,UP_MPNTLCDMRN_1,2020-08-07 00:00:00,-0.576865,0.866993,0.197319,-0.101425,-1.304034,-0.191301,-0.733072,-0.12781,...,1314,UP_MPNTLCDMRN_1,-1.661381,7,0.500952,8,0,2.726368,4,0
2,UP_MPNTLCDMRN_1,2020-08-07 01:00:00,-0.642457,0.904884,0.127844,-0.119119,-1.304034,-0.191301,-0.792745,-0.321922,...,1314,UP_MPNTLCDMRN_1,-1.516914,7,0.500952,8,0,2.72651,4,1
3,UP_MPNTLCDMRN_1,2020-08-07 02:00:00,-0.666309,0.866993,0.072265,-0.136814,-1.313784,-0.191301,-0.792745,-0.399567,...,1314,UP_MPNTLCDMRN_1,-1.372448,7,0.500952,8,0,2.726651,4,2
4,UP_MPNTLCDMRN_1,2020-08-07 03:00:00,-0.691651,0.848048,0.044475,-0.136814,-1.323534,-0.191301,-0.792745,-0.360744,...,1314,UP_MPNTLCDMRN_1,-1.227982,7,0.500952,8,0,2.726793,4,3


In [12]:
# sample_array = sample.to_numpy()
# Column names of the loaded sample, in file order.
# NOTE(review): only the commented-out round-trip check in a later cell
# references this name.
columns: List = list(sample.columns)

In [13]:
# sample_array

In [14]:
# Convert the sample to a plain list of row lists -- the input form
# `MyPredictor.predict` takes (it rebuilds a DataFrame via `from_records`).
sample_list: List = sample.values.tolist()

In [15]:
# x = json.dumps(sample.to_json())

In [16]:
# sample_df = pd.DataFrame.from_records(sample_list, columns=columns)
# sample_df.head()

In [17]:
# Test the predictor on the sample rows; `predict` returns one tuple per
# formatted prediction row.
preds: List = predictor.predict(sample_list)

Default GPU Device:/device:GPU:0
Selecting GPU ID=0
category counts are  [7]
Resetting temp folder...
*** TemporalFusionTransformer params ***
# dropout_rate = 0.1
# hidden_layer_size = 160
# learning_rate = 0.001
# max_gradient_norm = 0.01
# minibatch_size = 64
# model_folder = C:\Users\Lorenzo\PycharmProjects\TFT\outputs\saved_models\sorgenia_wind\fixed
# num_heads = 4
# stack_size = 1
# total_time_steps = 180
# num_encoder_steps = 168
# num_epochs = 100
# early_stopping_patience = 10
# multiprocessing_workers = 5
# category_counts = [7]
# column_definition = [('id', <DataTypes.REAL_VALUED: 0>, <InputTypes.ID: 4>), ('time', <DataTypes.DATE: 2>, <InputTypes.TIME: 5>), ('kwh', <DataTypes.REAL_VALUED: 0>, <InputTypes.TARGET: 0>), ('hour', <DataTypes.REAL_VALUED: 0>, <InputTypes.KNOWN_INPUT: 2>), ('day_of_week', <DataTypes.REAL_VALUED: 0>, <InputTypes.KNOWN_INPUT: 2>), ('hours_from_start', <DataTypes.REAL_VALUED: 0>, <InputTypes.KNOWN_INPUT: 2>), ('dew_point_2m_C', <DataTypes.REAL_VALUED



Time elapsed  49.37873140000001


In [18]:
preds

[('2020-08-13 22:00:00',
  'UP_MPNTLCDMRN_1',
  55.741031646728516,
  361.58868408203125,
  812.1192626953125,
  804.7371826171875,
  694.0287475585938,
  702.2057495117188,
  659.377197265625,
  873.3704833984375,
  309.26556396484375,
  266.60772705078125,
  443.82244873046875,
  720.8809814453125),
 ('2020-08-13 22:00:00',
  'UP_MPNTLCSMBC_1',
  735.8509521484375,
  647.4838256835938,
  274.7702941894531,
  163.93458557128906,
  425.5662536621094,
  782.2981567382812,
  922.618408203125,
  934.3851318359375,
  841.6237182617188,
  697.5617065429688,
  588.6402587890625,
  527.5157470703125),
 ('2020-08-13 22:00:00',
  'UP_PEPIZZA_1',
  514.6080322265625,
  429.8370361328125,
  283.3421630859375,
  251.8887939453125,
  461.5499267578125,
  607.2701416015625,
  526.2996826171875,
  558.7825927734375,
  887.4940185546875,
  1389.95703125,
  1475.3828125,
  1637.0284423828125),
 ('2020-08-13 22:00:00',
  'UP_PRCLCDMZRD_1',
  127.7403564453125,
  237.7364501953125,
  237.6517333984375,
 

In [None]:
import googleapiclient.discovery

def predict_json(project, model, instances, version=None):
    """Request online predictions from a deployed model.

    Args:
        project (str): project in which the model is deployed.
        model (str): name of the model.
        instances (list): prediction inputs, sent as the ``instances`` field of
            the request body.
        version (str, optional): model version to target; when ``None`` no
            version segment is added to the resource name.

    Returns:
        The ``predictions`` entry of the service response.

    Raises:
        RuntimeError: if the response contains an ``error`` entry; the error
            payload is passed as the exception argument.
    """
    # Authentication is expected to come from the environment variable
    # GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_file>
    ml_service = googleapiclient.discovery.build('ml', 'v1')

    # Resource name: projects/<project>/models/<model>[/versions/<version>]
    path_parts = ['projects/{}/models/{}'.format(project, model)]
    if version is not None:
        path_parts.append('/versions/{}'.format(version))
    resource_name = ''.join(path_parts)

    request = ml_service.projects().predict(
        name=resource_name,
        body={'instances': instances},
    )
    result = request.execute()

    if 'error' in result:
        raise RuntimeError(result['error'])
    return result['predictions']