
Add compute cluster properties to job launching (#86)
* Add compute cluster properties to job launching

* Apply documentation and typing suggestions, add env rev spec changes

* Fix revision examples

* Update minimum supported version to 4.5

* Update documentation for cluster type

* Update documentation

* Remove masterStorage
niole committed Apr 28, 2021
1 parent 77facaa commit b105d34
Showing 3 changed files with 110 additions and 7 deletions.
10 changes: 10 additions & 0 deletions domino/constants.py
@@ -8,6 +8,16 @@
"""
MINIMUM_ON_DEMAND_SPARK_CLUSTER_SUPPORT_DOMINO_VERSION = '4.2.0'

"""
Minimum Domino version supporting distributed compute cluster launching
"""
MINIMUM_DISTRIBUTED_CLUSTER_SUPPORT_DOMINO_VERSION = '4.5.0'

"""
Distributed compute cluster types and their minimum supported Domino version
"""
CLUSTER_TYPE_MIN_SUPPORT = [("Spark", "4.5.0"), ("Ray", "4.5.0")]

"""
Environment variable names used by this python-domino library
"""
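For reference, one way to query this table (illustrative only; the library's consumer is is_cluster_type_supported in domino/helpers.py, shown further down):

from domino.constants import CLUSTER_TYPE_MIN_SUPPORT

# Illustrative lookup of a cluster type's minimum supported Domino version.
min_version = next((v for ct, v in CLUSTER_TYPE_MIN_SUPPORT if ct == "Ray"), None)
# min_version == "4.5.0"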
96 changes: 89 additions & 7 deletions domino/domino.py
Expand Up @@ -153,7 +153,7 @@ def runs_start_blocking(self, command, isDirect=False, commitId=None,
try:
run_info = self.get_run_info(run_id)
if run_info is None:
raise RunNotFoundException(f"Tried to access nonexistent run id {run_id}")
current_retry_count = 0
except (requests.exceptions.RequestException, RunNotFoundException) as e:
current_retry_count += 1
@@ -219,7 +219,7 @@ def get_run_log(self, runId, includeSetupLog=True):
logs.append(self._get(url)["stdout"])

return "\n".join(logs)

def get_run_info(self, run_id):
for run_info in self.runs_list()['data']:
if run_info['id'] == run_id:
@@ -238,8 +238,14 @@ def runs_stdout(self, runId):
# pprint.pformat outputs a string that is ready to be printed
return pprint.pformat(self._get(url)['stdout'])

def job_start(
self,
command: str,
commit_id: Optional[str] = None,
hardware_tier_name: Optional[str] = None,
environment_id: Optional[str] = None,
on_demand_spark_cluster_properties: Optional[dict] = None,
compute_cluster_properties: Optional[dict] = None) -> dict:
"""
Starts a Domino Job via V4 API
:param command: string
@@ -255,8 +261,8 @@ def job_start(self, command: str, commit_id: str = None, hardware_tier_name: str
The environment id to launch job with. If not provided
it will use the default environment for the project
:param on_demand_spark_cluster_properties: dict (Optional)
This field is deprecated. Please use compute_cluster_properties.
The following properties can be provided for a Spark cluster:
{
"computeEnvironmentId": "<Environment ID configured with spark>"
"executorCount": "<Number of Executors in cluster>"
@@ -268,6 +274,26 @@
"executorStorageMB": "<Executor's storage in MB>"
(optional defaults to 0; 1GB is 1000MB Here)
}
:param compute_cluster_properties: dict (Optional)
The compute cluster properties definition contains parameters for
launching any Domino-supported compute cluster for a job. Use this
to launch a job that uses a compute cluster instead of
the deprecated on_demand_spark_cluster_properties field. If
on_demand_spark_cluster_properties and compute_cluster_properties
are both present, on_demand_spark_cluster_properties will be ignored.
compute_cluster_properties contains the following fields:
{
"clusterType": <string, one of "Ray", "Spark">,
"computeEnvironmentId": <string, The environment ID for the cluster's nodes>,
"computeEnvironmentRevisionSpec": <one of "ActiveRevision", "LatestRevision",
{"revisionId":"<environment_revision_id>"} (optional)>,
"masterHardwareTierId": <string, the Hardware tier ID for the cluster's master node>,
"workerCount": <number, the total workers to spawn for the cluster>,
"workerHardwareTierId": <string, The Hardware tier ID for the cluster workers>,
"workerStorage": <{ "value": <number>, "unit": <one of "GiB", "MB"> },
The disk storage size for the cluster's worker nodes (optional)>
}
:return: Returns created Job details (number, id, etc.)
"""
def validate_on_demand_spark_cluster_properties(max_execution_slot_per_user):
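For illustration, launching a job with the new field might look like the following sketch (the project name, environment ID, and hardware tier IDs are hypothetical placeholders):

from domino import Domino

# A sketch, not part of this diff; all IDs below are placeholders.
d = Domino("my_user/my_project")
d.job_start(
    command="main.py",
    compute_cluster_properties={
        "clusterType": "Ray",
        "computeEnvironmentId": "<ray_environment_id>",
        "masterHardwareTierId": "<hardware_tier_id>",
        "workerHardwareTierId": "<hardware_tier_id>",
        "workerCount": 2,
        # Optional: 5 GiB of disk per worker node.
        "workerStorage": {"unit": "GiB", "value": 5},
    },
)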
@@ -302,15 +328,55 @@ def validate_is_on_demand_spark_supported():
f"Your domino deployment version {self._version} does not support on demand spark cluster. "
f"Minimum support version {MINIMUM_ON_DEMAND_SPARK_CLUSTER_SUPPORT_DOMINO_VERSION}")

def validate_distributed_compute_cluster_properties():
if not is_compute_cluster_properties_supported(self._version):
raise Exception(f"Domino {self._version} does not support distributed compute cluster launching.")

required_keys = ["clusterType", "computeEnvironmentId", "masterHardwareTierId", "workerHardwareTierId", "workerCount"]
for key in required_keys:
    if key not in compute_cluster_properties:
        raise Exception(f"{key} is required in compute_cluster_properties")

if not is_cluster_type_supported(self._version, compute_cluster_properties["clusterType"]):
supported_types = [ct for ct, min_version in CLUSTER_TYPE_MIN_SUPPORT if is_cluster_type_supported(self._version, ct)]
supported_types_str = ", ".join(supported_types)
raise Exception(
f"Domino {self._version} does not support cluster type {compute_cluster_properties['clusterType']}." +
f" This version of Domino supports the following cluster types: {supported_types_str}"
)


def throw_if_information_invalid(key: str, info: dict) -> None:
try:
self._validate_information_data_type(info)
except Exception as e:
raise Exception(f"{key} in compute_cluster_properties failed validation: {e}")

if "workerStorage" in compute_cluster_properties:
throw_if_information_invalid("workerStorage", compute_cluster_properties["workerStorage"])

if compute_cluster_properties["workerCount"] < 1:
raise Exception("compute_cluster_properties workerCount must be greater than 0")

spark_cluster_properties = None
validated_compute_cluster_properties = None

if commit_id is not None:
self._validate_commit_id(commit_id)
if hardware_tier_name is not None:
self._validate_hardware_tier_name(hardware_tier_name)
if environment_id is not None:
self._validate_environment_id(environment_id)
if compute_cluster_properties is not None:
validate_distributed_compute_cluster_properties()

validated_compute_cluster_properties = compute_cluster_properties.copy()
validated_compute_cluster_properties["masterHardwareTierId"] = { "value": compute_cluster_properties["masterHardwareTierId"] }
validated_compute_cluster_properties["workerHardwareTierId"] = { "value": compute_cluster_properties["workerHardwareTierId"] }

elif on_demand_spark_cluster_properties is not None:
validate_is_on_demand_spark_supported()
default_spark_setting = get_default_spark_settings()
max_execution_slot = default_spark_setting['maximumExecutionSlotsPerUser']
@@ -339,6 +405,7 @@ def validate_is_on_demand_spark_supported():
"commitId": commit_id,
"overrideHardwareTierName": hardware_tier_name,
"onDemandSparkClusterProperties": spark_cluster_properties,
"computeClusterProperties": validated_compute_cluster_properties,
"environmentId": environment_id
}
response = self.request_manager.post(url, json=payload)
@@ -719,6 +786,21 @@ def _validate_blob_key(key):
"If you have a file path and want to get the "
"file, use files_list to get the blob key."))

@staticmethod
def _validate_information_data_type(info: dict):
accepted_units = {'GiB': True, 'MB': True}

try:
unit = info["unit"]
value = info["value"]
accepted_units[unit]  # raises KeyError for an unrecognized unit
except KeyError:
raise Exception(
f"Information value is formatted incorrectly." +
f" Allowed units: {', '.join(accepted_units.keys())}" +
" Example: { 'unit': 'GiB', 'value': 5 }"
)

def requires_at_least(self, at_least_version):
if at_least_version > self._version:
raise Exception("You need at least version {} but your deployment \
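To illustrate the validator above (hypothetical calls; the outcomes follow directly from the code):

from domino import Domino

Domino._validate_information_data_type({"unit": "GiB", "value": 5})  # passes
Domino._validate_information_data_type({"unit": "TB", "value": 1})   # raises: unrecognized unit
Domino._validate_information_data_type({"value": 5})                 # raises: missing "unit" key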
11 changes: 11 additions & 0 deletions domino/helpers.py
@@ -13,6 +13,17 @@ def is_version_compatible(version: str) -> bool:
"""
return parse_version(version) >= parse_version(MINIMUM_SUPPORTED_DOMINO_VERSION)

def is_cluster_type_supported(version: str, cluster_type: str) -> bool:
curr_version = parse_version(version)

return any(
    ct == cluster_type and curr_version >= parse_version(min_version)
    for ct, min_version in CLUSTER_TYPE_MIN_SUPPORT
)

def is_compute_cluster_properties_supported(version: str) -> bool:
return parse_version(version) >= parse_version(MINIMUM_DISTRIBUTED_CLUSTER_SUPPORT_DOMINO_VERSION)


def is_on_demand_spark_cluster_supported(version: str) -> bool:
return parse_version(version) >= parse_version(MINIMUM_ON_DEMAND_SPARK_CLUSTER_SUPPORT_DOMINO_VERSION)
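Expected behavior of the new helpers, for reference (version strings are illustrative):

from domino.helpers import is_cluster_type_supported, is_compute_cluster_properties_supported

is_compute_cluster_properties_supported("4.5.0")  # True
is_compute_cluster_properties_supported("4.4.2")  # False
is_cluster_type_supported("4.5.0", "Ray")         # True  (minimum is 4.5.0)
is_cluster_type_supported("4.5.0", "Dask")        # False (not in CLUSTER_TYPE_MIN_SUPPORT)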
