Kurtwheeler foreman infrastructure (#382)
* Gets foreman logs working, fixes some constants for the foreman, switches client instances to 1b, removes old env vars, and slightly improves scripts' error output.
kurtwheeler committed Jul 18, 2018
1 parent 188c28b commit 403ef27
Showing 36 changed files with 319 additions and 157 deletions.
3 changes: 0 additions & 3 deletions api/environment.tpl
@@ -11,7 +11,4 @@ USE_S3=${{USE_S3}}
 S3_BUCKET_NAME=${{S3_BUCKET_NAME}}
 LOCAL_ROOT_DIR=${{LOCAL_ROOT_DIR}}
 NOMAD_HOST=${{NOMAD_HOST}}
-RAW_PREFIX=raw
-TEMP_PREFIX=temp
-PROCESSED_PREFIX=processed
 S3_TRANSCRIPTOME_INDEX_BUCKET_NAME=${{S3_TRANSCRIPTOME_INDEX_BUCKET_NAME}}
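Note: the same three prefix variables are deleted from every environment file below. A plausible reason this is safe is that settings are read through a helper that takes an explicit default, as the foreman code further down does with get_env_variable("NOMAD_PORT", "4646"). A minimal sketch of what such a helper could look like (hypothetical; the real one lives in the common package):

import os

def get_env_variable(var_name: str, default: str = None) -> str:
    """Read a setting from the environment, falling back to a default."""
    value = os.environ.get(var_name, default)
    if value is None:
        raise ValueError("Required environment variable %s is not set." % var_name)
    return value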
3 changes: 0 additions & 3 deletions api/environments/local
@@ -10,9 +10,6 @@ DATABASE_TIMEOUT=5
 
 NOMAD_HOST=nomad
 
-RAW_PREFIX=raw
-TEMP_PREFIX=temp
-PROCESSED_PREFIX=processed
 LOCAL_ROOT_DIR=/home/user/data_store
 USE_S3=False
 S3_BUCKET_NAME=data-refinery
3 changes: 2 additions & 1 deletion api/run_tests.sh
@@ -15,7 +15,8 @@ cd ..
 
 # Ensure that postgres is running
 if ! [[ $(docker ps --filter name=drdb -q) ]]; then
-    echo "You must start Postgres first with './run_postgres.sh'" >&2
+    echo "You must start Postgres first with:" >&2
+    echo "./run_postgres.sh" >&2
     exit 1
 fi
6 changes: 3 additions & 3 deletions common/data_refinery_common/models/models.py
@@ -158,14 +158,14 @@ def pipelines(self):
 
     @property
     def pretty_platform(self):
-        """ Turns 
+        """ Turns
         [HT_HG-U133_Plus_PM] Affymetrix HT HG-U133+ PM Array Plate
         into
         Affymetrix HT HG-U133+ PM Array Plate (hthgu133pluspm)
         """
         if ']' in self.platform_name:
             platform_base = self.platform_name.split(']')[1].strip()
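The docstring above fully specifies the transformation, so a standalone sketch is easy to write. This is hypothetical: the parenthesized accession ("hthgu133pluspm") comes from a field that is not visible in this hunk, so it is stubbed here as a second argument.

def pretty_platform(platform_name: str, internal_accession: str) -> str:
    """Strip the leading [BRACKETED] tag and append the accession."""
    if ']' in platform_name:
        platform_base = platform_name.split(']')[1].strip()
    else:
        platform_base = platform_name
    return "{} ({})".format(platform_base, internal_accession)

# pretty_platform("[HT_HG-U133_Plus_PM] Affymetrix HT HG-U133+ PM Array Plate",
#                 "hthgu133pluspm")
# -> "Affymetrix HT HG-U133+ PM Array Plate (hthgu133pluspm)"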
3 changes: 0 additions & 3 deletions common/environments/local
@@ -11,9 +11,6 @@ DATABASE_TIMEOUT=5
 NOMAD_HOST=nomad
 NOMAD_PORT=4646
 
-RAW_PREFIX=raw
-TEMP_PREFIX=temp
-PROCESSED_PREFIX=processed
 LOCAL_ROOT_DIR=/home/user/data_store
 USE_S3=False
 S3_BUCKET_NAME=data-refinery
3 changes: 0 additions & 3 deletions common/environments/test
@@ -14,9 +14,6 @@ NOMAD_PORT=5646
 RUNNING_IN_CLOUD=False
 SERVICE=common
 
-RAW_PREFIX=raw
-TEMP_PREFIX=temp
-PROCESSED_PREFIX=processed
 LOCAL_ROOT_DIR=/home/user/data_store
 USE_S3=False
 S3_BUCKET_NAME=data-refinery
3 changes: 2 additions & 1 deletion common/run_tests.sh
@@ -15,7 +15,8 @@ cd ..
 
 # Ensure that postgres is running
 if ! [[ $(docker ps --filter name=drdb -q) ]]; then
-    echo "You must start Postgres first with './run_postgres.sh'" >&2
+    echo "You must start Postgres first with:" >&2
+    echo "./run_postgres.sh" >&2
     exit 1
 fi
48 changes: 24 additions & 24 deletions foreman/data_refinery_foreman/foreman/main.py
@@ -1,5 +1,5 @@
 import time
-import nomad
+from nomad import Nomad
 from nomad.api.exceptions import URLNotFoundNomadException
 from typing import Callable, List
 from threading import Thread
@@ -27,15 +27,13 @@
 # greater than this because of the first attempt
 MAX_NUM_RETRIES = 2
 
-# For now it seems like no jobs should take longer than a day to be
-# either picked up or run.
-MAX_DOWNLOADER_RUN_TIME = timedelta(days=1)
-MAX_PROCESSOR_RUN_TIME = timedelta(days=1)
-MAX_QUEUE_TIME = timedelta(days=1)
-# The fastest each thread will repeat its checks.
-# Could be slower if the thread takes longer than this to check its jobs.
-MIN_LOOP_TIME = timedelta(minutes=2)
+# To prevent excessive spinning loop no more than once every 30 minutes.
+MIN_LOOP_TIME = timedelta(minutes=30)
 
-THREAD_WAIT_TIME = 10.0
+# The amount of time the main loop will wait in between checking if
+# threads are still alive and then heart beating.
+THREAD_WAIT_TIME = timedelta(minutes=10)
 
 
 @retry(stop_max_attempt_number=3)
@@ -129,17 +127,16 @@ def retry_failed_downloader_jobs() -> None:
 @do_forever(MIN_LOOP_TIME)
 def retry_hung_downloader_jobs() -> None:
     """Retry downloader jobs that were started but never finished."""
-    minimum_start_time = timezone.now() - MAX_DOWNLOADER_RUN_TIME
     potentially_hung_jobs = DownloaderJob.objects.filter(
         success=None,
         retried=False,
         end_time=None,
-        start_time__lt=minimum_start_time
+        start_time__isnull=False
     )
 
     nomad_host = get_env_variable("NOMAD_HOST")
     nomad_port = get_env_variable("NOMAD_PORT", "4646")
-    nomad_client = nomad.Nomad(nomad_host, port=int(nomad_port), timeout=5)
+    nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
     hung_jobs = []
     for job in potentially_hung_jobs:
         try:
@@ -166,18 +163,16 @@ def retry_lost_downloader_jobs() -> None:
     during which the price of spot instance is higher than our bid
     price.
     """
-    minimum_creation_time = timezone.now() - MAX_QUEUE_TIME
     potentially_lost_jobs = DownloaderJob.objects.filter(
         success=None,
         retried=False,
         start_time=None,
-        end_time=None,
-        created_at__lt=minimum_creation_time
+        end_time=None
     )
 
     nomad_host = get_env_variable("NOMAD_HOST")
     nomad_port = get_env_variable("NOMAD_PORT", "4646")
-    nomad_client = nomad.Nomad(nomad_host, port=int(nomad_port), timeout=5)
+    nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
    lost_jobs = []
    for job in potentially_lost_jobs:
        try:
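The loop bodies are truncated in this view. Based on the imports above and the test mocks below (which make nomad_client.job.get_job return {"Status": "dead"}), the per-job check presumably looks something like this sketch; job.nomad_job_id is an assumed field name, and the names nomad_client, potentially_lost_jobs, and lost_jobs refer to the surrounding code.

for job in potentially_lost_jobs:
    try:
        # python-nomad's Nomad.job.get_job issues GET /v1/job/:job_id
        job_record = nomad_client.job.get_job(job.nomad_job_id)  # assumed field
        if job_record["Status"] == "dead":
            lost_jobs.append(job)
    except URLNotFoundNomadException:
        # Nomad has no record of the job at all, so it is certainly lost.
        lost_jobs.append(job)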
@@ -249,17 +244,16 @@ def retry_failed_processor_jobs() -> None:
 @do_forever(MIN_LOOP_TIME)
 def retry_hung_processor_jobs() -> None:
     """Retry processor jobs that were started but never finished."""
-    minimum_start_time = timezone.now() - MAX_PROCESSOR_RUN_TIME
     potentially_hung_jobs = ProcessorJob.objects.filter(
         success=None,
         retried=False,
         end_time=None,
-        start_time__lt=minimum_start_time
+        start_time__isnull=False
     )
 
     nomad_host = get_env_variable("NOMAD_HOST")
     nomad_port = get_env_variable("NOMAD_PORT", "4646")
-    nomad_client = nomad.Nomad(nomad_host, port=int(nomad_port), timeout=5)
+    nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
     hung_jobs = []
     for job in potentially_hung_jobs:
         try:
@@ -278,18 +272,16 @@ def retry_hung_processor_jobs() -> None:
 @do_forever(MIN_LOOP_TIME)
 def retry_lost_processor_jobs() -> None:
     """Retry processor jobs which never even got started for too long."""
-    minimum_creation_time = timezone.now() - MAX_QUEUE_TIME
     potentially_lost_jobs = ProcessorJob.objects.filter(
         success=None,
         retried=False,
         start_time=None,
-        end_time=None,
-        created_at__lt=minimum_creation_time
+        end_time=None
     )
 
     nomad_host = get_env_variable("NOMAD_HOST")
     nomad_port = get_env_variable("NOMAD_PORT", "4646")
-    nomad_client = nomad.Nomad(nomad_host, port=int(nomad_port), timeout=5)
+    nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
     lost_jobs = []
     for job in potentially_lost_jobs:
         try:
@@ -322,10 +314,18 @@ def monitor_jobs():
         thread = Thread(target=f, name=f.__name__)
         thread.start()
         threads.append(thread)
+        logger.info("Thread started for monitoring function: %s", f.__name__)
 
     # Make sure that no threads die quietly.
     while(True):
+        start_time = timezone.now()
         for thread in threads:
             thread.join(THREAD_WAIT_TIME)
             if not thread.is_alive():
                 logger.error("Foreman Thread for the function %s has died!!!!", thread.name)
+
+        loop_time = timezone.now() - start_time
+        if loop_time < THREAD_WAIT_TIME:
+            remaining_time = THREAD_WAIT_TIME - loop_time
+            time.sleep(remaining_time.seconds)
+
+        logger.info("The Foreman's heart is beating, but he does not feel.")
54 changes: 37 additions & 17 deletions foreman/data_refinery_foreman/foreman/test_main.py
@@ -1,4 +1,4 @@
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
 from datetime import timedelta
 from django.utils import timezone
 from django.test import TestCase
@@ -58,7 +58,7 @@ def test_requeuing_downloader_job(self, mock_send_job):
         job = self.create_downloader_job()
 
         main.requeue_downloader_job(job)
-        mock_send_job.assert_called_once()
+        self.assertEqual(len(mock_send_job.mock_calls), 1)
 
         jobs = DownloaderJob.objects.order_by('id')
         original_job = jobs[0]
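Replacing assert_called_once() with an explicit count, here and throughout this file, is likely connected to the base-image change at the bottom of this commit: Mock.assert_called_once() only exists on Python 3.6+, and on older interpreters (such as the Python 3.5 that ubuntu:16.04 ships) calling it silently creates a do-nothing child mock instead of asserting anything. A small illustration of the portable pattern:

from unittest.mock import Mock

m = Mock()
m("one call")

# Works identically on every Python 3 version: count the recorded calls.
assert len(m.mock_calls) == 1

# By contrast, on Python < 3.6 a call like m.assert_called_once() is not an
# assertion at all -- Mock happily invents the attribute and returns a mock.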
@@ -107,7 +107,7 @@ def test_retrying_failed_downloader_jobs(self, mock_send_job):
         # Just run it once, not forever so get the function that is
         # decorated with @do_forever
         main.retry_failed_downloader_jobs.__wrapped__()
-        mock_send_job.assert_called_once()
+        self.assertEqual(len(mock_send_job.mock_calls), 1)
 
         jobs = DownloaderJob.objects.order_by('id')
         original_job = jobs[0]
@@ -119,15 +119,20 @@ def test_retrying_failed_downloader_jobs(self, mock_send_job):
         self.assertEqual(retried_job.num_retries, 1)
 
     @patch('data_refinery_foreman.foreman.main.send_job')
-    def test_retrying_hung_downloader_jobs(self, mock_send_job):
+    @patch('data_refinery_foreman.foreman.main.Nomad')
+    def test_retrying_hung_downloader_jobs(self, mock_nomad, mock_send_job):
+        mock_nomad.job = MagicMock()
+        mock_nomad.job.get_job = MagicMock()
+        mock_nomad.job.get_job.side_effect = lambda _: {"Status": "dead"}
+
         job = self.create_downloader_job()
-        job.start_time = timezone.now() - main.MAX_DOWNLOADER_RUN_TIME - timedelta(seconds=1)
+        job.start_time = timezone.now()
         job.save()
 
         # Just run it once, not forever so get the function that is
         # decorated with @do_forever
         main.retry_hung_downloader_jobs.__wrapped__()
-        mock_send_job.assert_called_once()
+        self.assertEqual(len(mock_send_job.mock_calls), 1)
 
         jobs = DownloaderJob.objects.order_by('id')
         original_job = jobs[0]
@@ -139,15 +144,20 @@ def test_retrying_hung_downloader_jobs(self, mock_send_job):
         self.assertEqual(retried_job.num_retries, 1)
 
     @patch('data_refinery_foreman.foreman.main.send_job')
-    def test_retrying_lost_downloader_jobs(self, mock_send_job):
+    @patch('data_refinery_foreman.foreman.main.Nomad')
+    def test_retrying_lost_downloader_jobs(self, mock_nomad, mock_send_job):
+        mock_nomad.job = MagicMock()
+        mock_nomad.job.get_job = MagicMock()
+        mock_nomad.job.get_job.side_effect = lambda _: {"Status": "dead"}
+
         job = self.create_downloader_job()
-        job.created_at = timezone.now() - main.MAX_QUEUE_TIME - timedelta(seconds=1)
+        job.created_at = timezone.now()
         job.save()
 
         # Just run it once, not forever so get the function that is
         # decorated with @do_forever
         main.retry_lost_downloader_jobs.__wrapped__()
-        mock_send_job.assert_called_once()
+        self.assertEqual(len(mock_send_job.mock_calls), 1)
 
         jobs = DownloaderJob.objects.order_by('id')
         original_job = jobs[0]
@@ -194,7 +204,7 @@ def test_requeuing_processor_job(self, mock_send_job):
         job = self.create_processor_job()
 
         main.requeue_processor_job(job)
-        mock_send_job.assert_called_once()
+        self.assertEqual(len(mock_send_job.mock_calls), 1)
 
         jobs = ProcessorJob.objects.order_by('id')
         original_job = jobs[0]
@@ -241,7 +251,7 @@ def test_retrying_failed_processor_jobs(self, mock_send_job):
         # Just run it once, not forever so get the function that is
         # decorated with @do_forever
         main.retry_failed_processor_jobs.__wrapped__()
-        mock_send_job.assert_called_once()
+        self.assertEqual(len(mock_send_job.mock_calls), 1)
 
         jobs = ProcessorJob.objects.order_by('id')
         original_job = jobs[0]
@@ -253,15 +263,20 @@ def test_retrying_failed_processor_jobs(self, mock_send_job):
         self.assertEqual(retried_job.num_retries, 1)
 
     @patch('data_refinery_foreman.foreman.main.send_job')
-    def test_retrying_hung_processor_jobs(self, mock_send_job):
+    @patch('data_refinery_foreman.foreman.main.Nomad')
+    def test_retrying_hung_processor_jobs(self, mock_nomad, mock_send_job):
+        mock_nomad.job = MagicMock()
+        mock_nomad.job.get_job = MagicMock()
+        mock_nomad.job.get_job.side_effect = lambda _: {"Status": "dead"}
+
         job = self.create_processor_job()
-        job.start_time = timezone.now() - main.MAX_PROCESSOR_RUN_TIME - timedelta(seconds=1)
+        job.start_time = timezone.now()
         job.save()
 
         # Just run it once, not forever so get the function that is
         # decorated with @do_forever
         main.retry_hung_processor_jobs.__wrapped__()
-        mock_send_job.assert_called_once()
+        self.assertEqual(len(mock_send_job.mock_calls), 1)
 
         jobs = ProcessorJob.objects.order_by('id')
         original_job = jobs[0]
@@ -273,16 +288,21 @@ def test_retrying_hung_processor_jobs(self, mock_send_job):
         self.assertEqual(retried_job.num_retries, 1)
 
     @patch('data_refinery_foreman.foreman.main.send_job')
-    def test_retrying_lost_processor_jobs(self, mock_send_job):
+    @patch('data_refinery_foreman.foreman.main.Nomad')
+    def test_retrying_lost_processor_jobs(self, mock_nomad, mock_send_job):
+        mock_nomad.job = MagicMock()
+        mock_nomad.job.get_job = MagicMock()
+        mock_nomad.job.get_job.side_effect = lambda _: {"Status": "dead"}
+
         job = self.create_processor_job()
-        job.created_at = timezone.now() - main.MAX_QUEUE_TIME - timedelta(seconds=1)
+        job.created_at = timezone.now()
         job.save()
 
         # Just run it once, not forever so get the function that is
         # decorated with @do_forever
         main.retry_lost_processor_jobs.__wrapped__()
 
-        mock_send_job.assert_called_once()
+        self.assertEqual(len(mock_send_job.mock_calls), 1)
 
         jobs = ProcessorJob.objects.order_by('id')
         original_job = jobs[0]
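One more detail worth noting about these tests: they patch 'data_refinery_foreman.foreman.main.Nomad', not 'nomad.Nomad'. That works precisely because main.py now does `from nomad import Nomad` -- patch must target the name in the namespace where it is looked up. A sketch of the same pattern outside a test class; the return_value wiring shown here is an assumption about how the mock ends up behind Nomad(...):

from unittest.mock import patch, MagicMock

with patch('data_refinery_foreman.foreman.main.Nomad') as mock_nomad:
    # Nomad(host, port=..., timeout=5) inside main returns this object:
    mock_client = mock_nomad.return_value
    mock_client.job.get_job = MagicMock(return_value={"Status": "dead"})
    # Any foreman function run inside this block now sees every Nomad
    # job as dead, so hung/lost jobs are detected and requeued.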
2 changes: 1 addition & 1 deletion foreman/data_refinery_foreman/surveyor/test_surveyor.py
@@ -26,7 +26,7 @@ def test_calls_survey(self, survey_method):
         job.save()
         surveyor.run_job(job)
 
-        survey_method.assert_called()
+        self.assertEqual(len(survey_method.mock_calls), 1)
         self.assertIsInstance(job.replication_ended_at, datetime.datetime)
         self.assertIsInstance(job.start_time, datetime.datetime)
         self.assertIsInstance(job.end_time, datetime.datetime)
19 changes: 15 additions & 4 deletions foreman/dockerfiles/Dockerfile.foreman
@@ -1,22 +1,33 @@
-FROM python:3.6.1-slim
+FROM ubuntu:16.04
+
+RUN apt-get update -qq
+RUN apt-get install -y software-properties-common
+RUN add-apt-repository ppa:apt-fast/stable
+RUN apt-get update -qq
+RUN apt-get -y install apt-fast
+
+RUN apt-fast update -qq && \
+    apt-fast install -y \
+    python3 \
+    python3-pip
 
 RUN groupadd user && useradd --create-home --home-dir /home/user -g user user
 WORKDIR /home/user
 
-RUN pip install --upgrade pip
+RUN pip3 install --upgrade pip
 
 COPY config config
 
 COPY foreman/requirements.txt .
 
 # The base image does not have Python 2.X on it at all, so all calls
 # to pip or python are by default calls to pip3 or python3
-RUN pip install -r requirements.txt
+RUN pip3 install -r requirements.txt
 
 COPY common/dist/data-refinery-common-* common/
 
 # Get the latest version from the dist directory.
-RUN pip install \
+RUN pip3 install \
     common/$(ls common -1 | sort --version-sort | tail -1)
 
 COPY foreman/ .