In [2]:
data_path = './data/VulnExtractData/ffmpeg_test'
train_path = data_path + '/Train'
test_path = data_path + '/Test'
train_path,test_path


('./data/VulnExtractData/ffmpeg_test/Train',
 './data/VulnExtractData/ffmpeg_test/Test')

In [3]:
task_list = ['AF','BF','CL']

In [None]:
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.discriminant_analysis import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from sklearn.feature_extraction.text import CountVectorizer,TfidfVectorizer


def train_model_svm(X_train,Y_train,X_test,Y_test):
    param_grid = {
        'svc__C': [0.1, 1, 10, 100],
        'svc__kernel': ['linear', 'rbf', 'poly'],
        'svc__degree': [2, 3, 4],
        'svc__gamma': ['scale', 'auto']
    }
    

    pipeline = make_pipeline(TfidfVectorizer(ngram_range=(1,1)), SVC(probability=True))
    grid_search = GridSearchCV(pipeline,param_grid,verbose = 2,n_jobs = -1))

    grid_search.fit(X_train,Y_train)

    print(f"Best parameters: {grid_search.best_params_}")

    Y_pred = grid_search.best_estimator_.predict(X_test)

    print(classification_report(Y_test, Y_pred, digits=4))

def train_model_svm_with_features(X_train,Y_train,X_test,Y_test):
    param_grid = {
        'svc__C': [0.1, 1, 10, 100],
        'svc__kernel': ['linear', 'rbf', 'poly'],
        'svc__degree': [2, 3, 4],
        'svc__gamma': ['scale', 'auto']
    }

    preprocessor = ColumnTransformer([
        ('tfidf', TfidfVectorizer(), 'Description'),  # only your text column name here
        ('scaler', StandardScaler(), X_train.columns[1:])
    ], remainder='drop')

    pipeline = make_pipeline(preprocessor, SVC(probability=True))

    grid_search = GridSearchCV(pipeline,param_grid,verbose = 2,n_jobs = -1)

    grid_search.fit(X_train,Y_train)

    print(f"Best parameters: {grid_search.best_params_}")

    Y_pred = grid_search.best_estimator_.predict(X_test)

    print(classification_report(Y_test, Y_pred, digits=4))

 

In [37]:
import os
import pandas as pd

def prepare_data(task_id,with_features = False):    
    folder_train_path = train_path + '/' + task_id

    all_train_files = []
    for root, dirs, files in os.walk(folder_train_path):
        for file in files:
            full_path = os.path.join(root, file)
            all_train_files.append(full_path)

    dfs = [pd.read_csv(file) for file in all_train_files]

    folder_test_path = test_path + '/' + task_id

    all_test_files = []
    for root, dirs, files in os.walk(folder_test_path):
        for file in files:
            full_path = os.path.join(root, file)
            all_test_files.append(full_path)

    dfs_train = [pd.read_csv(file) for file in all_train_files]

    dfs_test = [pd.read_csv(file) for file in all_test_files]

    df_train = pd.concat(dfs_train, ignore_index=True)

    df_test  = pd.concat(dfs_test, ignore_index=True)

    #use only Description for training
    X_train_descriptions = df_train['Description']
    X_test_descriptions = df_test['Description']

    if with_features == True:
        X_train_features = df_train.drop(columns=['Description', 'label', 'CVE_ID'])
        X_test_features = df_test.drop(columns=['Description', 'label', 'CVE_ID'])

        X_train = pd.concat([X_train_descriptions,X_train_features],axis=1)
        X_test = pd.concat([X_test_descriptions,X_test_features],axis=1)
    else:
        X_train = X_train_descriptions
        X_test = X_test_descriptions

    Y_train = df_train['label'].apply(lambda x:0 if x==4 else 1).values
    Y_test = df_test['label'].apply(lambda x:0 if x==4 else 1).values
   
    return X_train,Y_train,X_test,Y_test



