In [4]:
import argparse
import boto3
import csv
from datetime import datetime, timedelta
import sys
import string
import random
from decimal import Decimal
import numpy as np
import pandas as pd

RecursionError: maximum recursion depth exceeded

In [None]:
# Parse command line arguments
parser = argparse.ArgumentParser(description='Collect AWS ElastiCache metrics with specific aggregation rules.')
parser.add_argument('-r', '--region', required=True, help='AWS region for the ElastiCache cluster')
parser.add_argument('-c', '--cluster', required=True, help='ElastiCache cluster ID')
parser.add_argument('-dr', '--day-range', type=int, default=1,
                    help='Day range for hourly metrics collection (default: 1)')
parser.add_argument('-o', '--output', required=False,
                    help='Output CSV file name (default: cost_estimate_<cluster>_<timestamp>.csv)')
args = parser.parse_args()

# argparse already rejects a missing -r/-c; these checks additionally catch
# an explicitly empty value such as `-r ""`.
if not args.region:
    print("ERROR: Missing region parameter. Please pass in the region name [US-EAST-1, US-EAST-2, and so on]")
    # sys.exit is used (rather than the site-provided `exit`) to match the
    # rest of this file and to work when `site` helpers are unavailable.
    sys.exit(1)

print(f"Region Name: {args.region}")
if not args.cluster:
    print("ERROR: Missing cluster name parameter. Please pass in the cluster name <example-elasti-cache>")
    sys.exit(1)

print(f"Cluster Name: {args.cluster}")

if not args.output:
    # The timestamp deliberately avoids ':' because it is not a legal
    # character in Windows file names.
    args.output = 'cost_estimate_' + args.cluster + '_' + datetime.now().strftime("%H-%M_%d_%m_%Y") + '.csv'

In [None]:
# Per-region price table, keyed by lowercase AWS region code. Each entry is a
# dict with the keys "ecpu_per_million" and "storage_per_gb".
#
# NOTE(review): the key names suggest a price per million eCPUs and per GB of
# storage; the currency and billing period are not stated here - TODO confirm.
# NOTE(review): the cost calculation visible in this notebook does not read
# this table - confirm whether it is meant to drive the per-region rates.
ELASTICACHE_SERVERLESS_PRICING = {
    region: {"ecpu_per_million": ecpu_price, "storage_per_gb": storage_price}
    for region, ecpu_price, storage_price in (
        # US Regions
        ("us-east-1", 2.30, 0.084),       # US East (N. Virginia)
        ("us-east-2", 2.30, 0.084),       # US East (Ohio)
        ("us-west-1", 2.76, 0.101),       # US West (N. California)
        ("us-west-2", 2.30, 0.084),       # US West (Oregon)
        # Canada Region
        ("ca-central-1", 2.53, 0.092),    # Canada (Central)
        # South America Region
        ("sa-east-1", 3.45, 0.126),       # South America (São Paulo)
        # Europe Regions
        ("eu-central-1", 2.53, 0.092),    # Europe (Frankfurt)
        ("eu-west-1", 2.30, 0.084),       # Europe (Ireland)
        ("eu-west-2", 2.42, 0.088),       # Europe (London)
        ("eu-west-3", 2.42, 0.088),       # Europe (Paris)
        ("eu-north-1", 2.19, 0.080),      # Europe (Stockholm)
        ("eu-south-1", 2.53, 0.092),      # Europe (Milan)
        # Asia Pacific Regions
        ("ap-east-1", 3.00, 0.109),       # Asia Pacific (Hong Kong)
        ("ap-south-1", 2.53, 0.092),      # Asia Pacific (Mumbai)
        ("ap-northeast-1", 2.76, 0.101),  # Asia Pacific (Tokyo)
        ("ap-northeast-2", 2.53, 0.092),  # Asia Pacific (Seoul)
        ("ap-northeast-3", 2.76, 0.101),  # Asia Pacific (Osaka)
        ("ap-southeast-1", 2.76, 0.101),  # Asia Pacific (Singapore)
        ("ap-southeast-2", 2.76, 0.101),  # Asia Pacific (Sydney)
        # Middle East Regions
        ("me-south-1", 2.76, 0.101),      # Middle East (Bahrain)
        # Africa Regions
        ("af-south-1", 2.76, 0.101),      # Africa (Cape Town)
    )
}


In [None]:
# Build the ElastiCache and CloudWatch clients for the requested region.
# Both are module-level names used by the helper functions below; any failure
# while creating either one is reported and ends the run with exit status 1.
try:
    elasticache, cloudwatch = (
        boto3.client(service_name, region_name=args.region)
        for service_name in ('elasticache', 'cloudwatch')
    )
