In [2]:
from pprint import pprint
import flink_rest_client
from flink_rest_client.v1.client import FlinkRestClientV1

In [3]:
# Connect to the Flink REST endpoint on the local machine and pull the
# cluster-level summary; both names are reused by later cells.
client: FlinkRestClientV1 = flink_rest_client.FlinkRestClient.get(
    host="localhost",
    port=8082,
)
result = client.overview()

In [4]:
# Display the cluster overview dict returned by client.overview() above.
result

{'taskmanagers': 8,
 'slots-total': 8,
 'slots-available': 7,
 'jobs-running': 1,
 'jobs-finished': 0,
 'jobs-cancelled': 14,
 'jobs-failed': 1,
 'flink-version': '1.16.1',
 'flink-commit': 'DeadD0d0'}

In [8]:
# Print every job currently in the RUNNING state and remember its id.
# When several jobs are running, job_id ends up holding the last one listed;
# when none is running, job_id is left untouched by this cell.
for job in client.jobs.overview():
    if job['state'] != 'RUNNING':
        continue
    pprint(job)
    job_id = job['jid']

{'duration': 308612,
 'end-time': -1,
 'jid': '37f70c464fba1f57eeb4fefc10e0e9e6',
 'last-modification': 1685977994459,
 'name': 'DownstreamHotPagesExecutor',
 'start-time': 1685977986746,
 'state': 'RUNNING',
 'tasks': {'canceled': 0,
           'canceling': 0,
           'created': 0,
           'deploying': 0,
           'failed': 0,
           'finished': 0,
           'initializing': 0,
           'reconciling': 0,
           'running': 10,
           'scheduled': 0,
           'total': 10}}


In [9]:
# Fetch the detail record for the job id captured by the RUNNING-jobs loop.
client.jobs.get(job_id)