for task_id in task_list:
    X_train,Y_train,X_test,Y_test = prepare_data(task_id, with_features=False)
    #train_model_svm(X_train,Y_train,X_test,Y_test)
    X_train,Y_train,X_test,Y_test = prepare_data(task_id, with_features=True)
    train_model_svm_with_features(X_train,Y_train,X_test,Y_test)



                                           Description  AFCB  AFC  AFBC  AFCF  \
0    (/home/seviezhou/ffmpeg/ffmpeg+0x29497f0) Shad...     0    0     0     0   
1    > [ 1 - libavformat/http.c ] > > After executi...     0    0     0     0   
2    > [ 2 - libavformat/rtmppkt.c ] > > Issue is c...     0    0     0     0   
3    > [ 3 - ffserver.c ] > > This issue is complet...     0    0     0     0   
4    > After a bit of reverse engineering of RTMP p...     0    0     0     0   
..                                                 ...   ...  ...   ...   ...   
522  When a crafted MXF file, which claims a large ...     0    0     0     0   
523  When a crafted NSV file, which claims a large ...     0    0     0     0   
524  When a crafted RL2 file, which claims a large ...     0    0     0     0   
525  While ffmpeg calculating ?????bytestream_end??...     0    0     0     0   
526  zmbvenc allocates a buffer for a picture with ...     0    0     0     0   

     AFAM  AFF  AFR  AFMS  

In [57]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from torch.utils.data import TensorDataset, Dataset, DataLoader, RandomSampler, SequentialSampler

In [None]:
from torch.optim import AdamW
import torch
from torch.nn.utils.rnn import pad_sequence
from transformers import get_linear_schedule_with_warmup
import gc
from torch.utils.data import TensorDataset, Dataset, DataLoader, RandomSampler, SequentialSampler
from torch import nn


class TextDataset(Dataset):
    def __init__(self,df,tokenizer,max_len = 512):
        self.tokenizer = tokenizer
        self.df = df
        self.max_len = max_len
        self.texts = df['Description']
        self.labels =  df['label'].apply(lambda x:0 if x==4 else 1).values
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self,index):
        text = self.texts[index]
        labels = self.labels[index]
        encoding = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
            padding=False 
        )
        return {
            'input_ids': encoding['input_ids'].squeeze(0),  # Remove the batch dimension
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'labels': torch.tensor(labels, dtype=torch.long)
        }
    

class TextFeatureDataset(Dataset):
    def __init__(self,df,tokenizer,max_len = 512):
        
        self.tokenizer = tokenizer
        self.df = df
        feature_cols = [col for col in df.columns if col not in ['Description', 'CVE_ID', 'label']]

        self.max_len = max_len

        self.features = df[feature_cols].astype(float)
        self.texts = df['Description']
        self.labels =  df['label'].apply(lambda x:0 if x==4 else 1).values
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self,index):
        text = self.texts[index]
        labels = self.labels[index]
        features = self.features.iloc[index]
        encoding = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
            padding=False 
        )
        return {
            'input_ids': encoding['input_ids'].squeeze(0),  # Remove the batch dimension
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'features': torch.tensor(features, dtype=torch.float),
            'labels': torch.tensor(labels, dtype=torch.long)
        }    

class BertClassifierWithFeatures(nn.Module):
    def __init__(self,bert_model_name='bert-base-uncased',num_additional_features=16, num_classes=2):
        super().__init__()
        self.bert = BertModel.from_pretrained(bert_model_name,num_labels=num_classes)
        self.classifier = nn.Linear(self.bert.config.hidden_size + num_additional_features, num_classes)

    def forward(self,input_ids,attention_mask,features):
        outputs = self.bert(input_ids,attention_mask = attention_mask)
        cls_embedding = outputs.last_hidden_state[:, 0, :]
        combined = torch.cat((cls_embedding, features), dim=1)
        logits = self.classifier(combined)
        return logits

