Creating the Project-Structure

In [46]:
import os

# Define project structure
structure = {
    "project": [
        "policies",
        "tests",
        "src"
    ],
    "files": {
        "project/requirements.txt": "",
        "project/README.md": "",
        "project/tests/__init__.py": "",
        "project/tests/test_cases_implementation.py":"",
        "project/tests/test_createdpolicies_deletion.py": "",
        "project/src/__init__.py": "",
        "project/src/policy_manager.py": "",
        "project/src/aws_helper.py": "",
    }
}

# Create directories
for folder in structure["project"]:
    os.makedirs(f"project/{folder}", exist_ok=True)

# Create files
for filepath, content in structure["files"].items():
    with open(filepath, "w") as f:
        f.write(content)

print("Project structure created successfully!")


Project structure created successfully!


Requirements installation

In [47]:
import os

# Defining the file path
file_path = "project/requirements.txt"

# Ensuring the directory of the file exists or not
os.makedirs(os.path.dirname(file_path), exist_ok=True)

# Write to the requirements.txt file
with open(file_path, 'w') as f:
    f.write("boto3\n")
    f.write("botocore\n")
    f.write("openpyxl\n")
    f.write("python-dotenv\n")

print(f"requirements.txt has been created at {file_path}")


requirements.txt has been created at project/requirements.txt


In [61]:
pip install -r project/requirements.txt



Initializing and updating the files in the src package

In [49]:
import os

# Define the project src folder
src_folder = "project/src"

# Ensure the src folder exists
if not os.path.exists(src_folder):
    raise FileNotFoundError(f"The folder '{src_folder}' does not exist.")

