Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions AppController/djinn.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2834,7 +2834,7 @@ def start_appengine()
static_handlers = HelperFunctions.parse_static_data(app)
proxy_port = HAProxy.app_listen_port(app_number)
login_ip = get_login.public_ip
success = Nginx.write_app_config(app, app_number, my_public,
success = Nginx.write_app_config(app, app_number, my_public, my_private,
proxy_port, static_handlers, login_ip)
if not success
Djinn.log_debug("ERROR: Failure to create valid nginx config file for application #{app}.")
Expand Down Expand Up @@ -3445,7 +3445,8 @@ def start_sisyphus

static_handlers = HelperFunctions.parse_static_data(app)
proxy_port = HAProxy.app_listen_port(app_number)
Nginx.write_app_config(app, app_number, my_public, proxy_port, static_handlers, login_ip)
Nginx.write_app_config(app, app_number, my_public, my_private,
proxy_port, static_handlers, login_ip)
HAProxy.write_app_config(app, app_number, num_servers, my_private)
Collectd.write_app_config(app)

Expand Down
60 changes: 50 additions & 10 deletions AppServer/google/appengine/api/taskqueue/taskqueue_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,28 +52,45 @@
from google.appengine.api import datastore_errors
import pika

#TODO document these globals
DEFAULT_RATE = '5.00/s'

DEFAULT_RATE_FLOAT = 5.0

DEFAULT_BUCKET_SIZE = 5

MAX_ETA = datetime.timedelta(days=30)

MAX_PULL_TASK_SIZE_BYTES = 2 ** 20

MAX_PUSH_TASK_SIZE_BYTES = 100 * (2 ** 10)

MAX_TASK_SIZE = MAX_PUSH_TASK_SIZE_BYTES

MAX_REQUEST_SIZE = 32 << 20

MAX_RETRIES = 10
MAX_WAIT = 60 # max wait in seconds

# Max wait in seconds
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type of comment is not helpful. What is this the maximum waiting time for? Connecting to RabbitMQ? Sending messages? Everything? Are there implications to setting this value too low or too high? These are the types of questions that should be answered by a helpful docstring.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this MAX_WAIT is for. Leaving in for backward compatibility.

MAX_WAIT = 60

# Max for time for exponential backoff for RabbitMQ reconnect
MAX_RECONNECT_TIME = 1024

BUILT_IN_HEADERS = set(['x-appengine-queuename',
'x-appengine-taskname',
'x-appengine-taskretrycount',
'x-appengine-development-payload',
'content-length'])

DEFAULT_QUEUE_NAME = 'default'

QUEUE_MODE = taskqueue_service_pb.TaskQueueMode

AUTOMATIC_QUEUES = {
DEFAULT_QUEUE_NAME: (0.2, DEFAULT_BUCKET_SIZE, DEFAULT_RATE),
'__cron': (1, 1, '1/s')}

_TASKQUEUE_KIND = "___TaskQueue___"

def _GetAppId(request):
Expand Down Expand Up @@ -360,9 +377,14 @@ def __init__(self, task_executor, app_name, retry_seconds, **kwargs):
self._should_exit = False
self.task_executor = task_executor
self.default_retry_seconds = retry_seconds
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
try:
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
self.channel = self.connection.channel()
except pika.exceptions.AMQPConnectionError, e:
logging.error("Unable to connect to RabbitMQ: " + str(e))
except Exception, e:
logging.error("Unknown Exception--unable to connect to RabbitMQ: " + str(e))
self._queue_name = "app_%s"%app_name
if kwargs:
raise TypeError('Unknown parameters: %s' % ', '.join(kwargs))
Expand Down Expand Up @@ -444,15 +466,21 @@ def _TaskCallback(self, ch, method, properties, body):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
# TODO should be done transactionally with the publish and reject
# The API does support transactions see
except pika.exceptions.AMQPConnectionError, e:
logging.error("Unable to connect to RabbitMQ: " + str(e))
except Exception, e:
logging.error("Unknown exception--unable to connect to RabbitMQ: " + str(e))
# TODO RabbitMQ's basic_publish and reject should be
# done transactionally to prevent race conditions and duplicate
# tasks being enqueued. The API does support transactions see:
# http://www.rabbitmq.com/amqp-0-9-1-reference.html
else:
ch.basic_reject(delivery_tag = method.delivery_tag, requeue = False)


def MainLoop(self):
"""The main loop of the scheduler."""
reconnect_time = 1
while 1:
try:
logging.info("Connecting to RabbitMQ")
Expand All @@ -468,8 +496,13 @@ def MainLoop(self):
logging.error("RabbitMQ Connection error %s"%str(e))
except Exception, e:
logging.error("RabbitMQ Unknown exception %s"%str(e))
logging.info("Reconnecting in 5 seconds")
time.sleep(5)
logging.info("Reconnecting in " + str(reconnect_time) + " seconds")
time.sleep(reconnect_time)

if reconnect_time <= MAX_RECONNECT_TIME:
reconnect_time *= 2
else:
reconnect_time = MAX_RECONNECT_TIME

class TaskQueueServiceStub(apiproxy_stub.APIProxyStub):
"""Python only task queue service stub.
Expand Down Expand Up @@ -519,10 +552,15 @@ def __init__(self,
self._task_scheduler = _BackgroundTaskScheduler(
_TaskExecutor(default_http_server, self._secret_hash), app_id,
retry_seconds=task_retry_seconds)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
try:
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue='app_%s'%app_id, durable=False)
self.channel = self.connection.channel()
self.channel.queue_declare(queue='app_%s'%app_id, durable=False)
except pika.exceptions.AMQPConnectionError, e:
logging.error("RabbitMQ Connection error %s"%str(e))
except Exception, e:
logging.error("Unknown exception--Unable to connect to to RabbitMQ")

def StartBackgroundExecution(self):
"""Start automatic task execution."""
Expand Down Expand Up @@ -745,6 +783,8 @@ def _enqueue_task(self, request, now):
host='localhost'))
raise apiproxy_errors.ApplicationError(
taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR)
except Exception, e:
logging.error("Unknown exception--Unable to connect to RabbitMQ")

def _LocateTaskByName(self, task_name):
""" Makes sure the task does not exist or tombstoned
Expand Down