# Exploration Notebook for CloudWatch Alarm Analyser

## Setup Dependencies and Clients
Local VS Code Jupyter doesn't play nicely with IsengardCLI, so the credential environment variables are set explicitly.

In [36]:
import datetime
import json
import os

import boto3

# Select the AWS profile and region through environment variables, then build
# the CloudWatch client used by the rest of the notebook.
os.environ.update(
    {
        "AWS_PROFILE": "andevelt+docker-Admins",
        "AWS_REGION": "eu-west-1",
    }
)

cw_client = boto3.client(
    "cloudwatch",
    region_name=os.environ["AWS_REGION"],
)


## Function to Download All Alarms
Places the alarm dictionaries into separate metric and composite alarm lists.

In [3]:
def retrieve_all_cw_alarms(client):
    """Download every CloudWatch alarm visible to ``client``.

    Args:
        client: A boto3 CloudWatch client (any object exposing
            ``get_paginator("describe_alarms")`` works).

    Returns:
        A ``(metric_alarms, composite_alarms)`` tuple of lists holding the
        alarm dictionaries as returned by ``describe_alarms``, across all
        result pages.
    """
    paginator = client.get_paginator("describe_alarms")

    metric_alarms_list = []
    composite_alarms_list = []
    # describe_alarms returns only metric alarms unless AlarmTypes is given,
    # so both types are requested explicitly.
    pages = paginator.paginate(AlarmTypes=["MetricAlarm", "CompositeAlarm"])
    for page in pages:
        metric_alarms_list.extend(page.get("MetricAlarms", []))
        composite_alarms_list.extend(page.get("CompositeAlarms", []))

    return metric_alarms_list, composite_alarms_list

## Set Sample Alarm to Experiment

In [None]:
# Download all alarms once; later cells reuse metric_alarms and sample_alarm.
metric_alarms, comp_alarms = retrieve_all_cw_alarms(cw_client)

# Dump every metric alarm so a suitable sample can be picked by eye.
for alarm in metric_alarms:
    print(alarm)

# NOTE(review): the index is hard-coded; this raises IndexError when fewer
# than 49 metric alarms were returned.
sample_alarm = metric_alarms[48]

print(sample_alarm)

## General Alarm Checks
We only need the alarm info from describe_alarms to check these things
### Alarm has description? 

In [5]:
def alarm_has_description(alarm):
    """Report whether ``alarm`` carries a non-blank ``AlarmDescription``.

    Returns ``False`` when the key is absent, ``None``, or whitespace only.
    """
    description = alarm.get("AlarmDescription")
    return description is not None and description.strip() != ""


### Alarms With High Thresholds or Too Many Data Points
These alarms are unlikely to ever trigger.

In [6]:
def alarm_theshold_too_high(alarm, max_threshold=30.0):
    """Report whether the alarm's static ``Threshold`` exceeds a limit.

    Args:
        alarm: Alarm dictionary as returned by ``describe_alarms``.
        max_threshold: Highest threshold still considered reasonable.

    Returns:
        ``True`` when ``Threshold`` is present and greater than
        ``max_threshold``. Alarms without a ``Threshold`` value are not
        flagged, because there is nothing to compare.
    """
    alarm_threshold = alarm.get("Threshold")

    # Test for None explicitly: a threshold of 0 is a valid value and must
    # not be reported as "too high" just because it is falsy.
    if alarm_threshold is None:
        return False
    return alarm_threshold > max_threshold


def alarm_data_points_too_high(alarm, max_data_points=15):
    """Report whether too many breaching data points are needed to alarm.

    Args:
        alarm: Alarm dictionary as returned by ``describe_alarms``.
        max_data_points: Highest data-point count still considered
            reasonable.

    Returns:
        ``True`` when the number of data points that must breach exceeds
        ``max_data_points``; ``False`` otherwise, including when the alarm
        has neither ``DatapointsToAlarm`` nor ``EvaluationPeriods``.
    """
    alarm_data_points = alarm.get("DatapointsToAlarm")
    if alarm_data_points is None:
        # DatapointsToAlarm is only set for "M out of N" alarms; without it
        # every one of the EvaluationPeriods data points has to breach.
        alarm_data_points = alarm.get("EvaluationPeriods")

    if alarm_data_points is None:
        return False
    return alarm_data_points > max_data_points

### Alarm with No Actions

In [7]:
def alarm_has_actions(alarm):
    """Report whether ``alarm`` lists at least one entry in ``AlarmActions``.

    A missing or ``None`` ``AlarmActions`` value counts as "no actions"
    instead of raising, matching the other checks that read the alarm
    dictionary with ``.get``.
    """
    return bool(alarm.get("AlarmActions"))

In [None]:
# Run each describe_alarms-only check over every metric alarm and print one
# line per failed check. The subscripts inside the f-strings use single quotes
# so the cell also parses on Python versions before 3.12.
for alarm in metric_alarms:
    if not alarm_has_description(alarm):
        print(f"Missing Description for Alarm: {alarm['AlarmName']}")
    if alarm_theshold_too_high(alarm):
        print(
            f"Alarm Threshold Too High: {alarm['AlarmName']} Threshold set at {alarm.get('Threshold')}"
        )
    if alarm_data_points_too_high(alarm):
        print(
            f"Alarm Data Points Too High: {alarm['AlarmName']} Data Points set at {alarm.get('DatapointsToAlarm')}"
        )
    if not alarm_has_actions(alarm):
        print(f"Missing Actions for Alarm: {alarm['AlarmName']}")


## Get History for Alarm

