Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 17 additions & 27 deletions fluent/sender.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
import os
import sys, urllib
import msgpack
import socket
import threading
import json
import time

global_sender = None
_global_sender = None

def setup(tag, **kwargs):
host = ('host' in kwargs) and kwargs['host'] or 'localhost'
port = ('port' in kwargs) and kwargs['port'] or 24224
host = kwargs.get('host', 'localhost')
port = kwargs.get('port', 24224)

global global_sender
global_sender = FluentSender(tag, host=host, port=port)
global _global_sender
_global_sender = FluentSender(tag, host=host, port=port)

def get_global_sender():
global global_sender
return global_sender
return _global_sender

class FluentSender(object):
def __init__(self,
tag,
host='127.0.0.1',
port=24224,
bufmax=1*1024*1024,
timeout=3.0,
verbose=False):
tag,
host='localhost',
port=24224,
bufmax=1*1024*1024,
timeout=3.0,
verbose=False):

self.tag = tag
self.host = host
Expand Down Expand Up @@ -64,9 +60,9 @@ def _close(self):
self.socket = None

def _make_packet(self, label, data):
tag = "%s.%s" % (self.tag, label)
cur_time = int(time.mktime(time.localtime()))
packet = [ tag, cur_time, data ]
tag = '.'.join(self.tag, label)
cur_time = int(time.time())
packet = (tag, cur_time, data)
if self.verbose:
print packet
return self.packer.pack(packet)
Expand All @@ -89,17 +85,11 @@ def _send_internal(self, bytes):
self._reconnect()

# send message
total = len(bytes)
nsent = 0
while nsent < total:
n = self.socket.send(bytes[nsent:])
if n <= 0:
raise RuntimeError("socket connection broken")
nsent += n
self.socket.sendall(bytes)

# send finished
self.pendings = None
except:
except Exception:
# close socket
self._close()
# clear buffer if it exceeds max bufer size
Expand Down