In [None]:
# Install the third-party packages this notebook imports below.
# NOTE(review): no versions are pinned, so a fresh run may resolve to releases
# that differ from the ones this notebook was developed against.

!pip install kaggle
!pip install  keras_core keras_nlp
!pip install Keras-Preprocessing

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os

# Print the full path of every file found below the Kaggle input directory.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        full_path = os.path.join(root, name)
        print(full_path)

import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from collections import Counter
from keras.utils import pad_sequences
from keras.utils import pad_sequences
from keras.utils import to_categorical
from keras.models import Sequential, Model
from keras.layers import Dense,Dropout,Input,Embedding,Flatten,TextVectorization,Conv1D,GlobalMaxPooling1D,MaxPooling1D,GlobalAveragePooling1D
from keras.initializers import Constant
from keras.layers import Dense,LSTM,Bidirectional,Attention,Concatenate,GRU,BatchNormalization
import nltk
from nltk.corpus import stopwords
import re
import seaborn as sns
nltk.download('stopwords')
import keras_core as keras
import keras_nlp



In [None]:
# Choose a tf.distribute strategy that matches the hardware actually present:
#   - several GPUs -> mirror the model across all of them
#   - exactly one  -> pin everything to that GPU
#   - none         -> fall back to the CPU instead of naming a GPU that does not exist
# NOTE(review): no later cell in view enters `strategy.scope()`, so the chosen
# strategy currently has no effect on how the model is built or trained.
gpus = tf.config.list_physical_devices('GPU')
if len(gpus) > 1:
    strategy = tf.distribute.MirroredStrategy()
    print(f'Using {len(gpus)} GPUs')
elif len(gpus) == 1:
    strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
    print(f'Using {len(gpus)} GPU')
else:
    strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
    print('No GPU found, using CPU')

In [None]:
# Read the labelled dataset from disk and show it as the cell output.
data = pd.read_csv('/content/SC_4label.csv')
data

In [None]:
# Bar chart of how many samples carry each encoded label, saved as a PDF.
labels = ['DD', 'IO', 'RE', 'TD']
plt.figure(figsize=(8, 5))
ax = sns.countplot(
    x=data['label_encoded'],
    palette='Set1',
    alpha=0.8,
)
# Replace the tick text with the class abbreviations.
# NOTE(review): assumes the bars are drawn in the same order as `labels` — confirm.
ax.set_xticklabels(labels)
plt.savefig('vul_distribution.pdf')
plt.show()

In [None]:
# Model input is the 'code' column; the target is the 'label_encoded' column.
X, y = data['code'], data['label_encoded']

In [None]:
# Extra tokens that clean_solidity_code() filters out on top of the English
# stop words. Kept as a list, in the original order.
solidity_stopwords = [
    # language keywords, built-in types and visibility / mutability specifiers
    "pragma", "interface", "contract", "function", "event", "modifier",
    "library", "using",
    "string", "uint8", "uint256", "address", "mapping", "bool", "require",
    "return", "memory",
    "storage", "public", "internal", "view", "returns", "constant",
    "constructor",
    # variable, parameter and member names
    "_owner", "_balances", "_allowances", "_founder", "_marketing", "_who",
    "_burntAmount",
    "_from", "_to", "_value", "_timestamp", "_bool", "msg.sender",
    "totalSupply",
    "balanceOf", "transfer", "allowance", "approve", "transferFrom",
    "add", "sub", "mul", "div",
    "mod", "changeFounder", "setMinter", "setFurnace", "freezeAccount",
    # remaining entries
    "solidity", "bytes32",
]
# Characters deleted from the source text (everything except letters, digits, whitespace).
_NON_ALNUM_RE = re.compile(r'[^a-zA-Z0-9\s]')
# Block comments and line comments, matched in one left-to-right pass so that a
# `//` inside `/* ... */` (e.g. a URL) cannot swallow the closing `*/`.
_COMMENT_RE = re.compile(r'/\*.*?\*/|//[^\n]*', flags=re.DOTALL)
# English stop words, filled on first use so the corpus is read once
# instead of once per cleaned contract.
_ENGLISH_STOPWORDS = None


def _english_stopwords():
    """Return the NLTK English stop words as a frozenset, loading them only once."""
    global _ENGLISH_STOPWORDS
    if _ENGLISH_STOPWORDS is None:
        _ENGLISH_STOPWORDS = frozenset(stopwords.words('english'))
    return _ENGLISH_STOPWORDS


