## Preparation

Create three organization:
1. `irsa-iamrole-list-bucket`
2. `irsa-iamrole-read-bucket`
3. `irsa-iamrole-write-bucket`

Have the following proxy role and target role combinations created:
| Proxy Role     | Target Role      |
|---------------|---------------|
| `arn:aws:iam::{EKS_ACCOUNT_NO}:role/sw-domino-proxy-project1-role-1` | `arn:aws:iam::{ASSETS_ACCOUNT_NO}:role/sw-domino-project1-role` |
| `arn:aws:iam::{EKS_ACCOUNT_NO}:role/sw-domino-proxy-project2-role-1` | `arn:aws:iam::{ASSETS_ACCOUNT_NO}:role/sw-domino-project2-role` |
| `arn:aws:iam::{EKS_ACCOUNT_NO}:role/sw-domino-proxy-project3-role-1` | `arn:aws:iam::{ASSETS_ACCOUNT_NO}:role/sw-domino-project3-role` |


Create a Domino user who is a non-admin and generate an API key for this user


In [None]:
import requests
import os
import configparser
from io import StringIO

In [None]:
base_endpoint="irsa-svc.domino-field"
EKS_ACCOUNT_NO="946429944765"
ASSETS_ACCOUNT_NO="946429944765"
NON_ADMIN_USER_API_KEY= os.environ['NON_ADMIN_USER_API_KEY']
ADMIN_USER_API_KEY=os.environ['ADMIN_USER_API_KEY']

role_1={"domino_org":"irsa-iamrole-list-bucket",
        "proxy_iam_role_arn":f"arn:aws:iam::{EKS_ACCOUNT_NO}:role/sw-domino-proxy-project1-role-1",
        "iam_role_arn":f"arn:aws:iam::{ASSETS_ACCOUNT_NO}:role/sw-domino-proxy-project1-role-1",
        "update_iam_role_trust_policy":"false"
       }
role_2={"domino_org":"irsa-iamrole-read-bucket",
        "proxy_iam_role_arn":f"arn:aws:iam::{EKS_ACCOUNT_NO}:role/sw-domino-proxy-project2-role-1",
        "iam_role_arn":f"arn:aws:iam::{ASSETS_ACCOUNT_NO}:role/sw-domino-proxy-project2-role-1",
        "update_iam_role_trust_policy":"false"
       }
role_3={"domino_org":"irsa-iamrole-update-bucket",
        "proxy_iam_role_arn":f"arn:aws:iam::{EKS_ACCOUNT_NO}:role/sw-domino-proxy-project3-role-1",
        "iam_role_arn":f"arn:aws:iam::{ASSETS_ACCOUNT_NO}:role/sw-domino-proxy-project3-role-1",
        "update_iam_role_trust_policy":"false"
       }


In [None]:
#Health Endpoint
def check_health_endpoint():
    api_endpoint = f"https://{base_endpoint}/healthz"
    resp = requests.get(api_endpoint,verify=False)
    print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")
##Add Mappings
def add_one_org_to_one_role_mappings():
    access_token_endpoint='http://localhost:8899/access-token'
    resp = requests.get(access_token_endpoint)
    token = resp.text
    headers = {
                 "Content-Type": "application/json",
                 "Authorization": "Bearer " + token,
            }
    api_endpoint=f"https://{base_endpoint}/update_role_mapping"
    body=role_1
    resp = requests.post(api_endpoint,headers=headers,json=body,verify=False)
    print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")
    
    body=role_2
    resp = requests.post(api_endpoint,headers=headers,json=body,verify=False)
    print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")
    
    body=role_3
    resp = requests.post(api_endpoint,headers=headers,json=body,verify=False)
    print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")

def add_one_org_to_many_role_mappings():
    access_token_endpoint='http://localhost:8899/access-token'
    resp = requests.get(access_token_endpoint)
    token = resp.text
    headers = {
                 "Content-Type": "application/json",
                 "Authorization": "Bearer " + token,
            }
    api_endpoint=f"https://{base_endpoint}/update_multirole_mapping"
    body={'domino_org':role_1['domino_org'],
          'role_data':[role_1,role_2]}
    resp = requests.post(api_endpoint,headers=headers,json=body,verify=False)
    print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")
    
    #body=role_2
    #resp = requests.post(api_endpoint,headers=headers,json=body,verify=False)
    #print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")
    
    body={'domino_org':role_2['domino_org'],
          'role_data':[role_1,role_3]}

    resp = requests.post(api_endpoint,headers=headers,json=body,verify=False)
    print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")


