Skip to content

Commit

Permalink
Import gevent and zerorpc unconditionally
Browse files Browse the repository at this point in the history
We previously imported these modules only when required, since the RPC
framework to use was a runtime choice. Now that we have removed that
choice, and zerorpc is the only alternative, there is no point anymore
on having conditional imports spread across the rpc module.

Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
  • Loading branch information
rtobar committed Jul 1, 2020
1 parent ffb4fdf commit 6842fa3
Showing 1 changed file with 4 additions and 14 deletions.
18 changes: 4 additions & 14 deletions daliuge-runtime/dlg/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import logging
import threading

import gevent
import zerorpc

from six.moves import queue as Queue # @UnresolvedImport

from . import utils
Expand Down Expand Up @@ -90,7 +93,6 @@ class ZeroRPCClient(RPCClientBase):
def __init__(self, *args, **kwargs):
super(ZeroRPCClient, self).__init__(*args, **kwargs)
if not hasattr(self, '_context'):
zerorpc = utils.timed_import('zerorpc')
self._context = zerorpc.Context()

def start(self):
Expand Down Expand Up @@ -145,8 +147,6 @@ def has_method(self, session_id, uid, name):
return client

def run_zrpcclient(self, host, port, req_queue):
import gevent
import zerorpc
client = zerorpc.Client("tcp://%s:%d" % (host,port), context=self._context)

forwarder = gevent.spawn(self.forward_requests, req_queue, client)
Expand All @@ -156,7 +156,6 @@ def run_zrpcclient(self, host, port, req_queue):
client.close()

def forward_requests(self, req_queue, client):
import gevent
while self.rpc_running:
try:
req = req_queue.get_nowait()
Expand All @@ -165,7 +164,6 @@ def forward_requests(self, req_queue, client):
gevent.sleep(0.005)

def process_response(self, req, async_response):
import gevent
try:
x = ZeroRPCClient.response(async_response.get_nowait(), False)
except Exception as e:
Expand All @@ -190,14 +188,11 @@ class ZeroRPCServer(RPCServerBase):
@classmethod
def create_context(cls):
# This import can take a long time in big HPC deployments
return utils.timed_import('zerorpc').Context()
return zerorpc.Context()

def start(self):
super(ZeroRPCServer, self).start()

# See above
utils.timed_import('gevent')

# Starts the single-threaded ZeroRPC server for RPC requests
timeout = 30
server_started = threading.Event()
Expand All @@ -207,10 +202,6 @@ def start(self):
raise Exception("ZeroRPC server didn't start within %d seconds" % (timeout,))

def run_zrpcserver(self, host, port, server_started):

import gevent
import zerorpc

# Use context provided by subclass
self._zrpcserver = zerorpc.Server(self, context=self._context)
# zmq needs an address, not a hostname
Expand All @@ -225,7 +216,6 @@ def run_zrpcserver(self, host, port, server_started):
logger.info("ZeroRPC server finished")

def stop_zrpcserver(self):
import gevent
while self.rpc_running:
gevent.sleep(0.01)
logger.info("Closing ZeroRPC server on tcp://%s:%d", utils.zmq_safe(self._rpc_host), self._rpc_port)
Expand Down

0 comments on commit 6842fa3

Please sign in to comment.