# EMR EC2 Estimator

Welcome to the EMR EC2 estimator. Please follow the steps below to get your estimation files.

1.  Set the configuration parameters: the **AWS region**, your **user email**, your **company**, and the **data retrieval period** (in days).

    **(Optional)** If you want to save your estimation files to S3 instead of downloading them, also set the **S3 configuration**.

In [None]:
# AWS region name.
region_name = "us-west-2"
# User email; reported in the execution details.
email = ""
# Company name; reported in the execution details.
company = ""
# Data retrieval period, in days; reported in the execution details.
runs_for_last_days = 30  # Default 30 days

# If you want to save the files in S3, set these values
# (optional: leave both empty to download the files instead).
s3_bucket_name = ""  # my-bucket
s3_folder_name = ""  # my_folder

# Imports

2. Imports necessary Python libraries for data manipulation, AWS interaction, etc.

In [None]:
import base64
import io
import logging
import os
import shutil
import uuid
import zipfile

from collections.abc import Iterator
from datetime import datetime, timedelta, timezone
from enum import Enum

import boto3
import pandas as pd

from botocore.config import Config as Boto3Config
from IPython.display import HTML, display

# Boto3 Configuration

3. Sets up a retry policy for boto3 with a maximum of 10 attempts to enhance the resilience of AWS API calls.

In [None]:
# Retry settings for boto3: at most 10 attempts, using the "standard" retry mode.
config = Boto3Config(retries={"max_attempts": 10, "mode": "standard"})

# Constants

4. Defines string constants for AWS data fields and establishes connections to AWS services.

In [None]:
# Execution details constant

# Version of this notebook; reported as "version" in the execution details.
NOTEBOOK_VERSION = "0.1.0"

In [None]:
# The *_KEY constants are the snake_case field names of the processed records
# that the estimator manager builds (clusters, steps, instances, instance
# groups and instance fleets).

# Common Keys
STATUS_KEY = "status"
STATE_KEY = "state"
STATE_CHANGE_REASON_KEY = "state_change_reason"
CODE_KEY = "code"
CREATION_DATE_TIME_KEY = "creation_date_time"
READY_DATE_TIME_KEY = "ready_date_time"
END_DATE_TIME_KEY = "end_date_time"
CONFIGURATIONS_KEY = "configurations"
BID_PRICE_KEY = "bid_price"
EBS_OPTIMIZED_KEY = "ebs_optimized"

# General Keys
CLUSTER_ID_KEY = "cluster_id"
INSTANCE_ID_KEY = "instance_id"
INSTANCE_TYPE_KEY = "instance_type"
# Elapsed seconds between the creation time and the end time of a record.
DURATION_SECONDS_KEY = "seconds"
INSTANCE_GROUP_ID_KEY = "instance_group_id"
INSTANCE_FLEET_ID_KEY = "instance_fleet_id"


# Cluster Keys
NORMALIZED_INSTANCE_HOURS_KEY = "normalized_instance_hours"
AUTO_TERMINATE_KEY = "auto_terminate"
TERMINATION_PROTECTED_KEY = "termination_protected"
PROVISIONING_TIMEOUT_MINUTES_KEY = "provisioning_timeout_minutes"
RELEASE_LABEL_KEY = "release_label"
SCALE_DOWN_BEHAVIOR_KEY = "scale_down_behavior"
STEP_CONCURRENCY_LEVEL_KEY = "step_concurrency_level"
EBS_ROOT_VOLUME_SIZE_KEY = "ebs_root_volume_size"
OS_RELEASE_LABEL_KEY = "os_release_label"
EC2_AVAILABILITY_ZONE_KEY = "ec2_availability_zone"
APPLICATIONS_KEY = "applications"
MANAGED_SCALING_POLICY_KEY = "managed_scaling_policy"
BOOTSTRAP_ACTIONS_KEY = "bootstrap_actions"
REDUCTIONS_KEY = "reductions"
MANAGED_SCALING_JOINING_TIMEOUT_MINUTES_KEY = "managed_scaling_joining_timeout_minutes"
PLACEMENT_GROUPS_KEY = "placement_groups"


# Step Keys
STEP_ID_KEY = "step_id"
PROPERTIES_KEY = "properties"
ACTION_ON_FAILURE_KEY = "action_on_failure"
STATE_CHANGE_REASON_CODE_KEY = "state_change_reason_code"
FAILURE_REASON_KEY = "failure_reason"
START_DATE_TIME_KEY = "start_date_time"


# InstanceGroups Keys
MARKET_KEY = "market"
INSTANCE_GROUP_TYPE_KEY = "instance_group_type"
REQUESTED_INSTANCE_COUNT_KEY = "requested_instance_count"
RUNNING_INSTANCE_COUNT_KEY = "running_instance_count"
CONFIGURATIONS_VERSION_KEY = "configurations_version"
AUTO_SCALING_POLICY_KEY = "auto_scaling_policy"
EBS_BLOCK_DEVICES_KEY = "ebs_block_devices"
VOLUME_TYPE_KEY = "volume_type"
SIZE_IN_GB_KEY = "size_in_gb"
IOPS_KEY = "iops"
THROUGHPUT_KEY = "throughput"
DEVICE_KEY = "device"


# InstanceFleets Keys
# The specification keys below are combined as "<specification>_<field>" when
# the instance fleet record is built.
INSTANCE_FLEET_TYPE_KEY = "instance_fleet_type"
TARGET_ON_DEMAND_CAPACITY_KEY = "target_on_demand_capacity"
TARGET_SPOT_CAPACITY_KEY = "target_spot_capacity"
PROVISIONED_ON_DEMAND_CAPACITY_KEY = "provisioned_on_demand_capacity"
PROVISIONED_SPOT_CAPACITY_KEY = "provisioned_spot_capacity"
INSTANCE_TYPE_SPECIFICATIONS_KEY = "instance_type_specifications"
WEIGHTED_CAPACITY_KEY = "weighted_capacity"
BID_PRICE_AS_PERCENTAGE_OF_ON_DEMAND_PRICE_KEY = "bid_price_as_percentage_of_on_demand_price"
LAUNCH_SPECIFICATIONS_KEY = "launch_specifications"
SPOT_SPECIFICATION_KEY = "spot_specification"
TIMEOUT_DURATION_MINUTES_KEY = "timeout_duration_minutes"
TIMEOUT_ACTION_KEY = "timeout_action"
BLOCK_DURATION_MINUTES_KEY = "block_duration_minutes"
ALLOCATION_STRATEGY_KEY = "allocation_strategy"
ON_DEMAND_SPECIFICATION_KEY = "on_demand_specification"
SPOT_RESIZE_SPECIFICATION_KEY = "spot_resize_specification"
MIN_TARGET_CAPACITY_KEY = "min_target_capacity"
MAX_TARGET_CAPACITY_KEY = "max_target_capacity"
ON_DEMAND_RESIZE_SPECIFICATION_KEY = "on_demand_resize_specification"

In [None]:
# The *_LITERAL constants are the field names looked up in the raw records
# (the raw cluster, step, instance, instance group and instance fleet
# information retrieved from AWS EMR).

# Common Literals
ID_LITERAL = "Id"
STATUS_LITERAL = "Status"
STATE_LITERAL = "State"
STATE_CHANGE_REASON_LITERAL = "StateChangeReason"
CODE_LITERAL = "Code"
TIMELINE_LITERAL = "Timeline"
CREATION_DATE_TIME_LITERAL = "CreationDateTime"
READY_DATE_TIME_LITERAL = "ReadyDateTime"
END_DATE_TIME_LITERAL = "EndDateTime"
CONFIGURATIONS_LITERAL = "Configurations"
INSTANCE_TYPE_LITERAL = "InstanceType"
MARKET_LITERAL = "Market"
BID_PRICE_LITERAL = "BidPrice"
EBS_OPTIMIZED_LITERAL = "EbsOptimized"
# Placeholder stored in a processed record when a raw field is missing.
NA_LITERAL = "N/A"

