
Merge pull request #119 from populationgenomics/revert-116-upstream
Revert "Merge from Upstream"
lgruen committed Jul 2, 2021
2 parents 8f019a8 + 7d3b7bc commit 42b611b
Showing 620 changed files with 10,684 additions and 25,849 deletions.
2 changes: 0 additions & 2 deletions .git-blame-ignore-revs
@@ -1,4 +1,2 @@
# YAML formatting
1a861505c1fc2ea3c9d7b32a47be7af10d13907c
# black format services code
4fccbe2d18c6d2f4059036d61489467c780bbc0e
1 change: 0 additions & 1 deletion .gitignore
@@ -20,7 +20,6 @@ hs_err_pid*.log
*hail/python/hail/docs/tutorials/data*
*hail/python/hailtop/pipeline/docs/output*
.mypy_cache/
node_modules
*.out
GPATH
GRTAGS
21 changes: 14 additions & 7 deletions address/Makefile
@@ -1,8 +1,7 @@
include ../config.mk

TOKEN = $(shell cat /dev/urandom | LC_ALL=C tr -dc 'a-z0-9' | head -c 12)

ADDRESS_IMAGE := $(DOCKER_PREFIX)/address:$(TOKEN)
ADDRESS_LATEST = $(DOCKER_PREFIX)/address:latest
ADDRESS_IMAGE = $(DOCKER_PREFIX)/address:$(shell docker images -q --no-trunc address | sed -e 's,[^:]*:,,')

PYTHONPATH := $${PYTHONPATH:+$${PYTHONPATH}:}../hail/python:../gear:../web_common
PYTHON := PYTHONPATH=$(PYTHONPATH) python3
@@ -15,12 +14,20 @@ check:

.PHONY: build
build:
$(MAKE) -C ../docker service-base
python3 ../ci/jinja2_render.py '{"service_base_image":{"image":"'$$(cat ../docker/service-base-image-ref)'"}}' Dockerfile Dockerfile.out
../docker-build.sh .. address/Dockerfile.out $(ADDRESS_IMAGE)
$(MAKE) -C ../docker build
-docker pull $(ADDRESS_LATEST)
python3 ../ci/jinja2_render.py '{"service_base_image":{"image":"service-base"}}' Dockerfile Dockerfile.out
docker build -t address -f Dockerfile.out --cache-from address,$(ADDRESS_LATEST),service-base ..

.PHONY: push
push: build
docker tag address $(ADDRESS_LATEST)
docker push $(ADDRESS_LATEST)
docker tag address $(ADDRESS_IMAGE)
docker push $(ADDRESS_IMAGE)

.PHONY: deploy
deploy: build
deploy: push
! [ -z $(NAMESPACE) ] # call this like: make deploy NAMESPACE=default
python3 ../ci/jinja2_render.py '{"code":{"sha":"$(shell git rev-parse --short=12 HEAD)"},"deploy":$(DEPLOY),"address_image":{"image":"$(ADDRESS_IMAGE)"},"default_ns":{"name":"$(NAMESPACE)"},"global":{"project":"$(PROJECT)","domain":"$(DOMAIN)"}}' deployment.yaml deployment.yaml.out
kubectl -n $(NAMESPACE) apply -f service-account.yaml
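Note: the revert restores the fork's image-tagging scheme, in which the pushed tag is derived from the local Docker image ID (`docker images -q --no-trunc ... | sed -e 's,[^:]*:,,'`) rather than from a random per-build token. A minimal Python sketch of that derivation, assuming the `docker` CLI is on PATH and the image has already been built locally; the registry prefix in the usage comment is hypothetical:

    import subprocess

    def image_tag_from_local_id(image: str, docker_prefix: str) -> str:
        """Derive a registry tag from the local image ID, mirroring
        `docker images -q --no-trunc IMAGE | sed -e 's,[^:]*:,,'`."""
        out = subprocess.run(
            ['docker', 'images', '-q', '--no-trunc', image],
            capture_output=True, text=True, check=True,
        ).stdout.strip()
        if not out:
            raise RuntimeError(f'no local image named {image!r}; build it first')
        image_id = out.splitlines()[0]      # e.g. 'sha256:9f2a...'
        digest = image_id.split(':', 1)[1]  # drop the 'sha256:' prefix
        return f'{docker_prefix}/{image}:{digest}'

    # e.g. image_tag_from_local_id('address', 'gcr.io/my-project')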
6 changes: 4 additions & 2 deletions admin-pod/Makefile
@@ -1,8 +1,10 @@
include ../config.mk

SERVICE_BASE_IMAGE = $(DOCKER_PREFIX)/service-base:$(shell docker images -q --no-trunc service-base:latest | sed -e 's,[^:]*:,,')

.PHONY: deploy
deploy:
! [ -z $(NAMESPACE) ] # call this like: make deploy NAMESPACE=default
$(MAKE) -C ../docker service-base
python3 ../ci/jinja2_render.py '{"deploy":$(DEPLOY),"service_base_image":{"image":"'$$(cat ../docker/service-base-image-ref)'"}}' admin-pod.yaml admin-pod.yaml.out
$(MAKE) -C ../docker push
python3 ../ci/jinja2_render.py '{"service_base_image":{"image":"$(SERVICE_BASE_IMAGE)"}}' admin-pod.yaml admin-pod.yaml.out
kubectl -n $(NAMESPACE) apply -f admin-pod.yaml.out
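Note: on both sides of this diff, deploy targets materialise Kubernetes manifests by running `python3 ../ci/jinja2_render.py '<json context>' <template> <output>`. The real `ci/jinja2_render.py` is not part of this diff; the following is only a sketch of the same idea, assuming the standard `jinja2` package (the actual script may differ, e.g. in undefined-variable handling):

    import json
    import sys
    from jinja2 import Template

    def render(context_json: str, template_path: str, out_path: str) -> None:
        """Render a Jinja2 template against a JSON context, e.g.
        render('{"service_base_image": {"image": "service-base"}}',
               'admin-pod.yaml', 'admin-pod.yaml.out')."""
        context = json.loads(context_json)
        with open(template_path) as f:
            rendered = Template(f.read()).render(**context)
        with open(out_path, 'w') as f:
            f.write(rendered)

    if __name__ == '__main__':
        render(sys.argv[1], sys.argv[2], sys.argv[3])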
21 changes: 14 additions & 7 deletions auth/Makefile
@@ -1,8 +1,7 @@
include ../config.mk

TOKEN = $(shell cat /dev/urandom | LC_ALL=C tr -dc 'a-z0-9' | head -c 12)

AUTH_IMAGE := $(DOCKER_PREFIX)/auth:$(TOKEN)
AUTH_LATEST = $(DOCKER_PREFIX)/auth:latest
AUTH_IMAGE = $(DOCKER_PREFIX)/auth:$(shell docker images -q --no-trunc auth:latest | sed -e 's,[^:]*:,,')

EXTRA_PYTHONPATH := ../hail/python:../gear:../web_common
PYTHON := PYTHONPATH=$${PYTHONPATH:+$${PYTHONPATH}:}$(EXTRA_PYTHONPATH) python3
@@ -16,12 +15,20 @@ check:

.PHONY: build
build:
$(MAKE) -C ../docker service-base
python3 ../ci/jinja2_render.py '{"service_base_image":{"image":"'$$(cat ../docker/service-base-image-ref)'"}}' Dockerfile Dockerfile.out
../docker-build.sh .. auth/Dockerfile.out $(AUTH_IMAGE)
$(MAKE) -C ../docker build
-docker pull $(AUTH_LATEST)
python3 ../ci/jinja2_render.py '{"service_base_image":{"image":"service-base"}}' Dockerfile Dockerfile.out
docker build -f Dockerfile.out -t auth --cache-from auth,$(AUTH_LATEST),base ..

.PHONY: push
push: build
docker tag auth $(AUTH_LATEST)
docker push $(AUTH_LATEST)
docker tag auth $(AUTH_IMAGE)
docker push $(AUTH_IMAGE)

.PHONY: deploy
deploy: build
deploy: push
! [ -z $(NAMESPACE) ] # call this like: make deploy NAMESPACE=default
kubectl -n $(NAMESPACE) apply -f auth-driver-service-account.yaml
python3 ../ci/jinja2_render.py '{"code":{"sha":"$(shell git rev-parse --short=12 HEAD)"},"deploy":$(DEPLOY),"default_ns":{"name":"$(NAMESPACE)"},"auth_image":{"image":"$(AUTH_IMAGE)"},"auth_database":{"user_secret_name":"sql-auth-user-config"},"global":{"project":"$(PROJECT)","zone":"$(ZONE)","domain":"$(DOMAIN)"}}' deployment.yaml deployment.yaml.out
37 changes: 27 additions & 10 deletions batch/Makefile
@@ -1,9 +1,10 @@
include ../config.mk

TOKEN = $(shell cat /dev/urandom | LC_ALL=C tr -dc 'a-z0-9' | head -c 12)
BATCH_LATEST = $(DOCKER_PREFIX)/batch:latest
BATCH_IMAGE = $(DOCKER_PREFIX)/batch:$(shell docker images -q --no-trunc batch | sed -e 's,[^:]*:,,')

BATCH_IMAGE := $(DOCKER_PREFIX)/batch:$(TOKEN)
BATCH_WORKER_IMAGE := $(DOCKER_PREFIX)/batch-worker:$(TOKEN)
BATCH_WORKER_LATEST = $(DOCKER_PREFIX)/batch-worker:latest
BATCH_WORKER_IMAGE = $(DOCKER_PREFIX)/batch-worker:$(shell docker images -q --no-trunc batch-worker | sed -e 's,[^:]*:,,')

EXTRA_PYTHONPATH := ../hail/python:../gear:../web_common
PYTHON := PYTHONPATH=$${PYTHONPATH:+$${PYTHONPATH}:}$(EXTRA_PYTHONPATH) python3
@@ -15,24 +16,40 @@ check:
curlylint .
bash ../check-sql.sh

.PHONY: build-prereqs
build-prereqs:
$(MAKE) -C ../docker build

.PHONY: build-batch
build-batch:
$(MAKE) -C ../docker service-base
python3 ../ci/jinja2_render.py '{"service_base_image":{"image":"'$$(cat ../docker/service-base-image-ref)'"}}' Dockerfile Dockerfile.out
../docker-build.sh . Dockerfile.out $(BATCH_IMAGE)
build-batch: build-prereqs
-docker pull $(BATCH_LATEST)
python3 ../ci/jinja2_render.py '{"service_base_image":{"image":"service-base"}}' Dockerfile Dockerfile.out
docker build -t batch -f Dockerfile.out --cache-from batch,$(BATCH_LATEST),service-base .

.PHONY: build-worker
build-worker: src/main/java/is/hail/JVMEntryway.class jars/junixsocket-selftest-2.3.3-jar-with-dependencies.jar
build-worker: build-prereqs
-docker pull $(BATCH_WORKER_LATEST)
python3 ../ci/jinja2_render.py '{"global":{"docker_prefix":"$(DOCKER_PREFIX)"}}' Dockerfile.worker Dockerfile.worker.out
../docker-build.sh .. batch/Dockerfile.worker.out $(BATCH_WORKER_IMAGE)
docker build -t batch-worker -f Dockerfile.worker.out --cache-from batch-worker,$(BATCH_WORKER_LATEST),service-base ..

.PHONY: build
build: build-batch build-worker

.PHONY: push
push: build
docker tag batch $(BATCH_LATEST)
docker push $(BATCH_LATEST)
docker tag batch $(BATCH_IMAGE)
docker push $(BATCH_IMAGE)
docker tag batch-worker $(BATCH_WORKER_LATEST)
docker push $(BATCH_WORKER_LATEST)
docker tag batch-worker $(BATCH_WORKER_IMAGE)
docker push $(BATCH_WORKER_IMAGE)

JINJA_ENVIRONMENT = '{"code":{"sha":"$(shell git rev-parse --short=12 HEAD)"},"deploy":$(DEPLOY),"batch_image":{"image":"$(BATCH_IMAGE)"},"batch_worker_image":{"image":"$(BATCH_WORKER_IMAGE)"},"default_ns":{"name":"$(NAMESPACE)"},"batch_database":{"user_secret_name":"sql-batch-user-config"},"global":{"project":"$(PROJECT)","domain":"$(DOMAIN)","k8s_server_url":"$(KUBERNETES_SERVER_URL)","docker_prefix":"$(DOCKER_PREFIX)","docker_root_image":"$(DOCKER_ROOT_IMAGE)"},"scope":"$(SCOPE)"}'

.PHONY: deploy
deploy: build
deploy: push
! [ -z $(NAMESPACE) ] # call this like: make deploy NAMESPACE=default
E=$(JINJA_ENVIRONMENT) && \
python3 ../ci/jinja2_render.py $$E deployment.yaml deployment.yaml.out && \
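Note: the removed `TOKEN` variable (`cat /dev/urandom | LC_ALL=C tr -dc 'a-z0-9' | head -c 12`) produced a fresh 12-character tag per build, which the revert drops in favour of the `:latest` / image-ID scheme above. A small Python equivalent of that token generation, for illustration only:

    import secrets
    import string

    def random_image_token(length: int = 12) -> str:
        """Random lowercase-alphanumeric tag, analogous to
        `cat /dev/urandom | LC_ALL=C tr -dc 'a-z0-9' | head -c 12`."""
        alphabet = string.ascii_lowercase + string.digits
        return ''.join(secrets.choice(alphabet) for _ in range(length))

    # e.g. f'{docker_prefix}/batch:{random_image_token()}', where docker_prefix
    # would come from config.mk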
1 change: 0 additions & 1 deletion batch/batch/batch_configuration.py
@@ -5,7 +5,6 @@
REFRESH_INTERVAL_IN_SECONDS = int(os.environ.get('REFRESH_INTERVAL_IN_SECONDS', 5 * 60))
DEFAULT_NAMESPACE = os.environ['HAIL_DEFAULT_NAMESPACE']
PROJECT = os.environ['PROJECT']
SCOPE = os.environ['HAIL_SCOPE']

GCP_REGION = os.environ['HAIL_GCP_REGION']
GCP_ZONE = os.environ['HAIL_GCP_ZONE']
12 changes: 11 additions & 1 deletion batch/batch/driver/create_instance.py
@@ -161,6 +161,15 @@ async def create_instance(
sudo mkdir -p /mnt/disks/$WORKER_DATA_DISK_NAME/gcsfuse/
sudo ln -s /mnt/disks/$WORKER_DATA_DISK_NAME/gcsfuse /gcsfuse
sudo mkdir -p /mnt/disks/$WORKER_DATA_DISK_NAME/xfsquota/
sudo ln -s /mnt/disks/$WORKER_DATA_DISK_NAME/xfsquota /xfsquota
touch /xfsquota/projects
touch /xfsquota/projid
ln -s /xfsquota/projects /etc/projects
ln -s /xfsquota/projid /etc/projid
export HOME=/root
CORES=$(nproc)
@@ -197,7 +206,7 @@ async def create_instance(
</source>
EOF
sudo tee /etc/google-fluentd/config.d/worker-log.conf <<EOF
sudo tee /etc/google-fluentd/config.d/worker-log.conf <<EOF {{
<source>
@type tail
format json
@@ -269,6 +278,7 @@ async def create_instance(
-v /batch:/batch:shared \
-v /logs:/logs \
-v /gcsfuse:/gcsfuse:shared \
-v /xfsquota:/xfsquota \
--mount type=bind,source=/mnt/disks/$WORKER_DATA_DISK_NAME,target=/host \
--mount type=bind,source=/dev,target=/dev,bind-propagation=rshared \
-p 5000:5000 \
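Note: the restored startup-script lines symlink /etc/projects and /etc/projid onto the worker data disk and bind-mount /xfsquota into the worker container, which is the usual plumbing for XFS project quotas. The worker code that consumes these files is not shown in this diff; the sketch below is only an illustration of how such a quota is typically registered and capped, assuming an XFS filesystem mounted with `prjquota` and `xfs_quota` installed:

    import subprocess

    def set_project_quota(project_id: int, name: str, path: str,
                          hard_limit: str, mount_point: str) -> None:
        """Register an XFS project for `path` and cap its disk usage."""
        # /etc/projects maps project id -> directory; /etc/projid maps name -> id.
        with open('/etc/projects', 'a') as f:
            f.write(f'{project_id}:{path}\n')
        with open('/etc/projid', 'a') as f:
            f.write(f'{name}:{project_id}\n')
        # Initialise the project tree, then apply a hard block limit, e.g. '10g'.
        subprocess.run(['xfs_quota', '-x', '-c', f'project -s {name}', mount_point],
                       check=True)
        subprocess.run(['xfs_quota', '-x', '-c', f'limit -p bhard={hard_limit} {name}',
                        mount_point], check=True)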
50 changes: 15 additions & 35 deletions batch/batch/driver/gce.py
@@ -1,17 +1,17 @@
import re
import json
import logging
import dateutil.parser
import datetime
import aiohttp

from gear import Database
from hailtop import aiotools, aiogoogle
from hailtop.utils import periodically_call, time_msecs
from hailtop.utils import periodically_call

from ..batch_configuration import PROJECT, DEFAULT_NAMESPACE
from .zone_monitor import ZoneMonitor
from .instance_collection_manager import InstanceCollectionManager
from ..utils import parse_timestamp_msecs

log = logging.getLogger('gce_event_monitor')

@@ -58,7 +58,7 @@ async def handle_event(self, event):
log.warning(f'event has no payload {json.dumps(event)}')
return

timestamp_msecs = parse_timestamp_msecs(event['timestamp'])
timestamp = dateutil.parser.isoparse(event['timestamp']).timestamp() * 1000

resource_type = event['resource']['type']
if resource_type != 'gce_instance':
@@ -101,16 +101,16 @@ async def handle_event(self, event):
log.error(f'event for unknown instance {name}: {json.dumps(event)}')
return

if event_subtype == 'compute.instances.preempted':
if event_subtype == 'v1.compute.instances.preempted':
log.info(f'event handler: handle preempt {instance}')
await self.handle_preempt_event(instance, timestamp_msecs)
await self.handle_preempt_event(instance, timestamp)
elif event_subtype == 'v1.compute.instances.delete':
if event_type == 'COMPLETED':
log.info(f'event handler: delete {instance} done')
await self.handle_delete_done_event(instance, timestamp_msecs)
await self.handle_delete_done_event(instance, timestamp)
elif event_type == 'STARTED':
log.info(f'event handler: handle call delete {instance}')
await self.handle_call_delete_event(instance, timestamp_msecs)
await self.handle_call_delete_event(instance, timestamp)

async def handle_events(self):
row = await self.db.select_and_fetchone('SELECT * FROM `gevents_mark`;')
@@ -120,9 +120,7 @@ async def handle_events(self):
await self.db.execute_update('UPDATE `gevents_mark` SET mark = %s;', (mark,))

filter = f'''
(logName="projects/{PROJECT}/logs/cloudaudit.googleapis.com%2Factivity" OR
logName="projects/{PROJECT}/logs/cloudaudit.googleapis.com%2Fsystem_event"
) AND
logName="projects/{PROJECT}/logs/cloudaudit.googleapis.com%2Factivity" AND
resource.type=gce_instance AND
protoPayload.resourceName:"{self.machine_name_prefix}" AND
timestamp >= "{mark}"
@@ -154,34 +152,16 @@ async def delete_orphaned_disks(self):
params = {'filter': f'(labels.namespace = {DEFAULT_NAMESPACE})'}

for zone in self.zone_monitor.zones:
log.info(f'deleting orphaned disks for zone {zone}')
async for disk in await self.compute_client.list(f'/zones/{zone}/disks', params=params):
disk_name = disk['name']
instance_name = disk['labels']['instance-name']
instance = self.inst_coll_manager.get_instance(instance_name)

creation_timestamp_msecs = parse_timestamp_msecs(disk.get('creationTimestamp'))
last_attach_timestamp_msecs = parse_timestamp_msecs(disk.get('lastAttachTimestamp'))
last_detach_timestamp_msecs = parse_timestamp_msecs(disk.get('lastDetachTimestamp'))

now_msecs = time_msecs()
if instance is None:
log.exception(f'deleting disk {disk_name} from instance that no longer exists')
elif (last_attach_timestamp_msecs is None
and now_msecs - creation_timestamp_msecs > 10 * 60 * 1000):
log.exception(f'deleting disk {disk_name} that has not attached within 10 minutes')
elif (last_detach_timestamp_msecs is not None
and now_msecs - last_detach_timestamp_msecs > 5 * 60 * 1000):
log.exception(f'deleting detached disk {disk_name} that has not been cleaned up within 5 minutes')
else:
continue

try:
await self.compute_client.delete_disk(f'/zones/{zone}/disks/{disk_name}')
except aiohttp.ClientResponseError as e:
if e.status == 404:
continue
log.exception(f'error while deleting orphaned disk {disk_name}')
try:
await self.compute_client.delete_disk(f'/zones/{zone}/disks/{disk["name"]}')
except aiohttp.ClientResponseError as e:
if e.status == 404:
continue
raise

async def delete_orphaned_disks_loop(self):
await periodically_call(15, self.delete_orphaned_disks)
await periodically_call(300, self.delete_orphaned_disks)
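Note: the revert goes back to converting the audit-log event's RFC 3339 timestamp inline with `dateutil` (milliseconds since the epoch), where the upstream code had factored this into a `parse_timestamp_msecs` helper. A minimal sketch of the conversion the restored line performs; the function name here is illustrative, not from the repo:

    import dateutil.parser

    def timestamp_msecs(rfc3339: str) -> float:
        """Milliseconds since the epoch, as in the restored inline expression."""
        return dateutil.parser.isoparse(rfc3339).timestamp() * 1000

    # e.g. timestamp_msecs('2021-07-02T03:04:05Z') -> 1625195045000.0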
28 changes: 6 additions & 22 deletions batch/batch/driver/instance_collection.py
@@ -1,9 +1,7 @@
import asyncio
import aiohttp
import sortedcontainers
import logging
import dateutil.parser
import collections
from typing import Dict

from hailtop.utils import time_msecs, secret_alnum_string, periodically_call
@@ -28,7 +26,6 @@ def __init__(self, app, name, machine_name_prefix, is_pool):
self.is_pool = is_pool

self.name_instance: Dict[str, Instance] = {}
self.live_free_cores_mcpu_by_zone: Dict[str, int] = collections.defaultdict(int)

self.instances_by_last_updated = sortedcontainers.SortedSet(key=lambda instance: instance.last_updated)

@@ -73,7 +70,6 @@ def adjust_for_remove_instance(self, instance):
if instance.state in ('pending', 'active'):
self.live_free_cores_mcpu -= max(0, instance.free_cores_mcpu)
self.live_total_cores_mcpu -= instance.cores_mcpu
self.live_free_cores_mcpu_by_zone[instance.zone] -= max(0, instance.free_cores_mcpu)

async def remove_instance(self, instance, reason, timestamp=None):
await instance.deactivate(reason, timestamp)
@@ -92,7 +88,6 @@ def adjust_for_add_instance(self, instance):
if instance.state in ('pending', 'active'):
self.live_free_cores_mcpu += max(0, instance.free_cores_mcpu)
self.live_total_cores_mcpu += instance.cores_mcpu
self.live_free_cores_mcpu_by_zone[instance.zone] += max(0, instance.free_cores_mcpu)

def add_instance(self, instance):
assert instance.name not in self.name_instance
@@ -128,13 +123,6 @@ async def check_on_instance(self, instance):
return
raise

if (instance.state == 'active'
and instance.failed_request_count > 5
and time_msecs() - instance.last_updated > 5 * 60 * 1000):
log.exception(f'deleting {instance} with {instance.failed_request_count} failed request counts after more than 5 minutes')
await self.call_delete_instance(instance, 'not_responding')
return

# PROVISIONING, STAGING, RUNNING, STOPPING, TERMINATED
gce_state = spec['status']

@@ -169,16 +157,12 @@ async def check_on_instance(self, instance):

async def monitor_instances(self):
if self.instances_by_last_updated:
# [:50] are the fifty smallest (oldest)
instances = self.instances_by_last_updated[:50]

async def check(instance):
since_last_updated = time_msecs() - instance.last_updated
if since_last_updated > 60 * 1000:
log.info(f'checking on {instance}, last updated {since_last_updated / 1000}s ago')
await self.check_on_instance(instance)

await asyncio.gather(*[check(instance) for instance in instances])
# 0 is the smallest (oldest)
instance = self.instances_by_last_updated[0]
since_last_updated = time_msecs() - instance.last_updated
if since_last_updated > 60 * 1000:
log.info(f'checking on {instance}, last updated {since_last_updated / 1000}s ago')
await self.check_on_instance(instance)

async def monitor_instances_loop(self):
await periodically_call(1, self.monitor_instances)
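Note: after the revert, `monitor_instances` checks only the single stalest instance per tick rather than the fifty oldest in parallel. `instances_by_last_updated` is a `sortedcontainers.SortedSet` keyed by `last_updated`, so index 0 is always the instance that has gone longest without an update. A self-contained sketch of that pattern; the `Instance` class here is illustrative, not the real one:

    import time
    import sortedcontainers

    class Instance:
        def __init__(self, name: str, last_updated: float):
            self.name = name
            self.last_updated = last_updated  # epoch milliseconds

    instances_by_last_updated = sortedcontainers.SortedSet(
        key=lambda instance: instance.last_updated)

    async def monitor_instances():
        if instances_by_last_updated:
            # index 0 is the instance that has gone longest without an update
            instance = instances_by_last_updated[0]
            since_last_updated = time.time() * 1000 - instance.last_updated
            if since_last_updated > 60 * 1000:
                print(f'checking on {instance.name}, '
                      f'last updated {since_last_updated / 1000}s ago')
                # the real driver awaits check_on_instance(instance) here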
7 changes: 1 addition & 6 deletions batch/batch/driver/job.py
@@ -248,12 +248,9 @@ async def make_request():
if instance.state in ('inactive', 'deleted'):
return
try:
async with aiohttp.ClientSession(raise_for_status=True, timeout=aiohttp.ClientTimeout(total=5)) as session:
async with aiohttp.ClientSession(raise_for_status=True, timeout=aiohttp.ClientTimeout(total=60)) as session:
await session.delete(url)
await instance.mark_healthy()
except asyncio.TimeoutError:
await instance.incr_failed_request_count()
return
except aiohttp.ClientResponseError as err:
if err.status == 404:
await instance.mark_healthy()
@@ -428,8 +425,6 @@ async def schedule_job(app, record, instance):
await instance.mark_healthy()
if e.status == 403:
log.info(f'attempt already exists for job {id} on {instance}, aborting')
if e.status == 503:
log.info(f'job {id} cannot be scheduled because {instance} is shutting down, aborting')
raise e
except Exception:
await instance.incr_failed_request_count()
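Note: the revert also lengthens the delete-request timeout from 5 s back to 60 s and drops the `asyncio.TimeoutError` branch that bumped the instance's failed-request counter. A hedged sketch of the surrounding aiohttp pattern, with the URL and return-value handling as placeholders rather than the driver's actual bookkeeping:

    import aiohttp

    async def delete_attempt(url: str) -> bool:
        """Issue a DELETE with a 60 s overall timeout; treat 404 as success."""
        try:
            async with aiohttp.ClientSession(
                    raise_for_status=True,
                    timeout=aiohttp.ClientTimeout(total=60)) as session:
                await session.delete(url)
            return True
        except aiohttp.ClientResponseError as err:
            if err.status == 404:
                return True  # nothing left to delete
            raise

    # usage (placeholder address):
    # ok = await delete_attempt('http://worker.internal:5000/api/v1alpha/attempts/...')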
