In [1]:
import boto3
import pandas as pd
import numpy as np
import math
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
from tqdm import tqdm
import ast


from sklearn.metrics.pairwise import euclidean_distances
from sklearn.preprocessing import MinMaxScaler
import networkx as nx

In [None]:
# Run this to generate df or use the saved csv
# def fetch_region_instance_types(region):
#     client = boto3.client("ec2", region_name=region)
#     paginator = client.get_paginator("describe_instance_types")
#     results = []
#     for page in paginator.paginate(PaginationConfig={"PageSize": 100}):
#         for it in page.get("InstanceTypes", []):
#             it["_region"] = region
#             results.append(it)
#     return results

# #AllRegions=True - AuthFailure
# regions = boto3.client("ec2", region_name="us-east-1").describe_regions()["Regions"]
# regions = [r["RegionName"] for r in regions]
# print("Regions:", regions, len(regions))

# all_entries = []
# for r in tqdm(regions):
#     all_entries.extend(fetch_region_instance_types(r))

# df = pd.DataFrame(all_entries)

In [2]:
df = pd.read_csv('full_catalog.csv')

dict_cols = ['ProcessorInfo', 'VCpuInfo', 'MemoryInfo', 'EbsInfo', 'NetworkInfo', 'GpuInfo']

for col in dict_cols:
    df[col] = df[col].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)

In [3]:
df_grouped = df.groupby('InstanceType', as_index=False).agg({
    '_region': lambda x: list(x.unique()),  # combine all regions into a list
    **{col: 'first' for col in df.columns if col not in ['InstanceType', '_region']}
})

df_grouped = df_grouped.rename(columns={'_region': 'regions'})

In [4]:
def safe_get(d, key_path, default=None):
    """Get nested dictionary value by list of keys (or single key string). Returns default if any part missing."""
    if d is None:
        return default
    if isinstance(key_path, (list, tuple)):
        cur = d
        for k in key_path:
            if not isinstance(cur, dict) or k not in cur:
                return default
            cur = cur[k]
        return cur
    return d.get(key_path, default)x

def list_intersects(a, b):
    if a is None or b is None:
        return False
    try:
        return len(set(a) & set(b)) > 0
    except Exception:
        return False

def compare_networks(src_net, tgt_net):
    # EnaSupport equality
    src_ena = safe_get(src_net, "EnaSupport", None)
    tgt_ena = safe_get(tgt_net, "EnaSupport", None)
    if src_ena != tgt_ena:
        return False, "EnaSupport mismatch"

    # Ipv6Supported equality (normalize to bool or None)
    src_ipv6 = safe_get(src_net, "Ipv6Supported", None)
    tgt_ipv6 = safe_get(tgt_net, "Ipv6Supported", None)
    if src_ipv6 is None or tgt_ipv6 is None or bool(src_ipv6) != bool(tgt_ipv6):
        return False, "Ipv6Supported mismatch"

    # MaximumNetworkInterfaces directional: target >= source
    src_max_if = safe_get(src_net, "MaximumNetworkInterfaces", None)
    tgt_max_if = safe_get(tgt_net, "MaximumNetworkInterfaces", None)
    if src_max_if is None or tgt_max_if is None:
        # if missing numeric info, treat as conservative mismatch
        return False, "MaximumNetworkInterfaces unknown"
    try:
        if int(tgt_max_if) < int(src_max_if):
            return False, f"MaximumNetworkInterfaces too small ({tgt_max_if} < {src_max_if})"
    except Exception:
        return False, "MaximumNetworkInterfaces compare failed"
    return True, "network OK"

def can_replace(src, tgt):
    """Return (bool, reasons_list) whether `tgt` can replace `src`"""
    reasons = []

    # 1. Processor architecture intersection
    src_arch = safe_get(src, ["ProcessorInfo", "SupportedArchitectures"], None)
    tgt_arch = safe_get(tgt, ["ProcessorInfo", "SupportedArchitectures"], None)
    if not list_intersects(src_arch, tgt_arch):
        reasons.append("arch no-intersect")

    # 2. Supported virtualization types intersection
    src_virt = safe_get(src, "SupportedVirtualizationTypes", None)
    tgt_virt = safe_get(tgt, "SupportedVirtualizationTypes", None)
    if not list_intersects(src_virt, tgt_virt):
        reasons.append("virt no-intersect")

    # 3. Network checks (EnaSupport equality, Ipv6Supported equality, MaximumNetworkInterfaces directional)
    src_net = safe_get(src, "NetworkInfo", {})
    tgt_net = safe_get(tgt, "NetworkInfo", {})
    net_ok, net_msg = compare_networks(src_net, tgt_net)
    if not net_ok:
        reasons.append(f"network:{net_msg}")

    # 4. NVMe support (if src required -> tgt must be required)
    src_nvme = safe_get(src, ["EbsInfo", "NvmeSupport"], None)
    tgt_nvme = safe_get(tgt, ["EbsInfo", "NvmeSupport"], None)
    if src_nvme == "required" and tgt_nvme != "required":
        reasons.append("nvme required mismatch")

    # 5. InstanceStorageSupported must match (True/False)
    src_inst_store = safe_get(src, "InstanceStorageSupported", None)
    tgt_inst_store = safe_get(tgt, "InstanceStorageSupported", None)
    if src_inst_store is None or tgt_inst_store is None or bool(src_inst_store) != bool(tgt_inst_store):
        reasons.append("instance-storage mismatch")

    # 6a. Hypervisor must match if both present
    src_hv = safe_get(src, "Hypervisor", None)
    tgt_hv = safe_get(tgt, "Hypervisor", None)
    # Normalize empty/None to None; require equality when either present in both
    if (src_hv is not None and src_hv != "") and (tgt_hv is not None and tgt_hv != ""):
        if str(src_hv).lower() != str(tgt_hv).lower():
            reasons.append("hypervisor mismatch")

    # 6b. BareMetal must match exactly (True/False)
    src_bm = safe_get(src, "BareMetal", None)
    tgt_bm = safe_get(tgt, "BareMetal", None)
    if (src_bm is None) or (tgt_bm is None) or (bool(src_bm) != bool(tgt_bm)):
        reasons.append("baremetal mismatch")

    is_ok = len(reasons) == 0
    return is_ok, reasons

