Carrega módulos necessários

In [1]:
import datetime as dt
import numpy as np
import matplotlib.pyplot as plt
import csv
import itertools
from scipy.interpolate import interp1d
from scipy.optimize import differential_evolution, NonlinearConstraint, brute
from matplotlib.lines import Line2D
import warnings
warnings.filterwarnings('ignore')

Implementa funções de leitura auxiliares de csvs exportados da planilha 448 do HIDRO

In [2]:
def le_csv_medicoes_descarga_hidro(caminho_arquivo, delimitador=";"):
  with open(caminho_arquivo, "r", encoding="utf-8") as f:
      leitor = csv.reader(f, delimiter=delimitador)
      _ = next(leitor)  # ;;;;;;;;;;;;Colunas Auxiliares;;OBSERVAÇÕES
      cabecalho = next(leitor)
      dados = {i: list() for i in cabecalho}
      for linha in leitor:
        if all([i == "" for i in linha]):
          continue  # Pula linha em branco
        for i in range(len(linha)):
          # Transformação de formatos
          if cabecalho[i] == 'Data':
            dados[cabecalho[i]].append(dt.datetime.strptime(linha[i], '%d/%m/%Y'))
          elif cabecalho[i] == 'Hora':
            dados[cabecalho[i]].append(dt.datetime.strptime(f"01/01/1900 {linha[i]}", '%d/%m/%Y %H:%M'))
          elif cabecalho[i] == 'Número da medição':
            try:
              dados[cabecalho[i]].append(int(linha[i]))
            except:
              dados[cabecalho[i]].append(np.nan)
          elif cabecalho[i] == 'Cota (cm)':
            try:
              dados[cabecalho[i]].append(float(linha[i]))
            except:
              dados[cabecalho[i]].append(np.nan)
          elif cabecalho[i] == 'Vazão (m3/s)':
            try:
              dados[cabecalho[i]].append(float(linha[i]))
            except:
              dados[cabecalho[i]].append(np.nan)
          else:
            dados[cabecalho[i]].append(linha[i])
        # break
  return dados


def le_csv_hidro_curvas_detalhe(caminho_arquivo,  delimitador=";"):
  with open(caminho_arquivo, "r", encoding="utf-8") as f:
    leitor = csv.reader(f, delimiter=delimitador)
    cabecalho = next(leitor)
    cabecalho[0] = cabecalho[0].strip('\ufeff')
    dados = {i: list() for i in cabecalho if i != ''}
    _ = next(leitor)  # ;;;a;h0;n;Min.;Max.;;;;(abs.);;(%);;(abs.);;(%);
    for n, linha in enumerate(leitor):
      if linha[2] != '':  # Numero da curva - Nova curva
        for i in dados.keys():
          dados[i].append(list())  # Armazena dados da curva em uma lista
        dados['Estação'][-1].append(linha[0])
        dados['Tipo de Curva'][-1].append(linha[1])
        dados['Número da Curva'][-1].append(linha[2])
        dados['Equação - Coeficientes'][-1].append([float(i) for i in linha[3:5 + 1]])  # a, h0, n
        dados['Intervalo de Cotas (cm)'][-1].append([float(i) for i in linha[6:7 + 1]])  # min, max
        dados['Validade'][-1].append((dt.datetime.strptime(linha[8].split(' - ')[0], "%d/%m/%Y"), dt.datetime.strptime(linha[8].split(' - ')[1], "%d/%m/%Y")))  # data inicial - data final
        dados['Desvio Absoluto Médio (%)'][-1].append([float(i) if i != '' else ''  for i in linha[9:10 + 1]])  # tramo/curva total
        dados['Número de Medições com Desvios Positivos'][-1].append([int(i) if i != '' else '' for i in linha[11:14 + 1]])  # Abs. por Eq. e total (ARMAZENA O TOTAL APENAS NA PRIMEIRA CURVA)
        dados['Número de Medições com Desvios Negativos'][-1].append([int(i) if i != '' else '' for i in linha[15:19 + 1]])  # Abs. por Eq. e total (ARMAZENA O TOTAL APENAS NA PRIMEIRA CURVA)
      else:  # Numero da curva '' - Novo tramo
        dados['Estação'][-1].append(linha[0])
        dados['Tipo de Curva'][-1].append(linha[1])
        dados['Número da Curva'][-1].append(linha[2])
        dados['Equação - Coeficientes'][-1].append([float(i) for i in linha[3:5 + 1]])  # a, h0, n
        dados['Intervalo de Cotas (cm)'][-1].append([float(i) for i in linha[6:7 + 1]])  # min, max
        dados['Validade'][-1].append((dt.datetime.strptime(linha[8].split(' - ')[0], "%d/%m/%Y"), dt.datetime.strptime(linha[8].split(' - ')[1], "%d/%m/%Y")))  # data inicial - data final
        dados['Desvio Absoluto Médio (%)'][-1].append([float(i) if i != '' else '' for i in linha[9:10 + 1]])  # tramo/curva total
        dados['Número de Medições com Desvios Positivos'][-1].append([int(i) if i != '' else '' for i in linha[11:14 + 1]])  # Abs. por Eq.
        dados['Número de Medições com Desvios Negativos'][-1].append([int(i) if i != '' else '' for i in linha[15:19 + 1]])  # Abs. por Eq.
  return dados

