In [None]:
!pip install triton eif==1.0.2

Importing data from the GitHub repository

In [None]:
import os
# Repository directory name
repo_dir = '/content/A-Hybrid-Learning-Approach/'
# Check if the directory exists
if os.path.isdir(repo_dir) == False:
  # Clone the GitHub repository into the current Colab directory
  !git clone https://github.com/Maxime1969/A-Hybrid-Learning-Approach.git

CodeT5 vectorization scripts

In [None]:
import numpy as np
import pandas as pd
import torch as pt
import gc
import chardet
from torch.nn.utils.rnn import pad_sequence
class ClsEmbedding():
    """
    Turns the code snippets listed in a dataframe into a dataframe of embeddings.

    Attributes
    ----------
    data : dataframe
        dataframe read from the csv file listing the snippets; the column at
        position 1 holds the file names and the column at position 5 the labels.
    datafile : string
        the path to the folder containing the files.
    tokenizer : tokenizer
        it is obtained from RobertaTokenizer.from_pretrained(model_name)
    model : T5ForConditionalGeneration.from_pretrained(model_name)
        pre-trained model; it is called with inputs moved to the module-level
        `device`, so it has to live on that same device.
    batch : int
        the number of samples per batch.
    stepbatch : int
        number of batches, (re)computed by get_embedding().
    matrix_data : dataframe
        empty dataframe with a single 'Matrix' column, created at construction.

    Methods
    -------
    get_embedding()
        generator yielding, per batch, a dataframe of embeddings plus a 'label' column
    get_matrix()
        concatenates every batch into a single dataframe
    """
    def __init__(self, data, datafile, tokenizer, model, batch):
        self.data = data
        self.datafile = datafile

        self.tokenizer = tokenizer
        self.model = model
        self.stepbatch = 0
        self.batch = batch
        self.matrix_data = pd.DataFrame(columns=['Matrix'])

    def _embed_file(self, path):
        """
        Embed a single file.

        Returns the min-max scaled embedding vector as a numpy array, or None
        when the file does not exist or contains no line.
        """
        if not os.path.exists(path):
            return None
        # First pass in binary mode: let chardet guess the encoding.
        with open(path, 'rb') as file:
            detected_encoding = chardet.detect(file.read())['encoding']
        # Second pass in text mode with the detected encoding.
        with open(path, "r", encoding=detected_encoding) as fichier:
            contenu = fichier.readlines()
        if contenu == []:
            return None

        # NOTE(review): `contenu` is a list of lines, not one string. Confirm the
        # tokenizer handles a list the way this pipeline intends (some tokenizers
        # treat each list element as an already-split token) - TODO confirm.
        inputs = self.tokenizer.encode_plus(contenu, padding='longest', truncation=True, return_tensors='pt')
        inputs = inputs.to(device)
        # Inference only: no autograd graph is needed, which keeps memory flat.
        with pt.no_grad():
            outputs = self.model(inputs.input_ids, attention_mask=inputs.attention_mask, decoder_input_ids=inputs['input_ids'], output_hidden_states=True)
        embedding = outputs.encoder_last_hidden_state.to(device)

        # Standardise each sequence with its own mean/std. keepdim=True makes the
        # statistics broadcast against `embedding` for any leading dimension.
        mean = pt.mean(embedding, dim=(1, 2), keepdim=True)
        std = pt.std(embedding, dim=(1, 2), keepdim=True)
        normalized_embedding = (embedding - mean) / std
        # Average over the first two axes, leaving one value per hidden feature.
        fused_normalized_embeddings = normalized_embedding.mean(dim=0).mean(dim=0)
        # Min-max scaling of the fused vector into [0, 1].
        lowest = fused_normalized_embeddings.min()
        highest = fused_normalized_embeddings.max()
        x_normalized = (fused_normalized_embeddings - lowest) / (highest - lowest)
        return x_normalized.cpu().detach().numpy()

    #Embedding
    def get_embedding(self):
        """
        Yield one dataframe per batch of `self.batch` consecutive samples.

        Each yielded dataframe has one row per file that exists and is not empty:
        the embedding features followed by a 'label' column. Files that are
        missing or empty are skipped together with their label.
        """
        X_data = self.data.iloc[:, 1].tolist()
        y_data = self.data.iloc[:, 5].tolist()

        # Ceiling division: a trailing partial batch still counts as a batch.
        self.stepbatch = -(-len(X_data) // self.batch)

        for i in range(self.stepbatch):
            # Batches are contiguous: batch i covers [i*batch, (i+1)*batch).
            # Slicing clamps the upper bound, so the last batch may be shorter.
            start = i * self.batch
            stop = start + self.batch
            generator_embeddings = []
            labels_retained = []
            for file_name, label in zip(X_data[start:stop], y_data[start:stop]):
                vector = self._embed_file(self.datafile + "/" + file_name)
                if vector is not None:
                    generator_embeddings.append(vector)
                    labels_retained.append(label)

            df_emb = pd.DataFrame(generator_embeddings)
            # Create a DataFrame of labels (column named 'label')
            df_label = pd.DataFrame(labels_retained, columns=['label'])
            # Concatenate horizontally (axis=1) so that the labels are in one column
            yield pd.concat([df_emb, df_label], axis=1)

    def get_matrix(self):
        """
        Run get_embedding() to completion and stack every batch vertically.

        Returns a dataframe with a fresh 0..n-1 index. When no sample could be
        embedded, an empty dataframe with only the 'label' column is returned.
        """
        data_list = [batch_df for batch_df in self.get_embedding() if not batch_df.empty]
        if not data_list:
            return pd.DataFrame(columns=['label'])
        return pd.concat(data_list, axis=0, ignore_index=True)

Data loading methodology

In [None]:
import gc
import chardet
from transformers import RobertaTokenizer, T5ForConditionalGeneration
import requests
from sklearn.utils import resample

#Setting device
# `device` is read as a global by ClsEmbedding, which moves its inputs there.
if pt.cuda.is_available():
  device = "cuda"
  pt.cuda.empty_cache()
  # Enable GPU.
  pt.cuda.set_device(0)
else:
  device ="cpu"
gc.collect()
#Parameter
# Number of samples per embedding batch
batch = 100
#embedding model
model_name = 'Salesforce/codet5-small'
tokenizer = RobertaTokenizer.from_pretrained(model_name)
# The model must live on the same device as the inputs it receives; without the
# .to(device) the weights stay on the CPU while the inputs are moved to the GPU.
model = T5ForConditionalGeneration.from_pretrained(model_name).to(device)

def get_load_test (default_name):
  """
  Load one labelled dataset from the cloned repository and embed it.

  Parameters
  ----------
  default_name : str
      'Blob' or 'LongMethod' select the matching csv listing and source folder;
      any other value selects the poltergeist (apache-ant) listing and folder.

  Returns
  -------
  dataframe
      the result of ClsEmbedding.get_matrix() for the shuffled listing.
  """
  if default_name == 'Blob':
    csv_name, folder_name = '_datablob_.csv', 'DataBlob'
  elif default_name == 'LongMethod':
    csv_name, folder_name = 'data_longmethod.csv', 'Data_LongMethod'
  else:
    csv_name, folder_name = 'data_poltergeist_ant.csv', 'File_apache-ant-annoted'

  listing = pd.read_csv(repo_dir + csv_name, sep =',', index_col = "Id")
  # Sampling every row without replacement is a seeded shuffle of the listing.
  shuffled = resample(listing, replace = False, n_samples = len(listing), random_state=42)
  embedder = ClsEmbedding(shuffled, repo_dir + folder_name, tokenizer, model, batch)
  return embedder.get_matrix()


Loading data

In [None]:
X_data_blob = get_load_test('Blob')
X_data_longmethod = get_load_test('LongMethod')
X_data_poltergeist = get_load_test('Poltergeist')

Implementation of anomaly isolation models

In [None]:
import numpy as np
from sklearn.neighbors import NearestNeighbors
from eif import iForest
class iNNE:
    """
    Nearest-neighbour ensemble used to score how isolated a point is.

    Parameters
    ----------
    X : array-like, optional
        stored as-is on the instance; fit() receives its own data argument.
    t : int, default 100
        number of random sub-samples (ensemble members).
    n : int, default 256
        number of points drawn for each sub-sample.

    Attributes
    ----------
    models : list of (NearestNeighbors, sample) pairs, filled by fit()
    rays : list with one array of radii per entry of `models`; it is not computed
        by fit() and has to be provided by the caller.
    samples, centers, lesindices : lists initialised empty and not used by the
        methods of this class.
    """
    def __init__(self, X=None, t=100, n=256):
        # X is optional so that the class can be built from hyper-parameters only,
        # e.g. iNNE(t=100, n=256); positional iNNE(X, t, n) keeps working.
        self.t = t  # Number of samples
        self.n = n  # Sample size
        self.samples = []
        self.X = X
        self.rays = []
        self.centers = []
        self.lesindices = []
        self.models = []

    def fit(self, X):
        """
        Draw `t` random sub-samples of X and index each with a 1-nearest-neighbour model.

        Any previously fitted state is discarded. Returns self.
        """
        X = np.asarray(X)  # positional row indexing below needs an array
        # Never ask for more points than are available (choice without replacement).
        size = min(self.n, len(X))
        self.models = []
        self.rays = []
        # Iterate t times to create t sets of hyperspheres
        for _ in range(self.t):
            sample_indices = np.random.choice(len(X), size, replace=False)
            sample = X[sample_indices]
            nbrs = NearestNeighbors(n_neighbors=1).fit(sample)
            self.models.append((nbrs, sample))
        return self

    def _isolation_score(self, x, modele, rays):
        """
        Average isolation score of one point over every ensemble member.

        For each member the point scores 1.0 when it lies outside the sphere of
        its nearest centre, otherwise 1 - (smallest radius of the member / radius
        of that centre).
        """
        scores = []
        # Iterate over each set of samples
        for i, (nbrs, _sample) in enumerate(modele):
            radii = rays[i]
            distance, index = nbrs.kneighbors([x])
            distance = distance[0][0]
            center_radius = radii[index[0][0]]
            if distance < center_radius:
                # NOTE(review): the numerator is the smallest radius of the whole
                # sub-sample, not the radius of the centre's own nearest
                # neighbour - confirm this is the intended variant.
                scores.append(1 - (np.min(radii) / center_radius))
            else:
                scores.append(1.0)
        # Return the average of the isolation scores
        return np.mean(scores)

    def predict(self, X, modele=None, rays=None):
        """
        Score every row of X; `modele` / `rays` default to the fitted state.

        Returns a numpy array with one score per row.
        """
        if modele is None:
            modele = self.models
        if rays is None:
            rays = self.rays
        return np.array([self._isolation_score(x, modele, rays) for x in X])


# Anomaly-score cut-offs; presumably supplied as the `Threshold` argument of the
# outlier helpers defined below - TODO confirm against the calling cells.
Threshold = [0.6, 0.7, 0.8, 0.9]

def get_outliers_iNNE(X_data, X_data_train, X_data_test, Threshold, t=100, n=256):
    """
    Collect the index labels flagged as outliers by iNNE for any of the thresholds.

    Parameters
    ----------
    X_data : dataframe
        only its index is read; row k must correspond to row k of X_data_test.
        The dataframe is left untouched.
    X_data_train : array-like
        data the ensemble is fitted on.
    X_data_test : array-like
        data to score, one row per row of X_data.
    Threshold : iterable of float
        a row is an outlier for a threshold when its score is strictly greater.
    t, n : int
        number of sub-samples and sub-sample size given to iNNE.

    Returns
    -------
    set
        union, over all thresholds, of the index labels of the flagged rows.
    """
    train = np.asarray(X_data_train)
    test = np.asarray(X_data_test)
    row_labels = X_data.index
    # A sub-sample cannot be larger than the training set it is drawn from.
    sample_size = min(n, len(train))

    ens_outliers = set()
    for threshold in Threshold:
        # A new ensemble is drawn for every threshold, as in the original procedure.
        inne = iNNE(train, t=t, n=sample_size)
        inne.fit(train)
        # Radius of each centre = distance to its nearest OTHER sample point.
        # kneighbors() without a query excludes the point itself; querying with
        # the sample would return the zero self-distance for every centre.
        inne.rays = [nbrs.kneighbors()[0].flatten() for nbrs, _ in inne.models]
        scores = inne.predict(test, inne.models, inne.rays)
        #Anomaly prediction
        ens_outliers.update(row_labels[scores > threshold].tolist())
    return ens_outliers


def get_outliers_EIF(X_data, X_train, ntree, Threshold, n_epochs=100, sample_size=256):
  """
  Repeatedly flag outliers with an isolation forest and report them per epoch.

  Parameters
  ----------
  X_data : dataframe
      only its index is read; row k must correspond to row k of X_train.
      The dataframe is left untouched.
  X_train : array
      data handed to iForest; the scores are those of these same rows.
  ntree : int
      number of trees handed to iForest.
  Threshold : iterable of float
      a row is an outlier for a threshold when its score is strictly greater.
  n_epochs : int, default 100
      number of independent repetitions.
  sample_size : int, default 256
      sub-sample size handed to iForest.

  Returns
  -------
  number_outliers : numpy array with the outlier count of every epoch
  list_outliers : tuple with, for every epoch, the list of flagged index labels
  """
  row_labels = X_data.index
  myoutliers = []
  for _ in range(n_epochs):
     ens_outliers = set()
     for threshold in Threshold:
        # A new forest is grown for every threshold, as in the original procedure.
        # NOTE(review): no ExtensionLevel is passed, so the library default is
        # used - confirm this is the extension level intended for "EIF".
        clf_extended = iForest(X_train, ntree, sample_size=sample_size)
        # Anomaly Score for EIF (X_in=None is passed, as in the original call)
        scores_extended = np.asarray(clf_extended.compute_paths(X_in= None))
        #Anomaly prediction for EIF
        ens_outliers.update(row_labels[scores_extended > threshold].tolist())
     myoutliers.append((len(ens_outliers), list(ens_outliers)))
  if not myoutliers:
     # No epoch was run: nothing to unzip.
     return np.array([], dtype=int), ()
  number_outliers, list_outliers = zip(*myoutliers)
  return np.array(number_outliers), list_outliers


Definition of ensemble learning models

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.ensemble import  BaggingClassifier, StackingClassifier, GradientBoostingClassifier, RandomForestClassifier
from imblearn.ensemble import RUSBoostClassifier

# Stratified 10-fold splitter (shuffled, fixed seed); handed to the stacking classifier
cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

# --- Bagging: 10 bootstrap replicas of a 100-tree random forest
_estimator = RandomForestClassifier(n_estimators=100, random_state=42)
bagging_clf = BaggingClassifier(
    estimator=_estimator,
    n_estimators=10,
    max_samples=1.0,
    max_features=1.0,
    bootstrap=True,
    random_state=42,
    verbose=False,
)

# --- Stacking: random forests whose size, depth, split threshold and seed grow with i
n_models = 10
models = []
for i in range(n_models):
    estimator = RandomForestClassifier(
        n_estimators=100 + i*50,
        max_depth=5 + i*2,
        min_samples_split=2 + i,
        random_state=42 + i,
    )
    models.append((f'estimate_{i}', estimator))
# Meta-learner combining the class probabilities of the base forests
final_estimator_lr = LogisticRegression(solver='liblinear', random_state=42)
stacking_lr = StackingClassifier(
    estimators=models,
    final_estimator=final_estimator_lr,
    cv=cv,
    stack_method='predict_proba',
    n_jobs=-1,
    verbose=False,
)

# --- Boosting: RUSBoost wrapped around a gradient-boosting base learner
base_model = GradientBoostingClassifier(n_estimators=100, random_state=42)
rus_boost = RUSBoostClassifier(estimator=base_model, n_estimators=10, random_state=42)