# Parameters

In [1]:
subset_multiplier = 10   # increase the size of the subset by taking random composition of CCD
epoch = 60                # number of epochs for each AI
n = 25                    # number of loops for the genertic algorithm
K = 3                     # number of folds (= number of independant AI per generation)

train_on = "triplets"     # "triplets" or "blocks"
function = "tan"          # "tan" or "square"
train_with = "parameters" # "parameters" or "points" TODO

train_prop = 0.7          # proportion of the dataset used for training

weights = [7.686424453317564,2.8124992994763316,0.448360122048755,4.727911837705288] # TODO

# Import libs

In [2]:
import os,sys, copy
import data_io
import utils.archive as archive
import numpy             as np
import matplotlib.pyplot as plt
import pandas            as pd
import tensorflow        as tf
from   IPython.display   import display
from   tensorflow        import keras
from   classes.block     import Block
from   classes.triplet   import Triplet
from   classes.shot      import Shot
from   classes.ccd       import CCD
from   classes.rate      import Rate

# Definition of the model

In [3]:
def get_model(x_train,y_train):
    # mse = tf.keras.losses.MeanSquaredError()
    model = keras.models.Sequential()
    model.add(keras.layers.Input((len(x_train[0]),), name="InputLayer"))
    model.add(keras.layers.Dense(64, activation='relu', name='Dense_n1'))
    model.add(keras.layers.Dense(64, activation='relu', name='Dense_n2'))
    model.add(keras.layers.Dense(64, activation='relu', name='Dense_n3'))
    model.add(keras.layers.Dense(len(y_train), name='Output'))
    model.compile(optimizer = 'adam', loss = 'mse', metrics = ['accuracy'])
    return model

# Definition of efficiency functions

In [4]:
# tan
def ft(m,a,b,c,d):
    return a/4 * (1-np.tanh((m-b)/c)) * (1-np.tanh((m-b)/d))

# square
def fs(m,a,b,c,d):
    return (a-b*(m-21)**2) / (1+np.exp((m-c)/d))

# magnitude range
m = np.linspace(21,25.5,1000)

# Loading data and creating dataset

In [5]:
# Loading data
data_io.loadAll() # comment if already loaded to spare time

# Formating data in data set usable by the AI
data, outputs = data_io.get_ai_ready(items = Triplet.all, func=function,subsets_per_block=subset_multiplier)

print(len(data), "vectors containing", len(data[0])-outputs, "inputs and", outputs, "outputs")

# Normalization
mean = data[:,:-outputs].mean()
std  = data[:,:-outputs].std()
data[:,:-outputs] = (data[:,:-outputs] - mean) / std

Loading 2013AE.json
Loading 2013AO.json
Loading 2013BL.json
Loading 2014BH.json
Loading 2015AM.json
Loading 2015AP.json
Loading 2015BC.json
Loading 2015BD.json
Loading 2015BS.json
Loading 2015BT.json
1610 vectors containing 1728 inputs and 4 outputs


# Selecting a random test item

In [12]:
if train_on == "blocks":
    test_item = iter(Block.all.values())[np.random.randint(0,len(Block.all)-1)]
if train_on == "triplets":
    tripletList = []
    for rate in Rate.all:
        if type(rate.parent) == Triplet and rate.parent.id not in tripletList:
            tripletList.append(rate.parent.id)
    test_item = Triplet.all[tripletList[np.random.randint(0,len(tripletList)-1)]]

print("Test item:",test_item.id)

Test item: 15AM+1-1


# Prediction on test item

In [7]:
def prediction(test_item, model):
    if test_item.to_ai_ready(func="tan") is not None:
        new_data,outputs = test_item.to_ai_ready(func="tan")
    else:
        new_data,outputs = test_item.to_ai_ready(func="square")

    new_x = new_data[:-outputs]
    new_y = new_data[-outputs:]
    new_x = (new_x - mean) / std

    new_x=np.array(new_x).reshape(1,len(new_x))

    predictions = model.predict(new_x)

    print(f"Prediction : {predictions[0]}")

    # Plotting the result

    plt.subplot(int(np.ceil(np.sqrt(n))),int(np.ceil(np.sqrt(n))),i+1)
    plt.plot(m, ft(m,*predictions[0]),                     label="Machine Learning")
    plt.plot(m, fs(m,new_y[0],new_y[1],new_y[2],new_y[3]), label="Excpected")
    plt.grid()

    if i == 0:                    plt.title(f"Predition for {train_on} {test_item.id}")
    if i+1>n-np.ceil(np.sqrt(n)): plt.xlabel("Magnitude")
    if i%np.ceil(np.sqrt(n))==0:  plt.ylabel("Efficiency")
    if i==0:                      plt.legend()