# Cluster Literals
NORMALIZED_INSTANCE_HOURS_LITERAL = "NormalizedInstanceHours"
AUTO_TERMINATE_LITERAL = "AutoTerminate"
TERMINATION_PROTECTED_LITERAL = "TerminationProtected"
PROVISIONING_TIMEOUT_MINUTES_LITERAL = "ProvisioningTimeoutMinutes"
RELEASE_LABEL_LITERAL = "ReleaseLabel"
SCALE_DOWN_BEHAVIOR_LITERAL = "ScaleDownBehavior"
STEP_CONCURRENCY_LEVEL_LITERAL = "StepConcurrencyLevel"
EBS_ROOT_VOLUME_SIZE_LITERAL = "EbsRootVolumeSize"
OS_RELEASE_LABEL_LITERAL = "OSReleaseLabel"
EC2_INSTANCE_ATTRIBUTES_LITERAL = "Ec2InstanceAttributes"
EC2_AVAILABILITY_ZONE_LITERAL = "Ec2AvailabilityZone"
APPLICATIONS_LITERAL = "Applications"
MANAGED_SCALING_POLICY_LITERAL = "ManagedScalingPolicy"
BOOTSTRAP_ACTIONS_LITERAL = "BootstrapActions"
REDUCTIONS_LITERAL = "Reductions"
MANAGED_SCALING_JOINING_TIMEOUT_MINUTES_LITERAL = "ManagedScalingJoiningTimeoutMinutes"
PLACEMENT_GROUPS_LITERAL = "PlacementGroups"

# Step Literals
CONFIG_LITERAL = "Config"
PROPERTIES_LITERAL = "Properties"
ACTION_ON_FAILURE_LITERAL = "ActionOnFailure"
FAILURE_DETAILS_LITERAL = "FailureDetails"
REASON_LITERAL = "Reason"
START_DATE_TIME_LITERAL = "StartDateTime"

# Instance Literals
# NOTE: these three follow a different naming scheme (no _LITERAL suffix).
INSTANCE_GROUP_ID = "InstanceGroupId"
INSTANCE_FLEET_ID = "InstanceFleetId"
INSTANCES_EC2_ID = "Ec2InstanceId"

# InstanceGroups Literals
INSTANCE_GROUP_TYPE_LITERAL = "InstanceGroupType"
REQUESTED_INSTANCE_COUNT_LITERAL = "RequestedInstanceCount"
RUNNING_INSTANCE_COUNT_LITERAL = "RunningInstanceCount"
CONFIGURATIONS_VERSION_LITERAL = "ConfigurationsVersion"
AUTO_SCALING_POLICY_LITERAL = "AutoScalingPolicy"
EBS_BLOCK_DEVICES_LITERAL = "EbsBlockDevices"
VOLUME_SPECIFICATION_LITERAL = "VolumeSpecification"
VOLUME_TYPE_LITERAL = "VolumeType"
SIZE_IN_GB_LITERAL = "SizeInGB"
IOPS_LITERAL = "Iops"
THROUGHPUT_LITERAL = "Throughput"
DEVICE_LITERAL = "Device"

# InstanceFleets Literals
INSTANCE_FLEET_TYPE_LITERAL = "InstanceFleetType"
TARGET_ON_DEMAND_CAPACITY_LITERAL = "TargetOnDemandCapacity"
TARGET_SPOT_CAPACITY_LITERAL = "TargetSpotCapacity"
PROVISIONED_ON_DEMAND_CAPACITY_LITERAL = "ProvisionedOnDemandCapacity"
PROVISIONED_SPOT_CAPACITY_LITERAL = "ProvisionedSpotCapacity"
INSTANCE_TYPE_SPECIFICATIONS_LITERAL = "InstanceTypeSpecifications"
WEIGHTED_CAPACITY_LITERAL = "WeightedCapacity"
BID_PRICE_AS_PERCENTAGE_OF_ON_DEMAND_PRICE_LITERAL = "BidPriceAsPercentageOfOnDemandPrice"
LAUNCH_SPECIFICATIONS_LITERAL = "LaunchSpecifications"
SPOT_SPECIFICATION_LITERAL = "SpotSpecification"
TIMEOUT_DURATION_MINUTES_LITERAL = "TimeoutDurationMinutes"
TIMEOUT_ACTION_LITERAL = "TimeoutAction"
BLOCK_DURATION_MINUTES_LITERAL = "BlockDurationMinutes"
ALLOCATION_STRATEGY_LITERAL = "AllocationStrategy"
ON_DEMAND_SPECIFICATION_LITERAL = "OnDemandSpecification"
RESIZE_SPECIFICATIONS_LITERAL = "ResizeSpecifications"
SPOT_RESIZE_SPECIFICATION_LITERAL = "SpotResizeSpecification"
MIN_TARGET_CAPACITY_LITERAL = "MinTargetCapacity"
MAX_TARGET_CAPACITY_LITERAL = "MaxTargetCapacity"
ON_DEMAND_RESIZE_SPECIFICATION_LITERAL = "OnDemandResizeSpecification"

# Logging Setup
5. This section configures the "emr_ec2_estimator" logger to display informational messages, warnings, and errors directly in your notebook's output, aiding in monitoring the EMR EC2 estimation process.

In [None]:
# Lowest level that is emitted; applied to both the logger and its handler.
MIN_LOG_LEVEL = logging.INFO

logger = logging.getLogger("emr_ec2_estimator")
logger.setLevel(MIN_LOG_LEVEL)

# Attach the stream handler only once, so that running this cell again does
# not add a second handler to the same logger.
if not logger.handlers:
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(MIN_LOG_LEVEL)

    formatter = logging.Formatter(
        fmt="{asctime} - {name} - {levelname} - {message}",
        datefmt="%Y-%m-%d %H:%M:%S",
        style="{",
    )
    stream_handler.setFormatter(formatter)

    logger.addHandler(stream_handler)

## EMR EC2 Estimator Manager

6. Defines the **EMR Estimator Manager** classes to handle estimation logic and data:

* **EMRSEstimatorManager**: The base class with the core, shared logic.
* **EMRSEstimatorManagerDefault**: Manages data in memory and is designed for standard workloads. This data is returned as an HTML download.
* **EMRSEstimatorManagerS3**: Manages data by writing it directly to CSV files and uploading them to an S3 bucket. This approach is specifically for large workloads that could exceed the memory limits of a standard HTML download.

