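# UDP log collector: receives (optionally chunked) zlib-compressed JSON log
# messages on port 12202, reassembles chunks, and stores each message in one
# of several MongoDB shards while indexing its ObjectId per request_id in the
# matching Redis instance. Work is spread over a pipeline of worker processes
# connected by multiprocessing queues:
#   recv_worker -> [join_worker] -> unpack_worker -> store_worker
# (the join step is only used for chunked packets)
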
import Queue
from collections import defaultdict
import logging
import socket
import struct
import zlib
import time
import multiprocessing
from beaker.cache import Cache
import signal
import cjson
from pymongo import Connection
import redis
logger = logging.getLogger('logserver')
logger.setLevel(logging.INFO)
#handler = logging.FileHandler('./log')
handler = logging.StreamHandler()
logger.addHandler(handler)
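
# A chunked packet is detected by its first two bytes, 0x1e 0x0f; this is the
# magic prefix used by GELF-style chunked messages (an assumption based on the
# header layout handled in join_worker below).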
def is_message_chunked(data):
    return data[:2] == '\x1e\x0f'
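
# Chunk header layout assumed here: bytes 0-1 magic, 2-9 message id (native
# unsigned 64-bit), 10 sequence number, 11 total chunk count, payload from 12.
# Partial messages are kept in a beaker cache for up to two minutes; once all
# chunks have arrived they are concatenated in sequence order.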
def join_worker(task_queue, unpack_queue, stopped):
    cache = Cache('chunked_cache', expire=2*60)

    def _process_packet(data):
        id = struct.unpack_from('Q', data, 2)[0]
        seq = struct.unpack_from('B', data, 10)[0]
        chunks_count = struct.unpack_from('B', data, 11)[0]
        payload = data[12:]
        try:
            chunks = cache.get(id)
        except KeyError:
            chunks = []
        chunks.append((seq, payload))
        if len(chunks) < chunks_count:
            logger.debug('incomplete chunked message %s: %s/%s', id, seq, chunks_count)
            cache.put(id, chunks)
            return
        logger.debug('complete chunked message %s', id)
        return ''.join([c[1] for c in sorted(chunks, key=lambda x: x[0])])

    while not stopped.is_set():
        try:
            packet = task_queue.get()
            data = _process_packet(packet)
            if data is not None:
                unpack_queue.put_nowait(data)
        except:
            logger.exception('error during join chunked messages: ')
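
# MongoDB does not allow '$'-prefixed or dotted field names, so strip a
# leading '$' and replace '.' with '_' in every key before inserting.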
def fix_message(message):
    return dict([((k[1:] if k[0] == '$' else k).replace('.', '_'), v) for k, v in message.iteritems()])
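
# unpack_worker decompresses the raw packet (MAX_WBITS + 32 lets zlib
# auto-detect a zlib or gzip header), decodes the JSON payload and promotes
# the '_request_id' field to 'request_id', which store_worker shards on.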
def unpack_worker(unpack_queue, store_queue, stopped):
    while not stopped.is_set():
        try:
            data = unpack_queue.get()
            message = cjson.decode(zlib.decompress(data, zlib.MAX_WBITS + 32))
            request_id = message.get('_request_id')
            if request_id is None:
                logger.debug('missing "_request_id" field in message')
                continue
            del message['_request_id']
            message['request_id'] = request_id
            store_queue.put_nowait(fix_message(message))
        except:
            logger.exception('error during unpack data: ')
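
# Per-shard Redis index: for every stored message, its MongoDB ObjectId is
# appended to a Redis list keyed by the message's request_id, so all messages
# belonging to one request can be looked up later.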
class LogIndex(object):
    def __init__(self, redis_server):
        self.r = redis.Redis(redis_server)

    def append(self, items):
        pipe = self.r.pipeline()
        for request_id, message_id in items:
            pipe.rpush(request_id, message_id)
        pipe.execute()
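
# store_worker drains up to chunk_size messages from the store queue, groups
# them by shard (CRC32 of request_id modulo the number of MongoDB servers),
# bulk-inserts each group and records the resulting ObjectIds in the shard's
# Redis index.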
def store_worker(mongo_servers, redis_servers, store_queue, chunk_size, stopped, join_queue, unpack_queue):
    logger.info('starting mongodb worker process')
    shards = {}
    for i, (host, port) in enumerate(mongo_servers):
        shards[i] = Connection(host, port).logging.messages, LogIndex(redis_servers[i])
    while not stopped.is_set():
        messages_chunks = defaultdict(list)
        for _ in range(chunk_size):
            try:
                m = store_queue.get(timeout=0.5)
                shard = zlib.crc32(m['request_id']) % len(shards)
                messages_chunks[shard].append(m)
            except Queue.Empty:
                break
        if not messages_chunks:
            continue
        start = time.time()
        for shard, (collection, index) in shards.iteritems():
            if not messages_chunks[shard]:
                # nothing queued for this shard in this batch
                continue
            try:
                obj_ids = collection.insert(messages_chunks[shard])
                logger.info(
                    '%s messages saved to mongodb #%s in %0.9fs (store_q: %s, join_q: %s, unpack_q: %s)',
                    len(messages_chunks[shard]),
                    shard,
                    (time.time() - start),
                    store_queue.qsize(),
                    join_queue.qsize(),
                    unpack_queue.qsize()
                )
                index.append([(messages_chunks[shard][i]['request_id'], obj_id) for i, obj_id in enumerate(obj_ids)])
            except:
                logger.exception('error during saving messages to mongodb #%s', shard)
    logger.info('shutting down mongodb worker process')
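
# recv_worker rebuilds the UDP socket from the file descriptor inherited from
# the parent process, so every receiver shares the single bound socket.
# Chunked packets go to the join queue, whole packets straight to unpack.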
def recv_worker(socket_fd, join_queue, unpack_queue, stopped):
    s = socket.fromfd(socket_fd, socket.AF_INET, socket.SOCK_DGRAM)
    while not stopped.is_set():
        try:
            message, address = s.recvfrom(8192)
            if is_message_chunked(message):
                join_queue.put_nowait(message)
                continue
            unpack_queue.put_nowait(message)
        except:
            logger.exception('error during receiving packet: ')
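
# main wires the pipeline together: one join process, several unpack, store
# and receive processes, all sharing multiprocessing queues and a stop event
# that the SIGTERM/SIGINT handler sets.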
def main():
    queue_size = 10000000
    unpack_processes_count = 6
    store_processes_count = 3
    store_chunk_size = 10000
    recv_processes_count = 2
    mongo_servers = [
        ('192.168.1.94', 27017),
        ('192.168.1.95', 27017)
    ]
    redis_servers = [
        '192.168.1.94',
        '192.168.1.95',
    ]
    assert len(mongo_servers) == len(redis_servers)

    join_queue = multiprocessing.Queue(maxsize=queue_size)
    unpack_queue = multiprocessing.Queue(maxsize=queue_size)
    store_queue = multiprocessing.Queue(maxsize=queue_size)
    stopped = multiprocessing.Event()
    procs = []

    p = multiprocessing.Process(target=join_worker, args=(join_queue, unpack_queue, stopped))
    p.daemon = False
    p.start()
    procs.append(p)

    for _ in range(unpack_processes_count):
        p = multiprocessing.Process(target=unpack_worker, args=(unpack_queue, store_queue, stopped))
        p.daemon = False
        p.start()
        procs.append(p)

    for _ in range(store_processes_count):
        p = multiprocessing.Process(target=store_worker, args=(mongo_servers, redis_servers, store_queue, store_chunk_size, stopped, join_queue, unpack_queue))
        p.daemon = False
        p.start()
        procs.append(p)

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('0.0.0.0', 12202))

    def shutdown(signum, frame):
        logger.info('got signal %s', signum)
        stopped.set()

    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)

    for _ in range(recv_processes_count):
        p = multiprocessing.Process(target=recv_worker, args=(s.fileno(), join_queue, unpack_queue, stopped))
        p.daemon = False
        p.start()
        procs.append(p)

    for p in procs:
        p.join()


if __name__ == '__main__':
    main()