# Define the updated code for each file
file_updates = {
    "aws_helper.py": '''\
import os
from dotenv import load_dotenv
import boto3
from google.colab import files

# Step 1: Upload the .env file
uploaded = files.upload()

# Step 2: Check the uploaded files and move the .env file
for filename in uploaded.keys():
    print(f"Uploaded file: {filename}")
    # Ensure the destination directory exists
    os.makedirs('project/src', exist_ok=True)
    # Move the uploaded file to the Project/src directory
    os.rename(filename, f'project/src/{filename}')

# Step 3: Check if the .env file exists
env_file_path = 'project/src/env'
if os.path.exists(env_file_path):
    print(f"{env_file_path} exists.")
else:
    print(f"{env_file_path} does not exist.")

# Load environment variables from the .env file
load_dotenv(env_file_path)  # Specify the path to the .env file

# Now you can access your AWS credentials
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_default_region = os.getenv('AWS_DEFAULT_REGION')

# Check if the variables are loaded correctly
print(f"AWS_ACCESS_KEY_ID: {aws_access_key_id}")
print(f"AWS_SECRET_ACCESS_KEY: {aws_secret_access_key}")
print(f"AWS_DEFAULT_REGION: {aws_default_region}")

# Create IAM client
def get_iam_client():
    """Setting up and returning the IAM client."""
    try:
        iam_client = boto3.client(
            'iam',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_default_region
        )
        # Fetch IAM users
        response = iam_client.list_users()
        print("IAM Users:")
        for user in response['Users']:
            print(f"Username: {user['UserName']}, ARN: {user['Arn']}")
        return iam_client
    except Exception as e:
        print(f"Error setting up IAM client: {str(e)}")
        raise
''',
    "policy_manager.py": '''\
import json
from botocore.exceptions import ClientError
from src.aws_helper import get_iam_client

# Initialize the IAM client
iam_client = get_iam_client()

def create_policy(iam_client, policy_name, policy_document_path):
    """
    Create an IAM policy with the given name and document."""

    try:
        # Load the policy document from the JSON file
        with open(policy_document_path, 'r') as file:
            policy_document = json.load(file)

        # Create the policy
        response = iam_client.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(policy_document)
        )

        policy_arn = response['Policy']['Arn']
        policy_version_id = response['Policy']['DefaultVersionId']
        return policy_arn, policy_version_id, None

    except ClientError as e:
        return None, None, e.response['Error']['Message']
    except Exception as e:
        return None, None, str(e)


def fetch_policy_json(policy_arn):
    """Fetch the policy document JSON of a policy by ARN."""
    try:
        policy = iam_client.get_policy(PolicyArn=policy_arn)
        version_id = policy['Policy']['DefaultVersionId']
        response = iam_client.get_policy_version(
            PolicyArn=policy_arn,
            VersionId=version_id
        )
        return response['PolicyVersion']['Document']
    except ClientError as e:
        return None, e.response['Error']['Message']
    except Exception as e:
        return None, str(e)

def check_policy_creation_limit():
    """Check if the policy creation limit has been reached."""
    try:
        response = iam_client.list_policies(Scope='Local')
        return len(response['Policies']) >= 10, None
    except ClientError as e:
        return False, e.response['Error']['Message']
    except Exception as e:
        return False, str(e)

def list_existing_policies():
    """List all existing local policies in the account."""
    try:
        response = iam_client.list_policies(Scope='Local')
        return response['Policies']
    except ClientError as e:
        print(f"Error fetching policies: {e.response['Error']['Message']}")
        return []
    except Exception as e:
        print(f"Error fetching policies: {str(e)}")
        return []

def update_policy(policy_arn, policy_document_path):
    """Update an existing policy by creating a new version."""
    try:
        with open(policy_document_path, 'r') as file:
            policy_document = json.load(file)
        response = iam_client.create_policy_version(
            PolicyArn=policy_arn,
            PolicyDocument=json.dumps(policy_document),
            SetAsDefault=True
        )
        return response['PolicyVersion']['VersionId'], None
    except ClientError as e:
        return None, e.response['Error']['Message']
    except Exception as e:
        return None, str(e)


# Function to delete all policies in the account
def delete_policies(iam_client, policy_arn):
    try:
        # Get all versions of the specified policy
        versions_response = iam_client.list_policy_versions(PolicyArn=policy_arn)
        policy_versions = versions_response.get('Versions', [])

        # Delete all non-default versions of the policy
        for version in policy_versions:
            if not version['IsDefaultVersion']:
                version_id = version['VersionId']
                try:
                    iam_client.delete_policy_version(PolicyArn=policy_arn, VersionId=version_id)
                    print(f"Deleted version {version_id} of policy {policy_arn}.")
                except ClientError as e:
                    print(f"Error deleting version {version_id} of policy {policy_arn}: {e}")

        # Delete the policy itself
        iam_client.delete_policy(PolicyArn=policy_arn)
        print(f"Policy {policy_arn} deleted successfully.")

    except ClientError as e:
        print(f"Error deleting policy {policy_arn}: {e}")


''',
    "__init__.py": '''\
# Initialize the src package
from .aws_helper import get_iam_client
from .policy_manager import (
    create_policy,
    fetch_policy_json,
    check_policy_creation_limit,
    list_existing_policies,
    update_policy,
    delete_policies
)
'''
}

# Update each file
for filename, content in file_updates.items():
    file_path = os.path.join(src_folder, filename)
    with open(file_path, "w") as file:
        file.write(content)
        print(f"Updated: {file_path}")

print("All files updated successfully!")


Updated: project/src/aws_helper.py
Updated: project/src/policy_manager.py
Updated: project/src/__init__.py
All files updated successfully!


In [62]:
# Define the path to the file
file_path = '/content/project/src/aws_helper.py'

# Define the __file__ variable and execute the script
globals()['__file__'] = file_path  # Mimic the __file__ variable
exec(open(file_path).read())


Saving env to env
Uploaded file: env
project/src/env exists.
AWS_ACCESS_KEY_ID: AKIA5IJOWTSEEBLGPGFR
AWS_SECRET_ACCESS_KEY: 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
AWS_DEFAULT_REGION: us-east-1


In [50]:
import sys
import os

# Add the project root directory to sys.path
sys.path.append(os.path.abspath('/content/project'))


In [63]:
# Define the path to the file
file_path = '/content/project/src/policy_manager.py'

# Define the __file__ variable and execute the script
globals()['__file__'] = file_path  # Mimic the __file__ variable
exec(open(file_path).read())


IAM Users:
Username: KodeKloudQA, ARN: arn:aws:iam::911167888520:user/KodeKloudQA


Policy json -documents creation

In [None]:
import os

# Define the directory for the file
policies_dir = '/content/project/policies'

# Ensure the directory exists
os.makedirs(policies_dir, exist_ok=True)

# Define the file path
file_path = os.path.join(policies_dir, "policy_documents.py")