Implementa funções auxiliares para ajuste das curvas-chave e otimização dos parâmetros

In [3]:
def equacao_potencial(a, h, h0, n):
  return a*(h - h0)**n

Implementa funções auxiliares para plotagem dos dados

In [4]:
def arredonda_multiplos_10(x):
  return (x + 5)//10*10

def plota_cota_x_desvio(plota_legenda=False):  # TODO: Utilizar argumentos e não variáveis globais
  _markers = list(Line2D.markers)
  if plota_legenda:
    plt.figure(figsize=(10/0.8, 6))
  else:
    plt.figure(figsize=(10, 6))
  # for i in range(6, num_equacoes):
  for i in range(num_equacoes - 1, num_equacoes):  # Só plota a ultima curva
    for j in range(len(coefs_equacoes[i])):
      plt.scatter(cotas_medicao_equacoes[i][j], desvios_medios_equacoes[i][j], marker=_markers[i], label=f"EQ: {i + 1}, TRAMO: {j + 1}")
  plt.hlines(y=[-20, -10, 0, 20, 10], xmin=np.min(cota_medicao), xmax=np.max(cota_medicao), colors=['red', 'blue', 'green'], alpha=0.5, linestyle='--')
  plt.xticks(np.arange(arredonda_multiplos_10(np.min(cota_medicao)), arredonda_multiplos_10(np.max(cota_medicao)) + 100, 100))
  plt.xticks(np.arange(arredonda_multiplos_10(np.min(cota_medicao)), arredonda_multiplos_10(np.max(cota_medicao)) + 100, 50), minor=True)
  plt.grid(which='major', linestyle='--', alpha=0.50)
  plt.grid(which='minor', linestyle=':', alpha=0.25)
  plt.xlabel('Cota (cm)')
  plt.ylabel('Desvio (%)')
  if plota_legenda:  # TODO: Trabalhar melhor a legenda
    box = plt.gca().get_position()
    plt.gca().set_position([box.x0, box.y0, box.width*0.8, box.height])
    plt.legend(loc='center left', ncols=1, bbox_to_anchor=(1.0, 0.5))
  plt.show()
  return

Define caminhos e/ou diretórios

In [5]:
"""
74100000 IRAI
74500000 ALTO URUGUAI
74800000 PORTO LUCENA
75780000 PASSO SÃO BORJA
"""
CAMINHO_CSV_MEDICAO_DESCARGA = f"/content/75780000_medicoes_descarga.csv"
CAMINHO_CSV_CURVAS_HIDRO_DETALHES = f"/content/75780000_hidro_curvas_detalhe.csv"

Carrega dados de medição descarga e curvas-chave da estação em dicionários

In [6]:
medicoes_descarga = le_csv_medicoes_descarga_hidro(CAMINHO_CSV_MEDICAO_DESCARGA)
hidro_curvas_detalhe = le_csv_hidro_curvas_detalhe(CAMINHO_CSV_CURVAS_HIDRO_DETALHES)

Verifica cabeçalho (chaves/dados) disponiveis nos dicionários

In [7]:
print(list(medicoes_descarga.keys()))
print(list(hidro_curvas_detalhe.keys()))

['Código', 'Nível de consistência', 'Data', 'Hora', 'Número da medição', 'Cota (cm)', 'Vazão (m3/s)', 'Área molhada (m2)', 'Largura (m)', 'Velocidade média (m/s)', 'Profundidade (m)', '', 'Q=AxV', '(Q-AxV)/Q [%]']
['Estação', 'Tipo de Curva', 'Número da Curva', 'Equação - Coeficientes', 'Intervalo de Cotas (cm)', 'Validade', 'Desvio Absoluto Médio (%)', 'Número de Medições com Desvios Positivos', 'Número de Medições com Desvios Negativos']


