# Loading Necesary Libraries

In [53]:
import os, time, math, csv, joblib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.mixture import GaussianMixture
from tqdm import tqdm
from keras.models import model_from_json
from numpy.ma.core import resize

pd.options.display.float_format = '{:.2f}'.format

# Utils

In [54]:
def L2dist(a,b):
    return math.sqrt(math.pow(a[0]-b[0],2)+math.pow(a[1]-b[1],2))

## Features Extractor

In [55]:
def features_extractor(filename):
  F=[]
  ll = 0
  s_id=0
  source=ll*80
  #print("Source: ", source)
  df = pd.read_csv(filename)
  for hh in tqdm(range(ll+1,26)):
      target=hh*80
      #print("\tTarget: ", target)
      df_sel=df[df["Target_Distance"]==target]

      for i in range(1,ns+1):
          real = df_sel['Real'].tolist()
          imag = df_sel['Imag'].tolist()
          data_aux=list(zip(real,imag))
          gmm = GaussianMixture(n_components=16, random_state=0, init_params='kmeans', means_init=centers).fit(data_aux)
          mus=gmm.means_
          sigmas=gmm.covariances_
          features=[source,target]
          for jj in selCP:
              j=centers[jj-1]
              mindist=None
              k_inc=None
              for k in range(16):
                  d=L2dist(mus[k],j)
                  if mindist is None or mindist>d:
                      mindist=d
                      k_inc=k
              covmat=np.concatenate(list(sigmas[k_inc])).ravel().tolist()
              features = [*features, *mus[k_inc], *covmat]
          F.append(features)
  return F  

## Testing DNN

In [56]:
def testing_DNN(F, DNN_output_filepath):
  header=['source','target']
  for j in selCP:
      header=[*header,*['mu_r_'+str(j),'mu_i_'+str(j),'sigma_rr_'+str(j),'sigma_ri_'+str(j),'sigma_ir_'+str(j),'sigma_ii_'+str(j)]]
      
  DF_testing=pd.DataFrame(F)
  DF_testing.columns=header
  DF_testing.to_csv(DNN_output_filepath, index=False)

  #df=pd.read_csv(output_data_path+'/testing_data_DNN.csv')

  colnames=DF_testing.columns[2:26]
  # Target variable and train set
  X=DF_testing.iloc[:,2:26]
  #print(X.shape)
  #print(X.columns)
  Y=DF_testing.iloc[:,1].to_numpy().reshape(-1,1)
  Z=DF_testing.iloc[:,0].to_numpy().reshape(-1,1)

  df=DF_testing
  colnames=df.columns[1:25]
  X_test = sc_input.transform(X)
  Y_test = sc_output.transform(Y)

  Z=[i[0] for i in Z]

  return X_test, Y_test, Z

## Error Calc

In [73]:
def error_calc(X_test, Y_test, Z, plot_output_path, radio, function, plt_show= False):
  start_time = time.time()
  Y_test_pred=model_ann.predict(X_test)
  #Y_test_pred=model_ann.predict(X_test)
  time_eval_ann=time.time()-start_time

  real=list(list(zip(*Y_test))[0])

  #real=list(map(list, zip(*Y_train)))
  pred=list(Y_test_pred)

  dist_min=sc_output.data_min_[0]
  dist_max=sc_output.data_max_[0]

  real_abs=np.add(dist_min,np.multiply((dist_max-dist_min),real))
  real_abs=[int(np.round(i)) for i in real_abs]
  #print(real_abs)
  pred_abs=np.add(dist_min,np.multiply((dist_max-dist_min),pred))
  #pred_abs=sum(pred_abs, [])
  pred_abs=[i[0] for i in pred_abs]

  error=np.divide(np.abs(np.subtract(np.array(real_abs),np.array(pred_abs))),np.array(real_abs))

  res=pd.DataFrame({"source":Z, "target":real_abs,"pred":pred_abs,"error":error})

  fig = plt.figure(figsize=(10,5))
  plt.plot(real_abs,error,'bo')

  plt.xlabel('Distances') 
  plt.ylabel('Error') 
  plt.title('Dispertion Ratio {} with {} distribution'.format(str(radio), str(function))) 

  fig.savefig(plot_output_path, bbox_inches='tight')

  if plt_show == True:
    plt.show()

  return res

## Results

