Skip to content

Commit

Permalink
[qob] Always send the git revision from the python service backend in…
Browse files Browse the repository at this point in the history
…stead of jar_url
  • Loading branch information
daniel-goldstein committed May 9, 2024
1 parent fc47294 commit 631839a
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 63 deletions.
22 changes: 21 additions & 1 deletion batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from gear.clients import get_cloud_async_fs
from gear.database import CallError
from gear.profiling import install_profiler_if_requested
from gear.time_limited_max_size_cache import TimeLimitedMaxSizeCache
from hailtop import aiotools, dictfix, httpx, uvloopx, version
from hailtop.auth import hail_credentials
from hailtop.batch_client.globals import MAX_JOB_GROUPS_DEPTH, ROOT_JOB_GROUP_ID
Expand Down Expand Up @@ -1156,7 +1157,10 @@ async def _create_jobs(
revision = spec['process']['jar_spec']['value']
assert_is_sha_1_hex_string(revision)
spec['process']['jar_spec']['type'] = 'jar_url'
spec['process']['jar_spec']['value'] = ACCEPTABLE_QUERY_JAR_URL_PREFIX + '/' + revision + '.jar'
value = await app[AppKeys.QOB_JAR_RESOLUTION_CACHE].lookup(revision)
if value is None:
raise web.HTTPBadRequest(reason=f'Could not find a JAR matching revision {revision}')
spec['process']['jar_spec']['value'] = value
else:
assert spec['process']['jar_spec']['type'] == 'jar_url'
jar_url = spec['process']['jar_spec']['value']
Expand Down Expand Up @@ -3377,6 +3381,10 @@ def log(self, request, response, time):
super().log(request, response, time)


class AppKeys(CommonAiohttpAppKeys):
    """Typed keys for the values this module stores on the aiohttp application."""

    # Cache filled through ``resolve_qob_jar_url``: it is looked up with a
    # single git revision string and yields the URL of the matching JAR, or
    # ``None`` when no JAR exists for that revision. The type parameters
    # mirror that loader's signature (``str -> Optional[str]``).
    QOB_JAR_RESOLUTION_CACHE = web.AppKey('qob_jar_resolution_cache', TimeLimitedMaxSizeCache[str, Optional[str]])


async def on_startup(app):
exit_stack = AsyncExitStack()
app['exit_stack'] = exit_stack
Expand Down Expand Up @@ -3437,6 +3445,18 @@ async def on_startup(app):
retry_long_running('delete_batch_loop', run_if_changed, delete_batch_state_changed, delete_batch_loop_body, app)
)

async def resolve_qob_jar_url(git_revision: str) -> Optional[str]:
    """Find the URL of the Query JAR built from ``git_revision``.

    The production location is probed first, then the ``dev`` location; the
    first URL that ``fs.exists`` reports as present is returned. Returns
    ``None`` when neither exists.
    """
    candidates = tuple(
        ACCEPTABLE_QUERY_JAR_URL_PREFIX + infix + git_revision + '.jar'
        for infix in ('/', '/dev/')
    )
    for candidate in candidates:
        jar_found = await fs.exists(candidate)
        if jar_found:
            return candidate
    return None

app[AppKeys.QOB_JAR_RESOLUTION_CACHE] = TimeLimitedMaxSizeCache(
resolve_qob_jar_url, int(1e10), 100, AppKeys.QOB_JAR_RESOLUTION_CACHE._name
)

app['task_manager'].ensure_future(periodically_call(5, _refresh, app))


Expand Down
1 change: 0 additions & 1 deletion hail/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ pytest-qob: upload-qob-jar upload-remote-test-resources install-editable
cd python && \
HAIL_TEST_STORAGE_URI=$(TEST_STORAGE_URI) \
HAIL_QUERY_BACKEND=batch \
HAIL_QUERY_JAR_URL=$$(cat ../upload-qob-jar) \
HAIL_DEFAULT_NAMESPACE=$(NAMESPACE) \
HAIL_TEST_RESOURCES_DIR='$(CLOUD_HAIL_TEST_RESOURCES_DIR)' \
HAIL_DOCTEST_DATA_DIR='$(HAIL_DOCTEST_DATA_DIR)' \
Expand Down
44 changes: 6 additions & 38 deletions hail/python/hail/backend/service_backend.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import abc
import asyncio
import logging
import math
Expand Down Expand Up @@ -67,34 +66,6 @@ async def read_str(strm: afs.ReadableStream) -> str:
return b.decode('utf-8')