except Exception as client_error:
    print(f"Error initializing AWS client, credential are probably missing: {client_error}")
    sys.exit(1)

def calculate_total_costs(df):
    """Sum the per-row cost columns of ``df`` into period totals.

    Args:
        df: DataFrame containing the columns 'StorageCost', 'eCPUCost' and
            'TotalCost'.

    Returns:
        dict with the keys 'storage_cost', 'cpu_cost' and 'total_cost'
        (each a Decimal built from the string form of the column sum) and
        'hours_analyzed' (the number of rows in ``df``).
    """
    def _column_total(column_name):
        # Go through str() so the Decimal reflects the printed value of the
        # sum rather than the binary float expansion.
        return Decimal(str(df[column_name].sum()))

    return {
        'storage_cost': _column_total('StorageCost'),
        'cpu_cost': _column_total('eCPUCost'),
        'total_cost': _column_total('TotalCost'),
        'hours_analyzed': len(df),
    }

def get_nodes(cluster_id):
    """Return the member cluster IDs of the given replication group.

    Looks up ``cluster_id`` with ``describe_replication_groups`` on the
    module-level ``elasticache`` client and returns the 'MemberClusters' list
    of the first replication group in the response (an empty list when that
    key is absent). No primary/replica filtering happens here.

    Any exception raised during the lookup is printed and the process exits
    with status 1.
    """
    try:
        replication_groups = elasticache.describe_replication_groups(
            ReplicationGroupId=cluster_id
        )['ReplicationGroups']
        return replication_groups[0].get('MemberClusters', [])
    except Exception as lookup_error:
        print(f"Error retrieving cluster node details: {lookup_error}")
        sys.exit(1)

def get_metric_data(metric_name, node_id, start_time, end_time, period=3600, stat='Average'):
    """Fetch one AWS/ElastiCache metric for a node, keyed by timestamp string.

    Args:
        metric_name: CloudWatch metric name to request.
        node_id: value used for the 'CacheClusterId' dimension.
        start_time: start of the query window (passed through as StartTime).
        end_time: end of the query window (passed through as EndTime).
        period: datapoint granularity in seconds; defaults to 3600 (1 hour).
        stat: the single statistic to request and read from each datapoint.

    Returns:
        dict mapping 'YYYY-MM-DD HH:MM:SS' timestamps to the value of ``stat``;
        datapoints that share a timestamp are added together.
    """
    request = {
        'Namespace': 'AWS/ElastiCache',
        'MetricName': metric_name,
        'Dimensions': [{'Name': 'CacheClusterId', 'Value': node_id}],
        'StartTime': start_time,
        'EndTime': end_time,
        'Period': period,
        'Statistics': [stat],
    }
    datapoints = cloudwatch.get_metric_statistics(**request)['Datapoints']

    values_by_time = {}
    for point in datapoints:
        key = point['Timestamp'].strftime('%Y-%m-%d %H:%M:%S')
        reading = point[stat]
        # First sighting stores the reading as-is; repeats accumulate.
        values_by_time[key] = values_by_time[key] + reading if key in values_by_time else reading
    return values_by_time

In [None]:
def _split_nodes_by_role(cluster_id, all_nodes, end_time):
    """Partition nodes into (primary_nodes, reader_nodes) via the IsMaster metric."""
    primary_nodes = []
    reader_nodes = []

    # The role is taken from the IsMaster datapoint covering the last minute
    # before end_time.
    role_window_start = end_time - timedelta(minutes=1)
    for node in all_nodes:
        try:
            role_data = get_metric_data('IsMaster', node, role_window_start, end_time, 60, 'Sum')
        except Exception as e:
            print(f"Error retrieving IsMaster metric for node {node}: {e}")
            sys.exit(1)

        if not role_data:
            print(f"No metrics exist for cluster: {cluster_id}")
            sys.exit(1)

        if next(iter(role_data.values())) == 1.0:
            primary_nodes.append(node)
        else:
            reader_nodes.append(node)

    # Without a primary there is nothing to sample and the shard count would
    # be zero, so stop with a clear message instead of an IndexError.
    if not primary_nodes:
        print(f"No primary node found for cluster: {cluster_id}")
        sys.exit(1)

    return primary_nodes, reader_nodes


