In [None]:
import boto3 

# Resolve credentials through a default boto3 session and ask STS which
# principal they belong to.
# NOTE: the displayed response identifies the caller (account/ARN); clear this
# output before sharing the notebook.
session = boto3.Session()
sts_client = session.client(service_name='sts')
identity = sts_client.get_caller_identity()
identity

In [None]:
import polars as pl
from polars import col
from itables import init_notebook_mode

# Local Parquet export that the rest of the notebook analyses
# (presumably the AWS Cost and Usage Report for the sandbox account — confirm).
CUR_PARQUET_PATH = './hearst-sandbox-dec-cur.snappy.parquet'

# all_interactive=False: do not switch every table to the itables renderer.
init_notebook_mode(all_interactive=False)

# Polars text rendering: show up to 25 rows and strings up to 255 characters.
pl.Config.set_tbl_rows(25)
pl.Config.set_fmt_str_lengths(255)

df = pl.read_parquet(CUR_PARQUET_PATH)
df

In [None]:
def split_arn(arn: str) -> list:
    """
    Break an ARN into its colon-separated fields after a basic sanity check.

    The string is split on every ':' so a resource section that itself
    contains colons (e.g. "db:mydb") occupies more than one trailing field.

    Parameters:
    - arn (str): The ARN to split and validate.

    Returns:
    - list: The colon-separated fields (at least six).

    Raises:
    - ValueError: If the string does not start with "arn:" or has fewer
      than six colon-separated fields.
    """
    if arn.startswith("arn:"):
        components = arn.split(":")
        if len(components) >= 6:
            return components
        raise ValueError("Invalid ARN format.")

    raise ValueError("Invalid ARN: must start with 'arn:'")

def extract_resource_id_from_arn(arn: str) -> str:
    """
    Extracts the resource ID from a given AWS ARN string if applicable.

    Handles the three resource layouts an ARN can use:
    "resource-id", "resource-type/resource-id" and "resource-type:resource-id".

    Parameters:
    - arn (str): The ARN from which to extract the resource ID.

    Returns:
    - str: The resource ID extracted from the ARN. For
      "arn:aws:s3:::..." ARNs the whole resource section is returned.

    Raises:
    - ValueError: If the ARN fails the basic validation in split_arn.
    """
    arn_parts = split_arn(arn)
    # split_arn splits on every ':', so a resource such as "db:mydb" is spread
    # over several fields. Rejoin everything after the account ID; taking only
    # field 5 would truncate it to "db".
    resource_part = ":".join(arn_parts[5:])

    # Specific handling for ARNs where resource is not delimited
    if arn.startswith("arn:aws:s3:::"):
        return resource_part  # Directly return the bucket name

    # "resource-type/resource-id": keep the last path segment.
    if "/" in resource_part:
        return resource_part.split("/")[-1]

    # "resource-type:resource-id": drop only the leading type, so IDs that
    # contain ':' themselves stay intact. A bare "resource-id" is unchanged.
    return resource_part.split(":", 1)[-1]

def is_non_arn_identifier(arn: str) -> bool:
    """
    Determines if an ARN belongs to a resource typically shown as a non-ARN identifier in CUR.

    Args:
    - arn (str): The ARN string to check.

    Returns:
    - bool: True if the resource is typically identified by a non-ARN ID in CUR, otherwise False.

    Raises:
    - ValueError: If the ARN fails the basic validation in split_arn.
    """
    arn_parts = split_arn(arn)
    service, region, account_id = arn_parts[2], arn_parts[3], arn_parts[4]
    resource_info = arn_parts[5]

    # S3 bucket ARNs look like "arn:aws:s3:::my-bucket": no region, no account
    # and no "bucket/" type prefix, so the type lookup below can never match
    # them. Recognise them by shape instead.
    if service == "s3" and not region and not account_id and "/" not in resource_info:
        return True

    resource_type = resource_info.split("/")[0].lower()

    # "volume" and "snapshot" are the type names that actually appear in EC2
    # ARNs ("volume/vol-...", "snapshot/snap-..."); the shorter legacy entries
    # are kept so existing matches are unaffected.
    non_arn_resources = {
        "instance", "volume", "snapshot",
        "vol", "snap", "db", "table", "bucket", "net",
    }

    return resource_type in non_arn_resources

