## Trusted Identity Propagation with S3 Access Grants

Use this notebook to demonstrate the Trusted Identity Propagation feature of IAM Identity
Center in the context of the S3 Access Grants use case.

### Prerequisites

Make sure the default IAM principal _(In this case, IAM Role assigned to the Sagemaker notebook instance)_ has below permissions.

**OIDC-Policy**



```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": "sso-oauth:CreateTokenWithIAM",
      "Resource": "*"
    }
  ]
}
```

**Sts-Policy**



```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "sts:SetContext",
        "sts:AssumeRole",
        "sts:SetSourceIdentity"
      ],
      "Resource": "arn:aws:iam::{ACCOUNT_ID}:role/S3DataAccessRole"
    }
  ]
}
```

**Secrets-Manager-Policy**



```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": "secretsmanager:GetSecretValue",
      "Resource": "{SECRET-ARN}"
    }
  ]
}
```

In [None]:
import jwt
import json
import boto3
import logging
import requests
from botocore.config import Config
from botocore.exceptions import ClientError

### Constants

In [None]:
REGION = "REGION"
ACCOUNT_ID= "ACCOUNT_ID"
API_ENDPOINT = "API_URL" # This endpoint hosts the login/auth route

### Utility Methods

In [None]:
def get_iam_oidc_token(id_token):
    """
    Get the IAM OIDC token using the ID token retrieved from Cognito
    """
    client = boto3.client("sso-oidc", region_name=REGION)
    response = client.create_token_with_iam(
        clientId=IDC_APPLICATION_ID,
        grantType="urn:ietf:params:oauth:grant-type:jwt-bearer",
        assertion=id_token,
    )

    print('Token fetched from IAM Identity Center.')
    
    return response


def assume_role_with_identity_context(
    role_arn: str, session_name: str, context_assertion: str
):
    sts = boto3.client("sts")
    response = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
        ProvidedContexts=[
            {
                "ProviderArn": "arn:aws:iam::aws:contextProvider/IdentityCenter",
                "ContextAssertion": context_assertion,
            }
        ],
    )

    print('IAM Role is assumed successfully.')

    return response["Credentials"]


def get_data_access_for_bucket(credentials: dict, bucket_uri: str, permissions: str):
    s3control = botos. client(
        "s3control",
        region_name=REGION,
        aws_access_key_id=credentials["AccessKeyId"],
        aws_secret_access_key=credentials["SecretAccessKey"],
        aws_session_token=credentials["SessionToken"],
    )

    response = s3control.get_data_access(
        AccountId=ACCOUNT_ID, Target=bucket_uri, Permission=permissions
    )
    
    return response["Credentials"]


def list_objects_with_data_access(data_access_credentials: dict, bucket: str):
    s3 = boto3.client(
        "s3",
        aws_access_key_id=data_access_credentials["AccessKeyId"],
        aws_secret_access_key=data_access_credentials["SecretAccessKey"],
        aws_session_token=data_access_credentials["SessionToken"],
    )

    response = s3.list_objects_v2(Bucket=bucket)

    if "Contents" in response:
        print(f"Objects in bucket '{bucket}':")
        for obj in response["Contents"]:
            print(f"- {obj['Key']}")
    else:
        print("No objects found or permission denied.")


def get_secret(secret_name):
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=REGION)

    try:
        response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        raise Exception(f"Unable to retrieve secret:{e}")

    # Parse the secret string
    if 'SecretString' in response:
        secret = json.loads(response['SecretString'])
        return secret
    else:
        raise Exception("Secret binary is not supported in this example.")


def login(user_creds):
    payload = {
        "email": user_creds["username"],
        "password": user_creds["password"]
    }
    
    url = f"{API_ENDPOINT}/users/login"
    
    try:
        response = requests.post(url, json=payload)
        response raise_for_status() # raises error for HTTP 4xx/5xx
        
        response = response.json()
    
        print ("Login successful!")
        print("Response:", response)
        return response[ 'token']
    except requests.exceptions.RequestException as e:
        print("Login failed.")
        print("Error:", e)
        if response is not None:
            print("Response body:", response.text)

### 0. Fetch the Application ID Token
Application user credentials are fetched from the Secrets Manager.

In [None]:
TIP_APP_USER_CREDS = "{secrets-manager-secret-name}" # It contains the account credentials

user_creds = get_secret(TIP_APP_USER_CREDS)

APP_ID_TOKEN = login(user_creds)

### 1. Fetch the IAM Identity Center ID Token
Application IdToken is exchanged with an IAM Identity Center IdToken which contains the User Identity Context.

In [None]:
IDC_APPLICATION_ID = (
    "arn:aws:sso::AWS_ACCOUNT_ID:application/ssoins-{APP_ID}
) # Put the Identity Center ID here

res = get_iam_oide_token(APP_ID_TOKEN)

decoded_id_token = jwt.decode(
    res["idToken"], options={"verify_signature": False}
) # Decode the idToken without verifying the signature

print(decoded_id_token)

IDENTITY_CONTEXT = decoded_id_token["sts:identity_context"]

### 2. Assume the IAM Role
This role must have permissions to access the data. `s3:GetDataAccess` in current usecase

In [None]:
CONTEXT_ASSERTION = IDENTITY_CONTEXT

ID_ENHANCED_SESSION_NAME = "S3AccessGrantsSession"
S3_DATA_ACCESS_ROLE_ARN = "arn:aws:iam::AWS_ACCOUNT_ID:role/S3DataAccessRole"

aws_creds = assume_role_with_identity_context(
    role_arn=S3_DATA_ACCESS_ROLE_ARN, session_name=ID_ENHANCED_SESSION_NAME, context_assertion=IDENTITY_CONTEXT
)

aws_creds

### 3. Call the S3 GetDataAccess API
This API validates the authorization for the directory user. Once authorization is passed, new aws credentials are generated to access the s3 data. **S3Control** SDK is used for that.

In [None]:
BUCKET_NAME = "BUCKET_NAME" # bucket where use access is required
BUCKET_URI = f"s3://{BUCKET_NAME}"

data_access_creds = get_data_access_for_bucket(credentials=aws_creds, bucket_uri=BUCKET_URI)

data_access_creds

### 4. Access S3 Data
Lets try to list the object keys in the bucket.

In [None]:
list_objects_with_data_access(data_access_creds, BUCKET_NAME)

### 5. Failure Scenario - Request WRITE Access ❌
The `GetDataAccess` S3 API returns an AccessDenied error when authorization fails. In this example, we intentionally attempt an unauthorized (WRITE) access to demonstrate this behavior.

In [None]:
data_access_creds = get_data_access_for_bucket(credentials=aws_creds, bucket_uri=BUCKET_URI)

data_access_creds