def _collect_hourly_metrics(primary_node, reader_node, start_time, end_time):
    """Gather the needed metrics into {timestamp: {metric_name: value}}."""
    averaged_metrics = ('BytesUsedForCache', 'EvalBasedCmdsLatency')
    primary_metrics = ('BytesUsedForCache', 'EvalBasedCmds', 'EvalBasedCmdsLatency', 'GetTypeCmds',
                       'NetworkBytesIn', 'NetworkBytesOut', 'ReplicationBytes', 'SetTypeCmds')
    collected_data = {}

    # Size and latency are read as averages; every other metric as a sum.
    for metric in primary_metrics:
        stat = 'Average' if metric in averaged_metrics else 'Sum'
        aggregated_data = get_metric_data(metric, primary_node, start_time, end_time, stat=stat)
        for timestamp, value in aggregated_data.items():
            collected_data.setdefault(timestamp, {})[metric] = value

    # For a read replica only GetTypeCmds and NetworkBytesOut are needed;
    # they are stored under a 'Reader' prefix.
    if reader_node is not None:
        for metric in ('GetTypeCmds', 'NetworkBytesOut'):
            aggregated_data = get_metric_data(metric, reader_node, start_time, end_time, stat='Sum')
            for timestamp, value in aggregated_data.items():
                # setdefault: the reader may report a timestamp the primary lacks.
                collected_data.setdefault(timestamp, {})['Reader' + metric] = value

    return collected_data


def _add_cost_columns(df, num_shards, num_readers, storage_price_per_gb_hour, ecpu_price):
    """Add the derived size, eCPU and cost columns to ``df`` and return it."""
    # Smallest amount of storage that is billed, in GB (100 MB).
    min_billed_storage_gb = 0.1

    df['TotalSizeMB'] = (df['BytesUsedForCache'].div(1000*1000).mul(num_shards).round(2)
                         .apply(lambda x: "{:,}".format(x)))
    df['EvaleCPU'] = (df['EvalBasedCmds'].mul(num_shards) * df['EvalBasedCmdsLatency'].div(2)).astype(int)
    df['EvalBasedCmds'] = df['EvalBasedCmds'].mul(num_shards)

    # np.where selects 0 for rows without commands, which masks the inf/NaN
    # produced by dividing by zero on those rows.
    df['AVGInSize'] = np.where(df['SetTypeCmds'] == 0, 0,
                               df['NetworkBytesIn'].div(1000)/df['SetTypeCmds'].astype(int))

    # Average payloads below 1 (thousand bytes) count one unit per command;
    # otherwise the count is scaled by the average payload size.
    df['PrimaryIneCPU'] = np.where((df['AVGInSize'] > 0) & (df['AVGInSize'] < 1),
                                   df['SetTypeCmds'] * num_shards,
                                   df['SetTypeCmds'] * df['AVGInSize'] * num_shards)

    df['GetTypeCmds'] = df['GetTypeCmds'].astype(int)
    df['ReaderGetTypeCmds'] = df['ReaderGetTypeCmds'].astype(int)

    # NOTE(review): replication traffic is subtracted once per reader in the
    # whole cluster (num_readers) - confirm this is the intended multiplier
    # for the sampled primary's ReplicationBytes.
    df['AVGOutSize'] = np.where(df['GetTypeCmds'] == 0, 0,
                                ((df['NetworkBytesOut'].div(1000)) -
                                 (df['ReplicationBytes'].div(1000).mul(num_readers))) /
                                df['GetTypeCmds'].astype(int))

    df['ReaderAVGOutSize'] = np.where(df['ReaderGetTypeCmds'] == 0, 0,
                                      df['ReaderNetworkBytesOut'].div(1000) /
                                      df['ReaderGetTypeCmds'].astype(int))

    df['PrimaryOuteCPU'] = np.where((df['AVGOutSize'] > 0) & (df['AVGOutSize'] < 1),
                                    df['GetTypeCmds'] * num_shards,
                                    df['GetTypeCmds'] * df['AVGOutSize'] * num_shards)

    df['ReaderOuteCPU'] = np.where((df['ReaderAVGOutSize'] > 0) & (df['ReaderAVGOutSize'] <= 1),
                                   df['ReaderGetTypeCmds'].astype(int) * num_readers,
                                   df['ReaderGetTypeCmds'].astype(int) * df['ReaderAVGOutSize'] * num_readers)

    df['SetTypeCmds'] = df['SetTypeCmds'].astype(int)

    # Bill at least min_billed_storage_gb at the per-GB rate, so the minimum
    # charge can never exceed the charge for a larger cache.
    storage_gb = df['BytesUsedForCache'].div(1000*1000*1000).mul(num_shards)
    billed_gb = np.where(storage_gb < min_billed_storage_gb, min_billed_storage_gb, storage_gb)
    df['StorageCost'] = (billed_gb * storage_price_per_gb_hour).round(4)

    df['eCPUCost'] = (df['EvaleCPU'].mul(ecpu_price).apply(lambda x: round(x, 4)) +
                      df['PrimaryIneCPU'].mul(ecpu_price).apply(lambda x: round(x, 4)) +
                      df['PrimaryOuteCPU'].mul(ecpu_price).apply(lambda x: round(x, 4)) +
                      df['ReaderOuteCPU'].mul(ecpu_price).apply(lambda x: round(x, 4)))

    df['TotalCost'] = (df['StorageCost'] + df['eCPUCost']).round(3)
    return df