In [None]:
class EMRSEstimatorManager:
    """Manage EMR EC2 clusters, steps, instances, and cost estimations.

    Attributes:
        region_name (str): AWS region name.
        email (str): User email for identification.
        company (str): Company name for identification.
        execution_id (str): Unique identifier for the execution.

    """

    def __init__(
        self,
        region_name: str,
        email: str,
        company: str,
    ):
        """Initialize the EMRSEstimatorManager with region, email, and company.

        Args:
            region_name (str): AWS region name.
            email (str): User email.
            company (str): Company name.

        """
        self.region_name = region_name
        self.email = email
        self.company = company
        self.execution_id = str(uuid.uuid4())
        self.output_config = {
            "title": "Download Analysis Data",
            "execution_detail_name_file": "execution_detail.csv",
            "clusters_name_file": "clusters_list.csv",
            "steps_name_file": "steps_list.csv",
            "instances_name_file": "instances_list.csv",
            "instance_groups_name_file": "instance_groups_list.csv",
            "instance_fleets_name_file": "instance_fleets_list.csv",
        }

    def add_cluster(self, cluster: dict):
        """Process and add a cluster.

        Args:
            cluster (dict): Raw cluster information.

        """
        try:
            processed_cluster = self.get_processed_cluster(cluster)
            if processed_cluster:
                self.loader_add_cluster(processed_cluster)
        except Exception as e:
            logger.exception("Error adding cluster: %s", e)

    def add_step(self, step: dict, cluster_id: str):
        """Process and add a step to a cluster.

        Args:
            step (dict): Raw step information.
            cluster_id (str): The ID of the cluster to which the step belongs.

        """
        try:
            processed_step = self.get_processed_step(step, cluster_id)
            if processed_step:
                self.loader_add_step(processed_step)
        except Exception as e:
            logger.exception("Error adding step: %s", e)

    def add_instance(self, instance: dict, cluster_id: str):
        """Process and add an instance to a cluster.

        Args:
            instance (dict): Raw instance information.
            cluster_id (str): The ID of the cluster to which the instance belongs.

        """
        try:
            processed_instance = self.get_processed_instance(instance, cluster_id)
            if processed_instance:
                self.loader_add_instance(processed_instance)
        except Exception as e:
            logger.exception("Error adding instance: %s", e)

    def add_instance_group(self, instance_group: dict, cluster_id: str):
        """Process and add an instance group to a cluster.

        Args:
            instance_group (dict): Raw instance group information.
            cluster_id (str): The ID of the cluster to which the instance group belongs.

        """
        try:
            processed_instance_group = self.get_processed_instance_group(
                instance_group,
                cluster_id,
            )
            if processed_instance_group:
                self.loader_add_instance_group(processed_instance_group)
        except Exception as e:
            logger.exception("Error adding instance group: %s", e)

    def add_instance_fleet(self, instance_fleet: dict, cluster_id: str):
        """Process and add an instance fleet to a cluster.

        Args:
            instance_fleet (dict): Raw instance fleet information.
            cluster_id (str): The ID of the cluster to which the instance fleet belongs.

        """
        try:
            processed_instance_fleet = self.get_processed_instance_fleet(
                instance_fleet,
                cluster_id,
            )
            if processed_instance_fleet:
                self.loader_add_instance_fleet(processed_instance_fleet)
        except Exception as e:
            logger.exception("Error adding instance fleet: %s", e)

    def loader_add_cluster(self, processed_cluster: dict):
        """Implement the logic to handle a processed cluster."""
        raise NotImplementedError("Subclasses must implement loader_add_cluster")

    def loader_add_step(self, processed_step: dict):
        """Implement the logic to handle a processed step."""
        raise NotImplementedError("Subclasses must implement loader_add_step")

    def loader_add_instance(self, processed_instance: dict):
        """Implement the logic to handle a processed instance."""
        raise NotImplementedError("Subclasses must implement loader_add_instance")

    def loader_add_instance_group(self, processed_instance_group: dict):
        """Implement the logic to handle a processed instance group."""
        raise NotImplementedError(
            "Subclasses must implement loader_add_instance_group",
        )

    def loader_add_instance_fleet(self, processed_instance_fleet: dict):
        """Implement the logic to handle a processed instance fleet."""
        raise NotImplementedError(
            "Subclasses must implement loader_add_instance_fleet",
        )

    def get_processed_cluster(self, cluster: dict) -> dict:
        """Build the flat cluster record from raw cluster information.

        Creation time, end time and duration come from
        ``extract_timeline_data``; every other field is looked up on the raw
        cluster and falls back to a default when it is absent.

        Args:
            cluster (dict): Raw cluster information retrieved from AWS EMR.

        Returns:
            dict: The processed cluster details, or None when processing raised
                an exception (the exception is logged).

        """
        try:
            timeline_data = self.extract_timeline_data(cluster)

            # Sub-dictionaries that several fields below are read from.
            status = cluster.get(STATUS_LITERAL, {})
            ready_time = status.get(TIMELINE_LITERAL, {}).get(READY_DATE_TIME_LITERAL, NA_LITERAL)
            reason_code = status.get(STATE_CHANGE_REASON_LITERAL, {}).get(CODE_LITERAL, NA_LITERAL)
            ec2_attributes = cluster.get(EC2_INSTANCE_ATTRIBUTES_LITERAL, {})

            return {
                CLUSTER_ID_KEY: cluster.get(ID_LITERAL, NA_LITERAL),
                STATUS_KEY: status.get(STATE_LITERAL, NA_LITERAL),
                STATE_CHANGE_REASON_KEY: reason_code,
                CREATION_DATE_TIME_KEY: timeline_data.get(CREATION_DATE_TIME_KEY, NA_LITERAL),
                READY_DATE_TIME_KEY: str(ready_time),
                END_DATE_TIME_KEY: timeline_data.get(END_DATE_TIME_KEY, NA_LITERAL),
                DURATION_SECONDS_KEY: timeline_data.get(DURATION_SECONDS_KEY, NA_LITERAL),
                NORMALIZED_INSTANCE_HOURS_KEY: cluster.get(NORMALIZED_INSTANCE_HOURS_LITERAL, 0),
                AUTO_TERMINATE_KEY: cluster.get(AUTO_TERMINATE_LITERAL, False),
                TERMINATION_PROTECTED_KEY: cluster.get(TERMINATION_PROTECTED_LITERAL, False),
                PROVISIONING_TIMEOUT_MINUTES_KEY: cluster.get(PROVISIONING_TIMEOUT_MINUTES_LITERAL, 0),
                RELEASE_LABEL_KEY: cluster.get(RELEASE_LABEL_LITERAL, NA_LITERAL),
                SCALE_DOWN_BEHAVIOR_KEY: cluster.get(SCALE_DOWN_BEHAVIOR_LITERAL, NA_LITERAL),
                STEP_CONCURRENCY_LEVEL_KEY: cluster.get(STEP_CONCURRENCY_LEVEL_LITERAL, 1),
                EBS_ROOT_VOLUME_SIZE_KEY: cluster.get(EBS_ROOT_VOLUME_SIZE_LITERAL, 0),
                OS_RELEASE_LABEL_KEY: cluster.get(OS_RELEASE_LABEL_LITERAL, NA_LITERAL),
                EC2_AVAILABILITY_ZONE_KEY: ec2_attributes.get(EC2_AVAILABILITY_ZONE_LITERAL, NA_LITERAL),
                APPLICATIONS_KEY: cluster.get(APPLICATIONS_LITERAL, NA_LITERAL),
                # No default here: a missing policy is stored as None.
                MANAGED_SCALING_POLICY_KEY: cluster.get(MANAGED_SCALING_POLICY_LITERAL),
                CONFIGURATIONS_KEY: cluster.get(CONFIGURATIONS_LITERAL, []),
                BOOTSTRAP_ACTIONS_KEY: cluster.get(BOOTSTRAP_ACTIONS_LITERAL, []),
                REDUCTIONS_KEY: cluster.get(REDUCTIONS_LITERAL, []),
                MANAGED_SCALING_JOINING_TIMEOUT_MINUTES_KEY: cluster.get(
                    MANAGED_SCALING_JOINING_TIMEOUT_MINUTES_LITERAL,
                    NA_LITERAL,
                ),
                PLACEMENT_GROUPS_KEY: cluster.get(PLACEMENT_GROUPS_LITERAL, []),
            }
        except Exception as e:
            logger.exception("Error adding cluster: %s", e)
            return None

    def get_processed_step(self, step: dict, cluster_id: str) -> dict:
        """Build the flat step record from raw step information.

        Creation time, end time and duration come from
        ``extract_timeline_data``; every other field is looked up on the raw
        step and falls back to a default when it is absent.

        Args:
            step (dict): Raw step information retrieved from AWS EMR.
            cluster_id (str): The ID of the cluster to which the step belongs.

        Returns:
            dict: The processed step details, or None when processing raised an
                exception (the exception is logged).

        """
        try:
            timeline_data = self.extract_timeline_data(step)

            # Sub-dictionary that several fields below are read from.
            status = step.get(STATUS_LITERAL, {})
            start_time = status.get(TIMELINE_LITERAL, {}).get(START_DATE_TIME_LITERAL, NA_LITERAL)
            properties = step.get(CONFIG_LITERAL, {}).get(PROPERTIES_LITERAL, {})
            reason_code = status.get(STATE_CHANGE_REASON_LITERAL, {}).get(CODE_LITERAL, NA_LITERAL)
            failure_reason = status.get(FAILURE_DETAILS_LITERAL, {}).get(REASON_LITERAL, NA_LITERAL)

            return {
                STEP_ID_KEY: step.get(ID_LITERAL, NA_LITERAL),
                CLUSTER_ID_KEY: cluster_id,
                PROPERTIES_KEY: properties,
                ACTION_ON_FAILURE_KEY: step.get(ACTION_ON_FAILURE_LITERAL, NA_LITERAL),
                STATE_KEY: status.get(STATE_LITERAL, NA_LITERAL),
                STATE_CHANGE_REASON_CODE_KEY: reason_code,
                FAILURE_REASON_KEY: failure_reason,
                CREATION_DATE_TIME_KEY: timeline_data.get(CREATION_DATE_TIME_KEY, NA_LITERAL),
                START_DATE_TIME_KEY: str(start_time),
                END_DATE_TIME_KEY: timeline_data.get(END_DATE_TIME_KEY, NA_LITERAL),
                DURATION_SECONDS_KEY: timeline_data.get(DURATION_SECONDS_KEY, NA_LITERAL),
            }
        except Exception as e:
            logger.exception("Error adding step: %s", e)
            return None

    def get_processed_instance(self, instance: dict, cluster_id: str) -> dict:
        """Build the flat instance record from raw instance information.

        Creation time, end time and duration come from
        ``extract_timeline_data``; every other field is looked up on the raw
        instance and falls back to a default when it is absent.

        Args:
            instance (dict): Raw instance information retrieved from AWS EMR.
            cluster_id (str): The ID of the cluster to which the instance belongs.

        Returns:
            dict: The processed instance details, or None when processing raised
                an exception (the exception is logged).

        """
        try:
            timeline_data = self.extract_timeline_data(instance)

            # Sub-dictionary that several fields below are read from.
            status = instance.get(STATUS_LITERAL, {})
            ready_time = status.get(TIMELINE_LITERAL, {}).get(READY_DATE_TIME_LITERAL, NA_LITERAL)
            reason_code = status.get(STATE_CHANGE_REASON_LITERAL, {}).get(CODE_LITERAL, NA_LITERAL)

            return {
                INSTANCE_ID_KEY: instance.get(INSTANCES_EC2_ID, NA_LITERAL),
                CLUSTER_ID_KEY: cluster_id,
                MARKET_KEY: instance.get(MARKET_LITERAL, NA_LITERAL),
                # NOTE(review): the next two fields hold the instance group ID
                # and instance fleet ID, but are stored under the "*_type"
                # keys. Kept as is because it is the current output schema;
                # confirm with the consumer of the files before renaming.
                INSTANCE_GROUP_TYPE_KEY: instance.get(INSTANCE_GROUP_ID, NA_LITERAL),
                INSTANCE_FLEET_TYPE_KEY: instance.get(INSTANCE_FLEET_ID, NA_LITERAL),
                INSTANCE_TYPE_KEY: instance.get(INSTANCE_TYPE_LITERAL, NA_LITERAL),
                STATE_KEY: status.get(STATE_LITERAL, NA_LITERAL),
                CODE_KEY: reason_code,
                CREATION_DATE_TIME_KEY: timeline_data.get(CREATION_DATE_TIME_KEY, NA_LITERAL),
                READY_DATE_TIME_KEY: str(ready_time),
                END_DATE_TIME_KEY: timeline_data.get(END_DATE_TIME_KEY, NA_LITERAL),
                DURATION_SECONDS_KEY: timeline_data.get(DURATION_SECONDS_KEY, NA_LITERAL),
            }
        except Exception as e:
            logger.exception("Error adding instance: %s", e)
            return None

    def get_processed_instance_group(self, instance_group: dict, cluster_id: str) -> dict:
        """Build the flat instance group record from raw instance group information.

        Creation time, end time and duration come from
        ``extract_timeline_data``; every other field is looked up on the raw
        instance group and falls back to a default when it is absent. Each EBS
        block device is reduced to its volume type, size, IOPS, throughput and
        device name.

        Args:
            instance_group (dict): Raw instance group information retrieved from AWS EMR.
            cluster_id (str): The ID of the cluster to which the instance group belongs.

        Returns:
            dict: The processed instance group details, or None when processing
                raised an exception (the exception is logged).

        """
        try:
            timeline_data = self.extract_timeline_data(instance_group)

            # Sub-dictionary that several fields below are read from.
            status = instance_group.get(STATUS_LITERAL, {})
            ready_time = status.get(TIMELINE_LITERAL, {}).get(READY_DATE_TIME_LITERAL, NA_LITERAL)
            reason_code = status.get(STATE_CHANGE_REASON_LITERAL, {}).get(CODE_LITERAL, NA_LITERAL)

            ebs_block_devices = []
            for device in instance_group.get(EBS_BLOCK_DEVICES_LITERAL, []):
                volume = device.get(VOLUME_SPECIFICATION_LITERAL, {})
                ebs_block_devices.append(
                    {
                        VOLUME_TYPE_KEY: volume.get(VOLUME_TYPE_LITERAL, NA_LITERAL),
                        SIZE_IN_GB_KEY: volume.get(SIZE_IN_GB_LITERAL, 0),
                        IOPS_KEY: volume.get(IOPS_LITERAL, NA_LITERAL),
                        THROUGHPUT_KEY: volume.get(THROUGHPUT_LITERAL, NA_LITERAL),
                        DEVICE_KEY: device.get(DEVICE_LITERAL, NA_LITERAL),
                    },
                )

            return {
                INSTANCE_GROUP_ID_KEY: instance_group.get(ID_LITERAL, NA_LITERAL),
                CLUSTER_ID_KEY: cluster_id,
                MARKET_KEY: instance_group.get(MARKET_LITERAL, NA_LITERAL),
                INSTANCE_GROUP_TYPE_KEY: instance_group.get(INSTANCE_GROUP_TYPE_LITERAL, NA_LITERAL),
                INSTANCE_TYPE_KEY: instance_group.get(INSTANCE_TYPE_LITERAL, NA_LITERAL),
                BID_PRICE_KEY: instance_group.get(BID_PRICE_LITERAL, NA_LITERAL),
                EBS_OPTIMIZED_KEY: instance_group.get(EBS_OPTIMIZED_LITERAL, False),
                REQUESTED_INSTANCE_COUNT_KEY: instance_group.get(REQUESTED_INSTANCE_COUNT_LITERAL, 0),
                RUNNING_INSTANCE_COUNT_KEY: instance_group.get(RUNNING_INSTANCE_COUNT_LITERAL, 0),
                STATE_KEY: status.get(STATE_LITERAL, NA_LITERAL),
                CODE_KEY: reason_code,
                CREATION_DATE_TIME_KEY: timeline_data.get(CREATION_DATE_TIME_KEY, NA_LITERAL),
                READY_DATE_TIME_KEY: str(ready_time),
                END_DATE_TIME_KEY: timeline_data.get(END_DATE_TIME_KEY, NA_LITERAL),
                DURATION_SECONDS_KEY: timeline_data.get(DURATION_SECONDS_KEY, NA_LITERAL),
                CONFIGURATIONS_KEY: instance_group.get(CONFIGURATIONS_LITERAL, []),
                CONFIGURATIONS_VERSION_KEY: instance_group.get(CONFIGURATIONS_VERSION_LITERAL, 0),
                AUTO_SCALING_POLICY_KEY: instance_group.get(AUTO_SCALING_POLICY_LITERAL, NA_LITERAL),
                EBS_BLOCK_DEVICES_KEY: ebs_block_devices,
            }
        except Exception as e:
            logger.exception("Error adding instance group: %s", e)
            return None

    def get_processed_instance_fleet(self, instance_fleet: dict, cluster_id: str) -> dict:
        """Process and return a formatted instance fleet dictionary.

        Every field is looked up on the raw instance fleet and falls back to a
        default when it is absent. The timeline values are converted to strings.
        Each instance type specification is reduced to a flat dictionary that
        also carries the spot / on-demand launch settings.

        The launch settings are taken from the ``LaunchSpecifications`` of the
        instance type specification when it has one, and otherwise from the
        ``LaunchSpecifications`` of the fleet itself. The EMR API reports them
        at fleet level, so reading only the specification level left these
        columns at their defaults.

        Args:
            instance_fleet (dict): Raw instance fleet information retrieved from AWS EMR.
            cluster_id (str): The ID of the cluster to which the instance fleet belongs.

        Returns:
            dict: The processed instance fleet details, or None when processing
                raised an exception (the exception is logged).

        """
        try:
            # Sub-dictionaries that several fields below are read from.
            status = instance_fleet.get(STATUS_LITERAL, {})
            reason_code = status.get(STATE_CHANGE_REASON_LITERAL, {}).get(CODE_LITERAL, NA_LITERAL)
            timeline = status.get(TIMELINE_LITERAL, {})
            resize = instance_fleet.get(RESIZE_SPECIFICATIONS_LITERAL, {})
            spot_resize = resize.get(SPOT_RESIZE_SPECIFICATION_LITERAL, {})
            on_demand_resize = resize.get(ON_DEMAND_RESIZE_SPECIFICATION_LITERAL, {})

            # Fleet-level launch settings, shared by all type specifications.
            # "or {}" keeps a missing or null value from failing the record.
            fleet_launch = instance_fleet.get(LAUNCH_SPECIFICATIONS_LITERAL) or {}

            type_specifications = []
            for spec in instance_fleet.get(INSTANCE_TYPE_SPECIFICATIONS_LITERAL, []):
                # A specification-level value (if any input provides one)
                # still wins, so existing inputs keep their results.
                launch = spec.get(LAUNCH_SPECIFICATIONS_LITERAL) or fleet_launch
                spot = launch.get(SPOT_SPECIFICATION_LITERAL, {})
                on_demand = launch.get(ON_DEMAND_SPECIFICATION_LITERAL, {})
                type_specifications.append(
                    {
                        INSTANCE_TYPE_KEY: spec.get(INSTANCE_TYPE_LITERAL, NA_LITERAL),
                        WEIGHTED_CAPACITY_KEY: spec.get(WEIGHTED_CAPACITY_LITERAL, 0),
                        BID_PRICE_KEY: spec.get(BID_PRICE_LITERAL, NA_LITERAL),
                        BID_PRICE_AS_PERCENTAGE_OF_ON_DEMAND_PRICE_KEY: spec.get(
                            BID_PRICE_AS_PERCENTAGE_OF_ON_DEMAND_PRICE_LITERAL,
                            0.0,
                        ),
                        EBS_OPTIMIZED_KEY: spec.get(EBS_OPTIMIZED_LITERAL, False),
                        CONFIGURATIONS_KEY: spec.get(CONFIGURATIONS_LITERAL, []),
                        SPOT_SPECIFICATION_KEY + "_" + TIMEOUT_DURATION_MINUTES_KEY: spot.get(
                            TIMEOUT_DURATION_MINUTES_LITERAL,
                            0,
                        ),
                        SPOT_SPECIFICATION_KEY + "_" + TIMEOUT_ACTION_KEY: spot.get(
                            TIMEOUT_ACTION_LITERAL,
                            NA_LITERAL,
                        ),
                        SPOT_SPECIFICATION_KEY + "_" + BLOCK_DURATION_MINUTES_KEY: spot.get(
                            BLOCK_DURATION_MINUTES_LITERAL,
                            NA_LITERAL,
                        ),
                        SPOT_SPECIFICATION_KEY + "_" + ALLOCATION_STRATEGY_KEY: spot.get(
                            ALLOCATION_STRATEGY_LITERAL,
                            NA_LITERAL,
                        ),
                        ON_DEMAND_SPECIFICATION_KEY + "_" + ALLOCATION_STRATEGY_KEY: on_demand.get(
                            ALLOCATION_STRATEGY_LITERAL,
                            NA_LITERAL,
                        ),
                    },
                )

            new_instance_fleet = {
                INSTANCE_FLEET_ID_KEY: instance_fleet.get(ID_LITERAL, NA_LITERAL),
                CLUSTER_ID_KEY: cluster_id,
                INSTANCE_FLEET_TYPE_KEY: instance_fleet.get(INSTANCE_FLEET_TYPE_LITERAL, NA_LITERAL),
                STATE_KEY: status.get(STATE_LITERAL, NA_LITERAL),
                CODE_KEY: reason_code,
                CREATION_DATE_TIME_KEY: str(timeline.get(CREATION_DATE_TIME_LITERAL, NA_LITERAL)),
                READY_DATE_TIME_KEY: str(timeline.get(READY_DATE_TIME_LITERAL, NA_LITERAL)),
                END_DATE_TIME_KEY: str(timeline.get(END_DATE_TIME_LITERAL, NA_LITERAL)),
                TARGET_ON_DEMAND_CAPACITY_KEY: instance_fleet.get(TARGET_ON_DEMAND_CAPACITY_LITERAL, 0),
                TARGET_SPOT_CAPACITY_KEY: instance_fleet.get(TARGET_SPOT_CAPACITY_LITERAL, 0),
                PROVISIONED_ON_DEMAND_CAPACITY_KEY: instance_fleet.get(PROVISIONED_ON_DEMAND_CAPACITY_LITERAL, 0),
                PROVISIONED_SPOT_CAPACITY_KEY: instance_fleet.get(PROVISIONED_SPOT_CAPACITY_LITERAL, 0),
                INSTANCE_TYPE_SPECIFICATIONS_KEY: type_specifications,
                SPOT_RESIZE_SPECIFICATION_KEY + "_" + TIMEOUT_DURATION_MINUTES_KEY: spot_resize.get(
                    TIMEOUT_DURATION_MINUTES_LITERAL,
                    0,
                ),
                SPOT_RESIZE_SPECIFICATION_KEY + "_" + MIN_TARGET_CAPACITY_KEY: spot_resize.get(
                    MIN_TARGET_CAPACITY_LITERAL,
                    0,
                ),
                SPOT_RESIZE_SPECIFICATION_KEY + "_" + MAX_TARGET_CAPACITY_KEY: spot_resize.get(
                    MAX_TARGET_CAPACITY_LITERAL,
                    0,
                ),
                ON_DEMAND_RESIZE_SPECIFICATION_KEY + "_" + TIMEOUT_DURATION_MINUTES_KEY: on_demand_resize.get(
                    TIMEOUT_DURATION_MINUTES_LITERAL,
                    0,
                ),
                ON_DEMAND_RESIZE_SPECIFICATION_KEY + "_" + MIN_TARGET_CAPACITY_KEY: on_demand_resize.get(
                    MIN_TARGET_CAPACITY_LITERAL,
                    0,
                ),
                ON_DEMAND_RESIZE_SPECIFICATION_KEY + "_" + MAX_TARGET_CAPACITY_KEY: on_demand_resize.get(
                    MAX_TARGET_CAPACITY_LITERAL,
                    0,
                ),
            }
            return new_instance_fleet
        except Exception as e:
            logger.exception("Error adding instance fleet: %s", e)
            return None

    def get_execution_info(self) -> dict:
        """Retrieve execution metadata.

        The timestamp is the current time in UTC, formatted as
        ``YYYYMMDD_HHMMSS``. The format carries no UTC offset, so taking the
        time in UTC keeps the value unambiguous regardless of the time zone of
        the machine that runs the notebook.

        ``NOTEBOOK_VERSION`` and ``runs_for_last_days`` are read from the
        module-level (notebook) globals, so the configuration cell must have
        been run before this method is called.

        Returns:
            dict: Dictionary containing execution metadata.

        """
        return {
            "execution_id": self.execution_id,
            "email": self.email,
            "company": self.company,
            "region": self.region_name,
            "version": NOTEBOOK_VERSION,
            "timestamp": datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"),
            "runs_for_last_days": runs_for_last_days,
        }

    @staticmethod
    def extract_timeline_data(data: dict) -> dict:
        """Read creation time, end time and duration from a record's timeline.

        The times are looked up under the status timeline of ``data``. The
        duration is the number of seconds between them when both are
        ``datetime`` objects, and 0 otherwise. Missing (or falsy) times are
        reported as the "N/A" placeholder; present ones are converted to
        strings.

        Args:
            data (dict): Dictionary containing timeline information.

        Returns:
            dict: Creation time, end time and duration; an empty dictionary
                when the extraction raised an exception (the exception is
                logged).

        """
        try:
            timeline = data.get(STATUS_LITERAL, {}).get(TIMELINE_LITERAL, {})
            created_at = timeline.get(CREATION_DATE_TIME_LITERAL)
            ended_at = timeline.get(END_DATE_TIME_LITERAL)

            both_known = isinstance(created_at, datetime) and isinstance(ended_at, datetime)
            elapsed = (ended_at - created_at).total_seconds() if both_known else 0

            return {
                CREATION_DATE_TIME_KEY: str(created_at or NA_LITERAL),
                END_DATE_TIME_KEY: str(ended_at or NA_LITERAL),
                DURATION_SECONDS_KEY: elapsed,
            }
        except Exception as e:
            logger.exception("Error extracting timeline data: %s", e)
            return {}


