From 2388807ec19776099dc3f9418cbd7ae3ecace5c1 Mon Sep 17 00:00:00 2001 From: Sagar Sumant Date: Tue, 26 Nov 2024 08:19:28 -0800 Subject: [PATCH 1/4] Format files. --- sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py | 155 ++++++++++++++---- .../ml/entities/_job/_input_output_helpers.py | 90 +++++++--- .../finetuning/custom_model_finetuning_job.py | 17 +- .../_job/finetuning/finetuning_job.py | 4 +- .../_job/finetuning/finetuning_vertical.py | 4 +- .../azure/ai/ml/entities/_job/job.py | 36 +++- .../ai/ml/entities/_job/queue_settings.py | 18 +- .../azure/ai/ml/operations/_job_operations.py | 149 +++++++++++++---- 8 files changed, 362 insertions(+), 111 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py b/sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py index 1ff6643f3b60..b5a04a6a14f5 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py @@ -24,26 +24,56 @@ from azure.ai.ml._restclient.v2020_09_01_dataplanepreview import ( AzureMachineLearningWorkspaces as ServiceClient092020DataplanePreview, ) -from azure.ai.ml._restclient.v2022_02_01_preview import AzureMachineLearningWorkspaces as ServiceClient022022Preview -from azure.ai.ml._restclient.v2022_05_01 import AzureMachineLearningWorkspaces as ServiceClient052022 -from azure.ai.ml._restclient.v2022_10_01 import AzureMachineLearningWorkspaces as ServiceClient102022 -from azure.ai.ml._restclient.v2022_10_01_preview import AzureMachineLearningWorkspaces as ServiceClient102022Preview -from azure.ai.ml._restclient.v2023_02_01_preview import AzureMachineLearningWorkspaces as ServiceClient022023Preview -from azure.ai.ml._restclient.v2023_04_01 import AzureMachineLearningWorkspaces as ServiceClient042023 -from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient042023Preview -from azure.ai.ml._restclient.v2023_06_01_preview import AzureMachineLearningWorkspaces as ServiceClient062023Preview -from azure.ai.ml._restclient.v2023_08_01_preview import AzureMachineLearningWorkspaces as ServiceClient082023Preview +from azure.ai.ml._restclient.v2022_02_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient022022Preview, +) +from azure.ai.ml._restclient.v2022_05_01 import ( + AzureMachineLearningWorkspaces as ServiceClient052022, +) +from azure.ai.ml._restclient.v2022_10_01 import ( + AzureMachineLearningWorkspaces as ServiceClient102022, +) +from azure.ai.ml._restclient.v2022_10_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient102022Preview, +) +from azure.ai.ml._restclient.v2023_02_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient022023Preview, +) +from azure.ai.ml._restclient.v2023_04_01 import ( + AzureMachineLearningWorkspaces as ServiceClient042023, +) +from azure.ai.ml._restclient.v2023_04_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient042023Preview, +) +from azure.ai.ml._restclient.v2023_06_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient062023Preview, +) +from azure.ai.ml._restclient.v2023_08_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient082023Preview, +) # Same object, but was renamed starting in v2023_08_01_preview from azure.ai.ml._restclient.v2023_10_01 import AzureMachineLearningServices as ServiceClient102023 -from azure.ai.ml._restclient.v2024_01_01_preview import AzureMachineLearningWorkspaces as ServiceClient012024Preview -from azure.ai.ml._restclient.v2024_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient042024Preview -from azure.ai.ml._restclient.v2024_07_01_preview import AzureMachineLearningWorkspaces as ServiceClient072024Preview -from azure.ai.ml._restclient.v2024_10_01_preview import AzureMachineLearningWorkspaces as ServiceClient102024Preview +from azure.ai.ml._restclient.v2024_01_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient012024Preview, +) +from azure.ai.ml._restclient.v2024_04_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient042024Preview, +) +from azure.ai.ml._restclient.v2024_07_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient072024Preview, +) +from azure.ai.ml._restclient.v2024_10_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient102024Preview, +) from azure.ai.ml._restclient.workspace_dataplane import ( AzureMachineLearningWorkspaces as ServiceClientWorkspaceDataplane, ) -from azure.ai.ml._scope_dependent_operations import OperationConfig, OperationsContainer, OperationScope +from azure.ai.ml._scope_dependent_operations import ( + OperationConfig, + OperationsContainer, + OperationScope, +) from azure.ai.ml._telemetry.logging_handler import get_appinsights_log_handler from azure.ai.ml._user_agent import USER_AGENT from azure.ai.ml._utils._experimental import experimental @@ -102,7 +132,9 @@ from azure.ai.ml.operations._local_deployment_helper import _LocalDeploymentHelper from azure.ai.ml.operations._local_endpoint_helper import _LocalEndpointHelper from azure.ai.ml.operations._schedule_operations import ScheduleOperations -from azure.ai.ml.operations._workspace_outbound_rule_operations import WorkspaceOutboundRuleOperations +from azure.ai.ml.operations._workspace_outbound_rule_operations import ( + WorkspaceOutboundRuleOperations, +) from azure.core.credentials import TokenCredential from azure.core.polling import LROPoller @@ -184,7 +216,9 @@ def __init__( self._ws_sub: Any = None show_progress = kwargs.pop("show_progress", True) enable_telemetry = kwargs.pop("enable_telemetry", True) - self._operation_config = OperationConfig(show_progress=show_progress, enable_telemetry=enable_telemetry) + self._operation_config = OperationConfig( + show_progress=show_progress, enable_telemetry=enable_telemetry + ) if "cloud" in kwargs: cloud_name = kwargs["cloud"] @@ -229,7 +263,9 @@ def __init__( workspace_reference = kwargs.pop("workspace_reference", None) if workspace_reference or registry_reference: ws_ops = WorkspaceOperations( - OperationScope(str(subscription_id), str(resource_group_name), workspace_reference), + OperationScope( + str(subscription_id), str(resource_group_name), workspace_reference + ), ServiceClient042023Preview( credential=self._credential, subscription_id=subscription_id, @@ -239,7 +275,9 @@ def __init__( ) self._ws_rg = resource_group_name self._ws_sub = subscription_id - workspace_details = ws_ops.get(workspace_reference if workspace_reference else workspace_name) + workspace_details = ws_ops.get( + workspace_reference if workspace_reference else workspace_name + ) workspace_location, workspace_id = ( workspace_details.location, workspace_details._workspace_id, @@ -250,7 +288,10 @@ def __init__( resource_group_name, subscription_id, ) = get_registry_client( - self._credential, registry_name if registry_name else registry_reference, workspace_location, **kwargs + self._credential, + registry_name if registry_name else registry_reference, + workspace_location, + **kwargs, ) if not workspace_name: workspace_name = workspace_reference @@ -284,7 +325,9 @@ def __init__( **{"properties": properties}, enable_telemetry=self._operation_config.enable_telemetry, ) - app_insights_handler_kwargs: Dict[str, Tuple] = {"app_insights_handler": app_insights_handler} + app_insights_handler_kwargs: Dict[str, Tuple] = { + "app_insights_handler": app_insights_handler + } base_url = _get_base_url_from_metadata(cloud_name=cloud_name, is_local_mfe=True) self._base_url = base_url @@ -586,7 +629,11 @@ def __init__( self._code = CodeOperations( self._ws_operation_scope if registry_reference else self._operation_scope, self._operation_config, - self._service_client_10_2021_dataplanepreview if registry_name else self._service_client_04_2023, + ( + self._service_client_10_2021_dataplanepreview + if registry_name + else self._service_client_04_2023 + ), self._datastores, **ops_kwargs, # type: ignore[arg-type] ) @@ -594,12 +641,18 @@ def __init__( self._environments = EnvironmentOperations( self._ws_operation_scope if registry_reference else self._operation_scope, self._operation_config, - self._service_client_10_2021_dataplanepreview if registry_name else self._service_client_04_2023_preview, + ( + self._service_client_10_2021_dataplanepreview + if registry_name + else self._service_client_04_2023_preview + ), self._operation_container, **ops_kwargs, # type: ignore[arg-type] ) self._operation_container.add(AzureMLResourceType.ENVIRONMENT, self._environments) - self._local_endpoint_helper = _LocalEndpointHelper(requests_pipeline=self._requests_pipeline) + self._local_endpoint_helper = _LocalEndpointHelper( + requests_pipeline=self._requests_pipeline + ) self._local_deployment_helper = _LocalDeploymentHelper(self._operation_container) self._online_endpoints = OnlineEndpointOperations( self._operation_scope, @@ -643,12 +696,18 @@ def __init__( service_client_02_2023_preview=self._service_client_02_2023_preview, **ops_kwargs, ) - self._operation_container.add(AzureMLResourceType.ONLINE_DEPLOYMENT, self._online_deployments) + self._operation_container.add( + AzureMLResourceType.ONLINE_DEPLOYMENT, self._online_deployments + ) self._operation_container.add(AzureMLResourceType.BATCH_DEPLOYMENT, self._batch_deployments) self._data = DataOperations( self._operation_scope, self._operation_config, - self._service_client_10_2021_dataplanepreview if registry_name else self._service_client_04_2023_preview, + ( + self._service_client_10_2021_dataplanepreview + if registry_name + else self._service_client_04_2023_preview + ), self._service_client_01_2024_preview, self._datastores, requests_pipeline=self._requests_pipeline, @@ -659,7 +718,11 @@ def __init__( self._components = ComponentOperations( self._operation_scope, self._operation_config, - self._service_client_10_2021_dataplanepreview if registry_name else self._service_client_01_2024_preview, + ( + self._service_client_10_2021_dataplanepreview + if registry_name + else self._service_client_01_2024_preview + ), self._operation_container, self._preflight, **ops_kwargs, # type: ignore[arg-type] @@ -714,7 +777,9 @@ def __init__( AzureMLResourceType.VIRTUALCLUSTER, self._virtual_clusters # type: ignore[arg-type] ) except Exception as ex: # pylint: disable=broad-except - module_logger.debug("Virtual Cluster operations could not be initialized due to %s ", ex) + module_logger.debug( + "Virtual Cluster operations could not be initialized due to %s ", ex + ) self._featurestores = FeatureStoreOperations( self._operation_scope, @@ -759,9 +824,15 @@ def __init__( ) self._operation_container.add(AzureMLResourceType.FEATURE_STORE, self._featurestores) # type: ignore[arg-type] self._operation_container.add(AzureMLResourceType.FEATURE_SET, self._featuresets) - self._operation_container.add(AzureMLResourceType.FEATURE_STORE_ENTITY, self._featurestoreentities) - self._operation_container.add(AzureMLResourceType.SERVERLESS_ENDPOINT, self._serverless_endpoints) - self._operation_container.add(AzureMLResourceType.MARKETPLACE_SUBSCRIPTION, self._marketplace_subscriptions) + self._operation_container.add( + AzureMLResourceType.FEATURE_STORE_ENTITY, self._featurestoreentities + ) + self._operation_container.add( + AzureMLResourceType.SERVERLESS_ENDPOINT, self._serverless_endpoints + ) + self._operation_container.add( + AzureMLResourceType.MARKETPLACE_SUBSCRIPTION, self._marketplace_subscriptions + ) @classmethod def from_config( # pylint: disable=C4758 @@ -873,7 +944,9 @@ def from_config( # pylint: disable=C4758 error_category=ErrorCategory.USER_ERROR, ) - subscription_id, resource_group, workspace_name = MLClient._get_workspace_info(str(found_path)) + subscription_id, resource_group, workspace_name = MLClient._get_workspace_info( + str(found_path) + ) module_logger.info("Found the config file in: %s", found_path) return MLClient( @@ -885,7 +958,9 @@ def from_config( # pylint: disable=C4758 ) @classmethod - def _ml_client_cli(cls, credentials: TokenCredential, subscription_id: Optional[str], **kwargs) -> "MLClient": + def _ml_client_cli( + cls, credentials: TokenCredential, subscription_id: Optional[str], **kwargs + ) -> "MLClient": """This method provides a way to create MLClient object for cli to leverage cli context for authentication. With this we do not have to use AzureCliCredentials from azure-identity package (not meant for heavy usage). The @@ -1176,7 +1251,9 @@ def _get_workspace_info(cls, found_path: Optional[str]) -> Tuple[str, str, str]: # Checking the keys in the config.json file to check for required parameters. scope = config.get("Scope") if not scope: - if not all(k in config.keys() for k in ("subscription_id", "resource_group", "workspace_name")): + if not all( + k in config.keys() for k in ("subscription_id", "resource_group", "workspace_name") + ): msg = ( "The config file found in: {} does not seem to contain the required " "parameters. Please make sure it contains your subscription_id, " @@ -1376,7 +1453,9 @@ def _(entity: BatchEndpoint, operations, *args, **kwargs): @_begin_create_or_update.register(OnlineDeployment) def _(entity: OnlineDeployment, operations, *args, **kwargs): module_logger.debug("Creating or updating online_deployments") - return operations[AzureMLResourceType.ONLINE_DEPLOYMENT].begin_create_or_update(entity, **kwargs) + return operations[AzureMLResourceType.ONLINE_DEPLOYMENT].begin_create_or_update( + entity, **kwargs + ) @_begin_create_or_update.register(BatchDeployment) @@ -1406,10 +1485,14 @@ def _(entity: Schedule, operations, *args, **kwargs): @_begin_create_or_update.register(ServerlessEndpoint) def _(entity: ServerlessEndpoint, operations, *args, **kwargs): module_logger.debug("Creating or updating serverless endpoints") - return operations[AzureMLResourceType.SERVERLESS_ENDPOINT].begin_create_or_update(entity, **kwargs) + return operations[AzureMLResourceType.SERVERLESS_ENDPOINT].begin_create_or_update( + entity, **kwargs + ) @_begin_create_or_update.register(MarketplaceSubscription) def _(entity: MarketplaceSubscription, operations, *args, **kwargs): module_logger.debug("Creating or updating marketplace subscriptions") - return operations[AzureMLResourceType.MARKETPLACE_SUBSCRIPTION].begin_create_or_update(entity, **kwargs) + return operations[AzureMLResourceType.MARKETPLACE_SUBSCRIPTION].begin_create_or_update( + entity, **kwargs + ) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py index a6b8f858bfc1..11d3ead409a2 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py @@ -6,31 +6,61 @@ import re from typing import Any, Dict, Optional, Union -from azure.ai.ml._restclient.v2023_04_01_preview.models import CustomModelJobInput as RestCustomModelJobInput -from azure.ai.ml._restclient.v2023_04_01_preview.models import CustomModelJobOutput as RestCustomModelJobOutput +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + CustomModelJobInput as RestCustomModelJobInput, +) +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + CustomModelJobOutput as RestCustomModelJobOutput, +) from azure.ai.ml._restclient.v2023_04_01_preview.models import InputDeliveryMode from azure.ai.ml._restclient.v2023_04_01_preview.models import JobInput as RestJobInput from azure.ai.ml._restclient.v2023_04_01_preview.models import JobInputType from azure.ai.ml._restclient.v2023_04_01_preview.models import JobOutput as RestJobOutput from azure.ai.ml._restclient.v2023_04_01_preview.models import JobOutputType, LiteralJobInput -from azure.ai.ml._restclient.v2023_04_01_preview.models import MLFlowModelJobInput as RestMLFlowModelJobInput -from azure.ai.ml._restclient.v2023_04_01_preview.models import MLFlowModelJobOutput as RestMLFlowModelJobOutput -from azure.ai.ml._restclient.v2023_04_01_preview.models import MLTableJobInput as RestMLTableJobInput -from azure.ai.ml._restclient.v2023_04_01_preview.models import MLTableJobOutput as RestMLTableJobOutput +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + MLFlowModelJobInput as RestMLFlowModelJobInput, +) +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + MLFlowModelJobOutput as RestMLFlowModelJobOutput, +) +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + MLTableJobInput as RestMLTableJobInput, +) +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + MLTableJobOutput as RestMLTableJobOutput, +) from azure.ai.ml._restclient.v2023_04_01_preview.models import OutputDeliveryMode -from azure.ai.ml._restclient.v2023_04_01_preview.models import TritonModelJobInput as RestTritonModelJobInput -from azure.ai.ml._restclient.v2023_04_01_preview.models import TritonModelJobOutput as RestTritonModelJobOutput -from azure.ai.ml._restclient.v2023_04_01_preview.models import UriFileJobInput as RestUriFileJobInput -from azure.ai.ml._restclient.v2023_04_01_preview.models import UriFileJobOutput as RestUriFileJobOutput -from azure.ai.ml._restclient.v2023_04_01_preview.models import UriFolderJobInput as RestUriFolderJobInput -from azure.ai.ml._restclient.v2023_04_01_preview.models import UriFolderJobOutput as RestUriFolderJobOutput +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + TritonModelJobInput as RestTritonModelJobInput, +) +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + TritonModelJobOutput as RestTritonModelJobOutput, +) +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + UriFileJobInput as RestUriFileJobInput, +) +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + UriFileJobOutput as RestUriFileJobOutput, +) +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + UriFolderJobInput as RestUriFolderJobInput, +) +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + UriFolderJobOutput as RestUriFolderJobOutput, +) from azure.ai.ml._utils.utils import is_data_binding_expression from azure.ai.ml.constants import AssetTypes, InputOutputModes, JobType from azure.ai.ml.constants._component import IOConstants from azure.ai.ml.entities._inputs_outputs import Input, Output from azure.ai.ml.entities._job.input_output_entry import InputOutputEntry from azure.ai.ml.entities._util import normalize_job_input_output_type -from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, JobException, ValidationErrorType, ValidationException +from azure.ai.ml.exceptions import ( + ErrorCategory, + ErrorTarget, + JobException, + ValidationErrorType, + ValidationException, +) INPUT_MOUNT_MAPPING_FROM_REST = { InputDeliveryMode.READ_WRITE_MOUNT: InputOutputModes.RW_MOUNT, @@ -132,7 +162,9 @@ def build_input_output( return item -def _validate_inputs_for(input_consumer_name: str, input_consumer: str, inputs: Optional[Dict]) -> None: +def _validate_inputs_for( + input_consumer_name: str, input_consumer: str, inputs: Optional[Dict] +) -> None: implicit_inputs = re.findall(r"\${{inputs\.([\w\.-]+)}}", input_consumer) # optional inputs no need to validate whether they're in inputs optional_inputs = re.findall(r"\[[\w\.\s-]*\${{inputs\.([\w\.-]+)}}]", input_consumer) @@ -174,9 +206,7 @@ def validate_pipeline_input_key_characters(key: str) -> None: # Note: ([a-zA-Z_]+[a-zA-Z0-9_]*) is a valid single key, # so a valid pipeline key is: ^{single_key}([.]{single_key})*$ if re.match(IOConstants.VALID_KEY_PATTERN, key) is None: - msg = ( - "Pipeline input key name {} must be composed letters, numbers, and underscores with optional split by dots." - ) + msg = "Pipeline input key name {} must be composed letters, numbers, and underscores with optional split by dots." raise ValidationException( message=msg.format(key), no_personal_data_message=msg.format("[key]"), @@ -229,7 +259,11 @@ def to_rest_dataset_literal_inputs( if input_value.type in target_cls_dict: input_data = target_cls_dict[input_value.type]( uri=input_value.path, - mode=INPUT_MOUNT_MAPPING_TO_REST[input_value.mode.lower()] if input_value.mode else None, + mode=( + INPUT_MOUNT_MAPPING_TO_REST[input_value.mode.lower()] + if input_value.mode + else None + ), ) else: @@ -292,7 +326,11 @@ def from_rest_inputs_to_dataset_literal(inputs: Dict[str, RestJobInput]) -> Dict input_data = Input( type=type_transfer_dict[input_value.job_input_type], path=path, - mode=INPUT_MOUNT_MAPPING_FROM_REST[input_value.mode] if input_value.mode else None, + mode=( + INPUT_MOUNT_MAPPING_FROM_REST[input_value.mode] + if input_value.mode + else None + ), path_on_compute=sourcePathOnCompute, ) elif input_value.job_input_type in (JobInputType.LITERAL, JobInputType.LITERAL): @@ -331,13 +369,19 @@ def to_rest_data_outputs(outputs: Optional[Dict]) -> Dict[str, RestJobOutput]: else: target_cls_dict = get_output_rest_cls_dict() - output_value_type = output_value.type if output_value.type else AssetTypes.URI_FOLDER + output_value_type = ( + output_value.type if output_value.type else AssetTypes.URI_FOLDER + ) if output_value_type in target_cls_dict: output = target_cls_dict[output_value_type]( asset_name=output_value.name, asset_version=output_value.version, uri=output_value.path, - mode=OUTPUT_MOUNT_MAPPING_TO_REST[output_value.mode.lower()] if output_value.mode else None, + mode=( + OUTPUT_MOUNT_MAPPING_TO_REST[output_value.mode.lower()] + if output_value.mode + else None + ), pathOnCompute=getattr(output_value, "path_on_compute", None), description=output_value.description, ) @@ -380,7 +424,9 @@ def from_rest_data_outputs(outputs: Dict[str, RestJobOutput]) -> Dict[str, Outpu from_rest_outputs[output_name] = Output( type=output_type_mapping[output_value.job_output_type], path=output_value.uri, - mode=OUTPUT_MOUNT_MAPPING_FROM_REST[output_value.mode] if output_value.mode else None, + mode=( + OUTPUT_MOUNT_MAPPING_FROM_REST[output_value.mode] if output_value.mode else None + ), path_on_compute=sourcePathOnCompute, description=output_value.description, name=output_value.asset_name, diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/custom_model_finetuning_job.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/custom_model_finetuning_job.py index ad9376eb3fe6..009f0a8df455 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/custom_model_finetuning_job.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/custom_model_finetuning_job.py @@ -12,7 +12,10 @@ FineTuningJob as RestFineTuningJob, JobBase as RestJobBase, ) -from azure.ai.ml.entities._job._input_output_helpers import from_rest_data_outputs, to_rest_data_outputs +from azure.ai.ml.entities._job._input_output_helpers import ( + from_rest_data_outputs, + to_rest_data_outputs, +) from azure.ai.ml.entities._inputs_outputs import Input from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY from azure.ai.ml.entities._job.finetuning.finetuning_vertical import FineTuningVertical @@ -101,7 +104,9 @@ def _to_dict(self) -> Dict: # pylint: disable=arguments-differ :return: dictionary representation of the object. :rtype: typing.Dict """ - from azure.ai.ml._schema._finetuning.custom_model_finetuning import CustomModelFineTuningSchema + from azure.ai.ml._schema._finetuning.custom_model_finetuning import ( + CustomModelFineTuningSchema, + ) schema_dict: dict = {} # TODO: Combeback to this later for FineTuningJob in pipeline @@ -196,7 +201,9 @@ def _load_from_dict( :return: CustomModelFineTuningJob object. :rtype: CustomModelFineTuningJob """ - from azure.ai.ml._schema._finetuning.custom_model_finetuning import CustomModelFineTuningSchema + from azure.ai.ml._schema._finetuning.custom_model_finetuning import ( + CustomModelFineTuningSchema, + ) # TODO: Combeback to this later - Pipeline part. # from azure.ai.ml._schema.pipeline.automl_node import AutoMLClassificationNodeSchema @@ -210,7 +217,9 @@ def _load_from_dict( # **kwargs, # ) # else: - loaded_data = load_from_dict(CustomModelFineTuningSchema, data, context, additional_message, **kwargs) + loaded_data = load_from_dict( + CustomModelFineTuningSchema, data, context, additional_message, **kwargs + ) training_data = loaded_data.get("training_data", None) if isinstance(training_data, str): diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_job.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_job.py index 27f3422eb4b6..6436df274207 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_job.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_job.py @@ -90,7 +90,9 @@ def _load_from_rest(cls, obj: RestJobBase) -> "FineTuningJob": if class_type: res: FineTuningJob = class_type._from_rest_object(obj) return res - msg = f"Unsupported model provider type: {obj.properties.fine_tuning_details.model_provider}" + msg = ( + f"Unsupported model provider type: {obj.properties.fine_tuning_details.model_provider}" + ) raise ValidationException( message=msg, no_personal_data_message=msg, diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_vertical.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_vertical.py index e8049e39bd3a..fe40c8d09152 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_vertical.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_vertical.py @@ -170,7 +170,9 @@ def _restore_inputs(self) -> None: type=AssetTypes.URI_FILE, path=self.validation_data.uri # pylint: disable=no-member ) if isinstance(self.model, MLFlowModelJobInput): - self.model = Input(type=AssetTypes.MLFLOW_MODEL, path=self.model.uri) # pylint: disable=no-member + self.model = Input( + type=AssetTypes.MLFLOW_MODEL, path=self.model.uri + ) # pylint: disable=no-member def __eq__(self, other: object) -> bool: """Returns True if both instances have the same values. diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/job.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/job.py index a1d68170ca59..c3c7bdce7d6e 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/job.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/job.py @@ -17,10 +17,16 @@ from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase, JobService from azure.ai.ml._restclient.v2023_04_01_preview.models import JobType as RestJobType from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase as JobBase_2401 -from azure.ai.ml._restclient.v2024_01_01_preview.models import JobType as RestJobType_20240101Preview +from azure.ai.ml._restclient.v2024_01_01_preview.models import ( + JobType as RestJobType_20240101Preview, +) from azure.ai.ml._utils._html_utils import make_link, to_html from azure.ai.ml._utils.utils import dump_yaml_to_file -from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, CommonYamlFields +from azure.ai.ml.constants._common import ( + BASE_PATH_CONTEXT_KEY, + PARAMS_OVERRIDE_KEY, + CommonYamlFields, +) from azure.ai.ml.constants._compute import ComputeType from azure.ai.ml.constants._job.job import JobServices, JobType from azure.ai.ml.entities._mixins import TelemetryMixin @@ -191,7 +197,10 @@ def _repr_html_(self) -> str: if self.studio_url: info.update( [ - ("Details Page", make_link(self.studio_url, "Link to Azure Machine Learning studio")), + ( + "Details Page", + make_link(self.studio_url, "Link to Azure Machine Learning studio"), + ), ] ) res: str = to_html(info) @@ -202,7 +211,9 @@ def _to_dict(self) -> Dict: pass @classmethod - def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple: + def _resolve_cls_and_type( + cls, data: Dict, params_override: Optional[List[Dict]] = None + ) -> Tuple: from azure.ai.ml.entities._builders.command import Command from azure.ai.ml.entities._builders.spark import Spark from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob @@ -214,7 +225,9 @@ def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] job_type: Optional[Type["Job"]] = None type_in_override = find_type_in_override(params_override) - type_str = type_in_override or data.get(CommonYamlFields.TYPE, JobType.COMMAND) # override takes the priority + type_str = type_in_override or data.get( + CommonYamlFields.TYPE, JobType.COMMAND + ) # override takes the priority if type_str == JobType.COMMAND: job_type = Command elif type_str == JobType.SPARK: @@ -306,7 +319,9 @@ def _from_rest_object( # pylint: disable=too-many-return-statements if obj.properties.job_type == RestJobType.COMMAND: # PrP only until new import job type is ready on MFE in PuP # compute type 'DataFactory' is reserved compute name for 'clusterless' ADF jobs - if obj.properties.compute_id and obj.properties.compute_id.endswith("/" + ComputeType.ADF): + if obj.properties.compute_id and obj.properties.compute_id.endswith( + "/" + ComputeType.ADF + ): return ImportJob._load_from_rest(obj) res_command: Job = Command._load_from_rest_job(obj) @@ -334,7 +349,10 @@ def _from_rest_object( # pylint: disable=too-many-return-statements except Exception as ex: error_message = json.dumps(obj.as_dict(), indent=2) if obj else None module_logger.info( - "Exception: %s.\n%s\nUnable to parse the job resource: %s.\n", ex, traceback.format_exc(), error_message + "Exception: %s.\n%s\nUnable to parse the job resource: %s.\n", + ex, + traceback.format_exc(), + error_message, ) raise JobParsingError( message=str(ex), @@ -355,5 +373,7 @@ def _get_telemetry_values(self) -> Dict: # pylint: disable=arguments-differ @classmethod @abstractmethod - def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job": + def _load_from_dict( + cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any + ) -> "Job": pass diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/queue_settings.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/queue_settings.py index 36b715296c09..50485c129867 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/queue_settings.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/queue_settings.py @@ -47,19 +47,29 @@ def __init__( def _to_rest_object(self) -> RestQueueSettings: self._validate() - job_tier = JobTierNames.ENTITY_TO_REST.get(self.job_tier.lower(), None) if self.job_tier else None - priority = JobPriorityValues.ENTITY_TO_REST.get(self.priority.lower(), None) if self.priority else None + job_tier = ( + JobTierNames.ENTITY_TO_REST.get(self.job_tier.lower(), None) if self.job_tier else None + ) + priority = ( + JobPriorityValues.ENTITY_TO_REST.get(self.priority.lower(), None) + if self.priority + else None + ) return RestQueueSettings(job_tier=job_tier, priority=priority) @classmethod - def _from_rest_object(cls, obj: Union[Dict[str, Any], RestQueueSettings, None]) -> Optional["QueueSettings"]: + def _from_rest_object( + cls, obj: Union[Dict[str, Any], RestQueueSettings, None] + ) -> Optional["QueueSettings"]: if obj is None: return None if isinstance(obj, dict): queue_settings = RestQueueSettings.from_dict(obj) return cls._from_rest_object(queue_settings) job_tier = JobTierNames.REST_TO_ENTITY.get(obj.job_tier, None) if obj.job_tier else None - priority = JobPriorityValues.REST_TO_ENTITY.get(obj.priority, None) if obj.priority else None + priority = ( + JobPriorityValues.REST_TO_ENTITY.get(obj.priority, None) if obj.priority else None + ) return cls(job_tier=job_tier, priority=priority) def _validate(self) -> None: diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py index b986483134e6..2382713091c4 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py @@ -23,11 +23,19 @@ _resource_to_scopes, ) from azure.ai.ml._exception_helper import log_and_raise_error -from azure.ai.ml._restclient.dataset_dataplane import AzureMachineLearningWorkspaces as ServiceClientDatasetDataplane -from azure.ai.ml._restclient.model_dataplane import AzureMachineLearningWorkspaces as ServiceClientModelDataplane -from azure.ai.ml._restclient.runhistory import AzureMachineLearningWorkspaces as ServiceClientRunHistory +from azure.ai.ml._restclient.dataset_dataplane import ( + AzureMachineLearningWorkspaces as ServiceClientDatasetDataplane, +) +from azure.ai.ml._restclient.model_dataplane import ( + AzureMachineLearningWorkspaces as ServiceClientModelDataplane, +) +from azure.ai.ml._restclient.runhistory import ( + AzureMachineLearningWorkspaces as ServiceClientRunHistory, +) from azure.ai.ml._restclient.runhistory.models import Run -from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient022023Preview +from azure.ai.ml._restclient.v2023_04_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient022023Preview, +) from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase, ListViewType, UserIdentity from azure.ai.ml._restclient.v2023_08_01_preview.models import JobType as RestJobType from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase as JobBase_2401 @@ -105,7 +113,11 @@ from ._component_operations import ComponentOperations from ._compute_operations import ComputeOperations from ._dataset_dataplane_operations import DatasetDataplaneOperations -from ._job_ops_helper import get_git_properties, get_job_output_uris_from_dataplane, stream_logs_until_completion +from ._job_ops_helper import ( + get_git_properties, + get_job_output_uris_from_dataplane, + stream_logs_until_completion, +) from ._local_job_invoker import is_local_run, start_run_if_local from ._model_dataplane_operations import ModelDataplaneOperations from ._operation_orchestrator import ( @@ -207,7 +219,8 @@ def _virtual_cluster_operations(self) -> VirtualClusterOperations: return cast( VirtualClusterOperations, self._all_operations.get_operation( # type: ignore[misc] - AzureMLResourceType.VIRTUALCLUSTER, lambda x: isinstance(x, VirtualClusterOperations) + AzureMLResourceType.VIRTUALCLUSTER, + lambda x: isinstance(x, VirtualClusterOperations), ), ) @@ -215,7 +228,9 @@ def _virtual_cluster_operations(self) -> VirtualClusterOperations: def _datastore_operations(self) -> "DatastoreOperations": from azure.ai.ml.operations import DatastoreOperations - return cast(DatastoreOperations, self._all_operations.all_operations[AzureMLResourceType.DATASTORE]) + return cast( + DatastoreOperations, self._all_operations.all_operations[AzureMLResourceType.DATASTORE] + ) @property def _runs_operations(self) -> RunOperations: @@ -360,7 +375,9 @@ def get(self, name: str) -> Job: else: # Child jobs are no longer available through MFE, fetch # through run history instead - job = self._runs_operations._translate_from_rest_object(self._runs_operations.get_run(name)) + job = self._runs_operations._translate_from_rest_object( + self._runs_operations.get_run(name) + ) return job @@ -387,13 +404,18 @@ def show_services(self, name: str, node_index: int = 0) -> Optional[Dict[str, Se """ service_instances_dict = self._runs_operations._operation.get_run_service_instances( - self._subscription_id, self._operation_scope.resource_group_name, self._workspace_name, name, node_index + self._subscription_id, + self._operation_scope.resource_group_name, + self._workspace_name, + name, + node_index, ) if not service_instances_dict.instances: return None return { - k: ServiceInstance._from_rest_object(v, node_index) for k, v in service_instances_dict.instances.items() + k: ServiceInstance._from_rest_object(v, node_index) + for k, v in service_instances_dict.instances.items() } @distributed_trace @@ -441,7 +463,9 @@ def begin_cancel(self, name: str, **kwargs: Any) -> LROPoller[None]: results.append(result) return results - def _try_get_compute_arm_id(self, compute: Union[Compute, str]) -> Optional[Union[Compute, str]]: + def _try_get_compute_arm_id( + self, compute: Union[Compute, str] + ) -> Optional[Union[Compute, str]]: # pylint: disable=too-many-return-statements # TODO: Remove in PuP with native import job/component type support in MFE/Designer # DataFactory 'clusterless' job @@ -488,7 +512,9 @@ def _try_get_compute_arm_id(self, compute: Union[Compute, str]) -> Optional[Unio @distributed_trace @monitor_with_telemetry_mixin(ops_logger, "Job.Validate", ActivityType.PUBLICAPI) - def validate(self, job: Job, *, raise_on_failure: bool = False, **kwargs: Any) -> ValidationResult: + def validate( + self, job: Job, *, raise_on_failure: bool = False, **kwargs: Any + ) -> ValidationResult: """Validates a Job object before submitting to the service. Anonymous assets may be created if there are inline defined entities such as Component, Environment, and Code. Only pipeline jobs are supported for validation currently. @@ -513,7 +539,11 @@ def validate(self, job: Job, *, raise_on_failure: bool = False, **kwargs: Any) - @monitor_with_telemetry_mixin(ops_logger, "Job.Validate", ActivityType.INTERNALCALL) def _validate( - self, job: Job, *, raise_on_failure: bool = False, **kwargs: Any # pylint:disable=unused-argument + self, + job: Job, + *, + raise_on_failure: bool = False, + **kwargs: Any, # pylint:disable=unused-argument ) -> ValidationResult: """Implementation of validate. @@ -528,7 +558,9 @@ def _validate( :return: The validation result :rtype: ValidationResult """ - git_code_validation_result = PathAwareSchemaValidatableMixin._create_empty_validation_result() + git_code_validation_result = ( + PathAwareSchemaValidatableMixin._create_empty_validation_result() + ) # TODO: move this check to Job._validate after validation is supported for all job types # If private features are enable and job has code value of type str we need to check # that it is a valid git path case. Otherwise we should throw a ValidationException @@ -576,10 +608,14 @@ def error_func(msg: str, no_personal_data_msg: str) -> ValidationException: if not isinstance(node, ControlFlowNode): node.compute = self._try_get_compute_arm_id(node.compute) except Exception as e: # pylint: disable=W0718 - validation_result.append_error(yaml_path=f"jobs.{node_name}.compute", message=str(e)) + validation_result.append_error( + yaml_path=f"jobs.{node_name}.compute", message=str(e) + ) validation_result.resolve_location_for_diagnostics(str(job._source_path)) - return job._try_raise(validation_result, raise_error=raise_on_failure) # pylint: disable=protected-access + return job._try_raise( + validation_result, raise_error=raise_on_failure + ) # pylint: disable=protected-access @distributed_trace @monitor_with_telemetry_mixin(ops_logger, "Job.CreateOrUpdate", ActivityType.PUBLICAPI) @@ -683,7 +719,9 @@ def create_or_update( ): self._set_headers_with_user_aml_token(kwargs) - result = self._create_or_update_with_different_version_api(rest_job_resource=rest_job_resource, **kwargs) + result = self._create_or_update_with_different_version_api( + rest_job_resource=rest_job_resource, **kwargs + ) if is_local_run(result): ws_base_url = self._all_operations.all_operations[ @@ -714,7 +752,9 @@ def create_or_update( if snapshot_id is not None: job_object.properties.properties["ContentSnapshotId"] = snapshot_id - result = self._create_or_update_with_latest_version_api(rest_job_resource=job_object, **kwargs) + result = self._create_or_update_with_latest_version_api( + rest_job_resource=job_object, **kwargs + ) return self._resolve_azureml_id(Job._from_rest_object(result)) @@ -831,7 +871,10 @@ def stream(self, name: str) -> None: raise PipelineChildJobError(job_id=job_object.id) self._stream_logs_until_completion( - self._runs_operations, job_object, self._datastore_operations, requests_pipeline=self._requests_pipeline + self._runs_operations, + job_object, + self._datastore_operations, + requests_pipeline=self._requests_pipeline, ) @distributed_trace @@ -871,12 +914,17 @@ def download( job_details = self.get(name) # job is reused, get reused job to download if ( - job_details.properties.get(PipelineConstants.REUSED_FLAG_FIELD) == PipelineConstants.REUSED_FLAG_TRUE + job_details.properties.get(PipelineConstants.REUSED_FLAG_FIELD) + == PipelineConstants.REUSED_FLAG_TRUE and PipelineConstants.REUSED_JOB_ID in job_details.properties ): reused_job_name = job_details.properties[PipelineConstants.REUSED_JOB_ID] reused_job_detail = self.get(reused_job_name) - module_logger.info("job %s reuses previous job %s, download from the reused job.", name, reused_job_name) + module_logger.info( + "job %s reuses previous job %s, download from the reused job.", + name, + reused_job_name, + ) name, job_details = reused_job_name, reused_job_detail job_status = job_details.status if job_status not in RunHistoryConstants.TERMINAL_STATUSES: @@ -901,11 +949,16 @@ def download( def log_missing_uri(what: str) -> None: module_logger.debug( - 'Could not download %s for job "%s" (job status: %s)', what, job_details.name, job_details.status + 'Could not download %s for job "%s" (job status: %s)', + what, + job_details.name, + job_details.status, ) if isinstance(job_details, SweepJob): - best_child_run_id = job_details.properties.get(SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME, None) + best_child_run_id = job_details.properties.get( + SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME, None + ) if best_child_run_id: self.download( best_child_run_id, @@ -1068,7 +1121,8 @@ def _get_workspace_url(self, url_key: WorkspaceDiscoveryUrlKey) -> str: ) all_urls = json.loads( download_text_from_url( - discovery_url, create_requests_pipeline_with_retry(requests_pipeline=self._requests_pipeline) + discovery_url, + create_requests_pipeline_with_retry(requests_pipeline=self._requests_pipeline), ) ) return all_urls[url_key] @@ -1176,7 +1230,9 @@ def _resolve_compute_id(self, resolver: _AssetResolver, target: Any) -> Any: # Compute target can be either workspace-scoped compute type, # or AML scoped VC. In the case of VC, resource name will be of form # azureml:virtualClusters/ to disambiguate from azureml:name (which is always compute) - modified_target_name = modified_target_name[len(AzureMLResourceType.VIRTUALCLUSTER) + 1 :] + modified_target_name = modified_target_name[ + len(AzureMLResourceType.VIRTUALCLUSTER) + 1 : + ] modified_target_name = LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT.format( self._operation_scope.subscription_id, self._operation_scope.resource_group_name, @@ -1192,7 +1248,9 @@ def _resolve_compute_id(self, resolver: _AssetResolver, target: Any) -> Any: except Exception: # pylint: disable=W0718 return resolver(target, azureml_type=AzureMLResourceType.COMPUTE) - def _resolve_job_inputs(self, entries: Iterable[Union[Input, str, bool, int, float]], base_path: str) -> None: + def _resolve_job_inputs( + self, entries: Iterable[Union[Input, str, bool, int, float]], base_path: str + ) -> None: """resolve job inputs as ARM id or remote url. :param entries: An iterable of job inputs @@ -1229,7 +1287,9 @@ def _flatten_group_inputs( input_values.append(item._data) # type: ignore return input_values - def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_path: str) -> None: + def _resolve_job_input( + self, entry: Union[Input, str, bool, int, float], base_path: str + ) -> None: """resolve job input as ARM id or remote url. :param entry: The job input @@ -1258,7 +1318,9 @@ def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_pa return try: datastore_name = ( - entry.datastore if hasattr(entry, "datastore") and entry.datastore else WORKSPACE_BLOB_STORE + entry.datastore + if hasattr(entry, "datastore") and entry.datastore + else WORKSPACE_BLOB_STORE ) # absolute local path, upload, transform to remote url @@ -1288,7 +1350,11 @@ def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_pa show_progress=self._show_progress, ) # TODO : Move this part to a common place - if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"): + if ( + entry.type == AssetTypes.URI_FOLDER + and entry.path + and not entry.path.endswith("/") + ): entry.path = entry.path + "/" # Check for AzureML id, is there a better way? elif ":" in entry.path or "@" in entry.path: # type: ignore @@ -1312,7 +1378,11 @@ def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_pa show_progress=self._show_progress, ) # TODO : Move this part to a common place - if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"): + if ( + entry.type == AssetTypes.URI_FOLDER + and entry.path + and not entry.path.endswith("/") + ): entry.path = entry.path + "/" except (MlException, HttpResponseError) as e: raise e @@ -1322,7 +1392,8 @@ def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_pa f"Met {type(e)}:\n{e}", target=ErrorTarget.JOB, no_personal_data_message=( - "Supported input path value are ARM id, AzureML id, " "remote uri or local path." + "Supported input path value are ARM id, AzureML id, " + "remote uri or local path." ), error=e, error_category=ErrorCategory.USER_ERROR, @@ -1336,7 +1407,9 @@ def _resolve_job_inputs_arm_id(self, job: Job) -> None: if isinstance(entry, InputOutputBase): # extract original input form input builder. entry = entry._data - if not isinstance(entry, Input) or is_url(entry.path): # Literal value or remote url + if not isinstance(entry, Input) or is_url( + entry.path + ): # Literal value or remote url continue # ARM id entry.path = self._orchestrators.resolve_azureml_id(entry.path) @@ -1345,7 +1418,9 @@ def _resolve_job_inputs_arm_id(self, job: Job) -> None: # If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs pass - def _resolve_arm_id_or_azureml_id(self, job: Job, resolver: Union[Callable, _AssetResolver]) -> Job: + def _resolve_arm_id_or_azureml_id( + self, job: Job, resolver: Union[Callable, _AssetResolver] + ) -> Job: """Resolve arm_id for a given job. @@ -1466,7 +1541,9 @@ def _resolve_arm_id_for_import_job(self, job: ImportJob, resolver: _AssetResolve job.compute = self._resolve_compute_id(resolver, ComputeType.ADF) return job - def _resolve_arm_id_for_parallel_job(self, job: ParallelJob, resolver: _AssetResolver) -> ParallelJob: + def _resolve_arm_id_for_parallel_job( + self, job: ParallelJob, resolver: _AssetResolver + ) -> ParallelJob: """Resolve arm_id for ParallelJob. :param job: The Parallel job @@ -1537,7 +1614,9 @@ def _resolve_arm_id_for_automl_job( job.compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE) return job - def _resolve_arm_id_for_pipeline_job(self, pipeline_job: PipelineJob, resolver: _AssetResolver) -> PipelineJob: + def _resolve_arm_id_for_pipeline_job( + self, pipeline_job: PipelineJob, resolver: _AssetResolver + ) -> PipelineJob: """Resolve arm_id for pipeline_job. :param pipeline_job: The pipeline job From e7e2f6d8799e92715fa46da022e631b4b1358997 Mon Sep 17 00:00:00 2001 From: Sagar Sumant Date: Tue, 26 Nov 2024 22:30:06 -0800 Subject: [PATCH 2/4] Fix linting error. --- .../azure/ai/ml/entities/_job/_input_output_helpers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py index 11d3ead409a2..feb019cef80e 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py @@ -206,7 +206,10 @@ def validate_pipeline_input_key_characters(key: str) -> None: # Note: ([a-zA-Z_]+[a-zA-Z0-9_]*) is a valid single key, # so a valid pipeline key is: ^{single_key}([.]{single_key})*$ if re.match(IOConstants.VALID_KEY_PATTERN, key) is None: - msg = "Pipeline input key name {} must be composed letters, numbers, and underscores with optional split by dots." + msg = ( + "Pipeline input key name {} must be composed letters, numbers, and underscores with optional " + "split by dots." + ) raise ValidationException( message=msg.format(key), no_personal_data_message=msg.format("[key]"), From c4a3e96e54f94d9bc39990359cf00c72448f2630 Mon Sep 17 00:00:00 2001 From: Sagar Sumant Date: Wed, 27 Nov 2024 09:55:27 -0800 Subject: [PATCH 3/4] Fix. --- sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py index 2382713091c4..d5ae68b28653 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py @@ -543,7 +543,8 @@ def _validate( job: Job, *, raise_on_failure: bool = False, - **kwargs: Any, # pylint:disable=unused-argument + # pylint:disable=unused-argument + **kwargs: Any, ) -> ValidationResult: """Implementation of validate. From ff9c352e1f09b8ec89d1665a9c777af229987af1 Mon Sep 17 00:00:00 2001 From: Sagar Sumant Date: Wed, 27 Nov 2024 13:28:04 -0800 Subject: [PATCH 4/4] Fix as per black. --- sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py | 88 ++++-------------- .../ml/entities/_job/_input_output_helpers.py | 30 ++---- .../finetuning/custom_model_finetuning_job.py | 4 +- .../_job/finetuning/finetuning_job.py | 4 +- .../_job/finetuning/finetuning_vertical.py | 4 +- .../azure/ai/ml/entities/_job/job.py | 16 +--- .../ai/ml/entities/_job/queue_settings.py | 18 +--- .../azure/ai/ml/operations/_job_operations.py | 93 +++++-------------- 8 files changed, 60 insertions(+), 197 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py b/sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py index b5a04a6a14f5..fe4f138e4607 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py @@ -216,9 +216,7 @@ def __init__( self._ws_sub: Any = None show_progress = kwargs.pop("show_progress", True) enable_telemetry = kwargs.pop("enable_telemetry", True) - self._operation_config = OperationConfig( - show_progress=show_progress, enable_telemetry=enable_telemetry - ) + self._operation_config = OperationConfig(show_progress=show_progress, enable_telemetry=enable_telemetry) if "cloud" in kwargs: cloud_name = kwargs["cloud"] @@ -263,9 +261,7 @@ def __init__( workspace_reference = kwargs.pop("workspace_reference", None) if workspace_reference or registry_reference: ws_ops = WorkspaceOperations( - OperationScope( - str(subscription_id), str(resource_group_name), workspace_reference - ), + OperationScope(str(subscription_id), str(resource_group_name), workspace_reference), ServiceClient042023Preview( credential=self._credential, subscription_id=subscription_id, @@ -275,9 +271,7 @@ def __init__( ) self._ws_rg = resource_group_name self._ws_sub = subscription_id - workspace_details = ws_ops.get( - workspace_reference if workspace_reference else workspace_name - ) + workspace_details = ws_ops.get(workspace_reference if workspace_reference else workspace_name) workspace_location, workspace_id = ( workspace_details.location, workspace_details._workspace_id, @@ -325,9 +319,7 @@ def __init__( **{"properties": properties}, enable_telemetry=self._operation_config.enable_telemetry, ) - app_insights_handler_kwargs: Dict[str, Tuple] = { - "app_insights_handler": app_insights_handler - } + app_insights_handler_kwargs: Dict[str, Tuple] = {"app_insights_handler": app_insights_handler} base_url = _get_base_url_from_metadata(cloud_name=cloud_name, is_local_mfe=True) self._base_url = base_url @@ -629,11 +621,7 @@ def __init__( self._code = CodeOperations( self._ws_operation_scope if registry_reference else self._operation_scope, self._operation_config, - ( - self._service_client_10_2021_dataplanepreview - if registry_name - else self._service_client_04_2023 - ), + (self._service_client_10_2021_dataplanepreview if registry_name else self._service_client_04_2023), self._datastores, **ops_kwargs, # type: ignore[arg-type] ) @@ -641,18 +629,12 @@ def __init__( self._environments = EnvironmentOperations( self._ws_operation_scope if registry_reference else self._operation_scope, self._operation_config, - ( - self._service_client_10_2021_dataplanepreview - if registry_name - else self._service_client_04_2023_preview - ), + (self._service_client_10_2021_dataplanepreview if registry_name else self._service_client_04_2023_preview), self._operation_container, **ops_kwargs, # type: ignore[arg-type] ) self._operation_container.add(AzureMLResourceType.ENVIRONMENT, self._environments) - self._local_endpoint_helper = _LocalEndpointHelper( - requests_pipeline=self._requests_pipeline - ) + self._local_endpoint_helper = _LocalEndpointHelper(requests_pipeline=self._requests_pipeline) self._local_deployment_helper = _LocalDeploymentHelper(self._operation_container) self._online_endpoints = OnlineEndpointOperations( self._operation_scope, @@ -696,18 +678,12 @@ def __init__( service_client_02_2023_preview=self._service_client_02_2023_preview, **ops_kwargs, ) - self._operation_container.add( - AzureMLResourceType.ONLINE_DEPLOYMENT, self._online_deployments - ) + self._operation_container.add(AzureMLResourceType.ONLINE_DEPLOYMENT, self._online_deployments) self._operation_container.add(AzureMLResourceType.BATCH_DEPLOYMENT, self._batch_deployments) self._data = DataOperations( self._operation_scope, self._operation_config, - ( - self._service_client_10_2021_dataplanepreview - if registry_name - else self._service_client_04_2023_preview - ), + (self._service_client_10_2021_dataplanepreview if registry_name else self._service_client_04_2023_preview), self._service_client_01_2024_preview, self._datastores, requests_pipeline=self._requests_pipeline, @@ -718,11 +694,7 @@ def __init__( self._components = ComponentOperations( self._operation_scope, self._operation_config, - ( - self._service_client_10_2021_dataplanepreview - if registry_name - else self._service_client_01_2024_preview - ), + (self._service_client_10_2021_dataplanepreview if registry_name else self._service_client_01_2024_preview), self._operation_container, self._preflight, **ops_kwargs, # type: ignore[arg-type] @@ -777,9 +749,7 @@ def __init__( AzureMLResourceType.VIRTUALCLUSTER, self._virtual_clusters # type: ignore[arg-type] ) except Exception as ex: # pylint: disable=broad-except - module_logger.debug( - "Virtual Cluster operations could not be initialized due to %s ", ex - ) + module_logger.debug("Virtual Cluster operations could not be initialized due to %s ", ex) self._featurestores = FeatureStoreOperations( self._operation_scope, @@ -824,15 +794,9 @@ def __init__( ) self._operation_container.add(AzureMLResourceType.FEATURE_STORE, self._featurestores) # type: ignore[arg-type] self._operation_container.add(AzureMLResourceType.FEATURE_SET, self._featuresets) - self._operation_container.add( - AzureMLResourceType.FEATURE_STORE_ENTITY, self._featurestoreentities - ) - self._operation_container.add( - AzureMLResourceType.SERVERLESS_ENDPOINT, self._serverless_endpoints - ) - self._operation_container.add( - AzureMLResourceType.MARKETPLACE_SUBSCRIPTION, self._marketplace_subscriptions - ) + self._operation_container.add(AzureMLResourceType.FEATURE_STORE_ENTITY, self._featurestoreentities) + self._operation_container.add(AzureMLResourceType.SERVERLESS_ENDPOINT, self._serverless_endpoints) + self._operation_container.add(AzureMLResourceType.MARKETPLACE_SUBSCRIPTION, self._marketplace_subscriptions) @classmethod def from_config( # pylint: disable=C4758 @@ -944,9 +908,7 @@ def from_config( # pylint: disable=C4758 error_category=ErrorCategory.USER_ERROR, ) - subscription_id, resource_group, workspace_name = MLClient._get_workspace_info( - str(found_path) - ) + subscription_id, resource_group, workspace_name = MLClient._get_workspace_info(str(found_path)) module_logger.info("Found the config file in: %s", found_path) return MLClient( @@ -958,9 +920,7 @@ def from_config( # pylint: disable=C4758 ) @classmethod - def _ml_client_cli( - cls, credentials: TokenCredential, subscription_id: Optional[str], **kwargs - ) -> "MLClient": + def _ml_client_cli(cls, credentials: TokenCredential, subscription_id: Optional[str], **kwargs) -> "MLClient": """This method provides a way to create MLClient object for cli to leverage cli context for authentication. With this we do not have to use AzureCliCredentials from azure-identity package (not meant for heavy usage). The @@ -1251,9 +1211,7 @@ def _get_workspace_info(cls, found_path: Optional[str]) -> Tuple[str, str, str]: # Checking the keys in the config.json file to check for required parameters. scope = config.get("Scope") if not scope: - if not all( - k in config.keys() for k in ("subscription_id", "resource_group", "workspace_name") - ): + if not all(k in config.keys() for k in ("subscription_id", "resource_group", "workspace_name")): msg = ( "The config file found in: {} does not seem to contain the required " "parameters. Please make sure it contains your subscription_id, " @@ -1453,9 +1411,7 @@ def _(entity: BatchEndpoint, operations, *args, **kwargs): @_begin_create_or_update.register(OnlineDeployment) def _(entity: OnlineDeployment, operations, *args, **kwargs): module_logger.debug("Creating or updating online_deployments") - return operations[AzureMLResourceType.ONLINE_DEPLOYMENT].begin_create_or_update( - entity, **kwargs - ) + return operations[AzureMLResourceType.ONLINE_DEPLOYMENT].begin_create_or_update(entity, **kwargs) @_begin_create_or_update.register(BatchDeployment) @@ -1485,14 +1441,10 @@ def _(entity: Schedule, operations, *args, **kwargs): @_begin_create_or_update.register(ServerlessEndpoint) def _(entity: ServerlessEndpoint, operations, *args, **kwargs): module_logger.debug("Creating or updating serverless endpoints") - return operations[AzureMLResourceType.SERVERLESS_ENDPOINT].begin_create_or_update( - entity, **kwargs - ) + return operations[AzureMLResourceType.SERVERLESS_ENDPOINT].begin_create_or_update(entity, **kwargs) @_begin_create_or_update.register(MarketplaceSubscription) def _(entity: MarketplaceSubscription, operations, *args, **kwargs): module_logger.debug("Creating or updating marketplace subscriptions") - return operations[AzureMLResourceType.MARKETPLACE_SUBSCRIPTION].begin_create_or_update( - entity, **kwargs - ) + return operations[AzureMLResourceType.MARKETPLACE_SUBSCRIPTION].begin_create_or_update(entity, **kwargs) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py index feb019cef80e..84e330876656 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py @@ -162,9 +162,7 @@ def build_input_output( return item -def _validate_inputs_for( - input_consumer_name: str, input_consumer: str, inputs: Optional[Dict] -) -> None: +def _validate_inputs_for(input_consumer_name: str, input_consumer: str, inputs: Optional[Dict]) -> None: implicit_inputs = re.findall(r"\${{inputs\.([\w\.-]+)}}", input_consumer) # optional inputs no need to validate whether they're in inputs optional_inputs = re.findall(r"\[[\w\.\s-]*\${{inputs\.([\w\.-]+)}}]", input_consumer) @@ -262,11 +260,7 @@ def to_rest_dataset_literal_inputs( if input_value.type in target_cls_dict: input_data = target_cls_dict[input_value.type]( uri=input_value.path, - mode=( - INPUT_MOUNT_MAPPING_TO_REST[input_value.mode.lower()] - if input_value.mode - else None - ), + mode=(INPUT_MOUNT_MAPPING_TO_REST[input_value.mode.lower()] if input_value.mode else None), ) else: @@ -329,11 +323,7 @@ def from_rest_inputs_to_dataset_literal(inputs: Dict[str, RestJobInput]) -> Dict input_data = Input( type=type_transfer_dict[input_value.job_input_type], path=path, - mode=( - INPUT_MOUNT_MAPPING_FROM_REST[input_value.mode] - if input_value.mode - else None - ), + mode=(INPUT_MOUNT_MAPPING_FROM_REST[input_value.mode] if input_value.mode else None), path_on_compute=sourcePathOnCompute, ) elif input_value.job_input_type in (JobInputType.LITERAL, JobInputType.LITERAL): @@ -372,19 +362,13 @@ def to_rest_data_outputs(outputs: Optional[Dict]) -> Dict[str, RestJobOutput]: else: target_cls_dict = get_output_rest_cls_dict() - output_value_type = ( - output_value.type if output_value.type else AssetTypes.URI_FOLDER - ) + output_value_type = output_value.type if output_value.type else AssetTypes.URI_FOLDER if output_value_type in target_cls_dict: output = target_cls_dict[output_value_type]( asset_name=output_value.name, asset_version=output_value.version, uri=output_value.path, - mode=( - OUTPUT_MOUNT_MAPPING_TO_REST[output_value.mode.lower()] - if output_value.mode - else None - ), + mode=(OUTPUT_MOUNT_MAPPING_TO_REST[output_value.mode.lower()] if output_value.mode else None), pathOnCompute=getattr(output_value, "path_on_compute", None), description=output_value.description, ) @@ -427,9 +411,7 @@ def from_rest_data_outputs(outputs: Dict[str, RestJobOutput]) -> Dict[str, Outpu from_rest_outputs[output_name] = Output( type=output_type_mapping[output_value.job_output_type], path=output_value.uri, - mode=( - OUTPUT_MOUNT_MAPPING_FROM_REST[output_value.mode] if output_value.mode else None - ), + mode=(OUTPUT_MOUNT_MAPPING_FROM_REST[output_value.mode] if output_value.mode else None), path_on_compute=sourcePathOnCompute, description=output_value.description, name=output_value.asset_name, diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/custom_model_finetuning_job.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/custom_model_finetuning_job.py index 009f0a8df455..f60a971543da 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/custom_model_finetuning_job.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/custom_model_finetuning_job.py @@ -217,9 +217,7 @@ def _load_from_dict( # **kwargs, # ) # else: - loaded_data = load_from_dict( - CustomModelFineTuningSchema, data, context, additional_message, **kwargs - ) + loaded_data = load_from_dict(CustomModelFineTuningSchema, data, context, additional_message, **kwargs) training_data = loaded_data.get("training_data", None) if isinstance(training_data, str): diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_job.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_job.py index 6436df274207..27f3422eb4b6 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_job.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_job.py @@ -90,9 +90,7 @@ def _load_from_rest(cls, obj: RestJobBase) -> "FineTuningJob": if class_type: res: FineTuningJob = class_type._from_rest_object(obj) return res - msg = ( - f"Unsupported model provider type: {obj.properties.fine_tuning_details.model_provider}" - ) + msg = f"Unsupported model provider type: {obj.properties.fine_tuning_details.model_provider}" raise ValidationException( message=msg, no_personal_data_message=msg, diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_vertical.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_vertical.py index fe40c8d09152..e8049e39bd3a 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_vertical.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/finetuning/finetuning_vertical.py @@ -170,9 +170,7 @@ def _restore_inputs(self) -> None: type=AssetTypes.URI_FILE, path=self.validation_data.uri # pylint: disable=no-member ) if isinstance(self.model, MLFlowModelJobInput): - self.model = Input( - type=AssetTypes.MLFLOW_MODEL, path=self.model.uri - ) # pylint: disable=no-member + self.model = Input(type=AssetTypes.MLFLOW_MODEL, path=self.model.uri) # pylint: disable=no-member def __eq__(self, other: object) -> bool: """Returns True if both instances have the same values. diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/job.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/job.py index c3c7bdce7d6e..663257a65d6c 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/job.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/job.py @@ -211,9 +211,7 @@ def _to_dict(self) -> Dict: pass @classmethod - def _resolve_cls_and_type( - cls, data: Dict, params_override: Optional[List[Dict]] = None - ) -> Tuple: + def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple: from azure.ai.ml.entities._builders.command import Command from azure.ai.ml.entities._builders.spark import Spark from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob @@ -225,9 +223,7 @@ def _resolve_cls_and_type( job_type: Optional[Type["Job"]] = None type_in_override = find_type_in_override(params_override) - type_str = type_in_override or data.get( - CommonYamlFields.TYPE, JobType.COMMAND - ) # override takes the priority + type_str = type_in_override or data.get(CommonYamlFields.TYPE, JobType.COMMAND) # override takes the priority if type_str == JobType.COMMAND: job_type = Command elif type_str == JobType.SPARK: @@ -319,9 +315,7 @@ def _from_rest_object( # pylint: disable=too-many-return-statements if obj.properties.job_type == RestJobType.COMMAND: # PrP only until new import job type is ready on MFE in PuP # compute type 'DataFactory' is reserved compute name for 'clusterless' ADF jobs - if obj.properties.compute_id and obj.properties.compute_id.endswith( - "/" + ComputeType.ADF - ): + if obj.properties.compute_id and obj.properties.compute_id.endswith("/" + ComputeType.ADF): return ImportJob._load_from_rest(obj) res_command: Job = Command._load_from_rest_job(obj) @@ -373,7 +367,5 @@ def _get_telemetry_values(self) -> Dict: # pylint: disable=arguments-differ @classmethod @abstractmethod - def _load_from_dict( - cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any - ) -> "Job": + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job": pass diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/queue_settings.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/queue_settings.py index 50485c129867..36b715296c09 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/queue_settings.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/queue_settings.py @@ -47,29 +47,19 @@ def __init__( def _to_rest_object(self) -> RestQueueSettings: self._validate() - job_tier = ( - JobTierNames.ENTITY_TO_REST.get(self.job_tier.lower(), None) if self.job_tier else None - ) - priority = ( - JobPriorityValues.ENTITY_TO_REST.get(self.priority.lower(), None) - if self.priority - else None - ) + job_tier = JobTierNames.ENTITY_TO_REST.get(self.job_tier.lower(), None) if self.job_tier else None + priority = JobPriorityValues.ENTITY_TO_REST.get(self.priority.lower(), None) if self.priority else None return RestQueueSettings(job_tier=job_tier, priority=priority) @classmethod - def _from_rest_object( - cls, obj: Union[Dict[str, Any], RestQueueSettings, None] - ) -> Optional["QueueSettings"]: + def _from_rest_object(cls, obj: Union[Dict[str, Any], RestQueueSettings, None]) -> Optional["QueueSettings"]: if obj is None: return None if isinstance(obj, dict): queue_settings = RestQueueSettings.from_dict(obj) return cls._from_rest_object(queue_settings) job_tier = JobTierNames.REST_TO_ENTITY.get(obj.job_tier, None) if obj.job_tier else None - priority = ( - JobPriorityValues.REST_TO_ENTITY.get(obj.priority, None) if obj.priority else None - ) + priority = JobPriorityValues.REST_TO_ENTITY.get(obj.priority, None) if obj.priority else None return cls(job_tier=job_tier, priority=priority) def _validate(self) -> None: diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py index d5ae68b28653..b963a7a1a73c 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_job_operations.py @@ -228,9 +228,7 @@ def _virtual_cluster_operations(self) -> VirtualClusterOperations: def _datastore_operations(self) -> "DatastoreOperations": from azure.ai.ml.operations import DatastoreOperations - return cast( - DatastoreOperations, self._all_operations.all_operations[AzureMLResourceType.DATASTORE] - ) + return cast(DatastoreOperations, self._all_operations.all_operations[AzureMLResourceType.DATASTORE]) @property def _runs_operations(self) -> RunOperations: @@ -375,9 +373,7 @@ def get(self, name: str) -> Job: else: # Child jobs are no longer available through MFE, fetch # through run history instead - job = self._runs_operations._translate_from_rest_object( - self._runs_operations.get_run(name) - ) + job = self._runs_operations._translate_from_rest_object(self._runs_operations.get_run(name)) return job @@ -414,8 +410,7 @@ def show_services(self, name: str, node_index: int = 0) -> Optional[Dict[str, Se return None return { - k: ServiceInstance._from_rest_object(v, node_index) - for k, v in service_instances_dict.instances.items() + k: ServiceInstance._from_rest_object(v, node_index) for k, v in service_instances_dict.instances.items() } @distributed_trace @@ -463,9 +458,7 @@ def begin_cancel(self, name: str, **kwargs: Any) -> LROPoller[None]: results.append(result) return results - def _try_get_compute_arm_id( - self, compute: Union[Compute, str] - ) -> Optional[Union[Compute, str]]: + def _try_get_compute_arm_id(self, compute: Union[Compute, str]) -> Optional[Union[Compute, str]]: # pylint: disable=too-many-return-statements # TODO: Remove in PuP with native import job/component type support in MFE/Designer # DataFactory 'clusterless' job @@ -512,9 +505,7 @@ def _try_get_compute_arm_id( @distributed_trace @monitor_with_telemetry_mixin(ops_logger, "Job.Validate", ActivityType.PUBLICAPI) - def validate( - self, job: Job, *, raise_on_failure: bool = False, **kwargs: Any - ) -> ValidationResult: + def validate(self, job: Job, *, raise_on_failure: bool = False, **kwargs: Any) -> ValidationResult: """Validates a Job object before submitting to the service. Anonymous assets may be created if there are inline defined entities such as Component, Environment, and Code. Only pipeline jobs are supported for validation currently. @@ -559,9 +550,7 @@ def _validate( :return: The validation result :rtype: ValidationResult """ - git_code_validation_result = ( - PathAwareSchemaValidatableMixin._create_empty_validation_result() - ) + git_code_validation_result = PathAwareSchemaValidatableMixin._create_empty_validation_result() # TODO: move this check to Job._validate after validation is supported for all job types # If private features are enable and job has code value of type str we need to check # that it is a valid git path case. Otherwise we should throw a ValidationException @@ -609,14 +598,10 @@ def error_func(msg: str, no_personal_data_msg: str) -> ValidationException: if not isinstance(node, ControlFlowNode): node.compute = self._try_get_compute_arm_id(node.compute) except Exception as e: # pylint: disable=W0718 - validation_result.append_error( - yaml_path=f"jobs.{node_name}.compute", message=str(e) - ) + validation_result.append_error(yaml_path=f"jobs.{node_name}.compute", message=str(e)) validation_result.resolve_location_for_diagnostics(str(job._source_path)) - return job._try_raise( - validation_result, raise_error=raise_on_failure - ) # pylint: disable=protected-access + return job._try_raise(validation_result, raise_error=raise_on_failure) # pylint: disable=protected-access @distributed_trace @monitor_with_telemetry_mixin(ops_logger, "Job.CreateOrUpdate", ActivityType.PUBLICAPI) @@ -720,9 +705,7 @@ def create_or_update( ): self._set_headers_with_user_aml_token(kwargs) - result = self._create_or_update_with_different_version_api( - rest_job_resource=rest_job_resource, **kwargs - ) + result = self._create_or_update_with_different_version_api(rest_job_resource=rest_job_resource, **kwargs) if is_local_run(result): ws_base_url = self._all_operations.all_operations[ @@ -753,9 +736,7 @@ def create_or_update( if snapshot_id is not None: job_object.properties.properties["ContentSnapshotId"] = snapshot_id - result = self._create_or_update_with_latest_version_api( - rest_job_resource=job_object, **kwargs - ) + result = self._create_or_update_with_latest_version_api(rest_job_resource=job_object, **kwargs) return self._resolve_azureml_id(Job._from_rest_object(result)) @@ -915,8 +896,7 @@ def download( job_details = self.get(name) # job is reused, get reused job to download if ( - job_details.properties.get(PipelineConstants.REUSED_FLAG_FIELD) - == PipelineConstants.REUSED_FLAG_TRUE + job_details.properties.get(PipelineConstants.REUSED_FLAG_FIELD) == PipelineConstants.REUSED_FLAG_TRUE and PipelineConstants.REUSED_JOB_ID in job_details.properties ): reused_job_name = job_details.properties[PipelineConstants.REUSED_JOB_ID] @@ -957,9 +937,7 @@ def log_missing_uri(what: str) -> None: ) if isinstance(job_details, SweepJob): - best_child_run_id = job_details.properties.get( - SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME, None - ) + best_child_run_id = job_details.properties.get(SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME, None) if best_child_run_id: self.download( best_child_run_id, @@ -1231,9 +1209,7 @@ def _resolve_compute_id(self, resolver: _AssetResolver, target: Any) -> Any: # Compute target can be either workspace-scoped compute type, # or AML scoped VC. In the case of VC, resource name will be of form # azureml:virtualClusters/ to disambiguate from azureml:name (which is always compute) - modified_target_name = modified_target_name[ - len(AzureMLResourceType.VIRTUALCLUSTER) + 1 : - ] + modified_target_name = modified_target_name[len(AzureMLResourceType.VIRTUALCLUSTER) + 1 :] modified_target_name = LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT.format( self._operation_scope.subscription_id, self._operation_scope.resource_group_name, @@ -1249,9 +1225,7 @@ def _resolve_compute_id(self, resolver: _AssetResolver, target: Any) -> Any: except Exception: # pylint: disable=W0718 return resolver(target, azureml_type=AzureMLResourceType.COMPUTE) - def _resolve_job_inputs( - self, entries: Iterable[Union[Input, str, bool, int, float]], base_path: str - ) -> None: + def _resolve_job_inputs(self, entries: Iterable[Union[Input, str, bool, int, float]], base_path: str) -> None: """resolve job inputs as ARM id or remote url. :param entries: An iterable of job inputs @@ -1288,9 +1262,7 @@ def _flatten_group_inputs( input_values.append(item._data) # type: ignore return input_values - def _resolve_job_input( - self, entry: Union[Input, str, bool, int, float], base_path: str - ) -> None: + def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_path: str) -> None: """resolve job input as ARM id or remote url. :param entry: The job input @@ -1319,9 +1291,7 @@ def _resolve_job_input( return try: datastore_name = ( - entry.datastore - if hasattr(entry, "datastore") and entry.datastore - else WORKSPACE_BLOB_STORE + entry.datastore if hasattr(entry, "datastore") and entry.datastore else WORKSPACE_BLOB_STORE ) # absolute local path, upload, transform to remote url @@ -1351,11 +1321,7 @@ def _resolve_job_input( show_progress=self._show_progress, ) # TODO : Move this part to a common place - if ( - entry.type == AssetTypes.URI_FOLDER - and entry.path - and not entry.path.endswith("/") - ): + if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"): entry.path = entry.path + "/" # Check for AzureML id, is there a better way? elif ":" in entry.path or "@" in entry.path: # type: ignore @@ -1379,11 +1345,7 @@ def _resolve_job_input( show_progress=self._show_progress, ) # TODO : Move this part to a common place - if ( - entry.type == AssetTypes.URI_FOLDER - and entry.path - and not entry.path.endswith("/") - ): + if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"): entry.path = entry.path + "/" except (MlException, HttpResponseError) as e: raise e @@ -1393,8 +1355,7 @@ def _resolve_job_input( f"Met {type(e)}:\n{e}", target=ErrorTarget.JOB, no_personal_data_message=( - "Supported input path value are ARM id, AzureML id, " - "remote uri or local path." + "Supported input path value are ARM id, AzureML id, " "remote uri or local path." ), error=e, error_category=ErrorCategory.USER_ERROR, @@ -1408,9 +1369,7 @@ def _resolve_job_inputs_arm_id(self, job: Job) -> None: if isinstance(entry, InputOutputBase): # extract original input form input builder. entry = entry._data - if not isinstance(entry, Input) or is_url( - entry.path - ): # Literal value or remote url + if not isinstance(entry, Input) or is_url(entry.path): # Literal value or remote url continue # ARM id entry.path = self._orchestrators.resolve_azureml_id(entry.path) @@ -1419,9 +1378,7 @@ def _resolve_job_inputs_arm_id(self, job: Job) -> None: # If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs pass - def _resolve_arm_id_or_azureml_id( - self, job: Job, resolver: Union[Callable, _AssetResolver] - ) -> Job: + def _resolve_arm_id_or_azureml_id(self, job: Job, resolver: Union[Callable, _AssetResolver]) -> Job: """Resolve arm_id for a given job. @@ -1542,9 +1499,7 @@ def _resolve_arm_id_for_import_job(self, job: ImportJob, resolver: _AssetResolve job.compute = self._resolve_compute_id(resolver, ComputeType.ADF) return job - def _resolve_arm_id_for_parallel_job( - self, job: ParallelJob, resolver: _AssetResolver - ) -> ParallelJob: + def _resolve_arm_id_for_parallel_job(self, job: ParallelJob, resolver: _AssetResolver) -> ParallelJob: """Resolve arm_id for ParallelJob. :param job: The Parallel job @@ -1615,9 +1570,7 @@ def _resolve_arm_id_for_automl_job( job.compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE) return job - def _resolve_arm_id_for_pipeline_job( - self, pipeline_job: PipelineJob, resolver: _AssetResolver - ) -> PipelineJob: + def _resolve_arm_id_for_pipeline_job(self, pipeline_job: PipelineJob, resolver: _AssetResolver) -> PipelineJob: """Resolve arm_id for pipeline_job. :param pipeline_job: The pipeline job