In [1]:
import numpy as np
import tensorflow as tf
print(tf.config.list_logical_devices())
import tensorflow.keras.backend as K
from tensorflow.keras.models import Sequential

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, balanced_accuracy_score, f1_score, accuracy_score,rand_score
from sklearn.cluster import MeanShift, KMeans
from sklearn import cluster
from sklearn.mixture import GaussianMixture

import matplotlib.pyplot as plt
import tqdm
import numpy as np

from scipy.ndimage import gaussian_filter
from scipy.stats import norm
import numpy as np

[LogicalDevice(name='/device:CPU:0', device_type='CPU'), LogicalDevice(name='/device:GPU:0', device_type='GPU')]


In [2]:
#Load data

x_train=np.load("data\\numpy\\x_train.npy")
y_train=np.load("data\\numpy\\y_train.npy")
p_train=np.load("data\\numpy\\p_train.npy",allow_pickle=True)

x_val=np.load("data\\numpy\\x_val.npy")
y_val=np.load("data\\numpy\\y_val.npy")
p_val=np.load("data\\numpy\\p_val.npy",allow_pickle=True)

x_test=np.load("data\\numpy\\x_test.npy")
y_test=np.load("data\\numpy\\y_test.npy")
p_test=np.load("data\\numpy\\p_test.npy",allow_pickle=True)

In [41]:
#Model
def build_encoder(input_dim, latent_dim):
    inputs = tf.keras.layers.Input(shape=(input_dim,), name='input')
    x = tf.keras.layers.Dense(64, activation='relu')(inputs)
    x = tf.keras.layers.Dense(32, activation='relu')(x)
    z = tf.keras.layers.Dense(latent_dim, activation='linear', name='z')(x)
    return tf.keras.Model(inputs, z, name='encoder')

def build_classifier(latent_dim, num_emotions):
    latent_inputs = tf.keras.layers.Input(shape=(latent_dim,), name='latent_input')
    x = tf.keras.layers.Dense(8, activation='relu')(latent_inputs)
    outputs = tf.keras.layers.Dense(num_emotions, activation='softmax', name='output')(x)
    return tf.keras.Model(latent_inputs, outputs, name='classifier')


def build_model(input_dim, latent_dim, num_emotions):
    inputs = tf.keras.layers.Input(shape=(input_dim,), name='input')
    encoder = build_encoder(input_dim, latent_dim)
    z = encoder(inputs)
    classifier = build_classifier(latent_dim, num_emotions)
    classifier_output = classifier(z)
    return tf.keras.Model(inputs, [z, classifier_output], name='Model')

In [40]:
def initialize_kmeans_plus_plus(X, n_clusters):
    centroids = [X[np.random.randint(len(X))]]
    while len(centroids) < n_clusters:
        distances = np.array([min([np.linalg.norm(x - c) for c in centroids]) for x in X])
        probabilities = distances / distances.sum()
        cumprobs = probabilities.cumsum()
        r = np.random.rand()
        for j, p in enumerate(cumprobs):
            if r < p:
                i = j
                break
        centroids.append(X[i])
    return np.array(centroids)

In [39]:
def kmeans(X, n_clusters,fixed_centroids, max_iter=300, tol=0.0001):
    X=np.array(X)
    centroids = initialize_kmeans_plus_plus(X, n_clusters)
    if len(fixed_centroids)>0:
      centroids = np.vstack((fixed_centroids, centroids))

    n_clusters=len(centroids)

    for _ in range(max_iter):
        labels = np.argmin(np.linalg.norm(X[:, np.newaxis] - centroids, axis=2), axis=1)
        np.unique(labels)
        np.seterr(all='ignore')
        new_centroids = np.array([X[labels == k].mean(axis=0) for k in range(n_clusters)])
        # Handle possible empty clusters
        for idx in range(n_clusters):
            if np.isnan(new_centroids[idx]).any():
                new_centroids[idx] = centroids[idx]
        
        if len(fixed_centroids)>0:
          new_centroids[:len(fixed_centroids)] = fixed_centroids

        if np.linalg.norm(centroids - new_centroids) < tol:
                break

        centroids = new_centroids
    return centroids, labels