In [9]:
def get_alarm_history(client, alarm):
    """Fetch the full recorded history of ``alarm``.

    Args:
        client: A boto3 CloudWatch client (any object exposing
            ``get_paginator("describe_alarm_history")`` works).
        alarm: Alarm dictionary containing at least ``"AlarmName"``.

    Returns:
        A list of the ``AlarmHistoryItems`` dictionaries from every result
        page, in the order the service returned them.
    """
    # Use the arguments rather than notebook globals, and follow pagination
    # so the history is not cut off after the first page.
    paginator = client.get_paginator("describe_alarm_history")

    history_items = []
    for page in paginator.paginate(AlarmName=alarm["AlarmName"]):
        history_items.extend(page.get("AlarmHistoryItems", []))
    return history_items

In [10]:
# Fetch the history items of the sample alarm; the history-based checks and
# the inspection cells below read this list.
alarm_history = get_alarm_history(cw_client, sample_alarm)


## History Based Checks
A series of checks that require going through an alarm's history to evaluate.
We may also limit this to the last two weeks of history (or a similar window).

### Long Lived Alarm State
Alarm triggers and isn't resolved for > 48 hours

In [40]:
def long_lived_alarm(alarm_history_items):
    """Count incidents where the alarm stayed in ALARM for more than 48 hours.

    An incident starts at an "Alarm updated from OK to ALARM" state update
    and ends at the next "Alarm updated from ALARM to OK" one. The state
    updates are sorted by ``Timestamp`` before pairing, so the result does
    not depend on whether the history was fetched newest-first or
    oldest-first. The duration is the difference between the two items'
    ``Timestamp`` values.

    Args:
        alarm_history_items: Iterable of ``AlarmHistoryItems`` dictionaries.
            Every item needs ``HistoryItemType``; state updates also need
            ``HistorySummary`` and ``Timestamp`` (a ``datetime``).

    Returns:
        The number of OK -> ALARM -> OK incidents that lasted longer than
        48 hours. Incidents that are still open are not counted.
    """
    state_updates = sorted(
        (
            item
            for item in alarm_history_items
            if item["HistoryItemType"] == "StateUpdate"
        ),
        key=lambda item: item["Timestamp"],
    )

    long_lived_alarm_count = 0
    open_trigger = None  # most recent OK -> ALARM item not yet resolved
    for item in state_updates:
        summary = item["HistorySummary"]

        if summary == "Alarm updated from OK to ALARM":
            # A newer trigger replaces an older unresolved one: the alarm
            # may have left ALARM through a state other than OK in between.
            open_trigger = item

        elif summary == "Alarm updated from ALARM to OK":
            if open_trigger is None:
                # The available history starts in the middle of an incident,
                # so its start time is unknown.
                continue

            time_to_solve = item["Timestamp"] - open_trigger["Timestamp"]
            open_trigger = None
            if time_to_solve > datetime.timedelta(hours=48):
                long_lived_alarm_count += 1

    return long_lived_alarm_count


### Never Alerted
Alarms that have never alerted since creation

In [12]:
def never_alerted(alarm_history):
    """Report whether the history shows the alarm was never in ALARM.

    Args:
        alarm_history: Either a list of ``AlarmHistoryItems`` dictionaries
            or a raw ``describe_alarm_history`` response dictionary holding
            that list under ``"AlarmHistoryItems"``.

    Returns:
        ``True`` when no ``StateUpdate`` item mentions the ALARM state in
        its ``HistorySummary`` (neither entering nor leaving it). Other
        items, such as configuration updates, do not count as alerts.
    """
    if isinstance(alarm_history, dict):
        history_items = alarm_history.get("AlarmHistoryItems", [])
    else:
        history_items = alarm_history

    # Summaries look like "Alarm updated from OK to ALARM"; matching the
    # whole word avoids false hits on text that merely contains "ALARM".
    return not any(
        item.get("HistoryItemType") == "StateUpdate"
        and "ALARM" in item.get("HistorySummary", "").split()
        for item in history_items
    )

### Noisy Alarm
Alarms which close within 2 minutes

In [None]:
def noisy_alarm(alarm_history):
    """Placeholder for the "noisy alarm" check; not implemented yet.

    Per the notebook notes this is meant to flag alarms which close within
    2 minutes. The body is empty, so every call currently returns ``None``.
    """
    # TODO: implement the check.
    pass

### Alarms Indicating Long Term Issues
Alarms occurring at least once a day, every 2 days or less.

In [None]:
def long_term_issue_alarms(alarm_history):
    """Placeholder for the "long term issue" check; not implemented yet.

    Per the notebook notes this is meant to flag alarms that keep recurring
    over days. The body is empty, so every call currently returns ``None``.
    """
    # TODO: implement the check.
    pass

In [41]:
# Count the long-lived incidents in the sample alarm's history.
long_lived_alarms = long_lived_alarm(alarm_history)

In [None]:
# Scratch cell: inspect the shape of the history items and of the JSON stored
# in "HistoryData".
# NOTE(review): every alarm_history[0] access raises IndexError when the
# history list is empty.
# for item in alarm_history:
#     print(item)

print(alarm_history)
print(alarm_history[0])
# "HistoryData" is a JSON string; decode it and look at the resulting types.
print(type(json.loads(alarm_history[0]["HistoryData"])))
print(json.loads(alarm_history[0]["HistoryData"]))
print(type(json.loads(alarm_history[0]["HistoryData"])["oldState"]))
print(
    type(
        json.loads(alarm_history[0]["HistoryData"])["oldState"][
            "stateReasonData"
        ]
    )
)
print(
    type(
        json.loads(alarm_history[0]["HistoryData"])["oldState"][
            "stateReasonData"
        ]["startDate"]
    )
)

In [None]:
from datetime import timedelta

# Scratch check: parse a sample timestamp (milliseconds plus a numeric UTC
# offset) with this strptime format and print the type of the result.
dt = "2024-10-02T14:36:59.792+0000"

print(type(datetime.datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S.%f%z")))