In [None]:
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import re
import requests
import time
import numpy as np
from collections import defaultdict
from matplotlib import lines, markers

from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

plt.style.use('ggplot')

## Configuration

In [None]:
# Base URL of the Flink REST API; every request in this notebook is built on top of it
FLINK_ENDPOINT = 'http://localhost:8081'
# Number of seconds the sampling loop sleeps between two iterations
SAMPLING_FREQ_SEC = 1
# The sampling loop stops once this many seconds have passed since it started
ANALYSIS_DURATION_SEC = 600

In [None]:
def splitComponent(component, pattern):
    """Split a raw metric id into its (instance, component, metric) parts.

    Keyword arguments:
    component -- The metric id string to split
    pattern -- A compiled regular expression with named groups; group names the
               pattern does not define fall back to the default supplied by matchDict

    Raises Exception when `component` does not match `pattern`.
    """
    matched = pattern.match(component)
    if not matched:
        raise Exception(f'Failed to match {component}!')
    parts = matchDict(matched)
    return parts['instance'], parts['component'], parts['metric']
    
def getTaskManagers():
    """Return the 'taskmanagers' entry of the JSON served by the /taskmanagers endpoint."""
    response = requests.get(f'{FLINK_ENDPOINT}/taskmanagers')
    payload = response.json()
    return payload['taskmanagers']
    
def getAvailableTaskManagerMetrics():
    """Return the 'id' of every entry listed by the /taskmanagers/metrics endpoint."""
    response = requests.get(f'{FLINK_ENDPOINT}/taskmanagers/metrics')
    metricIDs = []
    for entry in response.json():
        metricIDs.append(entry['id'])
    return metricIDs

def getTaskManagerMetrics(tmID, metrics):
    """Request the given metrics for a single task manager.

    Keyword arguments:
    tmID -- The task manager id, inserted into the request path
    metrics -- An iterable of metric id strings, sent comma-joined in the `get` query parameter

    Returns the decoded JSON body of the response.
    """
    query = {'get': ','.join(metrics)}
    response = requests.get(f'{FLINK_ENDPOINT}/taskmanagers/{tmID}/metrics', params=query)
    return response.json()
    
def getAvailableVertexMetrics(jobID, vertexID):
    """Return the 'id' of every entry listed by the metrics endpoint of one job vertex."""
    response = requests.get(f'{FLINK_ENDPOINT}/jobs/{jobID}/vertices/{vertexID}/metrics')
    metricIDs = []
    for entry in response.json():
        metricIDs.append(entry['id'])
    return metricIDs

def getJobInfo(jobID):
    """Return the decoded JSON served by the /jobs/<jobID> endpoint."""
    response = requests.get(f'{FLINK_ENDPOINT}/jobs/{jobID}')
    return response.json()

def getAvailableJobMetrics(jobID):
    """Return a dictionary of vertexID: [metrics] covering every vertex of the job.

    The vertices are taken from the 'vertices' entry of getJobInfo(jobID); the
    metric list of each vertex comes from getAvailableVertexMetrics.
    """
    vertices = getJobInfo(jobID)['vertices']
    return {
        vertex['id']: getAvailableVertexMetrics(jobID, vertex['id'])
        for vertex in vertices
    }

def getJobMetricNames(jobMetrics, namePattern):
    """Collect the distinct metric names found across all vertices.

    Keyword arguments:
    jobMetrics -- A dictionary of vertexID: [metrics]
    namePattern -- A compiled regular expression with a named group 'metric';
                   metric ids that do not match it are ignored

    Returns a set holding the 'metric' group of every matching metric id.
    """
    matches = (
        namePattern.match(metricID)
        for metricIDs in jobMetrics.values()
        for metricID in metricIDs
    )
    return {matched.group('metric') for matched in matches if matched}


def updateVertexRequests(selectedMetrics, availableJobMetrics, namePattern, vertexRequests):
    """Given a list of metric names, update the given vertex requests dictionary in place

    Keyword arguments:
    selectedMetrics -- A list of metric names
    availableJobMetrics -- A dictionary of vertexID: [metrics], as returned by getAvailableJobMetrics
    namePattern -- A regular expression that converts metricName -> metric (and optionally filters unwanted metrics)
    vertexRequests -- A dictionary of vertexID: [metrics] that will be used for recording statistics

    Every vertex of availableJobMetrics gets its entry in vertexRequests replaced,
    and the number of metrics kept for it is printed.
    """
    def isSelected(metricID):
        # Keep a metric id only if it matches the pattern and its base name was selected
        matched = namePattern.match(metricID)
        return bool(matched) and matched.group('metric') in selectedMetrics

    for vertexID, candidates in availableJobMetrics.items():
        chosen = [metricID for metricID in candidates if isSelected(metricID)]
        vertexRequests[vertexID] = chosen
        print(f'{len(chosen)} metrics for {vertexID}')
        
