<a href="https://www.kaggle.com/code/alizaahsan/ccd-selfattention?scriptVersionId=235796573" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# Code Clone Detection using Self-Attention Mechanism
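## Overview

This notebook trains a shared-weight BiLSTM encoder with attention pooling on function pairs from the CodeXGLUE BigCloneBench dataset and predicts whether the two functions in each pair are clones.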

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found
input_paths = (
    os.path.join(root, name)
    for root, _, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_paths:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install gensim

## 1. Gathering of Dataset

### Checking what the dataset is like, at first

In [None]:
import numpy as np
import datasets

# Pull the "train" split of the BigCloneBench clone-detection dataset from Hugging Face
dataset = datasets.load_dataset("code_x_glue_cc_clone_detection_big_clone_bench", split="train")

# Keep only the first 100 pairs so this exploratory cell stays quick
data = dataset.select(range(100))

# One array per field: the two functions of each pair and the label (1 = clone, 0 = not clone)
X1, X2, y = (
    np.array([sample[field] for sample in data])
    for field in ('func1', 'func2', 'label')
)


print(f"Loaded {len(X1)} function pairs from BCB dataset.")
print()
print(X1[0])

In [None]:
import datasets

# Fetch the complete "train" split (only this split is requested here)
train_dataset = datasets.load_dataset(
    "code_x_glue_cc_clone_detection_big_clone_bench",
    split="train",
)

# Work with a pandas DataFrame from here on
train_df = train_dataset.to_pandas()

# Persist the raw frame; later cells re-read this CSV
train_df.to_csv("bcb_train.csv", index=False)

train_shape = train_df.shape
print(f"Final Train Dataset Size: {train_shape}")
print(train_df.head())


In [None]:
# Schema check: column names, then dtypes, then the first five rows
for summary in (train_df.columns, train_df.dtypes, train_df.head(5)):
    print(summary)

In [None]:
import pandas as pd
# Show full cell contents (no column-width truncation) for the first ten rows
pd.options.display.max_colwidth = None
print(train_df.iloc[:10])

In [None]:
import pandas as pd

# An earlier draft re-read the data from CSV at this point:
#   train_df = pd.read_csv("train_data.csv")
#   test_df = pd.read_csv("test_data.csv")
# The in-memory train_df built above is used instead.

# Report the frame's dimensions followed by its first five rows
train_shape = train_df.shape
print(f"Train Dataset: {train_shape}")
print(train_df.head())

In [None]:
# Count how many pairs carry each label to spot class imbalance
label_counts = train_df['label'].value_counts()
print('Label distribution Training set:')
print(label_counts)

In [None]:
# Number of missing values per column (displayed as the cell output)
train_df.isna().sum()

In [None]:
import re

# Matches, in one left-to-right scan, either a quoted literal (kept) or a comment (removed).
# Scanning in a single pass prevents "//" inside a string such as "http://host" or inside a
# block comment from being mistaken for the start of a line comment.
_COMMENT_OR_LITERAL = re.compile(
    r'"(?:\\.|[^"\\\n])*"'    # double-quoted string literal (single line)
    r"|'(?:\\.|[^'\\\n])*'"   # single-quoted character/string literal (single line)
    r"|/\*.*?\*/"             # block comment, possibly spanning lines
    r"|//[^\n]*",             # line comment up to end of line
    re.DOTALL,
)


# Function to clean code snippets
def clean_code(code):
    """Strip comments from a code snippet and collapse its whitespace.

    Args:
        code: Source text. Any non-string value (e.g. NaN from pandas) is
            treated as missing.

    Returns:
        The snippet with ``//`` and ``/* */`` comments removed, every run of
        whitespace collapsed to one space, and leading/trailing whitespace
        stripped. Returns ``""`` for non-string input.

    Notes:
        Quoted literals are left intact, so ``"http://example"`` is not
        truncated. Each comment is replaced by a single space so the tokens on
        either side of it are not glued together.
    """
    if not isinstance(code, str):
        return ""  # Handle NaN values safely

    def _drop_comment(match):
        token = match.group(0)
        # Literals start with a quote character; anything else matched is a comment.
        return token if token[0] in "\"'" else " "

    code = _COMMENT_OR_LITERAL.sub(_drop_comment, code)
    code = re.sub(r"\s+", " ", code)  # Remove extra spaces/newlines
    return code.strip()

# Strip comments / normalise whitespace in both function columns of the training frame.
# NOTE(review): bcb_train.csv was written before this cell runs and later cells re-read that
# CSV, so these cleaned columns do not appear to reach the model pipeline — confirm intent.
for column in ("func1", "func2"):
    train_df[column] = train_df[column].apply(clean_code)

print("Code Cleaning Done!")

In [None]:
# Render matplotlib figures inside the notebook output
%matplotlib inline
import matplotlib.pyplot as plt
# Seed NumPy's global RNG.
# NOTE(review): no TensorFlow or gensim seed is set anywhere visible in this notebook, so
# training runs are presumably not reproducible from this seed alone — confirm.
np.random.seed(0)
# Use the "ggplot" style sheet for all subsequent figures
plt.style.use("ggplot")

import tensorflow as tf
# Record the TensorFlow version in the notebook output
print('Tensorflow version:', tf.__version__)

In [None]:
# !pip install gensim


In [None]:
## I was struggling with the gensim and nltk packages and was trying to debug the installation. Extra attempts have now been commented out below.

In [None]:
# !pip3 uninstall gensim nltk --yes

!pip install h5py
!pip install typing-extensions
!pip install wheel

!pip install gensim nltk==3.2.4

!pip install --upgrade --force-reinstall gensim nltk==3.9.1 scipy --no-cache-dir


import os
os.kill(os.getpid(), 9)

!pip install -r requirements.txt --use-deprecated=legacy-resolver

In [None]:
# Distribution graphs (histogram/bar graph) of column data
def plotPerColumnDistribution(df, nGraphShown, nGraphPerRow):
    """Plot one distribution chart per low-cardinality column of ``df``.

    Only columns with more than 1 and fewer than 50 unique values are plotted.
    Columns whose first value is not numeric get a bar chart of value counts;
    numeric columns get a histogram.

    Args:
        df: DataFrame to summarise.
        nGraphShown: Maximum number of columns to plot.
        nGraphPerRow: Number of subplots per grid row.

    Returns:
        None. Prints a message and returns early when no column qualifies.
    """
    nunique = df.nunique()
    df = df[[col for col in df if 1 < nunique[col] < 50]]  # For displaying purposes, pick columns that have between 1 and 50 unique values
    nCol = df.shape[1]
    if nCol == 0:
        # Without this guard the figure would be created with zero height.
        print('No distribution plots shown: no column has between 2 and 49 unique values')
        return
    columnNames = list(df)
    # Ceiling division; must be an int because plt.subplot rejects float grid sizes.
    nGraphRow = (nCol + nGraphPerRow - 1) // nGraphPerRow
    plt.figure(num = None, figsize = (6 * nGraphPerRow, 8 * nGraphRow), dpi = 80, facecolor = 'w', edgecolor = 'k')
    for i in range(min(nCol, nGraphShown)):
        plt.subplot(nGraphRow, nGraphPerRow, i + 1)
        columnDf = df.iloc[:, i]
        if (not np.issubdtype(type(columnDf.iloc[0]), np.number)):
            valueCounts = columnDf.value_counts()
            valueCounts.plot.bar()
        else:
            columnDf.hist()
        plt.ylabel('counts')
        plt.xticks(rotation = 90)
        plt.title(f'{columnNames[i]} (column {i})')
    plt.tight_layout(pad = 1.0, w_pad = 1.0, h_pad = 1.0)
    plt.show()

In [None]:
# Correlation matrix
def plotCorrelationMatrix(df, graphWidth):
    """Show the correlation matrix of the numeric, non-constant, NaN-free columns of ``df``.

    Args:
        df: DataFrame to analyse. If it carries a ``dataframeName`` attribute,
            that value is used in the plot title; otherwise a generic label is used.
        graphWidth: Width and height of the square figure, in inches.

    Returns:
        None. Prints a message and returns early when fewer than two columns
        remain after filtering.
    """
    # Read the optional label first: frames derived below do not carry the attribute.
    filename = getattr(df, 'dataframeName', 'DataFrame')
    df = df.dropna(axis='columns')  # drop columns with NaN (axis is keyword-only in pandas 2)
    df = df.select_dtypes(include='number')  # correlation is only defined for numeric columns
    df = df[[col for col in df if df[col].nunique() > 1]]  # keep columns where there are more than 1 unique values
    if df.shape[1] < 2:
        print(f'No correlation plots shown: The number of non-NaN or constant columns ({df.shape[1]}) is less than 2')
        return
    corr = df.corr()
    plt.figure(num=None, figsize=(graphWidth, graphWidth), dpi=80, facecolor='w', edgecolor='k')
    corrMat = plt.matshow(corr, fignum = 1)
    plt.xticks(range(len(corr.columns)), corr.columns, rotation=90)
    plt.yticks(range(len(corr.columns)), corr.columns)
    plt.gca().xaxis.tick_bottom()
    plt.colorbar(corrMat)
    plt.title(f'Correlation Matrix for {filename}', fontsize=15)
    plt.show()

In [None]:
# Scatter and density plots
def plotScatterMatrix(df, plotSize, textSize):
    """Draw a scatter matrix (KDE on the diagonal) for up to ten numeric columns of ``df``.

    Columns containing NaN and columns with a single unique value are dropped
    first. Each upper-triangle panel is annotated with the pairwise correlation
    coefficient.

    Args:
        df: DataFrame to analyse.
        plotSize: Width and height of the square figure, in inches.
        textSize: Font size of the correlation annotations.

    Returns:
        None. Prints a message and returns early when no column qualifies.
    """
    df = df.select_dtypes(include =[np.number]) # keep only numerical columns
    # Remove rows and columns that would lead to df being singular
    df = df.dropna(axis='columns')  # axis is keyword-only in pandas 2
    df = df[[col for col in df if df[col].nunique() > 1]] # keep columns where there are more than 1 unique values
    columnNames = list(df)[:10] # reduce the number of columns for matrix inversion of kernel density plots
    if not columnNames:
        print('No scatter matrix shown: no numeric, non-constant, NaN-free columns')
        return
    df = df[columnNames]
    ax = pd.plotting.scatter_matrix(df, alpha=0.75, figsize=[plotSize, plotSize], diagonal='kde')
    corrs = df.corr().values
    # Use numpy directly rather than the incidental `plt.np` alias.
    for i, j in zip(*np.triu_indices_from(ax, k = 1)):
        ax[i, j].annotate('Corr. coef = %.3f' % corrs[i, j], (0.8, 0.2), xycoords='axes fraction', ha='center', va='center', size=textSize)
    plt.suptitle('Scatter and Density Plot')
    plt.show()

## 4. Create Word Embeddings

In [None]:
nRowsRead = 50000 # number of CSV rows to load; specify 'None' if want to read whole file
# bcb_train.csv may have more rows in reality; only the first nRowsRead rows are loaded here.
# NOTE(review): this re-reads the CSV that was saved before clean_code() was applied to
# train_df, so the cleaned func1/func2 columns are not what gets loaded — confirm intent.
df_train = pd.read_csv("bcb_train.csv", delimiter=',', nrows = nRowsRead, encoding='utf-8')
nRow, nCol = df_train.shape
print(f'There are {nRow} rows and {nCol} columns')

In [None]:
#!sudo apt install --reinstall python*-decorator

#Was earlier requiring this piece

In [None]:
!pip install nltk

In [None]:
import pandas as pd
import gensim
import nltk 
from nltk.tokenize import word_tokenize

# Download tokenizer data. Recent NLTK releases (including the 3.9.1 pinned above) load the
# 'punkt_tab' resource inside word_tokenize, while older releases use 'punkt'; fetch both so
# the cell works with either.
for resource in ('punkt', 'punkt_tab'):
    nltk.download(resource)

# Tokenization function
def tokenize_code(code):
    """Split a code snippet into NLTK word tokens.

    Args:
        code: Source text. Non-string values (e.g. NaN produced when an empty
            CSV field is read back by pandas) are treated as empty.

    Returns:
        list[str]: The tokens, or an empty list for non-string input.
    """
    if not isinstance(code, str):
        return []
    return word_tokenize(code)  # Tokenize into words

# Apply tokenization to func1 and func2
df_train["func1_tokens"] = df_train["func1"].apply(tokenize_code)
df_train["func2_tokens"] = df_train["func2"].apply(tokenize_code)

# Combine tokenized functions for training Word2Vec (each function is one "sentence")
all_tokens = df_train["func1_tokens"].tolist() + df_train["func2_tokens"].tolist()

# Train Word2Vec model; tokens seen fewer than min_count times are left out of the vocabulary
word2vec_model = gensim.models.Word2Vec(sentences=all_tokens, vector_size=100, window=5, min_count=2, workers=4)

# Save model for future use
word2vec_model.save("word2vec_bcb.model")

print("Word2Vec training completed and model saved!")


In [None]:
import numpy as np
import pandas as pd
import gensim
import nltk
from nltk.tokenize import word_tokenize
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

# Load Word2Vec model
word2vec_model = gensim.models.Word2Vec.load("word2vec_bcb.model")


# Function to tokenize and convert tokens to word indices
def text_to_indices(text, model, vocab):
    """Tokenize ``text`` and map each in-vocabulary token to its Word2Vec index.

    Args:
        text: Code snippet. Non-string values (e.g. NaN from the CSV) yield ``[]``.
        model: Trained gensim Word2Vec model providing ``wv.key_to_index``.
        vocab: Set of tokens known to ``model``; other tokens are skipped.

    Returns:
        list[int]: Indices of the known tokens, in order of appearance.
    """
    if not isinstance(text, str):
        return []
    key_to_index = model.wv.key_to_index
    return [key_to_index[token] for token in word_tokenize(text) if token in vocab]


# Extract vocab from Word2Vec model
vocab = set(word2vec_model.wv.key_to_index.keys())

# Convert func1 and func2 into word index sequences
df_train["func1_indices"] = df_train["func1"].apply(lambda x: text_to_indices(x, word2vec_model, vocab))
df_train["func2_indices"] = df_train["func2"].apply(lambda x: text_to_indices(x, word2vec_model, vocab))

# Fixed sequence length used when padding/truncating every function
MAX_SEQ_LENGTH = 200

# Apply padding (zeros are appended after the tokens).
# NOTE(review): the pad value defaults to 0 and the Word2Vec indices are used unshifted, so
# padding presumably shares id 0 with a real token — confirm whether an offset was intended.
X1_padded = pad_sequences(df_train["func1_indices"], maxlen=MAX_SEQ_LENGTH, padding="post")
X2_padded = pad_sequences(df_train["func2_indices"], maxlen=MAX_SEQ_LENGTH, padding="post")

# Binary labels as integers (no one-hot encoding is applied)
y = df_train["label"].astype(int)

# Step 1: Initial split (80% train+val, 20% test). Stratified so each split keeps the
# clone / non-clone ratio of the full data.
X1_temp, X1_test, X2_temp, X2_test, y_temp, y_test = train_test_split(
    X1_padded, X2_padded, y, test_size=0.2, random_state=42, stratify=y
)

# Step 2: Split train+val into train and validation (80% / 20% of the remaining 80%)
X1_train, X1_val, X2_train, X2_val, y_train, y_val = train_test_split(
    X1_temp, X2_temp, y_temp, test_size=0.2, random_state=42, stratify=y_temp
)



In [None]:
# Report the shapes of both padded inputs for each split
print("Train-Test Split Done!")
for split_name, left, right in (("Train", X1_train, X2_train), ("Test", X1_test, X2_test)):
    print(f"{split_name} Set: {left.shape}, {right.shape}")
print(f"Validation Set: {X1_val.shape},{X2_val.shape} ")

In [None]:
# Width of every embedding row; must equal the vector_size used to train Word2Vec
EMBEDDING_DIM = 100

# One row per vocabulary entry plus one extra row that stays all-zero.
# NOTE(review): rows are written at the unshifted Word2Vec index, so the spare zero row is the
# last one while padded sequences use id 0 — confirm whether an index offset was intended.
embedding_matrix = np.zeros((len(vocab) + 1, EMBEDDING_DIM))

# Copy each word's vector into the row addressed by its Word2Vec index
for idx, word in enumerate(word2vec_model.wv.index_to_key):
    embedding_matrix[idx] = word2vec_model.wv[word]

print("Embedding Matrix Shape:", embedding_matrix.shape)


## Attention Mechanism

In [None]:
from tensorflow.keras.layers import Layer
import tensorflow.keras.backend as K

class AttentionLayer(Layer):
    """Additive attention pooling over axis 1 of its input.

    ``call`` scores every position with ``tanh(x . W + b)``, normalises the
    scores with a softmax along axis 1, and returns the weighted sum of the
    input over that axis together with the weights themselves.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        """Create the projection weight and the per-position bias."""
        positions, features = input_shape[1], input_shape[-1]
        # Projects each feature vector to a single score
        self.W = self.add_weight(
            name='att_weight',
            shape=(features, 1),
            initializer='normal',
            trainable=True,
        )
        # One bias per position, so the layer is tied to a fixed size along axis 1
        self.b = self.add_weight(
            name='att_bias',
            shape=(positions, 1),
            initializer='zeros',
            trainable=True,
        )
        super().build(input_shape)

    def call(self, x):
        """Return ``(context, weights)`` for input ``x``."""
        scores = K.tanh(K.dot(x, self.W) + self.b)
        weights = K.softmax(scores, axis=1)
        weighted = x * weights
        context = K.sum(weighted, axis=1)
        return context, weights


## Fusion Layer

In [None]:
#Create a TemporalAveragePooling layer 
from tensorflow.keras.layers import Layer
import tensorflow as tf

class TemporalAveragePooling(Layer):
    """Layer that averages its input over axis 1."""

    def call(self, inputs):
        pooled = tf.reduce_mean(inputs, axis=1)
        return pooled

In [None]:
from tensorflow.keras.layers import Embedding, Input
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Bidirectional, LSTM, Dense, Dropout
from tensorflow.keras.regularizers import l2
# Two inputs of padded token indices, one per function in the pair
input1, input2 = (Input(shape=(MAX_SEQ_LENGTH,)) for _ in range(2))

# Frozen embedding lookup initialised from the Word2Vec matrix; the same layer serves both inputs
vocab_rows, embedding_width = embedding_matrix.shape
embedding_layer = Embedding(
    input_dim=vocab_rows,
    output_dim=embedding_width,
    weights=[embedding_matrix],
    trainable=False,
)
embedding1, embedding2 = embedding_layer(input1), embedding_layer(input2)

# A single BiLSTM instance encodes both branches, so its weights are shared
shared_bilstm = Bidirectional(LSTM(128, return_sequences=True))
lstm_output1, lstm_output2 = shared_bilstm(embedding1), shared_bilstm(embedding2)

# Pool each branch with attention; the returned attention weights are discarded.
# NOTE(review): each branch gets its own AttentionLayer instance, so unlike the embedding and
# BiLSTM these weights are not shared between branches — confirm this is intended.
context1, _ = AttentionLayer()(lstm_output1)
context2, _ = AttentionLayer()(lstm_output2)

# Join the two pooled vectors into one feature vector for the classifier
fusion_output = tf.keras.layers.concatenate([context1, context2])


## Classification layer

In [None]:
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2

# Dropout on the fused pair representation. The result gets its own name instead of
# overwriting `fusion_output`, so re-running this cell does not stack another Dropout
# layer on top of the previous one.
fusion_dropped = Dropout(0.3)(fusion_output)

# Dense layer with L2 weight regularisation, followed by a second dropout
dense = Dense(64, activation='relu', kernel_regularizer=l2(0.001))(fusion_dropped)
dense = Dropout(0.3)(dense)

# Output layer: a single sigmoid unit giving the clone probability
output = Dense(1, activation='sigmoid')(dense)

# Define the model over both inputs
final_model = Model(inputs=[input1, input2], outputs=output)

# Compile for binary classification
final_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Summary
final_model.summary()


## Training

In [None]:
# Sanity check on the training labels before computing class weights.
unique_labels = np.unique(y_train)
print(unique_labels)  # Should print: [0 1]

# If this does not print [0 1], the labels may contain unexpected values
# (strings, booleans, floats) or preprocessing may have gone wrong.
# This matters because the class-weight computation further down is given
# the classes [0, 1] explicitly.

### Convert labels to integer numpy arrays:
*To make sure your labels:*
- Are NumPy arrays (not Pandas Series).
- Are in the correct integer format.

#### Keras needs inputs as np.ndarray for training.

#### Loss functions like BinaryCrossentropy expect integer or float values.

#### `compute_class_weight()` also expects a flat array of class integers.


In [None]:
# Convert the label Series to integer NumPy arrays. np.asarray accepts both a pandas Series
# and an ndarray, so re-running this cell does not fail the way `.to_numpy()` would on an
# already-converted array.
y_train = np.asarray(y_train).astype(int)
y_test = np.asarray(y_test).astype(int)

In [None]:
# Same conversion for the validation labels; np.asarray keeps the cell safe to re-run
y_val = np.asarray(y_val).astype(int)

In [None]:
# Shapes of both inputs and the labels, for the train split and then the test split
for left, right, labels in ((X1_train, X2_train, y_train), (X1_test, X2_test, y_test)):
    print(left.shape, right.shape, labels.shape)

In [None]:
# Container types of the two training inputs, then of the training labels
print(*map(type, (X1_train, X2_train)))
print(type(y_train))

In [None]:
# Peek at the first ten training labels
first_labels = y_train[:10]
print(first_labels)


In [None]:
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Class weights: 'balanced' weighting computed from the training labels for classes 0 and 1
classes = np.array([0, 1])
class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
class_weight_dict = dict(zip(classes, class_weights))

# Fallback: give any class that is still missing a neutral weight of 1.0
for cls in [0, 1]:
    class_weight_dict.setdefault(cls, 1.0)

print("Class Weights:", class_weight_dict)  # Debugging output

# Stop once validation loss has not improved for 5 epochs and roll back to the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Keep only the checkpoint with the lowest validation loss
model_checkpoint = ModelCheckpoint("best_model.keras", save_best_only=True, monitor="val_loss")

# Train on the training split and validate on the dedicated validation split
# (an earlier draft validated on the test split instead).
training_inputs = [X1_train, X2_train]
validation_pairs = ([X1_val, X2_val], y_val)
training_callbacks = [early_stopping, model_checkpoint]

history = final_model.fit(
    training_inputs, y_train,
    epochs=20,
    batch_size=64,
    validation_data=validation_pairs,
    class_weight=class_weight_dict,
    callbacks=training_callbacks,
)

In [None]:
import matplotlib.pyplot as plt

# x axis: one tick per completed epoch
epochs = range(1, len(history.history['loss']) + 1)

# (subplot position, training key, validation key, axis label) for each panel
panels = (
    (1, 'loss', 'val_loss', 'Loss'),
    (2, 'accuracy', 'val_accuracy', 'Accuracy'),
)

plt.figure(figsize=(12, 5))
for position, train_key, val_key, label in panels:
    plt.subplot(1, 2, position)
    plt.plot(epochs, history.history[train_key], 'bo-', label=f'Training {label}')
    plt.plot(epochs, history.history[val_key], 'r*-', label=f'Validation {label}')
    plt.xlabel('Epochs')
    plt.ylabel(label)
    plt.title(f'Training vs Validation {label}')
    plt.legend()

plt.show()


In [None]:
# final_model.fit([X1_train, X2_train], y_train, 
#                 epochs=20, batch_size=64, 
#                 validation_data=([X1_test, X2_test], y_test))

In [None]:
# Score the model on the held-out split; evaluate() returns the loss followed by accuracy
test_metrics = final_model.evaluate([X1_test, X2_test], y_test)
loss, accuracy = test_metrics
print(f"Test Accuracy: {accuracy:.4f}")

In [None]:
from sklearn.metrics import classification_report

# Step 1: Get predictions (clone probabilities) for the held-out split
y_pred_probs = final_model.predict([X1_test, X2_test])

# Step 2: Threshold at 0.5 and flatten to a 1-D label vector so its shape matches y_test,
# consistent with the external-test evaluation further down, which also flattens.
y_pred_labels = (y_pred_probs > 0.5).astype(int).ravel()

# Step 3: Per-class precision / recall / F1
report = classification_report(y_test, y_pred_labels, digits=4)
print(report)

In [None]:
from tensorflow.keras.models import load_model

# Reload the best checkpoint written by ModelCheckpoint during training. That callback saves
# to "best_model.keras" (no "best_model.h5" is written by this notebook), and the custom
# AttentionLayer class must be supplied so the saved architecture can be rebuilt.
final_model = load_model("best_model.keras", custom_objects={"AttentionLayer": AttentionLayer})

## Testing

In [None]:
import datasets

# Fetch the complete "test" split (only this split is requested here)
test_dataset = datasets.load_dataset(
    "code_x_glue_cc_clone_detection_big_clone_bench",
    split="test",
)

# Work with a pandas DataFrame from here on
test_df = test_dataset.to_pandas()

# Persist the raw frame; the evaluation cells re-read this CSV
test_df.to_csv("bcb_test.csv", index=False)

test_shape = test_df.shape
print(f"Final Test Dataset Size: {test_shape}")
print(test_df.head())

In [None]:
# Schema check: column names, then dtypes, then the first five rows
for summary in (test_df.columns, test_df.dtypes, test_df.head(5)):
    print(summary)

In [None]:
import pandas as pd
# Show full cell contents (no column-width truncation) for the first ten rows
pd.options.display.max_colwidth = None
print(test_df.iloc[:10])

In [None]:
# Report the dimensions of the test frame (the same line was previously printed twice)
print(f"Test Dataset: {test_df.shape}")


In [None]:
# Strip comments / normalise whitespace in both function columns of the test frame.
# NOTE(review): bcb_test.csv was written before this cell runs and the evaluation cell
# re-reads that CSV, so these cleaned columns do not appear to be used — confirm intent.
for column in ("func1", "func2"):
    test_df[column] = test_df[column].apply(clean_code)

In [None]:
nRowsRead = 50000 # number of CSV rows to load; specify 'None' if want to read whole file
# bcb_test.csv may have more rows in reality; only the first nRowsRead rows are loaded here.
# NOTE(review): the next cell reloads the same file with nRowsRead = 5000 and overwrites
# df_test, so this frame is only used for the row/column printout below.
df_test = pd.read_csv("bcb_test.csv", delimiter=',', nrows = nRowsRead, encoding='utf-8')
nRow, nCol = df_test.shape
print(f'There are {nRow} rows and {nCol} columns')

In [None]:
import pandas as pd
from nltk.tokenize import word_tokenize
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix

# Step 1: Load the external test split saved earlier (first nRowsRead rows only)
nRowsRead = 5000 # specify 'None' if want to read whole file
df_test = pd.read_csv("bcb_test.csv", delimiter=',', nrows = nRowsRead, encoding='utf-8')

# Step 2: Load the Word2Vec model trained on the training data and its vocabulary
word2vec_model = gensim.models.Word2Vec.load("word2vec_bcb.model")
vocab = set(word2vec_model.wv.key_to_index.keys())

# Step 3: Convert each function to word indices.
# text_to_indices (defined in the training preprocessing cell) tokenizes internally, so the
# separate word_tokenize pass that only filled unused *_tokens columns was dropped, as was
# the duplicate definition of text_to_indices.
df_test["func1_indices"] = df_test["func1"].apply(lambda x: text_to_indices(x, word2vec_model, vocab))
df_test["func2_indices"] = df_test["func2"].apply(lambda x: text_to_indices(x, word2vec_model, vocab))

# Step 4: Apply Padding
MAX_SEQ_LENGTH = 200  # Ensure this matches the training phase
X1_test_new = pad_sequences(df_test["func1_indices"], maxlen=MAX_SEQ_LENGTH, padding="post")
X2_test_new = pad_sequences(df_test["func2_indices"], maxlen=MAX_SEQ_LENGTH, padding="post")

# Step 5: Get True Labels
y_test_new = df_test["label"].astype(int)  # Ensure labels are in integer format

# Step 6: Make Predictions (clone probabilities)
y_preds = final_model.predict([X1_test_new, X2_test_new])

# Step 7: Threshold at 0.5 and flatten to a 1-D label vector
y_preds_binary = (y_preds > 0.5).astype(int).flatten()

# Step 8: Evaluate Model Performance
print("Classification Report:")
print(classification_report(y_test_new, y_preds_binary))

print(f"Test Accuracy: {accuracy_score(y_test_new, y_preds_binary):.4f}")

print("Confusion Matrix:")
print(confusion_matrix(y_test_new, y_preds_binary))


In [None]:
# Save the model to the Kaggle working directory.
# NOTE(review): the training checkpoint is written as "best_model.keras" while this export
# uses an ".h5" file name — confirm which format is intended for downstream use.
final_model.save('/kaggle/working/CCD-SelfAttention.h5')