# Parte 02

Nessa parte, os modelos criados anteriormente serão utilizados para realizar predições. Para isso, eles devem ser registrados no TFX. Para efetuar as predições, os dados utilizados no treinamento desses modelos serão inseridos no SAVIME, o qual ficará encarregado de enviar e receber os dados para/de TFX. 
### **Não se esqueça de abaixo configurar em qual diretório se encontra a API PySavime e o local em que foram salvos os dados de treinamento.**

In [1]:
import os
import sys

# Necessário mudar o diretório de trabalho para o nível mais acima
if not 'notebooks' in os.listdir('.'):
    current_dir = os.path.abspath(os.getcwd())
    parent_dir = os.path.dirname(current_dir)
    os.chdir(parent_dir)

# Inserir aqui o caminho da biblioteca do savime
py_savime_path =  '/home/daniel/PycharmProjects/intelipetro/savime'
sys.path.insert(0, py_savime_path)

# Inserir aqui o caminho do arquivo de dados: um json contendo informações a respeito 
# da partição de x e y utilizada na parte 01.
data_fp = 'saved_models_elastic_net/data.json'

# Configuração do host e porta em que o SAVIME está escutando
host = '127.0.0.1'
port = 65000

In [2]:
%load_ext autoreload
%autoreload 2

import json
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error

# Savime imports
from savime.datatype import SavimeSupportedTypes
from savime.client import Client
import schema.define as savime_define
from misc.command_runner import CommandRunner
from util.converter import DataVariableBlockConverter
from util.data_variable import DataVariableBlockOps

from src.predictor_consumer import PredictionConsumer

O primeiro passo é carregar os dados gerados anteriormente. Um dos motivos é remover os metadados dos arrays de entrada e saída. Em outras palavras, tornar os arrays numpy em vetores contíguos do C/C++. Esses vetores serão posteriormente cadastrados no SAVIME.

In [3]:
with open(data_fp, 'r') as _in:
    data = json.load(_in)

output_dir = data['output_dir']
x_fp = os.path.join(output_dir, data['x_file_name'])
y_fp = os.path.join(output_dir, data['y_file_name'])

x_array = np.load(x_fp)
y_array = np.load(y_fp)
num_observations, num_features = x_array.shape
y_num_columns = 1 if len(y_array.shape) == 1 else y.shape[1]

x_c_fp = os.path.join(output_dir, 'x_data')
y_c_fp = os.path.join(output_dir, 'y_data')

# Salvar os arrays numpy em um formato "legível" ao SAVIME, isto é, como arrays (row-wise) e 
# sem metadados do numpy.
x_array.astype('float64').tofile(x_c_fp)
y_array.astype('float64').tofile(y_c_fp)

**Vamos agora definir os esquemas dos dados no sistema SAVIME.**

Grosso modo, SAVIME é um SGDB para matrizes multidimensionais. O elemento base de seu modelo de dados é o Tar. Pode-se pensar em um Tar como sendo um dataset de matrizes muldimensionais com dimensões nomeadas. Por exemplo, determinado tar pode possuir 3 dimensões nomeadas (tempo, latitude e longitude) e 2 atributos (temperatura e precipitação). Nesse caso, o tar conteria duas matrizes multidimensionais de terceira ordem, onde cada ponto (tempo, latitude, longitude) representa uma informação climática em determinado instante e local. Além do tar outros dois elementos são importantes no modelo de dados do SAVIME: dataset e subtar. O primeiro diz respeito ao array (atributo que está de fato armazenado em disco). Por sua vez, cada subtar é associado a um único tar e diz respeito a como um array/conjunto de arrays é carregado no tar.

A fim de tornar o processo de comunicação com o sistema facilitado nesse notebook são empregadas as funções da API PySavime. Essa API permite que as consultas sejam realizadas diretamente do ambiente Python e que os próprios resultados dessas consultas sejam elementos facilmente integrados no ciclo de desenvolvimento de modelos, como arrays numpy e dataframes pandas.

In [4]:
# Definição do datasets a serem utilizados:
# Lembre que tinha X tem duas features (length)

x_dataset = savime_define.file_dataset('x', x_c_fp, SavimeSupportedTypes.DOUBLE, length=num_features)
y_dataset = savime_define.file_dataset('y', y_c_fp, SavimeSupportedTypes.DOUBLE)

# O comando gerado será:
print(x_dataset.create_query_str(), y_dataset.create_query_str(), sep='\n')

CREATE_DATASET("x:double:2", "@/home/daniel/PycharmProjects/savime-notebooks/saved_models_elastic_net/x_data");
CREATE_DATASET("y:double:1", "@/home/daniel/PycharmProjects/savime-notebooks/saved_models_elastic_net/y_data");


In [5]:
# Definição do Tar a ser empregado:

