## Acesso à API (Python)

Este notebook descreve o processo de cadastro e execução de funções e dataflow na API utilizando a linguagem `Python`, por meio da biblioteca [requests](https://requests.readthedocs.io/en/latest/).

A sequência de operações descritas abaixo possui como objetivo demonstrar o processo 
de inferência realizado pelo COR (Centro de Operações Rio) através da API, o qual parte do pressuposto que já foi realizado o cadastro de metadados (domínio, projeto, execution_environment).

Uma documentação completa da API, gerada pelo framework Swagger, pode ser consultada em [][]http://localhost:8080/api/swagger-ui.

In [42]:
import requests
import json

OBS: Ao fazer o login no serviço disponibilizado pela URL "https://twinscie.dexl.lncc.br/api/login" deve-se usar o username e password próprio.

In [None]:
URL = "http://localhost:8080/api/login"
#URL = "https://twinscie-dsv.petrobras.com.br/api/login"
#LOGIN_URL = "https://twinscie.dexl.lncc.br/api/login"
response = requests.post(url=URL, json={"username": "admin", "password": "admin"})
token = response.json()["token"]
print(token)
headers = {
    "Authorization": f"Bearer {token}"
}
URL = "http://localhost:8080/api/"


eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTc0OTc0OTIzMiwianRpIjoiNmI1OTQ2MzYtODQ2NS00YzQ3LThkYWUtMjY3MzliMGI4NjUwIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6ImFkbWluIiwibmJmIjoxNzQ5NzQ5MjMyLCJjc3JmIjoiOTNhNTEyNmQtNGQwMS00MDJmLWJmZGEtYWIzZDA5ZmM2OTRhIiwiZXhwIjoxNzUwNjEzMjMyfQ.G6wiuBFNFFaDAKqiSfE0RLV4Q41ALT73cXbECmBbaKo


## Datasets (Cadastro)

##### Adicionando arquivo de dados de radar 

In [64]:

#Definição do caminho do arquivo que será realizado no upload.
files = {
        "files": open(file="./data/radar.parquet", mode="rb"),
}

payload = {
    #Nome do dataset no Twinscie. Nome que será usado em treinamento, processamento e inferência neste dataset.
    "name": "radar_data_path",   
    "domain_id": 1,
}

In [65]:
response = requests.post(
    url = URL + "datasets",
    data = payload,
    files = files,
    headers=headers
)


In [66]:
response.status_code

201

In [9]:
response.json()

{'domain': {'description': 'cor', 'id': 1, 'name': 'cor'},
 'file_type': 'parquet',
 'id': 68,
 'name': 'radar_data_path',
 'register': '2025-06-12T14:23:34.731674',
 'uri': 'http://localhost:8080/api/download/datasets/radar_data_path'}

##### Adicionando arquivo de dados 'rain_gauge'

In [10]:
#Definição do caminho do arquivo que será realizado no upload.
files = {
        "files": open(file="./data/rain_gauge.parquet", mode="rb"),
}

payload = {
    #Nome do dataset no Twinscie. Nome que será usado em treinamento, processamento e inferência neste dataset.
    "name": "rain_gauge_data_path",
    "domain_id": 1,
}

In [11]:
response = requests.post(
    url = URL + "datasets",
    data = payload,
    files = files,
    headers=headers
)

In [12]:
response.status_code

201

In [13]:
response.json()

{'domain': {'description': 'cor', 'id': 1, 'name': 'cor'},
 'file_type': 'parquet',
 'id': 69,
 'name': 'rain_gauge_data_path',
 'register': '2025-06-12T14:23:45.994315',
 'uri': 'http://localhost:8080/api/download/datasets/rain_gauge_data_path'}

##### Adicionando arquivo de dados 'grid_points'

In [14]:
#Definição do caminho do arquivo que será realizado no upload.
files = {
        "files": open(file="./data/grid_points.parquet", mode="rb"),
}

payload = {
    #Nome do dataset no Twinscie. Nome que será usado em treinamento, processamento e inferência neste dataset.
    "name": "grid_data_path",
    "domain_id": 1,
}

In [15]:
response = requests.post(
    url = URL + "datasets",
    data = payload,
    files = files,
    headers=headers
)

In [13]:
response.status_code

201

In [16]:
response.json()

{'domain': {'description': 'cor', 'id': 1, 'name': 'cor'},
 'file_type': 'parquet',
 'id': 70,
 'name': 'grid_data_path',
 'register': '2025-06-12T14:23:55.012220',
 'uri': 'http://localhost:8080/api/download/datasets/grid_data_path'}

##### Adicionando modelo convlstm.pt

OBS: Deve-se colocar o caminho para o arquivo convlstm.pt armazenado localmente.

In [17]:
#Definição do caminho do arquivo que será realizado no upload.
files = {
        "files": open(file="./data/convlstm.pt", mode="rb"),
}

payload = {
    "name": "model_path",
    "domain_id": 1,
}

In [18]:
response = requests.post(
    url = URL + "datasets",
    data = payload,
    files = files,
    headers=headers
)

In [19]:
response.status_code

201

In [20]:
response.json()

{'domain': {'description': 'cor', 'id': 1, 'name': 'cor'},
 'file_type': 'pt',
 'id': 71,
 'name': 'model_path',
 'register': '2025-06-12T14:24:03.411087',
 'uri': 'http://localhost:8080/api/download/datasets/model_path'}

### Function (Cadastro)

##### Adicionando função que realiza a leitura dos dados do disco

In [21]:
files = {
        "files": open(file="preprocessing.py", mode="rb"),
}
payload = {
"name": "load",       #Nome da função. Deve ser o mesmo nome definido na implementação da função no arquivo .py.
"alias": "load_cor",  #Apelido da função no Twinscie. Será usado para referenciar esta função na definição e execução do dataflow.
"description":  "Loads the data from the sources",
"input_arity":  "many",
"output_arity": "many",
"function_type": "source",     # Função de leitura dos dados no disco
"filename": "preprocessing.py",  # Nome do arquivo que contém a implementação da função
"params": json.dumps([  # parâmetros de entrada da função. Todos os argumentos da função devem estar especificados.
        {"name": "radar_data_path", "data_type": "path", "default_value": "radar.parquet"},
        {"name": "rain_gauge_data_path", "data_type": "path", "default_value": "rain_gauge.parquet"},
        {"name": "grid_data_path", "data_type": "path", "default_value": "grid_points.parquet"},
])
}

In [22]:
response = requests.post(
    url = URL + "functions",
    data = payload,
    files = files,
    headers=headers
)

In [23]:
response.status_code

201

In [24]:
response.json()

{'alias': 'load_cor',
 'description': 'Loads the data from the sources',
 'filename': 'preprocessing',
 'function_type': 'source',
 'id': 89,
 'input_arity': 'many',
 'name': 'load',
 'output_arity': 'many',
 'parameters': [{'data_type': 'path',
   'description': 'grid_points.parquet',
   'id': 114,
   'name': 'grid_data_path'},
  {'data_type': 'path',
   'description': 'radar.parquet',
   'id': 112,
   'name': 'radar_data_path'},
  {'data_type': 'path',
   'description': 'rain_gauge.parquet',
   'id': 113,
   'name': 'rain_gauge_data_path'}],
 'register': '2025-06-12 14:24:12',
 'uri': 'localhost:8080/functions/load_cor.zip'}

##### Adicionando função de preprocessamento

In [25]:
files = {
        "files": open(file="preprocessing.py", mode="rb"),
}
payload = {
        "name": "preprocessing", #Nome da função. Deve ser o mesmo nome definido na implementação da função no arquivo .py.
        "alias": "preprocessing_cor", #Apelido da função no Twinscie. Será usado para referenciar esta função na definição e execução do dataflow.
        "description":  "Preprocesses the data to be used as input for COR model",
        "input_arity":  "many",
        "output_arity": "one",
        "function_type": "transformer",  # Função de transformação dos dados.
        "filename": "preprocessing.py", # Nome do arquivo .py que contém a implementação da função
        "params": json.dumps([ # parâmetros de entrada da função. Todos os argumentos da função devem estar especificados.
                {"name": "df_radar", "data_type": "in_memory_artifact"},
                {"name": "df_rain_gauge", "data_type": "in_memory_artifact"},
                {"name": "df_grid", "data_type": "in_memory_artifact"},
        ])
}


In [26]:
response = requests.post(
    url = URL + "functions",
    data = payload,
    files = files,
    headers=headers
)

In [27]:
response.status_code

201

In [28]:
response.json()

{'alias': 'preprocessing_cor',
 'description': 'Preprocesses the data to be used as input for COR model',
 'filename': 'preprocessing',
 'function_type': 'transformer',
 'id': 90,
 'input_arity': 'many',
 'name': 'preprocessing',
 'output_arity': 'one',
 'parameters': [{'data_type': 'in_memory_artifact',
   'description': '',
   'id': 117,
   'name': 'df_grid'},
  {'data_type': 'in_memory_artifact',
   'description': '',
   'id': 115,
   'name': 'df_radar'},
  {'data_type': 'in_memory_artifact',
   'description': '',
   'id': 116,
   'name': 'df_rain_gauge'}],
 'register': '2025-06-12 14:24:21',
 'uri': 'localhost:8080/functions/preprocessing_cor.zip'}

##### Adicionando função de carregar o modelo

In [29]:
files = {
        "files": open(file="preprocessing.py", mode="rb"),
}
payload = {
        "name": "load_model", #Nome da função. Deve ser o mesmo nome definido na implementação da função no arquivo .py.
        "alias": "load_model_cor", #Apelido da função no Twinscie. Será usado para referenciar esta função na definição e execução do dataflow.
        "description":  "Load COR model",
        "input_arity":  "many",
        "output_arity": "one",
        "function_type": "source", # Função de leitura dos dados no disco
        "filename": "preprocessing.py", # Nome do arquivo .py que contém a implementação da função
        "params": json.dumps([ # parâmetros de entrada da função. Todos os argumentos da função devem estar especificados.
                {"name": "model_path", "data_type": "path", "default_value": "convlstm.pt"}
        ])
}

In [30]:
response = requests.post(
    url = URL + "functions",
    data = payload,
    files = files,
    headers=headers
)

In [31]:
response.status_code

201

In [32]:
response.json()

{'alias': 'load_model_cor',
 'description': 'Load COR model',
 'filename': 'preprocessing',
 'function_type': 'source',
 'id': 91,
 'input_arity': 'many',
 'name': 'load_model',
 'output_arity': 'one',
 'parameters': [{'data_type': 'path',
   'description': 'convlstm.pt',
   'id': 118,
   'name': 'model_path'}],
 'register': '2025-06-12 14:24:30',
 'uri': 'localhost:8080/functions/load_model_cor.zip'}

##### Adicionando função de inferência

In [33]:
files = {
        "files": open(file="preprocessing.py", mode="rb"),
}
payload = {
        "name": "predict", #Nome da função. Deve ser o mesmo nome definido na implementação da função no arquivo .py.
        "alias": "predict_cor", #Apelido da função no Twinscie. Será usado para referenciar esta função na definição e execução do dataflow.
        "description":  "Predicts the precipitation using the COR model",
        "input_arity":  "many",
        "output_arity": "one",
        "function_type": "transformer", # Função de transformação dos dados.
        "filename": "preprocessing.py", # Nome do arquivo .py que contém a implementação da função
        "params": json.dumps([ # parâmetros de entrada da função. Todos os argumentos da função devem estar especificados.
                {"name": "model", "data_type": "in_memory_artifact"},
                {"name": "X", "data_type": "complex"}
        ])
}

In [34]:
response = requests.post(
    url = URL + "functions",
    data = payload,
    files = files,
    headers=headers
)

In [35]:
response.status_code

201

In [36]:
response.json()

{'alias': 'predict_cor',
 'description': 'Predicts the precipitation using the COR model',
 'filename': 'preprocessing',
 'function_type': 'transformer',
 'id': 92,
 'input_arity': 'many',
 'name': 'predict',
 'output_arity': 'one',
 'parameters': [{'data_type': 'in_memory_artifact',
   'description': '',
   'id': 119,
   'name': 'model'},
  {'data_type': 'complex', 'description': '', 'id': 120, 'name': 'X'}],
 'register': '2025-06-12 14:24:38',
 'uri': 'localhost:8080/functions/predict_cor.zip'}

### Dataflow (Cadastro)

OBS1: o atributo alias deve possuir o mesmo valor do alias da função definido no cadastro da mesma.

OBS2: Os nomes definidos no input devem ser os mesmo nomes dados aos parâmetros relacionados aos datasets no momento do cadastro da função.  

OBS3: Caso um nó produza dados para outro nó, o nome dado no output do primeiro deve ser o mesmo no input do segundo. Ex: b_output e predictions.

OBS4: Dentro do conda.yaml deve ter a dependencia Kedro especificada.

In [44]:
files = {
        "files": open(file="conda.yaml", mode="rb"),
}
graph_data = [ # Definição de cada nó do dataflow
{ 
        "function_alias":"load_cor",  # Função que será executada no nó.
        "input":[                     # Parâmetros de entrada. Mesmos nomes de parâmetros definidos na especificação da função.  
                "radar_data_path",      
                "rain_gauge_data_path",
                "grid_data_path"
        ],
        "output":[             # Parâmetros de saída. Devem ter os mesmo nomes dos parâmetros de entrada do nó seguinte.           
                "df_radar", 
                "df_rain_gauge",
                "df_grid"
        ]
},
{ 
        "function_alias":"preprocessing_cor", # Função que será executada no nó.
        "input":[              # Parâmetros de entrada. Mesmos nomes de parâmetros definidos na especificação da função.            
                "df_radar", 
                "df_rain_gauge",
                "df_grid"
        ],
        "output":["X"] # Parâmetros de saída. Devem ter os mesmo nomes dos parâmetros de entrada do nó seguinte.    
},
{
        "function_alias":"load_model_cor", # Função que será executada no nó.
        "input":[            # Parâmetros de entrada. Mesmos nomes de parâmetros definidos na especificação da função. 
                "model_path"
        ],
        "output":["model"] # Parâmetros de saída. Devem ter os mesmo nomes dos parâmetros de entrada do nó seguinte.    
},
{
        "function_alias":"predict_cor", # Função que será executada no nó. 
        "input":[          # Parâmetros de entrada. Mesmos nomes de parâmetros definidos na especificação da função. 
                "X",
                "model"
        ],
        "output":None
}
]
payload = {
    "name": "rionowcast_dataflow",
    "graph": json.dumps(graph_data)
}

In [45]:
response = requests.post(
    url = URL + "dataflows",
    data = payload,
    files = files,
    headers=headers
)

In [46]:
response.status_code

201

In [48]:
response.json()

{'graph': '[{"function_alias": "load_cor", "input": ["radar_data_path", "rain_gauge_data_path", "grid_data_path"], "output": ["df_radar", "df_rain_gauge", "df_grid"]}, {"function_alias": "preprocessing_cor", "input": ["df_radar", "df_rain_gauge", "df_grid"], "output": ["X"]}, {"function_alias": "load_model_cor", "input": ["model_path"], "output": ["model"]}, {"function_alias": "predict_cor", "input": ["X", "model"], "output": null}]',
 'id': 1,
 'name': 'rionowcast_dataflow',
 'uri': 'localhost:8080/dataflows/rionowcast_dataflow.zip'}

### Dataflow (Execução)

OBS: Atentar para os ids, alias das funções e nomes dos parâmetros que estão sendo passados. Existe validação para isso, mas devem ser os ids retornados para cada uma das etapas anteriores. Nos params deve ser passado o nome do dataset e o seu respectivo nome cadastrado na base.
OBS2: Somente funções com parâmetros definidos pelo usuário devem ser declaradas.

In [67]:
data = {
    "dataflow_id": 1,  # Id do dataflow
    "environment_id": 1,  # Id do environment
    "parameters": [  # Definição dos parâmetros de execução
        {
            "function_alias":"load_cor",   # Alias da função e seus respectivos datasets de entrada NESTA execução
            "params": {"radar_data_path":"radar_data_path", "rain_gauge_data_path":"rain_gauge_data_path", "grid_data_path":"grid_data_path"}
        },
        { 
            "function_alias":"load_model_cor", # Alias da função e seus respectivos datasets de entrada NESTA execução
            "params": {"model_path":"model_path"}
        }
    ],
    "project_id": 1   # Id do projeto
}

In [68]:
response = requests.post(
    url = URL + "dataflow_run",
    json = data,
    headers=headers
)

In [40]:
response.status_code

201

In [33]:
response.json()

{'task_id': '9e051dc9-4541-4537-af9b-7018033e289b'}

In [34]:
task_id = response.json().get('task_id')

In [35]:
task_id

'9e051dc9-4541-4537-af9b-7018033e289b'

### Monitorando status de execução do Dataflow

In [None]:
response = requests.get(
    headers=headers,
    url = URL + "status_dataflow_run/" + task_id,
)

In [37]:
response.json()

{'state': 'FAILURE'}