# Data Pre-processing

In [1]:
import os

from utils import all_paths_exist

In [2]:
# Folder names, relative to the project root, that hold the C++ snippets and
# their per-line label files.
CODE_DIR_NAME = 'split_cpps'
LABEL_DIR_NAME = 'split_labels'

# '__file__' is intentionally a string literal: realpath() resolves the
# relative name against the current working directory, so file_dir is the
# directory the kernel runs in and proj_dir is its parent.
file_dir = os.path.split(os.path.realpath('__file__'))[0]
proj_dir = os.path.split(file_dir)[0]

code_dir = f'{proj_dir}/{CODE_DIR_NAME}'
label_dir = f'{proj_dir}/{LABEL_DIR_NAME}'

# Fail early when either input folder is missing.
required_dirs = [code_dir, label_dir]
if not all_paths_exist(required_dirs):
    err = f'Missing "{CODE_DIR_NAME}" & "{LABEL_DIR_NAME}" under "{proj_dir}"'
    raise Exception(err)

In [3]:
# Stems (file name without extension) of every C++ source found under
# code_dir, and the extension each stem was found with.
file_names = []
extension_tracker = dict()

for _, _, files in os.walk(code_dir):
    # Directory listings come back in filesystem-dependent order; sorting
    # keeps file_names (and the seeded splits built from it) reproducible.
    for file in sorted(files):
        # str.endswith never raises on short names, so no try/except is
        # needed; names without a recognised suffix are simply skipped.
        if file.endswith('.cc'):
            name, extension = file[:-3], 'cc'
        elif file.endswith('.cpp'):
            name, extension = file[:-4], 'cpp'
        else:
            continue

        # NOTE: a stem present as both .cc and .cpp is listed twice and the
        # extension seen last wins in extension_tracker.
        file_names.append(name)
        extension_tracker[name] = extension

print(file_names)

