Skip to content

Commit

Permalink
add async pull_image
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengyao-lin committed Sep 15, 2019
1 parent 10b9612 commit f5e10e8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 35 deletions.
60 changes: 26 additions & 34 deletions chainlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import concurrent.futures
import logging
import tempfile
import threading

import docker
import docker.errors
Expand All @@ -18,20 +17,22 @@ class Chainlink:
A utility for running docker containers sequentially
"""

def __init__(self, stages, workdir="/tmp"):
def __init__(self, stages, workdir="/tmp", max_workers=4):
self.client = docker.from_env()
self.stages = stages
self.workdir = workdir
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
self._pull_status = {}
self._pull_images()
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers)

# sync version
def run(self, environ={}):
return asyncio.get_event_loop().run_until_complete(self.run_async(environ))
def run(self, *args, **kwargs):
    """Run the chain synchronously and return the result of ``run_async``.

    Every positional and keyword argument is forwarded unchanged to
    ``run_async``. The call blocks until that coroutine has finished on
    the loop returned by ``asyncio.get_event_loop()``.
    """
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(self.run_async(*args, **kwargs))

async def run_async(self, environ={}):
results = []

await self._pull_images()

with tempfile.TemporaryDirectory(dir=self.workdir) as mount:
logger.info("using {} for temporary job directory".format(mount))

Expand All @@ -46,43 +47,34 @@ async def run_async(self, environ={}):

return results

def _pull_images(self):
async def _pull_images(self):
images = set([stage["image"] for stage in self.stages])
threads = []
tasks = []

for image in images:
logger.debug("pulling image '{}'".format(image))
t = threading.Thread(
target=self._pull_image, args=(self.client, image, self._pull_status)
)
t.start()
threads.append(t)
for t in threads:
t.join()
for image in images:
if not self._pull_status.get(image):
raise ValueError("Failed to pull all images")

@staticmethod
def _pull_image(client, image, status):
try:
client.images.pull(image)
status[image] = True
return
except docker.errors.NotFound:
logger.debug("image '{}' not found on Docker Hub".format(image))
except docker.errors.APIError as err:
logger.debug("Docker API Error: {}".format(err))
return
tasks.append(self._pull_image(image))

try:
client.images.get(image)
status[image] = True
await asyncio.gather(*tasks)
except docker.errors.ImageNotFound:
logger.error("image '{}' not found remotely or locally".format(image))
except docker.errors.APIError as err:
logger.debug("Docker API Error: {}".format(err))
return

async def _pull_image(self, image):
def wait():
try:
return self.client.images.pull(image)
except docker.errors.NotFound:
# if not found on docker hub, try locally
logger.debug(
"image '{}' not found on Docker Hub, fetching locally".format(image)
)
return self.client.images.get(image)

return await asyncio.get_event_loop().run_in_executor(self._executor, wait)

async def _run_stage(self, stage, mount, environ):
environ = {**environ, **stage.get("env", {})}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ def test_basic_chain(self):

def test_no_such_image(self):
with self.assertRaises(Exception):
Chainlink(stages_2)
Chainlink(stages_2).run({})

0 comments on commit f5e10e8

Please sign in to comment.