Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions skywalking/plugins/sw_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,12 @@ def _sw_perform_request(this: Transport, method, url, headers=None, params=None,
with context.new_exit_span(op="Elasticsearch/" + method + url, peer=peer) as span:
span.layer = Layer.Database
span.component = Component.Elasticsearch
try:
res = _perform_request(this, method, url, headers=headers, params=params, body=body)
res = _perform_request(this, method, url, headers=headers, params=params, body=body)

span.tag(Tag(key=tags.DbType, val="Elasticsearch"))
if config.elasticsearch_trace_dsl:
span.tag(Tag(key=tags.DbStatement, val="" if body is None else body))
span.tag(Tag(key=tags.DbType, val="Elasticsearch"))
if config.elasticsearch_trace_dsl:
span.tag(Tag(key=tags.DbStatement, val="" if body is None else body))

except BaseException as e:
span.raised()
raise e
return res

Transport.perform_request = _sw_perform_request
13 changes: 5 additions & 8 deletions skywalking/plugins/sw_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,11 @@ def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, ti
for item in carrier:
headers.append((item.key, item.val.encode("utf-8")))

try:
res = _send(this, topic, value=value, key=key, headers=headers, partition=partition,
timestamp_ms=timestamp_ms)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=topic))
except BaseException as e:
span.raised()
raise e
res = _send(this, topic, value=value, key=key, headers=headers, partition=partition,
timestamp_ms=timestamp_ms)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=topic))

return res

return _sw_send
84 changes: 35 additions & 49 deletions skywalking/plugins/sw_pymongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,19 @@ def _sw_command(this: SocketInfo, dbname, spec, *args, **kwargs):
operation = list(spec.keys())[0]
sw_op = operation.capitalize() + "Operation"
with context.new_exit_span(op="MongoDB/" + sw_op, peer=peer, carrier=carrier) as span:
try:
result = _command(this, dbname, spec, *args, **kwargs)

span.layer = Layer.Database
span.component = Component.MongoDB
span.tag(Tag(key=tags.DbType, val="MongoDB"))
span.tag(Tag(key=tags.DbInstance, val=dbname))

if config.pymongo_trace_parameters:
# get filters
filters = _get_filter(operation, spec)
max_len = config.pymongo_parameters_max_length
filters = filters[0:max_len] + "..." if len(filters) > max_len else filters
span.tag(Tag(key=tags.DbStatement, val=filters))
except BaseException as e:
span.raised()
raise e
result = _command(this, dbname, spec, *args, **kwargs)

span.layer = Layer.Database
span.component = Component.MongoDB
span.tag(Tag(key=tags.DbType, val="MongoDB"))
span.tag(Tag(key=tags.DbInstance, val=dbname))

if config.pymongo_trace_parameters:
# get filters
filters = _get_filter(operation, spec)
max_len = config.pymongo_parameters_max_length
filters = filters[0:max_len] + "..." if len(filters) > max_len else filters
span.tag(Tag(key=tags.DbStatement, val=filters))

else:
result = _command(this, dbname, spec, *args, **kwargs)
Expand Down Expand Up @@ -123,26 +119,21 @@ def _sw_execute(this: _Bulk, *args, **kwargs):
span.layer = Layer.Database
span.component = Component.MongoDB

try:
bulk_result = _execute(this, *args, **kwargs)
bulk_result = _execute(this, *args, **kwargs)

span.tag(Tag(key=tags.DbType, val="MongoDB"))
span.tag(Tag(key=tags.DbInstance, val=this.collection.database.name))
if config.pymongo_trace_parameters:
filters = ""
bulk_ops = this.ops
for bulk_op in bulk_ops:
opname = bulk_op_map.get(bulk_op[0])
_filter = opname + " " + str(bulk_op[1])
filters = filters + _filter + " "
span.tag(Tag(key=tags.DbType, val="MongoDB"))
span.tag(Tag(key=tags.DbInstance, val=this.collection.database.name))
if config.pymongo_trace_parameters:
filters = ""
bulk_ops = this.ops
for bulk_op in bulk_ops:
opname = bulk_op_map.get(bulk_op[0])
_filter = opname + " " + str(bulk_op[1])
filters = filters + _filter + " "

max_len = config.pymongo_parameters_max_length
filters = filters[0:max_len] + "..." if len(filters) > max_len else filters
span.tag(Tag(key=tags.DbStatement, val=filters))

