Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: collect and report logs #147

Merged
merged 15 commits into from
Aug 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
with:
submodules: true
- name: Check License
uses: apache/skywalking-eyes@9bd5feb86b5817aa6072b008f9866a2c3bbc8587
uses: apache/skywalking-eyes@63d89639812f1a94bd45d9329d0f936ec4769a37
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Python ${{ matrix.python-version }}
Expand Down
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ header:
- 'LICENSE'
- 'NOTICE'
- '.github/PULL_REQUEST_TEMPLATE'
- '.gitignore'

comment: on-failure
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ Alternatively, you can also pass the configurations via environment variables (s

All supported environment variables can be found [here](docs/EnvVars.md)

## Report logs with Python Agent
The Python agent is capable of reporting collected logs to the backend(SkyWalking OAP/ [SkyWalking Satellite Sidecar](https://github.com/apache/skywalking-satellite)), enabling Log & Trace Correlation.

Please refer to the [Log Reporter Doc](docs/LogReporter.md) for a detailed guide.

## Supported Libraries

There are some built-in plugins (such as `http.server`, `Flask`, `Django` etc.) that support automatic instrumentation of Python libraries, the complete lists can be found [here](docs/Plugins.md)
Expand Down
7 changes: 7 additions & 0 deletions docs/EnvVars.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,10 @@ Environment Variable | Description | Default
| `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` |
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
| `SW_AGENT_LOG_REPORTER_ACTIVE` | If `True`, Python agent will report collected logs to the OAP or Satellite. Otherwise, it disables the feature. | `False` |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides this doc change, let's add a specific doc to show users how to use it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wu-sheng I just added a detailed guide, please see if that is sufficient.

| `SW_AGENT_LOG_COLLECTOR_BACKEND_SERVICES` | The log reporter will use a separate gRPC channel until the [Satellite](https://github.com/apache/skywalking-satellite) project is ready. | `127.0.0.1:11800` |
| `SW_AGENT_LOG_REPORTER_BUFFER_SIZE` | The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. | `10000` |
| `SW_AGENT_LOG_REPORTER_MESSAGE_SIZE` | Max message size allowed for log transmission. | `10485760` |
| `SW_AGENT_LOG_REPORTER_LEVEL` | This config specifies the logger levels of concern, any logs with a level below the config will be ignored. | `WARNING` |
| `SW_AGENT_LOG_REPORTER_FORMATTED` | If `True`, the log reporter will transmit the logs as formatted. Otherwise, puts logRecord.msg and logRecord.args into message content and tags(`argument.n`), respectively. Along with an `exception` tag if an exception was raised. | `True` |
| `SW_AGENT_LOG_REPORTER_LAYOUT` | The log reporter formats the logRecord message based on the layout given. | `%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s` |
81 changes: 81 additions & 0 deletions docs/LogReporter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Python agent gRPC log reporter

This functionality reports logs collected from the Python logging module(in theory, also logging libraries depending on the core logging module).

To utilize this feature, you will need to add some new configurations to the agent initialization step.

## Enabling the feature
```Python
from skywalking import agent, config

config.init(collector_address='127.0.0.1:11800', service_name='your awesome service',
log_grpc_reporter_active=True, log_grpc_collector_address='127.0.0.1:11800')
agent.start()
```

`log_grpc_reporter_active=True` - Enables the log reporter.

`log_grpc_collector_address` - For now, the log reporter uses a separate gRPC channel(will be merged upon the [SkyWalking Satellite Sidecar](https://github.com/apache/skywalking-satellite) project matures).
If you would like to use the Satellite sidecar, you will need to configure an address pointing to its gatherer. Otherwise, you can simply keep the address the same as the OAP.

`log_grpc_reporter_max_buffer_size` and `log_grpc_reporter_max_message_size` - Used to limit the reporting overhead.

Alternatively, you can pass configurations through environment variables.
Please refer to [EnvVars.md](EnvVars.md) for the list of environment variables associated with the log reporter.

## Specify a logging level
Only the logs with a level equal to or higher than the specified will be collected and reported.
In other words, the agent ignores some unwanted logs based on your level threshold.

`log_grpc_reporter_level` - The string name of a logger level.

Note that it also works with your custom logger levels, simply specify its string name in the config.

## Formatting
Note that regardless of the formatting, Python agent will always report the following three tags -

`level` - the logger level name

`logger` - the logger name

`thread` - the thread name
### Customize the reported log format
You can choose to report collected logs in a custom layout.

If not set, the agent uses the layout below by default, else the agent uses your custom layout set in `log_grpc_reporter_layout`.

`'%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s'`

If the layout is set to `None`, the reported log content will only contain the pre-formatted `LogRecord.message`(`msg % args`) without any additional styles and information.

### Transmit un-formatted logs
You can also choose to report the log messages without any formatting.
It separates the raw log msg `logRecord.msg` and `logRecord.args`, then puts them into message content and tags starting from `argument.0`, respectively, along with an `exception` tag if an exception was raised.

Note when you set `log_grpc_reporter_formatted` to False, it ignores your custom layout introduced above.

As an example, the following code:
```Python
logger.info("SW test log %s %s %s", 'arg0', 'arg1', 'arg2')
```

Will result in:
```json
{
"content": "SW test log %s %s %s",
"tags": [
{
"key": "argument.0",
"value": "arg0"
},
{
"key": "argument.1",
"value": "arg1"
},
{
"key": "argument.2",
"value": "arg2"
}
]
}
```
59 changes: 53 additions & 6 deletions skywalking/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,23 @@
from threading import Thread, Event
from typing import TYPE_CHECKING

from skywalking import config, plugins, loggings
from skywalking.protocol.logging.Logging_pb2 import LogData

from skywalking import config, plugins
from skywalking import loggings
from skywalking.agent.protocol import Protocol
from skywalking.agent.protocol.grpc_log import GrpcLogProtocol
from skywalking.command import command_service
from skywalking.config import profile_active, profile_task_query_interval
from skywalking.loggings import logger

if TYPE_CHECKING:
from skywalking.trace.context import Segment


__started = False
__protocol = Protocol() # type: Protocol
__heartbeat_thread = __report_thread = __query_profile_thread = __command_dispatch_thread = __queue = __finished = None
__protocol = __log_protocol = Protocol() # type: Protocol
__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \
= __queue = __log_queue = __finished = None


def __heartbeat():
Expand All @@ -56,6 +60,26 @@ def __report():
__finished.wait(0)


def __log_heartbeat():
while not __finished.is_set():
try:
__log_protocol.heartbeat()
except Exception as exc:
logger.error(str(exc))

__finished.wait(30)


def __log_report():
while not __finished.is_set():
try:
__log_protocol.report(__log_queue)
except Exception as exc:
logger.error(str(exc))

__finished.wait(0)


def __query_profile_command():
while not __finished.is_set():
try:
Expand All @@ -72,7 +96,8 @@ def __command_dispatch():


def __init_threading():
global __heartbeat_thread, __report_thread, __query_profile_thread, __command_dispatch_thread, __queue, __finished
global __heartbeat_thread, __report_thread, __log_report_thread, __query_profile_thread, \
__command_dispatch_thread, __queue, __log_queue, __finished

__queue = Queue(maxsize=config.max_buffer_size)
__finished = Event()
Expand All @@ -85,12 +110,19 @@ def __init_threading():
__report_thread.start()
__command_dispatch_thread.start()

if config.log_grpc_reporter_active:
__log_queue = Queue(maxsize=config.log_grpc_reporter_max_buffer_size)
__log_heartbeat_thread = Thread(name='LogHeartbeatThread', target=__log_heartbeat, daemon=True)
__log_report_thread = Thread(name='LogReportThread', target=__log_report, daemon=True)
__log_heartbeat_thread.start()
__log_report_thread.start()

if profile_active:
__query_profile_thread.start()


def __init():
global __protocol
global __protocol, __log_protocol

if config.protocol == 'grpc':
from skywalking.agent.protocol.grpc import GrpcProtocol
Expand All @@ -103,12 +135,20 @@ def __init():
__protocol = KafkaProtocol()

plugins.install()
if config.log_grpc_reporter_active:
from skywalking import log
__log_protocol = GrpcLogProtocol()
log.install()

__init_threading()


def __fini():
__protocol.report(__queue, False)
__queue.join()
if config.log_grpc_reporter_active:
__log_protocol.report(__log_queue, False)
__log_queue.join()
__finished.set()


Expand Down Expand Up @@ -177,3 +217,10 @@ def archive(segment: 'Segment'):
__queue.put(segment, block=False)
except Full:
logger.warning('the queue is full, the segment will be abandoned')


def archive_log(log_data: 'LogData'):
try:
__log_queue.put(log_data, block=False)
except Full:
logger.warning('the queue is full, the log will be abandoned')
5 changes: 5 additions & 0 deletions skywalking/agent/protocol/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ def heartbeat(self):
self.service_management.send_instance_props()
self.properties_sent = True

logger.debug(
'segment reporter service heart beats, [%s], [%s]',
config.service_name,
config.service_instance,
)
self.service_management.send_heart_beat()

except grpc.RpcError:
Expand Down
103 changes: 103 additions & 0 deletions skywalking/agent/protocol/grpc_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import traceback
from queue import Queue, Empty
from time import time

import grpc

from skywalking import config
from skywalking.agent import Protocol
from skywalking.agent.protocol.interceptors import header_adder_interceptor
from skywalking.client.grpc import GrpcServiceManagementClient, GrpcLogDataReportService
from skywalking.loggings import logger
from skywalking.protocol.logging.Logging_pb2 import LogData


class GrpcLogProtocol(Protocol):
def __init__(self):
self.properties_sent = False
self.state = None

if config.force_tls:
self.channel = grpc.secure_channel(config.log_grpc_collector_address, grpc.ssl_channel_credentials(),
options=(('grpc.max_send_message_length',
config.log_grpc_reporter_max_message_size),))
else:
self.channel = grpc.insecure_channel(config.log_grpc_collector_address,
options=(('grpc.max_send_message_length',
config.log_grpc_reporter_max_message_size),))
if config.authentication:
self.channel = grpc.intercept_channel(
self.channel, header_adder_interceptor('authentication', config.authentication)
)

self.channel.subscribe(self._cb, try_to_connect=True)
self.service_management = GrpcServiceManagementClient(self.channel)
self.log_reporter = GrpcLogDataReportService(self.channel)

def _cb(self, state):
logger.debug('grpc log reporter channel connectivity changed, [%s -> %s]', self.state, state)
self.state = state

def heartbeat(self):
try:
if not self.properties_sent:
self.service_management.send_instance_props()
self.properties_sent = True

logger.debug(
'log reporter service heart beats, [%s], [%s]',
config.service_name,
config.service_instance,
)
self.service_management.send_heart_beat()

except grpc.RpcError:
self.on_error()

def on_error(self):
traceback.print_exc() if logger.isEnabledFor(logging.DEBUG) else None
self.channel.unsubscribe(self._cb)
self.channel.subscribe(self._cb, try_to_connect=True)

def report(self, queue: Queue, block: bool = True):
start = time()

def generator():
while True:
try:
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
return
log_data = queue.get(block=block, timeout=timeout) # type: LogData
except Empty:
return

queue.task_done()

logger.debug('Reporting Log')

yield log_data

try:
self.log_reporter.report(generator())

except grpc.RpcError:
self.on_error()
5 changes: 5 additions & 0 deletions skywalking/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ def report(self, generator):
raise NotImplementedError()


class LogDataReportService(object):
def report(self, generator):
raise NotImplementedError()


class ProfileTaskChannelService(object):
def do_query(self):
raise NotImplementedError()