class EMRSEstimatorManagerDefault(EMRSEstimatorManager):
    """Collect EMR EC2 data in memory and offer it as a downloadable ZIP.

    Processed records are appended to in-memory lists. ``show_output`` turns
    each list into a CSV, zips the CSVs and renders a download button.

    Attributes:
        region_name (str): AWS region name.
        email (str): User email for identification.
        company (str): Company name for identification.
        execution_id (str): Unique identifier for the execution.
        clusters (list): List of clusters information.
        steps (list): List of steps information.
        instances (list): List of instances information.
        instance_groups (list): List of instance groups information.
        instance_fleets (list): List of instance fleets information.

    """

    def __init__(
        self,
        region_name: str,
        email: str,
        company: str,
    ):
        """Initialize the EMRSEstimatorManagerDefault with region, email, and company.

        Args:
            region_name (str): AWS region name.
            email (str): User email.
            company (str): Company name.

        """
        super().__init__(region_name, email, company)
        self.clusters = []
        self.steps = []
        self.instance_groups = []
        self.instance_fleets = []
        self.instances = []

    def loader_add_cluster(self, processed_cluster: dict):
        """Add a processed cluster to the clusters list."""
        self.clusters.append(processed_cluster)

    def loader_add_step(self, processed_step: dict):
        """Add a processed step to the steps list."""
        self.steps.append(processed_step)

    def loader_add_instance(self, processed_instance: dict):
        """Add a processed instance to the instances list."""
        self.instances.append(processed_instance)

    def loader_add_instance_group(self, processed_instance_group: dict):
        """Add a processed instance group to the instance groups list."""
        self.instance_groups.append(processed_instance_group)

    def loader_add_instance_fleet(self, processed_instance_fleet: dict):
        """Add a processed instance fleet to the instance fleets list."""
        self.instance_fleets.append(processed_instance_fleet)

    def show_output(self, output_file_name: str = None):
        """Generate and display a downloadable ZIP file containing analysis data.

        Nothing is displayed when the ZIP payload cannot be produced; the
        failure is logged instead.

        Args:
            output_file_name (str, optional): Name of the output ZIP file. Defaults to None,
                in which case a name based on the current timestamp is generated.

        """
        try:
            if output_file_name is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                output_file_name = f"emr_ec2_we_output_{timestamp}.zip"

            clusters_df = pd.DataFrame(self.clusters)
            clusters_buffer = self.get_buffer_output(clusters_df)

            steps_df = pd.DataFrame(self.steps)
            steps_buffer = self.get_buffer_output(steps_df)

            instances_df = pd.DataFrame(self.instances)
            instances_buffer = self.get_buffer_output(instances_df)

            instance_groups_df = pd.DataFrame(self.instance_groups)
            instance_groups_buffer = self.get_buffer_output(instance_groups_df)

            instance_fleets_df = pd.DataFrame(self.instance_fleets)
            instance_fleets_buffer = self.get_buffer_output(instance_fleets_df)

            # Row counts are stored next to the execution metadata.
            execution_info_df = pd.DataFrame(
                [
                    {
                        **self.get_execution_info(),
                        "total_clusters": clusters_df.shape[0],
                        "total_steps": steps_df.shape[0],
                        "total_instances": instances_df.shape[0],
                        "total_instance_groups": instance_groups_df.shape[0],
                        "total_instance_fleets": instance_fleets_df.shape[0],
                    },
                ],
            )
            execution_info_buffer = self.get_buffer_output(execution_info_df)

            payload = self.compress_data(
                execution_info_buffer,
                clusters_buffer,
                steps_buffer,
                instances_buffer,
                instance_groups_buffer,
                instance_fleets_buffer,
            )
            if payload is None:
                # compress_data returns None on failure; interpolating it below
                # would render a link whose href ends with the text "None".
                logger.error("The ZIP payload could not be generated; no download link will be displayed.")
                return

            html = (
                f'<html><div style="display:flex;justify-content: center;">'
                f'<a download="{output_file_name}" '
                f'href="data:application/zip;base64,{payload}" '
                f'target="_blank">'
                f'<button style="background-color:#249edc;color: #fff;'
                f"border:1px solid #249edc;cursor:pointer;border-radius:45px;"
                f'font-weight:800;line-height:18px;padding: 8px 16px" '
                f'type="button">{self.output_config.get("title")}</button>'
                f"</a></div></html>"
            )
            display(HTML(html))
        except Exception as e:
            logger.exception("Error showing output: %s", e)

    @staticmethod
    def get_buffer_output(data: pd.DataFrame) -> io.StringIO:
        """Convert a DataFrame to a CSV buffer.

        Args:
            data (pd.DataFrame): DataFrame to convert.

        Returns:
            io.StringIO: Buffer containing the CSV data, rewound to the start,
                or None if the conversion fails.

        """
        try:
            buffer = io.StringIO()
            data.to_csv(buffer, index=False, encoding="utf-8")
            buffer.seek(0)
            return buffer
        except Exception as e:
            logger.exception("Error getting buffer output: %s", e)
            return None

    def compress_data(
        self,
        execution_info_buffer: io.StringIO,
        clusters_buffer: io.StringIO,
        steps_buffer: io.StringIO,
        instances_buffer: io.StringIO,
        instance_groups_buffer: io.StringIO,
        instance_fleets_buffer: io.StringIO,
    ) -> str:
        """Compress data into a ZIP file and encode it in base64.

        Archive member names are read from ``self.output_config``. A buffer that
        is missing or empty, or whose file name is not configured, is skipped
        with a warning.

        Args:
            execution_info_buffer (io.StringIO): Buffer containing execution info.
            clusters_buffer (io.StringIO): Buffer containing clusters data.
            steps_buffer (io.StringIO): Buffer containing steps data.
            instances_buffer (io.StringIO): Buffer containing instances data.
            instance_groups_buffer (io.StringIO): Buffer containing instance groups data.
            instance_fleets_buffer (io.StringIO): Buffer containing instance fleets data.

        Returns:
            str: Base64-encoded ZIP file content, or None if compression fails.

        """
        try:
            files_to_compress = {
                self.output_config.get(
                    "execution_detail_name_file",
                ): execution_info_buffer,
                self.output_config.get("clusters_name_file"): clusters_buffer,
                self.output_config.get("steps_name_file"): steps_buffer,
                self.output_config.get("instances_name_file"): instances_buffer,
                self.output_config.get(
                    "instance_groups_name_file",
                ): instance_groups_buffer,
                self.output_config.get(
                    "instance_fleets_name_file",
                ): instance_fleets_buffer,
            }
            zip_buffer = io.BytesIO()
            with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
                for file_name, buffer in files_to_compress.items():
                    if file_name and buffer and buffer.getvalue():
                        zf.writestr(file_name, buffer.getvalue())
                    elif file_name:
                        logger.warning(
                            "Empty buffer for file: %s. It will not be added to the ZIP.",
                            file_name,
                        )
                    else:
                        logger.warning(
                            "File name not provided for one of the buffers in output_config. It will be skipped.",
                        )

            # Read the bytes only after the archive is closed, so the ZIP
            # central directory has been written.
            return base64.b64encode(zip_buffer.getvalue()).decode()
        except Exception as e:
            logger.exception("Error compressing data: %s", e)
            return None


