Fix Failed Subjob Result Sending (#454)
* Retry sending results on subjob completion up to 5 times with a linear sleep between attempts. This fixes the issue where a subjob's results sometimes failed to send on the first attempt, so the run never received all results and showed as In Progress indefinitely.
* When marking a slave as idle, raise only if the executor count is > 0 rather than != 0. A slave could reach a negative executor count, likely through a race condition, which previously caused a spurious Exception here.
* Rename the Dockerfile stages to be more explicit, and reuse the builder stage to build a test image and run tests in Docker for easier local development.
* Add docker-lint and docker-test targets to the Makefile so local testing can run in the same Docker image we build our rpm in.
rsennewald committed Dec 22, 2021
1 parent 6cb3519 commit aed64a6
Showing 10 changed files with 60 additions and 21 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
@@ -4,7 +4,7 @@

# STAGE 1: Official PEP 513 Python Manylinux (RHEL5) base with Python 3.4 enabled to create
# linux_x86_64 pex.
-FROM quay.io/pypa/manylinux1_x86_64:2020-01-31-d8fa357 AS stage1
+FROM quay.io/pypa/manylinux1_x86_64:2020-01-31-d8fa357 AS builder
ENV PATH="/opt/python/cp34-cp34m/bin:${PATH}"

WORKDIR /ClusterRunner
@@ -16,9 +16,9 @@ COPY . .
RUN make dist/clusterrunner

# STAGE 2: CentOS 7 base w/ fpm to package pex into an rpm.
-FROM cdrx/fpm-centos:7 AS stage2
+FROM cdrx/fpm-centos:7 AS packager
WORKDIR /root
COPY . .
-COPY --from=stage1 /ClusterRunner/dist/clusterrunner ./dist/
-COPY --from=stage1 /ClusterRunner/clusterrunner.egg-info/PKG-INFO ./clusterrunner.egg-info/
+COPY --from=builder /ClusterRunner/dist/clusterrunner ./dist/
+COPY --from=builder /ClusterRunner/clusterrunner.egg-info/PKG-INFO ./clusterrunner.egg-info/
RUN make rpm
18 changes: 18 additions & 0 deletions Makefile
@@ -59,6 +59,7 @@ FPM_DEPEND_ARGS = $(addprefix --depends , $(RPM_DEPENDS))

# ## Docker defines
DOCKER_TAG := productivity/clusterrunner
+DOCKER_TEST_TAG := productivity/clusterrunner-test

# ## Artifactory defines
# Select the release repo based on if version string is an "official" release
@@ -77,6 +78,9 @@ print_msg = @printf "\n\033[1;34m***%s***\033[0m\n" "$(1)"
# IMPORTANT: $(PY_PKG_INFO) must be a dependency of any targets that use this macro.
pkg_info = $(strip $(shell egrep -i "^$(1): " $(PY_PKG_INFO) | sed 's/[^:]*://'))

+# Macro for building docker image for testing
+build_test_docker_img = docker build --target builder -t $(DOCKER_TEST_TAG) -f Dockerfile .
+

all: lint test
lint: pep8 pylint
@@ -138,6 +142,20 @@ test-functional:
	$(call print_msg, Running functional tests... )
	nosetests -s -v test/functional

+# Build the clusterrunner testing docker image from only the builder stage and run lint in it.
+.PHONY: docker-lint
+docker-lint:
+	$(call print_msg, Building ClusterRunner docker image to run lint in... )
+	$(call build_test_docker_img)
+	docker run --rm $(DOCKER_TEST_TAG) make lint
+
+# Build the clusterrunner testing docker image from only the builder stage and run tests in it.
+.PHONY: docker-test
+docker-test:
+	$(call print_msg, Building ClusterRunner docker image to run tests in... )
+	$(call build_test_docker_img)
+	docker run --rm $(DOCKER_TEST_TAG) make test
+
# INFO: The use of multiple targets (before the :) in the next sections enable
# a technique for setting some targets to "phony" so they will always
# run, while allowing other targets to remain conditional based on the
3 changes: 1 addition & 2 deletions app/master/build.py
@@ -190,8 +190,7 @@ def complete_subjob(self, subjob_id, payload=None):
            self._mark_subjob_complete(subjob_id)

        except Exception:
-            self._logger.exception('Error while completing subjob; marking build as failed.')
-            self.mark_failed('Error occurred while completing subjob {}.'.format(subjob_id))
+            self._logger.exception('Error while processing subjob {} payload'.format(subjob_id))
            raise

    def _parse_payload_for_atom_exit_code(self, subjob_id):
3 changes: 1 addition & 2 deletions app/master/build_scheduler.py
@@ -91,8 +91,7 @@ def begin_subjob_executions_on_slave(self, slave):
"""
analytics.record_event(analytics.BUILD_SETUP_FINISH, build_id=self._build.build_id(), slave_id=slave.id)
for slave_executor_count in range(slave.num_executors):
if (self._num_executors_in_use >= self._max_executors
or slave_executor_count >= self._max_executors_per_slave):
if self._num_executors_in_use >= self._max_executors or slave_executor_count >= self._max_executors_per_slave:
break
slave.claim_executor()
self._num_executors_in_use += 1
2 changes: 1 addition & 1 deletion app/master/slave.py
@@ -57,7 +57,7 @@ def mark_as_idle(self):
        Do bookkeeping when this slave becomes idle. Error if the slave cannot be idle.
        If the slave is in shutdown mode, clear the build_id, kill the slave, and raise an error.
        """
-        if self._num_executors_in_use.value() != 0:
+        if self._num_executors_in_use.value() > 0:
            raise Exception('Trying to mark slave idle while {} executors still in use.',
                            self._num_executors_in_use.value())

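The change above tolerates a negative count rather than preventing it. For illustration only, not part of this commit, a lock-based counter that clamps decrements at zero would close the race at its source; the `ExecutorCounter` name and its methods are hypothetical:

```python
import threading

class ExecutorCounter:
    """Hypothetical thread-safe counter that cannot go negative."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1
            return self._value

    def decrement(self):
        with self._lock:
            # Clamp at zero so a racing double-decrement can never drive
            # the executor count negative.
            self._value = max(0, self._value - 1)
            return self._value

    def value(self):
        with self._lock:
            return self._value
```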
21 changes: 12 additions & 9 deletions app/slave/cluster_slave.py
@@ -356,16 +356,19 @@ def _execute_subjob(self, build_id, subjob_id, executor, atomic_commands):
            'slave': '{}:{}'.format(self.host, self.port),
            'metric_data': {'executor_id': executor.id},
        }
-        files = {'file': ('payload', open(results_file, 'rb'), 'application/x-compressed')}

        self._idle_executors.put(executor)  # work is done; mark executor as idle
-        resp = self._network.post(results_url, data=data, files=files)
-        if resp.ok:
-            self._logger.info('Build {}, Subjob {} completed and sent results to master.', build_id, subjob_id)
-        else:
-            self._logger.error(
-                ('Build {}, Subjob {} encountered an error when sending results to master.'
-                 '\n\tStatus Code {}\n\t{}').format(build_id, subjob_id, resp.status_code, resp.text))

+        for attempt in range(5):
+            files = {'file': ('payload', open(results_file, 'rb'), 'application/x-compressed')}
+            resp = self._network.post(results_url, data=data, files=files)
+            if resp.status_code == 200:
+                self._logger.info('Build {}, Subjob {} completed and sent results to master.', build_id, subjob_id)
+                break
+            else:
+                self._logger.error(
+                    ('Build {}, Subjob {} encountered an error when sending results to master.'
+                     '\n\tStatus Code {} attempt {}\n\t{}').format(build_id, subjob_id, resp.status_code, attempt+1, resp.text))
+                time.sleep(attempt+1)

    def _notify_master_of_state_change(self, new_state):
        """
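A subtlety in the new loop: `results_file` is reopened on every attempt, the handle is never explicitly closed, and the slave sleeps even after the final failure. A minimal alternative sketch, not what this commit does, that reads the payload once and reuses the bytes per attempt (a method-body fragment using the same `data` and `results_url` as above):

```python
# Hypothetical variant of the retry loop (not in this commit): read the
# payload once, close the file promptly, and reuse the bytes per attempt.
with open(results_file, 'rb') as f:
    payload = f.read()

for attempt in range(5):
    files = {'file': ('payload', payload, 'application/x-compressed')}
    resp = self._network.post(results_url, data=data, files=files)
    if resp.status_code == 200:
        break
    time.sleep(attempt + 1)  # linear backoff between attempts: 1s, 2s, 3s, ...
```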
3 changes: 3 additions & 0 deletions app/util/fs.py
@@ -83,6 +83,9 @@ def extract_tar(archive_file, target_dir=None, delete=False):
    if not target_dir:
        target_dir, _ = os.path.split(archive_file)  # default to same directory as tar file

+    if not tarfile.is_tarfile(archive_file):
+        raise Exception("Not a tarfile: {}".format(archive_file))
+
    try:
        with tarfile.open(archive_file, 'r:gz') as f:
            f.extractall(target_dir)
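One caveat with the new guard: `tarfile.is_tarfile` accepts any tar variant, while the `'r:gz'` mode used below it requires gzip specifically, so a plain uncompressed tar passes the check but still fails at open time. A small self-contained illustration:

```python
import os
import tarfile
import tempfile

# Create a plain (uncompressed) tar archive on disk.
tmpdir = tempfile.mkdtemp()
member = os.path.join(tmpdir, 'hello.txt')
with open(member, 'w') as f:
    f.write('hello')
archive = os.path.join(tmpdir, 'archive.tar')
with tarfile.open(archive, 'w') as tar:
    tar.add(member, arcname='hello.txt')

print(tarfile.is_tarfile(archive))  # True: any tar variant passes the guard

try:
    tarfile.open(archive, 'r:gz')   # but 'r:gz' demands gzip compression
except tarfile.ReadError as err:
    print('ReadError:', err)        # a plain tar is not a gzip file
```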
4 changes: 2 additions & 2 deletions app/util/network.py
@@ -51,7 +51,7 @@ def get(self, *args, **kwargs):
        return self._request('GET', *args, **kwargs)

    # todo: may be a bad idea to retry -- what if post was successful but just had a response error?
-    @retry_on_exception_exponential_backoff(exceptions=(requests.ConnectionError,))
+    @retry_on_exception_exponential_backoff(exceptions=(requests.ConnectionError, requests.Timeout), initial_delay=1.0)
    def post(self, *args, **kwargs):
        """
        Send a POST request to a url. Arguments to this method, unless otherwise documented below in _request(), are
@@ -75,7 +75,7 @@ def post_with_digest(self, url, request_params, secret, error_on_failure=False):
                                    error_on_failure=error_on_failure)

    # todo: may be a bad idea to retry -- what if put was successful but just had a response error?
-    @retry_on_exception_exponential_backoff(exceptions=(requests.ConnectionError,))
+    @retry_on_exception_exponential_backoff(exceptions=(requests.ConnectionError, requests.Timeout))
    def put(self, *args, **kwargs):
        """
        Send a PUT request to a url. Arguments to this method, unless otherwise documented below in _request(), are
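`retry_on_exception_exponential_backoff` is defined elsewhere in app/util and its implementation is not part of this diff. A minimal sketch of the general shape, where everything except the `initial_delay` parameter seen above is an assumption:

```python
import functools
import time

def retry_on_exception_exponential_backoff(exceptions, initial_delay=0.5, max_attempts=5):
    """Sketch: retry the wrapped call on the given exception types, doubling
    the sleep each time. max_attempts and the doubling factor are assumptions."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise  # out of retries; propagate the last error
                    time.sleep(delay)
                    delay *= 2  # exponential backoff
        return wrapper
    return decorator
```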
17 changes: 17 additions & 0 deletions test/README.md
@@ -26,6 +26,23 @@ export CR_VERBOSE=1
nosetests -s -v test/functional/
```

+Run lint & tests in docker (no need for any setup on local machine)
+--------------
+```bash
+make docker-lint
+make docker-test
+# or...
+docker build --target builder -t productivity/clusterrunner-tests -f Dockerfile .
+docker run --rm productivity/clusterrunner-tests make lint
+docker run --rm productivity/clusterrunner-tests make test-unit
+docker run --rm productivity/clusterrunner-tests make test-integration
+docker run --rm productivity/clusterrunner-tests make test-functional
+
+# or run the functional tests with verbose logging
+docker build --target builder -t productivity/clusterrunner-tests -f Dockerfile .
+docker run -e CR_VERBOSE=1 --rm productivity/clusterrunner-tests nosetests -s -v test/functional/
+```
+

Set up Python 3.4 using Pyenv
---------------
2 changes: 1 addition & 1 deletion test/functional/master/test_http_timeout.py
@@ -42,7 +42,7 @@ def test_unresponsive_slave_does_not_hang_master(self, responses):
        })
        build_id = build_resp['build_id']

-        self.assertTrue(master.block_until_slave_offline(unresponsive_slave_id, timeout=10),
+        self.assertTrue(master.block_until_slave_offline(unresponsive_slave_id, timeout=40),
                        'Unresponsive slave should be marked offline.')

        # First slave should now be marked offline. Connect a real slave to finish the build.
