Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ CHANGELOG
- Add new configuration parameter `DeletionPolicy` for EFs and FSx for Lustre shared storage
to support storage retention on deletion.
- Enable server-side encryption for the EcrImageBuilder SNS topic created when deploying ParallelCluster API and used to notify on docker image build events.
- Add support for on-demand capacity reservations.

**CHANGES**
- Remove support for Python 3.6 in aws-parallelcluster-batch-cli.
Expand Down
6 changes: 6 additions & 0 deletions api/infrastructure/parallelcluster-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,12 @@ Resources:
aws:RequestedRegion:
- !Ref Region
Sid: CloudWatchLogs
- Action:
- resource-groups:ListGroupResources
Resource: '*'
Effect: Allow
Sid: ResourceGroupRead


### IMAGE ACTIONS POLICIES

Expand Down
9 changes: 9 additions & 0 deletions cli/src/pcluster/aws/aws_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pcluster.aws.imagebuilder import ImageBuilderClient
from pcluster.aws.kms import KmsClient
from pcluster.aws.logs import LogsClient
from pcluster.aws.resource_groups import ResourceGroupsClient
from pcluster.aws.route53 import Route53Client
from pcluster.aws.s3 import S3Client
from pcluster.aws.s3_resource import S3Resource
Expand Down Expand Up @@ -57,6 +58,7 @@ def __init__(self):
self._logs = None
self._route53 = None
self._secretsmanager = None
self._resource_groups = None

@property
def cfn(self):
Expand Down Expand Up @@ -163,6 +165,13 @@ def secretsmanager(self):
self._secretsmanager = SecretsManagerClient()
return self._secretsmanager

@property
def resource_groups(self):
"""Resource Groups client."""
if not self._resource_groups:
self._resource_groups = ResourceGroupsClient()
return self._resource_groups

@staticmethod
def instance():
"""Return the singleton AWSApi instance."""
Expand Down
25 changes: 25 additions & 0 deletions cli/src/pcluster/aws/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self):
self.additional_instance_types_data = {}
self.security_groups_cache = {}
self.subnets_cache = {}
self.capacity_reservations_cache = {}

@AWSExceptionHandler.handle_client_exception
@Cache.cached
Expand Down Expand Up @@ -83,6 +84,30 @@ def describe_subnets(self, subnet_ids):
result.append(subnet)
return result

@AWSExceptionHandler.handle_client_exception
def describe_capacity_reservations(self, capacity_reservation_ids):
"""Return a list of Capacity Reservations."""
result = []
missed_capacity_reservations = []
for capacity_reservation_id in capacity_reservation_ids:
cached_data = self.capacity_reservations_cache.get(capacity_reservation_id)
if cached_data:
result.append(cached_data)
else:
missed_capacity_reservations.append(capacity_reservation_id)
if missed_capacity_reservations:
response = list(
self._paginate_results(
self._client.describe_capacity_reservations, CapacityReservationIds=missed_capacity_reservations
)
)
for capacity_reservation in response:
self.capacity_reservations_cache[
capacity_reservation.get("CapacityReservationId")
] = capacity_reservation
result.append(capacity_reservation)
return result

@AWSExceptionHandler.handle_client_exception
@Cache.cached
def get_subnet_avail_zone(self, subnet_id):
Expand Down
35 changes: 35 additions & 0 deletions cli/src/pcluster/aws/resource_groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.
import re

from pcluster.aws.common import AWSExceptionHandler, Boto3Client, Cache


class ResourceGroupsClient(Boto3Client):
"""Implement Resource Groups Boto3 client."""

def __init__(self):
super().__init__("resource-groups")

