From 67fd51c52c57f9d53d187ea976f4f1c2cf3cfe7d Mon Sep 17 00:00:00 2001 From: Navraj Chohan Date: Wed, 24 Oct 2012 23:10:52 +0000 Subject: [PATCH 1/2] Fixed issue --- .../api/taskqueue/taskqueue_rabbitmq.py | 56 +++++++++++++++---- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/AppServer/google/appengine/api/taskqueue/taskqueue_rabbitmq.py b/AppServer/google/appengine/api/taskqueue/taskqueue_rabbitmq.py index c00ab84c39..7a1ffae3fd 100644 --- a/AppServer/google/appengine/api/taskqueue/taskqueue_rabbitmq.py +++ b/AppServer/google/appengine/api/taskqueue/taskqueue_rabbitmq.py @@ -53,20 +53,35 @@ import pika DEFAULT_RATE = '5.00/s' + DEFAULT_RATE_FLOAT = 5.0 + DEFAULT_BUCKET_SIZE = 5 + MAX_ETA = datetime.timedelta(days=30) + MAX_PULL_TASK_SIZE_BYTES = 2 ** 20 + MAX_PUSH_TASK_SIZE_BYTES = 100 * (2 ** 10) + MAX_TASK_SIZE = MAX_PUSH_TASK_SIZE_BYTES + MAX_REQUEST_SIZE = 32 << 20 + MAX_RETRIES = 10 -MAX_WAIT = 60 # max wait in seconds + +# Max wait in seconds +MAX_WAIT = 60 + +# Max time for exponential backoff for RabbitMQ reconnect +MAX_RECONNECT_TIME = 1024 + BUILT_IN_HEADERS = set(['x-appengine-queuename', 'x-appengine-taskname', 'x-appengine-taskretrycount', 'x-appengine-development-payload', 'content-length']) + DEFAULT_QUEUE_NAME = 'default' QUEUE_MODE = taskqueue_service_pb.TaskQueueMode @@ -74,6 +89,7 @@ AUTOMATIC_QUEUES = { DEFAULT_QUEUE_NAME: (0.2, DEFAULT_BUCKET_SIZE, DEFAULT_RATE), '__cron': (1, 1, '1/s')} + _TASKQUEUE_KIND = "___TaskQueue___" def _GetAppId(request): @@ -360,9 +376,14 @@ def __init__(self, task_executor, app_name, retry_seconds, **kwargs): self._should_exit = False self.task_executor = task_executor self.default_retry_seconds = retry_seconds - self.connection = pika.BlockingConnection(pika.ConnectionParameters( + try: + self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) - self.channel = self.connection.channel() + self.channel = self.connection.channel() + except pika.exceptions.AMQPConnectionError, e: + logging.error("Unable to connect to RabbitMQ: " + str(e)) + except Exception, e: + logging.error("Unknown Exception--unable to connect to RabbitMQ: " + str(e)) self._queue_name = "app_%s"%app_name if kwargs: raise TypeError('Unknown parameters: %s' % ', '.join(kwargs)) @@ -444,8 +465,13 @@ def _TaskCallback(self, ch, method, properties, body): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() - # TODO should be done transactionally with the publish and reject - # The API does support transactions see + except pika.exceptions.AMQPConnectionError, e: + logging.error("Unable to connect to RabbitMQ: " + str(e)) + except Exception, e: + logging.error("Unknown exception--unable to connect to RabbitMQ: " + str(e)) + # TODO RabbitMQ's basic_publish and reject should be + # done transactionally to prevent race conditions and duplicate + # tasks being enqueued. The API does support transactions see: # http://www.rabbitmq.com/amqp-0-9-1-reference.html else: ch.basic_reject(delivery_tag = method.delivery_tag, requeue = False) @@ -453,6 +479,7 @@ def _TaskCallback(self, ch, method, properties, body): def MainLoop(self): """The main loop of the scheduler.""" + reconnect_time = 1 while 1: try: logging.info("Connecting to RabbitMQ") @@ -468,8 +495,10 @@ def MainLoop(self): logging.error("RabbitMQ Connection error %s"%str(e)) except Exception, e: logging.error("RabbitMQ Unknown exception %s"%str(e)) - logging.info("Reconnecting in 5 seconds") - time.sleep(5) + logging.info("Reconnecting in " + str(reconnect_time) + " seconds") + time.sleep(reconnect_time) + if reconnect_time <= MAX_RECONNECT_TIME: + reconnect_time *= 2 class TaskQueueServiceStub(apiproxy_stub.APIProxyStub): """Python only task queue service stub. @@ -519,10 +548,15 @@ def __init__(self, self._task_scheduler = _BackgroundTaskScheduler( _TaskExecutor(default_http_server, self._secret_hash), app_id, retry_seconds=task_retry_seconds) - self.connection = pika.BlockingConnection(pika.ConnectionParameters( + try: + self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) - self.channel = self.connection.channel() - self.channel.queue_declare(queue='app_%s'%app_id, durable=False) + self.channel = self.connection.channel() + self.channel.queue_declare(queue='app_%s'%app_id, durable=False) + except pika.exceptions.AMQPConnectionError, e: + logging.error("RabbitMQ Connection error %s"%str(e)) + except Exception, e: + logging.error("Unknown exception--Unable to connect to to RabbitMQ") def StartBackgroundExecution(self): """Start automatic task execution.""" @@ -745,6 +779,8 @@ def _enqueue_task(self, request, now): host='localhost')) raise apiproxy_errors.ApplicationError( taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR) + except Exception, e: + logging.error("Unknown exception--Unable to connect to RabbitMQ") def _LocateTaskByName(self, task_name): """ Makes sure the task does not exist or tombstoned From bcc92130e9be0d58930bc4591ebdbe3c6d89bf78 Mon Sep 17 00:00:00 2001 From: Navraj Chohan Date: Thu, 25 Oct 2012 16:37:25 +0000 Subject: [PATCH 2/2] Code review changes, and syntax fix --- AppController/djinn.rb | 5 +++-- .../google/appengine/api/taskqueue/taskqueue_rabbitmq.py | 6 +++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/AppController/djinn.rb b/AppController/djinn.rb index 1d665f0003..a2aab41da1 100644 --- a/AppController/djinn.rb +++ b/AppController/djinn.rb @@ -2834,7 +2834,7 @@ def start_appengine() static_handlers = HelperFunctions.parse_static_data(app) proxy_port = HAProxy.app_listen_port(app_number) login_ip = get_login.public_ip - success = Nginx.write_app_config(app, app_number, my_public, + success = Nginx.write_app_config(app, app_number, my_public, my_private, proxy_port, static_handlers, login_ip) if not success Djinn.log_debug("ERROR: Failure to create valid nginx config file for application #{app}.") @@ -3445,7 +3445,8 @@ def start_sisyphus static_handlers = HelperFunctions.parse_static_data(app) proxy_port = HAProxy.app_listen_port(app_number) - Nginx.write_app_config(app, app_number, my_public, proxy_port, static_handlers, login_ip) + Nginx.write_app_config(app, app_number, my_public, my_private, + proxy_port, static_handlers, login_ip) HAProxy.write_app_config(app, app_number, num_servers, my_private) Collectd.write_app_config(app) diff --git a/AppServer/google/appengine/api/taskqueue/taskqueue_rabbitmq.py b/AppServer/google/appengine/api/taskqueue/taskqueue_rabbitmq.py index 7a1ffae3fd..6c59216632 100644 --- a/AppServer/google/appengine/api/taskqueue/taskqueue_rabbitmq.py +++ b/AppServer/google/appengine/api/taskqueue/taskqueue_rabbitmq.py @@ -52,6 +52,7 @@ from google.appengine.api import datastore_errors import pika +#TODO document these globals DEFAULT_RATE = '5.00/s' DEFAULT_RATE_FLOAT = 5.0 @@ -73,7 +74,7 @@ # Max wait in seconds MAX_WAIT = 60 -# Max time for exponential backoff for RabbitMQ reconnect +# Max for time for exponential backoff for RabbitMQ reconnect MAX_RECONNECT_TIME = 1024 BUILT_IN_HEADERS = set(['x-appengine-queuename', @@ -497,8 +498,11 @@ def MainLoop(self): logging.error("RabbitMQ Unknown exception %s"%str(e)) logging.info("Reconnecting in " + str(reconnect_time) + " seconds") time.sleep(reconnect_time) + if reconnect_time <= MAX_RECONNECT_TIME: reconnect_time *= 2 + else: + reconnect_time = MAX_RECONNECT_TIME class TaskQueueServiceStub(apiproxy_stub.APIProxyStub): """Python only task queue service stub.