In [12]:
from pyspark.sql import SparkSession
import os
import json
import re
import pandas as pd

# Spark session for this notebook; the driver is given 16 GB of memory.
# getOrCreate() reuses an already-active session, so re-running this cell is safe.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)

spark

In [5]:
def sum_throughput(directory):
    """Add up the goodput reported by every ``*summary.json`` under ``directory``.

    The tree is walked recursively with ``os.walk``. Each matching file is
    read as JSON and its ``"Goodput (requests/second)"`` entry is added.

    Returns:
        The summed goodput, or ``None`` when no matching file exists. This
        lets "no data" be told apart from a genuine total of 0.
    """
    goodputs = []
    for root, _dirs, files in os.walk(directory):
        for name in files:
            if not name.endswith("summary.json"):
                continue
            with open(os.path.join(root, name), 'r') as fh:
                goodputs.append(json.load(fh)["Goodput (requests/second)"])
    return sum(goodputs) if goodputs else None

DIR = "varying-hot-local-network"  # holds one "mr<mr>hot<hot>" sub-directory per run

# Sweep every (mr, hot) configuration and print its summed goodput
# (None means no summary file was found for that configuration).
for mr in [0, 5, 10, 25, 50, 75, 100]:
    for hot in [0, 1000, 500, 100, 10, 1]:
        throughput = sum_throughput(os.path.join(DIR, f"mr{mr}hot{hot}"))
        print(f"mr: {mr}, hot: {hot}: {throughput}")


mr: 0, hot: 0: 7519.228276188625
mr: 0, hot: 1000: 7204.396906622784
mr: 0, hot: 500: 6087.282728177791
mr: 0, hot: 100: 1417.278388796924
mr: 0, hot: 10: 11.641614257512028
mr: 0, hot: 1: 614.9636002449379
mr: 5, hot: 0: 6815.594717626722
mr: 5, hot: 1000: 6497.300301915009
mr: 5, hot: 500: 5816.311552097401
mr: 5, hot: 100: 1056.2883719929137
mr: 5, hot: 10: 14.19160831724642
mr: 5, hot: 1: 431.99028366577886
mr: 10, hot: 0: 6542.560859683452
mr: 10, hot: 1000: 6223.320982727547
mr: 10, hot: 500: 5201.346412873112
mr: 10, hot: 100: 1198.3540893916088
mr: 10, hot: 10: 13.358319390240695
mr: 10, hot: 1: 194.2071936543178
mr: 25, hot: 0: 5750.097927785856
mr: 25, hot: 1000: 5149.783021218066
mr: 25, hot: 500: 4648.427654996603
mr: 25, hot: 100: 1332.8836104621516
mr: 25, hot: 10: 15.583263823249931
mr: 25, hot: 1: 39.62480416800349
mr: 50, hot: 0: 4383.052867371478
mr: 50, hot: 1000: 4012.870463139554
mr: 50, hot: 500: 3874.8678190082287
mr: 50, hot: 100: 1323.602007745018
mr: 50, hot: 

In [6]:
def sum_histograms(directory):
    """Total ``NUM_SAMPLES`` per top-level key across ``*histograms.json`` files.

    Every file under ``directory`` (walked recursively) whose name ends with
    ``"histograms.json"`` is read as JSON. For each top-level key, that
    entry's ``["NUM_SAMPLES"]`` value is added to a running per-key total.

    Returns:
        dict mapping each key to its summed sample count, in first-seen key
        order. The dict is empty when no file matches.
    """
    totals = {}
    for root, _dirs, files in os.walk(directory):
        for name in (n for n in files if n.endswith("histograms.json")):
            with open(os.path.join(root, name), 'r') as fh:
                histograms = json.load(fh)
            for label, histogram in histograms.items():
                totals[label] = totals.get(label, 0) + histogram["NUM_SAMPLES"]
    return totals