Prepara dados para ajuste e verificação

In [8]:
num_equacoes = len(hidro_curvas_detalhe['Equação - Coeficientes'])
periodos_validade_equacoes = hidro_curvas_detalhe['Validade']
coefs_equacoes = hidro_curvas_detalhe['Equação - Coeficientes']
cota_min_max_equacoes = hidro_curvas_detalhe['Intervalo de Cotas (cm)']
datas_medicao = np.array(medicoes_descarga['Data'])
numero_medicao = np.array(medicoes_descarga['Número da medição'])
cota_medicao = np.array(medicoes_descarga['Cota (cm)'])
vazao_medicao = np.array(medicoes_descarga['Vazão (m3/s)'])

Curadoria dos dados (análise e remoção de pontos)

In [9]:
# # 74100000 IRAI (Dado ruim em 23-10-2015)
# popIndex = np.where(johnson_num_medicao == 223)[0]
# print(johnson_cota_medicao[popIndex], johnson_vazao_medicao[popIndex])
# johnson_datas_medicao = np.delete(johnson_datas_medicao, popIndex)
# johnson_cota_medicao = np.delete(johnson_cota_medicao, popIndex)
# johnson_vazao_medicao = np.delete(johnson_vazao_medicao, popIndex)
# johnson_num_medicao = np.delete(johnson_num_medicao, popIndex)

# johnson_Hlim_max = np.nanmax(johnson_cota_medicao[np.where(np.logical_and(johnson_datas_medicao >= johnson_periodo_validade[0],
#                                                           johnson_datas_medicao <= johnson_periodo_validade[1]))])/100.0
# johnson_Hlim_min = np.nanmin(johnson_cota_medicao[np.where(np.logical_and(johnson_datas_medicao >= johnson_periodo_validade[0],
#                                                           johnson_datas_medicao <= johnson_periodo_validade[1]))])/100.0

# johnson_periodo_validade = (dt.datetime(2012, 10, 30, 0, 0), dt.datetime(2016, 3, 23))  # 74100000 (Corte Temporal 1)
# johnson_periodo_validade = (dt.datetime(2016, 3, 24, 0, 0), dt.datetime(2025, 12, 31))  # 74100000 (Corte Temporal 2)
# johnson_periodo_validade = (dt.datetime(2012, 10, 30, 0, 0), dt.datetime(2025, 12, 31))  # 74100000
# johnson_periodo_validade = (dt.datetime(2022, 1, 1, 0, 0), dt.datetime(2025, 12, 31))  # 74500000
# johnson_periodo_validade = (dt.datetime(2012, 1, 1, 0, 0), dt.datetime(2025, 12, 31))  # 74800000

In [10]:
johnson_datas_medicao = datas_medicao.copy()
johnson_cota_medicao = cota_medicao.copy()
johnson_vazao_medicao = vazao_medicao.copy()
johnson_num_medicao = numero_medicao.copy()

# popIndex = np.where(johnson_num_medicao == 223)[0]
# print(johnson_cota_medicao[popIndex], johnson_vazao_medicao[popIndex])
# johnson_datas_medicao = np.delete(johnson_datas_medicao, popIndex)
# johnson_cota_medicao = np.delete(johnson_cota_medicao, popIndex)
# johnson_vazao_medicao = np.delete(johnson_vazao_medicao, popIndex)
# johnson_num_medicao = np.delete(johnson_num_medicao, popIndex)

# johnson_periodo_validade = (dt.datetime(2012, 10, 30, 0, 0), dt.datetime(2025, 12, 31))  # 7410
# johnson_periodo_validade = (dt.datetime(2012, 10, 30, 0, 0), dt.datetime(2019, 10, 19))  # 7410  01/02
# johnson_periodo_validade = (dt.datetime(2019, 10, 20, 0, 0), dt.datetime(2025, 12, 31))  # 7410 02/02

# johnson_periodo_validade = (dt.datetime(2022, 1, 1, 0, 0), dt.datetime(2025, 12, 31))  # 7450

# johnson_periodo_validade = (dt.datetime(2012, 1, 1, 0, 0), dt.datetime(2019, 10, 30))  # 7480 01/02
# johnson_periodo_validade = (dt.datetime(2019, 10, 31, 0, 0), dt.datetime(2025, 12, 31))  # 7480 01/02