# Define the content of the `policy_documents.py` file
policy_documents_content = """
import json
import os

# Define the policy documents
s3_readonly_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "*"
        }
    ]
}

ec2_fullaccess_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "ec2:*",
            "Resource": "*"
        }
    ]
}

dynamodb_readwrite_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:BatchGetItem",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem"
            ],
            "Resource": "*"
        }
    ]
}

policy_limiter_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Action": "iam:CreatePolicy",
            "Resource": "*",
            "Condition": {
                "StringGreaterThanIfExists": {
                    "iam:PolicyCount": 3
                }
            }
        }
    ]
}

# Directory path for the policies
policies_dir = '/content/project/policies'

# Ensure the directory exists
if not os.path.exists(policies_dir):
    print(f"Directory {policies_dir} does not exist.")
else:
    print(f"Directory {policies_dir} exists. Proceeding with file creation.")

# Policy files and their content
policy_files = {
    "s3_readonly_policy.json": s3_readonly_policy,
    "ec2_fullaccess_policy.json": ec2_fullaccess_policy,
    "dynamodb_readwrite_policy.json": dynamodb_readwrite_policy,
    "policy_limiter.json": policy_limiter_policy
}

# Save or overwrite the policies as JSON files
for filename, policy_content in policy_files.items():
    file_path = os.path.join(policies_dir, filename)

    # Check if file exists and notify about overwriting
    if os.path.exists(file_path):
        print(f"File {filename} already exists. Overwriting with new content.")

    with open(file_path, 'w') as f:
        json.dump(policy_content, f, indent=2)
        print(f"File {filename} written successfully.")
"""

# Write the content to the file
with open(file_path, 'w') as f:
    f.write(policy_documents_content)
    print(f"File {file_path} updated successfully.")


File /content/project/policies/policy_documents.py updated successfully.


In [64]:
%run /content/project/policies/policy_documents.py


Directory /content/project/policies exists. Proceeding with file creation.
File s3_readonly_policy.json written successfully.
File ec2_fullaccess_policy.json written successfully.
File dynamodb_readwrite_policy.json written successfully.
File policy_limiter.json written successfully.




```
# This is formatted as code
```

Test Cases Implementation

Define the data frame by reading the test cases from the excel

In [65]:
import openpyxl
import pandas as pd
import os
from google.colab import files

# Define the path for the project directory and subdirectory for file uploads
project_directory = '/content/project/tests'

# Ensure the project directory exists
os.makedirs(project_directory, exist_ok=True)

# Upload the file to Colab
uploaded = files.upload()

# Get the uploaded file name and move it to the project directory
file_path = next(iter(uploaded))  # This will give you the file name of the uploaded file
new_file_path = os.path.join(project_directory, file_path)

# Move the file to the project directory
os.rename(file_path, new_file_path)
print(f"File uploaded and moved to: {new_file_path}")

# Load the uploaded Excel file
wb = openpyxl.load_workbook(new_file_path)

# Display the sheet names
print(wb.sheetnames)

# Read the 'Test Cases' sheet
sheet = wb['Test Cases']

# Create a list to store the row data
rows_data = []

# Iterate through the rows and collect the values
for row in sheet.iter_rows(min_row=1, max_row=sheet.max_row):  # max_row set to sheet.max_row to read all rows
    rows_data.append([cell.value if cell.value is not None else '' for cell in row])  # Leave empty cells as ''

# Convert the rows data to a DataFrame for better readability
df = pd.DataFrame(rows_data, columns=['Test Case ID', 'Test Scenario ID', 'Test Case', 'Steps', 'Test Steps', 'Expected Result'])

# Set Pandas display options to ensure all rows and columns are shown
pd.set_option('display.max_rows', None)  # Show all rows
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.width', None)  # Disable line wrapping
pd.set_option('display.max_colwidth', None)  # Show full content in each cell

# Display the DataFrame without index
df


Saving KodeKloud_TestCases.xlsx to KodeKloud_TestCases.xlsx
File uploaded and moved to: /content/project/tests/KodeKloud_TestCases.xlsx
['Test Sceanrios', 'Test Cases']


