Skip to content

Commit

Permalink
Merge branch 'hail-is:main' into dummy-coding
Browse files Browse the repository at this point in the history
  • Loading branch information
Will-Tyler committed Mar 8, 2024
2 parents 4926fff + 923cc55 commit c913cfd
Show file tree
Hide file tree
Showing 326 changed files with 11,101 additions and 5,143 deletions.
34 changes: 0 additions & 34 deletions .github/workflows/codeql-analysis.yml

This file was deleted.

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ wheel-container.tar
hail/python/hail/backend/extra_classpath
hail/python/hail/backend/hail.jar
hail/install-editable
.helix
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ CHECK_SERVICES_MODULES := $(patsubst %, check-%, $(SERVICES_MODULES))
SPECIAL_IMAGES := hail-ubuntu batch-worker letsencrypt

HAILGENETICS_IMAGES = $(foreach img,hail vep-grch37-85 vep-grch38-95,hailgenetics-$(img))
CI_IMAGES = ci-utils ci-buildkit base hail-run
CI_IMAGES = ci-utils hail-buildkit base hail-run
PRIVATE_REGISTRY_IMAGES = $(patsubst %, pushed-private-%-image, $(SPECIAL_IMAGES) $(SERVICES_PLUS_ADMIN_POD) $(CI_IMAGES) $(HAILGENETICS_IMAGES))

HAILTOP_VERSION := hail/python/hailtop/hail_version
Expand Down Expand Up @@ -44,14 +44,17 @@ check-hail-fast:
ruff format hail --diff
$(PYTHON) -m pyright hail/python/hailtop

ruff check hail/python/test/hailtop/batch
$(PYTHON) -m pyright hail/python/test/hailtop/batch

.PHONY: pylint-hailtop
pylint-hailtop:
# pylint on hail is still a work in progress
$(PYTHON) -m pylint --rcfile pylintrc hail/python/hailtop --score=n

.PHONY: check-hail
check-hail: check-hail-fast pylint-hailtop
cd hail && sh ./gradlew spotlessCheck
cd hail && sh millw __.checkFormat + __.fix --check

.PHONY: check-services
check-services: $(CHECK_SERVICES_MODULES)
Expand Down
24 changes: 14 additions & 10 deletions auth/auth/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import kubernetes_asyncio.client
import kubernetes_asyncio.client.rest
import kubernetes_asyncio.config
import uvloop
from aiohttp import web
from prometheus_async.aio.web import server_stats # type: ignore

Expand All @@ -33,11 +32,10 @@
from gear.auth import AIOHTTPHandler, get_session_id
from gear.cloud_config import get_global_config
from gear.profiling import install_profiler_if_requested
from hailtop import httpx
from hailtop import httpx, uvloopx
from hailtop.auth import AzureFlow, Flow, GoogleFlow, IdentityProvider
from hailtop.config import get_deploy_config
from hailtop.hail_logging import AccessLogger
from hailtop.tls import internal_server_ssl_context
from hailtop.utils import secret_alnum_string
from web_common import render_template, set_message, setup_aiohttp_jinja2, setup_common_static_routes

Expand All @@ -56,8 +54,6 @@

log = logging.getLogger('auth')

uvloop.install()

CLOUD = get_global_config()['cloud']
DEFAULT_NAMESPACE = os.environ['HAIL_DEFAULT_NAMESPACE']

Expand Down Expand Up @@ -814,13 +810,20 @@ class AppKeys:
HAILCTL_CLIENT_CONFIG = web.AppKey('hailctl_client_config', dict)
K8S_CLIENT = web.AppKey('k8s_client', kubernetes_asyncio.client.CoreV1Api)
K8S_CACHE = web.AppKey('k8s_cache', K8sCache)
EXIT_STACK = web.AppKey('exit_stack', AsyncExitStack)


async def on_startup(app):
exit_stack = AsyncExitStack()
app[AppKeys.EXIT_STACK] = exit_stack

db = Database()
await db.async_init(maxsize=50)
exit_stack.push_async_callback(db.async_close)
app[AppKeys.DB] = db

app[AppKeys.CLIENT_SESSION] = httpx.client_session()
exit_stack.push_async_callback(app[AppKeys.CLIENT_SESSION].close)

credentials_file = '/auth-oauth2-client-secret/client_secret.json'
if CLOUD == 'gcp':
Expand All @@ -834,14 +837,13 @@ async def on_startup(app):

kubernetes_asyncio.config.load_incluster_config()
app[AppKeys.K8S_CLIENT] = kubernetes_asyncio.client.CoreV1Api()
exit_stack.push_async_callback(app[AppKeys.K8S_CLIENT].api_client.rest_client.pool_manager.close)

