# The Failure Log Classifier

## Functions

In [1]:
import pandas as pd
import numpy as np
import statistics
import operator
from collections import Counter
from pathlib import Path
import xml.etree.ElementTree as ET
import filecmp
from lxml import etree
import subprocess
import os
import sklearn as sk
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import classification_report, f1_score, precision_score,recall_score, confusion_matrix,precision_recall_fscore_support
from imblearn.over_sampling import SMOTE
from sklearn.tree import export_text
import math
from sklearn.feature_selection import mutual_info_classif
from tqdm import tqdm
from sklearn import tree
from functools import reduce
from sklearn.preprocessing import OneHotEncoder
import warnings
warnings.filterwarnings('ignore')
from sklearn.feature_extraction.text import TfidfVectorizer

In [2]:
def getFilesByEndsWith(fileDir,ends):
    """Recursively list the files under ``fileDir`` whose name ends with ``ends``.

    Args:
        fileDir: Root directory handed to ``os.walk``.
        ends: Suffix (or tuple of suffixes) passed to ``str.endswith``.

    Returns:
        A list of paths, each built by joining the directory being walked
        with the matching file name, in ``os.walk`` order.
    """
    return [
        os.path.join(currentDir, fileName)
        for currentDir, _subdirs, fileNames in os.walk(fileDir)
        for fileName in fileNames
        if fileName.endswith(ends)
    ]

def getFilesByStartsWith(fileDir,starts):
    """Recursively list the files under ``fileDir`` whose name starts with ``starts``.

    Args:
        fileDir: Root directory handed to ``os.walk``.
        starts: Prefix (or tuple of prefixes) passed to ``str.startswith``.

    Returns:
        A list of paths, each built by joining the directory being walked
        with the matching file name, in ``os.walk`` order.
    """
    matches = []
    for currentDir, _subdirs, fileNames in os.walk(fileDir):
        # Only the file name is tested against the prefix, not the full path.
        matches.extend(
            os.path.join(currentDir, fileName)
            for fileName in fileNames
            if fileName.startswith(starts)
        )
    return matches

def readXMLFile(filePath):
    """Parse the XML file at ``filePath`` with lxml and return its root element.

    The parser is created with ``strip_cdata=False`` and ``recover=True``;
    per the lxml documentation these keep CDATA sections in the tree and let
    the parser continue past malformed markup instead of raising.

    Args:
        filePath: Path of the XML file; it is opened in binary mode.

    Returns:
        The root element of the parsed tree.
    """
    lenientParser = etree.XMLParser(strip_cdata=False, recover=True)
    with open(filePath, "rb") as xmlFile:
        return etree.parse(xmlFile, parser=lenientParser).getroot()


def getTestNames(allTestNames):
    """Assign each test name an integer id, counting up from 1000.

    Ids follow iteration order. If a name occurs more than once, the id of
    its last occurrence is the one kept.

    Args:
        allTestNames: Iterable of hashable test names.

    Returns:
        A dict mapping test name to its integer id.
    """
    return {testName: 1000 + position for position, testName in enumerate(allTestNames)}

def get_scores (tn,fp,fn,tp):
    """Compute accuracy, F1, precision and recall from confusion-matrix counts.

    Args:
        tn: Number of true negatives.
        fp: Number of false positives.
        fn: Number of false negatives.
        tp: Number of true positives.

    Returns:
        A tuple ``(accuracy, F1, Precision, Recall)`` of fractions in [0, 1].
        When ``tp`` is 0, F1, precision and recall are reported as 0. When
        all four counts are 0, accuracy is reported as 0.0 instead of
        dividing by zero.
    """
    total = tn + fp + fn + tp
    # An empty confusion matrix has no defined accuracy; report 0.0 rather than dividing by zero.
    accuracy = (tp + tn) / total if total else 0.0

    if tp == 0:
        # Without true positives the ratios below are 0 or 0/0.
        return accuracy, 0, 0, 0

    Precision = tp / (tp + fp)
    Recall = tp / (tp + fn)
    F1 = 2 * ((Precision * Recall) / (Precision + Recall))
    return accuracy, F1, Precision, Recall

def get_rates(tn,fp,fn,tp):
    """Compute the true-positive rate and false-positive rate.

    Args:
        tn: Number of true negatives.
        fp: Number of false positives.
        fn: Number of false negatives.
        tp: Number of true positives.

    Returns:
        A tuple ``(tpr, fpr)`` where ``tpr = tp / (tp + fn)`` and
        ``fpr = fp / (tn + fp)``. A rate whose denominator is 0 (no actual
        positives, or no actual negatives) is reported as 0.0 instead of
        dividing by zero.
    """
    actualPositives = tp + fn
    actualNegatives = tn + fp
    tpr = tp / actualPositives if actualPositives else 0.0
    fpr = fp / actualNegatives if actualNegatives else 0.0
    return tpr, fpr


