Skip to content

Commit

Permalink
Merge pull request #19 from funcx-faas/dlhub-enhancements-#4
Browse files Browse the repository at this point in the history
Dlhub enhancements #4
  • Loading branch information
ryanchard committed Jul 2, 2019
2 parents a74ffa5 + 2767cb6 commit ddac036
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 106 deletions.
21 changes: 12 additions & 9 deletions funcx/endpoint/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import os

from parsl.config import Config
from parsl.app.app import python_app, bash_app
from parsl.launchers import SingleNodeLauncher
from parsl.channels import LocalChannel
from parsl.channels import SSHInteractiveLoginChannel
from parsl.providers import LocalProvider
from parsl.executors import HighThroughputExecutor

Expand All @@ -23,9 +20,14 @@ def _load_auth_client():
No credentials are used if the server is not production
Returns:
(globus_sdk.ConfidentialAppAuthClient): Client used to perform GlobusAuth actions
Returns
-------
globus_sdk.ConfidentialAppAuthClient
Client used to perform GlobusAuth actions
"""

_prod = True

if _prod:
app = globus_sdk.ConfidentialAppAuthClient(GLOBUS_CLIENT,
GLOBUS_KEY)
Expand All @@ -35,11 +37,12 @@ def _load_auth_client():


def _get_parsl_config():
"""
Get the parsl config.
"""Get the Parsl config.
Returns:
(parsl.config.Config): Parsl config to execute tasks.
Returns
-------
parsl.config.Config
Parsl config to execute tasks.
"""

