## TODO


### Imports

In [1]:
import pandas
import numpy as np
from ipywidgets import IntProgress
from IPython.display import display
np.set_printoptions(suppress=True)
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
plt.rcParams['figure.figsize'] = [8,5]
np.set_printoptions(suppress=True)

import os
import shutil
from matplotlib.ticker import StrMethodFormatter, NullFormatter
import dictdiffer

from tqdm.notebook import tnrange as nrange
from tqdm.notebook import tqdm
from datetime import datetime

In [2]:
def set_pandas_display_options() -> None:
    """Configure pandas' global display options for wide, readable tables.

    Shows up to 1000 columns and 100 rows, lets cells grow to 199 characters,
    lets pandas detect the output width and prints floats with two decimals.
    """
    # Ref: https://stackoverflow.com/a/52432757/
    opts = pandas.options.display
    settings = {
        "max_columns": 1000,
        "max_rows": 100,
        "max_colwidth": 199,
        "width": None,
        "float_format": '{:.2f}'.format,
    }
    for option, value in settings.items():
        setattr(opts, option, value)
    # opts.precision = 2  # set as needed


set_pandas_display_options()

In [3]:
# Input directory with the raw results and output directory for the evaluation.
# Both get the experiment number and a trailing path separator appended.
experiment = 0
path = f"E:\\Studium\\10_Semester\\Masterarbeit\\Deployment\\Kafka\\results\\setup{experiment}{os.path.sep}"
output = f"E:\\Studium\\10_Semester\\Masterarbeit\\Deployment\\Kafka\\eval\\{experiment}{os.path.sep}"

## Filter

In [4]:
def dropFirstXRows(input, x):
    """Return *input* without the rows labelled 0 .. x-1 (label-based, not positional)."""
    leadingLabels = np.arange(x)
    return input.drop(leadingLabels)

def removeNaN(array, array2):
    """Drop every position where *array* is NaN from both arrays.

    ``array2`` is filtered with the same mask, so both results stay aligned.
    """
    keep = ~np.isnan(array)
    return array[keep], array2[keep]

In [5]:
def filterLastData(generated, received):
    """Drop generated records that come after the last received one.

    The cut-off is the ``Kafka.Offset`` in the last row of *received*; rows of
    *generated* with a larger ``Kafka.Offset`` are removed.
    """
    cutoff = received["Kafka.Offset"].to_numpy()[-1]
    keep = generated["Kafka.Offset"] <= cutoff
    return generated[keep]

