In [None]:
import numpy as np
from sklearn.metrics import roc_curve
from sklearn.neighbors import KNeighborsClassifier
import random
import matplotlib.patheffects as PathEffects

from keras.layers import Input, Conv2D, Lambda, Dense, Flatten,MaxPooling2D,Dropout, concatenate
from keras.models import Model, Sequential
from keras.regularizers import l2
from keras import backend as K
from tensorflow.keras.optimizers import SGD
from keras.losses import binary_crossentropy
import os
import pickle
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import Adam

from itertools import permutations

import seaborn as sns

from keras.datasets import mnist
from sklearn.manifold import TSNE

from sklearn.svm import SVC


In [None]:
import torch
from sklearn.model_selection import train_test_split
import pandas as pd

In [None]:
# Class labels for the 2500 samples: five classes (0.0 .. 4.0), 500 consecutive rows each,
# stored as float64 with the default 0..2499 index.
series_data = pd.Series(np.repeat(np.arange(5, dtype=np.float64), 500))
series_data

In [None]:
# Load the object saved with torch and convert it to a NumPy array
# (presumably a tensor, since .numpy() is called on it — TODO confirm).
save_path = 'Data/2500_Smell.pt'
loaded_ast_embeddings = torch.load(save_path)
loaded_ast_embeddings = loaded_ast_embeddings.numpy()
loaded_ast_embeddings.shape

In [None]:
# NOTE(review): this cell rebinds both `save_path` and `loaded_ast_embeddings`, so when the
# cells run top to bottom the torch-loaded array from the previous cell is discarded —
# confirm which of the two sources is the intended one.
save_path = 'Data/graphcodebert_hidden_state_2500_smells.npy'
loaded_ast_embeddings = np.load(save_path)
loaded_ast_embeddings.shape

In [None]:
# Flatten each sample to a single row of 320*768 = 245760 features (row count is inferred).
# The name is rebound in place, so this cell relies on the array still having its loaded shape.
loaded_ast_embeddings = loaded_ast_embeddings.reshape(-1, 320*768)
loaded_ast_embeddings.shape

In [None]:
# data = pd.DataFrame({
#     'fn_smell_embeddings.pt': loaded_ast_embeddings.tolist(),
#     'smellKey_encoded': series_data
# })

# Feature matrix (flattened embeddings) and label vector used by the models below.
X = loaded_ast_embeddings
y = np.array(series_data)

In [None]:
# Hold out 30% of the samples; the fixed random_state keeps the split repeatable.
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

In [None]:
print(x_train.shape)
print(x_test.shape)
print(y_train.shape)
print(y_test.shape)

In [None]:
# Halve the hold-out set: `x_test`/`y_test` are rebound to one half (passed as validation
# data to fit() later) and `final_x_test`/`final_y_test` receive the other half.
# NOTE(review): because x_test/y_test are overwritten, re-running this cell on its own
# halves them again.
x_test, final_x_test, y_test, final_y_test = train_test_split(x_test,y_test,test_size=0.5, random_state=42)

In [None]:
print(x_train.shape)
print(x_test.shape)
print(y_train.shape)
print(y_test.shape)
print(final_x_test.shape)
print(final_y_test.shape)

In [None]:
# Define our own plot function
def scatter(x, labels, subtitle=None):
    """Draw a 2-D scatter plot coloured by class, with each class id printed at its median point.

    Arguments:
    x -- array indexed as x[:, 0] / x[:, 1]; the two columns are the plot coordinates
    labels -- one integer-valued class label per row of x (floats are cast to int)
    subtitle -- optional figure title; when given, the figure is also saved with
                plt.savefig(subtitle), i.e. the title doubles as the file name

    Only the classes that actually occur in `labels` are annotated, and nothing is
    written to disk when `subtitle` is None.
    """
    labels = np.asarray(labels)
    color_index = labels.astype(int)

    # We choose a color palette with seaborn: 10 hues as before, more only if a label needs it.
    n_colors = max(10, int(color_index.max()) + 1) if color_index.size else 10
    palette = np.array(sns.color_palette("hls", n_colors))

    # We create a scatter plot.
    plt.figure(figsize=(8, 8))
    ax = plt.subplot(aspect='equal')
    ax.scatter(x[:,0], x[:,1], lw=0, s=40, c=palette[color_index])
    plt.xlim(-25, 25)
    plt.ylim(-25, 25)
    ax.axis('off')
    ax.axis('tight')

    # We add the label for each class that is present; iterating over the observed labels
    # avoids taking the median of an empty selection (NaN position plus a RuntimeWarning).
    for class_id in np.unique(labels):
        # Position of each label.
        xtext, ytext = np.median(x[labels == class_id, :], axis=0)
        txt = ax.text(xtext, ytext, str(int(class_id)), fontsize=24)
        # White outline keeps the text readable on top of the points.
        txt.set_path_effects([
            PathEffects.Stroke(linewidth=5, foreground="w"),
            PathEffects.Normal()])

    if subtitle is not None:
        plt.suptitle(subtitle)
        # The subtitle is reused as the output file name, so saving needs one.
        plt.savefig(subtitle)