In [38]:
def selecting_data_kmeans_fixed(x_test_train,y_test_train,model_aux, fixed_c,mod="z"):
   if mod=="z":
      z,res=model_aux(x_test_train, training=False)

      if len(fixed_c)>0:
         z_fixed,res_fixed=model_aux(fixed_c, training=False)
      else:
         z_fixed=[]

      cs,res_kmeans=kmeans(z,5,z_fixed)

   best_x=[]
   best_y=[]
   for i in range(len(cs)):
      arg=np.argmin(np.sum(np.square(z-cs[i]),axis=1))
      best_x.append(x_test_train[arg])
      best_y.append(y_test_train[arg])
   return np.array(best_x),np.array(best_y)

In [37]:
def selecting_data_kmeans_fixed_1(x_test_train,y_test_train,model_aux,model_aux_prev, fixed_c,mod="z"):
   if mod=="z":
      z,res=model_aux(x_test_train, training=False)
      z_prev,_=model_aux_prev(x_test_train, training=False)
      z=np.concatenate([z,z_prev-z,res],axis=1)

      max=np.max(z,axis=0)
      min=np.min(z,axis=0)

      z= (z-min)/(max-min)

      if len(fixed_c)>0:
         z_fixed,res_fixed=model_aux(fixed_c, training=False)
         z_fixed_prev,_=model_aux_prev(fixed_c, training=False)
         z_fixed=np.concatenate([z_fixed,z_fixed_prev-z_fixed,res_fixed],axis=1)

         z_fixed= (z_fixed-min)/(max-min)


      else:
         z_fixed=[]

      cs,res_kmeans=kmeans(z,5,z_fixed)

   best_x=[]
   best_y=[]
   for i in range(len(cs)):
        arg=np.argmin(np.sum(np.square(z-cs[i]),axis=1))
        best_x.append(x_test_train[arg])
        best_y.append(y_test_train[arg])

   return np.array(best_x),np.array(best_y)