In [6]:
def extractDate(line):
    """Parse the leading ``YYYY-mm-dd HH:MM:SS`` of a log line into epoch milliseconds.

    The timestamp is interpreted as naive local time (``datetime.timestamp``)
    and truncated to whole seconds before scaling to ms.
    """
    stamp = " ".join(line.split(" ")[0:2])
    return int(datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S").timestamp()) * 1000


def transitionsFromFile(path):
    """Read the MockFog state transitions from a run log.

    The log is scanned line by line:

    * ``Starting transition to state <name>`` remembers ``<name>``; the date at
      the start of the following line is the start of the transition.
    * ``Doing state ...`` marks that the date at the start of the following
      line is the end of the transition.

    Returns:
        List of ``(name, start_ms, end_ms)`` tuples in file order.
    """
    start = False
    end = False
    name = None
    last = 0
    transitions = []
    # ``with`` closes the file even if a malformed line makes extractDate raise.
    with open(path, "r") as f:
        for line in f:
            if line.startswith("Starting transition to state"):
                # 5th token is the state name; [:-1] strips its last character
                # (the newline when the name ends the line).
                name = line.split(" ")[4][:-1]
                start = True
            elif line.startswith("Doing state"):
                end = True
            elif start:
                last = extractDate(line)
                start = False
            elif end:
                transitions.append((name, last, extractDate(line)))
                end = False
    return transitions

In [7]:
def loadData(path, warmupElements=60100):
    """Load the CSV result files of every producer of every run below *path*.

    Layout read for each run directory ``r``::

        <path>/<r>/data/<producer>/<time>/<topic>.csv           (received)
        <path>/<r>/data/<producer>/<time>/<topic>_produced.csv
        <path>/<r>/data/<producer>/<time>/<topic>_filtered.csv
        <path>/<r>/data/<producer>/<time>/<topic>_warnings.csv
        <path>/<r>/data/<producer>/<time>/<topic>_modelchange.csv
        <path>/../runs/exp<experiment>-<r>.log                   (transitions)

    ``experiment`` is read from the module-level variable of the same name.

    The warm-up phase (records with ``ProducedElements <= warmupElements``,
    about 5 minutes with the default) is removed from every table by following
    its offsets from the produced data through the filtered records to the
    warnings.

    Returns:
        ``(names, produced_generated, produced_recieved, filtered, warnings,
        modelchange, firstTimestamp, transitions, producerRun)``. All lists are
        aligned per producer; ``transitions`` maps run name -> transitions.
    """
    names = []
    produced_generated = []
    produced_recieved = []
    filtered = []
    warnings = []
    modelchange = []
    firstTimestamp = []
    producerRun = []
    transitions = {}

    # os.listdir returns entries in arbitrary order; sort for reproducible output.
    for r in sorted(os.listdir(path)):
        dataDir = os.path.join(path, r, "data")
        producers = [os.path.join(dataDir, x) for x in sorted(os.listdir(dataDir))]
        logFile = os.path.join(path, "..", "runs", "exp" + str(experiment) + "-" + r + ".log")
        transitions[r] = transitionsFromFile(logFile)

        for prod in tqdm(producers):
            producerRun.append(r)

            time = sorted(os.listdir(prod))[0]
            timeDir = os.path.join(prod, time)
            # NOTE(review): the topic is taken from the first 6-character file
            # name (e.g. "t1.csv") minus its 4-character extension — confirm.
            topic = sorted(x for x in os.listdir(timeDir) if len(x) == 6)[0][:-4]
            dataPath = os.path.join(timeDir, topic)

            produced_generated_df = pandas.read_csv(dataPath + "_produced.csv")
            produced_recieved_df = pandas.read_csv(dataPath + ".csv")
            produced_generated_df = filterLastData(produced_generated_df, produced_recieved_df)

            filtered_df = pandas.read_csv(dataPath + "_filtered.csv")
            warnings_df = pandas.read_csv(dataPath + "_warnings.csv")
            modelchange_df = pandas.read_csv(dataPath + "_modelchange.csv")

            # Remove the warm-up phase.
            # NOTE(review): model changes use "> warmupElements + 1" while the
            # produced data uses "> warmupElements" (kept from the original).
            modelchange_df = modelchange_df[modelchange_df["producedElements"] > warmupElements + 1]

            producedFilter = produced_generated_df["ProducedElements"] <= warmupElements  # last point of anomaly
            offsets = produced_generated_df[producedFilter]["Kafka.Offset"].to_numpy()
            produced_generated_df = produced_generated_df[~producedFilter]

            producedReceivedFilter = np.isin(produced_recieved_df["Kafka.Offset"].to_numpy(), offsets)
            produced_recieved_df = produced_recieved_df[~producedReceivedFilter]

            # filtered records point to produced records via Data.Offset; their own
            # Kafka offsets are what the warnings reference.
            filteredFilter = np.isin(filtered_df["Data.Offset"].to_numpy(), offsets)
            filteredOffsets = filtered_df["Kafka.Offset"][filteredFilter].to_numpy()
            filtered_df = filtered_df[~filteredFilter]

            warningFilter = np.isin(warnings_df["Record.BeginOffset"], filteredOffsets)
            warnings_df = warnings_df[~warningFilter]

            # Column 3 of the produced data (presumably Producer.Timestamp).
            # .iloc[row, col] replaces the deprecated iloc()[0][3] chain.
            firstTimestampValue = produced_generated_df.iloc[0, 3]

            produced_generated.append(produced_generated_df)
            produced_recieved.append(produced_recieved_df)
            filtered.append(filtered_df)
            warnings.append(warnings_df)
            modelchange.append(modelchange_df)
            firstTimestamp.append(firstTimestampValue)

            names.append("run_" + r + "_" + topic)

    return names, produced_generated, produced_recieved, filtered, warnings, modelchange, firstTimestamp, transitions, producerRun

In [8]:
# load data: every list holds one entry per producer, transitions are keyed by run
(names, produced_generated, produced_recieved, filtered, warnings,
 modelchange, firstTimestamp, transitions, producerRun) = loadData(path)

HBox(children=(FloatProgress(value=0.0, max=7.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=7.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=7.0), HTML(value='')))




### Helper

In [26]:
def transformTimestamp(timestamps, firstTimestamp):
    """Convert millisecond timestamps into minutes elapsed since *firstTimestamp*."""
    elapsedMs = timestamps - firstTimestamp
    return elapsedMs / 60000

def plotDistributionOverTime(data, timestamps, path, firstTimestamp, transitions = None):
    """Plot *data* against the experiment runtime and save it as PDF and JPG.

    The x axis is minutes since *firstTimestamp*. Transitions (``(name,
    start_ms, end_ms)``) starting after *firstTimestamp* are shaded red.
    Writes ``<path>_over_time.pdf`` and ``<path>_over_time.jpg`` and closes
    the figure.
    """
    # Start a fresh figure so the plot never lands on a figure left open by an
    # earlier call (e.g. plotDistributionCumulative without a path).
    plt.figure()
    if transitions is not None:
        for t in transitions:
            if t[1] > firstTimestamp:
                plt.axvspan(transformTimestamp(t[1], firstTimestamp), transformTimestamp(t[2], firstTimestamp), facecolor='r', alpha=0.2)

    plt.plot(transformTimestamp(timestamps, firstTimestamp), data)
    plt.xlabel("time in min")
    plt.ylabel("duration in ms")
    plt.savefig(path + "_over_time.pdf")
    plt.savefig(path + "_over_time.jpg", dpi = 300)
    plt.close()
    
    
def plotDistributionCumulative(data, path = None):
    """Draw the empirical cumulative distribution of *data* on a log x axis.

    With a *path*, the figure is saved as ``<path>_dist.pdf`` and
    ``<path>_dist.jpg`` and closed; without one it stays open for display.
    """
    fig, ax = plt.subplots()
    ax.set_xscale("log")

    ordered = np.sort(data)
    cumulative = np.linspace(0, 1, len(ordered), endpoint=True)
    ax.plot(ordered, cumulative)
    ax.set_xlabel("duration in ms")
    ax.set_ylabel("cumulative frequency")

    # plain integer tick labels instead of powers of ten, none on minor ticks
    ax.xaxis.set_major_formatter(StrMethodFormatter('{x:.0f}'))
    ax.xaxis.set_minor_formatter(NullFormatter())

    if path is None:
        return
    fig.savefig(path + "_dist.pdf")
    fig.savefig(path + "_dist.jpg", dpi = 300)
    plt.close(fig)

In [10]:
def removeDataDoingTransitionFilter(timestamps, transitions, additional = 1000):
    """Return a boolean mask that is False for samples taken during a transition.

    A sample is excluded if its timestamp lies within
    ``[start - additional, end + additional]`` of any transition.

    Args:
        timestamps: numpy array of millisecond timestamps.
        transitions: iterable of ``(name, start_ms, end_ms)`` tuples; may be empty.
        additional: safety margin in ms added on both sides of each transition.

    Returns:
        Boolean array shaped like *timestamps*; True means "keep".
    """
    # Start from "nothing excluded" so an empty transition list yields an
    # all-True mask (the old code returned np.logical_not(None)).
    during = np.zeros(np.shape(timestamps), dtype=bool)
    for t in transitions:
        during = during | ((timestamps >= t[1] - additional) & (timestamps <= t[2] + additional))
    return np.logical_not(during)

def extractAvgMedStdMinMaxFromArray(diff, timestamps, path, name, firstTimestamp, transitions):
    """Plot and summarise one series of durations, ignoring transition periods.

    Plots *diff* over time (transitions included), drops the samples taken
    during a transition, plots the CDF of the remainder and appends the row
    ``name;avg;med;std;min;max;p90;p95;p99;p99.9`` to ``<path>values.csv``.

    Returns:
        *diff* without the samples taken during transitions.
    """
    plotDistributionOverTime(diff, timestamps, path + name, firstTimestamp, transitions)

    diff = diff[removeDataDoingTransitionFilter(timestamps, transitions)]

    plotDistributionCumulative(diff, path + name)

    stats = [
        np.average(diff), np.median(diff), np.std(diff), np.min(diff), np.max(diff),
        np.percentile(diff, 90), np.percentile(diff, 95),
        # separate values: the 99th percentile used to be overwritten by the 99.9th
        np.percentile(diff, 99), np.percentile(diff, 99.9),
    ]
    with open(path + 'values.csv', 'a') as file_object:
        file_object.write(";".join(str(x) for x in [name] + stats) + '\n')
    return diff
    
def extractAvgMedStdMinMaxFromListOfArray(inputList,path, unit = "ms"):
    """Pool several series, plot their joint CDF and print summary statistics.

    The CDF is written via plotDistributionCumulative(path=*path*). The printed
    line contains avg, median, std, min, max and the 90/95/99/99.9 percentiles,
    each followed by *unit*.

    Returns:
        The concatenation of all arrays in *inputList*.
    """
    pooled = np.concatenate(inputList, axis = 0)
    plotDistributionCumulative(pooled, path = path)

    percentiles = [np.percentile(pooled, q) for q in (90, 95, 99, 99.9)]
    values = (np.average(pooled), np.median(pooled), np.std(pooled), np.min(pooled), np.max(pooled), *percentiles)

    labels = ["avg =", "median", "std", "min", "max", "90%%", "95%%", "99%%", "99.9%%"]
    specs = ["%.2f", "%.2f", "%.2f", "%d", "%d", "%.2f", "%.2f", "%.2f", "%.2f"]
    template = "; ".join(label + " " + spec + " " + unit for label, spec in zip(labels, specs))
    print(template % values)
    return pooled

In [11]:
def prepareDictory(path):
    """(Re)create the output directory *path* and start a fresh ``values.csv``.

    WARNING: everything already inside *path* is deleted. *path* must end with
    a path separator because ``values.csv`` is appended to it directly.
    """
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path, exist_ok=True)
    # last column is the 99.9th percentile (it was a second "per99" before)
    header = ["name", "avg", "med", "std", "minimum", "maximum", "per90", "per95", "per99", "per99.9"]
    with open(path + 'values.csv', 'w') as file_object:
        file_object.write(";".join(header) + '\n')