Unnamed: 0,Test Case ID,Test Scenario ID,Test Case,Steps,Test Steps,Expected Result
0,Test Case ID,Test Scenario ID,Test Case,Steps,Test Steps,Expected Result
1,TC001,SC001,Verify if able to create an S3 read-only policy or ec2 full access policy or DynamoDB read-write access,1,"Load the policy JSON for S3 read-only access, EC2 full access, or DynamoDB read-write access.\n","The policy should be created successfully, and a valid ARN should be returned."
2,,,,2,Call the create_policy API with the loaded policy document.\n,
3,,,,3,Verify the returned ARN for the created policy.,
4,,,,,,
5,TC002,SC002,Verify if able to create 3 policies (upto the limit),1,"Load the policy JSON files for S3 read-only, EC2 full access, and DynamoDB read-write.","All three policies should be created successfully, and valid ARNs should be returned for each."
6,,,,2,Call the create_policy API with the loaded policy document.\n,
7,,,,3,Verify the returned ARN for the created policy.,
8,,,,,,
9,TC003,SC003,Verify if not able to create a policy once the limit is exceeded (4th policy creation),1,"Load the policy JSON for S3 read-only, EC2 full access, and DynamoDB read-write.","Policy creation should fail with a LimitExceeded error, indicating the maximum allowed policies have been created."