johnson_periodo_validade = (dt.datetime(2022, 1, 1, 0, 0), dt.datetime(2025, 12, 31))  # 7578

johnson_Hlim_max = 16.00
johnson_Hlim_min = 0.0

johnson_tramo_extrapolacao = hidro_curvas_detalhe['Equação - Coeficientes'][-2][-1]  # FIXO: Utiliza-se o tramo do consórcio da ANA [a, h0, n]
print(johnson_tramo_extrapolacao)

[1051.0345, 4.08, 1.435]


Método de Johnson

In [11]:
import pandas as pd

def _johnson_interp_tratar_cotas_vazoes(cotas, vazoes):
  """
  Recebe dois arrays (cotas, vazoes).
  Para cada valor duplicado em cotas, calcula a média das vazões correspondentes.
  Retorna arrays novos com cotas únicas e vazões médias.
  """
  df = pd.DataFrame({"cotas": cotas, "vazoes": vazoes})
  df_grouped = df.groupby("cotas", as_index=False).mean()  # média das vazões por cota
  return df_grouped["cotas"].values, df_grouped["vazoes"].values

def _johnson_calcula_parametros(h1, q1, h2, _cota_medicao, _vazao_medicao, cotas_tramo):
  _johnson_interp_cotas_unicas, _johnson_interp_vazoes_unicas = _johnson_interp_tratar_cotas_vazoes(_cota_medicao/100.0, _vazao_medicao)  # Estratéga para remoção de cotas repetidas: Faz-se a média (cota e vazões) dos registros com cotas repetidas
  q2 = interp1d(_johnson_interp_cotas_unicas, _johnson_interp_vazoes_unicas, kind='linear', fill_value="extrapolate")(h2)
  _Q3 = np.sqrt(q1*q2)
  _H3 = interp1d(_vazao_medicao, _cota_medicao/100.0, kind='linear', fill_value="extrapolate")(_Q3)
  _h0 = (_H3**2.0 - h1*h2)/(2.0*_H3 - h1 - h2)
  _n = (np.log(q2) - np.log(q1))/(np.log(h2 - _h0) - np.log(h1 - _h0))
  _a = q2/((h2 - _h0)**_n)
  _qcalc = equacao_potencial(round(_a, 4), cotas_tramo, round(_h0, 2), round(_n, 3))
  return q2, _Q3, _H3, _h0, _n, _a, _qcalc

def _johnson_tramos_grava_medicoes(datas_medicao, periodo_validade, johnson_hlim, _cota_medicao):
  johnson_tramos_mascara_dados, johnson_tramos_cotas_medicoes, johnson_tramos_vazoes_medicoes = list(), list(), list()
  for i in range(len(johnson_hlim) - 1):
    johnson_tramos_mascara_dados.append(
    np.where(np.logical_and.reduce((datas_medicao >= johnson_periodo_validade[0],
                                                    datas_medicao <= johnson_periodo_validade[1],
                                                    _cota_medicao >= johnson_hlim[i]*100.0,
                                                    _cota_medicao <= johnson_hlim[i + 1]*100.0)))[0]
    )
    # if len(johnson_tramos_mascara_dados[-1]) < 3:
    #   raise ValueError("Menos de 3 medições para o tramo!")
    johnson_tramos_cotas_medicoes.append(_cota_medicao[johnson_tramos_mascara_dados[-1]]/100.0)  # Em metros
    johnson_tramos_vazoes_medicoes.append(vazao_medicao[johnson_tramos_mascara_dados[-1]])
  return johnson_tramos_mascara_dados, johnson_tramos_cotas_medicoes, johnson_tramos_vazoes_medicoes

def _johnson_calcula_continuidade(h_cont, params_1, params_2):
  _qcont1 = equacao_potencial(a=round(params_1[0], 4), h=h_cont, h0=round(params_1[1], 2), n=round(params_1[2], 3))
  _qcont2 = equacao_potencial(a=round(params_2[0], 4), h=h_cont, h0=round(params_2[1], 2), n=round(params_2[2], 3))
  _contp = 100*np.abs(_qcont1 - _qcont2)/np.nanmean([_qcont1, _qcont2])  # Em (%)
  return _qcont1, _qcont2, _contp

Função auxiliar para reconstruir a curva após otimização dos hn

