Skip to content

Commit

Permalink
[Fix][Plugin] sw_flask general exceptions handled (#93)
Browse files Browse the repository at this point in the history
* sw_flask fix will handle errors like returning the wrong type from a handler or other internal errors.
* Updated StackedSpan to track depth, and made depth variable instance instead of class level (this was a bug).
* Changed how SpanContext decides when all spans finished to write Segment data, now counts span start / stops which should work better across different async scenarios.
* Changed new_exit_span() with span.inject() to work simpler like the NodeJS agent, now plugins inject directly themselves if they need to.
* Removed carrier from plugins which didn't actually use it.
  • Loading branch information
tom-pytel committed Dec 9, 2020
1 parent 80f1e72 commit 19e907d
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 93 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ gen:
lint: clean
flake8 --version || python3 -m pip install flake8
flake8 . --count --select=E9,F63,F7,F82 --show-source
flake8 . --count --max-complexity=12 --max-line-length=120
flake8 . --count --max-complexity=13 --max-line-length=120

license: clean
python3 tools/check-license-header.py skywalking tests tools
Expand Down
11 changes: 10 additions & 1 deletion skywalking/plugins/sw_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
def install():
from flask import Flask
_full_dispatch_request = Flask.full_dispatch_request

_handle_user_exception = Flask.handle_user_exception
_handle_exception = Flask.handle_exception

def params_tostring(params):
return "\n".join([k + '=[' + ",".join(params.getlist(k)) + ']' for k, _ in params.items()])
Expand Down Expand Up @@ -66,5 +66,14 @@ def _sw_handle_user_exception(this: Flask, e):

return _handle_user_exception(this, e)

def _sw_handle_exception(this: Flask, e):
if e is not None:
entry_span = get_context().active_span()
if entry_span is not None and type(entry_span) is not NoopSpan:
entry_span.raised()

return _handle_exception(this, e)

Flask.full_dispatch_request = _sw_full_dispatch_request
Flask.handle_user_exception = _sw_handle_user_exception
Flask.handle_exception = _sw_handle_exception
11 changes: 4 additions & 7 deletions skywalking/plugins/sw_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,15 @@ def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, ti

peer = ";".join(this.config["bootstrap_servers"])
context = get_context()
carrier = Carrier()
with context.new_exit_span(op="Kafka/" + topic + "/Producer" or "/", peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="Kafka/" + topic + "/Producer" or "/", peer=peer) as span:
carrier = span.inject()
span.layer = Layer.MQ
span.component = Component.KafkaProducer

if headers is None:
headers = []
for item in carrier:
headers.append((item.key, item.val.encode("utf-8")))
else:
for item in carrier:
headers.append((item.key, item.val.encode("utf-8")))
for item in carrier:
headers.append((item.key, item.val.encode("utf-8")))

res = _send(this, topic, value=value, key=key, headers=headers, partition=partition,
timestamp_ms=timestamp_ms)
Expand Down
10 changes: 3 additions & 7 deletions skywalking/plugins/sw_pymongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from skywalking import Layer, Component, config
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag

Expand Down Expand Up @@ -56,11 +55,10 @@ def _sw_command(this: SocketInfo, dbname, spec, *args, **kwargs):
address = this.sock.getpeername()
peer = "%s:%s" % address
context = get_context()
carrier = Carrier()

operation = list(spec.keys())[0]
sw_op = operation.capitalize() + "Operation"
with context.new_exit_span(op="MongoDB/" + sw_op, peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="MongoDB/" + sw_op, peer=peer) as span:
result = _command(this, dbname, spec, *args, **kwargs)

span.layer = Layer.Database
Expand Down Expand Up @@ -108,10 +106,9 @@ def _sw_execute(this: _Bulk, *args, **kwargs):
address = this.collection.database.client.address
peer = "%s:%s" % address
context = get_context()
carrier = Carrier()

sw_op = "MixedBulkWriteOperation"
with context.new_exit_span(op="MongoDB/"+sw_op, peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="MongoDB/"+sw_op, peer=peer) as span:
span.layer = Layer.Database
span.component = Component.MongoDB

Expand Down Expand Up @@ -144,10 +141,9 @@ def _sw_send_message(this: Cursor, operation):
peer = "%s:%s" % address

context = get_context()
carrier = Carrier()
op = "FindOperation"

with context.new_exit_span(op="MongoDB/"+op, peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="MongoDB/"+op, peer=peer) as span:
span.layer = Layer.Database
span.component = Component.MongoDB

Expand Down
4 changes: 1 addition & 3 deletions skywalking/plugins/sw_pymysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from skywalking import Layer, Component, config
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag

Expand All @@ -31,8 +30,7 @@ def _sw_execute(this: Cursor, query, args=None):
peer = "%s:%s" % (this.connection.host, this.connection.port)

context = get_context()
carrier = Carrier()
with context.new_exit_span(op="Mysql/PyMsql/execute", peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="Mysql/PyMsql/execute", peer=peer) as span:
span.layer = Layer.Database
span.component = Component.PyMysql
res = _execute(this, query, args)
Expand Down
14 changes: 5 additions & 9 deletions skywalking/plugins/sw_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,18 @@ def _sw_basic_publish(this, exchange,
mandatory=False):
peer = '%s:%s' % (this.connection.params.host, this.connection.params.port)
context = get_context()
carrier = Carrier()
import pika
with context.new_exit_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key + "/Producer" or "/",
peer=peer, carrier=carrier) as span:
peer=peer) as span:
carrier = span.inject()
span.layer = Layer.MQ
span.component = Component.RabbitmqProducer
properties = pika.BasicProperties() if properties is None else properties

