198 changes: 25 additions & 173 deletions test/integration_tests/abstract_integration_tests.py
@@ -10,12 +10,14 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import subprocess
import time
import uuid
import re

import boto3
import yaml
from botocore.exceptions import ClientError

from hyperpod_cli.utils import setup_logger
@@ -32,164 +34,37 @@ class AbstractIntegrationTests:
"InService",
]
suffix = str(uuid.uuid4())[:8]
hyperpod_cli_cluster_name = "hyperpod-cli-cluster-" + suffix
vpc_eks_stack_name = "hyperpod-cli-stack-" + suffix
hyperpod_cli_job_name: str = 'hyperpod-job-'+ suffix
test_job_file = os.path.expanduser("./test/integration_tests/data/basicJob.yaml")
hyperpod_cli_cluster_name = "HyperPodCLI-cluster"
s3_roles_stack_name = "hyperpod-cli-resource-stack"
vpc_stack_name = "hyperpod-cli-vpc-stack"
eks_cluster_name = "hyperpod-cli-cluster-" + suffix
bucket_name = "hyperpod-cli-s3-" + suffix
test_team_name = "test-team"

def _create_session(self):
session = boto3.Session()
return session

def create_test_resources(self, session):
cfn = session.client("cloudformation")
def replace_placeholders(self):
replacements = {
'JOB_NAME': self.hyperpod_cli_job_name,
}
with open(self.test_job_file, 'r') as file:
yaml_content = file.read()
pattern = re.compile(r'\$\{([^}^{]+)\}')

# Get static resources from static-resource stack
self.describe_constant_resources_stack_and_set_values(cfn)
def replace(match):
key = match.group(1)
return str(replacements.get(key, match.group(0)))

# Get static resources from static-resource stack
self.describe_vpc_stack_and_set_values(cfn)
processed_yaml = pattern.sub(replace, yaml_content)

# Create VPC, EKS cluster and roles
with open(
"test/integration_tests/cloudformation/resources.yaml",
"r",
) as fh:
template = fh.read()
cfn.create_stack(
StackName=self.vpc_eks_stack_name,
TemplateBody=template,
Capabilities=["CAPABILITY_NAMED_IAM"],
Parameters=[
{
"ParameterKey": "ClusterName",
"ParameterValue": self.eks_cluster_name,
"ResolvedValue": "string",
},
{
"ParameterKey": "EKSClusterRoleArn",
"ParameterValue": self.cfn_output_map.get("EKSClusterRoleArn"),
"ResolvedValue": "string",
},
{
"ParameterKey": "SubnetId1",
"ParameterValue": self.cfn_output_map.get("PrivateSubnet1"),
"ResolvedValue": "string",
},
{
"ParameterKey": "SubnetId2",
"ParameterValue": self.cfn_output_map.get("PrivateSubnet2"),
"ResolvedValue": "string",
},
{
"ParameterKey": "SecurityGroupId",
"ParameterValue": self.cfn_output_map.get("SecurityGroup"),
"ResolvedValue": "string",
},
],
)
waiter = cfn.get_waiter("stack_create_complete")
waiter.wait(
StackName=self.vpc_eks_stack_name,
WaiterConfig={
"Delay": 30,
"MaxAttempts": 40,
},
)
describe = cfn.describe_stacks(StackName=self.vpc_eks_stack_name)
if describe:
cfn_output = describe.get("Stacks")[0]
if cfn_output and cfn_output.get("Outputs"):
for output in cfn_output.get("Outputs"):
self.cfn_output_map[output.get("OutputKey")] = output.get(
"OutputValue"
)

def delete_cloudformation_stack(self, session):
cfn = session.client("cloudformation")
cfn.delete_stack(StackName=self.vpc_eks_stack_name)

def upload_lifecycle_script(self, session):
s3_client = session.client("s3")
try:
response = s3_client.upload_file(
"test/integration_tests/lifecycle_script/on_create_noop.sh",
self.cfn_output_map.get("Bucket"),
"on_create_noop.sh",
)
except ClientError as e:
logger.error(f"Error uploading lifecycle script to s3 {e}")

def get_hyperpod_cluster_status(self, sagemaker_client):
return sagemaker_client.describe_cluster(
ClusterName=self.hyperpod_cli_cluster_name
)

def create_hyperpod_cluster(self, session):
# Create HyperPod cluster using eks cluster from stack above
sagemaker_client = session.client("sagemaker")
sagemaker_client.create_cluster(
ClusterName=self.hyperpod_cli_cluster_name,
Orchestrator={"Eks": {"ClusterArn": self.cfn_output_map.get("ClusterArn")}},
InstanceGroups=[
{
"InstanceGroupName": "group2",
"InstanceType": "ml.c5.2xlarge",
"InstanceCount": 2,
"LifeCycleConfig": {
"SourceS3Uri": f's3://{self.cfn_output_map.get("Bucket")}',
"OnCreate": "on_create_noop.sh",
},
"ExecutionRole": self.cfn_output_map.get("ExecutionRole"),
"ThreadsPerCore": 1,
}
],
VpcConfig={
"SecurityGroupIds": [self.cfn_output_map.get("SecurityGroup")],
"Subnets": [self.cfn_output_map.get("PrivateSubnet1")],
},
)
with open(self.test_job_file, 'w') as file:
file.write(processed_yaml)

time.sleep(1)
# Wait for the SageMaker cluster creation to complete
try:
result = self.get_hyperpod_cluster_status(sagemaker_client)
while (
result.get("ClusterStatus") not in self.hyperpod_cluster_terminal_state
):
time.sleep(30)
result = self.get_hyperpod_cluster_status(sagemaker_client)
except Exception as e:
logger.error(e)
logger.info(f"Hyperpod cluster created {self.hyperpod_cli_cluster_name}")

def delete_hyperpod_cluster(self, session):
# delete HyperPod cluster using eks cluster from stack above
sagemaker_client = session.client("sagemaker")
sagemaker_client.delete_cluster(ClusterName=self.hyperpod_cli_cluster_name)

time.sleep(10)
# Wait for the SageMaker cluster deletion to complete
try:
result = self.get_hyperpod_cluster_status(sagemaker_client)
while result.get("ClusterStatus") == "Deleting":
time.sleep(30)
result = self.get_hyperpod_cluster_status(sagemaker_client)
except Exception as e:
logger.info(
f"Caught exception while trying to describe cluster during teardown {e}"
)
return
raise Exception(
f"Hyperpod Cluster {self.hyperpod_cli_cluster_name} fail to delete"
)

def create_kube_context(self):
eks_cluster_name = self.cfn_output_map.get("ClusterArn").split(":")[-1]
eks_cluster_name = eks_cluster_name.split("/")[-1]
eks_cluster_name = 'HyperPodCLI-eks-cluster'
Review comment (Contributor): nit: this is better declared as a class constant
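A minimal sketch of what the suggested refactor might look like, assuming the hard-coded name from the line above (illustrative only, not part of this change):

class AbstractIntegrationTests:
    # Hypothetical: hoist the hard-coded EKS cluster name into a class constant
    # so create_kube_context() and other helpers share a single definition.
    EKS_CLUSTER_NAME = "HyperPodCLI-eks-cluster"

    def create_kube_context(self):
        eks_cluster_name = self.EKS_CLUSTER_NAME
        # ... build and run the kubeconfig update command as before ...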

command = [
"aws",
"eks",
@@ -204,28 +79,6 @@ def create_kube_context(self):
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to update kubeconfig: {e}")

def describe_constant_resources_stack_and_set_values(self, cfn_client):
describe_s3_stack = cfn_client.describe_stacks(
StackName=self.s3_roles_stack_name
)
if describe_s3_stack:
cfn_output = describe_s3_stack.get("Stacks")[0]
if cfn_output and cfn_output.get("Outputs"):
for output in cfn_output.get("Outputs"):
self.cfn_output_map[output.get("OutputKey")] = output.get(
"OutputValue"
)

def describe_vpc_stack_and_set_values(self, cfn_client):
describe_vpc_stack = cfn_client.describe_stacks(StackName=self.vpc_stack_name)
if describe_vpc_stack:
cfn_output = describe_vpc_stack.get("Stacks")[0]
if cfn_output and cfn_output.get("Outputs"):
for output in cfn_output.get("Outputs"):
self.cfn_output_map[output.get("OutputKey")] = output.get(
"OutputValue"
)

def apply_helm_charts(self):
command = ["helm", "dependencies", "update", "helm_chart/HyperPodHelmChart"]

@@ -244,7 +97,8 @@ def apply_helm_charts(self):

apply_command = [
"helm",
"install",
"upgrade",
"--install",
"dependencies",
"helm_chart/HyperPodHelmChart",
"--namespace",
@@ -410,13 +264,11 @@ def create_quota_allocation_resources(self):

def setup(self):
self.new_session = self._create_session()
self.create_test_resources(self.new_session)
self.replace_placeholders()
self.create_kube_context()
self.apply_helm_charts()
self.create_hyperpod_cluster(self.new_session)
self.install_kueue()
self.create_quota_allocation_resources()
# self.install_kueue()
# self.create_quota_allocation_resources()

def tearDown(self):
self.delete_hyperpod_cluster(self.new_session)
self.delete_cloudformation_stack(self.new_session)
logger.info("Tests completed")
6 changes: 3 additions & 3 deletions test/integration_tests/data/basicJob.yaml
@@ -22,7 +22,7 @@ training_cfg:
entry_script: /opt/pytorch-mnist/mnist.py
script_args: []
run:
name: hyperpod-cli-test # Current run name
name: ${JOB_NAME} # Current run name
nodes: 1 # Number of nodes to use for current training
ntasks_per_node: 1 # Number of devices to use per node
cluster:
@@ -33,7 +33,7 @@ cluster:
service_account_name: null
# persistent volume, usually used to mount FSx
persistent_volume_claims: null
namespace: hyperpod-ns-test-team
namespace: kubeflow
# required node affinity to select nodes with HyperPod
# labels and passed health check if burn-in enabled
label_selector:
@@ -47,7 +47,7 @@ cluster:
- 100
pullPolicy: IfNotPresent # policy to pull container, can be Always, IfNotPresent and Never
restartPolicy: OnFailure # restart policy
scheduler_type: Kueue
scheduler_type: None

base_results_dir: ./result # Location to store the results, checkpoints and logs.
container: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-bc09cfd # container to use