In [12]:
def reconstruir_curva(X, _parExtr, _cota_medicao, _vazao_medicao, _Hlim_minmax, johnson_periodo_validade, johnson_datas_medicao, johnson_cota_medicao):
  num_tramos = (len(X) - 1) // 2
  _johnson_DE_Hlim = [_Hlim_minmax[0]]
  _johnson_DE_Hlim.extend(X[-num_tramos:])  # Exclui o último valor (extrapolação final)
  _johnson_DE_Hlim.append(_Hlim_minmax[1])
  _, johnson_tramos_cotas_medicoes, johnson_tramos_vazoes_medicoes = _johnson_tramos_grava_medicoes(johnson_datas_medicao, johnson_periodo_validade, _johnson_DE_Hlim, johnson_cota_medicao)
  _johnson_interp_cotas_unicas, _johnson_interp_vazoes_unicas = _johnson_interp_tratar_cotas_vazoes(_cota_medicao/100.0, _vazao_medicao)  # Estratéga para remoção de cotas repetidas: Faz-se a média (cota e vazões) dos registros com cotas repetidas
  # Calcula cada tramo
  Tna_arr, Tnh0_arr, Tnn_arr, Tnqcalc_arr = list(), list(), list(), list()
  for n in range(1, len(X) - num_tramos):
    TnQ1 = interp1d(_johnson_interp_cotas_unicas, _johnson_interp_vazoes_unicas, kind='linear', fill_value="extrapolate")(X[n - 1])
    TnQ2, TnQ3, TnH3, Tnh0, Tnn, Tna, Tnqcalc = _johnson_calcula_parametros(X[n - 1], TnQ1, X[n], johnson_cota_medicao, johnson_vazao_medicao, cotas_tramo=johnson_tramos_cotas_medicoes[n - 1])
    Tna_arr.append(Tna)
    Tnh0_arr.append(Tnh0)
    Tnn_arr.append(Tnn)
    Tnqcalc_arr.append(Tnqcalc)
  # Calcula tramo da extrapolação
  Tna_arr.append(_parExtr[0])
  Tnh0_arr.append(_parExtr[1])
  Tnn_arr.append(_parExtr[2])
  Tnqcalc_arr.append(equacao_potencial(round(_parExtr[0], 4), johnson_tramos_cotas_medicoes[-1], round(_parExtr[1], 2), round(_parExtr[2], 3)))
  # Transforma em array
  Tna_arr = np.asarray(Tna_arr)
  Tnh0_arr = np.asarray(Tnh0_arr)
  Tnn_arr = np.asarray(Tnn_arr)
  Tnqcalc_arr = np.concatenate(Tnqcalc_arr)
  # Calcula desvios
  desvios = np.abs(Tnqcalc_arr - np.concatenate(johnson_tramos_vazoes_medicoes))/np.concatenate(johnson_tramos_vazoes_medicoes)
  # Calcula continuidade
  TnCont_arr = list()
  for n in range(1, num_tramos + 1):  # Inclui o tramo da extrapolação
    _, _, _tncont = _johnson_calcula_continuidade(_johnson_DE_Hlim[n], [Tna_arr[n - 1], Tnh0_arr[n - 1], Tnn_arr[n - 1]], [Tna_arr[n], Tnh0_arr[n], Tnn_arr[n]])
    TnCont_arr.append(_tncont)
  TnCont_arr = np.asarray(TnCont_arr)
  # Calcula FOB
  FOB = np.mean(100.0*desvios)
  print()
  print("****************************************")
  print(f"FOB: {FOB:.3f}")
  print(f"CONT: {TnCont_arr}")
  print(f"LIMITES: {_johnson_DE_Hlim}")
  print(f"hn: {X[:-num_tramos]}")
  print(f"MAX DESVIO: {np.nanmax(100.0*desvios):.3f}")
  for i in range(num_tramos + 1):  # Inclui extrapolação
    print(f"{Tna_arr[i]:.4f};{Tnh0_arr[i]:.2f};{Tnn_arr[i]:.3f}")
  print("****************************************")
  return

Ajuste por otimização: Algoritimo genético com n tramos e com o último tramo de extrapolação do consórcio ANA

