Skip to content

Commit

Permalink
feat: model recipe support use_spot_instance
Browse files Browse the repository at this point in the history
  • Loading branch information
pitt-liang committed Jul 10, 2024
1 parent 4af7c76 commit 2a9bec0
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 20 deletions.
12 changes: 10 additions & 2 deletions pai/api/training_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
CreateTrainingJobRequest,
CreateTrainingJobRequestComputeResource,
CreateTrainingJobRequestComputeResourceInstanceSpec,
CreateTrainingJobRequestComputeResourceSpotSpec,
CreateTrainingJobRequestExperimentConfig,
CreateTrainingJobRequestHyperParameters,
CreateTrainingJobRequestInputChannels,
Expand Down Expand Up @@ -86,9 +87,10 @@ def create(
instance_type,
instance_count,
job_name,
use_spot_instance: bool = False,
spot_spec: Optional[Dict[str, Any]] = None,
instance_spec: Optional[Dict[str, str]] = None,
resource_id: Optional[str] = None,
resource_type: Optional[str] = None,
hyperparameters: Optional[Dict[str, Any]] = None,
input_channels: Optional[List[Dict[str, Any]]] = None,
output_channels: Optional[List[Dict[str, Any]]] = None,
Expand Down Expand Up @@ -127,9 +129,16 @@ def create(
for ch in output_channels
]
if instance_type:
spot_spec = (
CreateTrainingJobRequestComputeResourceSpotSpec().from_map(spot_spec)
if spot_spec
else None
)
compute_resource = CreateTrainingJobRequestComputeResource(
ecs_count=instance_count,
ecs_spec=instance_type,
use_spot_instance=bool(spot_spec),
spot_spec=spot_spec,
)
elif instance_spec:
compute_resource = CreateTrainingJobRequestComputeResource(
Expand All @@ -138,7 +147,6 @@ def create(
instance_spec=CreateTrainingJobRequestComputeResourceInstanceSpec().from_map(
instance_spec
),
use_spot_instance=use_spot_instance,
)
else:
raise ValueError("Please provide instance_type or instance_spec.")
Expand Down
15 changes: 11 additions & 4 deletions pai/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
DEFAULT_OUTPUT_MODEL_CHANNEL_NAME,
DEFAULT_TENSORBOARD_CHANNEL_NAME,
ExperimentConfig,
SpotSpec,
UserVpcConfig,
)
from .model import InferenceSpec, Model, ResourceConfig
Expand Down Expand Up @@ -187,9 +188,10 @@ def __init__(
environments: Optional[Dict[str, str]] = None,
requirements: Optional[List[str]] = None,
instance_type: Optional[str] = None,
use_spot_instance: bool = False,
spot_spec: Optional[SpotSpec] = None,
instance_spec: Optional[Dict] = None,
resource_id: Optional[Dict] = None,
resource_type: Optional[str] = None,
instance_count: Optional[int] = None,
user_vpc_config: Optional[UserVpcConfig] = None,
experiment_config: Optional[ExperimentConfig] = None,
Expand Down Expand Up @@ -253,14 +255,18 @@ def __init__(
'package' or 'package==version'. This is similar to the contents of a requirements.txt file used
in Python projects. If requirements.txt is provided in user code directory, requirements
will override the conflict dependencies directly.
resource_type (str, optional): The resource type used to run the training job.
By default, general computing resource is used. If the resource_type is
'Lingjun', Lingjun computing resource is used.
instance_type (str, optional): The machine instance type used to run the
training job. To view the supported machine instance types, please refer
to the document:
https://help.aliyun.com/document_detail/171758.htm#section-55y-4tq-84y.
If the instance_type is "local", the training job is executed locally
using docker.
use_spot_instance (bool): Specifies whether to use spot instance to run the
job. Only available for public resource group.
spot_spec (:class:`pai.job.SpotSpec`, optional): The specification of the spot
instance used to run the training job. If provided, the training job runs
on spot instances.
instance_count (int): The number of machines used to run the training job.
user_vpc_config (:class:`pai.estimator.UserVpcConfig`, optional): The VPC
configuration used to enable the training job instance to connect to the
Expand Down Expand Up @@ -290,7 +296,8 @@ def __init__(
instance_type=instance_type,
instance_count=instance_count,
resource_id=resource_id,
use_spot_instance=use_spot_instance,
resource_type=resource_type,
spot_spec=spot_spec,
instance_spec=instance_spec,
user_vpc_config=user_vpc_config,
max_run_time=max_run_time,
Expand Down
2 changes: 2 additions & 0 deletions pai/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
InstanceSpec,
ModelRecipeSpec,
OssLocation,
SpotSpec,
TrainingJob,
TrainingJobStatus,
UriInput,
Expand All @@ -45,4 +46,5 @@
"ExperimentConfig",
"InstanceSpec",
"UriInput",
"SpotSpec",
]
51 changes: 48 additions & 3 deletions pai/job/_training_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import time
import typing
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, ConfigDict, Field
Expand Down Expand Up @@ -55,6 +56,19 @@ def as_oss_dir_uri(uri: str):
DEFAULT_TENSORBOARD_CHANNEL_NAME = "tensorboard"


class SpotStrategy(str, Enum):
    """Bidding strategies that can be requested for a spot instance.

    Members are also ``str`` instances, so each one compares equal to its
    raw string value.
    """

    SpotWithPriceLimit = "SpotWithPriceLimit"
    SpotAsPriceGo = "SpotAsPriceGo"

    def __repr__(self):
        # Show the bare value instead of Enum's default
        # ``<SpotStrategy.X: 'X'>`` representation.
        raw_value = self.value
        return raw_value


class ResourceType(str, Enum):
    """Names of the computing resource types a job can be assigned to.

    Members are also ``str`` instances, so each one compares equal to its
    raw string value.
    """

    Lingjun = "Lingjun"
    General = "General"


class BaseAPIModel(BaseModel):

model_config = ConfigDict(
Expand Down Expand Up @@ -300,6 +314,19 @@ class ModelRecipeSpec(BaseAPIModel):
requirements: Optional[List[str]] = None


class SpotSpec(BaseAPIModel):
    """Spot instance settings attached to a training job.

    ``spot_strategy`` is mandatory; ``spot_discount_limit`` is optional at the
    model level and defaults to ``None``.
    """

    # Required field (``...`` marks it as having no default).
    spot_strategy: SpotStrategy = Field(
        ...,
        description="Spot instance strategy, support 'SpotWithPriceLimit', 'SpotAsPriceGo'",
    )
    # Optional discount cap. The description states it is required for the
    # 'SpotWithPriceLimit' strategy, but this model does not enforce that.
    spot_discount_limit: Optional[float] = Field(
        None,
        # Adjacent literals are concatenated, so each piece ends with a space
        # to keep the sentences from running together.
        description="Spot instance discount limit, maximum 2 decimal places, "
        "required when spot_strategy is 'SpotWithPriceLimit'. "
        "For example, 0.5 means 50% off the original price.",
    )


class TrainingJob(BaseAPIModel):
"""TrainingJob represents a training job in the PAI service."""

Expand Down Expand Up @@ -542,7 +569,8 @@ def __init__(
instance_spec: Optional[Dict] = None,
instance_count: Optional[int] = None,
resource_id: Optional[Dict] = None,
use_spot_instance: bool = False,
resource_type: Optional[Union[str, ResourceType]] = None,
spot_spec: Optional[SpotSpec] = None,
environments: Optional[Dict] = None,
requirements: Optional[List[str]] = None,
labels: Optional[Dict[str, str]] = None,
Expand All @@ -552,13 +580,14 @@ def __init__(
self.base_job_name = base_job_name or type(self).__name__.lower()
self.output_path = output_path
self.user_vpc_config = user_vpc_config
self.use_spot_instance = use_spot_instance
self.spot_spec = spot_spec
self.experiment_config = experiment_config
self.max_run_time = max_run_time
self.instance_type = instance_type
self.instance_spec = instance_spec
self.instance_count = instance_count or 1
self.resource_id = resource_id
self.resource_type = ResourceType(resource_type) if resource_type else None
self.environments = environments
self.requirements = requirements
self.labels = labels
Expand Down Expand Up @@ -706,6 +735,7 @@ def build_outputs(

return [item.model_dump() for item in res]

# TODO: get arguments, such as VPCConfig, instance_type etc, from self instance.
def _submit(
self,
job_name: str,
Expand All @@ -730,6 +760,20 @@ def _submit(
show_logs: bool = False,
):
session = get_default_session()

if not self.resource_type or self.resource_type == ResourceType.General:
resource_type = None
else:
resource_type = self.resource_type.value

if self.spot_spec:
spot_spec = {
"SpotStrategy": self.spot_spec.spot_strategy.value,
}
if self.spot_spec.spot_discount_limit:
spot_spec["SpotDiscountLimit"] = self.spot_spec.spot_discount_limit
else:
spot_spec = None
training_job_id = session.training_job_api.create(
instance_count=instance_count,
instance_spec=instance_spec.model_dump() if instance_spec else None,
Expand All @@ -740,10 +784,11 @@ def _submit(
if experiment_config and isinstance(experiment_config, ExperimentConfig)
else experiment_config
),
use_spot_instance=self.use_spot_instance,
spot_spec=spot_spec,
algorithm_version=algorithm_version,
instance_type=instance_type,
resource_id=resource_id,
resource_type=resource_type,
job_name=job_name,
hyperparameters=hyperparameters,
max_running_in_seconds=max_run_time,
Expand Down
18 changes: 15 additions & 3 deletions pai/model/_model_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
InstanceSpec,
ModelRecipeSpec,
OssLocation,
ResourceType,
SpotSpec,
TrainingJob,
UriInput,
UserVpcConfig,
Expand Down Expand Up @@ -99,6 +101,8 @@ def __init__(
instance_type: Optional[str] = None,
instance_spec: Optional[InstanceSpec] = None,
resource_id: Optional[str] = None,
resource_type: Optional[Union[str, ResourceType]] = None,
spot_spec: Optional[SpotSpec] = None,
user_vpc_config: Optional[UserVpcConfig] = None,
labels: Optional[Dict[str, str]] = None,
requirements: Optional[List[str]] = None,
Expand Down Expand Up @@ -149,10 +153,12 @@ def __init__(
self.default_inputs = init_kwargs.default_inputs

super().__init__(
resource_type=resource_type,
base_job_name=base_job_name,
experiment_config=experiment_config,
resource_id=resource_id,
user_vpc_config=user_vpc_config,
spot_spec=spot_spec,
instance_type=init_kwargs.instance_type,
instance_count=init_kwargs.instance_count,
instance_spec=init_kwargs.instance_spec,
Expand Down Expand Up @@ -514,9 +520,10 @@ def __init__(
command: Union[str, List[str]] = None,
instance_count: Optional[int] = None,
instance_type: Optional[str] = None,
spot_spec: Optional[SpotSpec] = None,
instance_spec: Optional[InstanceSpec] = None,
resource_id: Optional[str] = None,
use_spot_instance: bool = False,
resource_type: Optional[Union[str, ResourceType]] = None,
user_vpc_config: Optional[UserVpcConfig] = None,
labels: Optional[Dict[str, str]] = None,
requirements: Optional[List[str]] = None,
Expand Down Expand Up @@ -568,8 +575,11 @@ def __init__(
be provided when the instance spec is set. Default to None.
resource_id (str, optional): The ID of the resource group used to run the
training job. Default to None.
use_spot_instance (bool): Specifies whether to use spot instance to run the
job. Only available for public resource group.
spot_spec (:class:`pai.model.SpotSpec`, optional): The spot instance config
used to run the training job. If provided, spot instance will be used.
resource_type (str, optional): The resource type used to run the training job.
By default, general computing resource is used. If the resource_type is
'Lingjun', Lingjun computing resource is used.
user_vpc_config (:class:`pai.model.UserVpcConfig`, optional): The VPC
configuration used to enable the job instance to connect to the
specified user VPC. Default to None.
Expand Down Expand Up @@ -597,7 +607,9 @@ def __init__(
instance_count=instance_count,
instance_type=instance_type,
instance_spec=instance_spec,
resource_type=resource_type,
resource_id=resource_id,
spot_spec=spot_spec,
user_vpc_config=user_vpc_config,
labels=labels,
requirements=requirements,
Expand Down
11 changes: 8 additions & 3 deletions pai/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Channel,
CodeDir,
ExperimentConfig,
SpotSpec,
TrainingJob,
UserVpcConfig,
_TrainingJobSubmitter,
Expand All @@ -47,7 +48,8 @@ def __init__(
base_job_name: Optional[str] = None,
output_path: Optional[str] = None,
instance_type: Optional[str] = None,
use_spot_instance: bool = False,
spot_spec: Optional[SpotSpec] = None,
resource_type: Optional[str] = None,
instance_count: Optional[int] = None,
user_vpc_config: Optional[UserVpcConfig] = None,
experiment_config: Optional[ExperimentConfig] = None,
Expand Down Expand Up @@ -142,9 +144,10 @@ def __init__(
https://help.aliyun.com/document_detail/171758.htm#section-55y-4tq-84y.
If the instance_type is "local", the job is executed locally
using docker.
use_spot_instance (bool): Specifies whether to use spot instance to run the
job. Only available for public resource group.
instance_count (int): The number of machines used to run the job.
resource_type (str, optional): The resource type used to run the training job.
By default, general computing resource is used. If the resource_type is
'Lingjun', Lingjun computing resource is used.
user_vpc_config (:class:`pai.estimator.UserVpcConfig`, optional): The VPC
configuration used to enable the job instance to connect to the
specified user VPC. If provided, an Elastic Network Interface (ENI) will
Expand Down Expand Up @@ -173,6 +176,8 @@ def __init__(
self._input_channels = None
self._output_channels = None
super().__init__(
resource_type=resource_type,
spot_spec=spot_spec,
base_job_name=base_job_name,
output_path=output_path,
experiment_config=experiment_config,
Expand Down
21 changes: 17 additions & 4 deletions tests/integration/test_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import os
import posixpath
import re
from unittest import skipUnless
from unittest import skipIf, skipUnless

import pytest

Expand All @@ -24,7 +24,7 @@
from pai.estimator import AlgorithmEstimator, Estimator
from pai.experiment import Experiment
from pai.image import retrieve
from pai.job._training_job import ExperimentConfig
from pai.job._training_job import ExperimentConfig, ResourceType, SpotSpec
from pai.session import get_default_session
from tests.integration import BaseIntegTestCase
from tests.integration.utils import t_context
Expand Down Expand Up @@ -72,11 +72,24 @@ def test_xgb_train(self):
"test": self.breast_cancer_test_data_uri,
},
)

model_path = os.path.join(os.path.join(est.model_data(), "model.json"))

self.assertTrue(self.is_oss_object_exists(model_path))

# Run only when the test context reports spot instance support; `skipIf`
# here would skip the test in exactly the environments able to run it.
@skipUnless(t_context.support_spot_instance, "Skip spot instance test")
def test_use_spot_instance(self):
    """Submit a minimal job that requests a price-limited spot instance."""
    xgb_image_uri = retrieve("xgboost", framework_version="latest").image_uri
    est = Estimator(
        command="echo helloworld",
        instance_type="ml.gu7ef.8xlarge-gu100",
        image_uri=xgb_image_uri,
        spot_spec=SpotSpec(
            spot_strategy="SpotWithPriceLimit",
            spot_discount_limit=0.5,
        ),
        resource_type=ResourceType.Lingjun,
    )
    est.fit()

def test_torch_run(self):
torch_image_uri = retrieve("pytorch", framework_version="1.12").image_uri
est = Estimator(
Expand Down
Loading

0 comments on commit 2a9bec0

Please sign in to comment.