Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Plugin] sw_flask general exceptions handled #93

Merged
merged 4 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ gen:
lint: clean
flake8 --version || python3 -m pip install flake8
flake8 . --count --select=E9,F63,F7,F82 --show-source
flake8 . --count --max-complexity=12 --max-line-length=120
flake8 . --count --max-complexity=13 --max-line-length=120

license: clean
python3 tools/check-license-header.py skywalking tests tools
Expand Down
11 changes: 10 additions & 1 deletion skywalking/plugins/sw_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
def install():
from flask import Flask
_full_dispatch_request = Flask.full_dispatch_request

_handle_user_exception = Flask.handle_user_exception
_handle_exception = Flask.handle_exception

def params_tostring(params):
return "\n".join([k + '=[' + ",".join(params.getlist(k)) + ']' for k, _ in params.items()])
Expand Down Expand Up @@ -66,5 +66,14 @@ def _sw_handle_user_exception(this: Flask, e):

return _handle_user_exception(this, e)

def _sw_handle_exception(this: Flask, e):
if e is not None:
entry_span = get_context().active_span()
if entry_span is not None and type(entry_span) is not NoopSpan:
entry_span.raised()

return _handle_exception(this, e)

Flask.full_dispatch_request = _sw_full_dispatch_request
Flask.handle_user_exception = _sw_handle_user_exception
Flask.handle_exception = _sw_handle_exception
11 changes: 4 additions & 7 deletions skywalking/plugins/sw_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,15 @@ def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, ti

peer = ";".join(this.config["bootstrap_servers"])
context = get_context()
carrier = Carrier()
with context.new_exit_span(op="Kafka/" + topic + "/Producer" or "/", peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="Kafka/" + topic + "/Producer" or "/", peer=peer) as span:
carrier = span.inject()
span.layer = Layer.MQ
span.component = Component.KafkaProducer

if headers is None:
headers = []
for item in carrier:
headers.append((item.key, item.val.encode("utf-8")))
else:
for item in carrier:
headers.append((item.key, item.val.encode("utf-8")))
for item in carrier:
headers.append((item.key, item.val.encode("utf-8")))

res = _send(this, topic, value=value, key=key, headers=headers, partition=partition,
timestamp_ms=timestamp_ms)
Expand Down
10 changes: 3 additions & 7 deletions skywalking/plugins/sw_pymongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from skywalking import Layer, Component, config
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag

Expand Down Expand Up @@ -56,11 +55,10 @@ def _sw_command(this: SocketInfo, dbname, spec, *args, **kwargs):
address = this.sock.getpeername()
peer = "%s:%s" % address
context = get_context()
carrier = Carrier()

operation = list(spec.keys())[0]
sw_op = operation.capitalize() + "Operation"
with context.new_exit_span(op="MongoDB/" + sw_op, peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="MongoDB/" + sw_op, peer=peer) as span:
result = _command(this, dbname, spec, *args, **kwargs)

span.layer = Layer.Database
Expand Down Expand Up @@ -108,10 +106,9 @@ def _sw_execute(this: _Bulk, *args, **kwargs):
address = this.collection.database.client.address
peer = "%s:%s" % address
context = get_context()
carrier = Carrier()

sw_op = "MixedBulkWriteOperation"
with context.new_exit_span(op="MongoDB/"+sw_op, peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="MongoDB/"+sw_op, peer=peer) as span:
span.layer = Layer.Database
span.component = Component.MongoDB

Expand Down Expand Up @@ -144,10 +141,9 @@ def _sw_send_message(this: Cursor, operation):
peer = "%s:%s" % address

context = get_context()
carrier = Carrier()
op = "FindOperation"

with context.new_exit_span(op="MongoDB/"+op, peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="MongoDB/"+op, peer=peer) as span:
span.layer = Layer.Database
span.component = Component.MongoDB

Expand Down
4 changes: 1 addition & 3 deletions skywalking/plugins/sw_pymysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from skywalking import Layer, Component, config
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag

Expand All @@ -31,8 +30,7 @@ def _sw_execute(this: Cursor, query, args=None):
peer = "%s:%s" % (this.connection.host, this.connection.port)

context = get_context()
carrier = Carrier()
with context.new_exit_span(op="Mysql/PyMsql/execute", peer=peer, carrier=carrier) as span:
with context.new_exit_span(op="Mysql/PyMsql/execute", peer=peer) as span:
span.layer = Layer.Database
span.component = Component.PyMysql
res = _execute(this, query, args)
Expand Down
14 changes: 5 additions & 9 deletions skywalking/plugins/sw_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,18 @@ def _sw_basic_publish(this, exchange,
mandatory=False):
peer = '%s:%s' % (this.connection.params.host, this.connection.params.port)
context = get_context()
carrier = Carrier()
import pika
with context.new_exit_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key + "/Producer" or "/",
peer=peer, carrier=carrier) as span:
peer=peer) as span:
carrier = span.inject()
span.layer = Layer.MQ
span.component = Component.RabbitmqProducer
properties = pika.BasicProperties() if properties is None else properties

if properties.headers is None:
headers = {}
for item in carrier:
headers[item.key] = item.val
properties.headers = headers
else:
for item in carrier:
properties.headers[item.key] = item.val
properties.headers = {}
for item in carrier:
properties.headers[item.key] = item.val