['additional_functions_0', 'additional_functions_1', 'addrdb_0', 'addrdb_1', 'addrdb_10', 'addrdb_2', 'addrdb_3', 'addrdb_4', 'addrdb_5', 'addrdb_6', 'addrdb_7', 'addrdb_8', 'addrdb_9', 'addresstype_0', 'addresstype_1', 'addresstype_10', 'addresstype_11', 'addresstype_12', 'addresstype_13', 'addresstype_14', 'addresstype_15', 'addresstype_16', 'addresstype_17', 'addresstype_18', 'addresstype_19', 'addresstype_2', 'addresstype_20', 'addresstype_21', 'addresstype_22', 'addresstype_3', 'addresstype_4', 'addresstype_5', 'addresstype_6', 'addresstype_7', 'addresstype_8', 'addresstype_9', 'addrman_0', 'addrman_1', 'addrman_10', 'addrman_11', 'addrman_12', 'addrman_13', 'addrman_14', 'addrman_15', 'addrman_16', 'addrman_17', 'addrman_18', 'addrman_19', 'addrman_2', 'addrman_20', 'addrman_21', 'addrman_22', 'addrman_23', 'addrman_24', 'addrman_25', 'addrman_26', 'addrman_27', 'addrman_28', 'addrman_29', 'addrman_3', 'addrman_30', 'addrman_31', 'addrman_32', 'addrman_33', 'addrman_34', 'addrman

In [4]:
def set_dataset_dir(new_dir: str, base_path: str = proj_dir)->str:
    """Return '<base_path>/datasets/<new_dir>', creating it when missing.

    Args:
        new_dir: Name of the dataset folder to create under 'datasets'.
        base_path: Root that contains the 'datasets' folder. Defaults to
            the value of ``proj_dir`` at the time this cell is executed.

    Returns:
        The path of the dataset folder as a string.

    Raises:
        FileExistsError: If the path already exists but is not a directory.
    """
    dir_path = f'{base_path}/datasets/{new_dir}'

    # exist_ok=True makes the call idempotent and avoids the window between
    # checking for the directory and creating it.
    os.makedirs(dir_path, exist_ok=True)

    return dir_path

In [5]:
import numpy as np

from sklearn.model_selection import train_test_split

# Fraction held out at each split. create_train_test_split applies it twice:
# first to carve the test set out of all rows, then to carve the eval set out
# of the remaining train+eval rows.
TEST_SIZE = 0.10
# Passed as random_state to both train_test_split calls.
SEED = 42

def create_train_test_split(label_filter: str,
                            line_list: list[str],
                            code_list: list[str],
                            label_list: list[str])->tuple:
    """Split the rows of a single label class into train, eval and test sets.

    Only rows whose label equals ``label_filter`` are kept. The kept rows are
    assembled into a 2-D array with columns (line, code, label) and split
    twice with ``train_test_split``: ``TEST_SIZE`` of the rows go to the test
    set, then ``TEST_SIZE`` of the remainder go to the eval set. Both splits
    use ``SEED`` as the random state.

    Args:
        label_filter: Label value selecting the rows to split.
        line_list: Per-row "line" entries (list or 1-D array).
        code_list: Per-row "code" entries (list or 1-D array).
        label_list: Per-row labels (list or 1-D array), same length as the
            other two.

    Returns:
        A tuple ``(train, eval, test)`` of 2-D arrays with three columns.

    Raises:
        ValueError: If no row carries ``label_filter``.
    """
    # The annotations promise plain lists, but boolean-mask indexing needs
    # arrays; asarray is a no-op for arrays and converts lists.
    line_arr = np.asarray(line_list)
    code_arr = np.asarray(code_list)
    label_arr = np.asarray(label_list)

    keep = label_arr == label_filter
    if not np.any(keep):
        raise ValueError(f'No rows with label "{label_filter}" to split.')

    filtered_rows = np.concatenate((line_arr[keep, np.newaxis],
                                    code_arr[keep, np.newaxis],
                                    label_arr[keep, np.newaxis]), axis=1)

    # The label column already travels inside filtered_rows, so no separate
    # label array needs to be passed; the split indices are the same.
    rows_traineval, rows_test = train_test_split(filtered_rows,
                                                 test_size=TEST_SIZE,
                                                 random_state=SEED)

    rows_train, rows_eval = train_test_split(rows_traineval,
                                             test_size=TEST_SIZE,
                                             random_state=SEED)

    return rows_train, rows_eval, rows_test

In [6]:
def recombine_se_sef(se: np.ndarray, sef: np.ndarray)->np.ndarray:
    """Append the rows of ``sef`` below the rows of ``se``.

    At the call sites ``se`` holds the split built from label '1' and ``sef``
    the split built from label '0'.

    Args:
        se: First array; its rows come first in the result.
        sef: Second array; its rows follow those of ``se``.

    Returns:
        A new array with both inputs joined along the first axis.
    """
    stacked = np.concatenate([se, sef])
    return stacked

In [None]:
import pandas as pd
import tree_sitter_cpp as tscpp

from tree_sitter import Language, Parser

# Identifiers of the dataset variants understood by create_data_files; each
# one is also used as the output folder name under datasets/.
LINE_DATASET = 'line_dataset'
CUTOFF_DATASET = 'cutoff_dataset'
AST_DATASET = 'ast_dataset'
AST_NO_CODE_DATASET = 'ast_no_code_dataset'
# Not referenced by any code visible in this part of the notebook.
TEST_DATASET = 'test_dataset'
# Column headers of the train/eval/test CSV files.
COL_NAMES = ['line', 'code', 'label']

# tree-sitter parser loaded with the C++ grammar from tree_sitter_cpp; it
# produces the syntax trees used for the AST dataset variants.
parser = Parser(Language(tscpp.language()))

def create_data_files(dataset_name: str, input_type: str = None)->None:
    """Build one dataset variant and write its train/eval/test splits as CSV.

    Every line of every file listed in ``file_names`` becomes one row
    ``[line, code, label]``. The row layout is chosen by ``input_type``:

    * ``LINE_DATASET``: line is 'Line <n>: <stripped line>', code is the
      whole file (stripped lines joined by tabs).
    * ``CUTOFF_DATASET``: line is 'Side effect free?', code is the file up
      to and including the current line.
    * ``AST_DATASET``: line is ``str()`` of the tree-sitter root node parsed
      from the code up to the current line, code is that same prefix.
    * ``AST_NO_CODE_DATASET``: like ``AST_DATASET`` but code is 'na'.

    Rows labelled '1' and rows labelled '0' are split separately and then
    recombined, so every split contains both classes. Rows with any other
    label end up in none of the splits.

    Args:
        dataset_name: Folder name under ``datasets/`` that receives
            train.csv, eval.csv and test.csv.
        input_type: Row layout to generate. Defaults to ``dataset_name``.

    Raises:
        Exception: If ``input_type`` is not a known layout, a code or label
            file is missing, or a file and its labels differ in line count.
        ValueError: If a label line is not an integer.
    """
    if input_type is None:
        input_type = dataset_name

    # Validate before touching the filesystem so a typo does not leave an
    # empty dataset folder behind.
    known_types = (LINE_DATASET, CUTOFF_DATASET,
                   AST_DATASET, AST_NO_CODE_DATASET)
    if input_type not in known_types:
        raise Exception(f'Dataset name "{input_type}" unknown.')

    dataset_dir = set_dataset_dir(dataset_name)
    container = []

    for name in file_names:
        cpp_file = f'{code_dir}/{name}.{extension_tracker[name]}'
        txt_file = f'{label_dir}/{name}.txt'

        if not all_paths_exist([cpp_file, txt_file]):
            raise Exception(f'Could not find {cpp_file} and {txt_file}')

        with (open(cpp_file, 'r') as code,
              open(txt_file, 'r') as labels):
            code_lines = code.read().splitlines()
            label_lines = labels.readlines()

        if len(code_lines) != len(label_lines):
            raise Exception(f'Length mismatch for {name}.')

        # Stripping and the whole-file join do not depend on the line index,
        # so they are computed once per file instead of once per line.
        stripped_code = [line.strip() for line in code_lines]
        full_code_block = '\t'.join(stripped_code)

        for i, raw_label in enumerate(label_lines):
            # int() validates the label; it is stored as text because the
            # class filters below compare against '1' and '0'.
            curr_label = str(int(raw_label))
            code_prefix = stripped_code[:i+1]

            if input_type == LINE_DATASET:
                curr_line = f'Line {i+1}: {stripped_code[i]}'
                curr_code_block = full_code_block

            elif input_type == CUTOFF_DATASET:
                curr_line = 'Side effect free?'
                curr_code_block = '\t'.join(code_prefix)

            else:
                # Both AST variants parse the code seen so far; they differ
                # only in whether the code itself is kept next to the tree.
                tree = parser.parse(bytes('\n'.join(code_prefix),
                                          encoding='utf-8'))
                curr_line = str(tree.root_node)
                if input_type == AST_DATASET:
                    curr_code_block = '\t'.join(code_prefix)
                else:
                    curr_code_block = 'na'

            container.append([curr_line, curr_code_block, curr_label])

    # dtype=object stores references to the Python strings. A fixed-width
    # unicode array would pad every cell to the length of the longest one.
    container = np.array(container, dtype=object)
    line_list = container[:, 0]
    code_list = container[:, 1]
    label_list = container[:, 2]

    # Partitioning the dataset before splitting is necessary
    # to ensure that the train, eval, and test splits
    # all contain side effect functions.
    (se_combined_train,
     se_combined_eval,
     se_combined_test) = create_train_test_split('1',
                                                 line_list,
                                                 code_list,
                                                 label_list)

    (sef_combined_train,
     sef_combined_eval,
     sef_combined_test) = create_train_test_split('0',
                                                  line_list,
                                                  code_list,
                                                  label_list)

    # Recombine post-split
    combined_train = recombine_se_sef(se_combined_train, sef_combined_train)
    combined_eval = recombine_se_sef(se_combined_eval, sef_combined_eval)
    combined_test = recombine_se_sef(se_combined_test, sef_combined_test)

    # Save to respective files
    for split_name, split_rows in (('train', combined_train),
                                   ('eval', combined_eval),
                                   ('test', combined_test)):
        split_df = pd.DataFrame(split_rows, columns=COL_NAMES)
        split_df.to_csv(f'{dataset_dir}/{split_name}.csv', index=False)


In [None]:
import time

# Build every dataset variant in turn and report how long each one took.
# Each *_time value is the wall-clock timestamp taken right after the
# corresponding build, so each printed duration is measured from the
# previous checkpoint rather than from start_time.
start_time = time.time()

create_data_files(LINE_DATASET)
line_time = time.time()
print(f'Time to finish line_dataset: {line_time - start_time} s')

create_data_files(CUTOFF_DATASET)
cutoff_time = time.time()
print(f'Time to finish cutoff_dataset: {cutoff_time - line_time} s')

# The AST variants parse a growing code prefix once per source line.
create_data_files(AST_DATASET)
ast_time = time.time()
print(f'Time to finish ast_dataset: {ast_time - cutoff_time} s')

create_data_files(AST_NO_CODE_DATASET)
no_code_time = time.time()
print(f'Time to finish ast_no_code_dataset: {no_code_time - ast_time} s')