### Histogram of generated data over time

Consumer and producer timestamps of the produced data.

Data isn't produced exactly every 5 ms; instead there are peaks and lows, and only the average interval is 5 ms.

In [None]:
def plotHist(data, label, firstTimestamp, transitions, width = 250, slabel = False, interval = 5):
    """Histogram of record timestamps over the experiment runtime (in seconds).

    Args:
        data: millisecond timestamps; assumed sorted (first/last define the span).
        label: legend label of the histogram.
        firstTimestamp: millisecond timestamp that maps to x = 0.
        transitions: ``(name, start_ms, end_ms)`` tuples to shade red, or None.
        width: bin width in ms.
        slabel: add a legend entry for the expected-count line.
        interval: expected production interval in ms; the red line marks
            ``width / interval`` records per bin.
    """
    timespan = data[-1] - data[0]

    if transitions is not None:
        labeled = False
        for t in transitions:
            if t[1] > firstTimestamp:
                # transformTimestamp yields minutes, the x axis is in seconds
                plt.axvspan(transformTimestamp(t[1], firstTimestamp) * 60, transformTimestamp(t[2], firstTimestamp) * 60, facecolor='r', alpha=0.2, label = "MockFog's transition phase" if not labeled else None)
                labeled = True

    # at least one bin, even if the records span less than one bin width
    bins = max(1, int(timespan / width))
    plt.hist((data - firstTimestamp) / 1000, bins = bins, zorder=2, label = label)
    plt.hlines(width / interval, 0, (data[-1] - firstTimestamp) / 1000, color = "red", zorder=1, label = "expected amount of points" if slabel else None)
    plt.xlabel("runtime of the experiment in s")
    plt.ylabel("data points in an %g ms intervall" % width)
    plt.legend()

# One histogram per producer: producer-side timestamps (column 3 of the
# produced data) against the receive times (column 0 of the received data).
for x in nrange(len(names)):
    try:
        plotHist(produced_generated[x].to_numpy()[:,3], "produced data points",
                 firstTimestamp[x], transitions[producerRun[x]], slabel = True)
        plotHist(produced_recieved[x].to_numpy()[:,0], "recieved data points",
                 firstTimestamp[x], None)
        histDir = output + "produced_hist" + os.path.sep
        os.makedirs(histDir, exist_ok=True)
        plt.savefig(histDir + names[x] + ".pdf")
        plt.savefig(histDir + names[x] + ".jpg", dpi = 300)
        plt.close()
        print(names[x])
    except Exception as e:
        plt.close()
        print(e)
        print("Error", names[x], x)
        



In [24]:
def kafkaAggCount(dataList, index, label, path):
    """Print how many records share the same timestamp in column *index*.

    For every producer ``x`` the timestamps in column *index* of
    ``dataList[x]`` outside transition periods are grouped, and the number of
    records per distinct timestamp is counted. Prints the most common group
    size, the share of single-record groups and summary statistics of all
    group sizes.
    """
    print("How often aggregates Kafka data for", label)
    counts = []
    for x in range(len(dataList)):
        # use the table that was passed in (it used to read the global produced_recieved)
        ts = dataList[x].to_numpy()[:, index]
        ts = ts[removeDataDoingTransitionFilter(ts, transitions[producerRun[x]])]
        counts.append(np.unique(ts, return_counts = True)[1])

    groupSizes, frequency = np.unique(np.concatenate(counts, axis = 0), return_counts=True)
    total = np.sum(frequency)
    mostCommon = np.argmax(frequency)  # first maximum, like np.where(...)[0]
    print("Modus: %d with %.2f%%" % (groupSizes[mostCommon], (frequency[mostCommon] / total) * 100))

    singles = frequency[groupSizes == 1]
    singleShare = (singles[0] / total) * 100 if len(singles) else 0.0
    print("One element with %.2f%%" % singleShare)
    # NOTE(review): the values are records per distinct timestamp; "r/s" is the
    # unit label printed by the original notebook.
    extractAvgMedStdMinMaxFromListOfArray(counts, path, "r/s")

kafkaAggCount(produced_recieved, 0, "produced_recieved", output + "produced_records_sub")