def extract_and_handle_resource_id(arn: str) -> str:
    """
    Reduce an ARN to its bare resource ID when is_non_arn_identifier says so.

    Args:
    - arn (str): The ARN to inspect.

    Returns:
    - str: The result of extract_resource_id_from_arn when
      is_non_arn_identifier(arn) is true; otherwise the ARN unchanged.
    """
    return extract_resource_id_from_arn(arn) if is_non_arn_identifier(arn) else arn

# Example usages: one EC2 instance ARN and one S3 bucket ARN.
arn1 = "arn:aws:ec2:us-east-1:123456789012:instance/i-0abcdef1234567890"
arn2 = "arn:aws:s3:::my-example-bucket"

resource_id1, resource_id2 = (
    extract_and_handle_resource_id(example_arn) for example_arn in (arn1, arn2)
)

# Intended results: the bare instance ID and the bucket name respectively.
# NOTE(review): check the printed values against that intent.
print(f"Resource ID 1: {resource_id1}")
print(f"Resource ID 2: {resource_id2}")

## Isolate resource IDs that are not ARNs
This also tests joining them back with the tag triplets (EC2 only, one region).

In [None]:
# Isolate EC2 line items whose resource ID is a bare identifier rather than an ARN.
rid = col("line_item_resource_id")

non_arn_df = df.filter(
    rid.is_not_null()
    & col("product_region_code").is_not_null()
    & col("line_item_product_code").eq("AmazonEC2")
    & ~rid.str.starts_with("arn:aws:")
)

# Shared "arn:aws:ec2:<region>:<account>" prefix for the derived ARNs.
ec2_arn_base = "arn:aws:ec2:" + col("product_region_code") + ":" + col("line_item_usage_account_id")

# Add a ResourceARN column next to the original ID (the original column is kept):
#   i-...   -> instance ARN
#   vol-... -> volume ARN
#   other   -> the ID unchanged
derived_arn_df = non_arn_df.with_columns(
    pl.when(rid.str.starts_with("i-"))
    .then(ec2_arn_base + ":instance/" + rid)
    .when(rid.str.starts_with("vol-"))
    .then(ec2_arn_base + ":volume/" + rid)
    .otherwise(rid)
    .alias("ResourceARN")
)

derived_arn_df

## Actual Code


In [None]:
# Line items that carry both a resource ID and a region code.
not_null_resources_df = df.filter(
    col('line_item_resource_id').is_not_null() & col('product_region_code').is_not_null()
)

# Resource-identification columns followed by the cost columns used for TCO.
resource_cost_columns = [
    # identification
    'line_item_resource_id',
    'line_item_product_code',
    'product_region_code',
    'line_item_usage_account_id',
    'resource_tags',
    # blended / unblended cost and rate
    'line_item_blended_cost',
    'line_item_blended_rate',
    'line_item_unblended_cost',
    'line_item_unblended_rate',
    # discount and pricing-term information
    'savings_plan_savings_plan_effective_cost',
    'pricing_term',
]

# NOTE: despite the name, rows are NOT de-duplicated here; a
# `.unique('line_item_resource_id')` step would be needed for that.
unique_resources_df = (
    not_null_resources_df
    .select(resource_cost_columns)
    .sort('line_item_product_code')
)
unique_resources_df.write_parquet('./non-null-resources.snappy.parquet', compression='snappy')
unique_resources_df


In [None]:
# # Define the filtering condition for EBS Volumes and EC2 Instances
# filtered_df = unique_resources_df.filter(
#     (pl.col('line_item_product_code').str.contains('EC2')) &
#     (pl.col('line_item_resource_id').str.contains(r'i-')) |
#     (pl.col('line_item_resource_id').str.contains(r'vol-'))
# )

