diff --git a/fluent/sender.py b/fluent/sender.py index a49dae1..38580ef 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -1,32 +1,28 @@ -import os -import sys, urllib import msgpack import socket import threading -import json import time -global_sender = None +_global_sender = None def setup(tag, **kwargs): - host = ('host' in kwargs) and kwargs['host'] or 'localhost' - port = ('port' in kwargs) and kwargs['port'] or 24224 + host = kwargs.get('host', 'localhost') + port = kwargs.get('port', 24224) - global global_sender - global_sender = FluentSender(tag, host=host, port=port) + global _global_sender + _global_sender = FluentSender(tag, host=host, port=port) def get_global_sender(): - global global_sender - return global_sender + return _global_sender class FluentSender(object): def __init__(self, - tag, - host='127.0.0.1', - port=24224, - bufmax=1*1024*1024, - timeout=3.0, - verbose=False): + tag, + host='localhost', + port=24224, + bufmax=1*1024*1024, + timeout=3.0, + verbose=False): self.tag = tag self.host = host @@ -64,9 +60,9 @@ def _close(self): self.socket = None def _make_packet(self, label, data): - tag = "%s.%s" % (self.tag, label) - cur_time = int(time.mktime(time.localtime())) - packet = [ tag, cur_time, data ] + tag = '.'.join(self.tag, label) + cur_time = int(time.time()) + packet = (tag, cur_time, data) if self.verbose: print packet return self.packer.pack(packet) @@ -89,17 +85,11 @@ def _send_internal(self, bytes): self._reconnect() # send message - total = len(bytes) - nsent = 0 - while nsent < total: - n = self.socket.send(bytes[nsent:]) - if n <= 0: - raise RuntimeError("socket connection broken") - nsent += n + self.socket.sendall(bytes) # send finished self.pendings = None - except: + except Exception: # close socket self._close() # clear buffer if it exceeds max bufer size