How often aggregates Kafka data for produced_recieved
Modus: 2 with 18.87%
One element with 13.42%
avg = 9.23 r/s; median 4.00 r/s; std 15.18 r/s; min 1 r/s; max 295 r/s; 90% 40.00 r/s; 95% 46.00 r/s; 99% 57.00 r/s; 99.9% 96.00 r/s


### Information on the time difference between the arrival times of consecutive records for all 3 topics

Peaks are caused by CPU time; the data is not produced at regular intervals.

In [21]:
def avgMedStdArivalTime(df, column, path, name, firstTimestamp, transitions, filterArray = None):
    """Statistics of the gaps between consecutive timestamps in one column.

    ``gap[i] = ts[i+1] - ts[i]`` is paired with ``ts[i+1]``. *filterArray*
    (length ``len(df) - 1``) optionally selects the gaps to keep before they
    are passed to extractAvgMedStdMinMaxFromArray.

    Returns:
        The gaps outside transition periods.
    """
    ts = df.to_numpy()[:, column].astype(np.int64)
    gaps = np.diff(ts)
    later = ts[1:]
    if filterArray is not None:
        gaps, later = gaps[filterArray], later[filterArray]
    return extractAvgMedStdMinMaxFromArray(gaps, later, path, name, firstTimestamp, transitions)
    

import traceback

# Gaps between consecutive records of every table, per producer and pooled.
currentPath = output + "arivalTimesOfData" + os.path.sep
shutil.rmtree(currentPath,ignore_errors=True)
pathesWithoutSep = [currentPath + "produced_generated", currentPath + "produced_received", currentPath + "filtered", currentPath + "warnings"]
pathes = [x + os.path.sep for x in pathesWithoutSep]
for x in pathes:
    prepareDictory(x)

pro_gen = []
pro_rec = []
fil = []
war = []

for x in nrange(len(names)):
    try:
        # pathes already end with a separator
        pro_gen.append(avgMedStdArivalTime(produced_generated[x], 3, pathes[0], names[x], firstTimestamp[x], transitions[producerRun[x]]))
        pro_rec.append(avgMedStdArivalTime(produced_recieved[x], 0, pathes[1], names[x], firstTimestamp[x], transitions[producerRun[x]]))
        fil.append(avgMedStdArivalTime(filtered[x], 0, pathes[2], names[x], firstTimestamp[x], transitions[producerRun[x]]))
        # skip warnings whose Record.BeginOffset occurs in the model-change
        # producedElements; [1:] aligns the mask with the gaps (gap i ends at row i+1)
        warningFilter = np.logical_not(np.isin(warnings[x]["Record.BeginOffset"].to_numpy(), modelchange[x]["producedElements"].to_numpy())[1:])
        war.append(avgMedStdArivalTime(warnings[x], 0, pathes[3], names[x], firstTimestamp[x], transitions[producerRun[x]], warningFilter))
    except Exception as e:
        plt.close()
        # Exceptions have no print_stack(); calling it raised an AttributeError
        # inside this handler and aborted the whole loop.
        traceback.print_exc()
        print(e)
        print("Error", names[x], x)

print("Arival diff of generated data")
diff = extractAvgMedStdMinMaxFromListOfArray(pro_gen, pathesWithoutSep[0])
print("Percentage of 5ms %.2f" %((np.sum(diff == 5)/len(diff) * 100)))
print("Percentage of 0ms %.2f" %((np.sum(diff == 0)/len(diff) * 100)))
print("Percentage of <=10ms %.2f" %((np.sum(diff <= 10)/len(diff) * 100)))
print("Percentage of >100ms %.2f" %((np.sum(diff > 100)/len(diff) * 100)))
print()
print("Arival diff of generated data from Kafka")
extractAvgMedStdMinMaxFromListOfArray(pro_rec, pathesWithoutSep[1])
print("Arival diff of filtered data from Kafka")
extractAvgMedStdMinMaxFromListOfArray(fil, pathesWithoutSep[2])
print("Arival diff of warning data from Kafka")
_ = extractAvgMedStdMinMaxFromListOfArray(war, pathesWithoutSep[3])

HBox(children=(FloatProgress(value=0.0, max=21.0), HTML(value='')))


Arival diff of generated data
avg = 5.00 ms; median 5.00 ms; std 3.37 ms; min 0 ms; max 657 ms; 90% 6.00 ms; 95% 7.00 ms; 99% 11.00 ms; 99.9% 23.00 ms
Percentage of 5ms 57.50
Percentage of 0ms 2.55
Percentage of <=10ms 98.98
Percentage of >100ms 0.01

Arival diff of generated data from Kafka
avg = 5.00 ms; median 0.00 ms; std 28.30 ms; min 0 ms; max 1488 ms; 90% 8.00 ms; 95% 19.00 ms; 99% 207.00 ms; 99.9% 266.00 ms
Arival diff of filtered data from Kafka
avg = 5.00 ms; median 0.00 ms; std 33.94 ms; min 0 ms; max 1831 ms; 90% 0.00 ms; 95% 11.00 ms; 99% 222.00 ms; 99.9% 434.00 ms
avg = 26.40 ms; median 0.00 ms; std 73.01 ms; min 0 ms; max 1456 ms; 90% 51.00 ms; 95% 225.00 ms; 99% 350.01 ms; 99.9% 595.10 ms


## Check Produced

### How long does it take until a produced record is acknowledged by Kafka?

In [None]:
def kafkaAck(df, path, name, firstTimestamp, transitions):
    """Duration between the send time and the acknowledgement of each record.

    Column 3 of *df* is used as the send time and column 0 as the
    acknowledgement/receive time (both in ms — presumably; confirm the column
    order of the received CSV). The durations are summarised with
    extractAvgMedStdMinMaxFromArray.

    Returns:
        The durations outside transition periods.
    """
    values = df.to_numpy()  # convert the frame once instead of twice
    ack = values[:, 0].astype(np.int64)
    send = values[:, 3].astype(np.int64)
    return extractAvgMedStdMinMaxFromArray(ack - send, send, path, name, firstTimestamp, transitions)


# Acknowledgement latency per producer, then pooled over all producers.
currentPath = output + "producedAckByKafka"
prepareDictory(currentPath + os.path.sep)