# # Optionally, display the filtered DataFrame
# filtered_df

In [None]:
# Distinct region codes found on line items that have a resource ID.
region_df = not_null_resources_df.select('product_region_code').unique()

# `regions` is the polars Series of those codes; later cells iterate over it.
regions = region_df.to_dict()['product_region_code']
for region_code in regions:
    print(region_code)


In [None]:
# Build a frame of (resource ARN, tag key, tag value) for resources that carry
# the `tag_key` tag. Only that one tag is recorded per resource, and the frame
# is de-duplicated on ResourceARN below, so each resource yields at most one row.
triplets_dict = {
        "ResourceARN": [],
        "TagKey": [],
        "TagValue": []
    }


tag_key = 'appNumber'
tagged_resources = []
for r in regions:
    client = boto3.client('resourcegroupstaggingapi', region_name=r)
    paginator = client.get_paginator('get_resources')
    # Filter server-side on the tag key only (no 'Values' restriction).
    # A 'Values' list or a ResourceTypeFilters argument (e.g. ['ec2:instance'])
    # can be added here to narrow the query further.
    response_iterator = paginator.paginate(
        TagFilters=[
            {
                'Key': tag_key,
            },
        ],
    )

    for response in response_iterator:
        for resource in response["ResourceTagMappingList"]:
            for tag in resource["Tags"]:
                # skip null and empty values
                if not all([resource["ResourceARN"], tag["Key"], tag["Value"]]):
                    continue
                # A matched resource may carry other tags too; keep only `tag_key`.
                if tag["Key"] != tag_key:
                    continue
                # Record exactly this resource's ARN. Extending with the whole
                # accumulated ARN list here would re-add every earlier ARN on
                # each match and never add the final one.
                tagged_resources.append(resource["ResourceARN"])
                triplets_dict["ResourceARN"].append(resource["ResourceARN"])
                triplets_dict["TagKey"].append(tag["Key"])
                triplets_dict["TagValue"].append(tag["Value"])
triplets_df = pl.DataFrame(triplets_dict)
triplets_df = triplets_df.sort('TagKey')
triplets_df = triplets_df.unique('ResourceARN')


# Distinct ARNs that carry a non-empty `tag_key` tag.
tagged_resources_set = set(tagged_resources)
print(tagged_resources_set)
print(len(tagged_resources_set))
# triplets_df.write_parquet('./tag-data.snappy.parquet', compression='snappy')
triplets_df


In [None]:
# create dataframe where each row is a resource arn, a tag name, and a tag value
# there will be multiple rows for each resource arn if there are multiple tags
triplets_dict = {
        "ResourceARN": [],
        "TagKey": [],
        "TagValue": []
    }

for r in regions:
    client = boto3.client('resourcegroupstaggingapi', region_name=r)
    paginator = client.get_paginator('get_resources')
    # response_iterator = paginator.paginate()

    # paginator = client.get_resources()
    response_iterator = paginator.paginate()
    # response_iterator = paginator.paginate(ResourceTypeFilters=[
    #     'ec2:instance',
    # ])

    for response in response_iterator:
        for resource in response["ResourceTagMappingList"]:
            # print(resource["ResourceARN"], resource["Tags"])
            for tag in resource["Tags"]:
                # skip null and empty values
                if all([resource["ResourceARN"], tag["Key"], tag["Value"]]):
                    # print(resource["ResourceARN"], tag["Key"], tag["Value"])
                    triplets_dict["ResourceARN"].append(resource["ResourceARN"])
                    triplets_dict["TagKey"].append(tag["Key"])
                    triplets_dict["TagValue"].append(tag["Value"])
triplets_df = pl.DataFrame(triplets_dict)
triplets_df = triplets_df.sort('TagKey')

