In [None]:
import os
import time
import sys
import json
import re
import numpy as np
import pandas as pd
from datetime import datetime

import warnings

# Silence all Python warnings for the rest of the notebook session.
warnings.filterwarnings("ignore")

# Seconds added to timestamps parsed from the container logs. The containers log
# in UTC, but this notebook is assumed to run on a UTC+8 machine, where
# `datetime.strptime(...).timestamp()` interprets the naive log time as local time.
# Adjust the hour count if your machine is in a different timezone.
TZ_OFFSET = 8 * 60 * 60


In [None]:
def get_stable_timepoint(x, y, scale_end_time=300, non_scaling_period=(200, 280), stable_duration=100, stable_threshold=0.1):
    """
    Find the first timepoint after scaling at which the metric has settled again.

    A timepoint ``s`` taken from ``x`` is *stable* when:
      1. ``s > scale_end_time``;
      2. every sample ``y[i]`` with ``s <= x[i] <= s + stable_duration`` is strictly
         below ``(1 + stable_threshold) * non_scaling_peak``;
      3. ``non_scaling_peak`` is the maximum ``y`` over the samples with
         ``non_scaling_period[0] < x < non_scaling_period[1]`` (both bounds exclusive).

    Candidates are checked in the order they appear in ``x`` and the first stable one
    is returned. If none qualifies, a message is printed and ``x[-1]`` is returned.

    Args:
        x: sample times (same unit as ``scale_end_time`` and ``non_scaling_period``).
        y: metric values, same length as ``x``.
        scale_end_time: only timepoints strictly after this are considered.
        non_scaling_period: ``(start, end)`` of the reference window used for the peak.
        stable_duration: length of the window that must stay below the bound.
        stable_threshold: relative tolerance above the reference peak.

    Raises:
        ValueError: if no sample falls inside ``non_scaling_period``.
    """
    x, y = np.asarray(x), np.asarray(y)
    in_reference = (x > non_scaling_period[0]) & (x < non_scaling_period[1])
    if not np.any(in_reference):
        raise ValueError(f"no samples inside non_scaling_period {tuple(non_scaling_period)}")
    stable_peak = (1 + stable_threshold) * np.max(y[in_reference])

    candidates = x[x > scale_end_time]
    # Times of samples that break the bound. `not <` makes NaN values count as violations,
    # exactly like an element-wise `y < stable_peak` check; NaN times never fall in a window.
    violating = ~(y < stable_peak) & ~np.isnan(x)
    violation_times = np.sort(x[violating])
    # For each candidate t, the earliest violation at or after t (inf if there is none).
    next_idx = np.searchsorted(violation_times, candidates, side="left")
    next_violation = np.append(violation_times, np.inf)[next_idx]
    # t is stable iff that violation lies beyond t + stable_duration. This is equivalent
    # to scanning every window [t, t + stable_duration] but avoids the O(n^2) loop.
    stable = np.flatnonzero(next_violation > candidates + stable_duration)
    if stable.size:
        return candidates[stable[0]]
    print("No stable timepoint found, use the last timepoint")
    return x[-1]
    