# Get All Mappings as an admin
def get_all_mappings_one_to_one():
    access_token_endpoint='http://localhost:8899/access-token'
    resp = requests.get(access_token_endpoint)
    
    
    token = resp.text
    headers = {
                 "Content-Type": "application/json",
                 "Authorization": "Bearer " + token,
              }
    
    api_endpoint=f"https://{base_endpoint}/role_mappings"
    resp = requests.get(api_endpoint,headers=headers,verify=False)
    print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")
    result =resp.json()
    
    mappings = result["all_role_mappings"]
    mapped_role_1 = mappings["irsa-iamrole-list-bucket"][0]==role_1
    mapped_role_2 = mappings["irsa-iamrole-read-bucket"][0]==role_2
    mapped_role_3 = mappings["irsa-iamrole-update-bucket"][0]==role_3
    
    print(f"Testing mapping 1. Success = {mapped_role_1}")
    print(f"Testing mapping 2. Success = {mapped_role_2}")
    print(f"Testing mapping 3. Success = {mapped_role_3}")

    headers = {
             "Content-Type": "application/json",
             "X-Domino-Api-Key": NON_ADMIN_USER_API_KEY,
          }
    resp = requests.get(api_endpoint,headers=headers,verify=False)
    print(f"Testing {api_endpoint} as non-admin. Success = {resp.status_code==403}")

def get_all_mappings_one_to_many():
    access_token_endpoint='http://localhost:8899/access-token'
    resp = requests.get(access_token_endpoint)
    
    
    token = resp.text
    headers = {
                 "Content-Type": "application/json",
                 "Authorization": "Bearer " + token,
              }
    
    api_endpoint=f"https://{base_endpoint}/role_mappings"
    resp = requests.get(api_endpoint,headers=headers,verify=False)
    print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")
    result =resp.json()
    
    mappings = result["all_role_mappings"]
    mapped_role_1 = mappings["irsa-iamrole-list-bucket"][0]==role_1
    mapped_role_2 = mappings["irsa-iamrole-list-bucket"][1]==role_2
    print(f"Testing mapping org_1 Mapping 1. Success = {mapped_role_1}")
    print(f"Testing mapping org_1 Mapping 2. Success = {mapped_role_2}")

    mapped_role_1 = mappings["irsa-iamrole-read-bucket"][0]==role_1
    mapped_role_3 = mappings["irsa-iamrole-read-bucket"][1]==role_3
    print(f"Testing mapping org_2 Mapping 1. Success = {mapped_role_1}")
    print(f"Testing mapping org_2 Mapping 2. Success = {mapped_role_3}")

    headers = {
             "Content-Type": "application/json",
             "X-Domino-Api-Key": NON_ADMIN_USER_API_KEY,
          }
    resp = requests.get(api_endpoint,headers=headers,verify=False)
    print(f"Testing {api_endpoint} as non-admin. Success = {resp.status_code==403}")

# Get my role mappings 
def get_my_role_mappings():
    access_token_endpoint='http://localhost:8899/access-token'
    resp = requests.get(access_token_endpoint)
    
    
    token = resp.text
    headers = {
                 "Content-Type": "application/json",
                 "Authorization": "Bearer " + token,
              }
    
    api_endpoint=f"https://{base_endpoint}/my_role_mappings"
    resp = requests.get(api_endpoint,headers=headers,verify=False)
    print(f"Testing {api_endpoint}. Success = {resp.status_code==200}")
    result =resp.json()
    
    mapped_role_1=True
    mapped_role_2=True
    mapped_role_3=True
    
    mappings = result["my_role_mappings"]
    for m in mappings:
        if m['domino_org']==role_1['domino_org']:
            mapped_role_1 = (m==role_1)        
        if m['domino_org']==role_2['domino_org']:
            mapped_role_2 = (m==role_2)        
        if m['domino_org']==role_3['domino_org']:
            mapped_role_3 = (m==role_3)        
    
    
    print(f"Testing mapping 1. Success = {mapped_role_1}")
    print(f"Testing mapping 2. Success = {mapped_role_2}")
    print(f"Testing mapping 3. Success = {mapped_role_3}")

