From 97f0c848aa30b42bdfbfad35c015e2ec803e9731 Mon Sep 17 00:00:00 2001 From: Kevin Morey Date: Tue, 28 May 2013 15:59:10 -0500 Subject: [PATCH] Unix domain socket (UDS) support --- fluent/sender.py | 18 ++++++++++++++---- tests/__init__.py | 1 + tests/mockserver.py | 12 +++++++++--- tests/test_handler.py | 2 +- tests/test_sender.py | 8 +++----- tests/test_uds_sender.py | 38 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 66 insertions(+), 13 deletions(-) create mode 100644 tests/test_uds_sender.py diff --git a/fluent/sender.py b/fluent/sender.py index 21b02d8..a85b035 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -6,6 +6,7 @@ _global_sender = None + def setup(tag, **kwargs): host = kwargs.get('host', 'localhost') port = kwargs.get('port', 24224) @@ -13,19 +14,22 @@ def setup(tag, **kwargs): global _global_sender _global_sender = FluentSender(tag, host=host, port=port) + def get_global_sender(): return _global_sender + class FluentSender(object): def __init__(self, tag, host='localhost', port=24224, - bufmax=1*1024*1024, + bufmax=1 * 1024 * 1024, timeout=3.0, verbose=False): self.tag = tag + self.uds = host.startswith('unix://') self.host = host self.port = port self.bufmax = bufmax @@ -36,7 +40,7 @@ def __init__(self, self.pendings = None self.packer = msgpack.Packer() self.lock = threading.Lock() - + try: self._reconnect() except: @@ -95,9 +99,15 @@ def _send_internal(self, bytes): def _reconnect(self): if not self.socket: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock_type = socket.AF_INET + sock_dest = (self.host, self.port) + if self.uds: + sock_type = socket.AF_UNIX + sock_dest = self.host[len('unix://'):] + + sock = socket.socket(sock_type, socket.SOCK_STREAM) sock.settimeout(self.timeout) - sock.connect((self.host, self.port)) + sock.connect(sock_dest) self.socket = sock def _close(self): diff --git a/tests/__init__.py b/tests/__init__.py index 805a0c1..7b66784 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,3 +1,4 @@ from tests.test_event import * from tests.test_sender import * from tests.test_handler import * +from tests.test_uds_sender import * diff --git a/tests/mockserver.py b/tests/mockserver.py index f7364cf..92dc844 100644 --- a/tests/mockserver.py +++ b/tests/mockserver.py @@ -8,13 +8,19 @@ except ImportError: from io import BytesIO + class MockRecvServer(threading.Thread): """ Single threaded server accepts one connection and recv until EOF. """ - def __init__(self, port): - self._sock = socket.socket() - self._sock.bind(('localhost', port)) + def __init__(self, host='localhost', port=24224): + if host.startswith('unix://'): + host = host[len('unix://'):] + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.bind(host) + else: + self._sock = socket.socket() + self._sock.bind((host, port)) self._buf = BytesIO() threading.Thread.__init__(self) diff --git a/tests/test_handler.py b/tests/test_handler.py index e1971e1..7a0a88f 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -9,7 +9,7 @@ def setUp(self): super(TestLogger, self).setUp() for port in range(10000, 20000): try: - self._server = mockserver.MockRecvServer(port) + self._server = mockserver.MockRecvServer(port=port) self._port = port break except IOError as e: diff --git a/tests/test_sender.py b/tests/test_sender.py index 8fb1916..614cde5 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -4,20 +4,18 @@ import fluent.sender import msgpack + class TestSender(unittest.TestCase): def setUp(self): super(TestSender, self).setUp() for port in range(10000, 20000): try: - self._server = mockserver.MockRecvServer(port) + self._server = mockserver.MockRecvServer(port=port) break except IOError as e: print(e) pass - self._sender = fluent.sender.FluentSender( - tag='test', - port=port, - ) + self._sender = fluent.sender.FluentSender(tag='test', port=port) def get_data(self): return self._server.get_recieved() diff --git a/tests/test_uds_sender.py b/tests/test_uds_sender.py new file mode 100644 index 0000000..6f06382 --- /dev/null +++ b/tests/test_uds_sender.py @@ -0,0 +1,38 @@ +from __future__ import print_function +import unittest +from tests import mockserver +import fluent.sender +import os + + +class TestSender(unittest.TestCase): + def setUp(self): + super(TestSender, self).setUp() + socket_path = os.path.abspath('uds_socket') + server_address = 'unix://' + socket_path + + # Make sure the socket does not already exist + try: + os.unlink(socket_path) + except OSError: + if os.path.exists(socket_path): + raise + + self._server = mockserver.MockRecvServer(host=server_address) + self._sender = fluent.sender.FluentSender(tag='test', host=server_address) + + def get_data(self): + return self._server.get_recieved() + + def test_simple(self): + sender = self._sender + sender.emit('foo', {'bar': 'baz'}) + sender._close() + data = self.get_data() + eq = self.assertEqual + eq(1, len(data)) + eq(3, len(data[0])) + eq('test.foo', data[0][0]) + eq({'bar': 'baz'}, data[0][2]) + self.assert_(data[0][1]) + self.assert_(isinstance(data[0][1], int))