except BaseException as e:
span.raised()
raise e
max_len = config.pymongo_parameters_max_length
filters = filters[0:max_len] + "..." if len(filters) > max_len else filters
span.tag(Tag(key=tags.DbStatement, val=filters))

return bulk_result

Expand All @@ -164,22 +155,17 @@ def _sw_send_message(this: Cursor, operation):
span.layer = Layer.Database
span.component = Component.MongoDB

try:
# __send_message return nothing
__send_message(this, operation)

span.tag(Tag(key=tags.DbType, val="MongoDB"))
span.tag(Tag(key=tags.DbInstance, val=this.collection.database.name))
# __send_message return nothing
__send_message(this, operation)

if config.pymongo_trace_parameters:
filters = "find " + str(operation.spec)
max_len = config.pymongo_parameters_max_length
filters = filters[0:max_len] + "..." if len(filters) > max_len else filters
span.tag(Tag(key=tags.DbStatement, val=filters))
span.tag(Tag(key=tags.DbType, val="MongoDB"))
span.tag(Tag(key=tags.DbInstance, val=this.collection.database.name))

except BaseException as e:
span.raised()
raise e
if config.pymongo_trace_parameters:
filters = "find " + str(operation.spec)
max_len = config.pymongo_parameters_max_length
filters = filters[0:max_len] + "..." if len(filters) > max_len else filters
span.tag(Tag(key=tags.DbStatement, val=filters))

return

Expand Down
28 changes: 12 additions & 16 deletions skywalking/plugins/sw_pymysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,18 @@ def _sw_execute(this: Cursor, query, args=None):
with context.new_exit_span(op="Mysql/PyMsql/execute", peer=peer, carrier=carrier) as span:
span.layer = Layer.Database
span.component = Component.PyMysql
try:
res = _execute(this, query, args)

span.tag(Tag(key=tags.DbType, val="mysql"))
span.tag(Tag(key=tags.DbInstance, val=this.connection.db.decode("utf-8")))
span.tag(Tag(key=tags.DbStatement, val=query))

if config.mysql_trace_sql_parameters and args:
parameter = ",".join([str(arg) for arg in args])
max_len = config.mysql_sql_parameters_max_length
parameter = parameter[0:max_len] + "..." if len(parameter) > max_len else parameter
span.tag(Tag(key=tags.DbSqlParameters, val='[' + parameter + ']'))

except BaseException as e:
span.raised()
raise e
res = _execute(this, query, args)

span.tag(Tag(key=tags.DbType, val="mysql"))
span.tag(Tag(key=tags.DbInstance, val=this.connection.db.decode("utf-8")))
span.tag(Tag(key=tags.DbStatement, val=query))

if config.mysql_trace_sql_parameters and args:
parameter = ",".join([str(arg) for arg in args])
max_len = config.mysql_sql_parameters_max_length
parameter = parameter[0:max_len] + "..." if len(parameter) > max_len else parameter
span.tag(Tag(key=tags.DbSqlParameters, val='[' + parameter + ']'))

return res

Cursor.execute = _sw_execute
33 changes: 13 additions & 20 deletions skywalking/plugins/sw_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,15 @@ def _sw_basic_publish(this, exchange,
for item in carrier:
properties.headers[item.key] = item.val

try:
res = _basic_publish(this, exchange,
routing_key,
body,
properties=properties,
mandatory=mandatory)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=exchange))
span.tag(Tag(key=tags.MqQueue, val=routing_key))
except BaseException as e:
span.raised()
raise e
res = _basic_publish(this, exchange,
routing_key,
body,
properties=properties,
mandatory=mandatory)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=exchange))
span.tag(Tag(key=tags.MqQueue, val=routing_key))

return res

return _sw_basic_publish
Expand All @@ -91,13 +88,9 @@ def _sw__on_deliver(this, method_frame, header_frame, body):
+ "/Consumer" or "", carrier=carrier) as span:
span.layer = Layer.MQ
span.component = Component.RabbitmqConsumer
try:
__on_deliver(this, method_frame, header_frame, body)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=exchange))
span.tag(Tag(key=tags.MqQueue, val=routing_key))
except BaseException as e:
span.raised()
raise e
__on_deliver(this, method_frame, header_frame, body)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=exchange))
span.tag(Tag(key=tags.MqQueue, val=routing_key))

