48 changes: 30 additions & 18 deletions tests/integration-tests/clusters_factory.py
@@ -44,7 +44,7 @@ def wrapper(*args, **kwargs):
class Cluster:
"""Contain all static and dynamic data related to a cluster instance."""

def __init__(self, name, ssh_key, config_file, region):
def __init__(self, name, ssh_key, config_file, region, custom_cli_credentials=None):
self.name = name
self.config_file = config_file
self.ssh_key = ssh_key
@@ -57,6 +57,7 @@ def __init__(self, name, ssh_key, config_file, region):
self.__cfn_outputs = None
self.__cfn_resources = None
self.__cfn_stack_arn = None
self.custom_cli_credentials = custom_cli_credentials

def __repr__(self):
attrs = ", ".join(["{key}={value}".format(key=key, value=repr(value)) for key, value in self.__dict__.items()])
@@ -89,7 +90,12 @@ def update(self, config_file, raise_on_error=True, log_error=True, **kwargs):
# TODO Remove the validator suppression below once the plugin scheduler is officially supported
if self.config["Scheduling"]["Scheduler"] == "plugin":
command.extend(["--suppress-validators", "type:SchedulerValidator"])
result = run_pcluster_command(command, raise_on_error=raise_on_error, log_error=log_error)
result = run_pcluster_command(
command,
raise_on_error=raise_on_error,
log_error=log_error,
custom_cli_credentials=self.custom_cli_credentials,
)
logging.info("update-cluster response: %s", result.stdout)
response = json.loads(result.stdout)
if response.get("cloudFormationStackStatus") != "UPDATE_COMPLETE":
@@ -130,7 +136,7 @@ def delete(self, delete_logs=False):
logging.warning("CloudWatch logs for cluster %s are preserved due to failure.", self.name)
try:
self.cfn_stack_arn # Cache cfn_stack_arn attribute before stack deletion
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
if "DELETE_FAILED" in result.stdout:
error = "Cluster deletion failed for {0} with output: {1}".format(self.name, result.stdout)
logging.error(error)
@@ -153,7 +159,7 @@ def start(self):
else: # slurm and scheduler plugin case
cmd_args.append("START_REQUESTED")
try:
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
logging.info("Cluster {0} started successfully".format(self.name))
return result.stdout
except subprocess.CalledProcessError as e:
@@ -169,7 +175,7 @@ def stop(self):
else: # slurm and scheduler plugin case
cmd_args.append("STOP_REQUESTED")
try:
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
logging.info("Cluster {0} stopped successfully".format(self.name))
return result.stdout
except subprocess.CalledProcessError as e:
@@ -180,7 +186,7 @@ def describe_cluster(self):
"""Run pcluster describe-cluster and return the result."""
cmd_args = ["pcluster", "describe-cluster", "--cluster-name", self.name]
try:
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
response = json.loads(result.stdout)
logging.info("Get cluster {0} status successfully".format(self.name))
return response
@@ -192,7 +198,7 @@ def describe_compute_fleet(self):
"""Run pcluster describe-compute-fleet and return the result."""
cmd_args = ["pcluster", "describe-compute-fleet", "--cluster-name", self.name]
try:
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
response = json.loads(result.stdout)
logging.info("Describe cluster %s compute fleet successfully", self.name)
return response
@@ -216,7 +222,7 @@ def describe_cluster_instances(self, node_type=None, queue_name=None):
if queue_name:
cmd_args.extend(["--queue-name", queue_name])
try:
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
response = json.loads(result.stdout)
logging.info("Get cluster {0} instances successfully".format(self.name))
return response["instances"]
@@ -239,7 +245,7 @@ def export_logs(self, bucket, output_file=None, bucket_prefix=None, filters=None
if filters:
cmd_args += ["--filters", filters]
try:
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
response = json.loads(result.stdout)
logging.info("Cluster's logs exported successfully")
return response
@@ -253,7 +259,7 @@ def list_log_streams(self, next_token=None):
if next_token:
cmd_args.extend(["--next-token", next_token])
try:
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
response = json.loads(result.stdout)
logging.info("Cluster's logs listed successfully")
return response
@@ -281,7 +287,7 @@ def get_log_events(self, log_stream, **args):
cmd_args.extend([f"--{kebab_case(k)}", str(val)])

try:
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
response = json.loads(result.stdout)
logging.info("Log events retrieved successfully")
return response
@@ -296,7 +302,7 @@ def get_stack_events(self, **args):
cmd_args.extend([f"--{kebab_case(k)}", str(val)])

try:
result = run_pcluster_command(cmd_args, log_error=False)
result = run_pcluster_command(cmd_args, log_error=False, custom_cli_credentials=self.custom_cli_credentials)
response = json.loads(result.stdout)
logging.info("Stack events retrieved successfully")
return response
@@ -419,8 +425,13 @@ def create_cluster(self, cluster, log_error=True, raise_on_error=True, **kwargs)
logging.info("Creating cluster {0} with config {1}".format(name, cluster.config_file))
command, wait = self._build_command(cluster, kwargs)
try:
result = run_pcluster_command(command, timeout=7200, raise_on_error=raise_on_error, log_error=log_error)

result = run_pcluster_command(
command,
timeout=7200,
raise_on_error=raise_on_error,
log_error=log_error,
custom_cli_credentials=kwargs.get("custom_cli_credentials"),
)
logging.info("create-cluster response: %s", result.stdout)
response = json.loads(result.stdout)
if wait:
@@ -470,10 +481,11 @@ def _build_command(cluster, kwargs):
kwargs["suppress_validators"] = validators_list

for k, val in kwargs.items():
if isinstance(val, (list, tuple)):
command.extend([f"--{kebab_case(k)}"] + list(map(str, val)))
else:
command.extend([f"--{kebab_case(k)}", str(val)])
if k != "custom_cli_credentials":
if isinstance(val, (list, tuple)):
command.extend([f"--{kebab_case(k)}"] + list(map(str, val)))
else:
command.extend([f"--{kebab_case(k)}", str(val)])

return command, wait
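For reference, _build_command still turns each remaining kwarg into a --kebab-case CLI flag; custom_cli_credentials is now skipped because it is consumed by the test framework rather than forwarded to the pcluster CLI. A minimal illustrative sketch of that mapping, not part of this change (kebab_case below is a stand-in for the shared test helper):

def kebab_case(name):
    """Illustrative stand-in for the shared kebab_case test helper."""
    return name.replace("_", "-")


def build_flags(kwargs):
    """Sketch of the kwarg-to-flag mapping, with custom_cli_credentials filtered out."""
    command = []
    for k, val in kwargs.items():
        if k == "custom_cli_credentials":
            continue  # consumed by run_pcluster_command, not a pcluster CLI option
        if isinstance(val, (list, tuple)):
            command.extend([f"--{kebab_case(k)}"] + list(map(str, val)))
        else:
            command.extend([f"--{kebab_case(k)}", str(val)])
    return command


# Example (values are fake):
# build_flags({"suppress_validators": ["type:SchedulerValidator"],
#              "custom_cli_credentials": "arn:aws:iam::123456789012:role/example"})
# -> ["--suppress-validators", "type:SchedulerValidator"]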

6 changes: 6 additions & 0 deletions tests/integration-tests/configs/common/common.yaml
@@ -294,6 +294,12 @@ iam:
instances: {{ common.INSTANCES_DEFAULT_X86 }}
oss: ["alinux2"]
schedulers: ["slurm", "awsbatch"]
test_iam.py::test_iam_resource_prefix:
dimensions:
- regions: [ "eu-north-1" ]
instances: {{ common.INSTANCES_DEFAULT_X86 }}
oss: [ "alinux2" ]
schedulers: [ "slurm" ]
intel_hpc:
test_intel_hpc.py::test_intel_hpc:
dimensions:
6 changes: 6 additions & 0 deletions tests/integration-tests/configs/pcluster3.yaml
@@ -50,6 +50,12 @@ test-suites:
instances: {{ common.INSTANCES_DEFAULT_X86 }}
oss: ["alinux2"]
schedulers: ["slurm"]
test_iam.py::test_iam_resource_prefix:
dimensions:
- regions: [ "eu-north-1" ]
schedulers: [ "slurm" ]
oss: [ "alinux2" ]
instances: {{ common.INSTANCES_DEFAULT_X86 }}
schedulers:
test_awsbatch.py::test_awsbatch:
dimensions:
1 change: 1 addition & 0 deletions tests/integration-tests/conftest.py
@@ -386,6 +386,7 @@ def _cluster_factory(cluster_config, upper_case_cluster_name=False, **kwargs):
config_file=cluster_config,
ssh_key=request.config.getoption("key_path"),
region=region,
custom_cli_credentials=kwargs.get("custom_cli_credentials"),
)
if not request.config.getoption("cluster"):
cluster.creation_response = factory.create_cluster(cluster, **kwargs)
6 changes: 5 additions & 1 deletion tests/integration-tests/framework/credential_providers.py
@@ -25,12 +25,16 @@ def register_cli_credentials_for_region(region, iam_role):

def run_pcluster_command(*args, **kwargs):
"""Run a command after assuming the role configured through register_cli_credentials_for_region."""

region = kwargs.get("region")
if not region:
region = os.environ["AWS_DEFAULT_REGION"]

if region in cli_credentials:
with sts_credential_provider(region, cli_credentials[region]):
with sts_credential_provider(
region, credential_arn=kwargs.get("custom_cli_credentials") or cli_credentials.get(region)
):
kwargs.pop("custom_cli_credentials", None)
return run_command(*args, **kwargs)
else:
return run_command(*args, **kwargs)
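For reference, the effective precedence here is: a per-call custom_cli_credentials ARN wins over the role registered for the region, and the override is only honoured when the region has registered CLI credentials, since the surrounding "if region in cli_credentials:" check is unchanged. An illustrative sketch of that precedence, not part of this change (ARNs are fake):

cli_credentials = {"eu-north-1": "arn:aws:iam::123456789012:role/default-cli-role"}  # fake ARN


def resolve_credential_arn(region, custom_cli_credentials=None):
    """Sketch of the credential precedence applied inside run_pcluster_command."""
    # A per-call override wins; otherwise fall back to the role registered
    # via register_cli_credentials_for_region().
    return custom_cli_credentials or cli_credentials.get(region)


assert resolve_credential_arn("eu-north-1").endswith("default-cli-role")
assert resolve_credential_arn(
    "eu-north-1", custom_cli_credentials="arn:aws:iam::123456789012:role/iam-rp-user-role"
).endswith("iam-rp-user-role")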
132 changes: 131 additions & 1 deletion tests/integration-tests/tests/iam/test_iam.py
@@ -17,9 +17,11 @@
import pytest
import yaml
from assertpy import assert_that
from cfn_stacks_factory import CfnStack, CfnStacksFactory
from framework.tests_configuration.config_utils import get_all_regions
from remote_command_executor import RemoteCommandExecutor
from s3_common_utils import check_s3_read_resource, check_s3_read_write_resource, get_policy_resources
from utils import wait_for_computefleet_changed
from utils import generate_stack_name, wait_for_computefleet_changed

from tests.common.assertions import assert_no_errors_in_logs
from tests.schedulers.test_awsbatch import _test_job_submission as _test_job_submission_awsbatch
@@ -332,3 +334,131 @@ def test_s3_read_write_resource(region, pcluster_config_reader, s3_bucket_factor
# Check S3 resources
check_s3_read_resource(region, cluster, get_policy_resources(config, enable_write_access=False))
check_s3_read_write_resource(region, cluster, get_policy_resources(config, enable_write_access=True))


@pytest.mark.parametrize("iam_resource_prefix", ["/path-prefix/name-prefix-"])
@pytest.mark.usefixtures("os", "instance")
def test_iam_resource_prefix(
initialize_resource_prefix_cli_creds,
pcluster_config_reader,
clusters_factory,
test_datadir,
scheduler_commands_factory,
s3_bucket_factory,
s3_bucket,
iam_resource_prefix,
):
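"""Test that the cluster's IAM roles, policies and instance profiles are created with the configured resource prefix."""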
cli_credentials = initialize_resource_prefix_cli_creds(test_datadir)
if cli_credentials:
for region, creds in cli_credentials.items():

bucket_name = s3_bucket
cfn_client, _, iam_client, _ = _create_boto3_clients(region)
create_config, _ = _get_config_create_and_update(test_datadir)
cluster_config = pcluster_config_reader(
config_file=create_config, min_count=1, bucket=bucket_name, iam_resource_prefix=iam_resource_prefix
)

cluster = clusters_factory(cluster_config, custom_cli_credentials=creds)
_test_iam_resource_in_cluster(cfn_client, iam_client, cluster.name, iam_resource_prefix)


def _split_resource_prefix(resource_prefix):
"""To split Path and name prefix from Resource Prefix."""
if resource_prefix:
split_index = resource_prefix.rfind("/") + 1
return (
None
if split_index == 0
else resource_prefix
if split_index == len(resource_prefix)
else resource_prefix[:split_index],
None
if split_index == len(resource_prefix)
else resource_prefix
if split_index == 0
else resource_prefix[split_index:],
)
return None, None
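For reference, the conditional expression above splits a resource prefix into an IAM path (everything up to and including the last "/") and a name prefix (the remainder), either of which may be absent. A worked example for the prefix parametrized in this test, not part of this change but runnable inside test_iam.py:

assert _split_resource_prefix("/path-prefix/name-prefix-") == ("/path-prefix/", "name-prefix-")
assert _split_resource_prefix("/path-prefix/") == ("/path-prefix/", None)  # path only, no name prefix
assert _split_resource_prefix("name-prefix-") == (None, "name-prefix-")    # name prefix only, no path
assert _split_resource_prefix(None) == (None, None)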


def _check_iam_resource_prefix(resource_arn_list, iam_resource_prefix):
"""Check the path and name of IAM resource ( Roles, policy and Instance profiles)."""
iam_path, iam_name_prefix = _split_resource_prefix(iam_resource_prefix)
for resource in resource_arn_list:
if "arn:aws:iam:" in resource:
if iam_path:
assert_that(resource).contains(iam_path)
else:
assert_that(resource).contains("/parallelcluster/")
if iam_name_prefix:
assert_that(resource).contains(iam_name_prefix)


def _test_iam_resource_in_cluster(cfn_client, iam_client, stack_name, iam_resource_prefix):
"""Test IAM resources by checking the path and name prefix in AWS IAM and check cluster is created."""

# Check for cluster Status

assert_that(cfn_client.describe_stacks(StackName=stack_name).get("Stacks")[0].get("StackStatus")).is_equal_to(
"CREATE_COMPLETE"
)

resources = cfn_client.describe_stack_resources(StackName=stack_name)["StackResources"]
resource_arn_list = []

for resource in resources:
resource_type = resource["ResourceType"]
if resource_type == "AWS::IAM::Role":

resource_arn_list.append(iam_client.get_role(RoleName=resource["PhysicalResourceId"])["Role"]["Arn"])
resource_arn_list.extend(
iam_client.list_role_policies(RoleName=resource["PhysicalResourceId"])["PolicyNames"]
)
if resource_type == "AWS::IAM::InstanceProfile":
resource_arn_list.append(
iam_client.get_instance_profile(InstanceProfileName=resource["PhysicalResourceId"])["InstanceProfile"][
"Arn"
]
)
_check_iam_resource_prefix(resource_arn_list, iam_resource_prefix)
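For reference, the check above expects ARNs in which the IAM path follows the resource type and the role or instance-profile name starts with the name prefix. An illustrative call, not part of this change and runnable inside test_iam.py (account id and resource suffix are fake):

example_role_arn = "arn:aws:iam::123456789012:role/path-prefix/name-prefix-RoleHeadNode-ABCDEFGH"
_check_iam_resource_prefix([example_role_arn], "/path-prefix/name-prefix-")  # passes: path and name prefix both present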


@pytest.fixture(scope="class")
def initialize_resource_prefix_cli_creds(request):
"""Create an IAM Role with Permission Boundary for testing Resource Prefix Feature."""

stack_factory = CfnStacksFactory(request.config.getoption("credential"))

def _create_resource_prefix_cli_creds(test_datadir):
regions = request.config.getoption("regions") or get_all_regions(request.config.getoption("tests_config"))
stack_template_path = os_lib.path.join("..", test_datadir / "user-role-iam-resource-prefix.cfn.yaml")
with open(stack_template_path, encoding="utf-8") as stack_template_file:
stack_template_data = stack_template_file.read()
cli_creds = {}
for region in regions:
if request.config.getoption("iam_user_role_stack_name"):
stack_name = request.config.getoption("iam_user_role_stack_name")
logging.info(f"Using stack {stack_name} in region {region}")
stack = CfnStack(
name=stack_name, region=region, capabilities=["CAPABILITY_IAM"], template=stack_template_data
)
else:
logging.info("Creating IAM roles for pcluster CLI")
stack_name = generate_stack_name(
"integ-tests-iam-rp-user-role", request.config.getoption("stackname_suffix")
)
stack = CfnStack(
name=stack_name, region=region, capabilities=["CAPABILITY_IAM"], template=stack_template_data
)

stack_factory.create_stack(stack)
cli_creds[region] = stack.cfn_outputs["ParallelClusterUserRole"]
return cli_creds

yield _create_resource_prefix_cli_creds

if not request.config.getoption("no_delete"):
stack_factory.delete_all_stacks()
else:
logging.warning("Skipping deletion of CFN stacks because --no-delete option is set")
@@ -0,0 +1,41 @@
Image:
Os: {{ os }}
Iam:
ResourcePrefix: {{ iam_resource_prefix }}
HeadNode:
InstanceType: {{ instance }}
Networking:
SubnetId: {{ public_subnet_id }}
Ssh:
KeyName: {{ key_name }}
Iam:
S3Access:
- BucketName: {{ bucket }}
KeyName: read_and_write/
EnableWriteAccess: true
Scheduling:
Scheduler: {{ scheduler }}
SlurmQueues:
- Name: queue-0
ComputeResources:
- Name: compute-resource-0
InstanceType: {{ instance }}
MinCount: {{ min_count }}
Networking:
SubnetIds:
- {{ private_subnet_id }}
- Name: queue-1
ComputeResources:
- Name: compute-resource-0
InstanceType: {{ instance }}
MinCount: {{ min_count }}
Networking:
SubnetIds:
- {{ private_subnet_id }}
Iam:
S3Access:
- BucketName: {{ bucket }}
KeyName: read_and_write/
EnableWriteAccess: true
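For reference, the {{ }} placeholders in the template above are variables filled in by the pcluster_config_reader fixture (assuming its usual Jinja2-based rendering); an illustrative sketch, not part of this change, of how the new Iam section ends up after rendering with the value parametrized in the test:

from jinja2 import Template

iam_snippet = Template("Iam:\n  ResourcePrefix: {{ iam_resource_prefix }}")
print(iam_snippet.render(iam_resource_prefix="/path-prefix/name-prefix-"))
# Iam:
#   ResourcePrefix: /path-prefix/name-prefix-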