triplets_df.write_parquet('./tag-data.snappy.parquet', compression='snappy')
triplets_df


In [None]:
import pyarrow.parquet as pq


# Read the locally written tag-data Parquet file and print a CREATE EXTERNAL
# TABLE statement whose columns mirror the file's schema.
table = pq.read_table('tag-data.snappy.parquet')

# Extract and print schema
schema = table.schema
# Join the column definitions with ",\n" so the last column has no trailing
# comma; a comma before the closing parenthesis makes the DDL invalid.
# NOTE(review): field.type is the Arrow type name, which may not be a valid
# DDL type as-is — confirm or map before running the statement.
column_defs = ",\n".join(f"  {field.name} {field.type}" for field in schema)
print("CREATE EXTERNAL TABLE IF NOT EXISTS cur_database.tag_data_table (")
print(column_defs)
print(")")
print("STORED AS PARQUET")
# NOTE(review): this LOCATION names a single object; confirm whether the query
# engine expects an S3 folder/prefix here instead.
print("LOCATION 's3://curprocessorstack-curbucket1acad2a6-josrlebznwwi/quicksight/tag-data.snappy.parquet';")

In [None]:
import re

# Compile the regex pattern once.
# Layout: arn:<partition>:<service>:<region>:<account_id>:<resource>
# The partition accepts any number of "-suffix" segments so that "aws",
# "aws-cn" and multi-segment partitions such as "aws-us-gov" all match.
# Region and account ID may be empty (e.g. S3 bucket ARNs).
arn_regex = re.compile(
    r"^arn:"
    r"(?P<partition>aws(-[a-z]+)*):"
    r"(?P<service>[a-z0-9-]+):"
    r"(?P<region>[a-z0-9-]*):"
    r"(?P<account_id>[0-9]*):"
    r"(?P<resource>.+)$"
)

def is_arn(string):
    """
    Check if a string is an AWS ARN.

    Parameters:
    - string: The value to test. Anything that is not a str (e.g. None from a
      null column value) is reported as "not an ARN" instead of raising.

    Returns:
    - bool: True if the value matches arn_regex, otherwise False.
    """
    if not isinstance(string, str):
        return False
    return arn_regex.match(string) is not None

def reconstruct_arn(product_code: str, resource_id: str, account_id: str, region: str) -> str:
    """
    Build an ARN for a resource from its product code and bare resource ID.

    Parameters:
    - product_code (str): Product code of the line item (e.g. 'AmazonEC2').
    - resource_id (str): Resource ID; returned untouched if already an ARN.
    - account_id (str): Account ID placed in the ARN.
    - region (str): Region code placed in the ARN.

    Returns:
    - str: The input ARN, a reconstructed ARN, or — for combinations that are
      not handled — a descriptive "UnknownServiceType: ..." message. Callers
      that store the result should be aware that the message is not an ARN.
    """
    if is_arn(resource_id):
        return resource_id

    # In-band marker returned for every product/resource combination not handled below.
    unsupported = f"UnknownServiceType: {product_code} or Resource: {resource_id} not supported"

    # Products whose ARN does not depend on the shape of the resource ID.
    if product_code == 'AmazonS3':
        return f"arn:aws:s3:::{resource_id}"
    if product_code == 'AmazonDynamoDB':
        return f"arn:aws:dynamodb:{region}:{account_id}:table/{resource_id}"

    # RDS: only IDs with the 'db-' prefix are handled.
    if product_code in {'AmazonRDS', 'rds'}:
        if resource_id.startswith('db-'):
            return f"arn:aws:rds:{region}:{account_id}:db:{resource_id}"
        return unsupported

    if product_code != 'AmazonEC2':
        return unsupported

    # EC2: the ID prefix decides the resource type.
    if resource_id.startswith('i-'):
        return f"arn:aws:ec2:{region}:{account_id}:instance/{resource_id}"
    if resource_id.startswith('vol-'):
        return f"arn:aws:ec2:{region}:{account_id}:volume/{resource_id}"
    if resource_id.startswith('snap-'):
        return f"arn:aws:ec2:{region}:{account_id}:snapshot/{resource_id}"
    return unsupported