kaf = []
for x in nrange(len(names)):
    try:
        ackDiff = kafkaAck(produced_recieved[x], currentPath + os.path.sep, names[x],
                           firstTimestamp[x], transitions[producerRun[x]])
    except Exception as e:
        plt.close()
        print(e)
        print("Error", names[x], x)
    else:
        kaf.append(ackDiff)

print("Produced acknowledged by Kafka")
_ = extractAvgMedStdMinMaxFromListOfArray(kaf, currentPath)

### check validity of produced data

In [None]:
def validate(df, heigherFields, heigherOrEqualFields, offset):
    """Check that consecutive rows of *df* are ordered.

    For each pair of consecutive rows the columns (by position) in
    *heigherFields* must strictly increase and those in *heigherOrEqualFields*
    must not decrease; violations are printed with both rows. Columns in
    *offset* are expected to grow by exactly 1 — gaps are only reported and do
    not make the table invalid.

    Returns:
        True if no ordering violation was found (also for fewer than two rows).
    """
    rows = df.to_numpy()
    if len(rows) < 2:
        return True

    error = False
    lastRow = rows[0]
    for row in rows[1:]:
        increasing = all(row[heigherFields] > lastRow[heigherFields])
        nonDecreasing = all(row[heigherOrEqualFields] >= lastRow[heigherOrEqualFields])
        if not (increasing and nonDecreasing):
            error = True
            print("Error:")
            print(lastRow)
            print(row)
        if not all(lastRow[offset] + 1 == row[offset]):
            # report the first offset column instead of a hard-coded column 2
            print("Offset %i increased not by 1" % lastRow[offset[0]])
        lastRow = row
    return not error

# (tables, strictly increasing cols, non-decreasing cols, +1 offset cols, message)
validationChecks = [
    (produced_recieved, [2], [0,1,3], [2], "Not valid produced"),
    (filtered, [2,3], [0,1,4], [2,3], "Not valid filtered"),
    (warnings, [2,4,5], [0,1,6,7], [2], "Not valid warnings"),
]
for x in nrange(len(names)):
    for tables, higher, higherOrEqual, offsetCols, message in validationChecks:
        if not validate(tables[x], higher, higherOrEqual, offsetCols):
            print(message, names[x])

### Time between the previous processing step and the next

In [25]:
def extractDiffBetweenTwoTables(a,b,keyA,keyB, sortBy, valueA, valueB):
    """Left-join *b* onto *a* by key and subtract one column from another.

    Columns of *a* get the prefix ``a_`` and columns of *b* the prefix ``b_``;
    rows of *a* without a match in *b* get NaN. *sortBy* is accepted but not
    used.

    Returns:
        ``(a[valueA] - b[valueB], b[valueB], joined)`` with the first two as
        numpy arrays.
    """
    left = a.set_index(keyA).add_prefix('a_')
    right = b.set_index(keyB).add_prefix('b_')
    joined = left.join(right)

    bValues = joined['b_' + valueB].to_numpy()
    diff = joined['a_' + valueA].to_numpy() - bValues
    return diff, bValues, joined

# Latency between consecutive stages:
#   PF  = filtered record received  - produced record received (Consumer.Time)
#   PPR = produced record received  - producer timestamp
currentPathPF = output + "receivedByFollowingTopic" + os.path.sep + "ProducedFiltered"
prepareDictory(currentPathPF + os.path.sep)
currentPathPPR = output + "receivedByFollowingTopic" + os.path.sep + "ProducedProdRec"
prepareDictory(currentPathPPR + os.path.sep)

diffListPF = []
diffListPPR = []

for x in nrange(len(names)):
    try:
        diffPF, timestamps, _ = extractDiffBetweenTwoTables(
            filtered[x], produced_recieved[x],
            'Data.Offset', 'Kafka.Offset', "Consumer.Time", 'Consumer.Time', 'Consumer.Time')
        diffListPF.append(extractAvgMedStdMinMaxFromArray(
            diffPF, timestamps, currentPathPF + os.path.sep, names[x],
            firstTimestamp[x], transitions[producerRun[x]]))

        diffPPR, timestamps, _ = extractDiffBetweenTwoTables(
            produced_recieved[x], produced_generated[x],
            'Kafka.Offset', 'Kafka.Offset', "Kafka.Offset", 'Consumer.Time', 'Producer.Timestamp')
        diffListPPR.append(extractAvgMedStdMinMaxFromArray(
            diffPPR, timestamps, currentPathPPR + os.path.sep, names[x],
            firstTimestamp[x], transitions[producerRun[x]]))
    except Exception as e:
        plt.close()
        print(e)
        print("Error", names[x], x)

print("Produced -> Produced Received")
# the helper returns the pooled (concatenated) array
diffPPR = extractAvgMedStdMinMaxFromListOfArray(diffListPPR, currentPathPPR)
print("More than 100 ms: %.2f%%"%((np.sum(diffPPR > 100) / len(diffPPR)) * 100))
print("More lower than 8 ms: %.2f%%"%((np.sum(diffPPR < 8) / len(diffPPR)) * 100))

print("Produced -> Filtered")
_ = extractAvgMedStdMinMaxFromListOfArray(diffListPF, currentPathPF)

HBox(children=(FloatProgress(value=0.0, max=21.0), HTML(value='')))


Produced -> Produced Received
avg = 167.91 ms; median 153.00 ms; std 119.00 ms; min 11 ms; max 1693 ms; 90% 296.00 ms; 95% 397.00 ms; 99% 516.00 ms; 99.9% 846.00 ms
More than 100 ms: 62.99%
More lower than 8 ms: 0.00%
Produced -> Filtered
avg = 177.25 ms; median 155.00 ms; std 159.02 ms; min -1279 ms; max 2277 ms; 90% 362.00 ms; 95% 465.00 ms; 99% 650.00 ms; 99.9% 1084.00 ms