def getResultTag(failuresPerProject,uniqueFailureFilter,onlyFailureWithMutantsFilter,balance):
    """Build the configuration tag for a run and apply the mutant filter.

    The tag is three '+'-separated parts: ``Balance``/``NotBalance``,
    ``NoDuplicate``/``Duplicate`` and ``FailuresWithMutants``/``allFailures``.
    Only ``onlyFailureWithMutantsFilter`` changes the data; the other two
    flags affect the tag text alone.

    Args:
        failuresPerProject: DataFrame of failures; needs a ``HasMutants``
            column when ``onlyFailureWithMutantsFilter`` is truthy.
        uniqueFailureFilter: Selects ``NoDuplicate`` vs ``Duplicate`` in the tag.
        onlyFailureWithMutantsFilter: When truthy, keep only rows with
            ``HasMutants > 0``.
        balance: Selects ``Balance`` vs ``NotBalance`` in the tag.

    Returns:
        A tuple ``(resultTag, failuresPerProject)`` with the possibly
        filtered DataFrame.
    """
    tagParts = [
        "Balance" if balance else "NotBalance",
        "NoDuplicate" if uniqueFailureFilter else "Duplicate",
        "FailuresWithMutants" if onlyFailureWithMutantsFilter else "allFailures",
    ]

    if onlyFailureWithMutantsFilter:
        hasMutants = failuresPerProject["HasMutants"] > 0
        failuresPerProject = failuresPerProject[hasMutants]

    return "+".join(tagParts), failuresPerProject
    

def oneHotEncodingFeatures(dataSet,encoderColumn):
    """Replace ``encoderColumn`` with one-hot indicator columns.

    Args:
        dataSet: DataFrame containing ``encoderColumn``.
        encoderColumn: Name of the categorical column to encode.

    Returns:
        A new DataFrame with the indicator columns first, followed by every
        other column of ``dataSet``. ``encoderColumn`` itself is dropped.
    """
    ohe = OneHotEncoder()

    dataSetEncoded = dataSet[[encoderColumn]]
    dataSetEncoded_ohe = ohe.fit_transform(dataSetEncoded).toarray()

    # get_feature_names() was deprecated in scikit-learn 1.0 and removed in 1.2;
    # prefer its replacement and fall back only on older releases.
    if hasattr(ohe, 'get_feature_names_out'):
        encodedNames = ohe.get_feature_names_out(dataSetEncoded.columns)
    else:
        encodedNames = ohe.get_feature_names(dataSetEncoded.columns)

    # Reuse the input index so the column-wise concat aligns row-for-row even
    # when dataSet does not carry a default RangeIndex.
    encoded_df = pd.DataFrame(dataSetEncoded_ohe, columns=encodedNames, index=dataSet.index)

    concatResult = pd.concat([encoded_df,dataSet.loc[:, dataSet.columns != encoderColumn]], axis=1)
    return concatResult

def getConfusionMatrixPerRow(dataset,confusionMatrixTracker,project,resultTag,dfColumnet):
    """Summarise which failure exceptions fall under each prediction outcome.

    Args:
        dataset: DataFrame with a ``FailureException`` column. The keys of
            ``confusionMatrixTracker`` are used as positions into it
            (``iloc``).
        confusionMatrixTracker: Dict mapping a row position to an outcome
            label (the caller in this file uses 'TP', 'FP' and 'FN').
        project: Project name stored on every output row.
        resultTag: Configuration tag stored on every output row.
        dfColumnet: The six column labels of the returned DataFrame.

    Returns:
        A DataFrame with one row per (outcome, exception) pair holding the
        project, the tag, the outcome, the exception, how many rows with
        that outcome have the exception, and that count as a percentage of
        all rows with the outcome. Rows follow first-seen order of outcomes
        and exceptions.
    """
    # Rows used to be emitted as (project, resultTag, ...) regardless of the
    # header. The caller in this file labels the first two columns
    # ('ResultTag', 'Project'), which put each value under the other's label.
    # Honour that header order; any other header keeps the positional layout.
    if list(dfColumnet)[:2] == ['ResultTag', 'Project']:
        leadingValues = [resultTag, project]
    else:
        leadingValues = [project, resultTag]

    # Group row positions by outcome in first-seen order, so the output does
    # not depend on set iteration order.
    positionsByType = {}
    for position, resultType in confusionMatrixTracker.items():
        positionsByType.setdefault(resultType, []).append(position)

    ConfResult = []
    for resultType, positions in positionsByType.items():
        ExceptionsList = dataset.iloc[positions]['FailureException'].values.tolist()
        # One counting pass instead of a list.count() scan per distinct exception.
        for exception, frequency in Counter(ExceptionsList).items():
            ConfResult.append(leadingValues + [resultType, exception, frequency, (frequency / len(ExceptionsList)) * 100])

    ConfResultDF = pd.DataFrame(ConfResult,columns=dfColumnet)
    return ConfResultDF

