From 1cbe1cfc7a4616e570b01d9832c112fc3fd6eef5 Mon Sep 17 00:00:00 2001 From: vishal Date: Wed, 23 Sep 2020 14:39:55 -0400 Subject: [PATCH 1/4] Update api.py --- pkg/workloads/cortex/lib/type/api.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/workloads/cortex/lib/type/api.py b/pkg/workloads/cortex/lib/type/api.py index 71aa77bcf1..332928ccae 100644 --- a/pkg/workloads/cortex/lib/type/api.py +++ b/pkg/workloads/cortex/lib/type/api.py @@ -17,6 +17,7 @@ import time from pathlib import Path import json +import threading import datadog @@ -47,6 +48,8 @@ def __init__(self, provider, storage, model_dir, cache_dir=".", **kwargs): host_ip = os.environ["HOST_IP"] datadog.initialize(statsd_host=host_ip, statsd_port="8125") self.statsd = datadog.statsd + else: + self.metrics_file_lock = threading.Lock() def get_cached_classes(self): prefix = os.path.join(self.metadata_root, "classes") + "/" @@ -111,6 +114,7 @@ def post_metrics(self, metrics): cx_logger().warn("failure encountered while publishing metrics", exc_info=True) def store_metrics_locally(self, status_code, total_time): + self.metrics_file_lock.acquire() status_code_series = int(status_code / 100) status_code_file_name = f"/mnt/workspace/{os.getpid()}.{status_code_series}XX" @@ -118,6 +122,7 @@ def store_metrics_locally(self, status_code, total_time): request_time_file = f"/mnt/workspace/{os.getpid()}.request_time" self.increment_counter_file(request_time_file, total_time) + self.metrics_file_lock.release() def increment_counter_file(self, file_name, value): previous_val = 0 From 9e8f60e132f45d9cfd64a330cfddf74dcc1063b9 Mon Sep 17 00:00:00 2001 From: vishal Date: Wed, 23 Sep 2020 14:51:50 -0400 Subject: [PATCH 2/4] Update api.py --- pkg/workloads/cortex/lib/type/api.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/workloads/cortex/lib/type/api.py b/pkg/workloads/cortex/lib/type/api.py index 332928ccae..a8d7e164de 100644 --- a/pkg/workloads/cortex/lib/type/api.py +++ b/pkg/workloads/cortex/lib/type/api.py @@ -48,7 +48,8 @@ def __init__(self, provider, storage, model_dir, cache_dir=".", **kwargs): host_ip = os.environ["HOST_IP"] datadog.initialize(statsd_host=host_ip, statsd_port="8125") self.statsd = datadog.statsd - else: + + if provider == "local": self.metrics_file_lock = threading.Lock() def get_cached_classes(self): @@ -114,15 +115,17 @@ def post_metrics(self, metrics): cx_logger().warn("failure encountered while publishing metrics", exc_info=True) def store_metrics_locally(self, status_code, total_time): - self.metrics_file_lock.acquire() - status_code_series = int(status_code / 100) + try: + self.metrics_file_lock.acquire() + status_code_series = int(status_code / 100) - status_code_file_name = f"/mnt/workspace/{os.getpid()}.{status_code_series}XX" - self.increment_counter_file(status_code_file_name, 1) + status_code_file_name = f"/mnt/workspace/{os.getpid()}.{status_code_series}XX" + self.increment_counter_file(status_code_file_name, 1) - request_time_file = f"/mnt/workspace/{os.getpid()}.request_time" - self.increment_counter_file(request_time_file, total_time) - self.metrics_file_lock.release() + request_time_file = f"/mnt/workspace/{os.getpid()}.request_time" + self.increment_counter_file(request_time_file, total_time) + finally: + self.metrics_file_lock.release() def increment_counter_file(self, file_name, value): previous_val = 0 From bc5554801846dd8b80840bfb46bfd0dcb906aa9a Mon Sep 17 00:00:00 2001 From: vishal Date: Thu, 24 Sep 2020 12:01:15 -0400 Subject: [PATCH 3/4] Update api.py --- pkg/workloads/cortex/lib/type/api.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/workloads/cortex/lib/type/api.py b/pkg/workloads/cortex/lib/type/api.py index da101105e8..14022e06ad 100644 --- a/pkg/workloads/cortex/lib/type/api.py +++ b/pkg/workloads/cortex/lib/type/api.py @@ -117,14 +117,14 @@ def post_metrics(self, metrics): cx_logger().warn("failure encountered while publishing metrics", exc_info=True) def store_metrics_locally(self, status_code, total_time): - try: - self.metrics_file_lock.acquire() - status_code_series = int(status_code / 100) + self.metrics_file_lock.acquire() - status_code_file_name = f"/mnt/workspace/{os.getpid()}.{status_code_series}XX" - self.increment_counter_file(status_code_file_name, 1) + status_code_series = int(status_code / 100) + status_code_file_name = f"/mnt/workspace/{os.getpid()}.{status_code_series}XX" + request_time_file = f"/mnt/workspace/{os.getpid()}.request_time" - request_time_file = f"/mnt/workspace/{os.getpid()}.request_time" + try: + self.increment_counter_file(status_code_file_name, 1) self.increment_counter_file(request_time_file, total_time) finally: self.metrics_file_lock.release() From 17a2a16fab6a8677514ad35146081b0baf7acce5 Mon Sep 17 00:00:00 2001 From: vishal Date: Thu, 24 Sep 2020 12:34:50 -0400 Subject: [PATCH 4/4] Update api.py --- pkg/workloads/cortex/lib/type/api.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/workloads/cortex/lib/type/api.py b/pkg/workloads/cortex/lib/type/api.py index 14022e06ad..9b92ded540 100644 --- a/pkg/workloads/cortex/lib/type/api.py +++ b/pkg/workloads/cortex/lib/type/api.py @@ -117,12 +117,11 @@ def post_metrics(self, metrics): cx_logger().warn("failure encountered while publishing metrics", exc_info=True) def store_metrics_locally(self, status_code, total_time): - self.metrics_file_lock.acquire() - status_code_series = int(status_code / 100) status_code_file_name = f"/mnt/workspace/{os.getpid()}.{status_code_series}XX" request_time_file = f"/mnt/workspace/{os.getpid()}.request_time" + self.metrics_file_lock.acquire() try: self.increment_counter_file(status_code_file_name, 1) self.increment_counter_file(request_time_file, total_time)