In [1]:
import tensorflow as tf
print("TensorFlow version:", tf.__version__)
print("GPU Available:", len(tf.config.list_physical_devices('GPU')))
print("Physical Devices:", tf.config.list_physical_devices())


TensorFlow version: 2.18.0
GPU Available: 0
Physical Devices: [PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]


In [3]:
!pip install pandas 
!pip install gensim scikit-learn
!pip install igraph py2neo networkx
!pip install py2neo

Collecting igraph


ERROR: Could not install packages due to an OSError: HTTPSConnectionPool(host='files.pythonhosted.org', port=443): Max retries exceeded with url: /packages/b0/17/621d3a59430851a327421fdbec9ec8494d7fadaffc6dfdd42d4a95accbf2/igraph-0.11.8-cp39-abi3-win_amd64.whl.metadata (Caused by ProtocolError('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)))



Collecting py2neo
  Downloading py2neo-2021.2.4-py2.py3-none-any.whl.metadata (9.9 kB)
Collecting interchange~=2021.0.4 (from py2neo)
  Downloading interchange-2021.0.4-py2.py3-none-any.whl.metadata (1.9 kB)
Collecting monotonic (from py2neo)
  Downloading monotonic-1.6-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting pansi>=2020.7.3 (from py2neo)
  Downloading pansi-2024.11.0-py2.py3-none-any.whl.metadata (3.1 kB)
Downloading py2neo-2021.2.4-py2.py3-none-any.whl (177 kB)
Downloading interchange-2021.0.4-py2.py3-none-any.whl (28 kB)
Downloading pansi-2024.11.0-py2.py3-none-any.whl (26 kB)
Downloading monotonic-1.6-py2.py3-none-any.whl (8.2 kB)
Installing collected packages: monotonic, pansi, interchange, py2neo
Successfully installed interchange-2021.0.4 monotonic-1.6 pansi-2024.11.0 py2neo-2021.2.4


In [13]:
import pandas as pd
import re
def remo(code):
    # Check if input is a string
    if not isinstance(code, str):
        return code
        
    code = re.sub(r'/\.?\*/', '', code, flags=re.DOTALL)
    code = re.sub(r'//.*?$', '', code, flags=re.MULTILINE)
    code = re.sub(r'^\s*[\n\r]', '', code, flags=re.MULTILINE)
    return code.strip()

# Apply the function to the 'func' column (or whatever your code column is named)

train = pd.read_csv("/Users/user01/fahim/icsme/train_label_dataset.csv")
test = pd.read_csv("/Users/user01/fahim/icsme/test_label_dataset.csv")
train['functionSource'] = train['functionSource'].apply(remo)
test['functionSource'] = test['functionSource'].apply(remo)

train = train[['functionSource', 'numeric']]
test = test[['functionSource', 'numeric']]


train.reset_index(drop=True, inplace=True)
test.reset_index(drop=True, inplace=True)

In [15]:
import os
import pandas as pd
import numpy as np
import networkx as nx
from gensim.models.word2vec import Word2Vec
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, GRU, Dense, Bidirectional
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical

# Paths
TRAIN_DATA_PATH = r'D:\SySeVR\train_dataset.csv'
TEST_DATA_PATH = r'D:\SySeVR\test_dataset.csv'
W2V_MODEL_PATH = r'D:\SySeVR\\w2v_model\wordmodel.model'
EMBEDDING_DIM = 40
MAXLEN = 198
BATCH_SIZE = 128
EPOCHS = 5
OUTPUT_CLASSES = 5



# Step 1: Load Dataset
def load_data(train_path, test_path):
    train_df = pd.read_csv(train_path)
    test_df = pd.read_csv(test_path)
    return train_df, test_df

# Step 2: Build PDG
def build_pdg(source_code):
    pdg = nx.DiGraph()
    statements = source_code.split('\n')
    for i, stmt in enumerate(statements):
        pdg.add_node(i, code=stmt, type="Statement", location=f"{i+1}:0")
    for i in range(len(statements) - 1):
        pdg.add_edge(i, i + 1, type="ControlDependency")
    for i, stmt in enumerate(statements):
        if "=" in stmt:
            var_name = stmt.split("=")[0].strip()
            for j, other_stmt in enumerate(statements):
                if var_name in other_stmt and i != j:
                    pdg.add_edge(i, j, type="DataDependency", var=var_name)
    return pdg

# Step 3: Modify PDG Nodes
def modify_pdg_nodes(pdg):
    for node, data in pdg.nodes(data=True):
        if data['type'] == "Statement":
            data['code'] = data['code'].strip()
    return pdg

# Step 4: Extract Slices from PDG
def extract_slices_from_pdg(pdg):
    slices = []
    for node in pdg.nodes:
        slice_nodes = list(nx.ancestors(pdg, node)) + [node]
        slices.append([pdg.nodes[n]['code'] for n in slice_nodes])
    return slices

# Step 5: Process Functions

def process_functions_with_pdg_grouped(df):
    grouped_slices = []
    for func in df['functionSource']:
        pdg = build_pdg(func)
        pdg = modify_pdg_nodes(pdg)
        slices = extract_slices_from_pdg(pdg)
        grouped_slices.append(slices)
    return grouped_slices

# Step 6: Train Word2Vec Model
def train_word2vec(corpus, model_path):
    
    w2v_model = Word2Vec(sentences=corpus, vector_size=EMBEDDING_DIM, window=5, min_count=1, workers=4, sg=1, epochs=5)
    w2v_model.save(model_path)
    print("Word2Vec model saved.")

# Step 7: Pad and Embed Functions
def pad_grouped_slices(grouped_vectors, maxlen, embedding_dim, max_slices):
    padded_grouped = []
    for group in grouped_vectors:
        padded_group = []
        for vec in group[:max_slices]:
            if len(vec) > maxlen:
                vec = vec[:maxlen]
            elif len(vec) < maxlen:
                vec.extend([[0] * embedding_dim] * (maxlen - len(vec)))
            padded_group.append(vec)
        while len(padded_group) < max_slices:
            padded_group.append([[0] * embedding_dim] * maxlen)
        padded_grouped.append(padded_group)
    return np.array(padded_grouped, dtype=np.float32)

def embed_grouped_tokens(grouped_tokens, w2v_model):
    embedded_grouped = []
    for token_group in grouped_tokens:
        embedded_group = [
            [w2v_model.wv[token] for token in token_list if token in w2v_model.wv]
            for token_list in token_group
        ]
        embedded_group = [emb_slice for emb_slice in embedded_group if emb_slice]  # Remove empty slices
        if not embedded_group:  # Add placeholder for empty groups
            embedded_group = [[[0] * w2v_model.vector_size] * MAXLEN]
        embedded_grouped.append(embedded_group)
    return embedded_grouped
    
# Step 8: Define TensorFlow BiGRU Model
def create_bgru_model(input_dim, maxlen, embedding_dim, hidden_dim, output_dim, dropout):
    inputs = Input(shape=(maxlen, embedding_dim))
    x = Bidirectional(GRU(hidden_dim, return_sequences=False, dropout=dropout))(inputs)
    x = Dense(output_dim, activation="softmax")(x)
    model = Model(inputs, x)
    model.compile(optimizer=Adam(learning_rate=1e-3), loss="categorical_crossentropy", metrics=["accuracy"])
    return model

if __name__ == "__main__":
    
    # train_df, test_df = load_data(TRAIN_DATA_PATH, TEST_DATA_PATH)

    
    # label_encoder = LabelEncoder()
    # train_df['label'] = label_encoder.fit_transform(train_df['label'])
    # test_df['label'] = label_encoder.transform(test_df['label'])

    # Process functions and construct PDGs
    
    train_grouped_slices = process_functions_with_pdg_grouped(train)
    
    test_grouped_slices = process_functions_with_pdg_grouped(test)

    # Train Word2Vec embeddings
    
    train_word2vec([slice_ for group in train_grouped_slices for slice_ in group], W2V_MODEL_PATH)

    # Load Word2Vec model
    w2v_model = Word2Vec.load(W2V_MODEL_PATH)

    # Embed grouped tokens
    
    train_grouped_vectors = embed_grouped_tokens(train_grouped_slices, w2v_model)
    
    test_grouped_vectors = embed_grouped_tokens(test_grouped_slices, w2v_model)

 
    MAX_SLICES = 9 
    train_padded = pad_grouped_slices(train_grouped_vectors, MAXLEN, EMBEDDING_DIM, MAX_SLICES)
   
    test_padded = pad_grouped_slices(test_grouped_vectors, MAXLEN, EMBEDDING_DIM, MAX_SLICES)


    # Prepare labels
    train_labels = train['numeric'].values
    test_labels = test['numeric'].values

    
    train_labels = to_categorical(train_labels, num_classes=OUTPUT_CLASSES)
    test_labels = to_categorical(test_labels, num_classes=OUTPUT_CLASSES)
    train_padded = train_padded.reshape(train_padded.shape[0], -1, EMBEDDING_DIM)
    test_padded = test_padded.reshape(test_padded.shape[0], -1, EMBEDDING_DIM)

    
    assert train_padded.shape[0] == len(train_labels), "Mismatch between train data and labels"
    assert test_padded.shape[0] == len(test_labels), "Mismatch between test data and labels"

   
    model = create_bgru_model(
        input_dim=EMBEDDING_DIM,
        maxlen=train_padded.shape[1],
        embedding_dim=EMBEDDING_DIM,
        hidden_dim=256,
        output_dim=OUTPUT_CLASSES,
        dropout=0.2,
    )

   
    print("Training the BiGRU model...")
    model.fit(
        train_padded,
        train_labels,
        validation_data=(test_padded, test_labels),
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        verbose=1,
    )

    # Evaluate the model
    print("Evaluating the model...")
    test_loss, test_accuracy = model.evaluate(test_padded, test_labels, verbose=1)
    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

    # Get predictions
    print("Generating predictions...")
    y_pred_probs = model.predict(test_padded)
    y_pred = np.argmax(y_pred_probs, axis=1)
    y_true = np.argmax(test_labels, axis=1)

    print("\nClassification Report:")
    print(classification_report(y_true, y_pred))

    print("\nConfusion Matrix:")
    print(confusion_matrix(y_true, y_pred))


Word2Vec model saved.
Training the BiGRU model...
Epoch 1/5
[1m141/141[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1141s[0m 8s/step - accuracy: 0.2781 - loss: 1.5826 - val_accuracy: 0.2498 - val_loss: 1.6222
Epoch 2/5
[1m141/141[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1362s[0m 10s/step - accuracy: 0.3268 - loss: 1.5392 - val_accuracy: 0.2522 - val_loss: 1.6271
Epoch 3/5
[1m141/141[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1546s[0m 11s/step - accuracy: 0.3377 - loss: 1.5180 - val_accuracy: 0.2669 - val_loss: 1.6164
Epoch 4/5
[1m141/141[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1657s[0m 12s/step - accuracy: 0.3436 - loss: 1.5123 - val_accuracy: 0.2433 - val_loss: 1.6292
Epoch 5/5
[1m141/141[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1728s[0m 12s/step - accuracy: 0.3423 - loss: 1.5103 - val_accuracy: 0.2633 - val_loss: 1.6223
Evaluating the model...
[1m141/141[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m150s[0m 1s/step - accuracy: 0.2624 - loss: 1.61

In [17]:
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    matthews_corrcoef, cohen_kappa_score, mean_squared_error,
    mean_absolute_error, roc_auc_score, confusion_matrix
)
import numpy as np

# Assuming y_true and y_pred are already defined as in your code

# Calculate overall metrics
accuracy = accuracy_score(y_true, y_pred)
precision_macro = precision_score(y_true, y_pred, average='macro', zero_division=0)
recall_macro = recall_score(y_true, y_pred, average='macro', zero_division=0)
f1_macro = f1_score(y_true, y_pred, average='macro', zero_division=0)
precision_weighted = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall_weighted = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1_weighted = f1_score(y_true, y_pred, average='weighted', zero_division=0)
mcc = matthews_corrcoef(y_true, y_pred)
kappa = cohen_kappa_score(y_true, y_pred)
mse = mean_squared_error(y_true, y_pred)
mae = mean_absolute_error(y_true, y_pred)

# For ROC AUC Score, we need the probability estimates for each class
# Assuming y_pred_probs contains these probabilities
try:
    roc_auc_macro = roc_auc_score(y_true, y_pred_probs, average='macro', multi_class='ovr')
except ValueError:
    roc_auc_macro = float('nan')  # Handle case where ROC AUC cannot be computed

# Calculate confusion matrix
cm = confusion_matrix(y_true, y_pred)
tn = np.sum(np.diag(cm)) - np.sum(cm, axis=1)  # True Negatives
fp = np.sum(cm, axis=0) - np.diag(cm)          # False Positives
fn = np.sum(cm, axis=1) - np.diag(cm)          # False Negatives
tp = np.diag(cm)                               # True Positives

# Summing up for overall counts
TP = np.sum(tp)
TN = np.sum(tn)
FP = np.sum(fp)
FN = np.sum(fn)

# Print metrics
print(f"Accuracy: {accuracy}")
print(f"Precision (Macro): {precision_macro}")
print(f"Recall (Macro): {recall_macro}")
print(f"F1 Score (Macro): {f1_macro}")
print(f"Precision (Weighted): {precision_weighted}")
print(f"Recall (Weighted): {recall_weighted}")
print(f"F1 Score (Weighted): {f1_weighted}")
print(f"Matthews Correlation Coefficient (MCC): {mcc}")
print(f"Cohen's Kappa: {kappa}")
print(f"Mean Squared Error (MSE): {mse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"ROC AUC Score (Macro): {roc_auc_macro}")
print(f"True Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")


Accuracy: 0.2633333333333333
Precision (Macro): 0.3260423197148011
Recall (Macro): 0.259326568166853
F1 Score (Macro): 0.22701670881899833
Precision (Weighted): 0.32558182250816703
Recall (Weighted): 0.2633333333333333
F1 Score (Weighted): 0.2279629982729952
Matthews Correlation Coefficient (MCC): 0.08944184122618962
Cohen's Kappa: 0.07444306973310644
Mean Squared Error (MSE): 4.650888888888889
Mean Absolute Error (MAE): 1.6713333333333333
ROC AUC Score (Macro): 0.5619550782717949
True Positives (TP): 1185
True Negatives (TN): 1425
False Positives (FP): 3315
False Negatives (FN): 3315


In [19]:
sensitivity_list = []
specificity_list = []

# Calculate Sensitivity and Specificity for each class
for i in range(len(cm)):
    TP = cm[i, i]
    FN = np.sum(cm[i, :]) - TP
    FP = np.sum(cm[:, i]) - TP
    TN = np.sum(cm) - (TP + FP + FN)
    
    sensitivity = TP / (TP + FN) if (TP + FN) > 0 else 0
    specificity = TN / (TN + FP) if (TN + FP) > 0 else 0
    
    sensitivity_list.append(sensitivity)
    specificity_list.append(specificity)

# Calculate macro-averaged Sensitivity and Specificity
sensitivity_macro = np.mean(sensitivity_list)
specificity_macro = np.mean(specificity_list)

# Print metrics
print(f"Sensitivity (Macro-Averaged): {sensitivity_macro:.4f}")
print(f"Specificity (Macro-Averaged): {specificity_macro:.4f}")

Sensitivity (Macro-Averaged): 0.2593
Specificity (Macro-Averaged): 0.8148
