Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[qob] Always send the git revision from the python service backend instead of jar_url #14541

Merged
merged 2 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from gear.clients import get_cloud_async_fs
from gear.database import CallError
from gear.profiling import install_profiler_if_requested
from gear.time_limited_max_size_cache import TimeLimitedMaxSizeCache
from hailtop import aiotools, dictfix, httpx, uvloopx, version
from hailtop.auth import hail_credentials
from hailtop.batch_client.globals import MAX_JOB_GROUPS_DEPTH, ROOT_JOB_GROUP_ID
Expand Down Expand Up @@ -1156,7 +1157,10 @@ async def _create_jobs(
revision = spec['process']['jar_spec']['value']
assert_is_sha_1_hex_string(revision)
spec['process']['jar_spec']['type'] = 'jar_url'
spec['process']['jar_spec']['value'] = ACCEPTABLE_QUERY_JAR_URL_PREFIX + '/' + revision + '.jar'
value = await app[AppKeys.QOB_JAR_RESOLUTION_CACHE].lookup(revision)
if value is None:
raise web.HTTPBadRequest(reason=f'Could not find a JAR matching revision {revision}')
spec['process']['jar_spec']['value'] = value
else:
assert spec['process']['jar_spec']['type'] == 'jar_url'
jar_url = spec['process']['jar_spec']['value']
Expand Down Expand Up @@ -3377,6 +3381,10 @@ def log(self, request, response, time):
super().log(request, response, time)


class AppKeys(CommonAiohttpAppKeys):
    """Typed aiohttp application keys used by the batch front end."""

    # Cache stored on the app under this key. It is built around
    # `resolve_qob_jar_url`, which takes a single git revision string and
    # returns the matching JAR URL, or None when no JAR is found; the type
    # parameters therefore are `str` (key) and `Optional[str]` (value).
    QOB_JAR_RESOLUTION_CACHE = web.AppKey(
        'qob_jar_resolution_cache', TimeLimitedMaxSizeCache[str, Optional[str]]
    )


async def on_startup(app):
exit_stack = AsyncExitStack()
app['exit_stack'] = exit_stack
Expand Down Expand Up @@ -3437,6 +3445,18 @@ async def on_startup(app):
retry_long_running('delete_batch_loop', run_if_changed, delete_batch_state_changed, delete_batch_loop_body, app)
)

async def resolve_qob_jar_url(git_revision: str) -> Optional[str]:
    """Find the URL of the Query JAR built for ``git_revision``.

    Two candidate locations under ``ACCEPTABLE_QUERY_JAR_URL_PREFIX`` are
    probed in order: ``<prefix>/<revision>.jar`` first, then
    ``<prefix>/dev/<revision>.jar``.

    Returns the first candidate URL for which ``fs.exists`` is truthy, or
    ``None`` when neither exists.
    """
    jar_name = git_revision + '.jar'
    candidates = (
        ACCEPTABLE_QUERY_JAR_URL_PREFIX + '/' + jar_name,
        ACCEPTABLE_QUERY_JAR_URL_PREFIX + '/dev/' + jar_name,
    )
    # NOTE(review): `fs` is not defined in this block; presumably it is the
    # async filesystem created by the enclosing startup code -- confirm.
    for candidate in candidates:
        found = await fs.exists(candidate)
        if found:
            return candidate
    return None

app[AppKeys.QOB_JAR_RESOLUTION_CACHE] = TimeLimitedMaxSizeCache(
resolve_qob_jar_url, int(1e10), 100, AppKeys.QOB_JAR_RESOLUTION_CACHE._name
)

app['task_manager'].ensure_future(periodically_call(5, _refresh, app))


Expand Down
2 changes: 0 additions & 2 deletions hail/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ pytest-qob: upload-qob-jar upload-remote-test-resources install-editable
cd python && \
HAIL_TEST_STORAGE_URI=$(TEST_STORAGE_URI) \
HAIL_QUERY_BACKEND=batch \
HAIL_QUERY_JAR_URL=$$(cat ../upload-qob-jar) \
HAIL_DEFAULT_NAMESPACE=$(NAMESPACE) \
HAIL_TEST_RESOURCES_DIR='$(CLOUD_HAIL_TEST_RESOURCES_DIR)' \
HAIL_DOCTEST_DATA_DIR='$(HAIL_DOCTEST_DATA_DIR)' \
Expand Down Expand Up @@ -367,7 +366,6 @@ install-editable: $(FAST_PYTHON_JAR) $(FAST_PYTHON_JAR_EXTRA_CLASSPATH)
install-for-qob: upload-qob-jar install-editable
! [ -z $(NAMESPACE) ] # call this like: make install-for-qob NAMESPACE=default
hailctl config set query/backend batch
hailctl config set query/jar_url $$(cat upload-qob-jar)
hailctl dev config set default_namespace $(NAMESPACE)