app[AppKeys.K8S_CACHE] = K8sCache(app[AppKeys.K8S_CLIENT])


async def on_cleanup(app):
async with AsyncExitStack() as cleanup:
cleanup.push_async_callback(app[AppKeys.K8S_CLIENT].api_client.rest_client.pool_manager.close)
cleanup.push_async_callback(app[AppKeys.DB].async_close)
cleanup.push_async_callback(app[AppKeys.CLIENT_SESSION].close)
await app[AppKeys.EXIT_STACK].aclose()


class AuthAccessLogger(AccessLogger):
Expand Down Expand Up @@ -881,6 +883,8 @@ async def auth_check_csrf_token(request: web.Request, handler: AIOHTTPHandler):


def run():
uvloopx.install()

install_profiler_if_requested('auth')

app = web.Application(middlewares=[auth_check_csrf_token, monitor_endpoints_middleware])
Expand All @@ -900,5 +904,5 @@ def run():
host='0.0.0.0',
port=443,
access_log_class=AuthAccessLogger,
ssl_context=internal_server_ssl_context(),
ssl_context=deploy_config.server_ssl_context(),
)
128 changes: 90 additions & 38 deletions batch/batch/batch.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import json
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, cast

from gear import transaction
from hailtop.batch_client.types import CostBreakdownEntry, JobListEntryV1Alpha
from hailtop.batch_client.globals import ROOT_JOB_GROUP_ID
from hailtop.batch_client.types import CostBreakdownEntry, GetJobGroupResponseV1Alpha, JobListEntryV1Alpha
from hailtop.utils import humanize_timedelta_msecs, time_msecs_str

from .batch_format_version import BatchFormatVersion
from .exceptions import NonExistentBatchError, OpenBatchError
from .exceptions import NonExistentJobGroupError
from .utils import coalesce

log = logging.getLogger('batch')


def _maybe_time_msecs_str(t: Optional[int]) -> Optional[str]:
if t is not None:
return time_msecs_str(t)
return None


def cost_breakdown_to_dict(cost_breakdown: Dict[str, float]) -> List[CostBreakdownEntry]:
    """Convert a {resource_name: cost} mapping into a list of entry dicts.

    Each entry has the shape {'resource': <name>, 'cost': <float>}, in the
    mapping's iteration order.
    """
    entries: List[CostBreakdownEntry] = []
    for resource_name, resource_cost in cost_breakdown.items():
        entries.append({'resource': resource_name, 'cost': resource_cost})
    return entries

Expand All @@ -30,14 +37,9 @@ def batch_record_to_dict(record: Dict[str, Any]) -> Dict[str, Any]:
else:
state = 'running'

def _time_msecs_str(t):
if t:
return time_msecs_str(t)
return None

time_created = _time_msecs_str(record['time_created'])
time_closed = _time_msecs_str(record['time_closed'])
time_completed = _time_msecs_str(record['time_completed'])
time_created = _maybe_time_msecs_str(record['time_created'])
time_closed = _maybe_time_msecs_str(record['time_closed'])
time_completed = _maybe_time_msecs_str(record['time_completed'])

if record['time_created'] and record['time_completed']:
duration_ms = record['time_completed'] - record['time_created']
Expand All @@ -49,7 +51,7 @@ def _time_msecs_str(t):
if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