In [None]:
#print two together
def printTwoTogether(combineIds, namesForIds):
    """Plot the Produced -> Produced-Received latency CDFs of several producers.

    Args:
        combineIds: producer names (entries of the global ``names``) to compare.
        namesForIds: legend label for each entry of *combineIds*.

    Saves ``ProducedProdRec_<ids joined by '-'>`` as JPG and PDF below
    ``<output>receivedByFollowingTopic`` and shows the figure.
    """
    # resolve all names first so an unknown id fails before a figure is created
    indexes = [names.index(x) for x in combineIds]

    fig, ax = plt.subplots()
    ax.set_xscale("log")

    for i, legendLabel in zip(indexes, namesForIds):
        diffPPR, timestamps, _ = extractDiffBetweenTwoTables(produced_recieved[i], produced_generated[i], 'Kafka.Offset', 'Kafka.Offset', "Kafka.Offset", 'Consumer.Time', 'Producer.Timestamp')
        # use the transitions of this producer's run (the loop counter was used
        # before, which picked the run of an unrelated producer)
        diffPPR = diffPPR[removeDataDoingTransitionFilter(timestamps, transitions[producerRun[i]])]
        ordered = np.sort(diffPPR)
        ax.plot(ordered, np.linspace(0, 1, len(ordered), endpoint=True), label = legendLabel)

    ax.set_xlabel("duration in ms")
    ax.set_ylabel("cumulative frequency")
    ax.legend()

    ax.xaxis.set_major_formatter(StrMethodFormatter('{x:.0f}'))
    ax.xaxis.set_minor_formatter(NullFormatter())

    cOutput = output + "receivedByFollowingTopic" + os.path.sep + "ProducedProdRec_" + "-".join(combineIds)
    fig.savefig(cOutput + ".jpg", dpi = 300)
    fig.savefig(cOutput + ".pdf")
    plt.show()

printTwoTogether(["run_2_t1", "run_2_t7"], ["good placement (t1)", "poor placement (t7)"])

Negative values are possible: a node can receive a warning before the corresponding filtered record, depending on its position in the cluster and other aspects.

In [None]:
# Latency from the filtered / produced records to the warning covering them.
currentPathFW = output + "receivedByFollowingTopic" + os.path.sep + "FilteredRecWarning"
prepareDictory(currentPathFW + os.path.sep)

currentPathPW = output + "receivedByFollowingTopic" + os.path.sep + "ProducedRecWarning"
prepareDictory(currentPathPW + os.path.sep)

currentPathPGW = output + "receivedByFollowingTopic" + os.path.sep + "ProducedGenWarning"
prepareDictory(currentPathPGW + os.path.sep)

fw = []
pw = []
pgw = []

for x in nrange(len(names)):
    try:
        # In warnings, Record.BeginOffset and Record.EndOffset are the offsets of
        # the related filtered records; they are not guaranteed to be equal, so
        # both are joined (prefix fb_ = begin record, fe_ = end record).
        warnings_filtered_joined = (
            warnings[x]
            .set_index("Record.BeginOffset")
            .join(filtered[x].add_prefix('fb_').set_index("fb_Kafka.Offset"))
            .set_index("Record.EndOffset")
            .join(filtered[x].add_prefix('fe_').set_index("fe_Kafka.Offset"))
        )

        # filtered (begin) record received -> warning received
        diff = (warnings_filtered_joined["Consumer.Time"] - warnings_filtered_joined["fb_Consumer.Time"]).to_numpy()
        diff, timestamps = removeNaN(diff, warnings_filtered_joined["fb_Consumer.Time"].to_numpy())
        fw.append(extractAvgMedStdMinMaxFromArray(diff, timestamps, currentPathFW + os.path.sep, names[x], firstTimestamp[x], transitions[producerRun[x]]))

        # produced record received -> warning received
        diff, timestamps, _ = extractDiffBetweenTwoTables(warnings_filtered_joined, produced_recieved[x].add_prefix('pr_'), 'fb_Data.Offset', 'pr_Kafka.Offset', "pr_Consumer.Time", 'Consumer.Time', 'pr_Consumer.Time')
        diff, timestamps = removeNaN(diff, timestamps)
        pw.append(extractAvgMedStdMinMaxFromArray(diff, timestamps, currentPathPW + os.path.sep, names[x], firstTimestamp[x], transitions[producerRun[x]]))

        # producer timestamp -> warning received
        prodGenProdRec = produced_generated[x].set_index("Kafka.Offset").join(produced_recieved[x].set_index("Kafka.Offset")).reset_index()
        diff, timestamps, _ = extractDiffBetweenTwoTables(warnings_filtered_joined, prodGenProdRec.add_prefix('pr_'), 'fb_Data.Offset', 'pr_Kafka.Offset', "pr_Consumer.Time", 'Consumer.Time', 'pr_Producer.Timestamp')
        diff, timestamps = removeNaN(diff, timestamps)
        pgw.append(extractAvgMedStdMinMaxFromArray(diff, timestamps, currentPathPGW + os.path.sep, names[x], firstTimestamp[x], transitions[producerRun[x]]))

    except Exception as e:
        plt.close()
        print(e)
        print("Error", names[x], x)

print("Filtered -> Warning")
extractAvgMedStdMinMaxFromListOfArray(fw, currentPathFW)
print("ProducedRec -> Warning")
_ = extractAvgMedStdMinMaxFromListOfArray(pw, currentPathPW)
print("ProducedGen -> Warning")
# the helper returns the pooled (concatenated) array
diff = extractAvgMedStdMinMaxFromListOfArray(pgw, currentPathPGW)
print("More than 3000 ms: %.2f%%"%((np.sum(diff > 3000) / len(diff)) * 100))
print("More than 5000 ms: %.2f%%"%((np.sum(diff > 5000) / len(diff)) * 100))

## Data loss from produced to filtered

In [None]:
def calcDataLoss(df1, df1OffsetColumn, df2, df2OffsetColumn):
    """Check that every offset of *df1* appears exactly once in *df2*.

    Args:
        df1: table holding the expected offsets in column *df1OffsetColumn*.
        df2: table holding the received offsets in column *df2OffsetColumn*.

    Prints ``Received records <n>x for <m> times; ...`` for every deviating
    reception count *n* (0 = lost, >1 = duplicated). Offsets that only occur
    in *df2* are ignored.

    Returns:
        True if every expected offset was received exactly once.
    """
    expected = df1.to_numpy()[:, df1OffsetColumn].astype(np.int64)
    received = df2.to_numpy()[:, df2OffsetColumn].astype(np.int64)

    # How often each expected offset occurs in df2. (The old loop counted
    # df1's offsets against themselves and therefore never found a loss.)
    count = dict.fromkeys(expected.tolist(), 0)
    for offset in received.tolist():
        if offset in count:
            count[offset] += 1

    # reception count -> number of offsets received that often
    errors = {}
    for times in count.values():
        if times != 1:
            errors[times] = errors.get(times, 0) + 1

    if not errors:
        return True

    out = "Received records "
    for k, v in sorted(errors.items()):
        out += "%ix for %i times; " % (k, v)
    print(out)
    return False

