Skip to content

Commit

Permalink
[batch] add tests for python jobs using hl.read_input
Browse files Browse the repository at this point in the history
Stacked on hail-is#14316.

These are tests for the bug fixed by hail-is#14130.
  • Loading branch information
danking committed Feb 28, 2024
1 parent 35d0c1c commit 84fdbff
Showing 1 changed file with 80 additions and 0 deletions.
80 changes: 80 additions & 0 deletions hail/python/test/hailtop/batch/test_python_job_in_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import List, Tuple, Dict
import asyncio
import secrets

Expand Down Expand Up @@ -57,6 +58,85 @@ def reformat(x, y):
assert res.get_job_log(4)['main'] == "3\n5\n30\n{\"x\": 3, \"y\": 5}\n", str(res.debug_info())


async def test_python_job_input(
service_backend: ServiceBackend,
fs: RouterAsyncFS,
upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]],
output_tmpdir: str,
):
(url1, data1), _, _ = upload_test_files

b = batch(service_backend)
input_file = b.read_input(url1)

def readall(path: str) -> str:
return open(path).read()

j = b.new_python_job()
r = j.call(readall, input_file)

out = os.path.join(output_tmpdir, secrets.token_urlsafe(5))
b.write_output(r.as_json(), out)
res = b.run()

assert isinstance(res, bc.Batch)
assert res.status()['state'] == 'success', str((res, res.debug_info()))
assert orjson.loads(await fs.read(out)) == data1


async def test_python_job_input_in_list(
service_backend: ServiceBackend,
fs: RouterAsyncFS,
upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]],
output_tmpdir: str,
):
(url1, data1), _, _ = upload_test_files

b = batch(service_backend)
input_file = b.read_input(url1)

def readall(paths: List[str]) -> str:
assert len(paths) == 1
return open(paths[0]).read()

j = b.new_python_job()
r = j.call(readall, [input_file])

out = os.path.join(output_tmpdir, secrets.token_urlsafe(5))
b.write_output(r.as_json(), out)
res = b.run()

assert isinstance(res, bc.Batch)
assert res.status()['state'] == 'success', str((res, res.debug_info()))
assert orjson.loads(await fs.read(out)) == data1


async def test_python_job_input_in_dict(
service_backend: ServiceBackend,
fs: RouterAsyncFS,
upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]],
output_tmpdir: str,
):
(url1, data1), _, _ = upload_test_files

b = batch(service_backend)
input_file = b.read_input(url1)

def readall(paths: Dict[str, str]) -> str:
return open(paths['hello!!!']).read()

j = b.new_python_job()
r = j.call(readall, {'hello!!!': input_file})

out = os.path.join(output_tmpdir, secrets.token_urlsafe(5))
b.write_output(r.as_json(), out)
res = b.run()

assert isinstance(res, bc.Batch)
assert res.status()['state'] == 'success', str((res, res.debug_info()))
assert orjson.loads(await fs.read(out)) == data1


def test_python_job_w_resource_group_unpack_individually(service_backend: ServiceBackend):
b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
head = b.new_job()
Expand Down

0 comments on commit 84fdbff

Please sign in to comment.