In [67]:
        import os
        import json
        from openpyxl import load_workbook
        from src.aws_helper import get_iam_client
        from src.policy_manager import (
            create_policy,
            fetch_policy_json,
            check_policy_creation_limit,
            list_existing_policies,
            update_policy,
            delete_policies
        )

        # Load DataFrame
        df = pd.read_excel(new_file_path, sheet_name='Test Cases')

        # Filter out rows where all values match the column headers
        df = df[~df.apply(lambda row: row.tolist() == df.columns.tolist(), axis=1)]

        # Ensure 'Actual Result' and 'Status' columns exist in the DataFrame
        if 'Actual Result' not in df.columns:
            df['Actual Result'] = ''  # Initialize with empty strings
        if 'Status' not in df.columns:
            df['Status'] = ''  # Initialize with empty strings

        # Filter DataFrame to only include rows with valid Test Case IDs and Test Cases
        df = df[df['Test Case ID'].notnull() & df['Test Case ID'].str.strip().astype(bool)]
        df = df[df['Test Case'].notnull() & df['Test Case'].str.strip().astype(bool)]

        # Initialize IAM client
        iam_client = get_iam_client()

        # Store created policies for cleanup
        created_policy_arns = []

        # Function to delete all existing customer-managed policies in the account
        def cleanup_existing_policies(iam_client):
            try:
                # List all customer-managed policies
                response = iam_client.list_policies(Scope='Local')
                customer_policies = response.get('Policies', [])

                # Filter for policies to be deleted (all customer-managed policies in this case)
                policies_to_delete = [policy['Arn'] for policy in customer_policies]

                if not policies_to_delete:
                    print("No customer-managed policies found for cleanup.")
                    return

                # Delete each policy individually
                for policy_arn in policies_to_delete:
                    try:
                        iam_client.delete_policy(PolicyArn=policy_arn)
                        print(f"Deleted policy: {policy_arn}")
                    except Exception as e:
                        print(f"Error deleting policy {policy_arn}: {e}")

                print(f"Deleted the following customer-managed policies during pre-cleanup: {policies_to_delete}")

            except Exception as e:
                print(f"Error during pre-cleanup of customer-managed policies: {e}")

        # Pre-cleanup: Delete existing customer-managed policies
        cleanup_existing_policies(iam_client)

        # Iterate through test cases in the DataFrame
        for index, row in df.iterrows():
            actual_result = ''
            status = ''

            # **Test Case TC001: Create a single policy**
            if row['Test Case ID'] == 'TC001':
                try:
                    policy_name = 'S3_ReadOnly_Policy'
                    policy_document_path = '/content/project/policies/s3_readonly_policy.json'

                    policy_arn, policy_version, error = create_policy(iam_client, policy_name, policy_document_path)

                    if policy_arn and 'arn:aws:iam::' in policy_arn:
                        # Prepare policy name and ARN as JSON
                        policy_details = {
                            "PolicyName": policy_name,
                            "PolicyARN": policy_arn
                        }
                        # Convert policy details to JSON string
                        policy_details_json = json.dumps(policy_details, indent=4)

                        # Print policy details to the output
                        print(f"Policy created successfully:\n{policy_details_json}")

                        # Update the Actual Result column with the JSON details
                        actual_result = f"Policy created successfully. Details:\n{policy_details_json}"
                        status = 'Passed'

                        # Add to the cleanup list
                        created_policy_arns.append(policy_arn)
                    else:
                        actual_result = f"Error: {error or 'Unexpected ARN format returned.'}"
                        status = 'Failed'

                except Exception as e:
                    actual_result = f"Error: {str(e)}"
                    status = 'Failed'

                # Update the DataFrame with results
                df.at[index, 'Actual Result'] = actual_result
                df.at[index, 'Status'] = status
                print(f"{row['Test Case ID']} Result: {actual_result}, Status: {status}")

            # **Test Case TC002**
            elif row['Test Case ID'] == 'TC002':
                try:
                    # Path to the directory containing policy JSON files
                    policies_directory = '/content/project/policies'
                    created_policies = []

                    # Get a list of existing policies to avoid duplicates
                    existing_policies_response = iam_client.list_policies(Scope='Local')
                    existing_policy_names = [policy['PolicyName'] for policy in existing_policies_response['Policies']]

                    # Define the path for the created_policies file
                    created_policies_path = '/content/project/policies/created_policies.json'

                    # Create or update the created_policies.json file
                    if not os.path.exists(created_policies_path):
                        with open(created_policies_path, 'w') as file:
                            json.dump([], file, indent=4)

                    # Get a list of all policy JSON files and create policies
                    policy_files = [
                        ("S3_ReadOnly_Policy", "/content/project/policies/s3_readonly_policy.json"),
                        ("EC2_FullAccess_Policy", "/content/project/policies/ec2_fullaccess_policy.json"),
                        ("DynamoDB_ReadWrite_Policy", "/content/project/policies/dynamodb_readwrite_policy.json")
                    ]

                    for policy_name, policy_path in policy_files:
                        # Check if the policy already exists
                        if policy_name in existing_policy_names:
                            print(f"Policy '{policy_name}' already exists. Skipping creation.")
                            continue  # Skip to the next policy file

                        # Create the policy using the create_policy function
                        policy_arn, policy_version, error = create_policy(iam_client, policy_name, policy_path)

                        if policy_arn:
                            print(f"Policy '{policy_name}' created successfully. ARN: {policy_arn}")
                            created_policies.append({
                                "policyName": policy_name,
                                "policyARN": policy_arn,
                                "message": "Policy created successfully"
                            })
                            created_policy_arns.append(policy_arn)  # Add to cleanup list

                        else:
                            print(f"Error creating policy '{policy_name}': {error}")
                            raise Exception(f"Failed to create policy '{policy_name}': {error}")

                    # Update the created_policies.json file with all created policies
                    if created_policies:
                        with open(created_policies_path, 'r') as file:
                            existing_data = json.load(file)
                        existing_data.extend([{"policyName": p["policyName"], "policyARN": p["policyARN"]} for p in created_policies])
                        with open(created_policies_path, 'w') as file:
                            json.dump(existing_data, file, indent=4)

                        actual_result = f"Policies created successfully: {[p['policyName'] for p in created_policies]}"
                        status = 'Passed'
                    else:
                        actual_result = "No policies were created."
                        status = 'Failed'

                except Exception as e:
                    actual_result = f"Error: {str(e)}"
                    status = 'Failed'

                # Update the DataFrame with results
                df.at[index, 'Actual Result'] = actual_result
                df.at[index, 'Status'] = status
                print(f"{row['Test Case ID']} Result: {actual_result}, Status: {status}")

            # **Test Case TC003: Attempt to exceed policy creation limit**
            elif row['Test Case ID'] == 'TC003':
                try:
                    # List existing policies and count them
                    existing_policies_response = iam_client.list_policies(Scope='Local')
                    total_policies = len(existing_policies_response['Policies'])

                    print(f"Total number of existing policies: {total_policies}")

                    # Check if policy creation limit has been reached
                    can_create, error = check_policy_creation_limit()
                    if not can_create:
                        actual_result = (
                            f"Policy creation limit exceeded. Total number of existing policies: {total_policies}."
                        )
                        status = 'Failed'
                    else:
                        policy_name = "ExceedLimitPolicy"
                        policy_document_path = "/content/project/policies/sample_policy.json"

                        # Attempt to create a new policy
                        policy_arn, policy_version, error = create_policy(iam_client, policy_name, policy_document_path)
                        if policy_arn:
                            actual_result = f"Policy created successfully: {policy_arn}"
                            created_policy_arns.append(policy_arn)  # Add to cleanup list
                            status = 'Passed'
                        else:
                            actual_result = f"Error: {error}"
                            status = 'Failed'
                except Exception as e:
                    actual_result = f"Error: {str(e)}"
                    status = 'Failed'

                # Update the DataFrame with results
                df.at[index, 'Actual Result'] = actual_result
                df.at[index, 'Status'] = status
                print(f"{row['Test Case ID']} Result: {actual_result}, Status: {status}")


            # **Test Case TC004: Verify contents of existing policy**
            elif row['Test Case ID'] == 'TC004':
                try:
                    with open("/content/project/policies/created_policies.json", "r") as json_file:
                        created_policies = json.load(json_file)

                    for policy in created_policies:
                        expected_policy_path = f"/content/project/policies/{policy['policyName'].lower()}.json"
                        with open(expected_policy_path, "r") as file:
                            expected_policy_document = json.load(file)

                        actual_policy_document = fetch_policy_json(policy['policyARN'])
                        if actual_policy_document != expected_policy_document:
                            raise Exception(f"Policy content mismatch for {policy['policyName']}")

                    actual_result = "All policies verified successfully."
                    status = 'Passed'

                except Exception as e:
                    actual_result = f"Error: {str(e)}"
                    status = 'Failed'

                # Update the DataFrame with results
                df.at[index, 'Actual Result'] = actual_result
                df.at[index, 'Status'] = status
                print(f"{row['Test Case ID']} Result: {actual_result}, Status: {status}")

            # **Test Case TC005: Update a policy**
            elif row['Test Case ID'] == 'TC005':
                try:
                    # Load existing policies from created_policies.json
                    with open("/content/project/policies/created_policies.json", "r") as json_file:
                        created_policies = json.load(json_file)

                    if not created_policies:
                        raise Exception("No policies found in created_policies.json to update.")

                    # Select a policy to update (e.g., the first policy in the list)
                    policy_to_update = created_policies[0]
                    policy_arn = policy_to_update['policyARN']
                    policy_name = policy_to_update['policyName']
                    # Get the local version or initialize it
                    local_version = policy_to_update.get('localVersion', 1)

                    # Load the corresponding JSON policy file
                    policy_file_path = f"/content/project/policies/{policy_name.lower()}.json"
                    with open(policy_file_path, "r") as json_file:
                        policy_document = json.load(json_file)

                    # Toggle the Effect (Allow ↔ Deny) in the policy document
                    for statement in policy_document['Statement']:
                        if statement['Effect'] == 'Allow':
                            statement['Effect'] = 'Deny'
                        elif statement['Effect'] == 'Deny':
                            statement['Effect'] = 'Allow'

                    # Save the updated policy document back to the file
                    with open(policy_file_path, "w") as json_file:
                        json.dump(policy_document, json_file, indent=4)

                    # Update the created_policies.json file
                    for policy in created_policies:
                        if policy['policyARN'] == policy_arn:
                            policy['lastModifiedDocument'] = policy_document  # Save the updated content
                            policy['localVersion'] = local_version + 1  # Increment local version
                            break

                    with open("/content/project/policies/created_policies.json", "w") as json_file:
                        json.dump(created_policies, json_file, indent=4)

                    # Add the policy ARN to `created_policy_arns` for cleanup if not already present
                    if policy_arn not in created_policy_arns:
                        created_policy_arns.append(policy_arn)

                    # Prepare the consolidated result
                    actual_result = (
                        f"Policy updated successfully.\n"
                        f"ARN: {policy_arn}\n"
                        f"Version: {local_version + 1}\n"
                        f"Updated Content: {json.dumps(policy_document, indent=4)}"
                    )
                    status = 'Passed'


                except Exception as e:
                    actual_result = f"Error: {str(e)}"
                    status = 'Failed'

                # Update the DataFrame with results
                df.at[index, 'Actual Result'] = actual_result
                df.at[index, 'Status'] = status
                print(f"{row['Test Case ID']} Result: {actual_result}, Status: {status}")


            # **Test Case TC006: Check error messages during policy updation with no version value**
            elif row['Test Case ID'] == 'TC006':
                try:
                    # Load created policies from created_policies.json
                    with open("/content/project/policies/created_policies.json", "r") as json_file:
                        created_policies = json.load(json_file)

                    # Check if there are policies to update
                    if not created_policies:
                        raise Exception("No policies found in created_policies.json to update.")

                    # Select a policy to update (e.g., the first policy)
                    policy_to_update = created_policies[0]
                    policy_arn = policy_to_update['policyARN']
                    policy_name = policy_to_update['policyName']

                    # Create an updated policy document with an empty version value
                    updated_policy_path = f"/content/project/policies/{policy_name}_updated.json"
                    with open(updated_policy_path, "w") as file:
                        # Create a valid policy structure with an empty version
                        updated_policy_content = {
                            "Version": "",
                            "Statement": [
                                {
                                    "Effect": "Allow",
                                    "Action": "*",
                                    "Resource": "*"
                                }
                            ]
                        }
                        json.dump(updated_policy_content, file, indent=4)

                    # Attempt to update the policy and capture any error
                    _, error_update = update_policy(policy_arn, updated_policy_path)

                    if error_update:
                        actual_result = f"Error captured during policy update: {error_update}"
                        status = 'Passed'
                    else:
                        actual_result = "Failed to capture expected error during policy update."
                        status = 'Failed'

                except Exception as e:
                    actual_result = f"Error: {str(e)}"
                    status = 'Failed'

                # Update the DataFrame with results
                df.at[index, 'Actual Result'] = actual_result
                df.at[index, 'Status'] = status
                print(f"{row['Test Case ID']} Result: {actual_result}, Status: {status}")

            # **Test Case TC007: Delete a policy**
            elif row['Test Case ID'] == 'TC007':
                try:
                    with open("/content/project/policies/created_policies.json", "r") as json_file:
                        created_policies = json.load(json_file)

                    if not created_policies:
                        raise Exception("No policies found in created_policies.json to delete.")

                    # Select a policy to delete (e.g., the first policy)
                    policy_to_delete = created_policies.pop(0)
                    policy_arn = policy_to_delete['policyARN']

                    # Retrieve all versions of the policy
                    versions_response = iam_client.list_policy_versions(PolicyArn=policy_arn)
                    policy_versions = versions_response.get('Versions', [])

                    # Delete all non-default versions
                    for version in policy_versions:
                        if not version['IsDefaultVersion']:
                            version_id = version['VersionId']
                            try:
                                iam_client.delete_policy_version(PolicyArn=policy_arn, VersionId=version_id)
                                print(f"Deleted version {version_id} of policy {policy_to_delete['policyName']}.")
                            except Exception as e:
                                print(f"Error deleting version {version_id} of policy {policy_to_delete['policyName']}: {e}")

                    # Delete the policy itself
                    try:
                        iam_client.delete_policy(PolicyArn=policy_arn)
                        print(f"Policy {policy_to_delete['policyName']} deleted successfully.")
                        actual_result = f"Policy {policy_to_delete['policyName']} deleted successfully."
                        status = 'Passed'
                    except Exception as e:
                        print(f"Error deleting {policy_arn}: {e}")
                        actual_result = f"Error deleting {policy_arn}: {e}"
                        status = 'Failed'

                    # Update the created_policies.json file
                    with open("/content/project/policies/created_policies.json", "w") as json_file:
                        json.dump(created_policies, json_file, indent=4)

                except Exception as e:
                    actual_result = f"Error: {str(e)}"
                    status = 'Failed'


                # Update the DataFrame with results
                df.at[index, 'Actual Result'] = actual_result
                df.at[index, 'Status'] = status
                print(f"{row['Test Case ID']} Result: {actual_result}, Status: {status}")

        #def cleanup_policies(iam_client, policy_arns):
        #   for policy_arn in policy_arns:
        #      try:
        #       print(f"Policy {policy_arn} deleted successfully.")
            #    except Exception as e:
            #       print(f"Error deleting policy {policy_arn}: {e}")

        # Final cleanup after processing all test cases
        #if created_policy_arns:
        #   cleanup_policies(iam_client, created_policy_arns)
        #  print(f"Deleted policies during final cleanup: {created_policy_arns}")
        # created_policy_arns.clear()  # Clear the list after cleanup

            # Save the cleared list back to the JSON file
            #save_to_json_file(created_policy_arns, '/content/project/policies/created_policies.json')

        try:
            wb = load_workbook(new_file_path)
            if 'Test Results' in wb.sheetnames:
                del wb['Test Results']
            ws = wb.create_sheet(title='Test Results')
            for col_idx, header in enumerate(df.columns, start=1):
                ws.cell(row=1, column=col_idx, value=header)
            for row_idx, row_data in enumerate(df.itertuples(index=False), start=2):
                for col_idx, cell_value in enumerate(row_data, start=1):
                    ws.cell(row=row_idx, column=col_idx, value=cell_value)
            wb.save(new_file_path)
            print("Results saved to 'Test Results' sheet in the Excel file.")

        except Exception as e:
            print(f"Error saving results: {e}")