# Cenerating folds

In [8]:
def create_folds(K, data, train_prop, outputs):

    # Creating folds
    folds = []
    for j in range(K):
        folds.append(data[j::K])

    # Splitting data for training and test...
    Xt_list = []; Yt_list = []; Xv_list = []; Yv_list = []
    for j, fold in enumerate(folds):
        train_sets = int(len(fold)*train_prop)
        index = np.zeros(len(fold),dtype=bool)
        index[:train_sets] = True
        np.random.shuffle(index)

        data_train = fold[index]
        data_test  = fold[~index]

        Xt_list.append(data_train[:,:-outputs])
        Yt_list.append(data_train[:,-outputs:])
        Xv_list.append(data_test [:,:-outputs])
        Yv_list.append(data_test [:,-outputs:])

    # Composing training data using fold != j
    for j in range(K):
        Xt_list[j] = np.concatenate(np.array(Xt_list)[np.arange(len(Xt_list))!=j])
        Yt_list[j] = np.concatenate(np.array(Yt_list)[np.arange(len(Yt_list))!=j])

    return np.array(Xt_list), np.array(Yt_list), np.array(Xv_list), np.array(Yv_list)

# Training and validate

In [9]:
def train(Xt,Yt,Xv,Yv,epoch):
    h = models[j].fit(Xt, Yt, epochs = epoch, verbose = 0)        #, validation_data = (x_test[j], y_test[j])
    s = models[j].evaluate(Xv, Yv, verbose=0)
    return h,s

# Keeping only the best model

In [10]:
def get_best_model(generation, models, scores, lastScore):
    minScore = lastScore
    output.write(f"{generation+1}, ")
    if minScore is None: minScore = scores[0][0]
    if model is None: model = models[0]
    average = 0
    for j,s in enumerate(scores):
        output.write(f"{s[0]}, ") 
        average += s[0]/K
        if s[0] < minScore:
            minScore = s[0]
            model = models[j]
    output.write(f"{average}, {minScore}\n")
    return model, minScore

# Training neural nework

In [11]:
model = None
minScore = None
path = archive.new(name = archive.description(M = subset_multiplier, N = n, K = K, E = epoch))
output = open(f"{path}/ouput.csv","w")

output.write("Generation, ")
for j in np.arange(K): output.write(f"Score of model {j}, ")
output.write("Average, Score retained\n")

# Loop over generations (genetic algorithm)
for i in range(n):
  
  print(f"🔁 Generation {i+1}/{n}")

  np.random.shuffle(data)

  Xt_list, Yt_list, Xv_list, Yv_list = create_folds(K, data, train_prop, outputs)
  

  ################################################################################
  # Training K models independently

  models = []; history = []; scores = []
  for j in range(K):

    print(f"🏃‍♀️ Training model  with fold {j} as test...", end="\r")

    # Sub sets for this fold

    Xt, Yt, Xv, Yv = Xt_list[j], Yt_list[j], Xv_list[j], Yv_list[j]

    # Getting new model if it's the first generation, and the old one if not

    if len(models) < j+1: models.append(get_model(Xt,Yt))
    else: models.append(model)

    # Training models

    res, score = train(Xt, Yt, Xv, Yv, epoch)
    history.append(res)
    scores.append(score)

  # Keeping the best one

  model, lastScore = get_best_model(i,models, scores, lastScore)

  # Making new prediction

  print("🔮 Prediction...")
  prediction()

################################################################################
# Saving results

output.close()
plt.savefig(f"{path}/tno_efficiency_rate.png")
model.save(f"{path}/model.ckpt")
plt.show()

🔁 Generation 1/25


TypeError: only integer scalar arrays can be converted to a scalar index