def collate_fn(batch):
    input_ids = [item['input_ids'] for item in batch]
    attention_mask = [item['attention_mask'] for item in batch]
    labels = [item['labels'] for item in batch]
    features = [item['features'] for item in batch]

    input_ids = pad_sequence(input_ids, batch_first=True, padding_value=0)
    attention_mask = pad_sequence(attention_mask, batch_first=True, padding_value=0)
    labels = torch.stack(labels)
    features = torch.stack(features)


    return {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'features': features,
        'labels': labels
    }

def train_model(model, train_loader, device, optimizer, scheduler, epochs=3):
    model.to(device)

    for epoch in range(epochs):
        model.train()
        for i,batch in enumerate(train_loader):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            features = batch['features'].to(device)
            model.zero_grad()
            outputs = model(input_ids,attention_mask = attention_mask,features = features)
            loss = nn.CrossEntropyLoss()(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()
            
            if i % 20 == 0 :
                print(f'Epoch:{epoch} Step:{i}/{len(train_loader)} Loss:{loss.item()}')
            
def evaluate_model(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    n_correct = 0
    n_samples = 0
    predictions, true_labels = [], []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids,attention_mask = attention_mask)
            preds = torch.argmax(outputs.logits,dim = 1)
            predictions.extend(preds.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())

            n_samples += labels.shape[0]
            n_correct += (preds == labels).sum().item()
            print(n_correct)
    print(predictions,true_labels)
    print(100* n_correct/ n_samples)
    print(classification_report(true_labels, predictions, target_names=['Negative', 'Positive'], digits=4))
    return 100* n_correct/ n_samples
            


def run_training(model_name,model_class,tokenizer,df_train,df_val,epochs = 3,learning_rate = 5e-5):
    gc.collect()
    
    torch.cuda.empty_cache()      # releases cached GPU memory back to the driver

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    train_dataset = TextFeatureDataset(df_train,tokenizer)
    val_dataset = TextFeatureDataset(df_val,tokenizer)

    train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=8, collate_fn=collate_fn)
    if model_class == AutoModelForSequenceClassification:
        model = model_class.from_pretrained(model_name, num_labels=2)
    else: 
        model = model_class(model_name, num_classes=2)
    optimizer = AdamW(model.parameters(), lr=learning_rate)
    total_steps = len(train_loader) * epochs

    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

    train_model(model, train_loader, device, optimizer, scheduler, epochs)
   # accuracy = evaluate_model(model, val_loader, device)
    return model
    

    


In [110]:
from transformers import AutoModel, AutoTokenizer
from transformers import AutoModelForSequenceClassification
from transformers import BertTokenizer, BertModel


model_name = 'bert-base-uncased'
model_class = AutoModelForSequenceClassification
tokenizer = AutoTokenizer.from_pretrained(model_name)


def prepare_data(task_id,with_features = False):    
    folder_train_path = train_path + '/' + task_id

    all_train_files = []
    for root, dirs, files in os.walk(folder_train_path):
        for file in files:
            full_path = os.path.join(root, file)
            all_train_files.append(full_path)

    dfs = [pd.read_csv(file) for file in all_train_files]

    folder_test_path = test_path + '/' + task_id

    all_test_files = []
    for root, dirs, files in os.walk(folder_test_path):
        for file in files:
            full_path = os.path.join(root, file)
            all_test_files.append(full_path)

    dfs_train = [pd.read_csv(file) for file in all_train_files]

    dfs_test = [pd.read_csv(file) for file in all_test_files]

    df_train = pd.concat(dfs_train, ignore_index=True)

    df_test  = pd.concat(dfs_test, ignore_index=True)

    return df_train,df_test

for task_id in task_list:
    df_train ,df_test = prepare_data(task_id, with_features=False)


    #model = run_training(model_name,model_class,tokenizer,df_train,df_test)
    val_dataset = TextDataset(df_test,tokenizer)
    val_loader = DataLoader(val_dataset, batch_size=8, collate_fn=collate_fn)

    #evaluate_model(model=model,test_loader=val_loader)

    model_class = BertClassifierWithFeatures
    model = run_training(model_name,model_class,tokenizer,df_train,df_test)

  'features': torch.tensor(features, dtype=torch.float),


ValueError: too many dimensions 'str'