In [None]:
class Experiment:
    def __init__(self, folder, signal=None):
        self.folder = folder
        self.mechanism = folder.split("/")[-1].split("-")[0]
        self.workload = folder.split("/")[-1].split("-")[1]

        
        ############# basic check #############
        assert os.path.exists(folder)
        assert os.path.exists(folder + f"/{self.mechanism}-{self.workload}-statistics.log")
        assert os.path.exists(folder + f"/jobmanager.log")
        # make sure "All subtasks have acknowledged subscale completion" exist in jobmanager.log
        end = None
        ana_job = 4
        with open(folder + f"/jobmanager.log") as f:
            lines = f.readlines()
            for line in lines:
                if ana_job == 0:
                    break
                if "All subtasks have acknowledged subscale completion" in line:
                    end = re.search(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})', line).group(1)
                    # print(f"Experiment {self.mechanism}-{self.workload} loaded: scale operation end time: {end}")
                    end = datetime.strptime(end, '%Y-%m-%d %H:%M:%S,%f').timestamp() + TZ_OFFSET
                    ana_job -= 1
                elif "Successfully loaded cache-capacit" in line:
                    # 2024-12-20 04:06:13,560 INFO  org.apache.flink.runtime.scale.ScaleConfig                   [] - Successfully loaded cache-capacity: 20000
                    capacity = int(re.search(r'Successfully loaded cache-capacity: (\d+)', line).group(1))
                    ana_job -= 1
                elif "Successfully loaded subscale-reassignment:" in line:
                    emergency_ratio = float(re.search(r'emergency-ratio: ([\d.]+),', line).group(1))
                    fairness_weight = float(re.search(r'fairness-weight: ([\d.]+)', line).group(1))
                    # print(f"  emergency_ratio: {emergency_ratio}, fairness_weight: {fairness_weight}")
                    ana_job -= 1
                elif "Successfully loaded state-sample-rate:" in line:
                    # Successfully loaded state-sample-rate: 2.5
                    state_sample_rate = float(re.search(r'state-sample-rate: ([\d.]+)', line).group(1))
                    # keep first 1 digit
                    state_sample_rate = round(state_sample_rate, 1)
        # print(f"Experiment {self.mechanism}-{self.workload} loaded: scale operation end time: {self.scale_operation_end_time}")


        ############# load basic information: mechanism-workload-statistics.log #############
        latencies_list = None
        self.no_scale = None
        with open(folder + f"/{self.mechanism}-{self.workload}-statistics.log") as f:
            for line in f:
                if "job_start_time" in line:
                    self.job_start_time = int(line.split(": ")[1])
                elif "subscale_time" in line:
                    self.subscale_time = list(map(int, line.split(": ")[1].strip("[]\n").split(", ")))
                elif "scale_time" in line:
                    self.scale_trigger_time = int (line.split(": ")[1])
                elif "host_info" in line:
                    self.scaling_host_info = eval(line.split(": ", 1)[1])
                elif "inter_scheduler" in line:
                    inter_scheduler = line.split(": ", 1)[1]
                    inter_scheduler = inter_scheduler.strip()
                elif "intra_scheduler" in line:
                    intra_scheduler = line.split(": ", 1)[1]
                    intra_scheduler = intra_scheduler.strip()
                elif "latency" in line:
                    latencies_text = line.split(": ", 1)[1]
                    latencies_list = eval(latencies_text)
                elif "no_scale" in line:
                    self.no_scale = eval(line.split(": ", 1)[1])
        
        assert (end is not None or self.no_scale is not None)
        if end is not None:
            self.scale_operation_end_time = end * 1000 # convert to ms

        self.real_time_latencies = {}
        if latencies_list is not None:
            for timestamp, metrics in latencies_list:
                if len(metrics) == 0:
                    continue
                relative_time = (timestamp*1000 - self.job_start_time) / 1000 # convert to relative time in seconds
                for metric in metrics:
                    relative_time = round(relative_time, 2)
                    id = metric['id'].split("cl.")[1]
                    value = round(float(metric['value']), 4)
                    # subtask_index = int(id.split(".")[0])
                    metric_name = id.split(".")[1]
                    # print(f"  {relative_time}: {subtask_index}.{metric_name}: {value}")
                    if metric_name not in self.real_time_latencies:
                        self.real_time_latencies[metric_name] = {}
                    self.real_time_latencies[metric_name][relative_time] = value
        
        # sort by time
        for key, value in self.real_time_latencies.items():
            self.real_time_latencies[key] = dict(sorted(value.items(), key=lambda x: x[0]))


        
        self.scaling_tms = set()
        for host, info in self.scaling_host_info.items():
            tm, _ =  info
            self.scaling_tms.add(tm)
        # print(f"scaling_tms: {self.scaling_tms}")
        self.filesink_tms = set()
        # for all "docker-taskmanager-*.log" files, find the corresponding taskmanager

        for file in os.listdir(folder):
            if file.startswith("docker-taskmanager-") and file.endswith(".log"):
                tm = file.split(".")[0]
                if tm in self.scaling_tms:
                    continue
                with open(folder + "/" + file) as f:
                    # if 'FileSink: Writer' in file, then it is a filesink tm
                    if "FileSink: Writer" in f.read():
                        self.filesink_tms.add(tm)
        # print(f"filesink_tms: {self.filesink_tms}")
        
        
        self.latency_loaded = False
        self.suspend_loaded = False
        self.latency_data = {}
        self.state_transitions_loaded = False
        self.throughput_loaded = False

        ###################### intra-subscale ######################

        if self.mechanism == "drrs":
            self.test_env = {
                "capacity": capacity,
                "fairness_weight": fairness_weight,
                "emergency_ratio": emergency_ratio,
                "inter_scheduler": inter_scheduler,
                "intra_scheduler": intra_scheduler,
            }

        if signal is None:
            if self.no_scale:
                self.signal = "no_scale"
            # elif self.mechanism == "drrs":
            #     self.signal =  inter_scheduler+"-"+intra_scheduler
            else:
                self.signal = self.mechanism + "-" + self.workload
        else:
            self.signal = signal
    def get_subtask_by_tm(self, tm):
        for host, info in self.scaling_host_info.items():
            if info[0] == tm:
                return host
        raise Exception(f"Taskmanager {tm} not found in scaling_host_info: {self.scaling_host_info}")
    
    def load_latency(self, stable_during=150, stable_factor=0.2):
        if self.no_scale is not None and (0, 0) in self.latency_data:
            return self.latency_data
        elif (stable_during, stable_factor) in self.latency_data:
            return self.latency_data[(stable_during, stable_factor)]
        latency = {}
        latency_x = [] # output time
        latency_y = [] # latency
        for tm in self.filesink_tms:
            with open(self.folder + f"/{tm}.log") as f:
                for line in f:
                    if 'FileSink: Writer' in line and 'creationTime' in line and 'outputTime' in line and 'Marker' in line:
                        creation_time = int(re.search(r'creationTime=(\d+),', line).group(1))
                        output_time = int(re.search(r'outputTime=(\d+)', line).group(1))
                        if creation_time < 1e12:
                            # print(f"  something wrong with creation_time: {creation_time}, ignore")
                            continue

                        assert creation_time < output_time
                        # latency_x.append(output_time)
                        # latency_y.append(output_time - creation_time)
                        latency[output_time] = output_time - creation_time
                        # if latency[output_time] > 7000 print current line
        # sort by output time
        sorted_latency = sorted(latency.items(), key=lambda x: x[0])
        latency_x = [x[0] for x in sorted_latency]
        latency_y = [x[1] for x in sorted_latency]
        print(f"Experiment {self.mechanism}-{self.workload}-{self.signal} loaded: {len(latency_y)} latency markers")

        latency_x, latency_y = np.array(latency_x), np.array(latency_y)
        latency_x = (latency_x - self.job_start_time) / 1000 # convert to relative time in seconds
        if self.no_scale is None:
            r_scale_start_time = (self.scale_trigger_time - self.job_start_time) / 1000
            r_scale_operation_end_time = (self.scale_operation_end_time - self.job_start_time) / 1000
            print(f"  scale : {r_scale_start_time} - {r_scale_operation_end_time} (only for scale operation completion)")

            stable_time = get_stable_timepoint(
                latency_x, 
                latency_y, 
                scale_end_time=r_scale_operation_end_time, 
                non_scaling_period=[200,280],
                stable_duration=stable_during, 
                stable_threshold=stable_factor)

            print(f"  stable time: {stable_time}")
        
            # latency peak during scale start-stable time
            peak_latency = np.max(latency_y[(r_scale_start_time < latency_x) & (latency_x < stable_time)])
            average_latency = np.mean(latency_y[(r_scale_start_time < latency_x) & (latency_x < stable_time)])
            print(f"  re-stable time: {stable_time}, peak latency: {peak_latency}, average latency: {average_latency}")
    

            latency = {
                "x": latency_x,
                "y": latency_y,
                "r_scale_start_time": r_scale_start_time,
                "r_scale_operation_end_time": r_scale_operation_end_time,
                "r_restable_time": stable_time,
                "peak_latency": peak_latency,
                "average_latency": average_latency
            }
            self.latency_data[(stable_during, stable_factor)] = latency
        else:
            latency = {
                "x": latency_x,
                "y": latency_y,
            }
            print(f"  no_scale: {len(latency_y)} latency markers")
            self.latency_data[(0, 0)] = latency
        return latency

    def load_suspend(self):
        if self.suspend_loaded:
            return
        self.suspend_loaded = True
        self.suspened = {}

        for tm in self.scaling_tms:
            suspended_ranges = []
            with open(self.folder + f"/{tm}.log") as f:
                suspended = False # normal suspend
                start = -1
                end = -1

                mainThreadSuspend = False # main thread suspend(only for meces)
                mainThreadStart = -1
                mainThreadEnd = -1
                for line in f.readlines():
                    if "mainTread is not available at" in line:
                        assert mainThreadSuspend == False
                        mainThreadSuspend = True
                        mainThreadStart = int(re.search(r'at (\d+)', line).group(1))
                    elif "mainTread is available at" in line:
                        assert mainThreadSuspend == True
                        mainThreadSuspend = False
                        mainThreadEnd = int(re.search(r'at (\d+)', line).group(1))
                        suspended_ranges.append((mainThreadStart, mainThreadEnd))
                    elif 'subtask subscale handler is available at' in line:
                        # print(line)
                        assert suspended == True
                        suspended = False
                        end = int(re.search(r'at (\d+)', line).group(1))
                        # print('start:', start, 'end:', end)
                        assert start != -1 and end > start
                        suspended_ranges.append((start, end))
                        start, end = -1, -1
                    elif 'subtask subscale handler is not available at' in line:
                        assert suspended == False
                        suspended = True
                        assert start == -1 and end == -1
                        start = int(re.search(r'at (\d+)', line).group(1))
            merged = []
            suspended_ranges.sort(key=lambda x: x[0])
            for start, end in suspended_ranges:
                if not merged or start > merged[-1][1]:
                    merged.append([start, end])
                else:
                    merged[-1][1] = max(merged[-1][1], end)
            self.suspened[self.get_subtask_by_tm(tm)] = merged
        print(f"Experiment {self.mechanism}-{self.workload}-{self.signal} loaded: {len(self.suspened)} subtasks")
    
    def load_state_info(self, print_info=False):
        def print_info():
            assert self.state_transitions_loaded
            if "approx_state_size" in self.state_info:
                # print(f"  approx_state_size: {self.state_info['approx_state_size']}")
                # print the (approx_state_size, real_transition_size, Estimated_error ) for each state in each transition
                avg_estimated_error = 0
                errs = []
                real_sizes = []
                approx_sizes = []
                for key, value in self.state_info["transitions"].items():
                    for key, _, size in value:
                        approx_size = self.state_info["approx_state_size"][key]/1024 # convert to KB
                        estimated_error = np.abs(approx_size - size)/size * 100
                        errs.append(estimated_error)
                        print(f"({key}: {int(approx_size)}KB, {size}KB, {estimated_error:.1f})", end=", ")
                        real_sizes.append(size)
                        approx_sizes.append(approx_size)
                    print()
                avg_estimated_error = np.mean(errs)
                print(f"  avg_estimated_error: {avg_estimated_error:.1f}")

                #  normalization
                real_sizes = np.array(real_sizes)
                approx_sizes = np.array(approx_sizes)
                real_sizes = real_sizes / np.max(real_sizes)
                approx_sizes = approx_sizes / np.max(approx_sizes)
                # keep 2 digits:np.abs(real_sizes - approx_sizes)
                print(f"  diff: {np.round(real_sizes - approx_sizes, 2)}")
                # calculate the Scale Invariance Error
                sie = np.mean(np.abs(real_sizes - approx_sizes))
                print(f"  Scale Invariance Error: {sie:.2f}")
                



        if self.state_transitions_loaded:
            if print_info:
                print_info()
            return
        self.state_transitions_loaded = True

        # state transition order and state size
        state = {}
        for tm in self.scaling_tms:
            with open(self.folder + f"/{tm}.log") as f:
                subtask = self.get_subtask_by_tm(tm)
                for line in f:
                    if re.search(r'send state \[\d+\] to task \d+', line):
                        # send state [0] to 1
                        state_key = int(re.search(r'send state \[(\d+)\] to task \d+', line).group(1))
                        target = int(re.search(r'send state \[\d+\] to task (\d+)', line).group(1))
                        send_time = re.search(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})', line).group(1)
                        send_time = datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S,%f').timestamp()
                        send_time = send_time * 1000 - self.job_start_time
                        send_time = send_time / 1000 # convert to relative time in seconds
                        # print(f"  send state {state_key} to {target} at {send_time}")
                        state[state_key] = {"target": target, "send_time": send_time, "source": subtask}
                    if "StateBuffer created on" in line:
                        # StateBuffer created on [15] with size 16999 KB
                        state_key = int(re.search(r'\[(\d+)\]', line).group(1))
                        state_size = int(re.search(r'size (\d+) KB', line).group(1))
                        # print(f"  StateBuffer created on {state_key} with size {state_size} KB")
                        state[state_key].update({"size": state_size})
        # print(f"Experiment {self.mechanism}-{self.workload}-{self.signal} loaded: {state} ")
        # split by source-target
        state_transitions = {}
        for key, value in state.items():
            source = value["source"]
            target = value["target"]
            if (source, target) not in state_transitions:
                state_transitions[(source, target)] = []
            state_transitions[(source, target)].append((key, value["send_time"], value["size"]))
        # sort by send time
        for key, value in state_transitions.items():
            state_transitions[key] = sorted(value, key=lambda x: x[1])
        # sort by source-target
        state_transitions = dict(sorted(state_transitions.items(), key=lambda x: x[0]))
        # self.state_transitions = state_transitions
        self.state_info = {"transitions": state_transitions}
        print(f"Experiment {self.mechanism}-{self.workload}-{self.signal} loaded: {len(state_transitions)} state transitions")

        # read approximate state size from jobmanager.log(if exists)
        with open(self.folder + f"/jobmanager.log") as f:
            for line in f:
                if "Successfully collect state size" in line:
                    # 2024-12-20 04:28:19,580 INFO  org.apache.flink.runtime.scale.coordinator.ScaleCoordinator  [] - Successfully collect state size: {0=1267935, 1=1676696, 2=897459, 3=395127, 4=1193690, 5=11791440, 6=2981414, 7=4212991, 8=4252542, 9=1304190, 10=1374647, 11=4376856, 12=1803160, 13=1657700, 14=2007357, 15=514752, 16=1266435, 17=596949, 18=2059096, 19=8409912, 20=2163546, 21=2249976, 22=3744120, 23=9350923, 24=1270642, 25=753228, 26=525132, 27=1105585, 28=1539240, 29=2225800, 30=1025198, 31=1152918, 32=4526352, 33=752640, 34=606474, 35=4571868, 36=1588260, 37=636688, 38=783870, 39=995280, 40=918528, 41=885656, 42=2698798, 43=6490108, 44=10627160, 45=1034194, 46=753006, 47=5917274, 48=679764, 49=1516437, 50=3681960, 51=4868460, 52=3603418, 53=2379480, 54=5322614, 55=617004, 56=958518, 57=552600, 58=963144, 59=2015007, 60=474300, 61=760000, 62=1004585, 63=1585668, 64=3645808, 65=688128, 66=14544200, 67=421038, 68=854220, 69=498295, 70=1925720, 71=15214320, 72=1477527, 73=1820312, 74=1188039, 75=1187646, 76=533750, 77=1355088, 78=9948050, 79=2548764, 80=1345582, 81=4431180, 82=947085, 83=2036600, 84=1300684, 85=2202928, 86=960042, 87=1634640, 88=3771690, 89=641690, 90=6510996, 91=846765, 92=1112584, 93=412080, 94=1713800, 95=1436790, 96=422280, 97=746118, 98=976854, 99=1138480, 100=4838680, 101=1844500, 102=899640, 103=1433660, 104=2024064, 105=798510, 106=6821685, 107=8454181, 108=2327688, 109=4024658, 110=2785104, 111=13193030, 112=528176, 113=504900, 114=8876014, 115=674082, 116=4147233, 117=2789670, 118=668520, 119=982902, 120=7131000, 121=1944111, 122=374852, 123=3394800, 124=6654228, 125=1543248, 126=8142644, 127=2142976}
                    state_size_str = re.search(r'Successfully collect state size: (.+)', line).group(1)
                    state_size_str = state_size_str.replace('=', ':')
                    approx_state_size = eval(state_size_str)
                    # sort by state key
                    approx_state_size = dict(sorted(approx_state_size.items(), key=lambda x: x[0]))
                    self.state_info["approx_state_size"] = approx_state_size


        if print_info:
            print_info()
    
    def load_throughput(self):
        if self.throughput_loaded:
            return
        assert os.path.exists(self.folder + f"/throughputs.log")
        throughput = {}
        with open(self.folder + f"/throughputs.log") as f:
            for line in f:
                timestamp = int(re.search(r'^(\d+):', line).group(1))
                timestamp = (timestamp - self.job_start_time) / 1000 # convert to relative time in seconds
                metric_str = re.search(r'\[(.*)\]', line).group(0)
                if metric_str == '[]':
                    continue
                metric_str = metric_str.replace("'", '"')
                metric_json = json.loads(metric_str.strip())
                for id, value in [(item['id'], item['value']) for item in metric_json]:
                    try:
                        value = float(value)
                    except Exception as e:
                        print('Cannot convert to int:', value)
                        continue
                    # timestamp: 1730286916984 id: 1.Source__Bid_Source.numRecordsInPerSecond value: 4.6
                    subtask= int(re.search(r'^(\d+)\.', id).group(1))
                    operator = re.search(r'\Source__(\w+)_', id).group(1)
                    if operator not in throughput:
                        throughput[operator] = {}
                    if subtask not in throughput[operator]:
                        throughput[operator][subtask] = {}
                    throughput[operator][subtask][timestamp] = value
        # merge all subtasks into operator-level
        # notice: simply sum all subtasks' throughput is not correct, because the timestamp may not be aligned
        # so we need to align the timestamp
        operator_throughput = {}
        if len(throughput) == 1:
            throughput = throughput[list(throughput.keys())[0]]
            all_timestamps = set()
            for subtask, data in throughput.items():
                all_timestamps.update(data.keys())
            all_timestamps = sorted(list(all_timestamps))
            for subtask, data in throughput.items():
                timestamps = np.array(list(data.keys()))
                values = np.array(list(data.values()))
                interp = interp1d(timestamps, values, kind='cubic', fill_value='extrapolate')
                interpolated_values = interp(all_timestamps)
                interpolated_values = np.clip(interpolated_values, 0, None)
                for i, timestamp in enumerate(all_timestamps):
                    if timestamp not in operator_throughput:
                        operator_throughput[timestamp] = 0
                    operator_throughput[timestamp] += interpolated_values[i]
            
            if self.workload == 'q7':
                # multiply by 2, because the the input take 1 as 2
                for timestamp in operator_throughput:
                    operator_throughput[timestamp] *= 2

        else:
            # add all operators' throughput
            # can not simply sum, because the timestamp may not be aligned
            # so we need to align the timestamp
            all_timestamps = set()
            for operator, subtasks in throughput.items():
                for subtask, data in subtasks.items():
                    all_timestamps.update(data.keys())
            all_timestamps = sorted(list(all_timestamps))
            for operator, subtasks in throughput.items():
                for subtask, data in subtasks.items():
                    timestamps = np.array(list(data.keys()))
                    values = np.array(list(data.values()))
                    interp = interp1d(timestamps, values, kind='cubic', fill_value='extrapolate')
                    interpolated_values = interp(all_timestamps)
                    interpolated_values = np.clip(interpolated_values, 0, None)
                    for i, timestamp in enumerate(all_timestamps):
                        if timestamp not in operator_throughput:
                            operator_throughput[timestamp] = 0
                        operator_throughput[timestamp] += interpolated_values[i]
        print(operator_throughput)
        self.throughput_loaded = True
        self.throughput = operator_throughput
    
    def load_throughput_for_fwc(self):
    
        output_times = []
        with open(self.folder + f"/output.log") as f:
            # print(f"loading {self.folder}/output.log with half_scale_time: {(half_scale_time- self.job_start_time)/1000}")
            for line in f:
                # expectedGenTime
                gen_time = int(re.search(r'expectedGenTime: (\d+)', line).group(1))
                processing_time = int(re.search(r'processedTime: (\d+)', line).group(1))
                output_times.append( (processing_time-self.job_start_time)//1000 ) # ms to s with offset
        
        output_times.sort()
        # print(f"output_times count: {len(output_times)}, first: {output_times[0]}, last: {output_times[-1]}")
        end_time = output_times[-1]//60
        throughput = np.zeros(11)
        for t in output_times:
            throughput[t//60] += 1
        # remove the first and last 1 minute
        # print(throughput)
        average_throughput = throughput[1:-1].mean()
        self.average_throughput = average_throughput