def PredictFailure(data,balance,targetCol,classifier,random_state=None):
    """Run 10-fold stratified cross-validation and pool the confusion matrix.

    Args:
        data: DataFrame of features plus the ``targetCol`` label column. An
            'index' column, if present, is dropped before training.
        balance: When truthy, oversample each training fold with SMOTE.
        targetCol: Name of the binary (0/1) label column.
        classifier: "DT" for an entropy decision tree or "NB" for Gaussian
            naive Bayes.
        random_state: Optional seed forwarded to the fold shuffle, SMOTE and
            the decision tree. The default ``None`` leaves every run
            unseeded, as before.

    Returns:
        A tuple ``(scores, confusionMatrixTracker)``. ``scores`` is the list
        ``[Total, TP, FN, FP, TN, Precision, Recall, F1, Accuracy, TPR, FPR]``
        with the last six values as percentages. ``confusionMatrixTracker``
        maps the position of each test row to 'TP', 'FP' or 'FN'; true
        negatives are not recorded.

    Raises:
        ValueError: If ``classifier`` is neither "DT" nor "NB".
    """
    if classifier not in ("DT", "NB"):
        # Without this check the failure surfaced later as a NameError on `model`.
        raise ValueError("Unsupported classifier: %r (expected 'DT' or 'NB')" % (classifier,))

    data_target = data[[targetCol]]
    data = data.drop([targetCol], axis=1)
    if ('index' in data.columns):
        data = data.drop(['index'], axis=1)

    # The type of k-fold is StratifiedKFold to ensure each fold has flaky failures.
    fold = StratifiedKFold(n_splits=10,shuffle=True,random_state=random_state)

    TN = FP = FN = TP = 0
    confusionMatrixTracker = {}

    for train_index, test_index in fold.split(data,data_target):
        x_train, x_test = data.iloc[list(train_index)], data.iloc[list(test_index)]
        y_train, y_test = data_target.iloc[list(train_index)], data_target.iloc[list(test_index)]

        if(balance):
            # Oversample the training fold only; the test fold stays untouched.
            oversample = SMOTE(random_state=random_state)
            x_train, y_train = oversample.fit_resample(x_train, y_train)
        if (classifier=="DT"):
            model = DecisionTreeClassifier(criterion='entropy', max_depth = None, random_state=random_state)
        else:
            model = GaussianNB()
        trained_model = model.fit(x_train, y_train)
        preds = trained_model.predict(x_test)

        # Track which test rows are TP, FP or FN (all others are TN).
        y_testList = y_test[targetCol].values.tolist()
        predsList = preds.tolist()
        for position, predicted, actual in zip(test_index, predsList, y_testList):
            # Plain boolean `and`: the previous `== 1 & ... == 1` depended on
            # `&` binding tighter than `==` and was right only for 0/1 labels.
            if predicted == 1 and actual == 1:
                confusionMatrixTracker[position] = 'TP'
            elif predicted == 1 and actual == 0:
                confusionMatrixTracker[position] = 'FP'
            elif predicted == 0 and actual == 1:
                confusionMatrixTracker[position] = 'FN'

        # labels=[0,1] fixes the matrix at 2x2 even if a fold lacks one class.
        tn, fp, fn, tp = confusion_matrix(y_test, preds, labels=[0,1]).ravel()
        TN = TN + tn
        FP = FP + fp
        FN = FN + fn
        TP = TP + tp
    accuracy, F1, Precision, Recall = get_scores (TN,FP,FN,TP)
    tpr, fpr = get_rates(TN,FP,FN,TP)
    return [TN+FP+FN+TP,TP,FN,FP,TN,Precision*100, Recall*100,F1*100,accuracy*100,tpr*100, fpr*100],confusionMatrixTracker