{'jid': '37f70c464fba1f57eeb4fefc10e0e9e6',
 'name': 'DownstreamHotPagesExecutor',
 'isStoppable': False,
 'state': 'RUNNING',
 'start-time': 1685977986746,
 'end-time': -1,
 'duration': 312975,
 'maxParallelism': -1,
 'now': 1685978299721,
 'timestamps': {'RUNNING': 1685977986762,
  'FINISHED': 0,
  'RECONCILING': 0,
  'FAILING': 0,
  'INITIALIZING': 1685977986746,
  'CANCELED': 0,
  'SUSPENDED': 0,
  'RESTARTING': 0,
  'CREATED': 1685977986756,
  'FAILED': 0,
  'CANCELLING': 0},
 'vertices': [{'id': 'bc764cd8ddf7a0cff126f51c16239658',
   'name': 'Source: kafka-hotitems',
   'maxParallelism': 128,
   'parallelism': 1,
   'status': 'RUNNING',
   'start-time': 1685977986865,
   'end-time': -1,
   'duration': 312856,
   'tasks': {'INITIALIZING': 0,
    'CANCELED': 0,
    'CANCELING': 0,
    'FINISHED': 0,
    'CREATED': 0,
    'FAILED': 0,
    'DEPLOYING': 0,
    'RECONCILING': 0,
    'RUNNING': 1,
    'SCHEDULED': 0},
   'metrics': {'read-bytes': 0,
    'read-bytes-complete': True,
    

In [11]:
# encoding: utf-8
from prometheus_client import Counter, Gauge, Summary
from prometheus_client.core import CollectorRegistry
from prometheus_client.exposition import choose_encoder


class Monitor:
    """Prometheus metrics for an HTTP service, kept in a private registry.

    Every metric is labelled with ``(method, code, uri)``. Each recording
    method comes in two flavours: one that takes a ``handler`` object and
    reads the label values from it, and a ``*_customize`` one that takes the
    label values explicitly.

    ``handler`` is only accessed through ``handler.request.method``,
    ``handler.request.path``, ``handler.request.request_time()``,
    ``handler.request.headers``, ``handler.get_status()``,
    ``handler.set_header()`` and ``handler.write()``.
    NOTE(review): this looks like a Tornado ``RequestHandler`` -- confirm
    against the callers.
    """

    def __init__(self):
        # Dedicated registry plus the bookkeeping map for the "max cost" gauge.
        # The map is keyed by the stringified (method, status, path) triple so
        # that every gauge label combination tracks its own maximum.
        self.collector_registry = CollectorRegistry(auto_describe=False)
        self.request_time_max_map = {}

        # Summary of request durations.
        self.http_request_summary = Summary(
            name="http_server_requests_seconds",
            documentation="Num of request time summary",
            labelnames=("method", "code", "uri"),
            registry=self.collector_registry,
        )
        # Largest request duration seen since the last reset of the map.
        self.http_request_max_cost = Gauge(
            name="http_server_requests_seconds_max",
            documentation="Number of request max cost",
            labelnames=("method", "code", "uri"),
            registry=self.collector_registry,
        )

        # Number of failed requests.
        self.http_request_fail_count = Counter(
            name="http_server_requests_error",
            documentation="Times of request fail in total",
            labelnames=("method", "code", "uri"),
            registry=self.collector_registry,
        )

        # Accumulated model-prediction time.
        self.http_request_predict_cost = Counter(
            name="http_server_requests_seconds_predict",
            documentation="Seconds of prediction cost in total",
            labelnames=("method", "code", "uri"),
            registry=self.collector_registry,
        )
        # Accumulated image-download time.
        self.http_request_download_cost = Counter(
            name="http_server_requests_seconds_download",
            documentation="Seconds of download cost in total",
            labelnames=("method", "code", "uri"),
            registry=self.collector_registry,
        )

    @staticmethod
    def _handler_labels(handler):
        """Return the ``(method, status, path)`` label values of ``handler``."""
        return handler.request.method, handler.get_status(), handler.request.path

    @staticmethod
    def _max_cost_key(method, status, path):
        """Build the ``request_time_max_map`` key for one gauge series.

        Values are stringified so that e.g. ``200`` and ``"200"`` share one
        entry (prometheus_client also turns label values into strings).
        """
        return str(method), str(status), str(path)

    def get_prometheus_metrics_info(self, handler):
        """Write the registry contents to ``handler`` as the /metrics response.

        The encoder and content type are negotiated from the request's
        ``accept`` header. After writing, the max-cost bookkeeping map is
        reset so the next window starts from zero.
        """
        encoder, content_type = choose_encoder(handler.request.headers.get('accept'))
        handler.set_header("Content-Type", content_type)
        handler.write(encoder(self.collector_registry))
        self.reset_request_time_max_map()

    def set_prometheus_request_summary(self, handler):
        """Observe the request duration of ``handler`` and update the max gauge."""
        self.http_request_summary.labels(*self._handler_labels(handler)).observe(
            handler.request.request_time()
        )
        self.set_prometheus_request_max_cost(handler)

    def set_prometheus_request_summary_customize(self, method, status, path, cost_time):
        """Observe ``cost_time`` for explicit labels and update the max gauge."""
        self.http_request_summary.labels(method, status, path).observe(cost_time)
        self.set_prometheus_request_max_cost_customize(method, status, path, cost_time)

    def set_prometheus_request_fail_count(self, handler, amount=1.0):
        """Increase the failure counter for ``handler``'s labels by ``amount``."""
        self.http_request_fail_count.labels(*self._handler_labels(handler)).inc(amount)

    def set_prometheus_request_fail_count_customize(self, method, status, path, amount=1.0):
        """Increase the failure counter for explicit labels by ``amount``."""
        self.http_request_fail_count.labels(method, status, path).inc(amount)

    def set_prometheus_request_max_cost(self, handler):
        """Record ``handler``'s request duration in the max gauge if it is a new maximum."""
        request_cost = handler.request.request_time()
        method, status, path = self._handler_labels(handler)
        self.set_prometheus_request_max_cost_customize(method, status, path, request_cost)

    def set_prometheus_request_max_cost_customize(self, method, status, path, cost_time):
        """Record ``cost_time`` in the max gauge if it is a new maximum.

        The maximum is tracked per ``(method, status, path)`` series. Tracking
        it per path only (as before) meant that a series whose cost was lower
        than another series on the same path was never written to the gauge.
        """
        series_key = self._max_cost_key(method, status, path)
        if self.check_request_time_max_map(series_key, cost_time):
            self.http_request_max_cost.labels(method, status, path).set(cost_time)
            self.request_time_max_map[series_key] = cost_time

    def set_prometheus_request_predict_cost(self, handler, amount=1.0):
        """Add ``amount`` to the prediction-cost counter for ``handler``'s labels."""
        self.http_request_predict_cost.labels(*self._handler_labels(handler)).inc(amount)

    def set_prometheus_request_predict_cost_customize(self, method, status, path, cost_time):
        """Add ``cost_time`` to the prediction-cost counter for explicit labels."""
        self.http_request_predict_cost.labels(method, status, path).inc(cost_time)

    def set_prometheus_request_download_cost(self, handler, amount=1.0):
        """Add ``amount`` to the download-cost counter for ``handler``'s labels."""
        self.http_request_download_cost.labels(*self._handler_labels(handler)).inc(amount)

    def set_prometheus_request_download_cost_customize(self, method, status, path, cost_time):
        """Add ``cost_time`` to the download-cost counter for explicit labels."""
        self.http_request_download_cost.labels(method, status, path).inc(cost_time)

    def check_request_time_max_map(self, uri, cost):
        """Return True when ``cost`` should replace the stored maximum for ``uri``.

        ``uri`` is used as the lookup key into ``request_time_max_map``; any
        hashable key works. A key that has not been seen yet always qualifies;
        otherwise ``cost`` must be strictly greater than the stored value.
        """
        if uri not in self.request_time_max_map:
            return True
        return self.request_time_max_map[uri] < cost

    def reset_request_time_max_map(self):
        """Set every tracked maximum back to 0.0 (keys are kept).

        Only the bookkeeping map is reset; the gauge itself keeps its last
        value until a later request writes a new one.
        """
        # Values are overwritten in place; the key set does not change, so
        # iterating the dict while assigning is safe.
        for key in self.request_time_max_map:
            self.request_time_max_map[key] = 0.0