.PHONY: install
Expand Down
44 changes: 6 additions & 38 deletions hail/python/hail/backend/service_backend.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import abc
import asyncio
import logging
import math
Expand Down Expand Up @@ -67,34 +66,6 @@ async def read_str(strm: afs.ReadableStream) -> str:
return b.decode('utf-8')


class JarSpec(abc.ABC):
    """Abstract base for objects describing a JAR as a ``{'type', 'value'}`` dict."""

    @abc.abstractmethod
    def to_dict(self) -> Dict[str, str]:
        """Return the dictionary form of this spec.

        Concrete subclasses must override this; the base implementation
        always raises ``NotImplementedError``.
        """
        raise NotImplementedError


class JarUrl(JarSpec):
    """A JAR spec that carries an explicit JAR URL."""

    def __init__(self, url):
        # Stored as given; no validation is performed here.
        self.url = url

    def __repr__(self):
        return 'JarUrl({})'.format(self.url)

    def to_dict(self) -> Dict[str, str]:
        """Return ``{'type': 'jar_url', 'value': <url>}``."""
        return dict(type='jar_url', value=self.url)


class GitRevision(JarSpec):
    """A JAR spec that carries a git revision."""

    def __init__(self, revision):
        # Stored as given; no validation is performed here.
        self.revision = revision

    def __repr__(self):
        return 'GitRevision({})'.format(self.revision)

    def to_dict(self) -> Dict[str, str]:
        """Return ``{'type': 'git_revision', 'value': <revision>}``."""
        return dict(type='git_revision', value=self.revision)