In [58]:
def results(res, res_output_filepath):
  F=pd.read_csv(model_path+"/reference.csv")
  k=4

  one_classifier={}
  for i in range(25):
      row=F.iloc[i,]
      one_classifier[row["dist"]]=[row["mean"]-k*row["std"],row["mean"]+k*row["std"]]

  aux=[]
  for i in range(res.shape[0]):
      target=res.iloc[i,]["target"]
      pred=res.iloc[i,]["pred"]
      if pred>=one_classifier[target][0] and pred<=one_classifier[target][1]:
          aux.append(0)
      else:
          aux.append(1)
  res["detected"]=aux

  res.to_csv(res_output_filepath,index=False)

# Main

In [74]:
def main(radius, functions, plot_show=False):
  pref="consts_modified_source_distance_"
  for f in functions:
    for rd in radius:
      print('---------------')
      print('Evaluating {} function with a dispersion radius of {}.'.format(f, rd))
      suf="_radius_{}_funct_{}.csv".format(str(rd), str(f))
      # Feature Extraction
      filename=data_path+"/"+pref+str(0)+suf
      F = features_extractor(filename)
      # Testing DNN
      DNN_output_filepath = output_data_path+'/testing_data_DNN_radius_{}_funct_{}.csv'.format(str(rd), str(f))
      X_test, Y_test, Z = testing_DNN(F, DNN_output_filepath)
      # Error Calc
      plot_output_path = output_data_path+'/plots/testing_data_DNN_radius_{}_funct_{}.png'.format(str(rd), str(f))
      res = error_calc(X_test, Y_test, Z, plot_output_path, radio=rd, function=f, plt_show = plot_show)
      # Results
      res_output_filepath = output_data_path+"/results_radius_{}_funct_{}.csv".format(str(rd), str(f))
      results(res, res_output_filepath)

# Run

## Load Enviroment

In [60]:
# BLOQUE PARA USAR DESDE COLAB

# Google drive loading as work station for local-usage of the files.
from google.colab import drive
drive.mount('/content/gdrive',force_remount= True)

#-----------------------------------------------------------------------------

Mounted at /content/gdrive


In [61]:
# Lista para cambiar los paths rapido.
workers = ["Ronald", "Local"]

# Change the number to change the paths.
worker = workers[0]

if worker == "Ronald":
  #path= "/content/gdrive/MyDrive/Thesis_Workstation/ANN_dataset"
  #data_path= "/content/gdrive/MyDrive/Symbol_to_Symbol/ANN_dataset/modifiedData/Z/Z_mod" # mod data of features
  data_path= "/content/gdrive/MyDrive/Thesis_Workstation/ANN_dataset/modifiedData/synthetic_data/Z/Z_mod" # mod data of features

  output_data_path = "/content/gdrive/MyDrive/Thesis_Workstation/ANN_dataset/modifiedData/synthetic_data/Z/input_data_model" # output path for testing_data_DNN.csv and results.csv

  model_path = "/content/gdrive/MyDrive/Thesis_Workstation/ANN_dataset/trainedModel" # filepath with trained model
else: path = os.getcwd()

## Global Variables

In [62]:
#data_path=os.getcwd()+"/ANN_dataset/modifiedData/GD/GD_mod"
#output_data_path=os.getcwd()+"/ANN_dataset/modifiedData/GD/input_data_model"
#model_path=os.getcwd()+"/models"

#distances=list(range(1,25))
#print(distances)

distances = [0]
ns=25 # number of samples
centers=[[-3,3],[-1,3],[1,3],[3,3],[-3,1],[-1,1],[1,1],[3,1],[-3,-1],[-1,-1],[1,-1],[3,-1],[-3,-3],[-1,-3],[1,-3],[3,-3]]
selCP=[1,7,10,15]

## NN Model

In [63]:
model_ann = model_from_json(open(model_path+'/model_ann_architecture.json').read())
model_ann.load_weights(model_path+'/model_ann_weights.h5')
# dont forget to compile your model
model_ann.compile(loss='binary_crossentropy', optimizer='adam')
sc_input=joblib.load(model_path+'/scaler_input.joblib')
sc_output=joblib.load(model_path+'/scaler_output.joblib')

## Launch

In [None]:
fun = ['uniform', 'non_uniform', 'gaussian']
rad = [0.00000001, 0.01, 0.02, 0.03]

main(radius=rad, functions=fun)

---------------
Evaluating uniform function with a dispersion radius of 1e-08.


100%|██████████| 25/25 [00:35<00:00,  1.42s/it]


---------------
Evaluating uniform function with a dispersion radius of 0.01.


100%|██████████| 25/25 [00:51<00:00,  2.07s/it]


---------------
Evaluating uniform function with a dispersion radius of 0.02.


 28%|██▊       | 7/25 [00:09<00:29,  1.64s/it]