Skip to content
This repository has been archived by the owner on Nov 10, 2017. It is now read-only.

Commit

Permalink
import first version
Browse files Browse the repository at this point in the history
git-svn-id: https://peafowl.googlecode.com/svn/trunk@2 54597ca8-0954-0410-a2f0-27c18351348c
  • Loading branch information
timothee.peignier committed Aug 9, 2008
1 parent 63f177b commit fe20cf1
Show file tree
Hide file tree
Showing 13 changed files with 782 additions and 0 deletions.
20 changes: 20 additions & 0 deletions LICENSE
@@ -0,0 +1,20 @@
Copyright (c) 2008 Timothée Peignier.

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
53 changes: 53 additions & 0 deletions README
@@ -0,0 +1,53 @@
Name
====

Peafowl - a light weight server for distributed message passing based on Starling.

Synopsis
========

::

# Start the Starling server as a daemonized process:
>>> peafowl -H 192.168.1.1 -d

# Put messages onto a queue:
>>> from memcache import Client
>>> peafowl = Client(['192.168.1.1:22122'])
>>> peafowl.set('my_queue', 12345)

# Get messages from the queue:
>>> from memcache import Client
>>> peafowl = Client(['192.168.1.1:22122'])
>>> while True:
>>> print peafowl.get('my_queue')

Description
===========

Peafowl is a powerful but simple messaging server that enables reliable
distributed queuing with an absolutely minimal overhead. It speaks the
MemCache protocol for maximum cross-platform compatibility. Any language
that speaks MemCache can take advantage of Peafowl's queue facilities.

Known Issues
============

* Peafowl is a pure port of Starling (written by Blaine Cook), it's only to be use if you can't bear ruby.

Authors
=======

Timothée Peignier <tim@tryphon.org>

Original author
===============

Blaine Cook <romeda@gmail.com>

Copyright
=========

Peafowl is : Copyright 2008 Timothée Peignier <tim@tryphon.org>

Starling is : Copyright 2007 Blaine Cook <blaine@twitter.com>, Twitter Inc.
5 changes: 5 additions & 0 deletions bin/peafowl
@@ -0,0 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from peafowl.runner import Runner
Runner.run()

7 changes: 7 additions & 0 deletions peafowl/__init__.py
@@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
__version__ = "0.1"
from server import Server
from handler import Handler
from queue import PersistentQueue
from collection import QueueCollection

103 changes: 103 additions & 0 deletions peafowl/collection.py
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
import os, thread
from queue import PersistentQueue

class QueueCollectionError(Exception):
pass

class QueueCollection(object):
def __init__(self, path):
if not os.path.isdir(path) and os.access(path, os.W_OK):
raise QueueCollectionError("Queue path '%s' is inacessible" % path)
self.shutdown_lock = thread.allocate_lock()
self.path = path
self.queues = {}
self.queue_locks = {}
self.stats = {'current_bytes':0, 'total_items':0, 'get_misses':0, 'get_hits':0}

def put(self, key, data):
"""
Puts ``data`` onto the queue named ``key``.
"""
queue = self.get_queues(key)
if not queue:
return None
self.stats['current_bytes'] += len(data)
self.stats['total_items'] += 1
queue.put(data)
return True

def take(self, key):
"""
Retrieves data from the queue named ``key``.
"""
queue = self.get_queues(key)
if not queue or not queue.qsize():
self.stats['get_misses'] += 1
return None
else:
self.stats['get_hits'] += 1
result = queue.get()
self.stats['current_bytes'] -= len(result)
return result

def get_queues(self, key = None):
"""
Returns all active queues.
"""
if self.shutdown_lock.locked():
return None

if not key:
return self.queues

if self.queues.has_key(key):
return self.queues[key]

if not self.queue_locks.has_key(key):
self.queue_locks[key] = thread.allocate_lock()

if self.queue_locks[key].locked():
return None
else:
try:
self.queue_locks[key].acquire()
if not self.queues.has_key(key):
self.queues[key] = PersistentQueue(self.path, key)
self.stats['current_bytes'] += self.queues[key].initial_bytes
finally:
self.queue_locks[key].release()
return self.queues[key]

def get_stats(self, name = None):
"""
Returns statistic ``stat_name`` for the ``QueueCollection``.
Valid statistics are:
``get_misses`` Total number of get requests with empty responses
``get_hits`` Total number of get requests that returned data
``current_bytes`` Current size in bytes of items in the queues
``current_size`` Current number of items across all queues
``total_items`` Total number of items stored in queues.
"""
if not name:
return self.stats
elif name == 'current_size':
return self._current_size()
else:
return self.stats[name]

def close(self):
"""
Safely close all queues.
"""
self.shutdown_lock.acquire()
for name, queue in self.queues:
queue.close()
del self.queues[name]

def _current_size(self):
size = 0
for name in self.queues:
size += self.queues[name].qsize()
return size

175 changes: 175 additions & 0 deletions peafowl/handler.py
@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
import threading, re, time, os, logging
from resource import getrusage, RUSAGE_SELF
from struct import pack, unpack
import peafowl as peafowl

DATA_PACK_FMT = "!I%sp"

# ERROR responses
ERR_UNKNOWN_COMMAND = "CLIENT_ERROR bad command line format\r\n"