@AWSExceptionHandler.handle_client_exception
@Cache.cached
def get_capacity_reservation_ids_from_group_resources(self, group):
"""Return a list of capacity reservation ids."""
capacity_reservation_ids = []
resources = self._client.list_group_resources(Group=group)["Resources"]
for resource in resources:
if resource["Identifier"]["ResourceType"] == "AWS::EC2::CapacityReservation":
capacity_reservation_ids.append(
re.match(
"arn:.*:.*:.*:.*:.*(?P<reservation_id>cr-.*)", resource["Identifier"]["ResourceArn"]
).group("reservation_id")
)
return capacity_reservation_ids
77 changes: 77 additions & 0 deletions cli/src/pcluster/config/cluster_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@
)
from pcluster.validators.ec2_validators import (
AmiOsCompatibleValidator,
CapacityReservationResourceGroupValidator,
CapacityReservationValidator,
CapacityTypeValidator,
InstanceTypeBaseAMICompatibleValidator,
InstanceTypeMemoryInfoValidator,
Expand Down Expand Up @@ -862,6 +864,15 @@ def __init__(self, head_node_bootstrap_timeout: int = None, compute_node_bootstr
)


class CapacityReservationTarget(Resource):
"""Represent the CapacityReservationTarget configuration."""

def __init__(self, capacity_reservation_id: str = None, capacity_reservation_resource_group_arn: str = None):
super().__init__()
self.capacity_reservation_id = Resource.init_param(capacity_reservation_id)
self.capacity_reservation_resource_group_arn = Resource.init_param(capacity_reservation_resource_group_arn)


class ClusterDevSettings(BaseDevSettings):
"""Represent the dev settings configuration."""

Expand Down Expand Up @@ -1602,6 +1613,7 @@ def __init__(
efa: Efa = None,
disable_simultaneous_multithreading: bool = None,
schedulable_memory: int = None,
capacity_reservation_target: CapacityReservationTarget = None,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -1613,6 +1625,7 @@ def __init__(
)
self.efa = efa or Efa(enabled=False, implied=True)
self.schedulable_memory = Resource.init_param(schedulable_memory)
self.capacity_reservation_target = capacity_reservation_target
self._instance_types_with_instance_storage = []
self._instance_type_info_map = {}

Expand Down Expand Up @@ -1779,13 +1792,15 @@ def __init__(
custom_actions: CustomActions = None,
iam: Iam = None,
image: QueueImage = None,
capacity_reservation_target: CapacityReservationTarget = None,
**kwargs,
):
super().__init__(**kwargs)
self.compute_settings = compute_settings or ComputeSettings(implied=True)
self.custom_actions = custom_actions
self.iam = iam or Iam(implied=True)
self.image = image
self.capacity_reservation_target = capacity_reservation_target

@property
def instance_role(self):
Expand Down Expand Up @@ -2361,6 +2376,64 @@ def _register_validators(self):
os=self.image.os,
architecture=self.head_node.architecture,
)
# The validation below has to be in cluster config class instead of queue class
# to make sure the subnet APIs are cached by previous validations.
if compute_resource.capacity_reservation_target:
cr_target = compute_resource.capacity_reservation_target
self._register_validator(
CapacityReservationValidator,
capacity_reservation_id=cr_target.capacity_reservation_id,
instance_type=compute_resource.instance_type,
subnet=queue.networking.subnet_ids[0],
)
self._register_validator(
CapacityReservationResourceGroupValidator,
capacity_reservation_resource_group_arn=cr_target.capacity_reservation_resource_group_arn,
instance_type=compute_resource.instance_type,
subnet=queue.networking.subnet_ids[0],
)

@property
def _capacity_reservation_targets(self):
"""Return a list of capacity reservation targets from all queues and compute resources with the section."""
capacity_reservation_targets_list = []
for queue in self.scheduling.queues:
if queue.capacity_reservation_target:
capacity_reservation_targets_list.append(queue.capacity_reservation_target)
for compute_resource in queue.compute_resources:
if compute_resource.capacity_reservation_target:
capacity_reservation_targets_list.append(compute_resource.capacity_reservation_target)
return capacity_reservation_targets_list

@property
def capacity_reservation_ids(self):
"""Return a list of capacity reservation ids specified in the config."""
result = set()
for capacity_reservation_target in self._capacity_reservation_targets:
if capacity_reservation_target.capacity_reservation_id:
result.add(capacity_reservation_target.capacity_reservation_id)
return list(result)

@property
def capacity_reservation_resource_group_arns(self):
"""Return a list of capacity reservation resource group in the config."""
result = set()
for capacity_reservation_target in self._capacity_reservation_targets:
if capacity_reservation_target.capacity_reservation_resource_group_arn:
result.add(capacity_reservation_target.capacity_reservation_resource_group_arn)
return list(result)

@property
def all_relevant_capacity_reservation_ids(self):
"""Return a list of capacity reservation ids specified in the config or used by resource groups."""
capacity_reservation_ids = set(self.capacity_reservation_ids)
for capacity_reservation_resource_group_arn in self.capacity_reservation_resource_group_arns:
capacity_reservation_ids.update(
AWSApi.instance().resource_groups.get_capacity_reservation_ids_from_group_resources(
capacity_reservation_resource_group_arn
)
)
return list(capacity_reservation_ids)


class SchedulerPluginClusterConfig(CommonSchedulerClusterConfig):
Expand All @@ -2370,6 +2443,8 @@ def __init__(self, cluster_name: str, scheduling: SchedulerPluginScheduling, **k
super().__init__(cluster_name, **kwargs)
self.scheduling = scheduling
self.__image_dict = None
# Cache capacity reservations information together to reduce number of boto3 calls
AWSApi.instance().ec2.describe_capacity_reservations(self.all_relevant_capacity_reservation_ids)

def get_instance_types_data(self):
"""Get instance type infos for all instance types used in the configuration file."""
Expand Down Expand Up @@ -2426,6 +2501,8 @@ def __init__(self, cluster_name: str, scheduling: SlurmScheduling, **kwargs):
super().__init__(cluster_name, **kwargs)
self.scheduling = scheduling
self.__image_dict = None
# Cache capacity reservations information together to reduce number of boto3 calls
AWSApi.instance().ec2.describe_capacity_reservations(self.all_relevant_capacity_reservation_ids)

def get_instance_types_data(self):
"""Get instance type infos for all instance types used in the configuration file."""
Expand Down
33 changes: 33 additions & 0 deletions cli/src/pcluster/schemas/cluster_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
AwsBatchQueueNetworking,
AwsBatchScheduling,
AwsBatchSettings,
CapacityReservationTarget,
CapacityType,
CloudWatchDashboards,
CloudWatchLogs,
Expand Down Expand Up @@ -946,6 +947,32 @@ def make_resource(self, data, **kwargs):
return Timeouts(**data)


class CapacityReservationTargetSchema(BaseSchema):
"""Represent the schema of the CapacityReservationTarget section."""

capacity_reservation_id = fields.Str(metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY})
capacity_reservation_resource_group_arn = fields.Str(metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY})

