Beam integration (#446)
* Created Beam Integration
tiopi committed Aug 9, 2019
1 parent 211f7f4 commit c8a1be9
Showing 3 changed files with 359 additions and 0 deletions.
148 changes: 148 additions & 0 deletions sentry_sdk/integrations/beam.py
@@ -0,0 +1,148 @@
from __future__ import absolute_import

import sys
import types
from functools import wraps

from sentry_sdk.hub import Hub
from sentry_sdk._compat import reraise
from sentry_sdk.utils import capture_internal_exceptions, event_from_exception
from sentry_sdk.integrations import Integration
from sentry_sdk.integrations.logging import ignore_logger

WRAPPED_FUNC = "_wrapped_{}_"
INSPECT_FUNC = "_inspect_{}" # Required format per apache_beam/transforms/core.py
USED_FUNC = "_sentry_used_"


class BeamIntegration(Integration):
    identifier = "beam"

    @staticmethod
    def setup_once():
        # type: () -> None
        from apache_beam.transforms.core import DoFn, ParDo  # type: ignore

        ignore_logger("root")
        ignore_logger("bundle_processor.create")

        function_patches = ["process", "start_bundle", "finish_bundle", "setup"]
        for func_name in function_patches:
            setattr(
                DoFn,
                INSPECT_FUNC.format(func_name),
                _wrap_inspect_call(DoFn, func_name),
            )

        old_init = ParDo.__init__

        def sentry_init_pardo(self, fn, *args, **kwargs):
            # Do not monkey patch init twice
            if not getattr(self, "_sentry_is_patched", False):
                for func_name in function_patches:
                    if not hasattr(fn, func_name):
                        continue
                    wrapped_func = WRAPPED_FUNC.format(func_name)

                    # Check to see if inspect is set and process is not
                    # to avoid monkey patching process twice.
                    # Check to see if function is part of object for
                    # backwards compatibility.
                    process_func = getattr(fn, func_name)
                    inspect_func = getattr(fn, INSPECT_FUNC.format(func_name))
                    if not getattr(inspect_func, USED_FUNC, False) and not getattr(
                        process_func, USED_FUNC, False
                    ):
                        setattr(fn, wrapped_func, process_func)
                        setattr(fn, func_name, _wrap_task_call(process_func))

                self._sentry_is_patched = True
            old_init(self, fn, *args, **kwargs)

        ParDo.__init__ = sentry_init_pardo

def _wrap_inspect_call(cls, func_name):
    from apache_beam.typehints.decorators import getfullargspec  # type: ignore

    if not hasattr(cls, func_name):
        return None

    def _inspect(self):
        """
        Inspect function overrides the way Beam gets argspec.
        """
        wrapped_func = WRAPPED_FUNC.format(func_name)
        if hasattr(self, wrapped_func):
            process_func = getattr(self, wrapped_func)
        else:
            process_func = getattr(self, func_name)
            setattr(self, func_name, _wrap_task_call(process_func))
            setattr(self, wrapped_func, process_func)
        return getfullargspec(process_func)

    setattr(_inspect, USED_FUNC, True)
    return _inspect


def _wrap_task_call(func):
    """
    Wrap the task call in a try/except so exceptions can be captured.
    The client is passed on to raise_exception so it can be rebound if needed.
    """
    client = Hub.current.client

    @wraps(func)
    def _inner(*args, **kwargs):
        try:
            gen = func(*args, **kwargs)
        except Exception:
            raise_exception(client)

        if not isinstance(gen, types.GeneratorType):
            return gen
        return _wrap_generator_call(gen, client)

    setattr(_inner, USED_FUNC, True)
    return _inner


def _capture_exception(exc_info, hub):
    """
    Send Beam exception to Sentry.
    """
    integration = hub.get_integration(BeamIntegration)
    if integration:
        client = hub.client
        event, hint = event_from_exception(
            exc_info,
            client_options=client.options,
            mechanism={"type": "beam", "handled": False},
        )
        hub.capture_event(event, hint=hint)


def raise_exception(client):
    """
    Capture and re-raise the current exception.
    If the current hub has no client, bind the given one first.
    """
    hub = Hub.current
    if hub.client is None:
        hub.bind_client(client)
    exc_info = sys.exc_info()
    with capture_internal_exceptions():
        _capture_exception(exc_info, hub)
    reraise(*exc_info)


def _wrap_generator_call(gen, client):
    """
    Wrap the generator to handle any failures.
    """
    while True:
        try:
            yield next(gen)
        except StopIteration:
            break
        except Exception:
            raise_exception(client)
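For context, a minimal sketch of how a pipeline author might enable the new integration. The DoFn and pipeline below are illustrative, not part of this change, and the SDK is initialized without a DSN so no events are actually sent:

import apache_beam as beam

import sentry_sdk
from sentry_sdk.integrations.beam import BeamIntegration

# Initializing the SDK patches ParDo.__init__, so any DoFn passed to ParDo
# afterwards gets its process/start_bundle/finish_bundle/setup wrapped.
sentry_sdk.init(integrations=[BeamIntegration()])  # pass dsn="..." to send events


class DivideDoFn(beam.DoFn):
    def process(self, element):
        yield 10 / element  # element == 0 raises ZeroDivisionError


with beam.Pipeline() as p:
    p | beam.Create([5, 2, 0]) | beam.ParDo(DivideDoFn())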
203 changes: 203 additions & 0 deletions tests/integrations/beam/test_beam.py
@@ -0,0 +1,203 @@
import pytest
import inspect

pytest.importorskip("apache_beam")

import dill

from sentry_sdk.integrations.beam import (
    BeamIntegration,
    _wrap_task_call,
    _wrap_inspect_call,
)

from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.typehints.decorators import getcallargs_forhints
from apache_beam.transforms.core import DoFn, ParDo, _DoFnParam, CallableWrapperDoFn
from apache_beam.runners.common import DoFnInvoker, OutputProcessor, DoFnContext
from apache_beam.utils.windowed_value import WindowedValue


def foo():
    return True


def bar(x, y):
    # print(x + y)
    return True


def baz(x, y=2):
    # print(x + y)
    return True


class A:
    def __init__(self, fn):
        self.r = "We are in A"
        self.fn = fn
        setattr(self, "_inspect_fn", _wrap_inspect_call(self, "fn"))

    def process(self):
        return self.fn()


class B(A, object):
    def fa(self, x, element=False, another_element=False):
        if x or (element and not another_element):
            # print(self.r)
            return True
        1 / 0
        return False

    def __init__(self):
        self.r = "We are in B"
        super(B, self).__init__(self.fa)


class SimpleFunc(DoFn):
    def process(self, x):
        if x:
            1 / 0
        return [True]


class PlaceHolderFunc(DoFn):
    def process(self, x, timestamp=DoFn.TimestampParam, wx=DoFn.WindowParam):
        if isinstance(timestamp, _DoFnParam) or isinstance(wx, _DoFnParam):
            raise Exception("Bad instance")
        if x:
            1 / 0
        yield True


def fail(x):
    if x:
        1 / 0
    return [True]


test_parent = A(foo)
test_child = B()
test_simple = SimpleFunc()
test_place_holder = PlaceHolderFunc()
test_callable = CallableWrapperDoFn(fail)


# Cannot call simple functions or placeholder test.
@pytest.mark.parametrize(
    "obj,f,args,kwargs",
    [
        [test_parent, "fn", (), {}],
        [test_child, "fn", (False,), {"element": True}],
        [test_child, "fn", (True,), {}],
        [test_simple, "process", (False,), {}],
        [test_callable, "process", (False,), {}],
    ],
)
def test_monkey_patch_call(obj, f, args, kwargs):
    func = getattr(obj, f)

    assert func(*args, **kwargs)
    assert _wrap_task_call(func)(*args, **kwargs)


@pytest.mark.parametrize("f", [foo, bar, baz, test_parent.fn, test_child.fn])
def test_monkey_patch_pickle(f):
f_temp = _wrap_task_call(f)
assert dill.pickles(f_temp), "{} is not pickling correctly!".format(f)

# Pickle everything
s1 = dill.dumps(f_temp)
s2 = dill.loads(s1)
dill.dumps(s2)


@pytest.mark.parametrize(
    "f,args,kwargs",
    [
        [foo, (), {}],
        [bar, (1, 5), {}],
        [baz, (1,), {}],
        [test_parent.fn, (), {}],
        [test_child.fn, (False,), {"element": True}],
        [test_child.fn, (True,), {}],
    ],
)
def test_monkey_patch_signature(f, args, kwargs):
    arg_types = [instance_to_type(v) for v in args]
    kwargs_types = {k: instance_to_type(v) for (k, v) in kwargs.items()}
    f_temp = _wrap_task_call(f)
    try:
        getcallargs_forhints(f, *arg_types, **kwargs_types)
    except Exception:
        print("Failed on {} with parameters {}, {}".format(f, args, kwargs))
        raise
    try:
        getcallargs_forhints(f_temp, *arg_types, **kwargs_types)
    except Exception:
        print("Failed on {} with parameters {}, {}".format(f_temp, args, kwargs))
        raise
    try:
        expected_signature = inspect.signature(f)
        test_signature = inspect.signature(f_temp)
        assert (
            expected_signature == test_signature
        ), "Failed on {}, signature {} does not match {}".format(
            f, expected_signature, test_signature
        )
    except Exception:
        # inspect.signature is not available on Python 2.7, so skip the check there.
        pass


class _OutputProcessor(OutputProcessor):
    def process_outputs(self, windowed_input_element, results):
        print(windowed_input_element)
        try:
            for result in results:
                assert result
        except StopIteration:
            print("In here")


@pytest.fixture
def init_beam(sentry_init):
    def inner(fn):
        sentry_init(default_integrations=False, integrations=[BeamIntegration()])
        # Little hack to avoid having to run the whole pipeline.
        pardo = ParDo(fn)
        signature = pardo._signature
        output_processor = _OutputProcessor()
        return DoFnInvoker.create_invoker(
            signature, output_processor, DoFnContext("test")
        )

    return inner


@pytest.mark.parametrize("fn", [test_simple, test_callable, test_place_holder])
def test_invoker_normal(init_beam, fn):
invoker = init_beam(fn)
print("Normal testing {} with {} invoker.".format(fn, invoker))
windowed_value = WindowedValue(False, 0, [None])
invoker.invoke_process(windowed_value)


@pytest.mark.parametrize("fn", [test_simple, test_callable, test_place_holder])
def test_invoker_exception(init_beam, capture_events, capture_exceptions, fn):
invoker = init_beam(fn)
events = capture_events()

print("Exception testing {} with {} invoker.".format(fn, invoker))
# Window value will always have one value for the process to run.
windowed_value = WindowedValue(True, 0, [None])
try:
invoker.invoke_process(windowed_value)
except Exception:
pass

event, = events
exception, = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == "beam"
8 changes: 8 additions & 0 deletions tox.ini
@@ -32,6 +32,9 @@ envlist =
    {pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-celery-{4.1,4.2,4.3}
    {pypy,py2.7}-celery-3

    {py2.7,py3.6}-beam-{12,13,master}
    py3.7-beam-{12,13}

    # The aws_lambda tests deploy to the real AWS and have their own matrix of Python versions.
    py3.7-aws_lambda

@@ -93,6 +96,10 @@ deps =
    {py3.5,py3.6}-sanic: aiocontextvars==0.2.1
    sanic: aiohttp

    beam-12: apache-beam>=2.12.0, <2.13.0
    beam-13: apache-beam>=2.13.0, <2.14.0
    beam-master: git+https://github.com/apache/beam#egg=apache-beam&subdirectory=sdks/python

    celery-3: Celery>=3.1,<4.0
    celery-4.1: Celery>=4.1,<4.2
    celery-4.2: Celery>=4.2,<4.3
@@ -154,6 +161,7 @@ deps =
setenv =
    PYTHONDONTWRITEBYTECODE=1
    TESTPATH=tests
    beam: TESTPATH=tests/integrations/beam
    django: TESTPATH=tests/integrations/django
    flask: TESTPATH=tests/integrations/flask
    bottle: TESTPATH=tests/integrations/bottle
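With these tox additions, the new environments can presumably be run locally like any other integration suite, for example (assuming tox and the pinned apache-beam versions install cleanly):

tox -e py3.7-beam-13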