# GET Responses
GET_COMMAND = r'^get (.{1,250})\r\n$'
GET_RESPONSE = "VALUE %s %s %s\r\n%s\r\nEND\r\n"
GET_RESPONSE_EMPTY = "END\r\n"

# SET Responses
SET_COMMAND = r'^set (.{1,250}) ([0-9]+) ([0-9]+) ([0-9]+)\r\n$'
SET_RESPONSE_SUCCESS = "STORED\r\n"
SET_RESPONSE_FAILURE = "NOT STORED\r\n"
SET_CLIENT_DATA_ERROR = "CLIENT_ERROR bad data chunk\r\nERROR\r\n"

# STAT Response
STATS_COMMAND = r'stats\r\n$'
STATS_RESPONSE = """STAT pid %d\r
STAT uptime %d\r
STAT time %d\r
STAT version %s\r
STAT rusage_user %0.6f\r
STAT rusage_system %0.6f\r
STAT curr_items %d\r
STAT total_items %d\r
STAT bytes %d\r
STAT curr_connections %d\r
STAT total_connections %d\r
STAT cmd_get %d\r
STAT cmd_set %d\r
STAT get_hits %d\r
STAT get_misses %d\r
STAT bytes_read %d\r
STAT bytes_written %d\r
STAT limit_maxbytes %d\r
%s\nEND\r\n"""
QUEUE_STATS_RESPONSE = """STAT queue_%s_items %d\r
STAT queue_%s_total_items %d\r
STAT queue_%s_logsize %d\r
STAT queue_%s_expired_items %d\r"""

class Handler(threading.Thread):
"""
This is an internal class used by Peafowl Server to handle the
MemCache protocol and act as an interface between the Server and the
QueueCollection.
"""
def __init__(self, socket, queue_collection, stats):
threading.Thread.__init__(self)
self.expiry_stats = {}
self.stats = stats
self.queue_collection = queue_collection
self.socket = socket
self.file = self.socket.makefile("rb")

def run(self):
"""
Process incoming commands from the attached client.
"""
self.stats['total_connections'] += 1
while True:
command = self.file.readline()
if not command:
break
logging.debug("Receiving command : %s" % repr(command))
self.stats['bytes_read'] += len(command)
self._process(command)

def _process(self, command):
m = re.match(SET_COMMAND, command)
if m:
logging.debug("Received a SET command")
self.stats['set_requests'] += 1
self.set(m.group(1), m.group(2), m.group(3), m.group(4))
return
m = re.match(GET_COMMAND, command)
if m:
logging.debug("Received a GET command")
self.stats['get_requests'] += 1
self.get(m.group(1))
return
m = re.match(STATS_COMMAND, command)
if m:
logging.debug("Received a STATS command")
self.get_stats()
return
logging.debug("Received unknow command")
self._respond(ERR_UNKNOWN_COMMAND)

def _respond(self, message, *args):
response = message % args
self.stats['bytes_written'] += len(response)
logging.debug("Sending response : %s" % repr(response))
self.socket.send(response)

def set(self, key, flags, expiry, length):
length = int(length)
data = self.file.read(length)
data_end = self.file.read(2)
self.stats['bytes_read'] += (length + 2)
if data_end == '\r\n' and len(data) == length:
internal_data = pack(DATA_PACK_FMT % (length + 1), int(expiry), data)
if self.queue_collection.put(key, internal_data):
logging.debug("SET command is a success")
self._respond(SET_RESPONSE_SUCCESS)
else:
logging.warning("SET command failed")
self._respond(SET_RESPONSE_FAILURE)
else:
logging.error("SET command failed hard")
self._respond(SET_CLIENT_DATA_ERROR)

def get(self, key):
now = time.time()
data = None
response = self.queue_collection.take(key)
while response:
expiry, data = unpack(DATA_PACK_FMT % (len(response) - 4), response)
if expiry == 0 or expiry >= now:
break
if self.expiry_stats.has_key(key):
self.expiry_stats[key] += 1
else:
self.expiry_stats[key] = 1
expiry, data = None, None
response = self.queue_collection.take(key)
if data:
logging.debug("GET command respond with value")
self._respond(GET_RESPONSE, key, 0, len(data), data)
else:
logging.debug("GET command response was empty")
self._respond(GET_RESPONSE_EMPTY)

def get_stats(self):
self._respond(STATS_RESPONSE,
os.getpid(), # pid
time.time() - self.stats['start_time'], # uptime
time.time(), # time
'0.1', # peafowl version
getrusage(RUSAGE_SELF)[0],
getrusage(RUSAGE_SELF)[1],
self.queue_collection.get_stats('current_size'),
self.queue_collection.get_stats('total_items'),
self.queue_collection.get_stats('current_bytes'),
self.stats['connections'],
self.stats['total_connections'],
self.stats['get_requests'],
self.stats['set_requests'],
self.queue_collection.stats['get_hits'],
self.queue_collection.stats['get_misses'],
self.stats['bytes_read'],
self.stats['bytes_written'],
0,
self.queue_stats()
)

def queue_stats(self):
response = ''
for name in self.queue_collection.get_queues():
queue = self.queue_collection.get_queues(name)
if self.expiry_stats.has_key(name):
expiry_stats = self.expiry_stats[name]
else:
expiry_stats = 0
response += QUEUE_STATS_RESPONSE % (name, queue.qsize(), name, queue.total_items, name, queue.log_size, name, expiry_stats)
return response


0 comments on commit fe20cf1

Please sign in to comment.