# Every produced offset (column 0 of the produced data) must show up exactly
# once in column 3 of the filtered data — see calcDataLoss.
for x in nrange(len(names)):
    try:
        complete = calcDataLoss(produced_generated[x], 0, filtered[x], 3)
    except Exception as e:
        plt.close()
        print(e)
        print("Error", names[x], x)
    else:
        if not complete:
            print("Error", names[x])
   

## Latency produced - warning

As the filter uses a median filter of size 5, it needs 3 outliers to detect a change; otherwise the outlier is just skipped. Hence there must be a latency of at least 3 × 5 ms = 15 ms.

### Check all anomalies were detected

In [None]:
# Every filtered record with a measurement above 1.0 (an anomaly) should have
# a warning whose Record.BeginOffset points at that record.
for x in nrange(len(names)):
    try:
        anomalies = filtered[x][filtered[x]["Data.Measurement"] > 1.0]
        anomaliesJoined = anomalies.add_prefix('filtered_').set_index("filtered_Kafka.Offset").join(warnings[x].add_prefix('warnings_').set_index("warnings_Record.BeginOffset"))
        noBelongingWarning = anomaliesJoined[anomaliesJoined["warnings_Consumer.Time"].isna()]
        # this counts anomalous filtered records (it was labelled "warnings")
        print("Found %d anomalies" % len(anomalies))
        if len(noBelongingWarning) > 0:
            print("%s: For %d / %d anomalies there were no belonging warnings" % (names[x], len(noBelongingWarning), len(anomalies)))

    except Exception as e:
        plt.close()
        print(e)
        print("Error", names[x], x)

In [None]:
# Time from producing an anomalous value (model change value > 1) until the
# warning for it was received, per producer and pooled.
currentPath = output + "anomalyDetectionTime"
prepareDictory(currentPath + os.path.sep)

diffList = []

for x in nrange(len(names)):
    try:
        # produced (generated) joined with produced (received) on Kafka.Offset
        pr_rc = produced_generated[x].add_prefix('produced_generated_').set_index("produced_generated_Kafka.Offset").join(produced_recieved[x].add_prefix('produced_recieved_').set_index("produced_recieved_Kafka.Offset")).reset_index()

        # ... joined with the filtered records via their Data.Offset
        pr_rc_fi = pr_rc.set_index("produced_generated_Kafka.Offset").join(filtered[x].add_prefix('filtered_').set_index("filtered_Data.Offset")).reset_index()

        # model changes joined on the number of produced elements
        mc_pg_pr_fi = modelchange[x].add_prefix('modelchange_').set_index("modelchange_producedElements").join(pr_rc_fi.set_index("produced_generated_ProducedElements")).reset_index().rename(columns={'modelchange_producedElements': 'producedElements'})

        # ... and with the warnings via the filtered record's Kafka.Offset
        fullJoin = mc_pg_pr_fi.set_index("filtered_Kafka.Offset").join(warnings[x].add_prefix('warning_').set_index("warning_Record.BeginOffset")).reset_index()

        fullJoinOver10 = fullJoin[fullJoin["modelchange_value"] > 1]
        if len(fullJoinOver10) == 0:
            # nothing to measure (diff[-1] below used to raise an IndexError)
            print("No anomalies for", names[x])
            continue

        diff = (fullJoinOver10['warning_Consumer.Time'] - fullJoinOver10['produced_generated_Producer.Timestamp']).to_numpy()

        timestamps = fullJoinOver10['produced_generated_Producer.Timestamp'].to_numpy()

        # presumably the warning for the last anomaly arrived after the
        # recording ended (NaN after the join) — drop that single entry
        if np.isnan(diff[-1]):
            diff = diff[:-1]
            timestamps = timestamps[:-1]

        diff = extractAvgMedStdMinMaxFromArray(diff, timestamps, currentPath + os.path.sep, names[x], firstTimestamp[x], transitions[producerRun[x]])
        diffList.append(diff)

    except Exception as e:
        plt.close()
        print(e)
        print("Error", names[x], x)

print("Time to detect an anomaly")
_ = extractAvgMedStdMinMaxFromListOfArray(diffList, currentPath)

## Usage

In [None]:
def _parseUsageLines(lines):
    """Group the ``ps`` samples of one server by PID.

    ``Time <n>`` lines set the timestamp for the following process lines; a
    process line holds space-separated ``cpu mem pid elapsed name container``
    fields. Pause containers (``<x>_POD_...``), processes whose name starts with
    ``debug`` and processes that never used CPU are dropped.

    Returns:
        dict pid -> {"mem", "cpu", "pid", "running", "name", "container"}.
    """
    data = {}
    date = None
    for l in lines:
        if l.startswith("Time "):
            # strip() instead of [:-1]: a last line without '\n' keeps its last digit
            date = int(l[5:].strip())
            continue
        if not l.strip():
            continue  # blank lines used to crash float()

        e = [x for x in l.split(' ') if x != ""]
        cpu = float(e[0])
        mem = float(e[1])
        pid = int(e[2])
        name = e[4]
        container = e[5]

        if not name.startswith("etcd"):
            split = name.split("_")
            if split[1] == "POD":
                continue
            name = split[1] + "-" + split[2]

        pid_data = data.setdefault(pid, {
            "mem": [],
            "cpu": [],
            "pid": pid,
            "running": [],
            "name": name,
            "container": container
        })
        pid_data["mem"].append(mem)
        pid_data["cpu"].append(cpu)
        pid_data["running"].append(date)

    return {pid: d for pid, d in data.items()
            if sum(d["cpu"]) != 0.0 and not d["name"].startswith("debug")}


def _plotUsage(data, key, ylabel, outDir, server, run):
    """Plot series *key* ("cpu" or "mem") of every process and save PDF + JPG."""
    for d in data.values():
        plt.plot(d["running"], d[key], label = d["name"][:40])
    plt.legend()
    plt.ylabel(ylabel)
    plt.savefig(outDir + server + "_" + str(run) + ".pdf")
    plt.savefig(outDir + server + "_" + str(run) + ".jpg", dpi = 300)
    plt.close()