def clean_solidity_code(solidity_code):
    """Turn Solidity source text into a list of lower-case tokens.

    Steps, in order:
      1. remove ``/* ... */`` and ``// ...`` comments;
      2. delete every character that is not a letter, digit or whitespace;
      3. lower-case the text and split it on whitespace;
      4. drop English stop words and the entries of ``solidity_stopwords``.

    The entries of ``solidity_stopwords`` are normalised with steps 2-3 before
    the comparison. Without that, entries such as ``"_owner"``, ``"msg.sender"``
    or ``"totalSupply"`` could never match, because the tokens they are compared
    with have already lost their punctuation and upper-case letters. A side
    effect of stripping punctuation is that ``"_owner"`` and ``"owner"`` are the
    same token, so both are removed.

    Args:
        solidity_code: Solidity source as a single string.

    Returns:
        list[str]: the remaining tokens, in their original order.
    """
    without_comments = _COMMENT_RE.sub('', solidity_code)
    normalized = _NON_ALNUM_RE.sub('', without_comments).lower()

    # Bring the Solidity stop list into the same form as the tokens.
    solidity_blocked = {
        _NON_ALNUM_RE.sub('', word).lower() for word in solidity_stopwords
    }
    blocked = _english_stopwords() | solidity_blocked

    return [token for token in normalized.split() if token not in blocked]

In [None]:
# Tokenise every contract, then glue each token list back into one
# space-separated string and collect the strings in a NumPy array.
X_cleaned = X.apply(clean_solidity_code)
X_cleaned_sentences = np.array([' '.join(tokens) for tokens in X_cleaned])
# Show the second cleaned document as a sanity check.
X_cleaned_sentences[1]

In [None]:
# Data splitting: 90% train / 10% test, shuffled with a fixed seed and
# stratified on the labels.
X_train, X_test, y_train, y_test = train_test_split(
    X_cleaned_sentences,
    y,
    test_size=0.1,
    shuffle=True,
    random_state=42,
    stratify=y,
)

In [None]:
# One-hot encode the labels of both splits into 4 columns.
y_train_encoded, y_test_encoded = (
    to_categorical(y_train, 4),
    to_categorical(y_test, 4),
)
y_train_encoded

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import TextVectorization

# Upper bound on the vocabulary and the fixed length of each output sequence.
# NOTE(review): sequences come out 100 long here but are padded to 128 in the
# next cell — confirm which length is intended.
vocab_size = 10000  # adjust as needed
sequence_length = 100  # adjust as needed

# Build the layer and learn its vocabulary from the training split only.
vectorizer = TextVectorization(
    max_tokens=vocab_size,
    output_sequence_length=sequence_length,
)
vectorizer.adapt(X_train)

# Map both splits to integer token sequences.
X_train_sequences, X_test_sequences = vectorizer(X_train), vectorizer(X_test)


In [None]:
# Bring every sequence to length 128, padding and truncating at the end.
X_train_padded = pad_sequences(
    X_train_sequences, maxlen=128, padding='post', truncating='post'
)
X_test_padded = pad_sequences(
    X_test_sequences, maxlen=128, padding='post', truncating='post'
)
for caption, tensor in (('Shape of training tensor: ', X_train_padded),
                        ('Shape of testing tensor: ', X_test_padded)):
    print(caption, tensor.shape)

In [None]:
#Smote oversampling
from imblearn.over_sampling import SMOTE
from imblearn.over_sampling import BorderlineSMOTE
import math

def smote(x, y):
    """Resample ``(x, y)`` with SMOTE and return the resampled pair.

    The sampler is created with ``sampling_strategy=1`` and a neighbour count
    equal to 1% of ``sum(y)``, rounded up.

    NOTE(review): ``sum(y)`` presumably counts the positive samples of a 1-D
    0/1 target — confirm against callers.

    Args:
        x: feature matrix passed to ``fit_resample``.
        y: target passed to ``fit_resample``.

    Returns:
        tuple: ``(x_resampled, y_resampled)``.
    """
    neighbour_count = math.ceil(sum(y) * 0.01)
    sampler = SMOTE(sampling_strategy=1, k_neighbors=neighbour_count)

    x_resampled, y_resampled = sampler.fit_resample(x, y)
    return x_resampled, y_resampled

def bordersmote(x, y):
    """Resample ``(x, y)`` with BorderlineSMOTE and return the resampled pair.

    Both ``k_neighbors`` and ``m_neighbors`` are set to 1% of ``sum(y)``,
    rounded up, and the sampler uses ``sampling_strategy=1``.

    NOTE(review): ``sum(y)`` presumably counts the positive samples of a 1-D
    0/1 target — confirm against callers.

    Args:
        x: feature matrix passed to ``fit_resample``.
        y: target passed to ``fit_resample``.

    Returns:
        tuple: ``(x_resampled, y_resampled)``.
    """
    neighbour_count = math.ceil(sum(y) * 0.01)
    danger_neighbour_count = math.ceil(sum(y) * 0.01)

    sampler = BorderlineSMOTE(
        sampling_strategy=1,
        k_neighbors=neighbour_count,
        m_neighbors=danger_neighbour_count,
    )
    x_resampled, y_resampled = sampler.fit_resample(x, y)
    return x_resampled, y_resampled

