Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sagemaker-core/src/sagemaker/core/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os
import re
import subprocess
import traceback

from boto3.session import Session
from botocore.config import Config
Expand Down Expand Up @@ -353,7 +354,10 @@ def __init__(
session = Session(region_name=region_name)

if region_name is None:
logger.warning("No region provided. Using default region.")
logger.warning(
"No region provided. Using default region.\nStack trace:\n%s",
"".join(traceback.format_stack()),
)
region_name = session.region_name

if config is None:
Expand Down
174 changes: 172 additions & 2 deletions sagemaker-serve/src/sagemaker/serve/model_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4381,6 +4381,11 @@ def _deploy_model_customization(

# Check if endpoint exists
is_existing_endpoint = self._does_endpoint_exist(endpoint_name)
logger.info(
"[_deploy_model_customization] endpoint_name=%s is_existing_endpoint=%s",
endpoint_name,
is_existing_endpoint,
)

if not is_existing_endpoint:
EndpointConfig.create(
Expand All @@ -4394,13 +4399,78 @@ def _deploy_model_customization(
],
execution_role_arn=self.role_arn,
)
logger.info(
"[_deploy_model_customization] EndpointConfig created: %s", endpoint_name
)
logger.info("Endpoint core call starting")
# Capture client / region state right at endpoint creation to help
# diagnose "Could not find endpoint" that sometimes surfaces in
# later refresh() calls (singleton region / env var drift).
try:
import os as _os

from sagemaker.core.utils.utils import SageMakerClient as _SMClient

_sm_region_env = _os.environ.get("SAGEMAKER_REGION")
_probe_client = _SMClient().sagemaker_client
logger.info(
"[_deploy_model_customization] Client state before "
"Endpoint.create: SAGEMAKER_REGION=%s "
"sagemaker_client.meta.region_name=%s",
_sm_region_env,
getattr(getattr(_probe_client, "meta", None), "region_name", None),
)
except Exception as _probe_exc:
logger.info(
"[_deploy_model_customization] Could not probe client region "
"state before Endpoint.create: %s",
_probe_exc,
)
endpoint = Endpoint.create(
endpoint_name=endpoint_name, endpoint_config_name=endpoint_name
)
endpoint.wait_for_status("InService")
logger.info(
"[_deploy_model_customization] Endpoint.create returned: "
"endpoint_name=%s endpoint_arn=%s endpoint_status=%s",
getattr(endpoint, "endpoint_name", None),
getattr(endpoint, "endpoint_arn", None),
getattr(endpoint, "endpoint_status", None),
)
try:
endpoint.wait_for_status("InService")
except Exception as wait_exc:
logger.error(
"[_deploy_model_customization] Initial wait_for_status(InService) "
"failed for endpoint_name=%s: %s",
endpoint_name,
wait_exc,
)
# Try to re-describe to see if the endpoint truly disappeared or
# if this is a transient/consistency issue.
try:
recheck = Endpoint.get(endpoint_name=endpoint_name)
logger.error(
"[_deploy_model_customization] Re-describe after wait failure "
"succeeded: status=%s arn=%s",
getattr(recheck, "endpoint_status", None),
getattr(recheck, "endpoint_arn", None),
)
except Exception as recheck_exc:
logger.error(
"[_deploy_model_customization] Re-describe after wait failure "
"also failed for %s: %s",
endpoint_name,
recheck_exc,
)
raise
else:
endpoint = Endpoint.get(endpoint_name=endpoint_name)
logger.info(
"[_deploy_model_customization] Reusing existing endpoint: "
"endpoint_name=%s endpoint_status=%s",
getattr(endpoint, "endpoint_name", None),
getattr(endpoint, "endpoint_status", None),
)

peft_type = self._fetch_peft()
base_model_recipe_name = model_package.inference_specification.containers[
Expand Down Expand Up @@ -4454,9 +4524,109 @@ def _deploy_model_customization(
# Wait for base IC to be InService before creating adapter
base_ic = InferenceComponent.get(inference_component_name=base_ic_name)
base_ic.wait_for_status("InService")
logger.info(
"[_deploy_model_customization] base IC InService: name=%s arn=%s",
getattr(base_ic, "inference_component_name", None),
getattr(base_ic, "inference_component_arn", None),
)

# Probe endpoint before waiting — we've seen cases where the
# endpoint is reported as missing here. Capture state for diagnosis.
try:
import os as _os

from sagemaker.core.utils.utils import SageMakerClient as _SMClient

_sm_region_env = _os.environ.get("SAGEMAKER_REGION")
_probe_client = _SMClient().sagemaker_client
logger.info(
"[_deploy_model_customization] Client state before second "
"wait_for_status: SAGEMAKER_REGION=%s "
"sagemaker_client.meta.region_name=%s",
_sm_region_env,
getattr(getattr(_probe_client, "meta", None), "region_name", None),
)
except Exception as _probe_exc:
logger.info(
"[_deploy_model_customization] Could not probe client region "
"state before second wait_for_status: %s",
_probe_exc,
)
try:
probe = Endpoint.get(endpoint_name=endpoint_name)
logger.info(
"[_deploy_model_customization] Pre-wait endpoint probe: "
"endpoint_name=%s endpoint_status=%s endpoint_arn=%s "
"failure_reason=%s",
getattr(probe, "endpoint_name", None),
getattr(probe, "endpoint_status", None),
getattr(probe, "endpoint_arn", None),
getattr(probe, "failure_reason", None),
)
except Exception as probe_exc:
logger.error(
"[_deploy_model_customization] Pre-wait endpoint probe FAILED "
"for %s before second wait_for_status: %s",
endpoint_name,
probe_exc,
)

# Wait for endpoint to stabilize after base IC creation
endpoint.wait_for_status("InService")
try:
# Re-fetch the endpoint object to ensure fresh state —
# the original object from Endpoint.create may hold a stale
# internal client or cached attributes that cause
# DescribeEndpoint to fail with "Could not find endpoint".
logger.info("Attempting to fetch endpoint: %s", endpoint_name)
endpoint = Endpoint.get(endpoint_name=endpoint_name, region="us-west-2")
logger.info(
"[_deploy_model_customization] Endpoint.get before second "
"wait_for_status returned: endpoint_name=%s endpoint_arn=%s "
"endpoint_status=%s endpoint_config_name=%s "
"creation_time=%s last_modified_time=%s failure_reason=%s "
"production_variants=%s pending_deployment_summary=%s",
getattr(endpoint, "endpoint_name", None),
getattr(endpoint, "endpoint_arn", None),
getattr(endpoint, "endpoint_status", None),
getattr(endpoint, "endpoint_config_name", None),
getattr(endpoint, "creation_time", None),
getattr(endpoint, "last_modified_time", None),
getattr(endpoint, "failure_reason", None),
getattr(endpoint, "production_variants", None),
getattr(endpoint, "pending_deployment_summary", None),
)
endpoint.wait_for_status("InService")
except Exception as wait_exc:
logger.error(
"[_deploy_model_customization] wait_for_status(InService) after "
"base IC creation failed for endpoint_name=%s "
"(endpoint.endpoint_name=%s, endpoint.endpoint_arn=%s): %s",
endpoint_name,
getattr(endpoint, "endpoint_name", None),
getattr(endpoint, "endpoint_arn", None),
wait_exc,
exc_info=True,
)
# Try to list endpoints to see whether this is a region mismatch
# vs. an actual deletion.
try:
visible = [
ep.endpoint_name
for ep in Endpoint.get_all()
if getattr(ep, "endpoint_name", "").startswith("e2e-")
]
logger.error(
"[_deploy_model_customization] Visible e2e-* endpoints at "
"failure time: %s",
visible,
)
except Exception as list_exc:
logger.error(
"[_deploy_model_customization] Endpoint.get_all() also "
"failed: %s",
list_exc,
)
raise

# Deploy adapter IC
adapter_ic_name = inference_component_name or f"{endpoint_name}-adapter"
Expand Down
102 changes: 90 additions & 12 deletions sagemaker-serve/tests/integ/test_model_customization_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
"""Integration tests for ModelBuilder model customization deployment."""
from __future__ import absolute_import

import logging

import pytest
import random

logger = logging.getLogger(__name__)


@pytest.fixture(scope="module")
def training_job_name():
Expand Down Expand Up @@ -54,58 +58,132 @@ def cleanup_e2e_endpoints():
import os
from botocore.exceptions import ClientError

logger.info("[cleanup_e2e_endpoints] setup: starting session-scoped sweep")

# This file's tests use us-west-2 resources. Set SAGEMAKER_REGION so the
# SDK's SageMakerClient creates sessions in the correct region from the start.
# Save/restore to avoid leaking into other test files.
original_sm_region = os.environ.get("SAGEMAKER_REGION")
os.environ["SAGEMAKER_REGION"] = "us-west-2"
logger.info(
"[cleanup_e2e_endpoints] setup: SAGEMAKER_REGION set to us-west-2 "
"(previous value=%s)",
original_sm_region,
)

from sagemaker.core.resources import Endpoint

# Cleanup before tests
pre_deleted = 0
pre_failed = 0
try:
for endpoint in Endpoint.get_all():
try:
if endpoint.endpoint_name.startswith('e2e-'):
endpoint.delete()
except (ClientError, Exception):
pass
except (ClientError, Exception):
pass
logger.info(
"[cleanup_e2e_endpoints] setup: deleting stale endpoint %s "
"(status=%s)",
endpoint.endpoint_name,
getattr(endpoint, "endpoint_status", "unknown"),
)
# endpoint.delete()
pre_deleted += 1
except (ClientError, Exception) as exc:
pre_failed += 1
logger.warning(
"[cleanup_e2e_endpoints] setup: failed to delete %s: %s",
getattr(endpoint, "endpoint_name", "<unknown>"),
exc,
)
except (ClientError, Exception) as exc:
logger.warning(
"[cleanup_e2e_endpoints] setup: Endpoint.get_all() failed: %s", exc
)
logger.info(
"[cleanup_e2e_endpoints] setup: sweep complete (deleted=%d, failed=%d)",
pre_deleted,
pre_failed,
)

yield

logger.info("[cleanup_e2e_endpoints] teardown: starting session-scoped sweep")

# Cleanup after tests
post_deleted = 0
post_failed = 0
try:
for endpoint in Endpoint.get_all():
try:
if endpoint.endpoint_name.startswith('e2e-'):
logger.info(
"[cleanup_e2e_endpoints] teardown: deleting endpoint %s "
"(status=%s)",
endpoint.endpoint_name,
getattr(endpoint, "endpoint_status", "unknown"),
)
endpoint.delete()
except (ClientError, Exception):
pass
except (ClientError, Exception):
pass
post_deleted += 1
except (ClientError, Exception) as exc:
post_failed += 1
logger.warning(
"[cleanup_e2e_endpoints] teardown: failed to delete %s: %s",
getattr(endpoint, "endpoint_name", "<unknown>"),
exc,
)
except (ClientError, Exception) as exc:
logger.warning(
"[cleanup_e2e_endpoints] teardown: Endpoint.get_all() failed: %s", exc
)
logger.info(
"[cleanup_e2e_endpoints] teardown: sweep complete (deleted=%d, failed=%d)",
post_deleted,
post_failed,
)

# Restore original SAGEMAKER_REGION
if original_sm_region:
os.environ["SAGEMAKER_REGION"] = original_sm_region
elif "SAGEMAKER_REGION" in os.environ:
del os.environ["SAGEMAKER_REGION"]
logger.info(
"[cleanup_e2e_endpoints] teardown: SAGEMAKER_REGION restored to %s",
original_sm_region,
)


@pytest.fixture(scope="module")
def cleanup_endpoints():
"""Track endpoints to cleanup after tests."""
endpoints_to_cleanup = []
logger.info("[cleanup_endpoints] setup: tracker initialized")
yield endpoints_to_cleanup

logger.info(
"[cleanup_endpoints] teardown: processing %d tracked endpoint(s)",
len(endpoints_to_cleanup),
)
deleted = 0
failed = 0
for ep_name in endpoints_to_cleanup:
try:
from sagemaker.core.resources import Endpoint
logger.info("[cleanup_endpoints] teardown: deleting %s", ep_name)
endpoint = Endpoint.get(endpoint_name=ep_name)
endpoint.delete()
except Exception:
pass
deleted += 1
except Exception as exc:
failed += 1
logger.warning(
"[cleanup_endpoints] teardown: failed to delete %s: %s",
ep_name,
exc,
)
logger.info(
"[cleanup_endpoints] teardown: complete (deleted=%d, failed=%d)",
deleted,
failed,
)


class TestModelCustomizationFromTrainingJob:
Expand All @@ -127,7 +205,7 @@ def test_build_from_training_job(self, training_job_name):
assert model_builder.image_uri is not None
assert model_builder.instance_type is not None

@pytest.mark.skip(reason="Skipped: parallel cleanup race condition under investigation")
# @pytest.mark.skip(reason="Skipped: parallel cleanup race condition under investigation")
def test_deploy_from_training_job(self, training_job_name, endpoint_name, cleanup_endpoints):
"""Test deploying model from training job.

Expand Down
Loading
Loading