Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compute cluster properties to job launching #86

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions domino/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@
"""
MINIMUM_ON_DEMAND_SPARK_CLUSTER_SUPPORT_DOMINO_VERSION = '4.2.0'

"""
Minimum Domino version supporting distributed compute cluster launching
"""
MINIMUM_DISTRIBUTED_CLUSTER_SUPPORT_DOMINO_VERSION = '4.5.0'

"""
Distributed compute cluster types and their minimum supported Domino version
"""
CLUSTER_TYPE_MIN_SUPPORT = [("Spark", "4.5.0"), ("Ray", "4.5.0")]

"""
Environment variable names used by this python-domino library
"""
Expand Down
100 changes: 93 additions & 7 deletions domino/domino.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def runs_start_blocking(self, command, isDirect=False, commitId=None,
try:
run_info = self.get_run_info(run_id)
if run_info is None:
raise RunNotFoundException(f"Tried to access nonexistent run id {run_id}")
raise RunNotFoundException(f"Tried to access nonexistent run id {run_id}")
current_retry_count = 0
except (requests.exceptions.RequestException, RunNotFoundException) as e:
current_retry_count += 1
Expand Down Expand Up @@ -219,7 +219,7 @@ def get_run_log(self, runId, includeSetupLog=True):
logs.append(self._get(url)["stdout"])

return "\n".join(logs)

def get_run_info(self, run_id):
for run_info in self.runs_list()['data']:
if run_info['id'] == run_id:
Expand All @@ -238,8 +238,14 @@ def runs_stdout(self, runId):
# pprint.pformat outputs a string that is ready to be printed
return pprint.pformat(self._get(url)['stdout'])

def job_start(self, command: str, commit_id: str = None, hardware_tier_name: str = None,
environment_id: str = None, on_demand_spark_cluster_properties: dict = None) -> dict:
def job_start(
self,
command: str,
commit_id: Optional[str] = None,
hardware_tier_name: Optional[str] = None,
environment_id: Optional[str] = None,
on_demand_spark_cluster_properties: Optional[dict] = None,
compute_cluster_properties: Optional[dict] = None) -> dict:
"""
Starts a Domino Job via V4 API
:param command: string
Expand All @@ -255,8 +261,8 @@ def job_start(self, command: str, commit_id: str = None, hardware_tier_name: str
The environment id to launch job with. If not provided
it will use the default environment for the project
:param on_demand_spark_cluster_properties: dict (Optional)
On demand spark cluster properties. Following properties
can be provided in spark cluster
This field is deprecated. Please use compute_cluster_properties.
The following properties can be provided in spark cluster
{
"computeEnvironmentId": "<Environment ID configured with spark>"
"executorCount": "<Number of Executors in cluster>"
Expand All @@ -268,6 +274,28 @@ def job_start(self, command: str, commit_id: str = None, hardware_tier_name: str
"executorStorageMB": "<Executor's storage in MB>"
(optional defaults to 0; 1GB is 1000MB Here)
}
:param compute_cluster_properties: dict (Optional)
The compute cluster properties definition contains parameters for
launching any Domino supported compute cluster for a job. Use this
to launch a job that uses a compute cluster instead of
the deprecated on_demand_spark_cluster_properties field. If
on_demand_spark_cluster_properties and compute_cluster_properties
are both present, on_demand_spark_cluster_properties will be ignored.

compute_cluster_properties contains the following fields:
{
"clusterType": <string, one of "Ray", "Spark">,
"computeEnvironmentId": <string, The environment ID for the cluster's nodes>,
"computeEnvironmentRevisionSpec": <one of "ActiveRevision", "LatestRevision",
{"revisionId":"<environment_revision_id>"} (optional)>,
"masterHardwareTierId": <string, the Hardware tier ID for the cluster's master node>,
"masterStorage": <{ "value": <number>, "unit": <one of "GiB", "MB"> },
The disk storage size for the cluster's master node (optional)>,
"workerCount": <number, the total workers to spawn for the cluster>,
"workerHardwareTierId": <string, The Hardware tier ID for the cluster workers>,
"workerStorage": <{ "value": <number>, "unit": <one of "GiB", "MB"> },
The disk storage size for the cluster's worker nodes (optional)>
}
:return: Returns created Job details (number, id etc)
"""
def validate_on_demand_spark_cluster_properties(max_execution_slot_per_user):
Expand Down Expand Up @@ -302,15 +330,57 @@ def validate_is_on_demand_spark_supported():
f"Your domino deployment version {self._version} does not support on demand spark cluster. "
f"Minimum support version {MINIMUM_ON_DEMAND_SPARK_CLUSTER_SUPPORT_DOMINO_VERSION}")

def validate_distributed_compute_cluster_properties():
if not is_compute_cluster_properties_supported(self._version):
raise Exception(f"Domino {self._version} does not support distributed compute cluster launching.")

required_keys = ["clusterType", "computeEnvironmentId", "masterHardwareTierId", "workerHardwareTierId", "workerCount"]
for key in required_keys:
try:
compute_cluster_properties[key]
except KeyError:
raise Exception(f"{key} is required in compute_cluster_properties")

if not is_cluster_type_supported(self._version, compute_cluster_properties["clusterType"]):
supported_types = [ct for ct,min_version in CLUSTER_TYPE_MIN_SUPPORT if is_cluster_type_supported(self._version, ct)]
supported_types_str = ", ".join(supported_types)
raise Exception(
f"Domino {self._version} does not support cluster type {compute_cluster_properties['clusterType']}." +
f" This version of Domino supports the following cluster types: {supported_types_str}"
)


def throw_if_information_invalid(key: str, info: dict) -> bool:
try:
self._validate_information_data_type(info)
except Exception as e:
raise Exception(f"{key} in compute_cluster_properties failed validation: {e}")

if "masterStorage" in compute_cluster_properties:
throw_if_information_invalid("masterStorage", compute_cluster_properties["masterStorage"])
if "workerStorage" in compute_cluster_properties:
throw_if_information_invalid("workerStorage", compute_cluster_properties["workerStorage"])

if compute_cluster_properties["workerCount"] < 1:
raise Exception("compute_cluster_properties workerCount must be greater than 0")

spark_cluster_properties = None
validated_compute_cluster_properties = None

if commit_id is not None:
self._validate_commit_id(commit_id)
if hardware_tier_name is not None:
self._validate_hardware_tier_name(hardware_tier_name)
if environment_id is not None:
self._validate_environment_id(environment_id)
if on_demand_spark_cluster_properties is not None:
if compute_cluster_properties is not None:
validate_distributed_compute_cluster_properties()

validated_compute_cluster_properties = compute_cluster_properties.copy()
validated_compute_cluster_properties["masterHardwareTierId"] = { "value": compute_cluster_properties["masterHardwareTierId"] }
validated_compute_cluster_properties["workerHardwareTierId"] = { "value": compute_cluster_properties["workerHardwareTierId"] }

elif on_demand_spark_cluster_properties is not None:
validate_is_on_demand_spark_supported()
default_spark_setting = get_default_spark_settings()
max_execution_slot = default_spark_setting['maximumExecutionSlotsPerUser']
Expand Down Expand Up @@ -339,6 +409,7 @@ def validate_is_on_demand_spark_supported():
"commitId": commit_id,
"overrideHardwareTierName": hardware_tier_name,
"onDemandSparkClusterProperties": spark_cluster_properties,
"computeClusterProperties": validated_compute_cluster_properties,
"environmentId": environment_id
}
response = self.request_manager.post(url, json=payload)
Expand Down Expand Up @@ -719,6 +790,21 @@ def _validate_blob_key(key):
"If you have a file path and want to get the "
"file, use files_list to get the blob key."))

@staticmethod
def _validate_information_data_type(info: dict):
accepted_units = {'GiB': True, 'MB': True}

try:
unit = info["unit"]
value = info["value"]
accepted_units[unit]
except KeyError:
raise Exception(
f"Information value is formatted incorrectly." +
f" Allowed units: {', '.join(accepted_units.keys())}" +
" Example: { 'unit': 'GiB', 'value': 5 }"
)

def requires_at_least(self, at_least_version):
if at_least_version > self._version:
raise Exception("You need at least version {} but your deployment \
Expand Down
11 changes: 11 additions & 0 deletions domino/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ def is_version_compatible(version: str) -> bool:
"""
return parse_version(version) >= parse_version(MINIMUM_SUPPORTED_DOMINO_VERSION)

def is_cluster_type_supported(version: str, cluster_type: str) -> bool:
curr_version = parse_version(version)

return next(
(True for ct,min_version in CLUSTER_TYPE_MIN_SUPPORT if ct == cluster_type and curr_version >= parse_version(min_version)),
False
)

def is_compute_cluster_properties_supported(version: str) -> bool:
return parse_version(version) >= parse_version(MINIMUM_DISTRIBUTED_CLUSTER_SUPPORT_DOMINO_VERSION)


def is_on_demand_spark_cluster_supported(version: str) -> bool:
return parse_version(version) >= parse_version(MINIMUM_ON_DEMAND_SPARK_CLUSTER_SUPPORT_DOMINO_VERSION)
Expand Down