d = {
batch_response = {
'id': record['id'],
'user': record['user'],
'billing_project': record['billing_project'],
Expand All @@ -74,9 +76,55 @@ def _time_msecs_str(t):

attributes = json.loads(record['attributes'])
if attributes:
d['attributes'] = attributes
batch_response['attributes'] = attributes

return batch_response


def job_group_record_to_dict(record: Dict[str, Any]) -> GetJobGroupResponseV1Alpha:
if record['n_failed'] > 0:
state = 'failure'
elif record['cancelled'] or record['n_cancelled'] > 0:
state = 'cancelled'
elif record['state'] == 'complete':
assert record['n_succeeded'] == record['n_jobs']
state = 'success'
else:
state = 'running'

return d
time_created = _maybe_time_msecs_str(record['time_created'])
time_completed = _maybe_time_msecs_str(record['time_completed'])

if record['time_created'] and record['time_completed']:
duration_ms = record['time_completed'] - record['time_created']
else:
duration_ms = None

if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

job_group_response = {
'batch_id': record['batch_id'],
'job_group_id': record['job_group_id'],
'state': state,
'complete': record['state'] == 'complete',
'n_jobs': record['n_jobs'],
'n_completed': record['n_completed'],
'n_succeeded': record['n_succeeded'],
'n_failed': record['n_failed'],
'n_cancelled': record['n_cancelled'],
'time_created': time_created,
'time_completed': time_completed,
'duration': duration_ms,
'cost': coalesce(record['cost'], 0),
'cost_breakdown': record['cost_breakdown'],
}

attributes = json.loads(record['attributes'])
if attributes:
job_group_response['attributes'] = attributes

return cast(GetJobGroupResponseV1Alpha, job_group_response)


def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEntryV1Alpha:
Expand All @@ -93,38 +141,42 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEn
if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

return {
'batch_id': record['batch_id'],
'job_id': record['job_id'],
'name': name,
'user': record['user'],
'billing_project': record['billing_project'],
'state': record['state'],
'exit_code': exit_code,
'duration': duration,
'cost': coalesce(record['cost'], 0),
'msec_mcpu': record['msec_mcpu'],
'cost_breakdown': record['cost_breakdown'],
}


async def cancel_batch_in_db(db, batch_id):
return cast(
JobListEntryV1Alpha,
{
'batch_id': record['batch_id'],
'job_id': record['job_id'],
'name': name,
'user': record['user'],
'billing_project': record['billing_project'],
'state': record['state'],
'exit_code': exit_code,
'duration': duration,
'cost': coalesce(record['cost'], 0),
'msec_mcpu': record['msec_mcpu'],
'cost_breakdown': record['cost_breakdown'],
},
)


async def cancel_job_group_in_db(db, batch_id, job_group_id):
@transaction(db)
async def cancel(tx):
record = await tx.execute_and_fetchone(
"""
SELECT `state` FROM batches
WHERE id = %s AND NOT deleted
SELECT 1
FROM job_groups
LEFT JOIN batches ON batches.id = job_groups.batch_id
LEFT JOIN batch_updates ON job_groups.batch_id = batch_updates.batch_id AND
job_groups.update_id = batch_updates.update_id
WHERE job_groups.batch_id = %s AND job_groups.job_group_id = %s AND NOT deleted AND (batch_updates.committed OR job_groups.job_group_id = %s)
FOR UPDATE;
""",
(batch_id,),
(batch_id, job_group_id, ROOT_JOB_GROUP_ID),
)
if not record:
raise NonExistentBatchError(batch_id)

if record['state'] == 'open':
raise OpenBatchError(batch_id)
raise NonExistentJobGroupError(batch_id, job_group_id)

await tx.just_execute('CALL cancel_batch(%s);', (batch_id,))
await tx.just_execute('CALL cancel_job_group(%s, %s);', (batch_id, job_group_id))

await cancel()
1 change: 0 additions & 1 deletion batch/batch/cloud/azure/driver/create_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ def create_vm_config(
-v /sys/fs/cgroup:/sys/fs/cgroup \
--mount type=bind,source=/mnt/disks/$WORKER_DATA_DISK_NAME,target=/host \
--mount type=bind,source=/dev,target=/dev,bind-propagation=rshared \
-p 5000:5000 \
--device /dev/fuse \
--device $XFS_DEVICE \
--device /dev \
Expand Down
5 changes: 5 additions & 0 deletions batch/batch/cloud/driver.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import os

from gear import Database
from gear.cloud_config import get_global_config

from ..driver.driver import CloudDriver
from ..inst_coll_config import InstanceCollectionConfigs
from .azure.driver.driver import AzureDriver
from .gcp.driver.driver import GCPDriver
from .terra.azure.driver.driver import TerraAzureDriver


async def get_cloud_driver(
Expand All @@ -17,6 +20,8 @@ async def get_cloud_driver(
cloud = get_global_config()['cloud']

if cloud == 'azure':
if os.environ.get('HAIL_TERRA'):
return await TerraAzureDriver.create(app, db, machine_name_prefix, namespace, inst_coll_configs)
return await AzureDriver.create(app, db, machine_name_prefix, namespace, inst_coll_configs)

assert cloud == 'gcp', cloud
Expand Down
1 change: 0 additions & 1 deletion batch/batch/cloud/gcp/driver/create_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ def scheduling() -> dict:
-v /sys/fs/cgroup:/sys/fs/cgroup \
--mount type=bind,source=/mnt/disks/$WORKER_DATA_DISK_NAME,target=/host \
--mount type=bind,source=/dev,target=/dev,bind-propagation=rshared \
-p 5000:5000 \
--device /dev/fuse \
--device $XFS_DEVICE \
--device /dev \
Expand Down
Empty file.
Empty file.
Empty file.
Loading

0 comments on commit c913cfd

Please sign in to comment.