class EMRSEstimatorManagerS3(EMRSEstimatorManager):
    """Stream EMR EC2 data to local CSV files and upload them to S3 as a ZIP.

    Each processed record is appended to a CSV file inside a local output
    folder and counted. ``save_in_s3`` zips that folder and uploads it.

    Attributes:
        region_name (str): AWS region name.
        email (str): User email for identification.
        company (str): Company name for identification.
        execution_id (str): Unique identifier for the execution.
        total_clusters (int): Number of clusters written so far.
        total_steps (int): Number of steps written so far.
        total_instances (int): Number of instances written so far.
        total_instance_groups (int): Number of instance groups written so far.
        total_instance_fleets (int): Number of instance fleets written so far.
        output (str): Local folder where the CSV files are written.

    """

    def __init__(
        self,
        region_name: str,
        email: str,
        company: str,
        output: str = f"{os.getcwd()}/.out_emr_we",
    ):
        """Initialize the EMRSEstimatorManagerS3 and reset the local output folder.

        Args:
            region_name (str): AWS region name.
            email (str): User email.
            company (str): Company name.
            output (str, optional): Local working folder for the CSV files.
                An existing folder at this path is deleted and recreated.

        """
        super().__init__(region_name, email, company)
        self.total_clusters = 0
        self.total_steps = 0
        self.total_instances = 0
        self.total_instance_groups = 0
        self.total_instance_fleets = 0
        self.output = output
        # Start from an empty folder so files from a previous run are not
        # appended to or zipped.
        if os.path.isdir(output):
            shutil.rmtree(output)
        os.makedirs(output, exist_ok=True)

    def loader_add_cluster(self, processed_cluster: dict):
        """Write a processed cluster to a CSV file and increment the total count."""
        self.write_in_csv(
            processed_cluster,
            self.output_config.get("clusters_name_file"),
        )
        self.total_clusters += 1

    def loader_add_step(self, processed_step: dict):
        """Write a processed step to a CSV file and increment the total count."""
        self.write_in_csv(
            processed_step,
            self.output_config.get("steps_name_file"),
        )
        self.total_steps += 1

    def loader_add_instance(self, processed_instance: dict):
        """Write a processed instance to a CSV file and increment the total count."""
        self.write_in_csv(
            processed_instance,
            self.output_config.get("instances_name_file"),
        )
        self.total_instances += 1

    def loader_add_instance_group(self, processed_instance_group: dict):
        """Write a processed instance group to a CSV file and increment the total count."""
        self.write_in_csv(
            processed_instance_group,
            self.output_config.get("instance_groups_name_file"),
        )
        self.total_instance_groups += 1

    def loader_add_instance_fleet(self, processed_instance_fleet: dict):
        """Write a processed instance fleet to a CSV file and increment the total count."""
        self.write_in_csv(
            processed_instance_fleet,
            self.output_config.get("instance_fleets_name_file"),
        )
        self.total_instance_fleets += 1

    def compress_folder_to_zip(self) -> io.BytesIO:
        """Compress the output folder into a ZIP file.

        Every file under ``self.output`` is added to the archive using its
        path relative to that folder.

        Returns:
            io.BytesIO: A buffer containing the compressed ZIP file data, or
                None if the folder is missing or compression fails.

        """
        try:
            if not os.path.exists(self.output):
                raise FileNotFoundError(f"The source folder '{self.output}' does not exist.")
            zip_buffer = io.BytesIO()
            logger.info("Beginning to add files to the ZIP...")
            with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
                for root, _dirs, files in os.walk(self.output):
                    for file in files:
                        file_path = os.path.join(root, file)
                        relative_path = os.path.relpath(file_path, self.output)
                        zipf.write(file_path, relative_path)
            logger.info("Compression complete.")
            return zip_buffer
        except Exception as e:
            logger.exception("Error compressing folder %s", e)
            return None

    def write_in_csv(self, response: dict, file_name: str):
        """Write a dictionary response to a CSV file.

        This method appends the given dictionary as one row to a CSV file inside
        the output folder. If the file does not exist, it is created with a
        header row. Errors are logged and not raised.

        Args:
            response (dict): The dictionary data to write to the CSV file.
            file_name (str): The name of the CSV file to write to.

        """
        try:
            df = pd.DataFrame([response])
            output_file_path = os.path.join(self.output, file_name)
            if os.path.exists(output_file_path):
                # The header was written when the file was created.
                df.to_csv(output_file_path, mode="a", header=False, index=False)
            else:
                df.to_csv(output_file_path, mode="w", header=True, index=False)
        except Exception as e:
            logger.exception("Error writing in CSV: %s", e)

    def save_in_s3(self, output_file_name: str = None):
        """Save the output data to an S3 bucket.

        This method writes the execution metadata to a CSV file, compresses the
        output folder into a ZIP file and uploads it using the module-level
        ``s3_client``, ``s3_bucket_name`` and ``s3_folder_name``.

        Args:
            output_file_name (str, optional): The name of the ZIP file to be uploaded.
                If not provided, a default name is generated using the current timestamp.

        """
        try:
            if output_file_name is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                output_file_name = f"emr_ec2_we_output_{timestamp}.zip"

            execution_info = {
                **self.get_execution_info(),
                "total_clusters": self.total_clusters,
                "total_steps": self.total_steps,
                "total_instances": self.total_instances,
                "total_instance_groups": self.total_instance_groups,
                "total_instance_fleets": self.total_instance_fleets,
            }
            self.write_in_csv(
                execution_info,
                self.output_config.get("execution_detail_name_file"),
            )

            zip_buffer = self.compress_folder_to_zip()
            if not zip_buffer:
                # compress_folder_to_zip already logged the cause; state
                # explicitly that nothing was uploaded.
                logger.error("The output could not be compressed; nothing was uploaded to S3.")
                return

            # Trim surrounding slashes so a value such as "my_folder/" does not
            # produce an empty path segment ("my_folder//file.zip") in the key.
            folder_prefix = s3_folder_name.strip("/")
            s3_key = f"{folder_prefix}/{output_file_name}" if folder_prefix else output_file_name

            zip_buffer.seek(0)
            s3_client.upload_fileobj(zip_buffer, s3_bucket_name, s3_key)
            logger.info("File uploaded to S3 successfully.")
        except Exception as e:
            logger.exception("Error saving output: %s", e)

