Skip to content

Commit

Permalink
fix: Fix memory leak from usage.py not properly cleaning up call stack (
Browse files Browse the repository at this point in the history
#3371)

* fix: Fix memory leak from usage.py not properly cleaning up call stack context

Signed-off-by: Danny Chiao <danny@tecton.ai>

* fix

Signed-off-by: Danny Chiao <danny@tecton.ai>

Signed-off-by: Danny Chiao <danny@tecton.ai>
  • Loading branch information
adchia committed Dec 2, 2022
1 parent 4ebe00f commit a0c6fde
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions sdk/python/feast/usage.py
Expand Up @@ -78,7 +78,7 @@ class FnCall:


class Sampler:
def should_record(self, event) -> bool:
def should_record(self) -> bool:
raise NotImplementedError

@property
Expand All @@ -87,7 +87,7 @@ def priority(self):


class AlwaysSampler(Sampler):
def should_record(self, event) -> bool:
def should_record(self) -> bool:
return True


Expand All @@ -100,7 +100,7 @@ def __init__(self, ratio):
self.total_counter = 0
self.sampled_counter = 0

def should_record(self, event) -> bool:
def should_record(self) -> bool:
self.total_counter += 1
if self.total_counter == self.MAX_COUNTER:
self.total_counter = 1
Expand Down Expand Up @@ -176,10 +176,12 @@ def _set_installation_id():


def _export(event: typing.Dict[str, typing.Any]):
_executor.submit(requests.post, USAGE_ENDPOINT, json=event, timeout=30)
_executor.submit(requests.post, USAGE_ENDPOINT, json=event, timeout=2)


def _produce_event(ctx: UsageContext):
if ctx.sampler and not ctx.sampler.should_record():
return
# Cannot check for unittest because typeguard pulls in unittest
is_test = flags_helper.is_test() or bool({"pytest"} & sys.modules.keys())
event = {
Expand All @@ -204,10 +206,6 @@ def _produce_event(ctx: UsageContext):
**_constant_attributes,
}
event.update(ctx.attributes)

if ctx.sampler and not ctx.sampler.should_record(event):
return

_export(event)


Expand Down Expand Up @@ -262,6 +260,13 @@ def deeply_nested(...):
"""
sampler = attrs.pop("sampler", AlwaysSampler())

def clear_context(ctx):
_context.set(UsageContext()) # reset context to default values
# TODO: Figure out why without this, new contexts.get aren't reset
ctx.call_stack = []
ctx.completed_calls = []
ctx.attributes = {}

def decorator(func):
if not _is_enabled:
return func
Expand Down Expand Up @@ -295,17 +300,22 @@ def wrapper(*args, **kwargs):

raise exc
finally:
last_call = ctx.call_stack.pop(-1)
last_call.end = datetime.utcnow()
ctx.completed_calls.append(last_call)
ctx.sampler = (
sampler if sampler.priority > ctx.sampler.priority else ctx.sampler
)
last_call = ctx.call_stack.pop(-1)
last_call.end = datetime.utcnow()
ctx.completed_calls.append(last_call)

if not ctx.call_stack:
# we reached the root of the stack
_context.set(UsageContext()) # reset context to default values
if not ctx.call_stack or (
len(ctx.call_stack) == 1
and "feast.feature_store.FeatureStore.serve"
in str(ctx.call_stack[0].fn_name)
):
# When running `feast serve`, the serve method never exits so it gets
# stuck otherwise
_produce_event(ctx)
clear_context(ctx)

return wrapper

Expand Down

0 comments on commit a0c6fde

Please sign in to comment.