def getClassificationResult(failuresDataset,datasetColumns,targetCol,classifier):
    """Evaluate the classifier per project for each targeted configuration.

    Every combination of the three boolean switches (duplicate filter,
    mutants-only filter, balancing) is enumerated, but only combinations
    whose tag is listed in ``targetTags`` are evaluated.

    Args:
        failuresDataset: DataFrame with at least the 'Project' and
            'FailureType' columns, the ``datasetColumns`` and, for the
            targeted tags, 'HasMutants'.
        datasetColumns: Columns (including ``targetCol``) used for training.
            'FailureException' and 'Test', when listed, are one-hot encoded.
        targetCol: Name of the binary label column.
        classifier: Classifier code forwarded to ``PredictFailure``.

    Returns:
        A tuple ``(resultDF, confusionDF)``. ``resultDF`` has one row per
        (tag, project); projects with too few flaky failures or without any
        mutant failure get an all-zero row. ``confusionDF`` stacks the
        per-project exception breakdowns from ``getConfusionMatrixPerRow``.
    """
    resultColumns = ['ResultTag','Project','Total','TP','FN','FP','TN','P','R','F1','Ac','TPR','FPR']
    binaryResult = [True,False] # To consider the repetition of failures from different mutants + Consider the fact that some tests have no mutants + to balance or not balance.

    # Final target tags
    targetTags = ["NotBalance+Duplicate+FailuresWithMutants","Balance+Duplicate+FailuresWithMutants"]

    # for confusion matrix analysis
    defColumns = ['ResultTag','Project','PredictResultTag','FailureException','FailureExceptionFreq','FailureExceptionPercentage']

    # DataFrame.append was removed in pandas 2.0 (and copied the frame on
    # every call), so rows and frames are collected and assembled once.
    resultRows = []
    confusionFrames = []

    for uniqueFailureFilter in binaryResult:
        for onlyFailureWithMutantsFilter in binaryResult:
            for balance in binaryResult:
                for project in failuresDataset['Project'].unique():
                    failuresPerProject = failuresDataset[failuresDataset['Project']==project]
                    # Result tag:
                    resultTag,failuresPerProject = getResultTag(failuresPerProject,uniqueFailureFilter,onlyFailureWithMutantsFilter,balance)

                    # Temporary restriction: only the targeted configurations are evaluated.
                    if resultTag not in targetTags:
                        continue

                    # Require enough flaky failures and at least one mutant failure.
                    if not (failuresPerProject[targetCol].sum()>10 and 'mutant' in failuresPerProject['FailureType'].unique()):
                        resultRows.append([resultTag,project] + [0] * (len(resultColumns) - 2))
                        continue

                    trainingFailures = failuresPerProject[datasetColumns].reset_index()
                    # OneHotEncode ... for FailureException or Test
                    if ('FailureException' in datasetColumns):
                        trainingFailures = oneHotEncodingFeatures(trainingFailures,'FailureException')
                    if ('Test' in datasetColumns):
                        trainingFailures = oneHotEncodingFeatures(trainingFailures,'Test')

                    # Predict ...
                    predictionResult,confusionMatrixTracker = PredictFailure(trainingFailures,balance,targetCol,classifier)
                    resultRows.append([resultTag,project] + predictionResult)

                    # Get the distribution of confusion matrix per result
                    confusionFrames.append(getConfusionMatrixPerRow(failuresPerProject,confusionMatrixTracker,project,resultTag,defColumns))

    resultDF = pd.DataFrame(resultRows, columns=resultColumns)
    # pd.concat rejects an empty list, so fall back to an empty frame with the expected header.
    confusionDF = pd.concat(confusionFrames) if confusionFrames else pd.DataFrame(columns=defColumns)
    return resultDF, confusionDF

## Main

### Preprocess the dataset

In [3]:
# Directory that the final classifier result CSVs are written to.
output = 'Result'
# Root directory searched recursively for the per-project feature CSVs.
xmlSummaryDir = 'Path-to-summary-files' # placeholder: set to the path of the 22 Java projects (ICST dataset)
# lxml parser created with strip_cdata=False and recover=True.
# NOTE(review): not referenced by the cells visible here; readXMLFile builds its own parser.
parser = etree.XMLParser(strip_cdata=False,recover=True)

In [4]:
# Load every per-project feature file found under xmlSummaryDir into one frame.
csvFiles = getFilesByEndsWith(xmlSummaryDir,'FeaturesPerTest.csv') # use FeaturesPerTestAll.csv to count all failures ..
if not csvFiles:
    # pd.concat([]) would otherwise fail with an unhelpful "No objects to concatenate".
    raise FileNotFoundError("No 'FeaturesPerTest.csv' files found under: " + str(xmlSummaryDir))
failureFeatures = pd.concat([pd.read_csv(f,index_col=False) for f in csvFiles])

