Skip to content

bouthilx/cqueue

 
 

Repository files navigation

msgqueue

Message Queue Primitives. CQueue allows you to start the server in the same script as your worker, removing the need for you to deploy a server before hand.

The cockroachdb backend even allows you to create a highly resilient queue that can survive multiple server failures.

Install

pip install git+git://github.com/Delaunay/cqueue

Simple Workflow

from msgqueue import new_client

client = new_client(
    uri='cockroach://192.168.0.10:8123',
    namespace='task',       # Queue Namespace
    log_capture=True,       # Capture Worker Log and Stdout
    timeout=60)             # Client Time Out

# remove a message from the `work` queue
message = client.pop('work')

result = processing(message)

# Message has finished being processed
client.mark_actioned(message)

# put a message in the `result` queue
client.push('result', result)

Start servers at will

from msgqueue import new_server

# start a new cockroach server for clients
# to push and pull their messages
server = new_server(uri='cockroach://192.168.0.10:8123')
server.start()

Detect errors & Inspect logs

from msgqueue import new_monitor

monitor = new_monitor(uri='cockroach://192.168.0.10:8123')

# get messages that were assigned to a currently dead worker & that have not finished
dead_messages = monitor.lost_messages('task')

# Requeue all messages that did not finish
monitor.requeue_messages()

# get unresponsive worker entries
dead_workers = monitor.dead_agents('task')

# Get logs of the dead worker
log = monitor.log('task', dead_workers[0])

print(log)

# Get all messages (read & unread) from the `task` namespace and the `work` queue
# For analysis
messages = monitor.messages('task', 'work')
for msg in messages:
    print(
        m.read,             # Was the message read
        m.read_time,        # when was the message read
        m.actioned,         # Was the message actioned
        n.actioned_time,    # when was the message actioned
        m.message,          # User provided data (json)
        m.retry,            # Number of time it was retried
        m.error             # Error if any
    )

Dependencies

For mongodb:

sudo apt-get install mongodb-server

About

Message Queue Primitives

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Python 99.1%
  • Other 0.9%