provide job_timeout_in_seconds in dataproc resource config (#7941)
* provide job_timeout_in_seconds in dataproc resource config

* fix lint; black checks

Co-authored-by: prha <prha@elementl.com>
3cham and prha committed May 20, 2022
1 parent 6afdaf9 commit 8f375c4
Showing 3 changed files with 74 additions and 6 deletions.
python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/ops.py (22 changes: 18 additions & 4 deletions)
@@ -1,9 +1,18 @@
-from dagster import Bool, Field, op, solid
+from dagster import Bool, Field, Int, op, solid
 from dagster.seven import json
 
 from .configs import define_dataproc_submit_job_config
+from .resources import TWENTY_MINUTES
 
 DATAPROC_CONFIG_SCHEMA = {
+    "job_timeout_in_seconds": Field(
+        Int,
+        description="""Optional. Maximum time in seconds to wait for the job being
+        completed. Default is set to 1200 seconds (20 minutes).
+        """,
+        is_required=False,
+        default_value=TWENTY_MINUTES,
+    ),
     "job_config": define_dataproc_submit_job_config(),
     "job_scoped_cluster": Field(
         Bool,
@@ -16,8 +25,12 @@

 def _dataproc_compute(context):
     job_config = context.solid_config["job_config"]
+    job_timeout = context.solid_config["job_timeout_in_seconds"]
 
-    context.log.info("submitting job with config: %s" % str(json.dumps(job_config)))
+    context.log.info(
+        "submitting job with config: %s and timeout of: %d seconds"
+        % (str(json.dumps(job_config)), job_timeout)
+    )
 
     if context.solid_config["job_scoped_cluster"]:
         # Cluster context manager, creates and then deletes cluster
@@ -27,15 +40,16 @@ def _dataproc_compute(context):

             job_id = result["reference"]["jobId"]
             context.log.info("Submitted job ID {}".format(job_id))
-            cluster.wait_for_job(job_id)
+            cluster.wait_for_job(job_id, wait_timeout=job_timeout)
 
     else:
         # Submit to an existing cluster
+        # Submit the job specified by this solid to the cluster defined by the associated resource
         result = context.resources.dataproc.submit_job(job_config)
 
         job_id = result["reference"]["jobId"]
         context.log.info("Submitted job ID {}".format(job_id))
-        context.resources.dataproc.wait_for_job(job_id)
+        context.resources.dataproc.wait_for_job(job_id, wait_timeout=job_timeout)
 
 
 @solid(required_resource_keys={"dataproc"}, config_schema=DATAPROC_CONFIG_SCHEMA)
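For orientation, here is a minimal run_config sketch showing how an op built on this schema could override the default 20-minute timeout. This is illustrative only: the op name dataproc_op and the overall config shape mirror the test added at the end of this commit, while the project, region, cluster name, and the 3600-second value are placeholder assumptions, not values from this commit.

# Hypothetical run_config sketch for an op using DATAPROC_CONFIG_SCHEMA.
# Only job_timeout_in_seconds is new in this commit; the rest mirrors the
# shape already exercised by the Dataproc tests below.
run_config = {
    "ops": {
        "dataproc_op": {
            "config": {
                "job_timeout_in_seconds": 3600,  # wait up to 1 hour instead of the default 1200s
                "job_scoped_cluster": True,
                "job_config": {
                    "projectId": "my-project",  # placeholder
                    "region": "us-east1",  # placeholder
                    "job": {
                        "reference": {"projectId": "my-project"},
                        "placement": {"clusterName": "my-cluster"},
                        "hiveJob": {"queryList": {"queries": ["SHOW DATABASES"]}},
                    },
                },
            }
        }
    }
}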
python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/resources.py
@@ -98,7 +98,7 @@ def get_job(self, job_id):
             projectId=self.project_id, region=self.region, jobId=job_id
         ).execute()
 
-    def wait_for_job(self, job_id):
+    def wait_for_job(self, job_id, wait_timeout=TWENTY_MINUTES):
         """This method polls job status every 5 seconds"""
         # TODO: Add logging here print('Waiting for job ID {} to finish...'.format(job_id))
         def iter_fn():
@@ -114,7 +114,7 @@ def iter_fn():

             return False
 
-        done = DataprocResource._iter_and_sleep_until_ready(iter_fn)
+        done = DataprocResource._iter_and_sleep_until_ready(iter_fn, max_wait_time_sec=wait_timeout)
         if not done:
             job = self.get_job(job_id)
             raise DataprocError("Job run timed out: %s" % str(job["status"]))
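The wait_timeout argument is forwarded to DataprocResource._iter_and_sleep_until_ready as max_wait_time_sec, whose body is not part of this diff. As a rough sketch only (an assumption about its shape, not the actual implementation), a polling helper like the following, using the 5-second interval mentioned in the docstring above, would explain why the 0-second timeout in the test below fails immediately:

import time


def _iter_and_sleep_until_ready(callable_fn, max_wait_time_sec, iter_time=5):
    # Sketch: poll callable_fn until it reports the job is done or the
    # allotted time elapses; the caller raises DataprocError on False.
    elapsed_time = 0
    while elapsed_time < max_wait_time_sec:
        if callable_fn():
            return True
        time.sleep(iter_time)
        elapsed_time += iter_time
    return False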
@@ -125,3 +125,57 @@ def test_dataproc():
             }
         )
         assert result.success
+
+
+def test_wait_for_job_with_timeout():
+    """Test submitting a job with timeout of 0 second so that it always fails."""
+    with mock.patch("httplib2.Http", new=HttpSnooper):
+
+        @job(resource_defs={"dataproc": dataproc_resource})
+        def test_dataproc():
+            dataproc_op()
+
+        try:
+            test_dataproc.execute_in_process(
+                run_config={
+                    "ops": {
+                        "dataproc_op": {
+                            "config": {
+                                "job_timeout_in_seconds": 0,
+                                "job_config": {
+                                    "projectId": PROJECT_ID,
+                                    "region": REGION,
+                                    "job": {
+                                        "reference": {"projectId": PROJECT_ID},
+                                        "placement": {"clusterName": CLUSTER_NAME},
+                                        "hiveJob": {"queryList": {"queries": ["SHOW DATABASES"]}},
+                                    },
+                                },
+                                "job_scoped_cluster": True,
+                            }
+                        }
+                    },
+                    "resources": {
+                        "dataproc": {
+                            "config": {
+                                "projectId": PROJECT_ID,
+                                "clusterName": CLUSTER_NAME,
+                                "region": REGION,
+                                "cluster_config": {
+                                    "softwareConfig": {
+                                        "properties": {
+                                            # Create a single-node cluster
+                                            # This needs to be the string "true" when
+                                            # serialized, not a boolean true
+                                            "dataproc:dataproc.allow.zero.workers": "true"
+                                        }
+                                    }
+                                },
+                            }
+                        }
+                    },
+                }
+            )
+            assert False
+        except Exception as e:
+            assert "Job run timed out" in str(e)