# Utility functions

7. Defines utility functions for validating the configuration and retrieving data from Amazon EMR (clusters, steps, instances, instance groups, and instance fleets).

In [None]:
def validate_user_info() -> bool:
    """Check the notebook configuration before running the estimation.

    An empty region or a look-back window that is not a positive integer is
    logged as an error and makes the configuration invalid. An empty email or
    company only produces a warning.

    Returns:
        bool: True if the user information is valid, False otherwise.

    """
    blocking_errors = 0

    if region_name == "":
        logger.error("Region is empty. Please provide a valid region.")
        blocking_errors += 1

    days_ok = isinstance(runs_for_last_days, int) and runs_for_last_days >= 1
    if not days_ok:
        logger.error("Runs for last days must be a positive integer.")
        blocking_errors += 1

    # Missing identification fields are reported but do not block execution.
    optional_fields = (
        (email, "Email is empty. Please provide a valid email."),
        (company, "Company is empty. Please provide a valid company."),
    )
    for value, message in optional_fields:
        if value == "":
            logger.warning(message)

    return blocking_errors == 0


class InstanceCollectionType(Enum):
    """Kind of instance collection a cluster is organised into.

    Members:
        GROUP: The cluster uses instance groups (value 1).
        FLEET: The cluster uses instance fleets (value 2).

    """

    GROUP = 1
    FLEET = 2