config = Config(
Expand Down
197 changes: 138 additions & 59 deletions funcx/endpoint/endpoint.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import statistics
import threading
import platform
import requests
import logging
import pickle
import parsl
Expand All @@ -18,91 +16,172 @@
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.app.app import python_app

parsl.load(_get_parsl_config())
from parsl.app.app import python_app

dfk = parsl.dfk()
ex = dfk.executors['htex_local']
logging.basicConfig(filename='funcx_endpoint.log', level=logging.DEBUG)


@python_app
def execute_function(code, entry_point, event=None):
    """Run a user-supplied function inside a Parsl app.

    First exec's the function code to load it into the current context
    and then eval's the function using the entry point name.

    Parameters
    ----------
    code : str
        The function code in a string format.
    entry_point : str
        The name of the function's entry point.
    event : dict
        The event context passed as the single argument to the function.

    Returns
    -------
    object
        The function's return value, or the stringified exception if
        execution failed.
    """
    try:
        # SECURITY: exec/eval of arbitrary code received over the wire.
        # This is the endpoint's core purpose, but it must only ever run
        # behind funcX authentication — flagging, not changing.
        exec(code)
        return eval(entry_point)(event)
    except Exception as e:
        # Errors are returned as strings rather than raised so the
        # result still flows back to the caller through the result queue.
        return str(e)


def send_request(serving_url, inputs):
    """POST *inputs* as JSON to *serving_url* and return the decoded reply.

    Parameters
    ----------
    serving_url : str
        URL of the serving endpoint to invoke.
    inputs : dict
        JSON-serializable request payload.

    Returns
    -------
    object
        The JSON-decoded response body (presumably a dict — depends on
        the service; verify against the serving endpoint).
    """
    headers = {"content-type": "application/json"}
    # timeout guards against a hung endpoint blocking this thread forever
    # (the original call had no timeout and could wait indefinitely)
    r = requests.post(serving_url, data=json.dumps(inputs),
                      headers=headers, timeout=60)
    return json.loads(r.content)

class FuncXEndpoint:
    """A funcX endpoint.

    Registers itself with the funcX service, then runs worker threads
    that receive tasks over ZMQ, execute them via Parsl, and ship the
    results back to the broker.
    """

    def __init__(self, ip="funcx.org", port=50001, worker_threads=1, container_type="Singularity"):
        """Initiate a funcX endpoint.

        Parameters
        ----------
        ip : str
            Hostname/IP address of the service to receive tasks.
            (Docstring previously said ``int``; the default is a string.)
        port : int
            Port of the service to receive tasks.
        worker_threads : int
            Number of concurrent workers to receive and process tasks.
        container_type : str
            The virtualization type to use (Singularity, Shifter, Docker).
        """
        # Log in and start a client
        self.fx = FuncXClient()

        self.ip = ip
        self.port = port
        self.worker_threads = worker_threads
        self.container_type = container_type

        # Register this endpoint with funcX, reusing a cached UUID when
        # one was persisted by a previous run.
        self.endpoint_uuid = lookup_option("endpoint_uuid")
        self.endpoint_uuid = self.fx.register_endpoint(platform.node(), self.endpoint_uuid)
        print(f"Endpoint UUID: {self.endpoint_uuid}")
        write_option("endpoint_uuid", self.endpoint_uuid)

        # Load Parsl and grab the HighThroughputExecutor defined in config
        parsl.load(_get_parsl_config())
        self.dfk = parsl.dfk()
        self.ex = self.dfk.executors['htex_local']

        # Start the endpoint. NOTE: this call loops forever, so
        # __init__ never returns.
        self.endpoint_worker()

    def endpoint_worker(self):
        """The funcX endpoint worker loop.

        Starts worker threads to:
          1. perform function executions (execution_worker)
          2. return results (result_worker)
        then loops forever receiving ZMQ messages and queueing them
        for execution.

        Returns
        -------
        None
            Never returns under normal operation.
        """
        zmq_worker = ZMQWorker("tcp://{}:{}".format(self.ip, self.port), self.endpoint_uuid)
        task_q = queue.Queue()
        result_q = queue.Queue()
        threads = []

        # BUG FIX: honor the worker_threads setting — the loop bound was
        # hard-coded to range(1), silently ignoring the parameter.
        for _ in range(self.worker_threads):
            thread = threading.Thread(target=self.execution_worker, args=(task_q, result_q,))
            thread.daemon = True
            threads.append(thread)
            thread.start()

        thread = threading.Thread(target=self.result_worker, args=(zmq_worker, result_q,))
        thread.daemon = True
        threads.append(thread)
        thread.start()

        # Main loop: persistently listen for tasks and hand them to the
        # execution workers.
        while True:
            (request, reply_to) = zmq_worker.recv()
            task_q.put((request, reply_to))

    def _stage_containers(self, endpoint_containers):
        """Stage the set of containers for local use.

        Parameters
        ----------
        endpoint_containers : dict
            A dictionary of containers to have locally for deployment.

        Returns
        -------
        None
        """
        # Not yet implemented.
        pass

    def execution_worker(self, task_q, result_q):
        """Worker thread: process tasks and place results on the result queue.

        Parameters
        ----------
        task_q : queue.Queue
            A queue of (request, reply_to) tuples to process.
        result_q : queue.Queue
            A queue to put ([result], reply_to) tuples on.

        Returns
        -------
        None
            Never returns; runs as a daemon thread.
        """
        while True:
            # Queue.get() blocks until a task arrives. (The previous
            # `if task_q:` guard was always true — a Queue object is
            # truthy regardless of emptiness — so it has been dropped.)
            request, reply_to = task_q.get()

            # SECURITY: pickle.loads on network-supplied data, followed
            # by exec of the contained code — only safe behind funcX
            # authentication; flagging, not changing.
            to_do = pickle.loads(request[0])
            code, entry_point, event = (to_do[-1]['function'],
                                        to_do[-1]['entry_point'],
                                        to_do[-1]['event'])

            # execute_function returns a Parsl future; .result() blocks
            # until the app has run.
            result = pickle.dumps(execute_function(code, entry_point, event=event).result())
            result_q.put(([result], reply_to))

    def result_worker(self, zmq_worker, result_q):
        """Worker thread: send results back to the funcX service via the broker.

        Parameters
        ----------
        zmq_worker : ZMQWorker
            The ZMQ connection used to send replies.
        result_q : queue.Queue
            The queue to drain results from.

        Returns
        -------
        None
            Never returns; runs as a daemon thread.
        """
        while True:
            (result, reply_to) = result_q.get()
            zmq_worker.send(result, reply_to)


if __name__ == "__main__":
    logging.debug("Starting endpoint")
    # Constructing the endpoint registers it and enters the worker loop;
    # this call does not return.
    ep = FuncXEndpoint(ip='funcX.org', port=50001)

Loading

0 comments on commit ddac036

Please sign in to comment.