def selectVertexMetrics(availableJobMetrics, vertexRequests, namePattern, description):
    """Show a multi-select widget that keeps vertexRequests in sync with the selection.

    Keyword arguments:
    availableJobMetrics -- A dictionary of vertexID: [metrics], as returned by getAvailableJobMetrics
    vertexRequests -- A dictionary of vertexID: [metrics], updated in place by updateVertexRequests
    namePattern -- A regular expression that converts metricName -> metric
    description -- The label shown next to the widget
    """
    # getJobMetricNames returns a set, whose iteration order is arbitrary; sort so the
    # options are listed in a stable order. key=str keeps the sort from failing should
    # an optional 'metric' group have produced None.
    options = sorted(getJobMetricNames(availableJobMetrics, namePattern), key=str)
    widget = widgets.SelectMultiple(description=description, options=options)
    interact(updateVertexRequests, selectedMetrics=widget, namePattern=fixed(namePattern),
             availableJobMetrics=fixed(availableJobMetrics), vertexRequests=fixed(vertexRequests))

def getVertexMetrics(jobID, vertexID, metrics, maxRequestLength=40):
    """Request metric values for one job vertex, in batches.

    Keyword arguments:
    jobID -- The job id, inserted into the request path
    vertexID -- The vertex id, inserted into the request path
    metrics -- A sequence of metric id strings to request
    maxRequestLength -- Maximum number of metric ids sent in a single request

    Returns one list holding the concatenated JSON results of all batches.
    """
    def fetchBatch(batch):
        endpoint = f'{FLINK_ENDPOINT}/jobs/{jobID}/vertices/{vertexID}/metrics'
        return requests.get(endpoint, params={'get': ','.join(batch)}).json()

    collected = []
    # Batching keeps the request string from becoming too long
    for start in range(0, len(metrics), maxRequestLength):
        collected.extend(fetchBatch(metrics[start:start + maxRequestLength]))
    return collected

def matchDict(match):
    """Return the named groups of `match` as a defaultdict.

    Looking up a group name the pattern does not define yields 'DEFAULT'.
    Groups that are defined keep the value from match.groupdict().
    """
    return defaultdict(lambda: 'DEFAULT', match.groupdict())

def plotAggregated(df, ax, startTime, aggregateFor, groupBy):
    """Plot the mean of `value` over time, one line per group, with a shaded std band.

    Keyword arguments:
    df -- A DataFrame with a `value` column and the columns named in aggregateFor
    ax -- The matplotlib axes to draw on
    startTime -- Subtracted from `t` so that the x axis shows relative time
    aggregateFor -- The columns to group by before computing mean and std of `value`;
                    must include 't', which is used as the x axis
    groupBy -- The index levels (taken from aggregateFor) that identify one plotted line.
               The legend label is built from the first two entries of each group key,
               so two levels with string values are expected.
    """
    markerstyles = list(markers.MarkerStyle.markers.keys())
    # Use named aggregations instead of np.mean/np.std: passing the numpy callables is
    # deprecated in recent pandas, and once they are applied directly np.std would
    # switch from the sample (ddof=1) to the population (ddof=0) standard deviation.
    aggregated = df.groupby(aggregateFor).aggregate({'value': ['mean', 'std']})
    for i, (name, group) in enumerate(aggregated.groupby(level=groupBy)):
        data = group.reset_index()
        data.t -= startTime
        # Cycle through the marker styles so that neighbouring lines stay distinguishable
        ax.plot(data.t, data.value['mean'], alpha=.7, label=name[0][:5] + '_' + name[1][:15],
                marker=markerstyles[i % len(markerstyles)], markevery=10, markersize=5)
        # The band spans half a standard deviation on either side of the mean
        ax.fill_between(data.t, data.value['mean'] - data.value['std']/2, data.value['mean'] + data.value['std']/2, alpha=.3)
        