# Oversample the minority class of the padded training data (seeded for repeatability).
# The sampler has its own name: binding it to `smote` would overwrite the
# `smote()` helper function defined earlier in this cell.
smote_sampler = SMOTE(sampling_strategy='minority', random_state=42)
X_train_resampled, y_train_resampled = smote_sampler.fit_resample(X_train_padded, y_train_encoded)

In [None]:
# Sum the resampled label matrix over axis 0.
label_counts = np.sum(y_train_resampled, axis=0)

# Longest training document, measured with len() on each cleaned string.
max_length = max(len(doc) for doc in X_train)
print(max_length)

# Number of entries in the vocabulary the vectorizer learned.
emb_len = len(vectorizer.get_vocabulary())
print(emb_len)

In [None]:
# Build an ALBERT classifier with 4 output classes from the named preset.
# The preprocessor is created from the same preset with sequence_length=128.
preset = "albert_base_en_uncased"

preprocessor = keras_nlp.models.AlbertPreprocessor.from_preset(
    preset,
    sequence_length=128,
)
classifier = keras_nlp.models.AlbertClassifier.from_preset(
    preset,
    preprocessor=preprocessor,
    num_classes=4,
)
classifier.summary()


In [None]:
# Where the best model (by validation accuracy) is written during training.
checkpoint_path = '/content/saved_model/best_model_albert.keras'  # Use .keras extension
checkpoint_dir = os.path.dirname(checkpoint_path)
# Create the output directory up front so that writing into it cannot fail
# on a fresh runtime where it does not exist yet.
os.makedirs(checkpoint_dir, exist_ok=True)

# Keep only the model with the highest validation accuracy.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    save_weights_only=False,  # Save the entire model, not just weights
    verbose=1
)

from keras.callbacks import EarlyStopping,ReduceLROnPlateau
# Stop after 3 epochs without validation-loss improvement and restore the best weights.
early_stop = EarlyStopping(monitor='val_loss',patience=3,verbose=True,restore_best_weights=True)
# Multiply the learning rate by 0.2 after 3 stagnant epochs, never going below 1e-6.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6)
# NOTE(review): none of these three callbacks is passed to `classifier.fit(...)`
# in the training cell in view, so they currently have no effect.


In [None]:
# Compile: Adam at 1e-5, categorical cross-entropy, accuracy as the metric.
# from_logits=True tells the loss to expect un-normalised scores rather than
# probabilities.
classifier.compile(
    optimizer=tf.keras.optimizers.Adam(1e-5),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"],
)

In [None]:
# Fit for up to 20 epochs on the cleaned training strings.
# NOTE(review): the held-out test split doubles as validation data here, and the
# checkpoint / early-stopping / LR callbacks built earlier are not passed in.
history = classifier.fit(
    X_train,
    y_train_encoded,
    validation_data=(X_test, y_test_encoded),
    epochs=20,
    batch_size=16,
)

In [None]:
# Save the trained classifier. The target directory is created first so the
# save cannot fail on a fresh runtime where it does not exist yet.
final_model_path = "/content/saved_model/final_model_albert.keras"
os.makedirs(os.path.dirname(final_model_path), exist_ok=True)
classifier.save(final_model_path)

In [None]:
# Score the trained classifier on the held-out split and report both numbers.
test_loss, test_accuracy = classifier.evaluate(X_test, y_test_encoded)
print(f'Test Loss: {test_loss}, Test Accuracy: {test_accuracy}')

In [None]:
# Per-epoch curves recorded by fit().
acc, val_acc = history.history['accuracy'], history.history['val_accuracy']
loss, val_loss = history.history['loss'], history.history['val_loss']

# One x position per recorded epoch (0-based), so early termination is handled.
epochs_range = range(len(acc))

# Training vs. validation accuracy, saved as a PDF.
plt.figure(figsize=(8, 8))
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
# The explicit list replaces the per-line labels given above.
plt.legend(['train', 'test'], loc='upper left')
plt.savefig('train_val_acc_albert.pdf')
plt.show()