# One dict per row of the resource frame.
resources_ids = unique_resources_df.to_dicts()

# Rows with their resource ID replaced by the reconstructed ARN.
# NOTE: each row dict is modified in place, so `resources_ids` holds the
# rewritten IDs as well once this loop has run.
reconstructed_resources = []
for row in resources_ids:
    row['line_item_resource_id'] = reconstruct_arn(
        row['line_item_product_code'],
        row['line_item_resource_id'],
        row['line_item_usage_account_id'],
        row['product_region_code'],
    )
    reconstructed_resources.append(row)

# Back to a DataFrame, then persist it for later inspection.
updated_df = pl.DataFrame(reconstructed_resources)
updated_df.write_parquet('./modified-unique-resources.snappy.parquet', compression='snappy')
updated_df

In [None]:
# Read the locally written modified-unique-resources Parquet file and print a
# CREATE EXTERNAL TABLE statement whose columns mirror the file's schema.
table = pq.read_table('modified-unique-resources.snappy.parquet')

# Extract and print schema
schema = table.schema
# Join the column definitions with ",\n" so the last column has no trailing
# comma; a comma before the closing parenthesis makes the DDL invalid.
# NOTE(review): field.type is the Arrow type name, which may not be a valid
# DDL type as-is — confirm or map before running the statement.
column_defs = ",\n".join(f"  {field.name} {field.type}" for field in schema)
print("CREATE EXTERNAL TABLE IF NOT EXISTS cur_database.modified_unique_resources_table (")
print(column_defs)
print(")")
print("STORED AS PARQUET")
print("LOCATION 's3://curprocessorstack-curbucket1acad2a6-josrlebznwwi/quicksight/modified-unique-resources/';")

In [None]:
# Define the filtering condition for EBS Volumes and EC2 Instances.
# The product-code test must apply to BOTH resource kinds. `&` binds tighter
# than `|`, so an unparenthesised `ec2 & instance | volume` would let every
# 'vol-' row through regardless of product code; the two conditions are
# therefore built separately and combined with a single `&`.
is_ec2_product = pl.col('line_item_product_code').str.contains('EC2')

# Match the ID at the start of the value or right after a '/', so both bare IDs
# ("i-...", "vol-...") and ARNs (".../instance/i-...", ".../volume/vol-...")
# qualify, while IDs that merely contain the letters (e.g. "eni-...") do not.
is_instance_or_volume = pl.col('line_item_resource_id').str.contains(r'(?:^|/)(?:i|vol)-')

filtered_df = updated_df.filter(is_ec2_product & is_instance_or_volume)

# Optionally, display the filtered DataFrame
filtered_df

# Save the filtered DataFrame to a Parquet file
# filtered_df.write_parquet('./ebs-ec2-resources.snappy.parquet', compression='snappy')

In [None]:
counter1 = 0

non_arns = []
for r in resources_ids:
    if not is_arn(r['line_item_resource_id']):
        non_arns.append({'id': r['line_item_resource_id'], 'service': r['line_item_product_code']})
        counter1 = counter1 + 1


print(len(resources_ids))
print(counter1)

# Print results"
print("Number of Non-ARN Strings:", len(non_arns))
non_arns

In [None]:
# Replace 'example.parquet' with the path to your local Parquet file for inspection
table = pq.read_table('unique-resources.snappy.parquet')

# Extract and print schema
schema = table.schema
print("CREATE EXTERNAL TABLE IF NOT EXISTS cur_database.unique_resources_table (")
for field in schema:
    print(f"  {field.name} {field.type},")
print(")")
print("STORED AS PARQUET")
print("LOCATION 's3://curprocessorstack-curbucket1acad2a6-josrlebznwwi/quicksight/unique-resources.snappy.parquet';")