if properties.headers is None:
headers = {}
for item in carrier:
headers[item.key] = item.val
properties.headers = headers
else:
for item in carrier:
properties.headers[item.key] = item.val
properties.headers = {}
for item in carrier:
properties.headers[item.key] = item.val

res = _basic_publish(this, exchange,
routing_key,
Expand Down
12 changes: 4 additions & 8 deletions skywalking/plugins/sw_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from skywalking import Layer, Component
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag
from skywalking import config
Expand All @@ -44,18 +43,15 @@ def _sw_request(this: Session, method, url,
hooks, stream, verify, cert, json)

context = get_context()
carrier = Carrier()
with context.new_exit_span(op=url_param.path or "/", peer=url_param.netloc, carrier=carrier) as span:
with context.new_exit_span(op=url_param.path or "/", peer=url_param.netloc) as span:
carrier = span.inject()
span.layer = Layer.Http
span.component = Component.Requests

if headers is None:
headers = {}
for item in carrier:
headers[item.key] = item.val
else:
for item in carrier:
headers[item.key] = item.val
for item in carrier:
headers[item.key] = item.val

span.tag(Tag(key=tags.HttpMethod, val=method.upper()))
span.tag(Tag(key=tags.HttpUrl, val=url))
Expand Down
12 changes: 4 additions & 8 deletions skywalking/plugins/sw_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from skywalking import Layer, Component
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag

Expand All @@ -31,19 +30,16 @@ def _sw_request(this: RequestMethods, method, url, fields=None, headers=None, **

from urllib.parse import urlparse
url_param = urlparse(url)
carrier = Carrier()
context = get_context()
with context.new_exit_span(op=url_param.path or "/", peer=url_param.netloc, carrier=carrier) as span:
with context.new_exit_span(op=url_param.path or "/", peer=url_param.netloc) as span:
carrier = span.inject()
span.layer = Layer.Http
span.component = Component.Urllib3

if headers is None:
headers = {}
for item in carrier:
headers[item.key] = item.val
else:
for item in carrier:
headers[item.key] = item.val
for item in carrier:
headers[item.key] = item.val

span.tag(Tag(key=tags.HttpMethod, val=method.upper()))
span.tag(Tag(key=tags.HttpUrl, val=url))
Expand Down
5 changes: 2 additions & 3 deletions skywalking/plugins/sw_urllib_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

from skywalking import Layer, Component
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag

Expand All @@ -36,9 +35,9 @@ def _sw_open(this: OpenerDirector, fullurl, data=None, timeout=socket._GLOBAL_DE
fullurl = Request(fullurl, data)

context = get_context()
carrier = Carrier()
url = fullurl.selector.split("?")[0] if fullurl.selector else '/'
with context.new_exit_span(op=url, peer=fullurl.host, carrier=carrier) as span:
with context.new_exit_span(op=url, peer=fullurl.host) as span:
carrier = span.inject()
span.layer = Layer.Http
span.component = Component.General
code = None
Expand Down
20 changes: 12 additions & 8 deletions skywalking/trace/carrier.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ def val(self, val: str):


class Carrier(CarrierItem):
def __init__(self):
def __init__(self, trace_id: str = '', segment_id: str = '', span_id: str = '', service: str = '',
service_instance: str = '', endpoint: str = '', client_address: str = '',
correlation: dict = None): # pyre-ignore
super(Carrier, self).__init__(key='sw8')
self.trace_id = '' # type: str
self.segment_id = '' # type: str
self.span_id = '' # type: str
self.service = '' # type: str
self.service_instance = '' # type: str
self.endpoint = '' # type: str
self.client_address = '' # type: str
self.trace_id = trace_id # type: str
self.segment_id = segment_id # type: str
self.span_id = span_id # type: str
self.service = service # type: str
self.service_instance = service_instance # type: str
self.endpoint = endpoint # type: str
self.client_address = client_address # type: str
self.correlation_carrier = SW8CorrelationCarrier()
self.items = [self.correlation_carrier, self] # type: List[CarrierItem]
self.__iter_index = 0 # type: int
if correlation is not None:
self.correlation_carrier.correlation = correlation

@property
def val(self) -> str:
Expand Down
24 changes: 10 additions & 14 deletions skywalking/trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(self):
self.segment = Segment() # type: Segment
self._sid = Counter()
self._correlation = {} # type: dict
self._nspans = 0

def new_local_span(self, op: str) -> Span:
span = self.ignore_check(op, Kind.Local)
Expand Down Expand Up @@ -111,7 +112,7 @@ def new_entry_span(self, op: str, carrier: 'Carrier' = None) -> Span:

return span

def new_exit_span(self, op: str, peer: str, carrier: 'Carrier' = None) -> Span:
def new_exit_span(self, op: str, peer: str) -> Span:
span = self.ignore_check(op, Kind.Exit)
if span is not None:
return span
Expand All @@ -127,9 +128,6 @@ def new_exit_span(self, op: str, peer: str, carrier: 'Carrier' = None) -> Span:
peer=peer,
)

if carrier is not None:
span.inject(carrier=carrier)

return span

def ignore_check(self, op: str, kind: Kind):
Expand All @@ -150,22 +148,23 @@ def ignore_check(self, op: str, kind: Kind):
return None

def start(self, span: Span):
self._nspans += 1
spans = _spans()
if span not in spans:
spans.append(span)

def stop(self, span: Span) -> bool:
spans = _spans()
idx = spans.index(span) # span SHOULD now always be at end even in async-world, but just in case

if span.finish(self.segment):
del spans[idx]
span.finish(self.segment)
del spans[spans.index(span)]

if len(spans) == 0:
self._nspans -= 1
if self._nspans == 0:
_local().context = None
agent.archive(self.segment)
return True

return len(spans) == 0
return False

def active_span(self):
spans = _spans()
Expand Down Expand Up @@ -231,10 +230,7 @@ def new_entry_span(self, op: str, carrier: 'Carrier' = None) -> Span:
self._noop_span.extract(carrier)
return self._noop_span

def new_exit_span(self, op: str, peer: str, carrier: 'Carrier' = None) -> Span:
if carrier is not None:
self._noop_span.inject(carrier)

def new_exit_span(self, op: str, peer: str) -> Span:
return self._noop_span

def start(self, span: Span):
Expand Down

0 comments on commit 19e907d

Please sign in to comment.