### 🛠️ Initialize Notebook Variables

**Only modify entries under _USER CONFIGURATION_.**

In [None]:
import utils
from apimtypes import *

# ------------------------------
#    USER CONFIGURATION
# ------------------------------

rg_location = 'eastus2'
index       = 1
apim_sku    = APIM_SKU.BASICV2              # Options: 'BASICV2', 'STANDARDV2', 'PREMIUMV2'
deployment  = INFRASTRUCTURE.SIMPLE_APIM
api_prefix  = 'blob-'
tags        = ['secure-blob-access', 'valet-key', 'storage', 'jwt', 'authz']



# ------------------------------
#    SYSTEM CONFIGURATION
# ------------------------------

# Create the notebook helper with JWT support
sample_folder    = 'secure-blob-access'
rg_name          = utils.get_infra_rg_name(deployment, index)
supported_infras = [INFRASTRUCTURE.AFD_APIM_PE, INFRASTRUCTURE.APIM_ACA, INFRASTRUCTURE.SIMPLE_APIM]
nb_helper        = utils.NotebookHelper(sample_folder, rg_name, rg_location, deployment, supported_infras, True, index = index, apim_sku = apim_sku)

# Blob storage configuration
container_name   = 'hr-assets'
file_name        = 'hr.txt'

# Define the APIs and their operations and policies

# Set up the named values
nvs: List[NamedValue] = [
    NamedValue(nb_helper.jwt_key_name, nb_helper.jwt_key_value_bytes_b64, True),
    NamedValue('HRMemberRoleId', Role.HR_MEMBER)
]

# Load policy fragment definitions
pf_authx_hr_member_xml = utils.read_policy_xml('pf-authx-hr-member.xml', {
    'jwt_signing_key'   : nb_helper.jwt_key_name,
    'hr_member_role_id' : 'HRMemberRoleId'
}, sample_folder)

pf_create_sas_token_xml        = utils.read_policy_xml('pf-create-sas-token.xml', sample_name = sample_folder)
pf_check_blob_existence_via_mi = utils.read_policy_xml('pf-check-blob-existence-via-managed-identity.xml', sample_name = sample_folder)

# Define policy fragments
pfs: List[PolicyFragment] = [
    PolicyFragment('AuthX-HR-Member', pf_authx_hr_member_xml, 'Authenticates and authorizes users with HR Member role.'),
    PolicyFragment('Create-Sas-Token', pf_create_sas_token_xml, 'Creates a SAS token to use with access to a blob.'),
    PolicyFragment('Check-Blob-Existence-via-Managed-Identity', pf_check_blob_existence_via_mi, 'Checks whether the specified blob exists at the blobUrl. A boolean value for blobExists will be available afterwards.')
]

# Load API policy
pol_blob_get = utils.read_and_modify_policy_xml('blob-get-operation.xml', {
    'container_name': container_name
}, sample_folder)

# Define template parameters for blob name
blob_template_parameters = [
    {
        "name": "blob-name",
        "description": "The name of the blob to access",
        "type": "string",
        "required": True
    }
]

# Define API operations

# API 1: Secure Blob Access API
secure_blob_path = f'/{api_prefix}secure-files'
secure_blob_get = GET_APIOperation2('GET', 'GET', '/{blob-name}', 'Gets the blob access valet key info', pol_blob_get, templateParameters=blob_template_parameters)
secure_blob = API('secure-blob-access', 'Secure Blob Access API', f'/{api_prefix}secure-files', 'API for secure access to blob storage using the valet key pattern', operations = [secure_blob_get], tags = tags)

# APIs Array
apis: List[API] = [secure_blob]

utils.print_ok('Notebook initialized')

### 🚀 Deploy Infrastructure and APIs

Creates the bicep deployment into the previously-specified resource group. A bicep parameters, `params.json`, file will be created prior to execution.

In [None]:
# Build the bicep parameters
bicep_parameters = {
    'apis'            : {'value': [api.to_dict() for api in apis]},
    'namedValues'     : {'value': [nv.to_dict() for nv in nvs]},
    'policyFragments' : {'value': [pf.to_dict() for pf in pfs]},
    'containerName'   : {'value': container_name},
    'blobName'        : {'value': file_name}
}

# Deploy the sample
output = nb_helper.deploy_sample(bicep_parameters)

if output.success:
    # Extract deployment outputs for testing
    apim_name            = output.get('apimServiceName', 'APIM Service Name')
    apim_gateway_url     = output.get('apimResourceGatewayURL', 'APIM API Gateway URL')
    storage_account_name = output.get('storageAccountName', 'Storage Account Name')
    storage_endpoint     = output.get('storageAccountEndpoint', 'Storage Endpoint')
    container_name       = output.get('blobContainerName', 'Blob Container Name')
    apim_apis            = output.getJson('apiOutputs', 'APIs')

    utils.print_ok('Deployment completed successfully')