In [13]:
def funMinimizacao_N(X, _parExtr, _cota_medicao, _vazao_medicao, _Hlim_minmax, johnson_periodo_validade, johnson_datas_medicao, johnson_cota_medicao):
  # Definição de parametros fixos do algoritmo (limites dos parametros)
  _LIM_INF_n = 1.20
  _LIM_SUP_n = 2.0
  _LIM_CONT = 0.5
  _LIM_MAX_DESVIO = 20.0
  # Verificação de viabilidade dos parametros de entrada
  if len(X) < 3:  # Necessário mínimo de 2 pontos e o limite superior para um tramo
    return np.nan
  # Garantir que o número de elementos seja ímpar, ou seja, 1, 3, 5, 7...
  if len(X) % 2 == 1:
      # Calcula o número de tramos (exclui o tramo da interpolação)
      num_tramos = (len(X) - 1) // 2
      # Limite inferior
      _johnson_DE_Hlim = [_Hlim_minmax[0]]
      # Adicionando os elementos intermediários de X (todos, exceto os limites)
      _johnson_DE_Hlim.extend(X[-num_tramos:])  # Exclui o último valor (extrapolação final)
      # Limite superior
      _johnson_DE_Hlim.append(_Hlim_minmax[1])
  else:
      raise Exception("O número de parâmetros deve ser ímpar, como 3, 5, 7...")
  # Prepara registros para o correto funcionamento das interpolações necessárias
  _, johnson_tramos_cotas_medicoes, johnson_tramos_vazoes_medicoes = _johnson_tramos_grava_medicoes(johnson_datas_medicao, johnson_periodo_validade, _johnson_DE_Hlim, johnson_cota_medicao)
  _johnson_interp_cotas_unicas, _johnson_interp_vazoes_unicas = _johnson_interp_tratar_cotas_vazoes(_cota_medicao/100.0, _vazao_medicao)  # Estratéga para remoção de cotas repetidas: Faz-se a média (cota e vazões) dos registros com cotas repetidas
  # Calcula cada tramo
  Tna_arr, Tnh0_arr, Tnn_arr, Tnqcalc_arr = list(), list(), list(), list()
  for n in range(1, len(X) - num_tramos):
    TnQ1 = interp1d(_johnson_interp_cotas_unicas, _johnson_interp_vazoes_unicas, kind='linear', fill_value="extrapolate")(X[n - 1])
    TnQ2, TnQ3, TnH3, Tnh0, Tnn, Tna, Tnqcalc = _johnson_calcula_parametros(X[n - 1], TnQ1, X[n], johnson_cota_medicao, johnson_vazao_medicao, cotas_tramo=johnson_tramos_cotas_medicoes[n - 1])
    Tna_arr.append(Tna)
    Tnh0_arr.append(Tnh0)
    Tnn_arr.append(Tnn)
    Tnqcalc_arr.append(Tnqcalc)
  # Calcula tramo da extrapolação
  Tna_arr.append(_parExtr[0])
  Tnh0_arr.append(_parExtr[1])
  Tnn_arr.append(_parExtr[2])
  Tnqcalc_arr.append(equacao_potencial(round(_parExtr[0], 4), johnson_tramos_cotas_medicoes[-1], round(_parExtr[1], 2), round(_parExtr[2], 3)))
  # Transforma em array
  Tna_arr = np.asarray(Tna_arr)
  Tnh0_arr = np.asarray(Tnh0_arr)
  Tnn_arr = np.asarray(Tnn_arr)
  Tnqcalc_arr = np.concatenate(Tnqcalc_arr)
  # Calcula desvios
  desvios = np.abs(Tnqcalc_arr - np.concatenate(johnson_tramos_vazoes_medicoes))/np.concatenate(johnson_tramos_vazoes_medicoes)
  # Calcula continuidade
  TnCont_arr = list()
  for n in range(1, num_tramos + 1):  # Inclui o tramo da extrapolação
    _, _, _tncont = _johnson_calcula_continuidade(_johnson_DE_Hlim[n], [Tna_arr[n - 1], Tnh0_arr[n - 1], Tnn_arr[n - 1]], [Tna_arr[n], Tnh0_arr[n], Tnn_arr[n]])
    TnCont_arr.append(_tncont)
  TnCont_arr = np.asarray(TnCont_arr)
  # Calcula FOB
  FOB = np.mean(100.0*desvios) + np.sum(TnCont_arr)
  #### Penalidades ###
  # TODO: Penalidade para vazões em cotas específicas
  if np.any(desvios*100 >= _LIM_MAX_DESVIO):  # Penalidade para qualquer desvio acima
    FOB *= np.nanmax(desvios*100)
  if np.isnan(FOB) or np.any(np.isnan(TnCont_arr)):  # FOB ou Continuidade com valor inválido
    FOB = 1E6
  if np.any(TnCont_arr > _LIM_CONT):  # Continuidade
    FOB *= 10000
  if np.any(Tnn_arr < _LIM_INF_n) or np.any(Tnn_arr > _LIM_SUP_n):  # Parâmetro n
    FOB *= 1000
  return FOB