IAM Users:
Username: KodeKloudQA, ARN: arn:aws:iam::911167888520:user/KodeKloudQA
Deleted policy: arn:aws:iam::911167888520:policy/DynamoDB_ReadWrite_Policy
Deleted policy: arn:aws:iam::911167888520:policy/S3_ReadOnly_Policy
Deleted the following customer-managed policies during pre-cleanup: ['arn:aws:iam::911167888520:policy/DynamoDB_ReadWrite_Policy', 'arn:aws:iam::911167888520:policy/S3_ReadOnly_Policy']
Policy created successfully:
{
    "PolicyName": "S3_ReadOnly_Policy",
    "PolicyARN": "arn:aws:iam::911167888520:policy/S3_ReadOnly_Policy"
}
TC001 Result: Policy created successfully. Details:
{
    "PolicyName": "S3_ReadOnly_Policy",
    "PolicyARN": "arn:aws:iam::911167888520:policy/S3_ReadOnly_Policy"
}, Status: Passed
Policy 'S3_ReadOnly_Policy' already exists. Skipping creation.
Policy 'EC2_FullAccess_Policy' created successfully. ARN: arn:aws:iam::911167888520:policy/EC2_FullAccess_Policy
Policy 'DynamoDB_ReadWrite_Policy' created successfully. ARN: arn:aws:iam::91116788852

