Merge pull request #9 from abyssnlp/fix/metrics-fetcher
Fix/metrics fetcher
abyssnlp committed May 4, 2023
2 parents a4f41fd + e26eb40 commit 761bd21
Showing 4 changed files with 102 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -21,4 +21,4 @@ repos:
 entry: pylint
 language: system
 types: [python]
-exclude: ^(tests/.*|ci/.*)$
+exclude: ^(tests/.*|ci/.*|scripts/.*)$
80 changes: 60 additions & 20 deletions airflow_metrics_gbq/metrics.py
@@ -1,6 +1,7 @@
 import concurrent.futures
 import multiprocessing
 import os
+import random
 import socket
 import time
 import threading
@@ -9,7 +10,7 @@
 from concurrent.futures import ThreadPoolExecutor
 import atexit
 import typing as t
-from enum import Enum, unique
+from enum import Enum, unique, auto
 from collections import defaultdict
 from dataclasses import dataclass
 import pandas as pd
@@ -36,6 +37,13 @@ class Point:
     name: t.Optional[str] = None
 
 
+class Mode(Enum):
+    """Mode of execution"""
+
+    SYNC = auto()
+    ASYNC = auto()
+
+
 @unique
 class Measure(Enum):
     """Type of measure"""
@@ -111,35 +119,40 @@ def __init__(
         last_table: str,
         timers_table: str,
         num_threads: int = (multiprocessing.cpu_count() // 2),
+        mode: Mode = Mode.SYNC,
     ):
+        self._mode = mode
         self.dataset_id = dataset_id
         self.counts_table = counts_table
         self.last_table = last_table
         self.timers_table = timers_table
         os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = gcp_credentials
         self.gbq_connector = GoogleBigQueryConnector(gcp_credentials)
         self.logger = setup_gcloud_logging("airflow_monitoring", gcp_credentials)
-        # track batch time
-        self._last_flush = time.time()
-        # buffer
-        self.pool = ThreadPoolExecutor(max_workers=num_threads)
-        self._metrics = []
-        self._buffer = Queue(maxsize=self.CAPACITY + 50)
-        self.monitor_batch = threading.Event()
-        self.monitor_batch.set()
-
-        # Flush running
-        self.is_flush_running = threading.Event()
-        self.is_flush_running.clear()
 
         # Metrics connection
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         self._sock.bind((host, port))
 
-        # Fetch into queue from socket
-        self.pool.submit(self._fetch).add_done_callback(self.callback)
-        self.pool.submit(self.monitor).add_done_callback(self.callback)
-        atexit.register(self.close)
+        if self._mode == Mode.ASYNC:
+            # track batch time
+            self._last_flush = time.time()
+            # buffer
+            self.pool = ThreadPoolExecutor(max_workers=num_threads)
+            self._metrics = []
+            self._buffer = Queue(maxsize=self.CAPACITY + 50)
+            self.monitor_batch = threading.Event()
+            self.monitor_batch.set()
+
+            # Flush running
+            self.is_flush_running = threading.Event()
+            self.is_flush_running.clear()
+
+            # Fetch into queue from socket
+            self.pool.submit(self._fetch).add_done_callback(self.callback)
+            self.pool.submit(self.monitor).add_done_callback(self.callback)
+            atexit.register(self.close)
+        else:
+            self._container = []
 
     def monitor(self):
         """Monitor batch time for flushing metrics"""
@@ -169,15 +182,29 @@ def close(self):
         self.pool.shutdown()
         self.logger.debug("Done")
 
-    def run(self):
-        """Entrypoint to flush the queue to BigQuery based on num. records"""
+    def run_async(self):
+        """Async flush the queue to BigQuery based on num. records"""
         while True:
             if self._buffer.qsize() >= self.CAPACITY and not self.is_flush_running.is_set():
                 self.is_flush_running.set()
                 self.flush_queue()
                 self.is_flush_running.clear()
             time.sleep(1)
 
+    def run_sync(self):
+        """Runs the fetch and upload to GBQ synchronously"""
+        while True:
+            time.sleep(random.randint(1, 5))
+            self._fetch_sync()
+            self.send_metrics(self._container)
+
+    def run(self):
+        """Entrypoint to run in sync or async mode"""
+        if self._mode == Mode.ASYNC:
+            self.run_async()
+        else:
+            self.run_sync()
+
     def flush_queue(self):
         """Flush the queue to BigQuery"""
         metrics = []
@@ -190,6 +217,18 @@ def flush_queue(self):
             self.send_metrics(metrics)
             self._last_flush = time.time()
 
+    def _fetch_sync(self):
+        """Fetch metrics sync"""
+        self._container = []
+
+        while len(self._container) <= self.CAPACITY:
+            measure: str = self._sock.recv(1024).decode("utf-8")
+
+            try:
+                self._container.append(PointWithType.from_record(measure))
+            except IndexError as e:
+                self.logger.error(f"Seems like there is no record, measure: {measure}, error: {e}")
+
     @retry(
         retry=(retry_if_exception_type(queue.Full) | retry_if_exception_type(NoMetricFoundException)),
         wait=wait_exponential(multiplier=1, min=4, max=20),
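
Note: the IndexError guard in _fetch_sync suggests PointWithType.from_record indexes into the split record. Its implementation is not part of this diff; a hedged sketch of the kind of parsing presumably involved, given the StatsD wire format (name:value|type) used by the load-test metrics below:

    # Hedged sketch only -- the real PointWithType.from_record is not shown in this diff.
    def parse_statsd_record(measure: str):
        # e.g. "airflow.executor.open_slots:32|g"
        parts = measure.split(":")
        name = parts[0]
        value, metric_type = parts[1].split("|")  # parts[1] raises IndexError on an empty/truncated record
        return name, float(value), metric_type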
@@ -213,6 +252,7 @@ def _fetch(self):

     def _get_dfs(self, metrics: t.List[PointWithType]) -> (pd.DataFrame, pd.DataFrame, pd.DataFrame):
         """Get dataframes to upload"""
+
         measure_dict = self._part_types(metrics)
 
         df_counts, df_last, df_timer = pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
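Taken together, metrics.py now supports a synchronous mode alongside the original threaded pipeline. A hedged usage sketch follows; the class name AirflowMonitor, the credential path, and the dataset/table values are placeholders (the class that owns this __init__ is not visible in the diff), and only the keyword parameters are taken from it:

    from airflow_metrics_gbq.metrics import Mode

    monitor = AirflowMonitor(  # placeholder name for the collector class
        gcp_credentials="/path/to/service-account.json",
        host="127.0.0.1",
        port=8125,  # UDP port the Airflow StatsD client publishes to
        dataset_id="airflow_monitoring",
        counts_table="counts",
        last_table="last",
        timers_table="timers",
        mode=Mode.SYNC,  # new default; Mode.ASYNC keeps the previous queue-based behaviour
    )
    monitor.run()  # dispatches to run_sync() or run_async() based on mode
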
3 changes: 3 additions & 0 deletions airflow_metrics_gbq/utils/__init__.py
@@ -1,3 +1,4 @@
+import sys
 import logging
 from google.cloud.logging_v2.client import Client as logging_client
 from google.cloud.logging.handlers import CloudLoggingHandler
@@ -12,7 +13,9 @@ def setup_gcloud_logging(app_name, gcp_credentials_path):
     logger.setLevel(logging.DEBUG)
     gcloud_logging_client = logging_client.from_service_account_json(gcp_credentials_path)
     gcloud_logging_handler = CloudLoggingHandler(gcloud_logging_client, name=app_name)
+    stream_handler = logging.StreamHandler(sys.stdout)
     logger.addHandler(gcloud_logging_handler)
+    logger.addHandler(stream_handler)
     return logger


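With the added StreamHandler, everything logged through setup_gcloud_logging now also reaches stdout, not just Cloud Logging. Usage is unchanged; a small sketch with a placeholder credentials path:

    from airflow_metrics_gbq.utils import setup_gcloud_logging

    logger = setup_gcloud_logging("airflow_monitoring", "/path/to/service-account.json")
    logger.debug("visible on stdout and in Cloud Logging")
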
38 changes: 38 additions & 0 deletions scripts/load/load_test_socket.py
@@ -0,0 +1,38 @@
+import argparse
+import random
+import socket
+
+example_metrics = [
+    "airflow.scheduler.critical_section_duration:8.494474|ms",
+    "airflow.executor.open_slots:32|g",
+    "airflow.executor.queued_tasks:0|g",
+    "airflow.executor.running_tasks:0|g",
+    "airflow.scheduler_heartbeat:1|c",
+    "airflow.pool.open_slots.default_pool:128|g",
+    "airflow.pool.queued_slots.default_pool:0|g",
+    "airflow.scheduler.critical_section_duration:8.162927|ms",
+    "airflow.dag_processing.total_parse_time:11.112728387117386|g",
+    "airflow.dagbag_size:22|g",
+    "airflow.dag_processing.processes:1|c",
+]
+
+
+def send_metrics(host: str, port: int, num_messages: int):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+    messages = [random.choice(example_metrics) for _ in range(num_messages)]
+    for message in messages:
+        print("sending ", message)
+        sock.sendto(message.encode(), (host, port))
+
+    sock.close()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Send UDP messages to the host,port")
+    parser.add_argument("-i", "--host", type=str, required=False, default="127.0.0.1", help="Host address")
+    parser.add_argument("-p", "--port", type=int, required=False, default=8125, help="Host port")
+    parser.add_argument("-n", "--num-messages", type=int, required=True, help="Number of messages to send")
+    args = parser.parse_args()
+
+    send_metrics(args.host, args.port, args.num_messages)
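
Since --host and --port default to 127.0.0.1:8125, only the message count is required to exercise a locally running collector, e.g.:

    python scripts/load/load_test_socket.py --num-messages 500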