res = _basic_publish(this, exchange,
routing_key,
Expand Down
12 changes: 4 additions & 8 deletions skywalking/plugins/sw_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from skywalking import Layer, Component
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag
from skywalking import config
Expand All @@ -44,18 +43,15 @@ def _sw_request(this: Session, method, url,
hooks, stream, verify, cert, json)

context = get_context()
carrier = Carrier()
with context.new_exit_span(op=url_param.path or "/", peer=url_param.netloc, carrier=carrier) as span:
with context.new_exit_span(op=url_param.path or "/", peer=url_param.netloc) as span:
carrier = span.inject()
span.layer = Layer.Http
span.component = Component.Requests

if headers is None:
headers = {}
for item in carrier:
headers[item.key] = item.val
else:
for item in carrier:
headers[item.key] = item.val
for item in carrier:
headers[item.key] = item.val

span.tag(Tag(key=tags.HttpMethod, val=method.upper()))
span.tag(Tag(key=tags.HttpUrl, val=url))
Expand Down
12 changes: 4 additions & 8 deletions skywalking/plugins/sw_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from skywalking import Layer, Component
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag

Expand All @@ -31,19 +30,16 @@ def _sw_request(this: RequestMethods, method, url, fields=None, headers=None, **

from urllib.parse import urlparse
url_param = urlparse(url)
carrier = Carrier()
context = get_context()
with context.new_exit_span(op=url_param.path or "/", peer=url_param.netloc, carrier=carrier) as span:
with context.new_exit_span(op=url_param.path or "/", peer=url_param.netloc) as span:
carrier = span.inject()
span.layer = Layer.Http
span.component = Component.Urllib3

if headers is None:
headers = {}
for item in carrier:
headers[item.key] = item.val
else:
for item in carrier:
headers[item.key] = item.val
for item in carrier:
headers[item.key] = item.val

span.tag(Tag(key=tags.HttpMethod, val=method.upper()))
span.tag(Tag(key=tags.HttpUrl, val=url))
Expand Down
5 changes: 2 additions & 3 deletions skywalking/plugins/sw_urllib_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

from skywalking import Layer, Component
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag

Expand All @@ -36,9 +35,9 @@ def _sw_open(this: OpenerDirector, fullurl, data=None, timeout=socket._GLOBAL_DE
fullurl = Request(fullurl, data)

context = get_context()
carrier = Carrier()
url = fullurl.selector.split("?")[0] if fullurl.selector else '/'
with context.new_exit_span(op=url, peer=fullurl.host, carrier=carrier) as span:
with context.new_exit_span(op=url, peer=fullurl.host) as span:
carrier = span.inject()
span.layer = Layer.Http
span.component = Component.General
code = None
Expand Down
20 changes: 12 additions & 8 deletions skywalking/trace/carrier.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ def val(self, val: str):


class Carrier(CarrierItem):
def __init__(self):
def __init__(self, trace_id: str = '', segment_id: str = '', span_id: str = '', service: str = '',
service_instance: str = '', endpoint: str = '', client_address: str = '',
correlation: dict = None): # pyre-ignore
super(Carrier, self).__init__(key='sw8')
self.trace_id = '' # type: str
self.segment_id = '' # type: str
self.span_id = '' # type: str
self.service = '' # type: str
self.service_instance = '' # type: str
self.endpoint = '' # type: str
self.client_address = '' # type: str
self.trace_id = trace_id # type: str
self.segment_id = segment_id # type: str
self.span_id = span_id # type: str
self.service = service # type: str
self.service_instance = service_instance # type: str
self.endpoint = endpoint # type: str
self.client_address = client_address # type: str
self.correlation_carrier = SW8CorrelationCarrier()
self.items = [self.correlation_carrier, self] # type: List[CarrierItem]
self.__iter_index = 0 # type: int
if correlation is not None:
self.correlation_carrier.correlation = correlation

@property
def val(self) -> str:
Expand Down
24 changes: 10 additions & 14 deletions skywalking/trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(self):
self.segment = Segment() # type: Segment
self._sid = Counter()
self._correlation = {} # type: dict
self._nspans = 0

def new_local_span(self, op: str) -> Span:
span = self.ignore_check(op, Kind.Local)
Expand Down Expand Up @@ -111,7 +112,7 @@ def new_entry_span(self, op: str, carrier: 'Carrier' = None) -> Span:

return span

def new_exit_span(self, op: str, peer: str, carrier: 'Carrier' = None) -> Span:
def new_exit_span(self, op: str, peer: str) -> Span:
span = self.ignore_check(op, Kind.Exit)
if span is not None:
return span
Expand All @@ -127,9 +128,6 @@ def new_exit_span(self, op: str, peer: str, carrier: 'Carrier' = None) -> Span:
peer=peer,
)

if carrier is not None:
span.inject(carrier=carrier)

return span

def ignore_check(self, op: str, kind: Kind):
Expand All @@ -150,22 +148,23 @@ def ignore_check(self, op: str, kind: Kind):
return None

def start(self, span: Span):
self._nspans += 1
spans = _spans()
if span not in spans:
spans.append(span)

def stop(self, span: Span) -> bool:
spans = _spans()
idx = spans.index(span) # span SHOULD now always be at end even in async-world, but just in case

if span.finish(self.segment):
del spans[idx]
span.finish(self.segment)
del spans[spans.index(span)]

if len(spans) == 0:
self._nspans -= 1
if self._nspans == 0:
_local().context = None
agent.archive(self.segment)
return True

return len(spans) == 0
return False

def active_span(self):
spans = _spans()
Expand Down Expand Up @@ -231,10 +230,7 @@ def new_entry_span(self, op: str, carrier: 'Carrier' = None) -> Span:
self._noop_span.extract(carrier)
return self._noop_span

def new_exit_span(self, op: str, peer: str, carrier: 'Carrier' = None) -> Span:
if carrier is not None:
self._noop_span.inject(carrier)

def new_exit_span(self, op: str, peer: str) -> Span:
return self._noop_span

def start(self, span: Span):
Expand Down