Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
  • 8 commits
  • 5 files changed
  • 0 commit comments
  • 2 contributors
View
2  docs/setup.rst
@@ -12,7 +12,7 @@ The "Hurry Up" Install Guide
#. Configure OpenStack to publish Notifications back into RabbitMQ (see below)
#. Restart the OpenStack services.
#. Run the Worker to start consuming messages. (see below)
-#. Run the web server (``python manage.py runserver``)
+#. Run the web server (``python manage.py runserver --insecure``)
#. Point your browser to ``http://127.0.0.1:8000`` (the default server location)
#. Click on stuff, see what happens. You can't hurt anything, it's all read-only.
View
4 settings.py
@@ -167,3 +167,7 @@
},
}
}
+
+# Force use of the pickle serializer as a workaound for django-1.6. See:
+# https://docs.djangoproject.com/en/dev/releases/1.6/#default-session-serialization-switched-to-json
+SESSION_SERIALIZER='django.contrib.sessions.serializers.PickleSerializer'
View
2  tests/unit/test_worker.py
@@ -169,6 +169,7 @@ def test_run(self):
mock_logger.debug("Processing on 'east_coast.prod.global nova'")
mock_logger.debug("Completed processing on "
"'east_coast.prod.global nova'")
+ mock_logger.info("Worker exiting.")
config = {
'name': 'east_coast.prod.global',
@@ -217,6 +218,7 @@ def test_run_queue_args(self):
mock_logger.debug("Processing on 'east_coast.prod.global nova'")
mock_logger.debug("Completed processing on "
"'east_coast.prod.global nova'")
+ mock_logger.info("Worker exiting.")
config = {
'name': 'east_coast.prod.global',
View
2  worker/start_workers.py
@@ -25,13 +25,13 @@ def _get_parent_logger():
def kill_time(signal, frame):
- log_listener.end()
print "dying ..."
for process in processes:
process.terminate()
print "rose"
for process in processes:
process.join()
+ log_listener.end()
print "bud"
sys.exit(0)
View
23 worker/worker.py
@@ -19,6 +19,8 @@
import datetime
import sys
import time
+import anyjson
+import signal
import kombu
import kombu.mixins
@@ -38,8 +40,10 @@
from stacktach import message_service
from stacktach import stacklog
from stacktach import views
+from kombu.serialization import BytesIO, register
stacklog.set_default_logger_name('worker')
+shutdown_soon = False
def _get_child_logger():
@@ -60,6 +64,14 @@ def __init__(self, name, connection, deployment, durable, queue_arguments,
self.total_processed = 0
self.topics = topics
self.exchange = exchange
+ signal.signal(signal.SIGTERM, self._shutdown)
+
+ register('bufferjson', self.loads, anyjson.dumps,
+ content_type='application/json',
+ content_encoding='binary')
+
+ def loads(s):
+ return anyjson.loads(BytesIO(s))
def _create_exchange(self, name, type, exclusive=False, auto_delete=False):
return message_service.create_exchange(name, exchange_type=type,
@@ -135,9 +147,14 @@ def on_nova(self, body, message):
(e, json.loads(str(message.body))))
raise
+ def _shutdown(self, signal, stackframe = False):
+ global shutdown_soon
+ self.should_stop = True
+ shutdown_soon = True
+
def continue_running():
- return True
+ return not shutdown_soon
def exit_or_sleep(exit=False):
@@ -197,6 +214,10 @@ def run(deployment_config, deployment_id, exchange):
"exception=%s. Retrying in 5s"
logger.exception(msg % (name, exchange, e))
exit_or_sleep(exit_on_exception)
+ logger.info("Worker exiting.")
+
+signal.signal(signal.SIGINT, signal.SIG_IGN)
+signal.signal(signal.SIGTERM, signal.SIG_IGN)
POST_PROCESS_METHODS = {
'RawData': views.post_process_rawdata,

No commit comments for this range

Something went wrong with that request. Please try again.