#Get AWS Config file
def get_aws_config_file():
    access_token_endpoint='http://localhost:8899/access-token'
    resp = requests.get(access_token_endpoint)
    
    os.environ['SSL_CERT_DIR']='/etc/ssl/certs/irsa'
    token = resp.text
    headers = {
                 "Content-Type": "application/json",
                 "Authorization": "Bearer " + token,
            }
    api_endpoint=f"https://{base_endpoint}/map_iam_roles_to_pod_sa"
    workload_type= "edge-workload"
    data = {"run_id": os.environ['DOMINO_RUN_ID'],"irsa_workload_type": workload_type} ## It fetches this fom the downward api
    resp = requests.post(api_endpoint,headers=headers,json=data,verify=False)
    aws_config = resp.text
    # Parse the config string
    config = configparser.ConfigParser()
    config.read_string(aws_config)
    
    # Print all profiles
    for section in config.sections():
        print(f"Profile: {section}")
        for key, value in config[section].items():
            print(f"  {key}: {value}")
    #EYEBALL RESULTS
#Delete Role Mapping

def delete_role_mappings():
    access_token_endpoint='http://localhost:8899/access-token'
    resp = requests.get(access_token_endpoint)
    token = resp.text
    headers = {
                 "Content-Type": "application/json",
                 "Authorization": "Bearer " + token,
            }
    
    api_endpoint=f"https://{base_endpoint}/delete_role_mapping"
    body={
        "domino_org":role_1["domino_org"]
    }
    resp = requests.delete(api_endpoint,headers=headers,json=body,verify=False)
    print(f"Testing DELETE {api_endpoint}. Success = {resp.status_code==200}")
    body={
        "domino_org":role_2["domino_org"]
    }
    resp = requests.delete(api_endpoint,headers=headers,json=body,verify=False)
    print(f"Testing DELETE {api_endpoint}. Success = {resp.status_code==200}")
    body={
        "domino_org":role_3["domino_org"]
    }
    resp = requests.delete(api_endpoint,headers=headers,json=body,verify=False)
    print(f"Testing DELETE {api_endpoint}. Success = {resp.status_code==200}")
    
    api_endpoint=f"https://{base_endpoint}/role_mappings"
    resp = requests.get(api_endpoint,headers=headers,verify=False)
    result = resp.json()
    mappings = result["all_role_mappings"]
    print(f"Testing Post Delete for {api_endpoint}. Success = {resp.status_code==200 and len(mappings)==0}")
#Test SA Creations
def create_sa():
    access_token_endpoint='http://localhost:8899/access-token'
    resp = requests.get(access_token_endpoint)
    
    os.environ['SSL_CERT_DIR']='/etc/ssl/certs/irsa'
    token = resp.text
    headers = {
                 "Content-Type": "application/json",
                 "Authorization": "Bearer " + token,
            }
    api_endpoint=f"https://{base_endpoint}/service_account"
    data = {"service_account_name": "fake-sa"} 
    resp = requests.post(api_endpoint,headers=headers,json=data,verify=False)
    print(f"Testing ADD for {api_endpoint}. Success = {resp.status_code==200}")

def delete_sa():
    access_token_endpoint='http://localhost:8899/access-token'
    resp = requests.get(access_token_endpoint)
    
    os.environ['SSL_CERT_DIR']='/etc/ssl/certs/irsa'
    token = resp.text
    headers = {
                 "Content-Type": "application/json",
                 "Authorization": "Bearer " + token,
            }
    api_endpoint=f"https://{base_endpoint}/service_account"
    data = {"service_account_name": "fake-sa"} 
    data = {"service_account_name": "fake-sa"} 
    resp = requests.delete(api_endpoint,headers=headers,json=data,verify=False)
    print(f"Testing DELETE for {api_endpoint}. Success = {resp.status_code==200}")



In [None]:
check_health_endpoint()
add_one_org_to_one_role_mappings()
get_all_mappings_one_to_one()
get_my_role_mappings()
get_aws_config_file()
delete_role_mappings()


In [None]:
add_one_org_to_many_role_mappings()
get_all_mappings_one_to_many()
get_aws_config_file()
delete_role_mappings()

In [None]:
create_sa()
delete_sa()