@post_load()
def make_resource(self, data, **kwargs):
"""Generate resource."""
return CapacityReservationTarget(**data)

@validates_schema
def no_coexist_instance_type_flexibility(self, data, **kwargs):
"""Validate that 'capacity_reservation_id' and 'capacity_reservation_resource_group_arn' do not co-exist."""
if self.fields_coexist(
data,
["capacity_reservation_id", "capacity_reservation_resource_group_arn"],
one_required=True,
**kwargs,
):
raise ValidationError(
"A Capacity Reservation Target needs to specify either Capacity Reservation ID or "
"Capacity Reservation Resource Group ARN."
)


class ClusterDevSettingsSchema(BaseDevSettingsSchema):
"""Represent the schema of Dev Setting."""

Expand Down Expand Up @@ -1116,6 +1143,9 @@ class SlurmComputeResourceSchema(_ComputeResourceSchema):
efa = fields.Nested(EfaSchema, metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY})
disable_simultaneous_multithreading = fields.Bool(metadata={"update_policy": UpdatePolicy.COMPUTE_FLEET_STOP})
schedulable_memory = fields.Int(metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY})
capacity_reservation_target = fields.Nested(
CapacityReservationTargetSchema, metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY}
)

@validates_schema
def no_coexist_instance_type_flexibility(self, data, **kwargs):
Expand Down Expand Up @@ -1206,6 +1236,9 @@ class _CommonQueueSchema(BaseQueueSchema):
)
iam = fields.Nested(QueueIamSchema, metadata={"update_policy": UpdatePolicy.SUPPORTED})
image = fields.Nested(QueueImageSchema, metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY})
capacity_reservation_target = fields.Nested(
CapacityReservationTargetSchema, metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY}
)


class SlurmQueueSchema(_CommonQueueSchema):
Expand Down
26 changes: 26 additions & 0 deletions cli/src/pcluster/templates/cdk_builder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,32 @@ def _build_policy(self) -> List[iam.PolicyStatement]:
]
)

if self._config.scheduling.scheduler != "awsbatch":
capacity_reservation_ids = self._config.capacity_reservation_ids
if self._config.capacity_reservation_ids:
policy.append(
iam.PolicyStatement(
actions=["ec2:RunInstances"],
effect=iam.Effect.ALLOW,
resources=[
self._format_arn(
service="ec2",
resource=f"capacity-reservation/{capacity_reservation_id}",
)
for capacity_reservation_id in capacity_reservation_ids
],
)
)
capacity_reservation_resource_group_arns = self._config.capacity_reservation_resource_group_arns
if capacity_reservation_resource_group_arns:
policy.append(
iam.PolicyStatement(
actions=["ec2:RunInstances"],
effect=iam.Effect.ALLOW,
resources=capacity_reservation_resource_group_arns,
)
)

if self._config.directory_service:
policy.append(
iam.PolicyStatement(
Expand Down
11 changes: 11 additions & 0 deletions cli/src/pcluster/templates/cluster_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -1459,6 +1459,16 @@ def _add_compute_resource_launch_template(
if isinstance(compute_resource, SlurmComputeResource):
conditional_template_properties.update({"instance_type": compute_resource.instance_type})

capacity_reservation_specification = None
cr_target = compute_resource.capacity_reservation_target or queue.capacity_reservation_target
if cr_target:
capacity_reservation_specification = ec2.CfnLaunchTemplate.CapacityReservationSpecificationProperty(
capacity_reservation_target=ec2.CfnLaunchTemplate.CapacityReservationTargetProperty(
capacity_reservation_id=cr_target.capacity_reservation_id,
capacity_reservation_resource_group_arn=cr_target.capacity_reservation_resource_group_arn,
)
)

return ec2.CfnLaunchTemplate(
self,
f"LaunchTemplate{create_hash_suffix(queue.name + compute_resource.name)}",
Expand All @@ -1476,6 +1486,7 @@ def _add_compute_resource_launch_template(
),
instance_market_options=instance_market_options,
instance_initiated_shutdown_behavior="terminate",
capacity_reservation_specification=capacity_reservation_specification,
user_data=Fn.base64(
Fn.sub(
get_user_data_content("../resources/compute_node/user_data.sh"),
Expand Down
Loading