Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 55 additions & 16 deletions skywalking/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,67 @@ def wrapper(*args, **kwargs):
return decorator


class _RunnableWrapper:
"""Wrapper returned by @runnable. Call continue_tracing() on the parent thread
to capture the current trace context, then pass the result as Thread target."""

def __init__(self, func, op, layer, component, tags):
self._func = func
self._op = op
self._layer = layer
self._component = component
self._tags = tags
# Capture snapshot at decoration time — supports inline @runnable usage
self._snapshot = get_context().capture()
# Preserve original function attributes
self.__name__ = func.__name__
self.__doc__ = func.__doc__
self.__module__ = getattr(func, '__module__', None)
self.__wrapped__ = func

def __call__(self, *args, **kwargs):
"""Direct call — creates a local span with cross-thread propagation
using the snapshot captured at decoration time (inline @runnable pattern)."""
context = get_context()
with context.new_local_span(op=self._op) as span:
if self._snapshot is not None:
context.continued(self._snapshot)
span.layer = self._layer
span.component = self._component
if self._tags:
for tag in self._tags:
span.tag(tag)
return self._func(*args, **kwargs)

def continue_tracing(self):
"""Capture the current trace context snapshot on the calling thread.
Returns a callable to be used as Thread target that will propagate
the trace context to the child thread via CrossThread reference."""
snapshot = get_context().capture()

def _continued_wrapper(*args, **kwargs):
context = get_context()
with context.new_local_span(op=self._op) as span:
if snapshot is not None:
context.continued(snapshot)
span.layer = self._layer
span.component = self._component
if self._tags:
for tag in self._tags:
span.tag(tag)
return self._func(*args, **kwargs)

return _continued_wrapper


def runnable(
op: str = None,
layer: Layer = Layer.Unknown,
component: Component = Component.Unknown,
tags: List[Tag] = None,
):
def decorator(func):
snapshot = get_context().capture()

@wraps(func)
def wrapper(*args, **kwargs):
_op = op or f'Thread/{func.__name__}'
context = get_context()
with context.new_local_span(op=_op) as span:
context.continued(snapshot)
span.layer = layer
span.component = component
if tags:
for tag in tags:
span.tag(tag)
func(*args, **kwargs)

return wrapper
_op = op or f'Thread/{func.__name__}'
return _RunnableWrapper(func, _op, layer, component, tags)

return decorator
16 changes: 16 additions & 0 deletions tests/plugin/web/sw_threading/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
65 changes: 65 additions & 0 deletions tests/plugin/web/sw_threading/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

version: '2.1'

services:
collector:
extends:
service: collector
file: ../../docker-compose.base.yml

provider:
extends:
service: agent
file: ../../docker-compose.base.yml
ports:
- 9091:9091
volumes:
- .:/app
command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/provider.py']
depends_on:
collector:
condition: service_healthy
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"]
interval: 5s
timeout: 60s
retries: 120
environment:
SW_AGENT_NAME: provider
SW_AGENT_LOGGING_LEVEL: DEBUG

consumer:
extends:
service: agent
file: ../../docker-compose.base.yml
ports:
- 9090:9090
volumes:
- .:/app
command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/consumer.py']
depends_on:
collector:
condition: service_healthy
provider:
condition: service_healthy
environment:
SW_AGENT_NAME: consumer
SW_AGENT_LOGGING_LEVEL: DEBUG
networks:
beyond:
113 changes: 113 additions & 0 deletions tests/plugin/web/sw_threading/expected.data.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

segmentItems:
- serviceName: provider
segmentSize: 1
segments:
- segmentId: not null
spans:
- operationName: /users
parentSpanId: -1
spanId: 0
spanLayer: Http
tags:
- key: http.method
value: POST
- key: http.url
value: http://provider:9091/users
- key: http.status_code
value: '200'
refs:
- parentEndpoint: /users
networkAddress: 'provider:9091'
refType: CrossProcess
parentSpanId: 1
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: consumer
traceId: not null
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
- serviceName: consumer
segmentSize: 2
segments:
- segmentId: not null
spans:
- operationName: /users
parentSpanId: 0
spanId: 1
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7002
isError: false
spanType: Exit
peer: provider:9091
skipAnalysis: false
tags:
- key: http.method
value: POST
- key: http.url
value: 'http://provider:9091/users'
- key: http.status_code
value: '200'
- operationName: /post
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Unknown
startTime: gt 0
endTime: gt 0
componentId: 0
isError: false
spanType: Local
peer: ''
skipAnalysis: false
refs:
- parentEndpoint: /users
networkAddress: ''
refType: CrossThread
parentSpanId: 0
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: consumer
traceId: not null
- segmentId: not null
spans:
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
tags:
- key: http.method
value: GET
- key: http.url
value: http://0.0.0.0:9090/users
- key: http.status_code
value: '200'
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
1 change: 1 addition & 0 deletions tests/plugin/web/sw_threading/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

16 changes: 16 additions & 0 deletions tests/plugin/web/sw_threading/services/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
44 changes: 44 additions & 0 deletions tests/plugin/web/sw_threading/services/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import requests

from skywalking.decorators import runnable


# Module-level @runnable — this is the pattern from issue #11605
@runnable(op='/post')
def post():
requests.post('http://provider:9091/users', timeout=5)


if __name__ == '__main__':
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/users', methods=['POST', 'GET'])
def application():
from threading import Thread
t = Thread(target=post.continue_tracing())
t.start()
t.join()

return jsonify({'status': 'ok'})

PORT = 9090
app.run(host='0.0.0.0', port=PORT, debug=True)
32 changes: 32 additions & 0 deletions tests/plugin/web/sw_threading/services/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import time


if __name__ == '__main__':
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/users', methods=['POST', 'GET'])
def application():
time.sleep(0.5)
return jsonify({'status': 'ok'})

PORT = 9091
app.run(host='0.0.0.0', port=PORT, debug=True)
Loading
Loading