Bounds e restrições genericos

In [14]:
eps = 1e-6  # margem para garantir desigualdade estrita

bnds_1T = [
    (johnson_Hlim_min, johnson_Hlim_max),   # T1H1
    (johnson_Hlim_min, johnson_Hlim_max),   # T1H2
    (johnson_Hlim_min, johnson_Hlim_max),   # Hlim[1] -> Limite superior do primeiro tramo
    ]

cnst_1T = [
    NonlinearConstraint(lambda x: x[1] - x[0], eps, np.inf),  # T1H2 > T1H1
    NonlinearConstraint(lambda x: x[1] - x[2], -np.inf, 0),   # T1H2 <= Hlim1
]

bnds_2T = [
    (johnson_Hlim_min, johnson_Hlim_max),   # T1H1
    (johnson_Hlim_min, johnson_Hlim_max),   # T1H2
    (johnson_Hlim_min, johnson_Hlim_max),   # T2H2
    (johnson_Hlim_min, johnson_Hlim_max),   # Hlim[1] -> Limite superior do primeiro tramo
    (johnson_Hlim_min, johnson_Hlim_max),   # Hlim[2] -> Limite superior do segundo tramo
    ]

cnst_2T = [
    NonlinearConstraint(lambda x: x[1] - x[0], eps, np.inf),  # T1H2 > T1H1
    NonlinearConstraint(lambda x: x[2] - x[1], eps, np.inf),  # T2H2 > T2H1  (T2H1 = T1H2)
    NonlinearConstraint(lambda x: x[1] - x[3], -np.inf, 0),   # T1H2 <= Hlim1
    NonlinearConstraint(lambda x: x[2] - x[4], -np.inf, 0),   # T2H2 <= Hlim2
    NonlinearConstraint(lambda x: x[4] - x[3], eps, np.inf),  # Hlim1 < Hlim2
]

bnds_3T = [
    (johnson_Hlim_min, johnson_Hlim_max),   # T1H1
    (johnson_Hlim_min, johnson_Hlim_max),   # T1H2
    (johnson_Hlim_min, johnson_Hlim_max),   # T2H2
    (johnson_Hlim_min, johnson_Hlim_max),   # T3H2
    (johnson_Hlim_min, johnson_Hlim_max),   # Hlim[1] -> Limite superior do primeiro tramo
    (johnson_Hlim_min, johnson_Hlim_max),   # Hlim[2] -> Limite superior do segundo tramo
    (johnson_Hlim_min, johnson_Hlim_max),   # Hlim[3] -> Limite superior do terceiro tramo
    ]

cnst_3T = [
    NonlinearConstraint(lambda x: x[1] - x[0], eps, np.inf),  # T1H2 > T1H1
    NonlinearConstraint(lambda x: x[2] - x[1], eps, np.inf),  # T2H2 > T2H1  (T2H1 = T1H2)
    NonlinearConstraint(lambda x: x[3] - x[2], eps, np.inf),  # T3H2 > T3H1  (T3H1 = T2H2)
    NonlinearConstraint(lambda x: x[1] - x[4], -np.inf, 0),   # T1H2 <= Hlim1
    NonlinearConstraint(lambda x: x[2] - x[5], -np.inf, 0),   # T2H2 <= Hlim2
    NonlinearConstraint(lambda x: x[5] - x[4], eps, np.inf),  # Hlim1 < Hlim2
    NonlinearConstraint(lambda x: x[6] - x[5], eps, np.inf),  # Hlim2 < Hlim3
]

In [None]:
bnds_3T = [
    (johnson_Hlim_min, 1.0),   # T1H1
    (1.0, 3.0),   # T1H2
    (2.0, 7.5),   # T2H2
    (7.5, 10.0),   # T3H2
    (2.0, 3.0),   # Hlim[1] -> Limite superior do primeiro tramo
    (3.0, 7.5),   # Hlim[2] -> Limite superior do segundo tramo
    (7.5, 10.0),   # Hlim[3] -> Limite superior do terceiro tramo
    ]