def collect_and_write_metrics(cluster_id, start_time, end_time, filename,
                              storage_price_per_gb_hour=0.084, ecpu_price=0.0000000023):
    """Collect metrics for a cluster, print a cost table, and save it to CSV.

    The first primary node (and the first reader node, when any exists) is
    sampled; its figures are then scaled by the number of primaries/readers.

    Args:
        cluster_id: replication group ID passed to get_nodes().
        start_time: start of the metrics window.
        end_time: end of the metrics window; node roles are read from the
            minute before this time.
        filename: path of the CSV file to write.
        storage_price_per_gb_hour: price applied to each row's storage in GB.
        ecpu_price: price applied to each computed eCPU unit.

    Exits the process with status 1 when node roles cannot be determined,
    when no primary node is found, or when no metric data is returned.
    """
    all_nodes = get_nodes(cluster_id)
    primary_nodes, reader_nodes = _split_nodes_by_role(cluster_id, all_nodes, end_time)

    primary_node = primary_nodes[0]
    print("Primary node used: " + primary_node)

    # A single reader is enough to sample reader-side traffic.
    if reader_nodes:
        reader_node = reader_nodes[0]
        print("Reader node used: " + reader_node)
    else:
        reader_node = None

    num_shards = len(primary_nodes)
    num_readers = len(reader_nodes)
    num_replicas = num_readers // num_shards

    print("Number of primaries: " + str(num_shards))
    print("Number of replicas: " + str(num_replicas))

    collected_data = _collect_hourly_metrics(primary_node, reader_node, start_time, end_time)
    if not collected_data:
        print(f"No metrics exist for cluster: {cluster_id}")
        sys.exit(1)

    # One row per timestamp, in chronological order.
    sorted_collected_data = {key: collected_data[key] for key in sorted(collected_data)}

    pd.set_option('display.max_rows', None)
    df = pd.DataFrame(sorted_collected_data).transpose()

    columns = ['BytesUsedForCache', 'EvalBasedCmds', 'EvalBasedCmdsLatency', 'GetTypeCmds',
               'ReaderGetTypeCmds', 'NetworkBytesIn', 'NetworkBytesOut', 'ReaderNetworkBytesOut',
               'ReplicationBytes', 'SetTypeCmds']

    # Since certain fields might not be populated, for lack of data, set them to 0
    for column in columns:
        if column not in df.columns:
            df[column] = 0
    df = df.fillna(0)
    df = df[columns].copy()

    df = _add_cost_columns(df, num_shards, num_readers, storage_price_per_gb_hour, ecpu_price)

    print("")
    print(df[['TotalSizeMB', 'EvaleCPU', 'PrimaryIneCPU', 'PrimaryOuteCPU', 'ReaderOuteCPU',
              'StorageCost', 'eCPUCost', 'TotalCost']])

    # Calculate total costs across the date range
    total_costs = calculate_total_costs(df)

    print("\nSummary for the entire period:")
    print(f"Total Hours Analyzed: {total_costs['hours_analyzed']}")
    print(f"Total Cost: ${total_costs['total_cost']:.3f}")

    # Summary rows appended below the data: a blank spacer, a heading, then
    # the two totals (all carried in the TotalSizeMB column).
    blank_row = {'TotalSizeMB': '', 'StorageCost': '', 'eCPUCost': '', 'TotalCost': ''}
    summary = pd.DataFrame(
        [
            dict(blank_row),
            dict(blank_row, TotalSizeMB='SUMMARY'),
            dict(blank_row, TotalSizeMB=f'Total Hours Analyzed: {total_costs["hours_analyzed"]}'),
            dict(blank_row, TotalSizeMB=f'Total Cost: ${total_costs["total_cost"]:.3f}'),
        ],
        index=['', 'Summary Statistics', '', ''],
    )
    df = pd.concat([df, summary])

    # Write to CSV
    with open(filename, 'w', newline='') as csvfile:
        df.to_csv(csvfile, index=True)

In [None]:
if __name__ == '__main__':
    # Use the start of the current UTC hour as the end of the window.
    # Microseconds are cleared too, so the window is aligned exactly to the hour.
    end_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
    start_time = end_time - timedelta(days=args.day_range)
    print('Start time: ' + str(start_time))
    print('End time: ' + str(end_time))
    collect_and_write_metrics(args.cluster, start_time, end_time, args.output)