In [2]:
import os, sys
# The notebook's working directory is treated as the project root.
project_path = os.getcwd()
src_path = os.path.join(project_path, 'src')

# The project sources must live in an existing '<project root>/src' directory
# (isdir() is already False for a path that does not exist).
assert os.path.isdir(src_path)

# Make the modules under src/ importable from this notebook.
sys.path.append(src_path)



import copy
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras.regularizers import l1
from keras.callbacks import History, EarlyStopping, ModelCheckpoint, TerminateOnNaN, CSVLogger   
from keras.layers import Dense
from sklearn.preprocessing import StandardScaler
from src.dataset import dataset
from scipy.constants import Boltzmann as k, eV as q, zero_Celsius as T0
from src.PVmodels import Model7PFF




class CustomModel(keras.Model):
  """Keras model whose train/eval steps score ``self.predict_meas(x)``.

  Both steps obtain their predictions from ``self.predict_meas`` rather than
  from calling the model directly. ``predict_meas`` is not defined in this
  class, so it has to be attached to the instance before ``fit`` or
  ``evaluate`` is used.

  NOTE(review): ``compiled_loss`` is called without ``regularization_losses``,
  so layer regularizers presumably do not contribute to the optimized loss --
  confirm this is intended.
  """

  def train_step(self, data):
    """Run one optimization step on a batch.

    Args:
      data: ``(x, y)`` pair; ``x`` is forwarded unchanged to ``predict_meas``
        and ``y`` holds the targets the predictions are compared against.

    Returns:
      dict mapping every metric name in ``self.metrics`` to its current result.
    """
    inputs, targets = data

    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as tape:
      predictions = self.predict_meas(inputs, training=True)
      loss_value = self.compiled_loss(targets, predictions)

    # Back-propagate and apply the update to the trainable weights.
    weights = self.trainable_weights
    gradients = tape.gradient(loss_value, weights)
    self.optimizer.apply_gradients(zip(gradients, weights))

    # Refresh the compiled metrics and report them.
    self.compiled_metrics.update_state(targets, predictions)
    return dict((metric.name, metric.result()) for metric in self.metrics)

  def test_step(self, data):
    """Evaluate one batch without updating any weight.

    Args:
      data: ``(x, y)`` pair, as in ``train_step``.

    Returns:
      dict mapping every metric name in ``self.metrics`` to its current result.
    """
    inputs, targets = data
    predictions = self.predict_meas(inputs, training=False)

    # The returned loss value is not needed here; the call is kept because the
    # original step performs it on every evaluation batch.
    self.compiled_loss(targets, predictions)

    self.compiled_metrics.update_state(targets, predictions)
    return dict((metric.name, metric.result()) for metric in self.metrics)