In [None]:
def plot_metrics(history, metric):
    """Plot one training metric against its validation counterpart and save it.

    The figure is written to ``train_val_<metric>_albert.pdf``, so plots of
    different metrics do not overwrite each other (``metric='loss'`` still
    produces ``train_val_loss_albert.pdf``).

    Args:
        history: object whose ``.history`` dict holds the per-epoch values
            under ``metric`` and ``'val_' + metric``.
        metric: name of the metric to plot, e.g. ``'loss'``.

    Raises:
        KeyError: if ``metric`` or ``'val_' + metric`` is missing from
            ``history.history``.
    """
    train_metric = history.history[metric]
    val_metric = history.history[f'val_{metric}']
    # Epochs are numbered from 1 on the x axis.
    epochs = range(1, len(train_metric) + 1)

    plt.figure(figsize=(8, 8))
    plt.plot(epochs, train_metric, label=f'Training {metric}')
    plt.plot(epochs, val_metric, label=f'Validation {metric}')
    plt.title(f'Training and Validation {metric.capitalize()}')
    plt.xlabel('Epoch')
    plt.ylabel(metric.capitalize())
    # The explicit list replaces the per-line labels given above.
    plt.legend(['train', 'test'], loc='upper left')
    # Derive the file name from the metric instead of always writing the loss file.
    plt.savefig(f'train_val_{metric}_albert.pdf')
    plt.show()


# Training and validation loss plot
plot_metrics(history, metric='loss')

In [None]:
# Prediction: score every test document, then take the highest-scoring class per row.
y_pred = classifier.predict(X_test)
y_pred_class = np.argmax(y_pred, axis=1)

In [None]:
# Class abbreviations used as display names in the report and confusion matrix.
labels = ['DD', 'IO', 'RE', 'TD']

In [None]:
from sklearn.metrics import classification_report
# Per-class text report for the held-out split, with the class abbreviations as row names.
report = classification_report(
    y_test,
    y_pred_class,
    target_names=labels,
)
print(report)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
# Confusion matrix for the held-out split, drawn with the class abbreviations and saved as a PDF.
cm = confusion_matrix(y_test, y_pred_class)
disp = ConfusionMatrixDisplay(
    confusion_matrix=cm,
    display_labels=labels,
)
disp.plot()
plt.savefig('confmat_albert.pdf')
plt.show()

In [None]:
# Read the second dataset and keep its 'code' column as the prediction input.
unlabeled_data = pd.read_csv('/content/SC_unlabeled.csv')
X_unlabeled = unlabeled_data['code']
X_unlabeled

In [None]:
# Apply the same cleaning, joining, vectorising and padding as for the training data.
X_cleaned_unlabeled = X_unlabeled.apply(clean_solidity_code)
X_cleaned_unlabeled_sentences = list(map(' '.join, X_cleaned_unlabeled))
X_unlabeled_sequences = vectorizer(X_cleaned_unlabeled_sentences)
X_unlabeled_padded = pad_sequences(
    X_unlabeled_sequences,
    maxlen=128,
    padding='post',
    truncating='post',
)


In [None]:
# Rebuild the classifier from the preset, then load the weights saved after training.
preset = "albert_base_en_uncased"
preprocessor = keras_nlp.models.AlbertPreprocessor.from_preset(
    preset,
    sequence_length=128,
)
classifier = keras_nlp.models.AlbertClassifier.from_preset(
    preset,
    preprocessor=preprocessor,
    num_classes=4,
)
classifier.load_weights('/content/saved_model/final_model_albert.keras')


In [None]:
# Predict a class for every cleaned document of the second dataset.
y_pred_unlabeled = classifier.predict(X_cleaned_unlabeled_sentences)

# Get the predicted class for each input
predicted_classes = y_pred_unlabeled.argmax(axis=1)
print(predicted_classes)

#unlabeled_data['predicted_class'] = predicted_classes
#unlabeled_data.to_csv('predicted_unlabeled_data.csv', index=False)

# Ground truth has to describe the rows that were just predicted, so it is
# taken from `unlabeled_data` when that frame carries a label column.
# Scoring against `data['label_encoded']` (the labels of the other CSV) is kept
# only as a guarded fallback for the case where both frames have the same
# number of rows; otherwise evaluation is skipped instead of failing on a
# length mismatch.
if 'label_encoded' in unlabeled_data.columns:
    y_actual = unlabeled_data['label_encoded']
elif len(data) == len(predicted_classes):
    print("Warning: 'label_encoded' not found in unlabeled_data; "
          "using the labels of `data` and assuming both files are row-aligned.")
    y_actual = data['label_encoded']
else:
    y_actual = None
    print("No ground-truth labels available for the unlabeled data; skipping evaluation.")

# Evaluate the predictions
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

if y_actual is not None:
    # Print the classification report
    print("Classification Report:")
    print(classification_report(y_actual, predicted_classes))

    # Display the confusion matrix
    cm = confusion_matrix(y_actual, predicted_classes)
    labels = ['DD', 'IO', 'RE', 'TD']  # Replace with your class names
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    disp.plot(cmap='viridis')
    plt.savefig('confmat_albert_unlabeled.pdf')
    plt.show()