def recordVertexMetrics(df, vertexMetrics, timestamp, namePattern):
    """Fetch the requested vertex metrics and return `df` extended by one row per value.

    Keyword arguments:
    df -- The DataFrame holding the samples recorded so far (not modified)
    vertexMetrics -- A dictionary of vertexID: [metrics] to request
    timestamp -- The sample time; stored truncated to an int in column 't'
    namePattern -- A regular expression used by splitComponent to split each metric id

    The job is taken from the module-level `jobID`. Returns `df` itself when
    nothing was recorded, otherwise a new DataFrame.
    """
    rows = []
    for vertexID, metrics in vertexMetrics.items():
        for metric in getVertexMetrics(jobID, vertexID, metrics):
            componentInstance, componentName, baseMetric = splitComponent(metric['id'], namePattern)
            rows.append({'t': int(timestamp),
                         'vertex': vertexID,
                         'component': componentName,
                         'instance': componentInstance,
                         'metric': baseMetric,
                         'value': float(metric['value'])})
    if not rows:
        return df
    # DataFrame.append was removed in pandas 2.0 and copied the whole frame for every
    # row; build the new rows once and concatenate a single time instead.
    newRows = pd.DataFrame(rows)
    if df.empty:
        # Nothing to concatenate with yet; keep the column order (and any extra
        # columns) that the caller declared on the empty frame.
        return newRows.reindex(columns=df.columns.union(newRows.columns, sort=False))
    return pd.concat([df, newRows], ignore_index=True)

def recordTaskManagerMetrics(df, taskManagers, taskManagerMetrics, timestamp):
    """Fetch the requested metrics from every task manager and return `df` extended by them.

    Keyword arguments:
    df -- The DataFrame holding the samples recorded so far (not modified)
    taskManagers -- A list of dictionaries with an 'id' entry, as returned by getTaskManagers
    taskManagerMetrics -- The metric names to request from each task manager; may be empty or None
    timestamp -- The sample time; stored truncated to an int in column 't'

    Returns `df` itself when nothing was recorded, otherwise a new DataFrame.
    """
    if not taskManagerMetrics:
        # Nothing selected: skip the requests entirely. An empty `get` parameter is not
        # expected to return entries carrying a 'value' -- TODO confirm against Flink's REST docs.
        return df
    rows = [
        {'t': int(timestamp),
         'tm': tm['id'],
         'metric': metric['id'],
         'value': float(metric['value'])}
        for tm in taskManagers
        for metric in getTaskManagerMetrics(tm['id'], taskManagerMetrics)
    ]
    if not rows:
        return df
    # DataFrame.append was removed in pandas 2.0 and copied the whole frame for every
    # row; build the new rows once and concatenate a single time instead.
    newRows = pd.DataFrame(rows)
    if df.empty:
        # Nothing to concatenate with yet; keep the column order (and any extra
        # columns) that the caller declared on the empty frame.
        return newRows.reindex(columns=df.columns.union(newRows.columns, sort=False))
    return pd.concat([df, newRows], ignore_index=True)

In [None]:
# Look up the single running job that this notebook reports on
# TODO: Report for multiple jobs
jobs = requests.get(f'{FLINK_ENDPOINT}/jobs').json()['jobs']
taskManagers = getTaskManagers()
runningJobs = list(filter(lambda job: job['status'] == 'RUNNING', jobs))
# Everything below assumes exactly one job, so stop here otherwise
assert len(runningJobs) == 1, 'Toolkit can only work with exactly one running job!'
jobID = runningJobs[0]['id']
print(f'Reporting for job "{jobID}"')

In [None]:
# Empty sample tables; 't' and 'value' are given numeric dtypes up front
jobData = pd.DataFrame(
    columns=['t', 'vertex', 'component', 'instance', 'metric', 'value']
).astype({'t': int, 'value': float})

tmData = pd.DataFrame(
    columns=['t', 'tm', 'metric', 'value']
).astype({'t': int, 'value': float})

In [None]:
# Regex patterns for splitting metric names.
# Raw strings keep `\d` and `\.` from being parsed as (invalid) string escapes.
# Operator metrics: <instance>.<component>.<metric>; the component may itself contain dots.
OPERATOR_METRIC_PATTERN = re.compile(r'^(?P<instance>\d+)\.(?P<component>.+)\.(?P<metric>.+)?$')
# Task metrics: <instance>.<metric>, where the metric contains no dot.
TASK_METRIC_PATTERN = re.compile(r'^(?P<instance>\d+)\.(?P<metric>[^\.]+)$')