In [None]:
# The features are already flat, so the *_flat names are plain aliases of the same arrays.
x_train_flat = x_train
x_test_flat = x_test
print(x_train.shape)
print(x_train_flat.shape)

In [None]:
# Project the first 512 training and validation rows to 2-D and plot them by class.
# NOTE(review): TSNE() is created without a random_state and fit_transform is run
# separately on each subset, so the two layouts are independent of each other and may
# change between runs — confirm this is acceptable.
tsne = TSNE()
train_tsne_embeds = tsne.fit_transform(x_train[:512])
scatter(train_tsne_embeds, y_train[:512], "Samples from Training Data")

eval_tsne_embeds = tsne.fit_transform(x_test[:512])
scatter(eval_tsne_embeds, y_test[:512], "Samples from Validation Data")

In [None]:
# Baseline: a small dense classifier on the raw 245760-dimensional features,
# ending in a 5-way softmax (one unit per class label 0..4).
Classifier_model = Sequential()
Classifier_model.add(Dense(350 , input_shape=(245760,), activation='relu'))
Classifier_model.add(Dense(50,activation='relu'))
Classifier_model.add(Dense(5, activation='softmax'))

In [None]:
from sklearn.preprocessing import LabelBinarizer

In [None]:
le = LabelBinarizer()

In [None]:
# Fit the binarizer on the training labels only, then reuse it for the validation and
# final-test labels so all three share the same column order.
y_train_onehot = le.fit_transform(y_train)
y_test_onehot = le.transform(y_test)
final_y_test_onehot = le.transform(final_y_test)

In [None]:
y_train_onehot.shape

In [None]:
Classifier_model.compile(optimizer='adam',loss='categorical_crossentropy',metrics=['accuracy'])

In [None]:
x_train_flat.shape

In [None]:
# `x_test_flat`/`y_test_onehot` act as validation data here; the final-test half is kept
# aside for the evaluate() call in the next cell.
Classifier_model.fit(x_train_flat,y_train_onehot, validation_data=(x_test_flat,y_test_onehot),epochs=100)

In [None]:
# predictions = Classifier_model.predict(final_x_test)
# predictions = np.argmax(predictions, axis=1)
# Loss and accuracy on the hold-out half that was not used for validation.
Classifier_model.evaluate(final_x_test,final_y_test_onehot)

In [None]:
def generate_triplet(x,y,testsize=0.3,ap_pairs=10,an_pairs=10):
    """Build (anchor, positive, negative) triplets for every class found in `y`.

    For each class, `ap_pairs` ordered anchor/positive index pairs are sampled from the
    rows of that class and `an_pairs` negative indices from the rows of all other
    classes. Every sampled pair is combined with every sampled negative. The first
    int(ap_pairs * (1 - testsize)) pairs of each class feed the training triplets,
    the remaining pairs feed the test triplets; both groups use the same negatives.

    Arguments:
    x -- samples, indexed by row as x[i]
    y -- class label per row of x (compared element-wise with ==)
    testsize -- fraction of the sampled anchor/positive pairs routed to the test set
    ap_pairs -- anchor/positive pairs sampled per class
    an_pairs -- negatives sampled per class
    Returns:
    (train, test) -- two arrays built with np.array from lists of [anchor, positive, negative]
    """
    samples, targets = x, y
    keep_fraction = 1-testsize

    train_triplets, test_triplets = [], []
    for label in sorted(set(targets)):
        members = np.where((targets == label))[0]
        outsiders = np.where(targets != label)[0]

        # Sampling order matters for reproducibility under a seed: pairs first, then negatives.
        pair_pool = list(permutations(members, 2))
        chosen_pairs = random.sample(pair_pool, k=ap_pairs)
        chosen_negatives = random.sample(list(outsiders), k=an_pairs)

        cut = int(len(chosen_pairs)*keep_fraction)
        routing = (
            (train_triplets, chosen_pairs[:cut]),
            (test_triplets, chosen_pairs[cut:]),
        )
        for bucket, pairs in routing:
            bucket.extend(
                [samples[a_idx], samples[p_idx], samples[n_idx]]
                for a_idx, p_idx in pairs
                for n_idx in chosen_negatives
            )

    return np.array(train_triplets), np.array(test_triplets)