return _sw__on_deliver
13 changes: 5 additions & 8 deletions skywalking/plugins/sw_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ def _sw_send_command(this: Connection, *args, **kwargs):
span.layer = Layer.Cache
span.component = Component.Redis

try:
res = _send_command(this, *args, **kwargs)
span.tag(Tag(key=tags.DbType, val="Redis"))
span.tag(Tag(key=tags.DbInstance, val=this.db))
span.tag(Tag(key=tags.DbStatement, val=op))
except BaseException as e:
span.raised()
raise e
res = _send_command(this, *args, **kwargs)
span.tag(Tag(key=tags.DbType, val="Redis"))
span.tag(Tag(key=tags.DbInstance, val=this.db))
span.tag(Tag(key=tags.DbStatement, val=op))

return res

Connection.send_command = _sw_send_command
23 changes: 10 additions & 13 deletions skywalking/plugins/sw_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,17 @@ def _sw_request(this: Session, method, url,
for item in carrier:
headers[item.key] = item.val

try:
res = _request(this, method, url, params, data, headers, cookies, files, auth, timeout,
allow_redirects,
proxies,
hooks, stream, verify, cert, json)
res = _request(this, method, url, params, data, headers, cookies, files, auth, timeout,
allow_redirects,
proxies,
hooks, stream, verify, cert, json)

span.tag(Tag(key=tags.HttpMethod, val=method.upper()))
span.tag(Tag(key=tags.HttpUrl, val=url))
span.tag(Tag(key=tags.HttpStatus, val=res.status_code))
if res.status_code >= 400:
span.error_occurred = True

span.tag(Tag(key=tags.HttpMethod, val=method.upper()))
span.tag(Tag(key=tags.HttpUrl, val=url))
span.tag(Tag(key=tags.HttpStatus, val=res.status_code))
if res.status_code >= 400:
span.error_occurred = True
except BaseException as e:
span.raised()
raise e
return res

Session.request = _sw_request
17 changes: 7 additions & 10 deletions skywalking/plugins/sw_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,14 @@ def _sw_request(this: RequestMethods, method, url, fields=None, headers=None, **
for item in carrier:
headers[item.key] = item.val

try:
res = _request(this, method, url, fields=fields, headers=headers, **urlopen_kw)
res = _request(this, method, url, fields=fields, headers=headers, **urlopen_kw)

span.tag(Tag(key=tags.HttpMethod, val=method.upper()))
span.tag(Tag(key=tags.HttpUrl, val=url))
span.tag(Tag(key=tags.HttpStatus, val=res.status))
if res.status >= 400:
span.error_occurred = True

span.tag(Tag(key=tags.HttpMethod, val=method.upper()))
span.tag(Tag(key=tags.HttpUrl, val=url))
span.tag(Tag(key=tags.HttpStatus, val=res.status))
if res.status >= 400:
span.error_occurred = True
except BaseException as e:
span.raised()
raise e
return res

RequestMethods.request = _sw_request
18 changes: 7 additions & 11 deletions skywalking/plugins/sw_urllib_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

def install():
from urllib.request import OpenerDirector
from urllib.error import HTTPError

_open = OpenerDirector.open

Expand All @@ -45,16 +44,13 @@ def _sw_open(this: OpenerDirector, fullurl, data, timeout):

[fullurl.add_header(item.key, item.val) for item in carrier]

try:
res = _open(this, fullurl, data, timeout)
span.tag(Tag(key=tags.HttpMethod, val=fullurl.get_method()))
span.tag(Tag(key=tags.HttpUrl, val=fullurl.full_url))
span.tag(Tag(key=tags.HttpStatus, val=res.code))
if res.code >= 400:
span.error_occurred = True
except HTTPError as e:
span.raised()
raise e
res = _open(this, fullurl, data, timeout)
span.tag(Tag(key=tags.HttpMethod, val=fullurl.get_method()))
span.tag(Tag(key=tags.HttpUrl, val=fullurl.full_url))
span.tag(Tag(key=tags.HttpStatus, val=res.code))
if res.code >= 400:
span.error_occurred = True

return res

OpenerDirector.open = _sw_open
2 changes: 2 additions & 0 deletions skywalking/trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, BaseException):
self.raised()
Comment thread
kezhenxu94 marked this conversation as resolved.
self.stop()
if exc_tb is not None:
return False
Expand Down