# Dimensão index
index = savime_define.implicit_tar_dimension('index', SavimeSupportedTypes.INT32, 1, num_observations)
x = savime_define.tar_attribute('x', SavimeSupportedTypes.DOUBLE, num_features)
y = savime_define.tar_attribute('y', SavimeSupportedTypes.DOUBLE, y_num_columns)
tar = savime_define.tar('tutorial', [index], [x, y])

# O comando gerado será:
print(tar.create_query_str())

CREATE_TAR("tutorial", "*", "implicit, index, int32, 1, 100000, 1", "x, double: 2 | y, double: 1");


In [6]:
# Carregamento do SubTar:
# Aqui de fato os dados serão carregados no Tar.  

subtar_index = savime_define.ordered_subtar_dimension(index, 1, num_observations)
subtar_x = savime_define.subtar_attribute(x, x_dataset)
subtar_y = savime_define.subtar_attribute(y, y_dataset)
subtar = savime_define.subtar(tar, [subtar_index], [subtar_x, subtar_y])

print(subtar.load_query_str())

LOAD_SUBTAR("tutorial", "ordered, index, #1,#100000", "x, x | y, y")


### Para executar os passos a seguir é necessário que haja um servidor SAVIME escutando no host e porta definidos anteriormente.

In [7]:
# 1. Conexão é aberta e fechada com o SAVIME (contexto with)
# 2. Criação de um objeto de execução de comandos vinculado à conexão criada.
# 3. a) Criação dos datasets
#    b) Criação do subtar
#    c) Carregamento dos datasets por meio de um subtar

with Client(host=host, port=port) as client:
    command_runner = CommandRunner(client)
    
    command_runner.create(x_dataset)
    command_runner.create(y_dataset)
    command_runner.create(tar)
    command_runner.load(subtar)

In [8]:
# Os objetos abaixo são utilizados para converter do container mais genérico DataVariableBlock, retornado 
# como resposta da consulta, em  elmentos xarray e pandas. 
# Note que DataVariableBlocks contêm dois atributos: dims e attrs. Cada um desses contém um
# dicionário do tipo nome_array: array, onde array é um numpy array.

xarray_converter = DataVariableBlockConverter('xarray')
pandas_converter = DataVariableBlockConverter('pandas')

# Efetuar select no SAVIME a fim de verificar se o TAR for criado de forma adequada
with Client(host=host, port=port) as client:
    responses = client.execute(f'SELECT({tar.name})')
        
# Em geral, o retorno do SAVIME a uma consulta é dado por subtar. Ou seja, se o tar contem n subtars então
# a variável responses acima será uma lista com n DataVariableBlocks. Abaixo, eles são concatenados.
data_variable_block = DataVariableBlockOps.concatenate(responses)

# E abaixo convertidos
xdataset_response = xarray_converter(data_variable_block)
pandas_response = pandas_converter(data_variable_block)

In [9]:
# Obs.: 
# 1) Como xarray representa arrays como matrizes multidimensionais densas e SAVIME é mais genérico, 
# abarcando matrizes esparsas, é preciso manter uma máscara _mask_ identificando se determinado
# elemento na matriz está presente ou não.
# 2) Dataframes pandas é intrinsicamente uma estrutura tabular. Para permitir a representação de atributos
# com uma segunda dimensão (matrizes) emprega-se índices múltiplos. Note a coluna x abaixo.
print('XDATASET')
print(xdataset_response)
print('DATAFRAME')
print(pandas_response)

XDATASET
<xarray.Dataset>
Dimensions:  (_0_: 2, index: 100000)
Coordinates:
  * index    (index) int32 1 2 3 4 5 6 ... 99995 99996 99997 99998 99999 100000
  * _0_      (_0_) int64 0 1
    _mask_   (index, _0_) bool True True True True True ... True True True True
Data variables:
    x        (index, _0_) float64 -1.0 -1.0 -1.0 -1.0 -1.0 ... 1.0 1.0 1.0 1.0
    y        (index) float64 -2.095 -1.912 -2.03 -1.944 ... 5.955 5.871 5.932
DATAFRAME
               x                   y
               0         1         0
index                               
1      -0.999989 -0.999982 -2.094709
2      -0.999979 -0.999966 -1.912207
3      -0.999962 -0.999949 -2.030484
4      -0.999921 -0.999907 -1.943638
5      -0.999897 -0.999871 -2.003759
...          ...       ...       ...
99996   0.999898  0.999903  6.046098
99997   0.999909  0.999938  5.740115
99998   0.999960  0.999970  5.955184
99999   0.999974  0.999977  5.871285
100000  0.999984  0.999984  5.931968

[100000 rows x 3 columns]


In [10]:
# Checando se a resposta é correta
print('pandas ok:', np.allclose(x_array, pandas_response['x'].values.reshape(x_array.shape)))