else:
    utils.print_error("Deployment failed!")
    raise SystemExit(1)

### 🔐 Verify Managed Identity Permissions

Ensure APIM's managed identity has proper Storage Blob Data Reader permissions before testing.

In [None]:
# Verify APIM managed identity permissions for blob access
utils.print_info('Verifying APIM Managed Identity Permissions...')

# Check permissions with automatic retry (role assignments can take time to propagate)
permissions_ready = utils.wait_for_apim_blob_permissions(apim_name, storage_account_name, rg_name, 5)

if permissions_ready:
    utils.print_ok('APIM permissions verified successfully')
else:
    utils.print_warning('Permission verification incomplete - you may encounter 503/403 errors during testing')
    utils.print_info('If you see 503 errors in the next step, wait a few minutes and try again.')

### ✅ Verify and Test Secure Blob Access

Test the deployed APIs to confirm secure blob access using the valet key pattern with JWT authentication.

In [None]:
# Test and verify secure blob access using valet key pattern
import json
import requests
from apimrequests import ApimRequests
from apimtesting import ApimTesting
from users import UserHelper
from authfactory import AuthFactory

def handleResponse(response):
    """Handle blob access response and test direct blob access."""
    if isinstance(response, str):
        try:
            access_info = json.loads(response)
            sas_url = access_info.get('sas_url', 'N/A')

            if sas_url == 'N/A':
                return response

            utils.print_info(f"Secure Blob URL: {sas_url}")
            utils.print_info(f"Expires At: {access_info.get('expire_at', 'N/A')}")

            # Test direct blob access using the valet key (SAS URL)
            utils.print_info("🧪 Testing direct blob access...")
            
            try:
                blob_response = requests.get(access_info['sas_url'])
                if blob_response.status_code == 200:
                    utils.print_info("✅ Direct blob access successful!")
                    content_preview = blob_response.text[:200] + "..." if len(blob_response.text) > 200 else blob_response.text
                    utils.print_val("Content preview:", content_preview.strip(), True)
                    return content_preview.strip()
                else:
                    utils.print_error(f"❌ Direct blob access failed: {blob_response.status_code}")
                    return blob_response.status_code
            except Exception as e:
                utils.print_error(f"Error accessing blob directly: {str(e)}")
        except (json.JSONDecodeError, AttributeError):
            utils.print_error("Failed to parse JSON response or response is not in expected format.")
    return response

tests = ApimTesting("Secure Blob Access Sample Tests", sample_folder, deployment)

# Get the appropriate endpoint URL for testing
endpoint_url = utils.test_url_preflight_check(deployment, rg_name, apim_gateway_url)
api_subscription_key = apim_apis[0]['subscriptionPrimaryKey']

# Test 1: Authorized user with HR Member role
utils.print_info("1️⃣ Testing with Authorized User (HR Member role)")

# Create JWT token for HR Member role
encoded_jwt_token_hr_member = AuthFactory.create_symmetric_jwt_token_for_user(
    UserHelper.get_user_by_role(Role.HR_MEMBER), 
    nb_helper.jwt_key_value
)
utils.print_info(f'JWT token for HR Member:\n{encoded_jwt_token_hr_member}')

# Test secure blob access with authorization
reqsApimAuthorized = ApimRequests(endpoint_url, api_subscription_key)
reqsApimAuthorized.headers['Authorization'] = f'Bearer {encoded_jwt_token_hr_member}'

utils.print_info(f"🔒 Getting secure access for {file_name} with authorized user...")
response = reqsApimAuthorized.singleGet(f'/{api_prefix}secure-files/{file_name}', 
                                       msg=f'Requesting secure access for {file_name} (authorized)')
output = handleResponse(response)
tests.verify(output, 'This is an HR document.')

# Test 2: Unauthorized user without required role
utils.print_info("2️⃣ Testing with Unauthorized User (no role)")

# Create JWT token for user with no role
encoded_jwt_token_no_role = AuthFactory.create_symmetric_jwt_token_for_user(
    UserHelper.get_user_by_role(Role.NONE), 
    nb_helper.jwt_key_value
)
utils.print_info(f'JWT token for user with no role:\n{encoded_jwt_token_no_role}')

# Test access denial for unauthorized user
reqsApimUnauthorized = ApimRequests(endpoint_url, api_subscription_key)
reqsApimUnauthorized.headers['Authorization'] = f'Bearer {encoded_jwt_token_no_role}'

utils.print_info(f"🔒 Attempting to obtain secure access for {file_name} with unauthorized user (expect 401/403)...")
response = reqsApimUnauthorized.singleGet(f'/{api_prefix}secure-files/{file_name}', 
                                         msg=f'Requesting secure access for {file_name} (unauthorized)')
output = handleResponse(response)
tests.verify(json.loads(output)['statusCode'], 401)

tests.print_summary()

utils.print_ok('All done!')