In [36]:
#copy the original model
def main():
  optimizer=tf.keras.optimizers.Adam()

  def classification_loss(classifier_outputs, labels, z_mean,unique_labels, lambda_var=0.1, lambda_inter=0.1):
      classification_loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(labels, classifier_outputs), axis=-1)
      total_loss = classification_loss
      labels = tf.argmax(labels, axis=-1)

      # Calcular la penalización para la varianza intra-grupo
      intra_group_penalty =  tf.constant(0,dtype="float32")

      n_unique_labels = len(unique_labels)

      for i in range(n_unique_labels):
          if len(z_mean[labels == unique_labels[i]]) > 1:
              intra_group_penalty += tf.reduce_sum(tf.square(tf.math.reduce_std(z_mean[labels == unique_labels[i]], axis=0)))

      # Calcular la penalización para maximizar las distancias entre los centroides de los grupos
      inter_group_penalty = tf.constant(0,dtype="float32")
      if n_unique_labels > 1:
          for i in range(n_unique_labels):
              for j in range(i + 1, n_unique_labels):
                  inter_group_penalty += tf.reduce_sum(tf.square(tf.reduce_mean(z_mean[labels == unique_labels[i]], axis=0) - tf.reduce_mean(z_mean[labels == unique_labels[j]], axis=0)))

      # Agregar ambas penalizaciones a la pérdida total
      total_loss += lambda_var * intra_group_penalty + lambda_inter * 1/inter_group_penalty
      return total_loss

  @tf.function
  def train_step(model, inputs,labels, unique_labels):
      with tf.GradientTape() as tape:
          z, pred = model(inputs, training=True)
          loss=classification_loss(pred, labels, z, unique_labels )
      gradients = tape.gradient(loss, model.trainable_variables)
      optimizer.apply_gradients(zip(gradients,model.trainable_variables))
      return loss

  def val_step(model, inputs, labels, unique_labels):
      z, pred= model(inputs, training=False)
      loss=classification_loss(pred, labels, z, unique_labels)
      return loss

  model_aux=tf.keras.models.clone_model(model)
  pesos=model.get_weights()
  model_aux.set_weights(pesos)
  accs=[]

  x_min, x_max = -3, 3
  y_min, y_max = -3, 3

  for step in range(number_of_steps+1): #number of steps

    #create a copy of cls layers to predict the classifier borders
    new_model = Sequential([
      model_aux.get_layer("classifier").layers[0],  # Dense layer 1
      model_aux.get_layer("classifier").layers[1],  # Dense layer 2
      model_aux.get_layer("classifier").layers[2]   # Output layer
    ])
    new_model.set_weights(model_aux.get_layer("classifier").get_weights())

    #Plot contour Test
    if cls_borders+test_contour+train_contour>0:
      plt.figure(figsize=(7,7))
    if test_contour==True:
      z_test,_=model_aux(x_test_train)
      for i in range(5):
          # Filtrar datos por clase
          z_class = z_test[np.argmax(y_test_train, axis=1) == i]

          # Calcular el histograma bidimensional
          hist, xedges, yedges = np.histogram2d(z_class[:,0], z_class[:,1], bins=100, range=[[x_min, x_max], [y_min, y_max]])

          # Normalizar el histograma
          hist_normalized = hist.T

          # Aplicar suavizado gaussiano
          hist_smooth = gaussian_filter(hist_normalized, sigma=3)

          # Ajustar distribución normal al histograma
          hist_flat = hist_smooth.flatten()
          mu, std = norm.fit(hist_flat)

          # Calcular los percentiles correspondientes al 90% y 95%
          percentile_70 = norm.ppf(0.70, loc=mu, scale=std)
          percentile_80 = norm.ppf(0.80, loc=mu, scale=std)
          percentile_90 = norm.ppf(0.90, loc=mu, scale=std)

          upper_90_value = norm.sf(percentile_90, loc=mu, scale=std)
          upper_80_value = norm.sf(percentile_80, loc=mu, scale=std)
          upper_70_value = norm.sf(percentile_70, loc=mu, scale=std)

          # Definir los niveles de confianza para las curvas de nivel
          contour_levels = [upper_90_value]

          # Crear malla para las curvas de nivel
          xx = np.linspace(x_min, x_max, hist.shape[0])
          yy = np.linspace(y_min, y_max, hist.shape[1])
          XX, YY = np.meshgrid(xx, yy)

          # Tramar las curvas de nivel
          plt.contour(XX, YY, hist_smooth, colors=colors[i], levels=contour_levels, linewidths=1)

    #Plot contour Train
    if train_contour==True:
      z_train,_=model_aux(x_train)

      for i in range(5):
          # Filtrar datos por clase
          z_class = z_train[np.argmax(y_train, axis=1) == i]

          # Calcular el histograma bidimensional
          hist, xedges, yedges = np.histogram2d(z_class[:,0], z_class[:,1], bins=100, range=[[x_min, x_max], [y_min, y_max]])

          # Aplicar suavizado gaussiano
          hist_smooth = gaussian_filter(hist.T, sigma=3)

          # Ajustar distribución normal al histograma
          hist_flat = hist_smooth.flatten()
          mu, std = norm.fit(hist_flat)

          # Calcular los percentiles correspondientes al 90%
          percentile_90 = norm.ppf(0.90, loc=mu, scale=std)
          upper_90_value = norm.sf(percentile_90, loc=mu, scale=std)

          # Definir los niveles de confianza para las curvas de nivel
          contour_levels = [upper_90_value]

          # Crear malla para las curvas de nivel
          xx = np.linspace(x_min, x_max, hist.shape[0])
          yy = np.linspace(y_min, y_max, hist.shape[1])
          XX, YY = np.meshgrid(xx, yy)

          # Tramar las curvas de nivel
          plt.contour(XX, YY, hist_smooth, colors=colors[i], levels=contour_levels, linewidths=2, linestyles='dashed')

    #selecting samples

    #kmeans_fixed

    if selection_method=="kmeans_fixed":
      if step==0:
        samples_x,samples_y=selecting_data_kmeans_fixed(x_test_train,y_test_train,model_aux,[], mod="z")
      else:
        samples_x,samples_y=selecting_data_kmeans_fixed(x_test_train,y_test_train,model_aux,best_x, mod="z")
    
    if selection_method=="random":
      if step==0:
        selected=np.random.randint(0, len(x_test_train),5)
        selected=selected.reshape(-1,1)
      else:
        selected=np.vstack([selected,np.random.randint(0, len(x_test_train),5).reshape(-1,1)])
      
      selected=selected.reshape(-1,1)
      #print(selected)
      samples_x,samples_y=x_test_train[selected.reshape(-1)],y_test_train[selected.reshape(-1)]
    
    if selection_method=="kmeans_fixed_mov":
        if step==0:
          samples_x,samples_y=selecting_data_kmeans_fixed(x_test_train,y_test_train,model_aux,[], mod="z")
        else:
          samples_x,samples_y=selecting_data_kmeans_fixed_1(x_test_train,y_test_train,model_aux,model_prev,best_x, mod="z")


    z_selected,_=model_aux(samples_x)
    best_x=samples_x
    best_y=samples_y
    centroids,_ = model_aux(best_x)

    #condition to plot only 5 repetitions
    if step==number_of_steps:
      z_selected=z_selected[:-number_of_steps]

    #map the borders
    if cls_borders==True:
      def predict_cluster(point, centroids):
          distances = np.linalg.norm(point-centroids, axis=-1)
          return np.argmin(distances,axis=0)

      xx= np.arange(x_min, x_max, 0.1)
      Z = np.zeros((60,60))
      Z1= np.zeros((60,60))
      for i in range(len(xx)):
            for j in range(len(xx)):
                Z[j, i] = predict_cluster([xx[i], xx[j]], centroids)
                aux=tf.convert_to_tensor(np.array([[xx[i], xx[j]]]),dtype=tf.float32)
                Z1[j, i] = np.argmax(new_model(aux)[0])

      for u in range(5):
          plt.contour(xx, xx, Z1==u,alpha=0.25,linewidths=2,linestyles="solid", colors="grey")

      for iz,z in enumerate(z_selected.numpy()):
        plt.scatter(z[0],z[1], c=colors[np.argmax(samples_y,axis=1)][iz], alpha=0.3, marker="o",s=275)
        plt.text(z[0],z[1],str((np.ceil(iz/5+0.2)-1).astype(int)), color=colors[np.argmax(samples_y,axis=1)][iz],fontsize=18,horizontalalignment='center',verticalalignment='center', alpha=1)


    model_prev=tf.keras.models.clone_model(model)
    pesos=model_aux.get_weights()
    model_prev.set_weights(pesos)

    #retrain model
    for epoch in range(200):
        random=np.random.choice(np.arange(0,len(x_train)),len(best_x)*2)
        x= np.concatenate([best_x, x_train[random]])
        y= np.concatenate([best_y, y_train[random]])
        train_step(model_aux,x,y, np.unique(np.argmax(y,axis=1)))

    #evaluation
    z,res=model_aux(x_val)
    acc_val=balanced_accuracy_score(np.argmax(y_val,axis=1),np.argmax(res,axis=1))

    z,res=model_aux(x_test_test)
    acc_test=balanced_accuracy_score(np.argmax(y_test_test,axis=1),np.argmax(res,axis=1))



    if cls_borders+test_contour+train_contour>0:
      plt.xlabel("z1")
      plt.ylabel("z2")
      plt.xlim(-2.5,1.5)
      plt.ylim(-2.5,3)
      plt.grid()
      plt.show()
    accs.append((acc_val,acc_test))
  return accs