def list_clusters(
    now: datetime,
    runs_for_last_days_ago: datetime,
) -> Iterator[tuple[str, InstanceCollectionType]]:
    """Retrieve the EMR clusters created within a specified date range.

    Only clusters in the TERMINATING, TERMINATED or TERMINATED_WITH_ERRORS
    states are requested. Each cluster is described, registered with the
    module-level ``estimator_manager`` and then yielded. An error is logged and
    ends the iteration.

    Args:
        now (datetime): Upper bound of the creation-date window.
        runs_for_last_days_ago (datetime): Lower bound of the creation-date window.

    Yields:
        tuple[str, InstanceCollectionType]: Cluster ID and instance collection type.

    """
    try:
        paginator = emr_ec2_client.get_paginator("list_clusters")
        response_iterator = paginator.paginate(
            CreatedAfter=runs_for_last_days_ago,
            CreatedBefore=now,
            ClusterStates=["TERMINATING", "TERMINATED", "TERMINATED_WITH_ERRORS"],
        )

        for page in response_iterator:
            for cluster in page.get("Clusters", []):
                cluster_id = cluster["Id"]
                cluster_info = emr_ec2_client.describe_cluster(
                    ClusterId=cluster_id,
                )
                described = cluster_info["Cluster"]
                # Any value other than "INSTANCE_GROUP" is treated as a fleet.
                if described["InstanceCollectionType"] == "INSTANCE_GROUP":
                    instance_collection_type = InstanceCollectionType.GROUP
                else:
                    instance_collection_type = InstanceCollectionType.FLEET
                estimator_manager.add_cluster(described)
                yield cluster_id, instance_collection_type
    except Exception as e:
        logger.exception("Error listing clusters: %s", e)