class ModelDNN(Model7PFF):
  """PV model whose base parameters are corrected by a dense network.

  The parameters ``[b, IL, log(I0), Rs, Gp]`` returned by ``self.params(S, T)``
  are standardized, shifted by the output of a Keras network (the "agent") and
  mapped back to physical units before the characteristic points
  ``[Isc, Pmp, Imp, Vmp, Voc]`` are evaluated.
  """

  def __init__(self, config:dict, project_path:str, seed:int=0):
    """Validate the configuration, load the data and build scalers and agent.

    Args:
      config: dict with the keys
        - 'PVmodule': str, row label looked up in ``data/params.csv``;
        - 'ds_ratio': list of three entries, ``[train, val, test]``;
        - 'agent': dict with 'layers' (non-empty list of hidden-layer sizes),
          'act_hidden' ('relu' | 'sigmoid' | 'tanh'),
          'act_output' ('relu' | 'sigmoid' | 'tanh' | 'linear') and
          'regularizer' (Keras L1 or L2 instance whose first config value
          is <= 0.3).
      project_path: existing directory; passed to ``dataset`` and used to
        locate ``data/params.csv``.
      seed: integer seed for NumPy and Keras/TensorFlow.

    Raises:
      AssertionError: if any of the checks on the arguments fails.
    """
    # check model properties
    assert isinstance(config, dict)

    assert isinstance(config['PVmodule'], str)
    self.PVmodule = config['PVmodule']

    assert all([
      isinstance(config['ds_ratio'], list),
      len(config['ds_ratio']) == 3,  # [train, val, test]
    ])

    assert isinstance(config['agent'], dict)
    self.agent_properties = config['agent']
    assert all([
      isinstance(self.agent_properties['layers'], list),
      len(self.agent_properties['layers']) > 0, # hidden layers
    ])
    assert all([
      isinstance(self.agent_properties['act_hidden'], str),
      self.agent_properties['act_hidden'] in ['relu', 'sigmoid', 'tanh']
      ])
    assert all([
      isinstance(self.agent_properties['act_output'], str),
      self.agent_properties['act_output'] in ['relu', 'sigmoid', 'tanh', 'linear']
      ])
    assert all([
      isinstance(config['agent']['regularizer'], keras.regularizers.L1) or 
      isinstance(config['agent']['regularizer'], keras.regularizers.L2),
      list(config['agent']['regularizer'].get_config().values())[0] <= 0.3
      ])
    
    # check exists project_path 
    assert isinstance(project_path, str) 
    assert self.exists_folder(project_path)
    self.path = project_path

    # check seed
    assert isinstance(seed, int) 
    np.random.seed(seed)
    tf.keras.utils.set_random_seed(seed)

    # ModelBase config
    # Prefer the parameter table inside the project folder so the class does
    # not depend on the current working directory; fall back to the original
    # cwd-relative location when the project copy is missing.
    params_file = os.path.join(project_path, 'data', 'params.csv')
    if not self.exists_file(params_file):
      params_file = 'data/params.csv'
    params = pd.read_csv(params_file, index_col=0).astype(np.float32)
    super().__init__(*params.loc[self.PVmodule], 25+T0, 1000)

    # dataset config
    self.data = dataset(project_path)(self.PVmodule, ratio=config['ds_ratio'], seed=seed)
    
    # get all (S, T) of the first split, as stored in the dataset
    allST_physical = copy.deepcopy(self.data['x_data'][0])
    allS, allT = np.hsplit(allST_physical, 2)
    allT = allT + T0  # new array: allST_physical itself is left unshifted

    # calculation of x0 for all examples (in physical units for approach to standardization)
    b, IL, I0, Rs, Gp = self.params(allS, allT)
    allX0_physical = np.hstack([b, IL, np.log(I0), Rs, Gp])

    # measurement in physical units
    allY_physical = copy.deepcopy(self.data['y_data'][0])

    # Scaler config
    # NOTE(review): STscaler is fitted on the temperatures as stored in the
    # dataset, while standardization() adds T0 before transforming. As a
    # result InSTscaler() returns the shifted (T + T0) value. Kept unchanged
    # because previously saved weights presumably rely on this scaling --
    # confirm it is intended.
    self.STscaler = StandardScaler().fit(allST_physical)
    self.Xscaler  = StandardScaler().fit(allX0_physical)
    self.Yscaler0 = StandardScaler().fit(allY_physical)

    # Inverse scaler config (plain arithmetic, so tensors are accepted too)
    self.InSTscaler = lambda x: x*self.STscaler.var_**.5 + self.STscaler.mean_
    self.InXscaler  = lambda x: x* self.Xscaler.var_**.5 +  self.Xscaler.mean_
    self.InYscaler  = lambda x: x* self.Yscaler0.var_**.5 +  self.Yscaler0.mean_

    # rewrite YScaler (tensorflow)
    self.Yscaler = lambda x: (x - self.Yscaler0.mean_)/self.Yscaler0.var_**.5
    
    # sample standardization
    self.ST_tr,  self.Y_tr  = self.standardization(0)
    self.ST_val, self.Y_val = self.standardization(1)
    self.ST_tst, self.Y_tst = self.standardization(2)
    
    # Agent config: the Keras model scores its batches through y_predict
    self.agent = self.agent_generation
    self.agent.predict_meas = self.y_predict

  def exists_folder(self, dir):
    """Return True when ``dir`` exists and is a directory."""
    return all([os.path.exists(dir), os.path.isdir(dir)])

  def exists_file(self, dir):
    """Return True when ``dir`` exists and is a regular file."""
    return all([os.path.exists(dir), os.path.isfile(dir)])

  @property
  def agent_generation(self):
    """Build and return a new, uncompiled ``CustomModel``.

    The network takes 7 inputs, applies one Dense layer per entry of
    ``agent_properties['layers']`` (hidden activation and activity regularizer
    taken from ``agent_properties``) and ends in a 5-unit Dense layer whose
    output is added to the standardized base parameters. Every access to this
    property creates a fresh model.
    """
    # 7 inputs: get_corrected_parameters feeds [x0 (5 values), S, T]
    input_layer = keras.Input(shape=[7, ])
    x = input_layer
    for N_neu in self.agent_properties['layers']:
      x = Dense(units=N_neu, 
                activation=self.agent_properties['act_hidden'],
                activity_regularizer=self.agent_properties['regularizer'])(x)
    output_layer = Dense(5, activation=self.agent_properties['act_output'])(x) # delta_x
    return CustomModel(input_layer, output_layer)

  def standardization(self, idx):
    """Return the standardized inputs and targets of one dataset split.

    Args:
      idx: index of the split in ``self.data['x_data']`` / ``['y_data']``
        (0, 1 and 2 are used for train, validation and test).

    Returns:
      ``[ST, Y]``: the (S, T) array with T0 added to its second column and
      transformed by ``STscaler``, and the targets transformed by ``Yscaler``.
    """
    # Work on a copy: shifting the dataset array in place would add T0 again
    # on every call and silently alter self.data.
    st = np.array(self.data['x_data'][idx], copy=True)
    st[:, 1] += T0
    
    # get measurement (Isc, Pmp, Imp, Vmp, Voc)
    y = self.data['y_data'][idx]
    
    return [
      self.STscaler.transform(st),
      self.Yscaler(y)
    ]

  def get_corrected_parameters(self, ST, training=False):
    """Compute the agent-corrected parameters for standardized (S, T) inputs.

    Args:
      ST: standardized (S, T) batch with two columns.
      training: forwarded to the agent call.

    Returns:
      ``[b, IL, I0, Rs, Gp]`` in physical units. ``b``, ``IL``, ``Rs`` and
      ``Gp`` are made non-negative with an absolute value; ``I0`` is the
      exponential of the corrected ``log(I0)``.
    """
    # compute S and T in physical values
    ST_phy = self.InSTscaler(ST) # to W/m2 and K

    # compute x0 with ModelBase
    S, T = np.hsplit(ST_phy, 2) 
    b, IL, I0, Rs, Gp = self.params(S, T) # [b, IL, I0, Rs, Gp]
    x0 = np.hstack([b, IL, np.log(I0), Rs, Gp])

    # normalized
    x0 = self.Xscaler.transform(x0)
    
    # Choose the agent input from its declared width instead of a bare
    # try/except, which hid genuine errors (and KeyboardInterrupt): an agent
    # as wide as ST receives ST alone, otherwise it receives [x0, ST].
    if self.agent.input_shape[-1] == ST.shape[-1]:
      agent_input = ST
    else:
      agent_input = tf.concat([x0, ST], axis=1)

    # compute x_corrected (normalized)
    x_corrected = self.agent(agent_input, training=training) + x0

    # compute x_corrected (physical)
    x_corrected_phy = self.InXscaler(x_corrected)            
    
    # non-negativity constraints
    b, IL, logI0, Rs, Gp = tf.split(x_corrected_phy, axis=1, num_or_size_splits=5)
    b, IL, Rs, Gp = [tf.abs(p) for p in [b, IL, Rs, Gp]]
    I0 = tf.exp(logI0)
    
    return [b, IL, I0, Rs, Gp]
  
  def update_params(self, ST, training=False):
    """Store the corrected parameters on the instance.

    Sets ``self.b``, ``self.IL``, ``self.I0``, ``self.Rs`` and ``self.Gp``
    from ``get_corrected_parameters(ST, training)``.
    """
    # update function of class 'Model7PFF'
    # update params in ModelBase
    self.b, self.IL, self.I0, self.Rs, self.Gp = self.get_corrected_parameters(ST, training=training)    

  def predict(self, ST, MaxIterations=10000, tol=1e-9, alpha=0.15, beta=0.85, VmpIni=None, training=False): 
    """Evaluate the characteristic points for standardized (S, T) inputs.

    The parameters are first refreshed with ``update_params``. ``Vmp`` is then
    obtained with a secant iteration on ``fun_Ipv(V) - fun_foc(V)``.

    Args:
      ST: standardized (S, T) batch.
      MaxIterations: upper bound on the number of secant iterations.
      tol: threshold on ``|Vmp1 - Vmp0|`` used by the loop condition.
      alpha, beta: fractions of ``Voc`` used as the two initial guesses when
        ``VmpIni`` is not given.
      VmpIni: optional pair ``(Vmp0, Vmp1)`` of initial guesses.
      training: forwarded to the agent call.

    Returns:
      ``[Isc, Vsc, Imp, Vmp, Pmp, Ioc, Voc]``.
    """
    # update function of class 'Model7PFF'
    self.update_params(ST, training=training)
    Isc = self.fun_Ipv(0)
    Vsc = self.fun_Vpv(Isc)
    Voc = self.fun_Vpv(0)
    if tf.reduce_any(tf.math.is_nan(Voc)):
      # fallback used when fun_Vpv produced NaN (presumably defined in Model7PFF)
      Voc = self.funVoc(0)
    Ioc = self.fun_Ipv(Voc)   
    fun_Vmp = lambda Vmp: self.fun_Ipv(Vmp)-self.fun_foc(Vmp)
    IniIterations = 0
    # Identity test: `== None` is evaluated element-wise on array-like guesses
    # and then fails when used as a condition.
    if VmpIni is None: 
      Vmp0, Vmp1 = Voc*alpha, Voc*beta
    else: 
      Vmp0, Vmp1 = VmpIni
    error = tf.math.abs(Vmp1-Vmp0)
    # NOTE(review): the loop runs only while *every* element still moves by at
    # least tol, i.e. it stops as soon as one element has converged -- confirm
    # this is the intended stopping rule.
    while tf.reduce_all(tf.math.less_equal(np.float64(tol), error)):
      IniIterations+=1
      Vmp11 = Vmp0-fun_Vmp(Vmp0)*(Vmp1-Vmp0)/(fun_Vmp(Vmp1)-fun_Vmp(Vmp0))
      Vmp0, Vmp1 = Vmp1, Vmp11
      error = tf.math.abs(Vmp1-Vmp0)
      if IniIterations==MaxIterations:
        break
    Vmp = Vmp1
    Imp = self.fun_Ipv(Vmp)
    Pmp = Vmp*Imp
    return [Isc, Vsc, Imp, Vmp, Pmp, Ioc, Voc]
  
  def y_predict(self, ST, training=False):
    """Return the standardized ``[Isc, Pmp, Imp, Vmp, Voc]`` predictions for ``ST``."""
    [Isc, Vsc, Imp, Vmp, Pmp, Ioc, Voc] = self.predict(ST, training=training)
    ypred = tf.concat([Isc, Pmp, Imp, Vmp, Voc], axis=1)
    return self.Yscaler(ypred)

  def fit(self, batch_size:int=32, epochs:int=10, patience:int=5, test=False, learning_rate=1e-6):
    """Compile the agent and train it on the training split.

    Args:
      batch_size: subsample size.
      epochs: maximum number of training epochs.
      patience: epochs without ``val_loss`` improvement before early stopping
        (the best weights are restored).
      test: currently unused; accepted for backward compatibility.
      learning_rate: learning rate passed to the Adam optimizer.

    Returns:
      The object returned by ``self.agent.fit``.

    Raises:
      AssertionError: if ``batch_size``, ``epochs`` or ``patience`` is not an int.
    """

    assert isinstance(batch_size, int)
    assert isinstance(epochs, int)
    assert isinstance(patience, int)

    def metrics(y_true, y_pred, var):
      # mean squared error of output column `var`
      return tf.reduce_mean(tf.math.square(y_true-y_pred), axis=0)[var]

    # One named function per output column; Keras reports each metric under
    # the function's name.
    def Isc(y_true, y_pred):
      return metrics(y_true, y_pred, var=0)
    
    def Pmp(y_true, y_pred):
      return metrics(y_true, y_pred, var=1)
    
    def Imp(y_true, y_pred):
      return metrics(y_true, y_pred, var=2)

    def Vmp(y_true, y_pred):
      return metrics(y_true, y_pred, var=3)
    
    def Voc(y_true, y_pred):
      return metrics(y_true, y_pred, var=4)
    
    def custom_loss(y_true, y_pred):
      # mean squared error over all outputs and samples
      return tf.reduce_mean(tf.math.square(y_true-y_pred))
    
    # run_eagerly=True: the forward pass mixes NumPy calls and a Python loop.
    self.agent.compile(loss = custom_loss, 
        metrics=[Isc, Pmp, Imp, Vmp, Voc],
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate), 
        run_eagerly=True)

    history = self.agent.fit( 
                        self.ST_tr, self.Y_tr, 
                        validation_data=(self.ST_val, self.Y_val),
                        batch_size=batch_size, 
                        epochs=epochs,             
                        callbacks=[
                                  History(), 
                                  TerminateOnNaN(),
                                  EarlyStopping(
                                        patience=patience, 
                                        monitor="val_loss", 
                                        restore_best_weights=True),
                                  ]
                            )
    return history

  def random_search(self):
    """Placeholder: only prints 'a'."""
    print('a')



  












