#### Resources

- [Amazon CloudWatch concepts](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html): Namespaces, Metrics, Dimensions, Resolution, Alarms, etc.

- [CloudWatch examples using SDK for Python (Boto3)](https://docs.aws.amazon.com/code-library/latest/ug/python_3_cloudwatch_code_examples.html)

- [Embedding metrics within logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format.html)

- [AWS Logs, Query Logs, Query  Syntax Details](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AnalyzingLogData.html)
  - [Useful Insights queries](https://docs.aws.amazon.com/lambda/latest/operatorguide/useful-queries.html)

## Using `boto3`

In [41]:
import time
import logging
from botocore.exceptions import ClientError
from mypy_boto3_cloudwatch.service_resource import CloudWatchServiceResource


# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

class CloudWatchWrapper:
    """Encapsulates CloudWatch functions."""

    def __init__(self, cloudwatch_resource: "CloudWatchServiceResource"):
        """
        :param cloudwatch_resource: A Boto3 CloudWatch resource.
        """
        self.cloudwatch_resource = cloudwatch_resource

    def put_metric_data(self, namespace: str, name: str, value: float, unit: str, dimensions: list[dict]) -> None:
        """
        Sends a single data value to CloudWatch for a metric. This metric is given
        a timestamp of the current UTC time.

        :param namespace: The namespace of the metric.
        :param dimensions: The dimensions of the metric.
        :param name: The name of the metric.
        :param value: The value of the metric.
        :param unit: The unit of the metric.
        """
        try:
            metric = self.cloudwatch_resource.Metric(namespace=namespace, name=name)
            metric.put_data(
                Namespace=namespace,
                MetricData=[
                    {
                        "MetricName": name,
                        "Value": value,
                        "Unit": unit,
                        "Timestamp": time.time(),
                        "Dimensions": dimensions,
                    }
                ],
            )
            logger.info("Put data for metric %s.%s", namespace, name)
        except ClientError:
            logger.exception("Couldn't put data for metric %s.%s", namespace, name)
            raise

    def put_metric_data_set(
        self,
        namespace: str,
        name: str,
        timestamp: float,
        unit: str,
        dataset: dict,
        dimensions: list[dict],
    ) -> None:
        """
        Sends a set of data to CloudWatch for a metric. All of the data in the set
        have the same timestamp and unit.

        :param namespace: The namespace of the metric.
        :param name: The name of the metric.
        :param timestamp: The UTC timestamp for the metric.
        :param unit: The unit of the metric.
        :param data_set: The set of data to send. This set is a dictionary that
                         contains a list of values and a list of corresponding counts.
                         The value and count lists must be the same length.
        :param dimensions: The dimensions of the metric.
        """
        try:
            metric = self.cloudwatch_resource.Metric(namespace, name)
            metric.put_data(
                Namespace=namespace,
                MetricData=[
                    {
                        "MetricName": name,
                        "Timestamp": timestamp,
                        "Unit": unit,
                        "Counts": dataset["counts"],
                        "Values": dataset["values"],
                        "Dimensions": dimensions,
                    }
                ],
            )
            logger.info("Put data set for metric %s.%s.", namespace, name)
        except ClientError:
            logger.exception("Couldn't put data set for metric %s.%s.", namespace, name)
            raise

In [42]:
import random
import uuid
import time
import uuid
from datetime import datetime, timezone


def process_order() -> None:
    """Simulate processing an order."""
    # Simulate processing time
    time.sleep(random.uniform(0.1, 0.5))
    # Randomly raise an exception to simulate an error
    if random.random() < 0.1:
        raise Exception("Error processing order")


def put_order_metric(cloudwatch_wrapper: CloudWatchWrapper) -> None:
    """Put a metric for an order."""
    start_time = time.time()
    order_id = str(uuid.uuid4())[:8]

    # Define the dimensions
    dimensions = [{"Name": "OrderType", "Value": "Standard"}]
    try:
        # Process the order
        process_order()

        # Calculate the processing time
        processing_time = time.time() - start_time

        # Log the order id
        logger.info(f"Order ID: {order_id}")

        # Put custom metrics to CloudWatch
        cloudwatch_wrapper.put_metric_data(
            namespace="Orders", name="ProcessedOrders", value=1, unit="Count", dimensions=dimensions
        )
        cloudwatch_wrapper.put_metric_data(
            namespace="Orders", name="ProcessingTime", value=processing_time, unit="Seconds", dimensions=dimensions
        )
    except Exception as e:
        cloudwatch_wrapper.put_metric_data(
            namespace="Orders", name="ProcessingErrors", value=1, unit="Count", dimensions=dimensions
        )
        raise e


# Function to simulate processing multiple orders and put metrics as a set
def put_metrics_set(cloudwatch_wrapper: CloudWatchWrapper, number_of_orders: int) -> None:
    """Simulate processing multiple orders and put metrics as a set."""
    order_ids = []
    processing_times = []
    processing_counts = []
    errors = 0

    for _ in range(number_of_orders):
        order_id = str(uuid.uuid4())[:8]
        order_ids.append(order_id)
        start_time = time.time()

        try:
            process_order()
            processing_time = time.time() - start_time
            processing_times.append(processing_time)
            processing_counts.append(1)
            logger.info(f"Order ID: {order_id} processed successfully.")
        except Exception as e:
            errors += 1
            logger.error(f"Error processing order ID {order_id}: {e}")

    timestamp = datetime.now(timezone.utc)
    dimensions = [{"Name": "OrderType", "Value": "Standard"}]

    # Put custom metrics set to CloudWatch
    if processing_times:
        cloudwatch_wrapper.put_metric_data_set(
            namespace="Orders",
            name="ProcessingTime",
            timestamp=timestamp,
            unit="Seconds",
            dataset={"values": processing_times, "counts": processing_counts},
            dimensions=dimensions,
        )
        cloudwatch_wrapper.put_metric_data(
            namespace="Orders", name="ProcessedOrders", value=len(processing_times), unit="Count", dimensions=dimensions
        )
    if errors:
        cloudwatch_wrapper.put_metric_data(
            namespace="Orders", name="ProcessingErrors", value=errors, unit="Count", dimensions=dimensions
        )

In [43]:
import boto3

# Create a Boto3 CloudWatch resource
cloudwatch_resource = boto3.resource("cloudwatch")

# Create a CloudWatchWrapper object
cloudwatch_wrapper = CloudWatchWrapper(cloudwatch_resource)

# Put a metric for an order
put_order_metric(cloudwatch_wrapper)

# Execute the function with a set of 10 orders
put_metrics_set(cloudwatch_wrapper, number_of_orders=20)

2024-07-30 21:39:47,909 - root - INFO - Order ID: ae4ecf6c
2024-07-30 21:39:48,158 - root - INFO - Put data for metric Orders.ProcessedOrders
2024-07-30 21:39:48,207 - root - INFO - Put data for metric Orders.ProcessingTime
2024-07-30 21:39:48,376 - root - INFO - Order ID: ee6d806d processed successfully.
2024-07-30 21:39:48,521 - root - INFO - Order ID: 8b64ba1e processed successfully.
2024-07-30 21:39:48,698 - root - INFO - Order ID: 9f284ba4 processed successfully.
2024-07-30 21:39:48,964 - root - INFO - Order ID: 7ba78b0a processed successfully.
2024-07-30 21:39:49,220 - root - INFO - Order ID: b66051bd processed successfully.
2024-07-30 21:39:49,621 - root - INFO - Order ID: ee9252ee processed successfully.
2024-07-30 21:39:49,792 - root - INFO - Order ID: bf01d35a processed successfully.
2024-07-30 21:39:50,282 - root - INFO - Order ID: ed36bd9d processed successfully.
2024-07-30 21:39:50,393 - root - INFO - Order ID: 0fad7504 processed successfully.
2024-07-30 21:39:50,671 - roo

## Using [`aws-embedded-metrics`](https://github.com/awslabs/aws-embedded-metrics-python/tree/master) Python SDK

- [`put_metric(key: str, value: float, unit: str = "None", storage_resolution: int = 60)`](https://github.com/awslabs/aws-embedded-metrics-python/tree/master?tab=readme-ov-file#metricslogger)
  - Adds a new metric to the current logger context. 
  - Multiple metrics using the same key will be appended to an array of values. Multiple metrics cannot have same key and different storage resolution. 
  - The Embedded Metric Format supports a maximum of 100 values per key.

- `set_dimensions(*dimensions: Dict[str, str], use_default: bool = False)`
  - Explicitly override all dimensions. By default, this will disable the default dimensions, but can be configured using the keyword-only parameter `use_default`.

- `set_namespace(value: str)`
  - Sets the CloudWatch namespace that extracted metrics should be published to.

In [2]:
import time
import random
import logging
import uuid
from aws_embedded_metrics import metric_scope
from aws_embedded_metrics.config import get_config
from aws_embedded_metrics import MetricsLogger
# from aws_embedded_metrics.storage_resolution import StorageResolution
import nest_asyncio

# Allow the current event loop to be re-entered
nest_asyncio.apply()

# Configure aws-embedded-metrics to send metrics directly to CloudWatch
config = get_config()
config.service_name = "OrderProcessingService"
config.log_group_name = "OrderProcessingLogGroup"
config.log_stream_name = "OrderProcessingLogStream"
# config.flush_to_log = True

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    

def process_order() -> None:
    """Simulate processing an order."""
    # Simulate processing time
    time.sleep(random.uniform(0.1, 0.5))
    # Randomly raise an exception to simulate an error
    if random.random() < 0.1:
        raise Exception("Error processing order")
    

@metric_scope
def put_order_metric(metrics: MetricsLogger):
    """Put a metric for an order."""
    # Set the namespace & dimensions
    metrics.set_namespace("Orders")
    metrics.set_dimensions({"OrderType": "Standard"})
    
    start_time = time.time()
    order_id = str(uuid.uuid4())[:8]
    try:
        # Process the order
        process_order()
        # Calculate the processing time
        processing_time = time.time() - start_time

        # Log the order id
        logger.info(f"Order ID: {order_id} processed successfully.")
        
        # Put custom metrics to CloudWatch
        metrics.put_metric(key="ProcessedOrders", value=1, unit="Count")
        metrics.put_metric("ProcessingTime", processing_time, "Seconds")
        # metrics.set_property("OrderID", order_id)
    except Exception as e:
        logger.error(f"Error processing order ID {order_id}: {e}")
        metrics.put_metric("ProcessingErrors", 1, "Count")
        # metrics.set_property("OrderID", order_id)
        # raise e
    # finally:
    #     await metrics.flush()


# for _ in range(25):
put_order_metric()

2024-07-31 20:21:08,170 - root - INFO - Order ID: 460676e6 processed successfully.
2024-07-31 20:21:08,174 - aws_embedded_metrics.sinks.agent_sink - INFO - Parsed agent endpoint (tcp) 0.0.0.0:25888
2024-07-31 20:21:08,176 - aws_embedded_metrics.sinks.tcp_client - ERROR - Failed to connect to the socket. [Errno 111] Connection refused
2024-07-31 20:21:08,177 - aws_embedded_metrics.sinks.tcp_client - ERROR - Failed to write metrics to the socket due to socket.error. [Errno 32] Broken pipe
2024-07-31 20:21:08,179 - aws_embedded_metrics.sinks.tcp_client - ERROR - Failed to connect to the socket. [Errno 111] Connection refused
2024-07-31 20:21:08,180 - aws_embedded_metrics.sinks.tcp_client - ERROR - Failed to connect to the socket. [Errno 111] Connection refused
2024-07-31 20:21:08,181 - aws_embedded_metrics.sinks.tcp_client - ERROR - Failed to write metrics to the socket due to socket.error. [Errno 32] Broken pipe
2024-07-31 20:21:08,182 - aws_embedded_metrics.sinks.tcp_client - ERROR - Fa