In [None]:
print(x_train.shape)
print(X.shape)
print(y_train.shape)
print(y.shape)

In [None]:
# Triplets are drawn from the full X/y, not from x_train/y_train.
# With ap_pairs=150 and testsize=1e-9, int(150 * (1 - 1e-9)) == 149, so one sampled pair per
# class (times its 150 negatives) ends up in X_test and the rest in X_train.
# NOTE(review): `X_test` here holds triplets and is unrelated to the lower-case `x_test`
# split above — the near-identical names are easy to confuse.
X_train, X_test = generate_triplet(X,y, ap_pairs=150, an_pairs=150,testsize=0.000000001)
X_train.shape

## Triplet NN

In [None]:
def triplet_loss(y_true, y_pred, alpha = 0.4):
    """
    Implementation of the triplet loss function
    Arguments:
    y_true -- true labels, required when you define a loss in Keras, you don't need it in this function.
    y_pred -- 2-D tensor (batch, 3 * d) holding three encodings side by side on the last axis:
            anchor -- first third, the encodings for the anchor data
            positive -- second third, the encodings for the positive data (similar to anchor)
            negative -- last third, the encodings for the negative data (different from anchor)
    alpha -- margin required between the anchor-positive and anchor-negative squared distances
    Returns:
    loss -- tensor with one value per row of y_pred: max(pos_dist - neg_dist + alpha, 0)
    """
    total_length = int(y_pred.shape[-1])

    # Integer slice bounds for the three equal parts of the concatenated vector.
    first_cut = total_length // 3
    second_cut = (2 * total_length) // 3

    anchor = y_pred[:, :first_cut]
    positive = y_pred[:, first_cut:second_cut]
    negative = y_pred[:, second_cut:total_length]

    # distance between the anchor and the positive
    pos_dist = K.sum(K.square(anchor-positive),axis=1)

    # distance between the anchor and the negative
    neg_dist = K.sum(K.square(anchor-negative),axis=1)

    # compute loss
    basic_loss = pos_dist-neg_dist+alpha
    loss = K.maximum(basic_loss,0.0)

    return loss

In [None]:
def create_base_network(input_dim=245760, first_units=245760, hidden_units=500,
                        dropout_rate=0.3, embedding_dim=245760,
                        embedding_activation='softmax'):
    """
    Base network to be shared.

    Arguments (the defaults reproduce the original hard-coded architecture):
    input_dim -- number of features per input sample
    first_units -- width of the first Dense layer
    hidden_units -- width of each of the two hidden Dense layers
    dropout_rate -- rate of the Dropout layer placed before the second hidden layer and before the output
    embedding_dim -- width of the output layer named 'embeddings'
    embedding_activation -- activation of the output layer
    Returns:
    model -- an uncompiled Sequential model

    NOTE(review): with the defaults the first Dense layer alone has
    245760 * 245760 (about 6.04e10) weights — confirm this fits in memory; a much
    smaller `first_units`/`embedding_dim` is probably intended.
    NOTE(review): a softmax output makes every embedding non-negative and sum to 1,
    which is unusual for a distance-based triplet loss — consider
    embedding_activation=None.
    """
    model = Sequential()
    model.add(Dense(first_units, input_shape=(input_dim,), activation='relu'))
    model.add(Dense(hidden_units, activation='relu'))

    model.add(Dropout(dropout_rate))
    model.add(Dense(hidden_units, activation='relu'))

    model.add(Dropout(dropout_rate))
    model.add(Dense(embedding_dim, name='embeddings', activation=embedding_activation))

    return model

In [None]:
# Optimizer for the triplet model. `learning_rate` is the supported keyword; the `lr`
# alias is deprecated in tf.keras optimizers.
adam_optim = Adam(learning_rate=0.0001, beta_1=0.9, beta_2=0.999)

In [None]:
# One input per triplet member, each shaped (245760, 1) per sample.
# NOTE(review): the base network declares input_shape=(245760,) without the trailing
# singleton axis — confirm the installed Keras version reconciles the two shapes.
anchor_input = Input((245760,1), name='anchor_input')
positive_input = Input((245760,1 ), name='positive_input')
negative_input = Input((245760,1 ), name='negative_input')

# Shared embedding layer for positive and negative items
# (the same model instance is applied to all three inputs, so the weights are shared)
Shared_DNN = create_base_network()


