From 44f4a25a338819b2528f22d7d959652538192c38 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 3 Apr 2017 06:47:07 +0000 Subject: [PATCH] Fixed issues #6 --- pyprometheus/__init__.py | 2 +- pyprometheus/contrib/uwsgi_features.py | 65 ++++++++++++++++++++++---- 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/pyprometheus/__init__.py b/pyprometheus/__init__.py index 5a52e0e..117b606 100644 --- a/pyprometheus/__init__.py +++ b/pyprometheus/__init__.py @@ -18,7 +18,7 @@ __license__ = "BSD, see LICENSE for more details" -__version__ = "0.0.6" +__version__ = "0.0.7" __version_info__ = list(map(int, __version__.split("."))) diff --git a/pyprometheus/contrib/uwsgi_features.py b/pyprometheus/contrib/uwsgi_features.py index f7819e5..d336fe3 100644 --- a/pyprometheus/contrib/uwsgi_features.py +++ b/pyprometheus/contrib/uwsgi_features.py @@ -176,6 +176,8 @@ def __init__(self, sharedarea_id=SHAREDAREA_ID, namespace="", stats=False, label self._stats = stats self._labels = tuple(sorted(labels.items(), key=lambda x: x[0])) + self._syncs = 0 + self._m = uwsgi.sharedarea_memoryview(self._sharedarea_id) self.init_memory() @@ -197,17 +199,33 @@ def metric_name(self, name): """ return ":".join([self._namespace, name]) + + @staticmethod + def get_unique_id(): + try: + return uwsgi.worker_id() + except Exception: + try: + return uwsgi.mule_id() + except Exception: + return os.getpid() + return "unknown" + def declare_metrics(self): return { - "memory_sync": Counter(self.metric_name("memory_read"), "UWSGI shared memory syncs", ("sharedarea", ) + self._labels), + "memory_sync": Counter(self.metric_name("memory_read"), "UWSGI shared memory syncs", ("sharedarea", "id") + self._labels), "memory_size": Gauge(self.metric_name("memory_size"), "UWSGI shared memory size", ("sharedarea", ) + self._labels), "num_keys": Gauge(self.metric_name("num_keys"), "UWSGI num_keys", ("sharedarea", ) + self._labels) } def collect(self): + labels = self._labels + (("sharedarea", self._sharedarea_id), ("id", self.get_unique_id())) + metric = self._collectors["memory_sync"] + metric.add_sample(labels, metric.build_sample(labels, ( (TYPES.GAUGE, metric.name, "", labels, self._syncs), ))) + + yield metric + labels = self._labels + (("sharedarea", self._sharedarea_id), ) - # metric = self._collectors["memory_sync"] - # metric.add_sample(labels, metric.build_sample(labels, ( (TYPES.GAUGE, metric.name, "", labels, ) )) # yield metric metric = self._collectors["memory_size"] @@ -252,6 +270,8 @@ def serialize_key(self, key): return val def unserialize_key(self, serialized_key): + if not serialized_key: + raise RuntimeError("Invalid serialized key") return marshal.loads(serialized_key) def get_area_size_with_lock(self): @@ -319,7 +339,7 @@ def read_memory(self): def load_exists_positions(self): """Load all keys from memory """ - + self._syncs += 1 self._used = self.get_area_size() self._sign = self.get_area_sign() @@ -376,6 +396,7 @@ def read_key_string(self, position, size): key_string_bytes = self.m[self.get_slice(position, size)] return struct.unpack(b"{0}s".format(size), key_string_bytes)[0] + def read_key_value(self, position): """Read float value of position @@ -444,6 +465,9 @@ def inc_value(self, key, value): except InvalidUWSGISharedareaPagesize as e: logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m))) return 0 + except Exception as e: + logger.error(e, exc_info=True) + return 0 def write_value(self, key, value): """Write value to shared memory @@ -461,6 +485,9 @@ def write_value(self, key, value): except InvalidUWSGISharedareaPagesize as e: logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m))) return None + except Exception as e: + logger.error(e, exc_info=True) + return 0 def get_value(self, key): """Read value from shared memory @@ -474,6 +501,9 @@ def get_value(self, key): except InvalidUWSGISharedareaPagesize: logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m))) return 0 + except Exception as e: + logger.error(e, exc_info=True) + return 0 @property def is_actual(self): @@ -495,7 +525,10 @@ def lock(self): if not self.wlocked and not self.rlocked: self.wlocked, self.rlocked = lock_id, lock_id uwsgi.sharedarea_wlock(self._sharedarea_id) - yield + try: + yield + except Exception as e: + logger.error(e, exc_info=True) uwsgi.sharedarea_unlock(self._sharedarea_id) self.wlocked, self.rlocked = False, False else: @@ -507,7 +540,10 @@ def rlock(self): if not self.rlocked: self.rlocked = lock_id uwsgi.sharedarea_rlock(self._sharedarea_id) - yield + try: + yield + except Exception as e: + logger.error(e, exc_info=True) uwsgi.sharedarea_unlock(self._sharedarea_id) self.rlocked = False else: @@ -527,14 +563,17 @@ def clear(self): self._positions.clear() def get_items(self): - self.validate_actuality() + with self.rlock(): + self.validate_actuality() for key, position in self._positions.items(): yield self.unserialize_key(key), self.read_key_value(position[2]) def inc_items(self, items): - self.validate_actuality() + with self.lock(): + self.validate_actuality() + for key, value in items: try: positions, created = self.get_key_position(self.serialize_key(key), value) @@ -543,10 +582,15 @@ def inc_items(self, items): self.write_key_value(positions[2], self.read_key_value(positions[2]) + value) except InvalidUWSGISharedareaPagesize: logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m))) + except Exception as e: + logger.error(e, exc_info=True) + return 0 def write_items(self, items): - self.validate_actuality() + with self.lock(): + self.validate_actuality() + for key, value in items: try: positions, created = self.get_key_position(self.serialize_key(key), value) @@ -555,6 +599,9 @@ def write_items(self, items): self.write_key_value(positions[2], value) except InvalidUWSGISharedareaPagesize: logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m))) + except Exception as e: + logger.error(e, exc_info=True) + return 0 class UWSGIFlushStorage(LocalMemoryStorage):