Skip to content

Commit

Permalink
[qob] Always send the git revision from the python service backend in…
Browse files Browse the repository at this point in the history
…stead of jar_url (#14541)

Instead of doing anything fancy with custom JAR urls, the hail python
client always just sends its git revision for QoB jobs. When the batch
front end receives the request, it resolves that git revision to either
a production JAR or a dev JAR under a `/dev/` directory.

Resolves #14539
  • Loading branch information
daniel-goldstein committed May 30, 2024
1 parent cb4bb0d commit 208d69b
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 64 deletions.
22 changes: 21 additions & 1 deletion batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from gear.clients import get_cloud_async_fs
from gear.database import CallError
from gear.profiling import install_profiler_if_requested
from gear.time_limited_max_size_cache import TimeLimitedMaxSizeCache
from hailtop import aiotools, dictfix, httpx, uvloopx, version
from hailtop.auth import hail_credentials
from hailtop.batch_client.globals import MAX_JOB_GROUPS_DEPTH, ROOT_JOB_GROUP_ID
Expand Down Expand Up @@ -1161,7 +1162,10 @@ async def _create_jobs(
revision = spec['process']['jar_spec']['value']
assert_is_sha_1_hex_string(revision)
spec['process']['jar_spec']['type'] = 'jar_url'
spec['process']['jar_spec']['value'] = ACCEPTABLE_QUERY_JAR_URL_PREFIX + '/' + revision + '.jar'
value = await app[AppKeys.QOB_JAR_RESOLUTION_CACHE].lookup(revision)
if value is None:
raise web.HTTPBadRequest(reason=f'Could not find a JAR matching revision {revision}')
spec['process']['jar_spec']['value'] = value
else:
assert spec['process']['jar_spec']['type'] == 'jar_url'
jar_url = spec['process']['jar_spec']['value']
Expand Down Expand Up @@ -3431,6 +3435,10 @@ def log(self, request, response, time):
super().log(request, response, time)


class AppKeys(CommonAiohttpAppKeys):
QOB_JAR_RESOLUTION_CACHE = web.AppKey('qob_jar_resolution_cache', TimeLimitedMaxSizeCache[Tuple[str, str], str])


async def on_startup(app):
exit_stack = AsyncExitStack()
app['exit_stack'] = exit_stack
Expand Down Expand Up @@ -3491,6 +3499,18 @@ async def on_startup(app):
retry_long_running('delete_batch_loop', run_if_changed, delete_batch_state_changed, delete_batch_loop_body, app)
)

async def resolve_qob_jar_url(git_revision: str) -> Optional[str]:
production_url = ACCEPTABLE_QUERY_JAR_URL_PREFIX + '/' + git_revision + '.jar'
dev_url = ACCEPTABLE_QUERY_JAR_URL_PREFIX + '/dev/' + git_revision + '.jar'
for url in (production_url, dev_url):
if await fs.exists(url):
return url
return None

app[AppKeys.QOB_JAR_RESOLUTION_CACHE] = TimeLimitedMaxSizeCache(
resolve_qob_jar_url, int(1e10), 100, AppKeys.QOB_JAR_RESOLUTION_CACHE._name
)

app['task_manager'].ensure_future(periodically_call(5, _refresh, app))


Expand Down
2 changes: 0 additions & 2 deletions hail/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ pytest-qob: upload-qob-jar upload-remote-test-resources install-editable
cd python && \
HAIL_TEST_STORAGE_URI=$(TEST_STORAGE_URI) \
HAIL_QUERY_BACKEND=batch \
HAIL_QUERY_JAR_URL=$$(cat ../upload-qob-jar) \
HAIL_DEFAULT_NAMESPACE=$(NAMESPACE) \
HAIL_TEST_RESOURCES_DIR='$(CLOUD_HAIL_TEST_RESOURCES_DIR)' \
HAIL_DOCTEST_DATA_DIR='$(HAIL_DOCTEST_DATA_DIR)' \
Expand Down Expand Up @@ -367,7 +366,6 @@ install-editable: $(FAST_PYTHON_JAR) $(FAST_PYTHON_JAR_EXTRA_CLASSPATH)
install-for-qob: upload-qob-jar install-editable
! [ -z $(NAMESPACE) ] # call this like: make install-for-qob NAMESPACE=default
hailctl config set query/backend batch
hailctl config set query/jar_url $$(cat upload-qob-jar)
hailctl dev config set default_namespace $(NAMESPACE)

.PHONY: install
Expand Down
44 changes: 6 additions & 38 deletions hail/python/hail/backend/service_backend.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import abc
import asyncio
import logging
import math
Expand Down Expand Up @@ -67,34 +66,6 @@ async def read_str(strm: afs.ReadableStream) -> str:
return b.decode('utf-8')


class JarSpec(abc.ABC):
@abc.abstractmethod
def to_dict(self) -> Dict[str, str]:
raise NotImplementedError


class JarUrl(JarSpec):
def __init__(self, url):
self.url = url

def to_dict(self) -> Dict[str, str]:
return {'type': 'jar_url', 'value': self.url}

def __repr__(self):
return f'JarUrl({self.url})'


class GitRevision(JarSpec):
def __init__(self, revision):
self.revision = revision

def to_dict(self) -> Dict[str, str]:
return {'type': 'git_revision', 'value': self.revision}

def __repr__(self):
return f'GitRevision({self.revision})'


@dataclass
class SerializedIRFunction:
name: str
Expand Down Expand Up @@ -195,7 +166,6 @@ async def create(
disable_progress_bar: Optional[bool] = None,
remote_tmpdir: Optional[str] = None,
flags: Optional[Dict[str, str]] = None,
jar_url: Optional[str] = None,
driver_cores: Optional[Union[int, str]] = None,
driver_memory: Optional[str] = None,
worker_cores: Optional[Union[int, str]] = None,
Expand Down Expand Up @@ -229,9 +199,6 @@ async def create(
batch_attributes: Dict[str, str] = dict()
remote_tmpdir = get_remote_tmpdir('ServiceBackend', remote_tmpdir=remote_tmpdir)

jar_url = configuration_of(ConfigVariable.QUERY_JAR_URL, jar_url, None)
jar_spec = GitRevision(revision()) if jar_url is None else JarUrl(jar_url)

name_prefix = configuration_of(ConfigVariable.QUERY_NAME_PREFIX, name_prefix, '')
batch_attributes: Dict[str, str] = {
'hail-version': version(),
Expand Down Expand Up @@ -284,7 +251,6 @@ async def create(
disable_progress_bar=disable_progress_bar,
batch_attributes=batch_attributes,
remote_tmpdir=remote_tmpdir,
jar_spec=jar_spec,
driver_cores=driver_cores,
driver_memory=driver_memory,
worker_cores=worker_cores,
Expand All @@ -305,7 +271,6 @@ def __init__(
disable_progress_bar: bool,
batch_attributes: Dict[str, str],
remote_tmpdir: str,
jar_spec: JarSpec,
driver_cores: Optional[Union[int, str]],
driver_memory: Optional[str],
worker_cores: Optional[Union[int, str]],
Expand All @@ -323,7 +288,6 @@ def __init__(
self.batch_attributes = batch_attributes
self.remote_tmpdir = remote_tmpdir
self.flags: Dict[str, str] = {}
self.jar_spec = jar_spec
self.functions: List[IRFunction] = []
self._registered_ir_function_names: Set[str] = set()
self.driver_cores = driver_cores
Expand All @@ -343,7 +307,7 @@ def validate_file(self, uri: str) -> None:

def debug_info(self) -> Dict[str, Any]:
return {
'jar_spec': str(self.jar_spec),
'jar_spec': self.jar_spec,
'billing_project': self.billing_project,
'batch_attributes': self.batch_attributes,
'remote_tmpdir': self.remote_tmpdir,
Expand All @@ -359,6 +323,10 @@ def debug_info(self) -> Dict[str, Any]:
def fs(self) -> FS:
return self._sync_fs

@property
def jar_spec(self) -> dict:
return {'type': 'git_revision', 'value': revision()}

@property
def logger(self):
return log
Expand Down Expand Up @@ -410,7 +378,7 @@ async def _run_on_batch(
resources['storage'] = service_backend_config.storage

j = self._batch.create_jvm_job(
jar_spec=self.jar_spec.to_dict(),
jar_spec=self.jar_spec,
argv=[
ServiceBackend.DRIVER,
name,
Expand Down
3 changes: 0 additions & 3 deletions hail/python/hail/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,6 @@ def init_spark(
@typecheck(
billing_project=nullable(str),
remote_tmpdir=nullable(str),
jar_url=nullable(str),
log=nullable(str),
quiet=bool,
append=bool,
Expand All @@ -536,7 +535,6 @@ async def init_batch(
*,
billing_project: Optional[str] = None,
remote_tmpdir: Optional[str] = None,
jar_url: Optional[str] = None,
log: Optional[str] = None,
quiet: bool = False,
append: bool = False,
Expand All @@ -562,7 +560,6 @@ async def init_batch(
billing_project=billing_project,
remote_tmpdir=remote_tmpdir,
disable_progress_bar=disable_progress_bar,
jar_url=jar_url,
driver_cores=driver_cores,
driver_memory=driver_memory,
worker_cores=worker_cores,
Expand Down
1 change: 0 additions & 1 deletion hail/python/hailtop/config/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class ConfigVariable(str, Enum):
BATCH_BILLING_PROJECT = 'batch/billing_project'
BATCH_BACKEND = 'batch/backend'
QUERY_BACKEND = 'query/backend'
QUERY_JAR_URL = 'query/jar_url'
QUERY_BATCH_DRIVER_CORES = 'query/batch_driver_cores'
QUERY_BATCH_WORKER_CORES = 'query/batch_worker_cores'
QUERY_BATCH_DRIVER_MEMORY = 'query/batch_driver_memory'
Expand Down
7 changes: 0 additions & 7 deletions hail/python/hailtop/hailctl/config/config_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,6 @@ def config_variables():
'should be one of "local", "spark", or "batch"',
),
),
ConfigVariable.QUERY_JAR_URL: ConfigVariableInfo(
help_msg='Cloud storage URI to a Query JAR',
validation=(
RouterAsyncFS.valid_url,
'should be valid cloud storage URI such as gs://my-bucket/jars/sha.jar',
),
),
ConfigVariable.QUERY_BATCH_DRIVER_CORES: ConfigVariableInfo(
help_msg='Cores specification for the query driver',
validation=(
Expand Down
2 changes: 0 additions & 2 deletions hail/python/test/hailtop/hailctl/config/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def test_config_list_empty_config(runner: CliRunner):
('batch/regions', 'us-central1,us-east1'),
('batch/remote_tmpdir', 'gs://foo/bar'),
('query/backend', 'spark'),
('query/jar_url', 'gs://foo/bar.jar'),
('query/batch_driver_cores', '1'),
('query/batch_worker_cores', '1'),
('query/batch_driver_memory', '1Gi'),
Expand Down Expand Up @@ -113,7 +112,6 @@ def test_config_unset_unknown_name(runner: CliRunner):
('batch/billing_project', 'gs://foo/bar'),
('batch/remote_tmpdir', 'asdf://foo/bar'),
('query/backend', 'random_backend'),
('query/jar_url', 'bar://foo/bar.jar'),
('query/batch_driver_cores', 'a'),
('query/batch_worker_cores', 'b'),
('query/batch_driver_memory', '1bar'),
Expand Down
21 changes: 11 additions & 10 deletions hail/scripts/upload_qob_jar.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ SHADOW_JAR=$3
PATH_FILE=$4


TOKEN=$(cat /dev/urandom 2> /dev/null | LC_ALL=C tr -dc 'a-z0-9' 2> /dev/null | head -c 12)
QUERY_STORAGE_URI=$(kubectl get secret global-config --template={{.data.query_storage_uri}} | base64 --decode)
TEST_STORAGE_URI=$(kubectl get secret global-config --template={{.data.test_storage_uri}} | base64 --decode)

if [[ "${NAMESPACE}" == "default" ]]; then
if [[ "${UPLOAD_RELEASE_JAR}" == "true" ]]; then
JAR_LOCATION="${QUERY_STORAGE_URI}/jars/${REVISION}.jar"
else
JAR_LOCATION="${QUERY_STORAGE_URI}/jars/$(whoami)/${TOKEN}/${REVISION}.jar"
fi
JAR_PREFIX=$(kubectl get secret global-config --template={{.data.query_storage_uri}} | base64 --decode)
else
JAR_LOCATION="${TEST_STORAGE_URI}/${NAMESPACE}/jars/${TOKEN}/${REVISION}.jar"
BUCKET=$(kubectl get secret global-config --template={{.data.test_storage_uri}} | base64 --decode)
JAR_PREFIX="${BUCKET}/${NAMESPACE}"
fi

if [[ "${UPLOAD_RELEASE_JAR}" == "true" ]]; then
JAR_DIR="jars"
else
JAR_DIR="jars/dev"
fi

JAR_LOCATION="${JAR_PREFIX}/${JAR_DIR}/${REVISION}.jar"

python3 -m hailtop.aiotools.copy \
-vvv \
'null' \
Expand Down

0 comments on commit 208d69b

Please sign in to comment.