# Temp Part for cleaning the columns:
UnnamedColumns = [col for col in failureFeatures.columns if col.startswith('Unnamed')]
failureFeatures = failureFeatures.drop(columns=UnnamedColumns)

if ('CUTnStackTrace' in failureFeatures.columns):
    failureFeatures = failureFeatures.drop(columns=['CUTnStackTrace'])

# Numeric codes: FailureStatusCode is 1 for 'FLAKY' rows, FailureTypeCode is 1 for 'test' rows, 0 otherwise.
failureFeatures['FailureStatusCode'] = np.where(failureFeatures['FailureStatus'] == 'FLAKY', 1, 0)
failureFeatures['FailureTypeCode'] = np.where(failureFeatures['FailureType'] == 'test', 1, 0)


# Remove flaky mutants ..
execludeFlakyMutant = True
if (execludeFlakyMutant):
    failureFeatures = failureFeatures[~((failureFeatures['FailureType']=='mutant')&(failureFeatures['FailureStatus']=='FLAKY'))]

# Ignore tests whose failures all share a single FailureType.
# One grouped pass replaces re-filtering the whole frame once per test;
# dropna=False counts a missing FailureType as a value, like Series.unique() does.
typesPerTest = failureFeatures.groupby('Test', sort=False)['FailureType'].nunique(dropna=False)
ignoredTests = typesPerTest[typesPerTest == 1].index.tolist()
failureFeaturesAll = failureFeatures[~failureFeatures['Test'].isin(ignoredTests)]


In [5]:
# Per-failure results; the 'Test' and 'TestID' columns identify the failures to keep.
data = pd.read_csv('Result/PerFailureResult.csv')
# .copy() so the TargetFailures column added below is written to an
# independent frame rather than to a slice of failureFeaturesAll.
failureFeaturesFlaky = failureFeaturesAll[failureFeaturesAll['FailureType']=='test'].copy()
failureFeaturesMutants = failureFeaturesAll[failureFeaturesAll['FailureType']=='mutant']


# Build a "<Test>|<id>" key on both sides to match failures across the two sources.
data['TargetFailures'] =  data['Test'] + '|' + data['TestID'].astype(str)
failureFeaturesFlaky['TargetFailures'] = failureFeaturesFlaky['Test'] + '|' + failureFeaturesFlaky['FailureId'].astype(str)

# Keep only the 'test' failures that also appear in the per-failure results.
TargetFailureFeatures = failureFeaturesFlaky[failureFeaturesFlaky['TargetFailures'].isin(data['TargetFailures'].unique())]

# DataFrame.append was removed in pandas 2.0; pd.concat is the equivalent.
failureFeatures = pd.concat([TargetFailureFeatures, failureFeaturesMutants], ignore_index=True)

### Prediction Part

In [6]:
# Label column first, followed by the features used for training.
TrainingColumns = [
    'FailureStatusCode',
    'FailureException',
    'TestNameInStackTrace',
    'ClassNameInStackTrace',
    'otherTestClassInStackTrace',
    'JunitInStackTrace',
    'CUTinStackTrace',
]

# Decision-tree run, then Gaussian naive Bayes run, on the same data and columns.
FlakeFlaggerPredict, FlakeFlaggerConfusionDF = getClassificationResult(
    failureFeatures, TrainingColumns, 'FailureStatusCode', 'DT')
FlakeFlaggerPredictNB, FlakeFlaggerConfusionDFNB = getClassificationResult(
    failureFeatures, TrainingColumns, 'FailureStatusCode', 'NB')

# Label the results: prepend the dataset name and the kind of result.
for labelledFrame, resultTypeLabel in (
    (FlakeFlaggerPredict, "mainPrediction"),
    (FlakeFlaggerConfusionDF, "mainPrediction"),
    (FlakeFlaggerPredictNB, "mainPredictionNB"),
):
    labelledFrame.insert(0, "Dataset", "FlakeFlagger")
    labelledFrame.insert(1, "ResultType", resultTypeLabel)

# Per-tag views sorted by Total; the results are not stored.
for viewedTag in ("Balance+Duplicate+FailuresWithMutants", "NotBalance+Duplicate+FailuresWithMutants"):
    FlakeFlaggerPredict[FlakeFlaggerPredict['ResultTag'] == viewedTag].sort_values(by=['Total'], ascending=False)

# Save both prediction summaries under the output directory.
for savedFrame, fileSuffix in (
    (FlakeFlaggerPredict, '/DT_FinalClassifierResult.csv'),
    (FlakeFlaggerPredictNB, '/NB_FinalClassifierResult.csv'),
):
    savedFrame.to_csv(output + fileSuffix, index=False)