@dataclass
class SerializedIRFunction:
name: str
Expand Down Expand Up @@ -195,7 +166,6 @@ async def create(
disable_progress_bar: Optional[bool] = None,
remote_tmpdir: Optional[str] = None,
flags: Optional[Dict[str, str]] = None,
jar_url: Optional[str] = None,
driver_cores: Optional[Union[int, str]] = None,
driver_memory: Optional[str] = None,
worker_cores: Optional[Union[int, str]] = None,
Expand Down Expand Up @@ -229,9 +199,6 @@ async def create(
batch_attributes: Dict[str, str] = dict()
remote_tmpdir = get_remote_tmpdir('ServiceBackend', remote_tmpdir=remote_tmpdir)

jar_url = configuration_of(ConfigVariable.QUERY_JAR_URL, jar_url, None)
jar_spec = GitRevision(revision()) if jar_url is None else JarUrl(jar_url)

name_prefix = configuration_of(ConfigVariable.QUERY_NAME_PREFIX, name_prefix, '')
batch_attributes: Dict[str, str] = {
'hail-version': version(),
Expand Down Expand Up @@ -284,7 +251,6 @@ async def create(
disable_progress_bar=disable_progress_bar,
batch_attributes=batch_attributes,
remote_tmpdir=remote_tmpdir,
jar_spec=jar_spec,
driver_cores=driver_cores,
driver_memory=driver_memory,
worker_cores=worker_cores,
Expand All @@ -305,7 +271,6 @@ def __init__(
disable_progress_bar: bool,
batch_attributes: Dict[str, str],
remote_tmpdir: str,
jar_spec: JarSpec,
driver_cores: Optional[Union[int, str]],
driver_memory: Optional[str],
worker_cores: Optional[Union[int, str]],
Expand All @@ -323,7 +288,6 @@ def __init__(
self.batch_attributes = batch_attributes
self.remote_tmpdir = remote_tmpdir
self.flags: Dict[str, str] = {}
self.jar_spec = jar_spec
self.functions: List[IRFunction] = []
self._registered_ir_function_names: Set[str] = set()
self.driver_cores = driver_cores
Expand All @@ -343,7 +307,7 @@ def validate_file(self, uri: str) -> None:

def debug_info(self) -> Dict[str, Any]:
return {
'jar_spec': str(self.jar_spec),
'jar_spec': self.jar_spec,
'billing_project': self.billing_project,
'batch_attributes': self.batch_attributes,
'remote_tmpdir': self.remote_tmpdir,
Expand All @@ -359,6 +323,10 @@ def debug_info(self) -> Dict[str, Any]:
def fs(self) -> FS:
return self._sync_fs

@property
def jar_spec(self) -> dict:
    """JAR spec dict of type ``'git_revision'`` for the current ``revision()``.

    A new dict is built on every access.
    """
    return dict(type='git_revision', value=revision())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, we need to make upload-for-qob on our branches to use qob?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we do


@property
def logger(self):
    """Return the module-level ``log`` object."""
    module_log = log
    return module_log
Expand Down Expand Up @@ -410,7 +378,7 @@ async def _run_on_batch(
resources['storage'] = service_backend_config.storage

j = self._batch.create_jvm_job(
jar_spec=self.jar_spec.to_dict(),
jar_spec=self.jar_spec,
argv=[
ServiceBackend.DRIVER,
name,
Expand Down
3 changes: 0 additions & 3 deletions hail/python/hail/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,6 @@ def init_spark(
@typecheck(
billing_project=nullable(str),
remote_tmpdir=nullable(str),
jar_url=nullable(str),
log=nullable(str),
quiet=bool,
append=bool,
Expand All @@ -536,7 +535,6 @@ async def init_batch(
*,
billing_project: Optional[str] = None,
remote_tmpdir: Optional[str] = None,
jar_url: Optional[str] = None,
log: Optional[str] = None,
quiet: bool = False,
append: bool = False,
Expand All @@ -562,7 +560,6 @@ async def init_batch(
billing_project=billing_project,
remote_tmpdir=remote_tmpdir,
disable_progress_bar=disable_progress_bar,
jar_url=jar_url,
driver_cores=driver_cores,
driver_memory=driver_memory,
worker_cores=worker_cores,
Expand Down
1 change: 0 additions & 1 deletion hail/python/hailtop/config/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class ConfigVariable(str, Enum):
BATCH_BILLING_PROJECT = 'batch/billing_project'
BATCH_BACKEND = 'batch/backend'
QUERY_BACKEND = 'query/backend'
QUERY_JAR_URL = 'query/jar_url'
QUERY_BATCH_DRIVER_CORES = 'query/batch_driver_cores'
QUERY_BATCH_WORKER_CORES = 'query/batch_worker_cores'
QUERY_BATCH_DRIVER_MEMORY = 'query/batch_driver_memory'
Expand Down
7 changes: 0 additions & 7 deletions hail/python/hailtop/hailctl/config/config_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,6 @@ def config_variables():
'should be one of "local", "spark", or "batch"',
),
),
ConfigVariable.QUERY_JAR_URL: ConfigVariableInfo(
help_msg='Cloud storage URI to a Query JAR',
validation=(
RouterAsyncFS.valid_url,
'should be valid cloud storage URI such as gs://my-bucket/jars/sha.jar',
),
),
ConfigVariable.QUERY_BATCH_DRIVER_CORES: ConfigVariableInfo(
help_msg='Cores specification for the query driver',
validation=(
Expand Down
2 changes: 0 additions & 2 deletions hail/python/test/hailtop/hailctl/config/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def test_config_list_empty_config(runner: CliRunner):
('batch/regions', 'us-central1,us-east1'),
('batch/remote_tmpdir', 'gs://foo/bar'),
('query/backend', 'spark'),
('query/jar_url', 'gs://foo/bar.jar'),
('query/batch_driver_cores', '1'),
('query/batch_worker_cores', '1'),
('query/batch_driver_memory', '1Gi'),
Expand Down Expand Up @@ -113,7 +112,6 @@ def test_config_unset_unknown_name(runner: CliRunner):
('batch/billing_project', 'gs://foo/bar'),
('batch/remote_tmpdir', 'asdf://foo/bar'),
('query/backend', 'random_backend'),
('query/jar_url', 'bar://foo/bar.jar'),
('query/batch_driver_cores', 'a'),
('query/batch_worker_cores', 'b'),
('query/batch_driver_memory', '1bar'),
Expand Down
21 changes: 11 additions & 10 deletions hail/scripts/upload_qob_jar.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ SHADOW_JAR=$3
PATH_FILE=$4


TOKEN=$(cat /dev/urandom 2> /dev/null | LC_ALL=C tr -dc 'a-z0-9' 2> /dev/null | head -c 12)
QUERY_STORAGE_URI=$(kubectl get secret global-config --template={{.data.query_storage_uri}} | base64 --decode)
TEST_STORAGE_URI=$(kubectl get secret global-config --template={{.data.test_storage_uri}} | base64 --decode)

if [[ "${NAMESPACE}" == "default" ]]; then
if [[ "${UPLOAD_RELEASE_JAR}" == "true" ]]; then
JAR_LOCATION="${QUERY_STORAGE_URI}/jars/${REVISION}.jar"
else
JAR_LOCATION="${QUERY_STORAGE_URI}/jars/$(whoami)/${TOKEN}/${REVISION}.jar"
fi
JAR_PREFIX=$(kubectl get secret global-config --template={{.data.query_storage_uri}} | base64 --decode)
else
JAR_LOCATION="${TEST_STORAGE_URI}/${NAMESPACE}/jars/${TOKEN}/${REVISION}.jar"
BUCKET=$(kubectl get secret global-config --template={{.data.test_storage_uri}} | base64 --decode)
JAR_PREFIX="${BUCKET}/${NAMESPACE}"
fi

if [[ "${UPLOAD_RELEASE_JAR}" == "true" ]]; then
JAR_DIR="jars"
else
JAR_DIR="jars/dev"
fi

JAR_LOCATION="${JAR_PREFIX}/${JAR_DIR}/${REVISION}.jar"

python3 -m hailtop.aiotools.copy \
-vvv \
'null' \
Expand Down