Skip to content
Permalink
Browse files

Add Papertrail integration

Test Plan: unit

Reviewers: #ft, alangenfeld

Reviewed By: #ft, alangenfeld

Subscribers: alangenfeld

Differential Revision: https://dagster.phacility.com/D757
  • Loading branch information...
natekupp committed Aug 2, 2019
1 parent d973402 commit 77ac0855db25df3ab2bb3c936e4c41fd139c8146
@@ -0,0 +1,2 @@
[run]
branch = True
@@ -0,0 +1,39 @@
# dagster-papertrail

##Introduction
This library provides an integration with [Papertrail](https://papertrailapp.com) for logging.

## Example
You can easily set up your Dagster pipeline to log to Papertrail. You'll need an active Papertrail
account, and have your papertrail URL and port handy. Starting with a simple hello world pipeline,
you can configure it to use Papertrail logging as follows:

```python
from dagster import pipeline, solid, ModeDefinition
from dagster_papertrail import papertrail_logger
@solid
def hello_logs(context):
context.log.info('Hello, world!')
@pipeline(
mode_defs=[ModeDefinition(logger_defs={'papertrail': papertrail_logger})]
)
def hello_pipeline():
hello_logs() # pylint: disable=no-value-for-parameter
```

Just provide your environment configuration:

```yaml
loggers:
papertrail:
config:
log_level: 'INFO'
name: 'hello_pipeline'
papertrail_address: YOUR_PAPERTRAIL_URL
papertrail_port: YOUR_PAPERTRAIL_PORT
```

and you're off to the races!
@@ -0,0 +1,4 @@
from .version import __version__
from .loggers import papertrail_logger

__all__ = ['papertrail_logger']
@@ -0,0 +1,42 @@
import logging
import socket

from dagster import logger, Field


class ContextFilter(logging.Filter):
hostname = socket.gethostname()

def filter(self, record):
record.hostname = ContextFilter.hostname
return True


@logger(
{
'log_level': Field(str, is_optional=True, default_value='INFO'),
'name': Field(str, is_optional=True, default_value='dagster_papertrail'),
'papertrail_address': Field(str, description='Papertrail URL', is_optional=False),
'papertrail_port': Field(int, description='Papertrail port', is_optional=False),
},
description='A JSON-formatted console logger',
)
def papertrail_logger(init_context):
level, name, papertrail_address, papertrail_port = (
init_context.logger_config.get(k)
for k in ('log_level', 'name', 'papertrail_address', 'papertrail_port')
)

klass = logging.getLoggerClass()
logger_ = klass(name, level=level)

log_format = '%(asctime)s %(hostname)s ' + name + ': %(message)s'

formatter = logging.Formatter(log_format, datefmt='%b %d %H:%M:%S')
handler = logging.handlers.SysLogHandler(address=(papertrail_address, papertrail_port))
handler.addFilter(ContextFilter())
handler.setFormatter(formatter)

logger_.addHandler(handler)

return logger_
@@ -0,0 +1,3 @@
__version__ = '0.5.5'

__nightly__ = 'nightly-2019.07.29'
@@ -0,0 +1,58 @@
import logging

from dagster import execute_pipeline, pipeline, solid, ModeDefinition
from dagster.loggers import colored_console_logger
from dagster.seven import mock
from dagster_papertrail import papertrail_logger


@solid
def hello_logs(context):
context.log.info('Hello, world!')


@pipeline(
mode_defs=[
ModeDefinition(
logger_defs={'console': colored_console_logger, 'papertrail': papertrail_logger}
)
]
)
def hello_pipeline():
hello_logs() # pylint: disable=no-value-for-parameter


def test_papertrail_logger():
with mock.patch('logging.handlers.SysLogHandler.emit') as emit:

execute_pipeline(
hello_pipeline,
{
'loggers': {
'console': {'config': {'log_level': 'INFO'}},
'papertrail': {
'config': {
'log_level': 'INFO',
'name': 'hello_pipeline',
'papertrail_address': '127.0.0.1',
'papertrail_port': 12345,
}
},
}
},
)

log_record = emit.call_args_list[0][0][0]

assert isinstance(log_record, logging.LogRecord)
assert log_record.name == 'hello_pipeline'
assert log_record.levelname == 'INFO'

for msg in [
'orig_message = "Hello, world!"',
'pipeline = "hello_pipeline"',
'step_key = "hello_logs.compute"',
'solid = "hello_logs"',
'solid_definition = "hello_logs"',
]:
assert msg in log_record.msg
@@ -0,0 +1,5 @@
from dagster_papertrail.version import __version__


def test_version():
assert __version__
@@ -0,0 +1 @@
mock==2.0.0
@@ -0,0 +1 @@
datadog==0.28.0
@@ -0,0 +1,52 @@
import argparse
import sys

from setuptools import find_packages, setup


def get_version(name):
version = {}
with open('dagster_papertrail/version.py') as fp:
exec(fp.read(), version) # pylint: disable=W0122

if name == 'dagster-papertrail':
return version['__version__']
elif name == 'dagster-papertrail-nightly':
return version['__nightly__']
else:
raise Exception('Shouldn\'t be here: bad package name {name}'.format(name=name))


parser = argparse.ArgumentParser()
parser.add_argument('--nightly', action='store_true')


def _do_setup(name='dagster-papertrail'):
setup(
name='dagster_papertrail',
version=get_version(name),
author='Elementl',
license='Apache-2.0',
description='Package for papertrail Dagster framework components.',
url='https://github.com/dagster-io/dagster/tree/master/python_modules/libraries/dagster-papertrail',
classifiers=[
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
],
packages=find_packages(exclude=['test']),
install_requires=['dagster'],
tests_require=['mock==2.0.*'],
zip_safe=False,
)


if __name__ == '__main__':
parsed, unparsed = parser.parse_known_args()
sys.argv = [sys.argv[0]] + unparsed
if parsed.nightly:
_do_setup('dagster-papertrail-nightly')
else:
_do_setup('dagster-papertrail')
@@ -0,0 +1,16 @@
[tox]
envlist = py37,py36,py35,py27

[testenv]
passenv = CI_* COVERALLS_REPO_TOKEN
deps =
-e ../../dagster
-r ../../dagster/dev-requirements.txt
-r ./dev-requirements.txt
-e .
commands =
coverage erase
pytest -vv --junitxml=test_results.xml --cov=dagster_papertrail --cov-append --cov-report=
coverage report --omit='.tox/*,**/test_*.py' --skip-covered
coverage html --omit='.tox/*,**/test_*.py'
coverage xml --omit='.tox/*,**/test_*.py'

0 comments on commit 77ac085

Please sign in to comment.
You can’t perform that action at this time.