From a0c6fde93bc8088d2bb34d1dd366a44b5d2657ee Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Thu, 1 Dec 2022 21:07:04 -0500 Subject: [PATCH] fix: Fix memory leak from usage.py not properly cleaning up call stack (#3371) * fix: Fix memory leak from usage.py not properly cleaning up call stack context Signed-off-by: Danny Chiao * fix Signed-off-by: Danny Chiao Signed-off-by: Danny Chiao --- sdk/python/feast/usage.py | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/usage.py b/sdk/python/feast/usage.py index 0965e70999..18bb497182 100644 --- a/sdk/python/feast/usage.py +++ b/sdk/python/feast/usage.py @@ -78,7 +78,7 @@ class FnCall: class Sampler: - def should_record(self, event) -> bool: + def should_record(self) -> bool: raise NotImplementedError @property @@ -87,7 +87,7 @@ def priority(self): class AlwaysSampler(Sampler): - def should_record(self, event) -> bool: + def should_record(self) -> bool: return True @@ -100,7 +100,7 @@ def __init__(self, ratio): self.total_counter = 0 self.sampled_counter = 0 - def should_record(self, event) -> bool: + def should_record(self) -> bool: self.total_counter += 1 if self.total_counter == self.MAX_COUNTER: self.total_counter = 1 @@ -176,10 +176,12 @@ def _set_installation_id(): def _export(event: typing.Dict[str, typing.Any]): - _executor.submit(requests.post, USAGE_ENDPOINT, json=event, timeout=30) + _executor.submit(requests.post, USAGE_ENDPOINT, json=event, timeout=2) def _produce_event(ctx: UsageContext): + if ctx.sampler and not ctx.sampler.should_record(): + return # Cannot check for unittest because typeguard pulls in unittest is_test = flags_helper.is_test() or bool({"pytest"} & sys.modules.keys()) event = { @@ -204,10 +206,6 @@ def _produce_event(ctx: UsageContext): **_constant_attributes, } event.update(ctx.attributes) - - if ctx.sampler and not ctx.sampler.should_record(event): - return - _export(event) @@ -262,6 +260,13 @@ def deeply_nested(...): """ sampler = attrs.pop("sampler", AlwaysSampler()) + def clear_context(ctx): + _context.set(UsageContext()) # reset context to default values + # TODO: Figure out why without this, new contexts.get aren't reset + ctx.call_stack = [] + ctx.completed_calls = [] + ctx.attributes = {} + def decorator(func): if not _is_enabled: return func @@ -295,17 +300,22 @@ def wrapper(*args, **kwargs): raise exc finally: - last_call = ctx.call_stack.pop(-1) - last_call.end = datetime.utcnow() - ctx.completed_calls.append(last_call) ctx.sampler = ( sampler if sampler.priority > ctx.sampler.priority else ctx.sampler ) + last_call = ctx.call_stack.pop(-1) + last_call.end = datetime.utcnow() + ctx.completed_calls.append(last_call) - if not ctx.call_stack: - # we reached the root of the stack - _context.set(UsageContext()) # reset context to default values + if not ctx.call_stack or ( + len(ctx.call_stack) == 1 + and "feast.feature_store.FeatureStore.serve" + in str(ctx.call_stack[0].fn_name) + ): + # When running `feast serve`, the serve method never exits so it gets + # stuck otherwise _produce_event(ctx) + clear_context(ctx) return wrapper