In [42]:
model=build_model(1024,4,5)
model.load_weights("weights\\4D\\mlp_4d_3")

person="0001"
x_test_person=x_test[p_test==person]
y_test_person=y_test[p_test==person]
x_test_train,x_test_test, y_test_train,y_test_test=train_test_split(x_test_person, y_test_person, test_size=0.3, random_state=123)


cls_borders=False
test_contour=False
train_contour=False
colors=np.array(['orange','black','green','blue','purple'])
selection_method="kmeans_fixed" #random, kmeans_fixed,kmeans_fixed_mov
number_of_steps=5
rep=2

accs=[]
for r in tqdm.tqdm(range(rep)):
    accs.append(main())
print(accs)

  new_centroids = np.array([X[labels == k].mean(axis=0) for k in range(n_clusters)])
  new_centroids = np.array([X[labels == k].mean(axis=0) for k in range(n_clusters)])
100%|██████████| 2/2 [01:28<00:00, 44.12s/it]

[[(0.8393221573830443, 0.5661843942391054), (0.8478475969405123, 0.580748781146561), (0.8564137004433828, 0.6169834010656001), (0.859086653529993, 0.6293263755758801), (0.8677432491144254, 0.6447663441795859), (0.8588021719975858, 0.6863757157242026)], [(0.8373328869339145, 0.5582301521511248), (0.8631121932138044, 0.6070810523631985), (0.8652420996891381, 0.6190210092390613), (0.8676194022816676, 0.6506621769050737), (0.864923731142319, 0.6534162723854808), (0.8665237311423191, 0.6681981697826814)]]