# Experiment configuration consumed by ModelDNN.
config = {
  'PVmodule': 'xSi12922',   # module label looked up by ModelDNN
  'ds_ratio': [7, 2, 1],    # [train, val, test]
}

# Settings of the correction network (the "agent").
config['agent'] = {
  'layers': [100, 70, 40, 10],   # hidden-layer sizes
  'act_hidden': 'relu',
  'act_output': 'linear',
  'regularizer': l1(0.15),
}

dnn = ModelDNN(config, project_path, seed=0)

# Last expression of the cell: displays the configured module name.
dnn.PVmodule

Read xSi12922


'xSi12922'

In [None]:

batch_size = 32
epochs = 20
patience = 5 

# Directory with previously saved agent weights. It is resolved from the
# project root instead of a hard-coded home-directory path; set the
# PV_RESULTS_DIR environment variable to point somewhere else.
a = os.environ.get("PV_RESULTS_DIR", os.path.join(project_path, "results"))

# Sorted so the order (and therefore the final state of `dnn`) is reproducible.
# The loop variable is deliberately not called `k`: that name is bound to the
# Boltzmann constant imported from scipy.constants.
for weights_name in sorted(os.listdir(a)):
  print('\n'+weights_name+'\n')
  try:
    dnn.agent.load_weights(os.path.join(a, weights_name))
    history = dnn.fit(batch_size=batch_size, epochs=epochs, patience=patience, test=True)
  except Exception as exc:
    # Best effort: keep going with the next entry, but report what failed
    # instead of hiding it. KeyboardInterrupt is no longer swallowed.
    print(f'Skipping {weights_name}: {type(exc).__name__}: {exc}')