print('xarray ok:', 
np.allclose(x_array,
xdataset_response['x'].values[xdataset_response['_mask_']].reshape(x_array.shape)))

pandas ok: True
xarray ok: True


### Para executar os passos abaixo é nessário que o servidor de modelos esteja rodando.

Execute o comando abaixo:
`tensorflow_model_server --rest_api_port=8501 --model_config_file=ARQUIVO_DE_MODELOS` 
Note que você deve trocar ARQUIVO_DE_MODELOS pelo caminho do arquivo no qual os modelos foram registrados. Esse arquivo é o `models.config` dentro da pasta `saved_models`.

In [11]:
tfx_host = 'localhost'
tfx_port = 8501

previously_computed_mse = np.array(data['metrics']['mean_squared_error'])
model_name_to_iid = data['iid']
whole_partition_iid = model_name_to_iid[data['model']]

mse = {}

for model_name, i in model_name_to_iid.items():
    # O objeto abaixo é vinculado ao model (dado por model_name)
    model_predictive_service = PredictionConsumer(host=tfx_host, port=tfx_port, model_name=model_name)
    
    # Abaixo é enviado o array x como consulta preditiva, e retornado y_hat
    y_array_hat = model_predictive_service.predict(x_array)
    
    # Computando o mse com o resultado retornardo por TFX.
    model_mse = mean_squared_error(y_array, y_array_hat)
    
    mse[i] = {'TFX': model_mse, 'Antes': previously_computed_mse[i][whole_partition_iid]}


def get_df_for_checking_result(result):
    df = pd.DataFrame.from_dict(mse, orient='index')
    df['Está Correto'] = np.isclose(df['TFX'].values, df['Antes'].values)
    df.sort_index(inplace=True)
    df.index.name = 'Model IID'
    return df

get_df_for_checking_result(mse)

Unnamed: 0_level_0,TFX,Antes,Está Correto
Model IID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,6.322893,6.322901,True
1,10.293973,10.293969,True
2,6.497287,6.497281,True
3,6.530949,6.530946,True
4,6.483487,6.483484,True
5,6.320596,6.320597,True
6,6.084279,6.084278,True
7,5.826059,5.826064,True
8,5.453071,5.45307,True
9,5.028097,5.028098,True


Agora ao invés de solicitar a predição diretamente ao TFX (model_predictive_service empregado anteriormente), utilizaremos o SAVIME.

In [12]:
mse = {}

with Client(host=host, port=port) as client:
    command_runner = CommandRunner(client)
    
    dim_spec = [(index.name, num_observations)]
    
    for model_name, i in model_name_to_iid.items():
        # Um modelo é registrado no savime, isto é, associado a um tar, identificado qual o atributo de entrada e
        # o formato da matriz multidimensional de entrada. Nesse caso, estamos enviando o vetor completo de 
        # observações, mas é totalmente possível pedir uma predição para somente uma parte dele.
        command_runner.register_model(model_name=model_name, model_tar=tar.name,
                                      input_attribute=x.name,
                                      dim_specification=dim_spec)
        
        # Aqui a predição é realizada.
        response = command_runner.predict(tar=tar.name, model_name=model_name, input_attribute=x.name)[0]
        y_array_hat_savime = response.attrs['op_result'].ravel()
        
        model_mse = mean_squared_error(y_array, y_array_hat_savime)
        
        mse[i] = {'TFX': model_mse, 'Antes': previously_computed_mse[i][whole_partition_iid]}
    
get_df_for_checking_result(mse)

Unnamed: 0_level_0,TFX,Antes,Está Correto
Model IID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,6.322893,6.322901,True
1,10.293973,10.293969,True
2,6.497287,6.497281,True
3,6.530949,6.530946,True
4,6.483487,6.483484,True
5,6.320596,6.320597,True
6,6.084279,6.084278,True
7,5.826059,5.826064,True
8,5.453071,5.45307,True
9,5.028097,5.028098,True


In [13]:
# Observe o resultado completo de uma das consultas executadas anteriormente, a predição é 
# retornada na coluna op_result
pandas_converter(response)

Unnamed: 0_level_0,op_result,x,x,y
Unnamed: 0_level_1,0,0,1,0
index,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
1,-1.631772,-0.999989,-0.999982,-2.094709
2,-1.631761,-0.999979,-0.999966,-1.912207
3,-1.631744,-0.999962,-0.999949,-2.030484
4,-1.631702,-0.999921,-0.999907,-1.943638
5,-1.631677,-0.999897,-0.999871,-2.003759
...,...,...,...,...
99996,0.420930,0.999898,0.999903,6.046098
99997,0.420942,0.999909,0.999938,5.740115
99998,0.420993,0.999960,0.999970,5.955184
99999,0.421008,0.999974,0.999977,5.871285