def extractUsage(lines, server, run):
    """Plot CPU and memory usage of every process on *server* for one *run*.

    *lines* are the lines of the server's ``ps.log``. Figures are written to
    ``<output>usage/cpu/<server>_<run>.{pdf,jpg}`` and
    ``<output>usage/mem/<server>_<run>.{pdf,jpg}``.
    """
    data = _parseUsageLines(lines)

    outpath = output + "usage" + os.path.sep
    outpathMem = outpath + "mem" + os.path.sep
    outpathCPU = outpath + "cpu" + os.path.sep
    os.makedirs(outpathMem, exist_ok=True)
    os.makedirs(outpathCPU, exist_ok=True)

    _plotUsage(data, "cpu", "CPU usage in %", outpathCPU, server, run)
    _plotUsage(data, "mem", "memory usage in %", outpathMem, server, run)

In [None]:
# Plot the CPU / memory usage of every server in every run.
runs = os.listdir(path)

for run in runs:
    p = path + run + os.path.sep + "logs" + os.path.sep + "usage" + os.path.sep
    servers = os.listdir(p)
    for server in tqdm(servers):
        # ``with`` closes each ps.log; the handles used to stay open
        with open(p + server + os.path.sep + "ps.log", 'r') as file:
            lines = file.readlines()
        extractUsage(lines, server, run)

## Responsibilities

Already shows the server ID, not the Kafka broker ID.

In [None]:
def addToData(data, topic, current, seconds):
    """Record *current* as the state of *topic* at *seconds* if it changed.

    Empty states are ignored; a state equal to the last one stored for *topic*
    is not appended again. *data* is modified in place and returned.
    """
    if not current:
        return data

    if topic not in data:
        data[topic] = {"data": [current], "time": [seconds]}
    elif data[topic]["data"][-1] != current:
        data[topic]["data"].append(current)
        data[topic]["time"].append(seconds)
    return data

def showDiff(data):
    """Print, per key, the first recorded state followed by every change.

    Each change is printed in red as the dictdiffer diff against the previous
    state, followed by the minute (``time / 60``) at which it was recorded.
    """
    for key, history in data.items():
        print(key)
        states = history["data"]
        print(states[0])
        for previous, current, seconds in zip(states, states[1:], history["time"][1:]):
            print("\x1b[31m" + str(list(dictdiffer.diff(previous, current))) + "\x1b[0m")
            print("Minute %.1f" % (seconds / 60))
        print()
        
def replaceKafkaWithNode(c, pods, second):
    """Map Kafka broker IDs to the server their pod ran on at time *second*.

    For each ID in *c* the history of pod ``kafka-<id>`` is consulted; the
    entry before the first one recorded after *second* is used (the first
    entry if even that one is later). Returns the set of server names.
    """
    servers = set()
    for brokerId in c:
        history = pods["kafka-" + str(brokerId)]
        times = history["time"]
        firstLater = next((i for i, t in enumerate(times) if t > second), len(times))
        servers.add(history["data"][max(firstLater - 1, 0)]["server"])
    return servers

def extractResponsibilities(lines):
    """Print how partition assignments and pod placements changed over time.

    *lines* is a log made of trials (``Trial <n> ... <seconds>``). Inside a
    trial, ``  topic "<name>"`` lines are followed by ``    partition`` lines
    (leader, replicas, ISRs); after a ``NAME`` header one line per pod gives
    its state and server. Broker IDs in leaders/replicas/ISRs are replaced by
    the server the broker pod ran on at the time of the snapshot, then the
    change histories of topics and pods are printed with showDiff.
    """
    seconds = 0
    topic = None
    kafka = True

    data = {}
    pods = {}

    current = {}

    for line in lines:
        split = [x for x in line.split(" ") if len(x) > 0]
        if line.startswith("Trial "):
            kafka = True
            seconds = int(split[3])
            current = {}
        elif line.startswith("  topic \""):
            # store the partitions collected for the previous topic
            data = addToData(data, topic, current, seconds)
            topic = split[1][1:-1]
            current = {}
        elif line.startswith("    partition "):
            partition = int(split[1][:-1])
            current[partition] = {
                "leader": int(split[3][:-1]),
                "replicas": set([int(x) for x in split[5][:-1].split(",")]),
                "isrs": set([int(x) for x in split[7][:-1].split(",")]),
            }
        elif line.startswith("pod \"debug"):
            # presumably ends the topic listing: store the last topic's partitions
            data = addToData(data, topic, current, seconds)
        elif line.startswith("NAME"):
            kafka = False
        elif not kafka:
            if line.startswith(("analyst", "filter", "kafka", "producer", "zoo")):
                pod = split[0]
                current = {
                    "state": split[1],
                    "server": split[2][:-1]
                }
                # change-only history per pod, like addToData does per topic
                if pod in pods:
                    if pods[pod]["data"][-1] != current:
                        pods[pod]["data"].append(current)
                        pods[pod]["time"].append(seconds)
                else:
                    pods[pod] = {
                        "data": [current],
                        "time": [seconds]
                    }
            else:
                print("Error", line)

    for history in data.values():
        for snapshot, time in zip(history["data"], history["time"]):
            for assignment in snapshot.values():
                assignment["replicas"] = replaceKafkaWithNode(assignment["replicas"], pods, time)
                assignment["isrs"] = replaceKafkaWithNode(assignment["isrs"], pods, time)
                # resolve the leader at the snapshot time like replicas/ISRs
                # (it used to be resolved at second 0, i.e. the initial placement)
                assignment["leader"] = list(replaceKafkaWithNode([assignment["leader"]], pods, time))[0]

    showDiff(data)

    for _ in range(5):
        print("====================================")

    showDiff(pods)

    for _ in range(5):
        print("====================================")

# Print the responsibility changes of every run.
runs = os.listdir(path)
for run in runs:
    print("\x1b[34mRUN:" + str(run) + "\x1b[0m")
    p = path + run + os.path.sep + "logs" + os.path.sep + "responsibilities.log"
    # ``with`` closes the log; the handle used to stay open
    with open(p, 'r') as file:
        lines = file.readlines()
    extractResponsibilities(lines)