Created Policies Cleanup

In [68]:
def cleanup_policies(iam_client, policy_arns):
    for policy_arn in policy_arns:
        try:
            # Delete the policy itself
            iam_client.delete_policy(PolicyArn=policy_arn)
            print(f"Policy {policy_arn} deleted successfully.")
        except Exception as e:
            print(f"Error deleting policy {policy_arn}: {e}")

# Add this function to your script
def save_to_json_file(data, file_path):
    """Saves the given data to a JSON file at the specified file path."""
    try:
        with open(file_path, 'w') as file:
            json.dump(data, file, indent=4)
        print(f"Data successfully saved to {file_path}")
    except Exception as e:
        print(f"Error saving data to {file_path}: {e}")

# Your existing cleanup code
if created_policy_arns:
    cleanup_policies(iam_client, created_policy_arns)
    print(f"Deleted policies during final cleanup: {created_policy_arns}")
    created_policy_arns.clear()  # Clear the list after cleanup

    # Save the cleared list back to the JSON file
    save_to_json_file(created_policy_arns, '/content/project/policies/created_policies.json')


Policy arn:aws:iam::911167888520:policy/S3_ReadOnly_Policy deleted successfully.
Policy arn:aws:iam::911167888520:policy/EC2_FullAccess_Policy deleted successfully.
Error deleting policy arn:aws:iam::911167888520:policy/DynamoDB_ReadWrite_Policy: An error occurred (NoSuchEntity) when calling the DeletePolicy operation: Policy arn:aws:iam::911167888520:policy/DynamoDB_ReadWrite_Policy was not found.
Deleted policies during final cleanup: ['arn:aws:iam::911167888520:policy/S3_ReadOnly_Policy', 'arn:aws:iam::911167888520:policy/EC2_FullAccess_Policy', 'arn:aws:iam::911167888520:policy/DynamoDB_ReadWrite_Policy']
Data successfully saved to /content/project/policies/created_policies.json
