Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions fluent/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,30 @@

_global_sender = None


def setup(tag, **kwargs):
host = kwargs.get('host', 'localhost')
port = kwargs.get('port', 24224)

global _global_sender
_global_sender = FluentSender(tag, host=host, port=port)


def get_global_sender():
return _global_sender


class FluentSender(object):
def __init__(self,
tag,
host='localhost',
port=24224,
bufmax=1*1024*1024,
bufmax=1 * 1024 * 1024,
timeout=3.0,
verbose=False):

self.tag = tag
self.uds = host.startswith('unix://')
self.host = host
self.port = port
self.bufmax = bufmax
Expand All @@ -36,7 +40,7 @@ def __init__(self,
self.pendings = None
self.packer = msgpack.Packer()
self.lock = threading.Lock()

try:
self._reconnect()
except:
Expand Down Expand Up @@ -95,9 +99,15 @@ def _send_internal(self, bytes):

def _reconnect(self):
if not self.socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock_type = socket.AF_INET
sock_dest = (self.host, self.port)
if self.uds:
sock_type = socket.AF_UNIX
sock_dest = self.host[len('unix://'):]

sock = socket.socket(sock_type, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
sock.connect(sock_dest)
self.socket = sock

def _close(self):
Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from tests.test_event import *
from tests.test_sender import *
from tests.test_handler import *
from tests.test_uds_sender import *
12 changes: 9 additions & 3 deletions tests/mockserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
except ImportError:
from io import BytesIO


class MockRecvServer(threading.Thread):
"""
Single threaded server accepts one connection and recv until EOF.
"""
def __init__(self, port):
self._sock = socket.socket()
self._sock.bind(('localhost', port))
def __init__(self, host='localhost', port=24224):
if host.startswith('unix://'):
host = host[len('unix://'):]
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._sock.bind(host)
else:
self._sock = socket.socket()
self._sock.bind((host, port))
self._buf = BytesIO()

threading.Thread.__init__(self)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def setUp(self):
super(TestLogger, self).setUp()
for port in range(10000, 20000):
try:
self._server = mockserver.MockRecvServer(port)
self._server = mockserver.MockRecvServer(port=port)
self._port = port
break
except IOError as e:
Expand Down
8 changes: 3 additions & 5 deletions tests/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@
import fluent.sender
import msgpack


class TestSender(unittest.TestCase):
def setUp(self):
super(TestSender, self).setUp()
for port in range(10000, 20000):
try:
self._server = mockserver.MockRecvServer(port)
self._server = mockserver.MockRecvServer(port=port)
break
except IOError as e:
print(e)
pass
self._sender = fluent.sender.FluentSender(
tag='test',
port=port,
)
self._sender = fluent.sender.FluentSender(tag='test', port=port)

def get_data(self):
return self._server.get_recieved()
Expand Down
38 changes: 38 additions & 0 deletions tests/test_uds_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import print_function
import unittest
from tests import mockserver
import fluent.sender
import os


class TestSender(unittest.TestCase):
def setUp(self):
super(TestSender, self).setUp()
socket_path = os.path.abspath('uds_socket')
server_address = 'unix://' + socket_path

# Make sure the socket does not already exist
try:
os.unlink(socket_path)
except OSError:
if os.path.exists(socket_path):
raise

self._server = mockserver.MockRecvServer(host=server_address)
self._sender = fluent.sender.FluentSender(tag='test', host=server_address)

def get_data(self):
return self._server.get_recieved()

def test_simple(self):
sender = self._sender
sender.emit('foo', {'bar': 'baz'})
sender._close()
data = self.get_data()
eq = self.assertEqual
eq(1, len(data))
eq(3, len(data[0]))
eq('test.foo', data[0][0])
eq({'bar': 'baz'}, data[0][2])
self.assert_(data[0][1])
self.assert_(isinstance(data[0][1], int))