def list_steps(cluster_id: str):
    """Fetch every step of a cluster and register it with the estimator manager.

    Each listed step is described individually before it is registered.
    Errors are logged and not raised.

    Args:
        cluster_id (str): ID of the cluster to retrieve steps for.

    """
    try:
        pages = emr_ec2_client.get_paginator("list_steps").paginate(ClusterId=cluster_id)
        for page in pages:
            for step_summary in page.get("Steps", []):
                described = emr_ec2_client.describe_step(
                    ClusterId=cluster_id,
                    StepId=step_summary["Id"],
                )
                estimator_manager.add_step(described["Step"], cluster_id)
    except Exception as e:
        logger.exception("Error listing steps: %s", e)


def list_all_instances(cluster_id: str):
    """Register the ON_DEMAND instances of a cluster with the estimator manager.

    Instances whose ``Market`` is anything other than ``ON_DEMAND`` are
    skipped. Errors are logged and not raised.

    Args:
        cluster_id (str): ID of the cluster to retrieve instances for.

    """
    try:
        pages = emr_ec2_client.get_paginator("list_instances").paginate(ClusterId=cluster_id)
        for page in pages:
            for instance in page.get("Instances", []):
                if instance.get("Market", "") != "ON_DEMAND":
                    continue
                estimator_manager.add_instance(instance, cluster_id)
    except Exception as e:
        logger.exception("Error listing instances: %s", e)


def list_instances_type(
    cluster_id: str,
    instance_collection_type: InstanceCollectionType,
):
    """Dispatch to the instance-group or instance-fleet listing for a cluster.

    Nothing happens when the collection type is neither GROUP nor FLEET.

    Args:
        cluster_id (str): ID of the cluster to retrieve instances for.
        instance_collection_type (InstanceCollectionType): Type of instance collection (GROUP or FLEET).

    """
    try:
        if instance_collection_type == InstanceCollectionType.GROUP:
            list_instance_groups(cluster_id)
            return
        if instance_collection_type == InstanceCollectionType.FLEET:
            list_instance_fleets(cluster_id)
    except Exception as e:
        logger.exception("Error listing instances_types: %s", e)


def list_instance_groups(cluster_id: str):
    """Register the ON_DEMAND instance groups of a cluster with the estimator manager.

    Groups whose ``Market`` is missing or not ``ON_DEMAND`` are skipped.
    Errors are logged and not raised.

    Args:
        cluster_id (str): ID of the cluster to retrieve instance groups for.

    """
    try:
        pages = emr_ec2_client.get_paginator("list_instance_groups").paginate(ClusterId=cluster_id)
        for page in pages:
            for group in page.get("InstanceGroups", []):
                if group.get("Market") != "ON_DEMAND":
                    continue
                estimator_manager.add_instance_group(group, cluster_id)
    except Exception as e:
        logger.exception("Error listing instance groups: %s", e)


def list_instance_fleets(cluster_id: str):
    """Register every instance fleet of a cluster with the estimator manager.

    Errors are logged and not raised.

    Args:
        cluster_id (str): ID of the cluster to retrieve instance fleets for.

    """
    try:
        pages = emr_ec2_client.get_paginator("list_instance_fleets").paginate(ClusterId=cluster_id)
        for page in pages:
            for fleet in page.get("InstanceFleets", []):
                estimator_manager.add_instance_fleet(fleet, cluster_id)
    except Exception as e:
        logger.exception("Error listing instance fleets: %s", e)

# Generate estimation

8. Executes the main estimation workflow: instantiates the manager, retrieves data, and generates the output.

In [None]:
# Main workflow: runs only when the configuration passes validation.
# The names bound here (emr_ec2_client, s3_client, estimator_manager) are
# module-level globals read by the list_* utility functions and by save_in_s3.
if validate_user_info():
    # Initialize the EMR client with the specified region and configuration
    emr_ec2_client = boto3.client("emr", region_name=region_name, config=config)

    # The S3 output mode is used only when both the bucket and the folder are set.
    is_s3_option = s3_bucket_name and s3_folder_name

    if is_s3_option:
        s3_client = boto3.client("s3", region_name=region_name, config=config)
        estimator_manager = EMRSEstimatorManagerS3(region_name, email, company)
    else:
        estimator_manager = EMRSEstimatorManagerDefault(region_name, email, company)
    # Timezone-aware UTC timestamp used as the upper bound of the window.
    now = datetime.now(timezone.utc)

    # Calculate the start date for retrieving clusters based on the configured number of days
    runs_for_last_days_ago = now - timedelta(days=runs_for_last_days)

    logger.info("Start date: %s", runs_for_last_days_ago.strftime("%Y-%m-%d"))
    logger.info("End date: %s", now.strftime("%Y-%m-%d"))
    logger.info("Starting to extract the information: this process may take a few minutes...")

    # Iterate through the clusters retrieved within the specified date range
    # (list_clusters also registers each cluster with estimator_manager).
    for cluster_id, instance_collection_type in list_clusters(
        now,
        runs_for_last_days_ago,
    ):
        # Retrieve and add steps associated with the cluster
        list_steps(cluster_id)
        # Retrieve and add all instances associated with the cluster
        list_all_instances(cluster_id)
        # Retrieve and add instance groups or fleets based on the cluster's instance collection type
        list_instances_type(cluster_id, instance_collection_type)

    if is_s3_option:
        # The output is saved directly to the configured bucket.
        # The S3 manager keeps running counters instead of in-memory lists.
        logger.info("Total clusters: %d", estimator_manager.total_clusters)
        logger.info("Total steps: %d", estimator_manager.total_steps)
        logger.info("Total instances: %d", estimator_manager.total_instances)
        logger.info(
            "Total instance groups: %d",
            estimator_manager.total_instance_groups,
        )
        logger.info(
            "Total instance fleets: %d",
            estimator_manager.total_instance_fleets,
        )
        estimator_manager.save_in_s3()
    else:
        # Generate and display a downloadable ZIP file containing the analysis data
        # The default manager holds the records in lists, so totals are list lengths.
        logger.info("Total clusters: %d", len(estimator_manager.clusters))
        logger.info("Total steps: %d", len(estimator_manager.steps))
        logger.info("Total instances: %d", len(estimator_manager.instances))
        logger.info("Total instance groups: %d", len(estimator_manager.instance_groups))
        logger.info("Total instance fleets: %d", len(estimator_manager.instance_fleets))
        estimator_manager.show_output()