encoded_anchor = Shared_DNN(anchor_input)
encoded_positive = Shared_DNN(positive_input)
encoded_negative = Shared_DNN(negative_input)


# Concatenate anchor | positive | negative on the last axis; triplet_loss slices the
# result back into thirds in this order.
merged_vector = concatenate([encoded_anchor, encoded_positive, encoded_negative], axis=-1, name='merged_layer')

model = Model(inputs=[anchor_input,positive_input, negative_input], outputs=merged_vector)
model.compile(loss=triplet_loss, optimizer=adam_optim)

In [None]:
model.summary()

In [None]:
X_train.shape

In [None]:
# Split each triplet along axis 1 (0 = anchor, 1 = positive, 2 = negative) and add a
# trailing singleton axis so the arrays match the (245760, 1) Input layers.
Anchor = X_train[:,0,:].reshape(-1,245760,1)
Positive = X_train[:,1,:].reshape(-1,245760,1)
Negative = X_train[:,2,:].reshape(-1,245760,1)
Anchor_test = X_test[:,0,:].reshape(-1,245760,1)
Positive_test = X_test[:,1,:].reshape(-1,245760,1)
Negative_test = X_test[:,2,:].reshape(-1,245760,1)

# Placeholder targets: triplet_loss never reads y_true, so their values are not used by the loss.
# NOTE(review): np.empty leaves the contents uninitialised and the two widths differ
# (300 vs 1) — confirm Keras accepts both; zeros with one consistent shape would be safer.
Y_dummy = np.empty(( Anchor.shape[0],300))
Y_dummy2 = np.empty((Anchor_test.shape[0],1))

model.fit([Anchor,Positive,Negative],y=Y_dummy,validation_data=([Anchor_test,Positive_test,Negative_test],Y_dummy2), batch_size=512, epochs=100)

In [None]:
# Add the trailing singleton axis used by the triplet model's Input layers.
predict_ready_data = loaded_ast_embeddings.reshape(-1,245760,1)
predict_ready_data.shape

In [None]:
# The same array is fed to all three inputs, so only one branch of the output is needed afterwards.
predictedModel = model.predict([predict_ready_data, predict_ready_data, predict_ready_data])

In [None]:
predictedModel.shape

In [None]:
# Keep the first 245760 columns: the anchor branch comes first in the concatenated output.
outputAnchor = predictedModel[:, :245760]


In [None]:
y.size

In [None]:
# Split the learned embeddings with the same proportions and seed as the raw-feature split.
# NOTE(review): the triplets used to train the model were generated from the full X/y, so
# the hold-out rows produced here were available during that training — confirm this is intended.
x_train, x_test, y_train, y_test = train_test_split(outputAnchor, y, test_size=0.3, random_state=42)

In [None]:
# Halve the hold-out into a validation half (x_test/y_test) and a final-test half.
x_test, final_x_test, y_test, final_y_test = train_test_split(x_test,y_test,test_size=0.5, random_state=42)

In [None]:
# Re-encode every label split from the current y_train / y_test / final_y_test so the
# one-hot targets always match the rows of the current feature split, instead of relying
# on matrices left in the kernel by an earlier cell.
y_train_onehot = le.transform(y_train)
y_test_onehot = le.transform(y_test)
final_y_test_onehot = le.transform(final_y_test)

In [None]:
# Same layer layout as Classifier_model, trained this time on the triplet-network embeddings.
last_model = Sequential()
last_model.add(Dense(350 , input_shape=(245760,), activation='relu'))
last_model.add(Dense(50,activation='relu'))
last_model.add(Dense(5, activation='softmax'))
last_model.compile(optimizer='adam',loss='categorical_crossentropy',metrics=['accuracy'])

In [None]:
# NOTE(review): scatter() saves the figure under its subtitle, and these two subtitles are
# the same as in the earlier t-SNE cell, so the files written there are overwritten here.
tsne = TSNE()
train_tsne_embeds = tsne.fit_transform(x_train[:512])
scatter(train_tsne_embeds, y_train[:512], "Samples from Training Data")

eval_tsne_embeds = tsne.fit_transform(x_test[:512])
scatter(eval_tsne_embeds, y_test[:512], "Samples from Validation Data")

In [None]:
y_train_onehot

In [None]:
# The one-hot targets must correspond row-for-row to the x_train / x_test of the embedding
# split — verify they were encoded from that split's labels before trusting the metrics.
last_model.fit(x_train,y_train_onehot, validation_data=(x_test,y_test_onehot),epochs=100)

In [None]:
# Loss and accuracy on the final-test half of the embedding split.
last_model.evaluate(final_x_test,final_y_test_onehot)