Commit
- Fix kombu/messaging initialization
afabiani committed Jun 28, 2018
1 parent 85ec511 commit de70783
Showing 3 changed files with 71 additions and 23 deletions.
54 changes: 41 additions & 13 deletions geonode/messaging/__init__.py
@@ -18,16 +18,44 @@
#
#########################################################################
from django.conf import settings
from kombu import pools
from kombu import BrokerConnection

connections = pools.Connections(limit=100)
producers = pools.Producers(limit=connections.limit)

# run in-memory if broker is not available
# see producer code for synchronous queue
url = getattr(settings, 'BROKER_URL', 'memory://')
task_serializer = getattr(settings, 'CELERY_TASK_SERIALIZER', 'pickle')
broker_transport_options = getattr(settings, 'BROKER_TRANSPORT_OPTIONS', {'socket_timeout': 10})
broker_socket_timeout = getattr(broker_transport_options, 'socket_timeout', 10)
connection = BrokerConnection(url, connect_timeout=broker_socket_timeout)
from geonode.notifications_helper import NotificationsAppConfigBase

connections = None
producers = None
url = None
task_serializer = None
broker_transport_options = None
broker_socket_timeout = None
connection = None


class MessagingAppConfig(NotificationsAppConfigBase):
    name = 'geonode.messaging'

    def ready(self):
        super(MessagingAppConfig, self).ready()

        from kombu import pools
        from kombu import BrokerConnection

        global connections
        global producers
        global url
        global task_serializer
        global broker_transport_options
        global broker_socket_timeout
        global connection

        connections = pools.Connections(limit=100)
        producers = pools.Producers(limit=connections.limit)

        # run in-memory if broker is not available
        # see producer code for synchronous queue
        url = getattr(settings, 'BROKER_URL', 'memory://')
        task_serializer = getattr(settings, 'CELERY_TASK_SERIALIZER', 'pickle')
        broker_transport_options = getattr(settings, 'BROKER_TRANSPORT_OPTIONS', {'socket_timeout': 10})
        broker_socket_timeout = getattr(broker_transport_options, 'socket_timeout', 10)
        connection = BrokerConnection(url, connect_timeout=broker_socket_timeout)


default_app_config = 'geonode.messaging.MessagingAppConfig'
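The new __init__.py defers all kombu setup from import time to MessagingAppConfig.ready(), leaving module-level placeholders that other modules (such as producer.py) import and that ready() later fills in via global. Below is a minimal, self-contained sketch of the same pattern; it is not part of the commit, the app label messaging_example is illustrative, and it reads the socket timeout with dict.get rather than getattr:

from django.apps import AppConfig
from django.conf import settings

# Placeholders filled in once the app registry is ready, so importing this
# module never touches Django settings or the broker.
connections = None
producers = None
connection = None


class MessagingExampleConfig(AppConfig):
    name = 'messaging_example'  # illustrative app label, not GeoNode's

    def ready(self):
        # Import kombu lazily so a missing broker cannot break module import.
        from kombu import pools, BrokerConnection

        global connections, producers, connection

        connections = pools.Connections(limit=100)
        producers = pools.Producers(limit=connections.limit)

        # Fall back to the in-memory transport when no broker is configured.
        broker_url = getattr(settings, 'BROKER_URL', 'memory://')
        opts = getattr(settings, 'BROKER_TRANSPORT_OPTIONS', {'socket_timeout': 10})
        connection = BrokerConnection(broker_url, connect_timeout=opts.get('socket_timeout', 10))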
9 changes: 8 additions & 1 deletion geonode/messaging/producer.py
@@ -24,11 +24,13 @@
import traceback

from decorator import decorator
from kombu import BrokerConnection
from kombu.common import maybe_declare
from queues import queue_email_events, queue_geoserver_events,\
    queue_notifications_events, queue_layer_viewers

from . import (producers,
from . import (url,
               producers,
               connection,
               broker_socket_timeout,
               task_serializer)
@@ -52,6 +54,7 @@ def sync_if_local_memory(func, *args, **kwargs):
    try:
        return func(*args, **kwargs)
    finally:
        connection = BrokerConnection(url, connect_timeout=broker_socket_timeout)
        if getattr(connection.connection, 'driver_name', None) == 'memory':
            # hack explained:
            # when using memory://, first run usually contains only message for
@@ -71,6 +74,10 @@ def sync_if_local_memory(func, *args, **kwargs):
msg = "Exception while publishing message: {}".format(tb)
logger.error(msg)
raise
elif not getattr(connection.connection, 'driver_name', None):
msg = "Exception while getting connection to {}".format(url)
logger.error(msg)
raise


@sync_if_local_memory
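The producer change re-creates the broker connection on every publish and inspects driver_name: 'memory' means no real broker is running, so the publishing process has to drain the queue itself, while a missing driver_name is treated as a failed connection and logged. A self-contained sketch of that memory:// fallback is below, assuming only kombu; the exchange and queue names are illustrative and are not the ones declared in queues.py:

from kombu import BrokerConnection, Exchange, Queue

exchange = Exchange('example', type='direct')
queue = Queue('example_q', exchange, routing_key='example')

conn = BrokerConnection('memory://', connect_timeout=10)
if getattr(conn.connection, 'driver_name', None) == 'memory':
    # Publish a message; declare=[queue] creates the in-memory queue on demand.
    with conn.Producer(serializer='pickle') as producer:
        producer.publish({'msg': 'ping'}, exchange=exchange,
                         routing_key='example', declare=[queue])

    # With memory:// nothing else will consume, so drain synchronously here,
    # mirroring what sync_if_local_memory does after the wrapped call returns.
    def handle(body, message):
        message.ack()

    with conn.Consumer([queue], callbacks=[handle]):
        conn.drain_events(timeout=2)
elif not getattr(conn.connection, 'driver_name', None):
    # Mirrors the new error branch: no usable transport behind the URL.
    raise RuntimeError('could not connect to broker')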
31 changes: 22 additions & 9 deletions tasks.py
@@ -26,23 +26,36 @@ def update(ctx):
print "Public PORT is {0}".format(pub_port)
db_url = _update_db_connstring()
geodb_url = _update_geodb_connstring()
override_env = "$HOME/.override_env"
envs = {
"public_fqdn": "{0}:{1}".format(pub_ip, pub_port),
"public_fqdn": "{0}:{1}".format(pub_ip, pub_port or 80),
"public_host": "{0}".format(pub_ip),
"dburl": db_url,
"geodburl": geodb_url,
"override_fn": "$HOME/.override_env"
"override_fn": override_env
}
ctx.run("echo export GEOSERVER_PUBLIC_LOCATION=\
if not os.environ.get('GEOSERVER_PUBLIC_LOCATION'):
ctx.run("echo export GEOSERVER_PUBLIC_LOCATION=\
http://{public_fqdn}/geoserver/ >> {override_fn}".format(**envs), pty=True)
ctx.run("echo export SITEURL=\
if not os.environ.get('SITEURL'):
ctx.run("echo export SITEURL=\
http://{public_fqdn}/ >> {override_fn}".format(**envs), pty=True)
ctx.run("echo export ALLOWED_HOSTS=\
\"\\\"['{public_fqdn}', '{public_host}', 'localhost', 'django', 'geonode',]\\\"\" \
>> {override_fn}".format(**envs), pty=True)
ctx.run("echo export DATABASE_URL=\

try:
current_allowed = ast.literal_eval(os.getenv('ALLOWED_HOSTS') or \
"['{public_fqdn}', '{public_host}', 'localhost', 'django', 'geonode',]".format(**envs))
except ValueError:
current_allowed = []
current_allowed.extend(['{}'.format(pub_ip), '{}:{}'.format(pub_ip, pub_port)])
allowed_hosts = ['"{}"'.format(c) for c in current_allowed] + ['"geonode"', '"django"']

ctx.run('echo export ALLOWED_HOSTS="\\"{}\\"" >> {}'.format(allowed_hosts, override_env), pty=True)

if not os.environ.get('DATABASE_URL'):
ctx.run("echo export DATABASE_URL=\
{dburl} >> {override_fn}".format(**envs), pty=True)
ctx.run("echo export GEODATABASE_URL=\
if not os.environ.get('GEODATABASE_URL'):
ctx.run("echo export GEODATABASE_URL=\
{geodburl} >> {override_fn}".format(**envs), pty=True)
ctx.run("source $HOME/.override_env", pty=True)
print "****************************final**********************************"
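The tasks.py changes make the update task respect an existing environment: exports are only appended to the override file when the variable is not already set, and ALLOWED_HOSTS is rebuilt by parsing any existing value and extending it with the public address. A small sketch of just that merging logic follows, with illustrative inputs; the helper name merged_allowed_hosts is not in the commit:

import ast
import os


def merged_allowed_hosts(pub_ip, pub_port):
    # Default mirrors the list the task formats when ALLOWED_HOSTS is unset.
    default = "['{0}:{1}', '{0}', 'localhost', 'django', 'geonode']".format(pub_ip, pub_port or 80)
    try:
        current = ast.literal_eval(os.getenv('ALLOWED_HOSTS') or default)
    except ValueError:
        # An unparseable value is dropped rather than passed through.
        current = []
    # The public address is always (re)appended, duplicates included.
    current.extend(['{0}'.format(pub_ip), '{0}:{1}'.format(pub_ip, pub_port)])
    return current


# With no ALLOWED_HOSTS in the environment:
# merged_allowed_hosts('10.0.0.5', 8000) ->
# ['10.0.0.5:8000', '10.0.0.5', 'localhost', 'django', 'geonode',
#  '10.0.0.5', '10.0.0.5:8000']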
