This repository was archived by the owner on Apr 14, 2023. It is now read-only.
Merged
1 change: 1 addition & 0 deletions .dockerignore
@@ -0,0 +1 @@
.tox
113 changes: 86 additions & 27 deletions manager/cli.py
@@ -1,6 +1,7 @@
from contextlib import contextmanager
import sys

import boto3
from botocore import waiter as boto_waiter
import click

@@ -32,7 +33,8 @@ def main(ctx, cluster, scheduler, worker, web):
The tool only schedules a task to be run. Check the AWS console to see
if the container ran successfully.
"""
cluster = Cluster(cluster, scheduler, worker, web)
ecs = boto3.client('ecs')
cluster = Cluster(cluster, scheduler, worker, web, ecs)
ctx.ensure_object(dict)
ctx.obj['cluster'] = cluster

@@ -124,28 +126,22 @@ def initialize(ctx):
@main.command()
@click.confirmation_option(prompt='Are you sure you want to redeploy the '
'selected cluster?')
@click.option('--wait/--no-wait', default=True,
help='Wait until the cluster has fully restarted before '
'exiting. This is the default behavior.')
@click.pass_context
def redeploy(ctx, wait):
def redeploy(ctx):
"""Redeploy Airflow cluster.

This will perform a full redeploy of the Airflow cluster. First, all
containers in all services are stopped. Then, all containers in all
services are restarted with a --force-new-deployment. A redeploy
should be done when new workflows are added. This is the only way to
sync the workflows across all containers.

*IMPORTANT* You shouldn't try to force redeploy a single service as race
conditions could arise. Always use this command to redeploy as it will
ensure the system remains in a consistent state.
This will perform a full redeploy of the Airflow cluster. The cluster
has to be brought down and back up in a specific order; otherwise,
race conditions could arise. First, the scheduler is stopped. Then,
the web and worker services are redeployed. Finally, the scheduler is
started back up with the new deployment.

By default, the command will wait until the cluster has been fully
stopped and fully restarted before exiting. The --no-wait flag can be
used to exit as soon as the ECS request to scale back up has been made.
*IMPORTANT* Always use this command to redeploy as it will ensure the
system remains in a consistent state. It should be safe to run again
even if it has previously failed partway through due to, for example,
a network failure.

It may take several minutes for the cluster to fully restart. Be patient.
It will take several minutes for the cluster to fully restart. Be patient.

\b
Example usage:
@@ -157,23 +153,86 @@ def redeploy(ctx, wait):
ecs_model, cluster.ecs)
start_waiter = cluster.ecs.get_waiter('services_stable')
with check_task():
cluster.stop()
click.echo('Stopping cluster...', nl=False)
stop_waiter.wait(cluster=cluster.name, services=cluster.services)
cluster.stop(services=[cluster.scheduler])
click.echo('Stopping scheduler.......', nl=False)
stop_waiter.wait(cluster=cluster.name, services=[cluster.scheduler])
click.echo("\033[1A")
ok = click.style('OK', fg='green')
click.echo(f'Stopping scheduler.......{ok}')

with check_task():
cluster.start(services=[cluster.worker, cluster.web])
click.echo('Redeploying web/worker...', nl=False)
start_waiter.wait(cluster=cluster.name,
services=[cluster.web, cluster.worker])
click.echo("\033[1A")
ok = click.style('OK', fg='green')
click.echo(f'Redeploying web/worker...{ok}')

with check_task():
cluster.start(services=[cluster.scheduler])
click.echo('Starting scheduler.......', nl=False)
start_waiter.wait(cluster=cluster.name, services=[cluster.scheduler])
click.echo("\033[1A")
ok = click.style('OK', fg='green')
click.echo(f'Starting scheduler.......{ok}')


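The in-place status updates above rely on the ANSI escape \033[1A, which moves the cursor up one line so the pending label can be reprinted with its completion status. A minimal standalone sketch of the pattern (the label, and the sleep standing in for the waiter call, are illustrative):

    import time

    import click

    # Print the label without a trailing newline, do the slow work, then
    # move the cursor up one line and reprint the label with a green OK.
    click.echo('Stopping scheduler.......', nl=False)
    time.sleep(2)  # stand-in for stop_waiter.wait(...)
    click.echo('\033[1A')
    ok = click.style('OK', fg='green')
    click.echo(f'Stopping scheduler.......{ok}')
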
@main.command()
@click.confirmation_option(prompt='Are you sure you want to stop the '
'scheduler?')
@click.option('--wait/--no-wait', default=True,
help='Wait for the scheduler to stop. This is the default '
'behavior.')
@click.pass_context
def stop_scheduler(ctx, wait):
"""Stop the scheduler service.

Only one Airflow scheduler can be running at a time. During a new
service redeploy through Terraform, there would otherwise be a brief
period where two schedulers run simultaneously. The process for
deploying a new service should be to use this command to stop the
scheduler, then deploy the new service through Terraform, then run the
start-scheduler command.
"""
cluster = ctx.obj['cluster']
stop_waiter = boto_waiter.create_waiter_with_client('ServiceDrained',
ecs_model, cluster.ecs)
with check_task():
cluster.stop(services=[cluster.scheduler])
click.echo('Stopping scheduler...', nl=False)
if not wait:
return
stop_waiter.wait(cluster=cluster.name, services=[cluster.scheduler])
click.echo("\033[1A")
ok = click.style('OK', fg='green')
click.echo(f'Stopping cluster...{ok}')
click.echo(f'Stopping scheduler...{ok}')


@main.command()
@click.option('--wait/--no-wait', default=True,
help='Wait for scheduler to start. This is the default '
'behavior.')
@click.pass_context
def start_scheduler(ctx, wait):
"""Start the scheduler service.

This should only be necessary after running the stop-scheduler
command, though it should be safe to run even if the scheduler is
already running. This simply sets the desiredCount of the scheduler
service to 1.
"""
cluster = ctx.obj['cluster']
start_waiter = cluster.ecs.get_waiter('services_stable')
with check_task():
cluster.start()
cluster.start(services=[cluster.scheduler])
click.echo('Starting scheduler...', nl=False)
if not wait:
click.echo('Starting cluster...')
return
click.echo('Starting cluster...', nl=False)
start_waiter.wait(cluster=cluster.name, services=cluster.services)
start_waiter.wait(cluster=cluster.name, services=[cluster.scheduler])
click.echo("\033[1A")
ok = click.style('OK', fg='green')
click.echo(f'Starting cluster...{ok}')
click.echo(f'Starting scheduler...{ok}')


@contextmanager
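Both redeploy and the scheduler commands build a custom ServiceDrained waiter from an ecs_model that is defined elsewhere in the repository and not shown in this diff. For context, a botocore waiter model of that shape might look roughly like the following (a sketch; the delay, maxAttempts, and acceptor path are assumptions, not the project's actual values):

    from botocore import waiter as boto_waiter

    # Hypothetical waiter config: poll DescribeServices until every service
    # reports zero running tasks. The real ecs_model may differ.
    ecs_model = boto_waiter.WaiterModel({
        'version': 2,
        'waiters': {
            'ServiceDrained': {
                'operation': 'DescribeServices',
                'delay': 15,
                'maxAttempts': 40,
                'acceptors': [{
                    'matcher': 'pathAll',
                    'argument': 'services[].runningCount',
                    'expected': 0,
                    'state': 'success',
                }],
            },
        },
    })

    # Used as in cli.py above:
    # stop_waiter = boto_waiter.create_waiter_with_client(
    #     'ServiceDrained', ecs_model, cluster.ecs)
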
44 changes: 16 additions & 28 deletions manager/cluster.py
@@ -1,4 +1,3 @@
import boto3
from botocore import waiter
import jmespath

@@ -24,11 +23,12 @@


class Cluster:
def __init__(self, cluster, scheduler, worker, web):
def __init__(self, cluster, scheduler, worker, web, client):
self.name = cluster
self.scheduler = scheduler
self.worker = worker
self.web = web
self.ecs = client

def run_task(self, overrides):
default_overrides = {'name': self.worker}
@@ -43,34 +43,22 @@ def run_task(self, overrides):
networkConfiguration=self.__worker['networkConfiguration'])
return resp['tasks'][0]['taskArn']

def stop(self):
# This needs to be set before stopping everything, because there
# will be a variable number of workers. We need to know how many
# to restart. This is a bit of hack because the value really needs
# to come from the Terraform config.
# TODO: Find a better way to handle this.
self.num_workers = self.__worker['desiredCount']
for service in self.services:
self.ecs.update_service(cluster=self.name, service=service,
def stop(self, services=None):
if services is None:
services = self.services
for service in services:
self.ecs.update_service(cluster=self.name,
service=service,
desiredCount=0)

def start(self):
self.ecs.update_service(cluster=self.name,
service=self.scheduler,
desiredCount=1,
forceNewDeployment=True)
self.ecs.update_service(cluster=self.name,
service=self.web,
desiredCount=1,
forceNewDeployment=True)
self.ecs.update_service(cluster=self.name,
service=self.worker,
desiredCount=self.num_workers,
forceNewDeployment=True)

@property
def ecs(self):
return boto3.client('ecs')
def start(self, services=None):
if services is None:
services = self.services
for service in services:
self.ecs.update_service(cluster=self.name,
service=service,
desiredCount=1,
forceNewDeployment=True)

@property
def services(self):
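With the ECS client now injected into Cluster rather than constructed inside the class, callers create the client once and pass it down, and tests can substitute a stubbed client. A minimal usage sketch (the cluster and service names are placeholders, and the import path assumes the package layout shown in this diff):

    import boto3

    from manager.cluster import Cluster

    ecs = boto3.client('ecs')
    cluster = Cluster('airflow-prod', 'scheduler', 'worker', 'web', ecs)

    # Scale only the scheduler down to zero tasks...
    cluster.stop(services=[cluster.scheduler])
    # ...then bring it back up with a forced new deployment.
    cluster.start(services=[cluster.scheduler])
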
16 changes: 11 additions & 5 deletions tests/test_cli.py
@@ -92,11 +92,17 @@ def test_initializes_cluster(cluster, cluster_opts):
assert command == ['initdb']


def test_redeploys_cluster(cluster, cluster_opts):
def test_stops_scheduler(cluster, cluster_opts):
res = CliRunner().invoke(main, [
*cluster_opts,
'redeploy',
'--yes',
'stop-scheduler',
'--yes'])
assert 'Stopping scheduler...OK' in res.output


def test_starts_scheduler(cluster, cluster_opts):
res = CliRunner().invoke(main, [
*cluster_opts,
'start-scheduler',
'--no-wait'])
assert 'Stopping cluster...OK' in res.output
assert 'Starting cluster...' in res.output
assert 'Starting scheduler...' in res.output
39 changes: 29 additions & 10 deletions tests/test_cluster.py
@@ -5,31 +5,50 @@


def test_run_task_returns_taskarn(cluster):
c = Cluster('airflow-test', 'scheduler', 'worker', 'web')
ecs = boto3.client('ecs')
c = Cluster('airflow-test', 'scheduler', 'worker', 'web', ecs)
assert c.run_task({'command': ['run_things']})\
.startswith('arn:aws:ecs:us-east-1:012345678910:task/')


def test_stop_sets_desired_count_to_zero(cluster):
ecs = boto3.client('ecs')
Cluster(*cluster).stop()
Cluster(*cluster, ecs).stop()
res = ecs.describe_services(cluster=cluster.name, services=cluster[1:])
for service in res['services']:
assert service['desiredCount'] == 0
assert jmespath.search('services[].desiredCount', res) == [0, 0, 0]


def test_start_sets_resets_desired_count(cluster):
def test_stop_stops_specified_service(cluster):
ecs = boto3.client('ecs')
c = Cluster(*cluster)
c = Cluster(*cluster, ecs)
c.stop(services=[c.scheduler])
res = ecs.describe_services(cluster=cluster.name, services=cluster[1:])
assert jmespath.search(
f"services[?serviceName=='{cluster.scheduler}'].desiredCount|[0]",
res) == 0
assert jmespath.search(
f"services[?serviceName=='{cluster.web}'].desiredCount|[0]",
res) == 1


def test_start_sets_desired_count(cluster):
ecs = boto3.client('ecs')
c = Cluster(*cluster, ecs)
c.stop()
c.start()
res = ecs.describe_services(cluster=cluster.name, services=cluster[1:])
assert jmespath.search(
f"services[?serviceName=='{cluster.worker}'].desiredCount|[0]",
res) == 3
assert jmespath.search('services[].desiredCount', res) == [1, 1, 1]


def test_start_starts_specified_service(cluster):
ecs = boto3.client('ecs')
c = Cluster(*cluster, ecs)
c.stop()
c.start(services=[c.scheduler])
res = ecs.describe_services(cluster=cluster.name, services=cluster[1:])
assert jmespath.search(
f"services[?serviceName=='{cluster.scheduler}'].desiredCount|[0]",
res) == 1
assert jmespath.search(
f"services[?serviceName=='{cluster.web}'].desiredCount|[0]",
res) == 1
res) == 0
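The assertions above lean on JMESPath filter projections instead of loops. A self-contained example of the same pattern (the response dict is fabricated for illustration):

    import jmespath

    res = {'services': [
        {'serviceName': 'scheduler', 'desiredCount': 0},
        {'serviceName': 'web', 'desiredCount': 1},
    ]}

    # Filter services by name, project desiredCount, take the first match.
    assert jmespath.search(
        "services[?serviceName=='scheduler'].desiredCount|[0]", res) == 0
    # A bare projection yields every service's desiredCount in order.
    assert jmespath.search('services[].desiredCount', res) == [0, 1]
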
2 changes: 1 addition & 1 deletion tox.ini
@@ -17,7 +17,7 @@ commands =
pipenv run pytest tests {env:PYTEST_COV:} {posargs:--tb=short}

[testenv:flake8]
commands = pipenv run flake8 manager
commands = pipenv run flake8 manager tests

[testenv:safety]
commands = pipenv check