# Task and operator metrics have to be requested per vertex, so each selection
# is kept as a vertexID -> [metrics] dictionary that the widgets fill in
taskMetrics = {}
operatorMetrics = {}
availableJobMetrics = getAvailableJobMetrics(jobID)
selectVertexMetrics(availableJobMetrics, taskMetrics, TASK_METRIC_PATTERN, 'task metrics')
selectVertexMetrics(availableJobMetrics, operatorMetrics, OPERATOR_METRIC_PATTERN, 'op metrics')

# Task manager metrics are requested from every task manager alike,
# so only the selected names are kept
taskManagerMetrics = None
def selectTaskManagerMetrics(selectedMetrics):
    # Widget callback: remember the selection, or an empty list when nothing is selected
    global taskManagerMetrics
    taskManagerMetrics = selectedMetrics or []
selectTaskManagerMetrics = interact(
    selectTaskManagerMetrics,
    selectedMetrics=widgets.SelectMultiple(description='tm metrics', options=getAvailableTaskManagerMetrics()))

In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2
%matplotlib notebook

def plotMetric(df, ax, startTime, metric, aggregateFor, groupBy):
    """Redraw `ax` with the aggregated samples of a single metric.

    Keyword arguments:
    df -- A DataFrame of samples with a `metric` column
    ax -- The matplotlib axes to clear and draw on
    startTime -- Passed on to plotAggregated as the time origin
    metric -- The metric name to plot; also used as the axes title
    aggregateFor -- Passed on to plotAggregated
    groupBy -- Passed on to plotAggregated
    """
    ax.clear()
    samples = df[df.metric == metric]
    plotAggregated(samples, ax, startTime, aggregateFor, groupBy)
    ax.legend()
    ax.set_xlabel('sec')
    ax.set_title(metric)

# One subplot per selected metric: task metrics first, then operator metrics,
# then task manager metrics
taskMetricNames = list(getJobMetricNames(taskMetrics, TASK_METRIC_PATTERN))
operatorMetricNames = list(getJobMetricNames(operatorMetrics, OPERATOR_METRIC_PATTERN))
nPlots = len(taskManagerMetrics) + len(operatorMetricNames) + len(taskMetricNames)
if nPlots == 0:
    # plt.subplots cannot build a grid without rows; fail with an actionable message instead
    raise ValueError('No metrics selected; select at least one metric before sampling!')
fig, axes = plt.subplots(figsize=(8, 4*nPlots), nrows=nPlots, sharex=True, squeeze=False)
plt.ion()
fig.show()
fig.canvas.draw()

startTime = time.time()
currentTime = startTime

# Keep the time axis anchored at the earliest sample already recorded by a previous run
# of this cell. np.fmin ignores a NaN operand (the built-in min() returns NaN whenever
# its first argument is NaN), so data in only one of the two tables is still honoured.
minimumRecordedTime = np.fmin(jobData.t.min(), tmData.t.min())
referenceTime = int(startTime) if np.isnan(minimumRecordedTime) else minimumRecordedTime
try:
    while currentTime - startTime < ANALYSIS_DURATION_SEC:
        # Retrieve metrics through Flink's REST API
        jobData = recordVertexMetrics(jobData, taskMetrics, currentTime, TASK_METRIC_PATTERN)
        jobData = recordVertexMetrics(jobData, operatorMetrics, currentTime, OPERATOR_METRIC_PATTERN)
        tmData = recordTaskManagerMetrics(tmData, taskManagers, taskManagerMetrics, currentTime)
        # Plot task and operator metrics
        axesIndex = 0
        for metric in taskMetricNames + operatorMetricNames:
            ax = axes[axesIndex][0]
            plotMetric(jobData, ax, referenceTime, metric, ['t', 'vertex', 'component'], ['vertex', 'component'])
            axesIndex += 1
        # Plot task manager metrics
        for metric in taskManagerMetrics:
            ax = axes[axesIndex][0]
            plotMetric(tmData, ax, referenceTime, metric, ['t', 'tm', 'metric'], ['tm', 'metric'])
            axesIndex += 1
        fig.canvas.draw()
        # Sleep first and read the clock afterwards, so that the timestamp of the next
        # sample is taken right before its metrics are fetched rather than one
        # sampling interval earlier
        time.sleep(SAMPLING_FREQ_SEC)
        currentTime = time.time()
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop sampling early
    pass