saveRes = None
initPop = 'latinhypercube'
x0 = None,  # [0.33334406, 1.85427339, 6.79508897, 9.34609693, 2.77983729348586, 7.062714816560372, 9.329696685621908]
for _ in range(3):
  print(_, dt.datetime.now())

  res = differential_evolution(
      funMinimizacao_N,
      bounds=bnds_3T,
      args=(johnson_tramo_extrapolacao, johnson_cota_medicao, johnson_vazao_medicao, (johnson_Hlim_min, johnson_Hlim_max), johnson_periodo_validade, johnson_datas_medicao, johnson_cota_medicao),
      x0 = x0,
      constraints=cnst_3T,
      strategy='rand1bin',  # rand1exp, randtobest1exp, rand1bin, randtobest1bin
      maxiter=500,
      popsize=250,
      init=initPop,
      tol=0.01,
      mutation=1.5,       # Valores baixos: passos curtos, exploração mais suave → convergência mais rápida, mas risco de ficar preso em ótimo local. / Valores altos: passos longos, busca mais global → explora mais espaço, mas demora a refinar.
      recombination=0.5,  # Valores baixos: filhos ficam muito parecidos com os pais → exploração lenta, risco de travar / Valores altos: mistura mais genes do mutante → maior diversidade, mas pode gerar instabilidade e oscilações.
      polish=False,
      # workers=-1,
      disp=True
      )

  print(f"{res.x}\n{res.fun}")

  if saveRes is None:
    saveRes = res
  else:
    if res.fun < saveRes.fun:
      saveRes = res
      x0 = res.x
  # break

res = saveRes

reconstruir_curva(res.x, johnson_tramo_extrapolacao, cota_medicao, vazao_medicao, (johnson_Hlim_min, johnson_Hlim_max), johnson_periodo_validade, johnson_datas_medicao, johnson_cota_medicao)


0 2025-09-23 13:08:10.110766
differential_evolution step 1: f(x)= 5.266600839102117
differential_evolution step 2: f(x)= 5.266600839102117
differential_evolution step 3: f(x)= 5.266600839102117
differential_evolution step 4: f(x)= 5.266600839102117
differential_evolution step 5: f(x)= 5.266600839102117
differential_evolution step 6: f(x)= 5.266600839102117
differential_evolution step 7: f(x)= 5.266600839102117
differential_evolution step 8: f(x)= 5.266600839102117
differential_evolution step 9: f(x)= 5.266600839102117
differential_evolution step 10: f(x)= 5.266600839102117
differential_evolution step 11: f(x)= 5.266600839102117
differential_evolution step 12: f(x)= 5.266600839102117
differential_evolution step 13: f(x)= 5.266600839102117
differential_evolution step 14: f(x)= 5.266600839102117
differential_evolution step 15: f(x)= 5.266600839102117
differential_evolution step 16: f(x)= 5.266600839102117
differential_evolution step 17: f(x)= 5.266600839102117
differential_evolution step 

Teste brute force

In [None]:
# johnson_T2_params = [1547.91, 0.26, 1.23]

# # bruteRanges_T2fixo = [
# #     slice(0.0, 3.0, 0.05),  # T1H1
# #     slice(3.0, 5.0, 0.05),  # T1H2
# #     slice(1.3, 1.4, 0.01),  # Hlim[1] -> Limite superior do primeiro tramo
# #     slice(4.7, 5.0, 0.01),  # Hlim[2] -> Limite superior do segundo tramo
# # ]

# bruteRanges = [
#     slice(-1.0, 3.0, 0.1),  # T1H1
#     slice(3.0, 4.3, 0.1),  # T1H2
#     slice(4.3, 12.0, 0.1),   # T2H2
#     slice(4.0, 4.3, 0.1),  # Hlim[1] -> Limite superior do primeiro tramo
#     slice(10.7, 11.0, 0.1),  # Hlim[2] -> Limite superior do segundo tramo
# ]

# res = brute(
#     funMinimizacao,
#     bruteRanges,
#     # args=(johnson_T2_params, johnson_tramo_extrapolacao, johnson_cota_medicao, johnson_vazao_medicao, (johnson_Hlim_min, johnson_Hlim_max), johnson_periodo_validade),
#     args=(johnson_tramo_extrapolacao, johnson_cota_medicao, johnson_vazao_medicao, (johnson_Hlim_min, johnson_Hlim_max), johnson_periodo_validade),
#     full_output=0,
#     disp=True,
#     workers=-1
#     )

# print(f"{res}")