class JarSpec(abc.ABC):
    """Abstract base for JAR specifications that serialize via ``to_dict``."""

    @abc.abstractmethod
    def to_dict(self) -> Dict[str, str]:
        """Return this spec as a string-to-string dictionary.

        Subclasses must override this; the base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()


class JarUrl(JarSpec):
    """JAR specification that names the JAR by its URL."""

    def __init__(self, url):
        # Stored as given; no validation is performed here.
        self.url = url

    def to_dict(self) -> Dict[str, str]:
        """Return ``{'type': 'jar_url', 'value': <url>}``."""
        return dict(type='jar_url', value=self.url)

    def __repr__(self):
        return 'JarUrl({})'.format(self.url)


class GitRevision(JarSpec):
    """JAR specification that names the JAR by a git revision."""

    def __init__(self, revision):
        # Stored as given; no validation is performed here.
        self.revision = revision

    def to_dict(self) -> Dict[str, str]:
        """Return ``{'type': 'git_revision', 'value': <revision>}``."""
        return dict(type='git_revision', value=self.revision)

    def __repr__(self):
        return 'GitRevision({})'.format(self.revision)


@dataclass
class SerializedIRFunction:
name: str
Expand Down Expand Up @@ -195,7 +166,6 @@ async def create(
disable_progress_bar: Optional[bool] = None,
remote_tmpdir: Optional[str] = None,
flags: Optional[Dict[str, str]] = None,
jar_url: Optional[str] = None,
driver_cores: Optional[Union[int, str]] = None,
driver_memory: Optional[str] = None,
worker_cores: Optional[Union[int, str]] = None,
Expand Down Expand Up @@ -229,9 +199,6 @@ async def create(
batch_attributes: Dict[str, str] = dict()
remote_tmpdir = get_remote_tmpdir('ServiceBackend', remote_tmpdir=remote_tmpdir)

jar_url = configuration_of(ConfigVariable.QUERY_JAR_URL, jar_url, None)
jar_spec = GitRevision(revision()) if jar_url is None else JarUrl(jar_url)

name_prefix = configuration_of(ConfigVariable.QUERY_NAME_PREFIX, name_prefix, '')
batch_attributes: Dict[str, str] = {
'hail-version': version(),
Expand Down Expand Up @@ -284,7 +251,6 @@ async def create(
disable_progress_bar=disable_progress_bar,
batch_attributes=batch_attributes,
remote_tmpdir=remote_tmpdir,
jar_spec=jar_spec,
driver_cores=driver_cores,
driver_memory=driver_memory,
worker_cores=worker_cores,
Expand All @@ -305,7 +271,6 @@ def __init__(
disable_progress_bar: bool,
batch_attributes: Dict[str, str],
remote_tmpdir: str,
jar_spec: JarSpec,
driver_cores: Optional[Union[int, str]],
driver_memory: Optional[str],
worker_cores: Optional[Union[int, str]],
Expand All @@ -323,7 +288,6 @@ def __init__(
self.batch_attributes = batch_attributes
self.remote_tmpdir = remote_tmpdir
self.flags: Dict[str, str] = {}
self.jar_spec = jar_spec
self.functions: List[IRFunction] = []
self._registered_ir_function_names: Set[str] = set()
self.driver_cores = driver_cores
Expand All @@ -343,7 +307,7 @@ def validate_file(self, uri: str) -> None:

def debug_info(self) -> Dict[str, Any]:
return {
'jar_spec': str(self.jar_spec),
'jar_spec': self.jar_spec,
'billing_project': self.billing_project,
'batch_attributes': self.batch_attributes,
'remote_tmpdir': self.remote_tmpdir,
Expand All @@ -359,6 +323,10 @@ def debug_info(self) -> Dict[str, Any]:
def fs(self) -> FS:
return self._sync_fs

@property
def jar_spec(self) -> dict:
    """JAR specification of type ``git_revision`` for the current ``revision()``.

    A new dictionary is built, and ``revision()`` is called, on every access.
    """
    spec = dict(type='git_revision')
    spec['value'] = revision()
    return spec

@property
def logger(self):
    """Expose ``log`` as this object's logger."""
    return log