In [5]:
def build_interchangability_matrix(instances):
    names = [safe_get(i, "InstanceType", f"untitled_{idx}") for idx, i in enumerate(instances)]
    n = len(instances)
    matrix = pd.DataFrame(False, index=names, columns=names, dtype=bool)
    reasons_map = {(s, t): None for s in names for t in names}

    for i, src in enumerate(instances):
        for j, tgt in enumerate(instances):
            ok, reasons = can_replace(src, tgt)
            matrix.iat[i, j] = ok
            reasons_map[(names[i], names[j])] = reasons

    return matrix, reasons_map


In [7]:
matrix, reasons_map = build_interchangability_matrix(df_grouped.to_dict(orient="records"))

In [8]:
interchangeable = matrix.loc['t2.nano']
candidates = interchangeable[interchangeable == True].index.tolist()
candidates, len(candidates)

(['c4.2xlarge',
  'c4.4xlarge',
  'c4.8xlarge',
  'c4.large',
  'c4.xlarge',
  'm4.10xlarge',
  'm4.2xlarge',
  'm4.4xlarge',
  'm4.large',
  'm4.xlarge',
  't2.2xlarge',
  't2.large',
  't2.medium',
  't2.micro',
  't2.nano',
  't2.small',
  't2.xlarge'],
 17)

In [9]:
matrix.to_csv('interchangeable_mat.csv')

In [10]:
df_grouped = pd.read_csv('grouped_full_catalog.csv')
matrix = pd.read_csv('interchangeable_mat.csv')

def get_resource_info(inst):
    vcpus = safe_get(inst, ["VCpuInfo", "DefaultVCpus"], 0)
    mem = safe_get(inst, ["MemoryInfo", "SizeInMiB"], 0)
    gpus = 0
    gpu_info = safe_get(inst, "GpuInfo", None)
    if isinstance(gpu_info, dict):
        gpus = gpu_info.get("Gpus", [{}])[0].get("Count", 0)
    return int(vcpus or 0), int(mem or 0), int(gpus or 0)


def recommend_instance(current_instance_type, required_vcpus, required_memory_mib, required_gpus, 
                       instances, matrix, top_n=3):
    # Convert DataFrame if needed
    if isinstance(instances, pd.DataFrame):
        instances = instances.to_dict(orient="records")
    
    name_map = {safe_get(i, "InstanceType"): i for i in instances}
    if current_instance_type not in name_map:
        raise ValueError(f"Current instance {current_instance_type} not found.")
    
    current_idx = current_instance_type
    if current_idx not in matrix.index:
        raise ValueError(f"{current_instance_type} not found in interchangeability matrix.")

    # Get compatible targets
    interchangeable_targets = matrix.loc[current_idx]
    compatible = [name for name, ok in interchangeable_targets.items() if ok and name != current_instance_type]

    if not compatible:
        return f"No interchangeable instances found for {current_instance_type}."

    # Evaluate each compatible target
    recs = []
    for name in compatible:
        inst = name_map.get(name)
        if not inst:
            continue
        vcpu, mem, gpu = get_resource_info(inst)
        # Must meet or exceed all requirements
        if vcpu < required_vcpus or mem < required_memory_mib or gpu < required_gpus:
            continue
        # Distance score = how much extra resources beyond required
        dist = math.sqrt((vcpu - required_vcpus)**2 + (mem - required_memory_mib)**2 + (gpu - required_gpus)**2)
        recs.append((name, vcpu, mem, gpu, dist))

    if not recs:
        return f"No compatible instance meets the resource requirements (vCPU≥{required_vcpus}, Mem≥{required_memory_mib} MiB, GPU≥{required_gpus})."

    # Sort by distance (closer = better)
    recs.sort(key=lambda x: x[-1])
    best = recs[0]
    df = pd.DataFrame(recs, columns=["InstanceType", "vCPUs", "MemoryMiB", "GPUs", "Score"])
    print("Top interchangeable recommendations:")
    display(df.head(top_n))
    return best

In [12]:
best = recommend_instance(
    current_instance_type="a1.large",
    required_vcpus=8,
    required_memory_mib=16384,
    required_gpus=2,
    instances=df_grouped,
    matrix=matrix
)
print("Best replacement suggestion:", best)

Top interchangeable recommendations:


Unnamed: 0,InstanceType,vCPUs,MemoryMiB,GPUs,Score
0,g5g.16xlarge,64,131072,2,114688.013672


Best replacement suggestion: ('g5g.16xlarge', 64, 131072, 2, 114688.01367187419)
