Skip to content

Commit

Permalink
tweaks and minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-pytel committed Jun 22, 2021
1 parent 7c564db commit f25844f
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 9 deletions.
1 change: 1 addition & 0 deletions docs/EnvVars.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ Environment Variable | Description | Default
| `SW_KAFKA_REPORTER_TOPIC_MANAGEMENT` | Specifying Kafka topic name for service instance reporting and registering. | `skywalking-managements` |
| `SW_KAFKA_REPORTER_TOPIC_SEGMENT` | Specifying Kafka topic name for Tracing data. | `skywalking-segments` |
| `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
| `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` |
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ aiofiles==0.6.0
aiohttp==3.7.3
attrs==19.3.0
blindspin==2.0.1
celery==4.4.7
certifi==2020.6.20
chardet==3.0.4
click==7.1.2
Expand Down
3 changes: 2 additions & 1 deletion skywalking/agent/protocol/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def __init__(self):
self.traces_reporter = HttpTraceSegmentReportService()

def fork_after_in_child(self):
self.__init__()
self.service_management.fork_after_in_child()
self.traces_reporter.fork_after_in_child()

def connected(self):
return True
Expand Down
2 changes: 1 addition & 1 deletion skywalking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092" # type: str
kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements" # type: str
kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments" # type: str
celery_parameters = os.getenv('SW_CELERY_PARAMETERS', '').lower() == 'true'
celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')


def init(
Expand Down
15 changes: 10 additions & 5 deletions skywalking/plugins/sw_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def install():
from celery import Celery

def send_task(self, name, args=None, kwargs=None, **options):
# NOTE: Lines commented out below left for documentation purposes if sometime in the future exchange / queue
# names are wanted. Currently these do not match between producer and consumer so would need some work.

broker_url = self.conf['broker_url']
# exchange = options['exchange']
# queue = options['routing_key']
Expand All @@ -47,8 +50,9 @@ def send_task(self, name, args=None, kwargs=None, **options):
# span.tag(Tag(key=tags.MqTopic, val=exchange))
# span.tag(Tag(key=tags.MqQueue, val=queue))

if config.celery_parameters:
span.tag(Tag(key=tags.CeleryParameters, val='*{}, **{}'.format(args, kwargs)))
if config.celery_parameters_length:
params = '*{}, **{}'.format(args, kwargs)[:config.celery_parameters_length]
span.tag(Tag(key=tags.CeleryParameters, val=params))

options = {**options}
headers = options.get('headers')
Expand Down Expand Up @@ -83,20 +87,21 @@ def fun(*args, **kwargs):

if req.get('sw8'):
span = context.new_entry_span(op=op, carrier=carrier)
span.peer = (req.get('hostname') or '???').split('@', 1)[-1]
else:
span = context.new_local_span(op=op)

with span:
span.layer = Layer.MQ
span.component = Component.Celery
span.peer = req.get('hostname') or 'localhost'

span.tag(Tag(key=tags.MqBroker, val=task.app.conf['broker_url']))
# span.tag(Tag(key=tags.MqTopic, val=exchange))
# span.tag(Tag(key=tags.MqQueue, val=queue))

if config.celery_parameters:
span.tag(Tag(key=tags.CeleryParameters, val='*{}, **{}'.format(args, kwargs)))
if config.celery_parameters_length:
params = '*{}, **{}'.format(args, kwargs)[:config.celery_parameters_length]
span.tag(Tag(key=tags.CeleryParameters, val=params))

return _fun(*args, **kwargs)

Expand Down
2 changes: 0 additions & 2 deletions skywalking/trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def new_entry_span(self, op: str, carrier: 'Carrier' = None) -> Span:
spans = _spans_dup()
parent = spans[-1] if spans else None # type: Span

# span = EntrySpan(
span = parent if parent is not None and parent.kind.is_entry else EntrySpan(
context=self,
sid=self._sid.next(),
Expand All @@ -120,7 +119,6 @@ def new_exit_span(self, op: str, peer: str) -> Span:
spans = _spans_dup()
parent = spans[-1] if spans else None # type: Span

# span = parent if parent is not None and parent.kind.is_exit else ExitSpan(
span = ExitSpan(
context=self,
sid=self._sid.next(),
Expand Down

0 comments on commit f25844f

Please sign in to comment.