Expand Down Expand Up @@ -410,7 +378,7 @@ async def _run_on_batch(
resources['storage'] = service_backend_config.storage

j = self._batch.create_jvm_job(
jar_spec=self.jar_spec.to_dict(),
jar_spec=self.jar_spec,
argv=[
ServiceBackend.DRIVER,
name,
Expand Down
3 changes: 0 additions & 3 deletions hail/python/hail/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,6 @@ def init_spark(
@typecheck(
billing_project=nullable(str),
remote_tmpdir=nullable(str),
jar_url=nullable(str),
log=nullable(str),
quiet=bool,
append=bool,
Expand All @@ -536,7 +535,6 @@ async def init_batch(
*,
billing_project: Optional[str] = None,
remote_tmpdir: Optional[str] = None,
jar_url: Optional[str] = None,
log: Optional[str] = None,
quiet: bool = False,
append: bool = False,
Expand All @@ -562,7 +560,6 @@ async def init_batch(
billing_project=billing_project,
remote_tmpdir=remote_tmpdir,
disable_progress_bar=disable_progress_bar,
jar_url=jar_url,
driver_cores=driver_cores,
driver_memory=driver_memory,
worker_cores=worker_cores,
Expand Down
1 change: 0 additions & 1 deletion hail/python/hailtop/config/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class ConfigVariable(str, Enum):
BATCH_BILLING_PROJECT = 'batch/billing_project'
BATCH_BACKEND = 'batch/backend'
QUERY_BACKEND = 'query/backend'
QUERY_JAR_URL = 'query/jar_url'
QUERY_BATCH_DRIVER_CORES = 'query/batch_driver_cores'
QUERY_BATCH_WORKER_CORES = 'query/batch_worker_cores'
QUERY_BATCH_DRIVER_MEMORY = 'query/batch_driver_memory'
Expand Down
7 changes: 0 additions & 7 deletions hail/python/hailtop/hailctl/config/config_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,6 @@ def config_variables():
'should be one of "local", "spark", or "batch"',
),
),
ConfigVariable.QUERY_JAR_URL: ConfigVariableInfo(
help_msg='Cloud storage URI to a Query JAR',
validation=(
RouterAsyncFS.valid_url,
'should be valid cloud storage URI such as gs://my-bucket/jars/sha.jar',
),
),
ConfigVariable.QUERY_BATCH_DRIVER_CORES: ConfigVariableInfo(
help_msg='Cores specification for the query driver',
validation=(
Expand Down
2 changes: 0 additions & 2 deletions hail/python/test/hailtop/hailctl/config/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def test_config_list_empty_config(runner: CliRunner):
('batch/regions', 'us-central1,us-east1'),
('batch/remote_tmpdir', 'gs://foo/bar'),
('query/backend', 'spark'),
('query/jar_url', 'gs://foo/bar.jar'),
('query/batch_driver_cores', '1'),
('query/batch_worker_cores', '1'),
('query/batch_driver_memory', '1Gi'),
Expand Down Expand Up @@ -113,7 +112,6 @@ def test_config_unset_unknown_name(runner: CliRunner):
('batch/billing_project', 'gs://foo/bar'),
('batch/remote_tmpdir', 'asdf://foo/bar'),
('query/backend', 'random_backend'),
('query/jar_url', 'bar://foo/bar.jar'),
('query/batch_driver_cores', 'a'),
('query/batch_worker_cores', 'b'),
('query/batch_driver_memory', '1bar'),
Expand Down
21 changes: 11 additions & 10 deletions hail/scripts/upload_qob_jar.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ SHADOW_JAR=$3
PATH_FILE=$4


TOKEN=$(cat /dev/urandom 2> /dev/null | LC_ALL=C tr -dc 'a-z0-9' 2> /dev/null | head -c 12)
QUERY_STORAGE_URI=$(kubectl get secret global-config --template={{.data.query_storage_uri}} | base64 --decode)
TEST_STORAGE_URI=$(kubectl get secret global-config --template={{.data.test_storage_uri}} | base64 --decode)

if [[ "${NAMESPACE}" == "default" ]]; then
if [[ "${UPLOAD_RELEASE_JAR}" == "true" ]]; then
JAR_LOCATION="${QUERY_STORAGE_URI}/jars/${REVISION}.jar"
else
JAR_LOCATION="${QUERY_STORAGE_URI}/jars/$(whoami)/${TOKEN}/${REVISION}.jar"
fi
JAR_PREFIX=$(kubectl get secret global-config --template={{.data.query_storage_uri}} | base64 --decode)
else
JAR_LOCATION="${TEST_STORAGE_URI}/${NAMESPACE}/jars/${TOKEN}/${REVISION}.jar"
BUCKET=$(kubectl get secret global-config --template={{.data.test_storage_uri}} | base64 --decode)
JAR_PREFIX="${BUCKET}/${NAMESPACE}"
fi

if [[ "${UPLOAD_RELEASE_JAR}" == "true" ]]; then
JAR_DIR="jars"
else
JAR_DIR="jars/dev"
fi

JAR_LOCATION="${JAR_PREFIX}/${JAR_DIR}/${REVISION}.jar"

python3 -m hailtop.aiotools.copy \
-vvv \
'null' \
Expand Down

0 comments on commit 631839a

Please sign in to comment.