# Per-configuration sample counts by key, followed by their grand total.
for mr in [0, 5, 10, 25, 50, 75, 100]:
    for hot in [0, 1000, 500, 100, 10, 1]:
        histogram = sum_histograms(os.path.join(DIR, f"mr{mr}hot{hot}"))
        total = sum(histogram.values())
        print(f"mr: {mr}, hot: {hot}: {histogram}, total = {total}")


mr: 0, hot: 0: {'rejected': 44194, 'aborted': 0, 'unexpected': 5, 'completed': 902312}, total = 946511
mr: 0, hot: 1000: {'rejected': 105108, 'aborted': 0, 'unexpected': 7, 'completed': 864534}, total = 969649
mr: 0, hot: 500: {'rejected': 163872, 'aborted': 0, 'unexpected': 58, 'completed': 730479}, total = 894409
mr: 0, hot: 100: {'rejected': 171918, 'aborted': 0, 'unexpected': 390, 'completed': 170074}, total = 342382
mr: 0, hot: 10: {'rejected': 10594, 'aborted': 0, 'unexpected': 1739, 'completed': 1397}, total = 13730
mr: 0, hot: 1: {'rejected': 2279864, 'aborted': 0, 'unexpected': 0, 'completed': 73796}, total = 2353660
mr: 5, hot: 0: {'rejected': 34601, 'aborted': 0, 'unexpected': 1093, 'completed': 817874}, total = 853568
mr: 5, hot: 1000: {'rejected': 90006, 'aborted': 0, 'unexpected': 2737, 'completed': 779680}, total = 872423
mr: 5, hot: 500: {'rejected': 150126, 'aborted': 0, 'unexpected': 3864, 'completed': 697959}, total = 851949
mr: 5, hot: 100: {'rejected': 112848, 'abo

In [7]:
def count_deadlocks(directory):
    """Count deadlock errors in every file named ``log.txt`` under ``directory``.

    Two kinds of deadlock report are recognised purely by their log text:

    * remote: a line containing ``"ERROR: deadlock detected.HINT"``, where the
      error and its hint sit on the same line. Each such line counts once.
    * local: a line containing ``"ERROR: deadlock detected"`` (without the
      inline ``.HINT``) that is later followed by a line containing
      ``"Hint: See server log for query details."``. Each error/hint pair
      counts once.

    Returns:
        ``(local, remote)`` tuple of ints, summed over all matching files.
    """
    local = remote = 0
    for root, _dirs, files in os.walk(directory):
        for file in files:
            if file != "log.txt":
                continue
            with open(os.path.join(root, file), 'r') as f:
                # True while a local deadlock error is still waiting for its Hint
                # line. Only "is a block open?" was ever needed, so a flag
                # replaces the old list. That list buffered every later line of
                # the log in memory whenever an error was never closed.
                awaiting_hint = False
                for line in f:
                    if "ERROR: deadlock detected.HINT" in line:
                        # Remote reports are self-contained on one line. They must
                        # not open a local block, or an unrelated later Hint line
                        # would be miscounted as a local deadlock.
                        remote += 1
                    elif "ERROR: deadlock detected" in line:
                        awaiting_hint = True
                    elif awaiting_hint and "Hint: See server log for query details." in line:
                        local += 1
                        awaiting_hint = False
    return local, remote

# Per-configuration deadlock counts, split into local and remote as classified
# by count_deadlocks.
for mr in [0, 5, 10, 25, 50, 75, 100]:
    for hot in [0, 1000, 500, 100, 10, 1]:
        local, remote = count_deadlocks(os.path.join(DIR, f"mr{mr}hot{hot}"))
        print(f"mr: {mr}, hot: {hot}: {local} local, {remote} remote")


mr: 0, hot: 0: 5 local, 0 remote
mr: 0, hot: 1000: 7 local, 0 remote
mr: 0, hot: 500: 58 local, 0 remote
mr: 0, hot: 100: 390 local, 0 remote
mr: 0, hot: 10: 1739 local, 0 remote
mr: 0, hot: 1: 0 local, 0 remote
mr: 5, hot: 0: 6 local, 0 remote
mr: 5, hot: 1000: 9 local, 0 remote
mr: 5, hot: 500: 38 local, 1 remote
mr: 5, hot: 100: 245 local, 13 remote
mr: 5, hot: 10: 1333 local, 91 remote
mr: 5, hot: 1: 0 local, 44 remote
mr: 10, hot: 0: 2 local, 1 remote
mr: 10, hot: 1000: 9 local, 0 remote
mr: 10, hot: 500: 37 local, 2 remote
mr: 10, hot: 100: 220 local, 27 remote
mr: 10, hot: 10: 1168 local, 201 remote
mr: 10, hot: 1: 0 local, 132 remote
mr: 25, hot: 0: 1 local, 0 remote
mr: 25, hot: 1000: 10 local, 1 remote
mr: 25, hot: 500: 15 local, 3 remote
mr: 25, hot: 100: 122 local, 67 remote
mr: 25, hot: 10: 449 local, 510 remote
mr: 25, hot: 1: 0 local, 189 remote
mr: 50, hot: 0: 1 local, 1 remote
mr: 50, hot: 1000: 1 local, 1 remote
mr: 50, hot: 500: 6 local, 7 remote
mr: 50, hot: 100: 40

In [None]:
def collect_summary(root, mr, hot):
    """Build one record per ``*summary.json`` found under ``root/mr{mr}hot{hot}``.

    The region is taken from the first ``region<digits>`` match in the path of
    the directory holding the file, and kept as a string (e.g. ``"2"``).
    Files whose directory path has no such component are skipped.

    Returns:
        list of dicts with keys ``region``, ``mr``, ``hot``, ``throughput``
        (goodput, requests/second), ``p50``, ``p90``, ``p99``, ``p0``
        (minimum) and ``p100`` (maximum). The latency values are in
        microseconds, as reported by the summary file.
    """
    records = []
    for dirpath, _, files in os.walk(os.path.join(root, f"mr{mr}hot{hot}")):
        # The region is a property of the directory, so parse it once per dirpath.
        match = re.search(r"region(\d+)", dirpath)
        if not match:
            continue
        region = match.group(1)

        for name in files:
            if not name.endswith("summary.json"):
                continue
            with open(os.path.join(dirpath, name), 'r') as fh:
                data = json.load(fh)
            latency = data["Latency Distribution"]
            records.append({
                "region": region,
                "mr": mr,
                "hot": hot,
                "throughput": data["Goodput (requests/second)"],
                "p50": latency["Median Latency (microseconds)"],
                "p90": latency["90th Percentile Latency (microseconds)"],
                "p99": latency["99th Percentile Latency (microseconds)"],
                "p0": latency["Minimum Latency (microseconds)"],
                "p100": latency["Maximum Latency (microseconds)"],
            })

    return records
                

# Flatten every configuration's per-region records into one DataFrame,
# with one row per summary.json file found.
summary = []

for mr in [0, 5, 10, 25, 50, 75, 100]:
    for hot in [0, 1000, 500, 100, 10, 1]:
        # Read from DIR rather than repeating the path literal, so this cell
        # always analyses the same results tree as the sweeps above.
        summary.extend(collect_summary(DIR, mr, hot))

df = pd.DataFrame(summary)

In [40]:
# p90 latency of the hot == 0 runs: one row per mr, one column per region.
# The values are converted from microseconds to milliseconds. pivot_table
# averages rows that share an (mr, region) pair.
(
    df.loc[df["hot"] == 0, ["mr", "hot", "region", "p90"]]
    .sort_values(by=["mr", "region"])
    .pivot_table(index="mr", columns="region", values="p90")
    .div(1000)
)

region,1,2,3
mr,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,13.136,13.024,13.828
5,18.242,18.28,19.705
10,20.883,21.033,21.362
25,24.038,24.299,24.733
50,30.815,30.176,31.491
75,37.062,36.832,34.748
100,37.556,37.146,38.083
