Load environment variables defined in a `.env` file

In [None]:
# Alternative kept for reference (disabled): the IPython magics that load
# a .env file from inside a notebook.
# %load_ext dotenv
# %dotenv
from dotenv import load_dotenv
# NOTE(review): override=True presumably lets values from .env replace
# variables already set in the process environment — confirm against the
# python-dotenv documentation.
load_dotenv(override=True)

In [59]:
from enum import Enum
import csv
import re


class VMType(Enum):
    """Kind of compute resource a record describes: VM or scale-set node."""

    VM = 1
    VMSS = 2

    def __str__(self) -> str:
        """Render the member as its bare name (``VM`` / ``VMSS``)."""
        return self.name


class VMInfo:
    """Plain record describing one virtual machine (or scale-set node).

    ``resource_group`` and ``name`` are stored lower-cased; every other
    value is stored exactly as passed in.
    """

    def __init__(self, id, type, resource_group, name, private_ip=None, public_ip=None, size=None):
        self.id, self.type = id, type
        # Normalise case so filtering and sorting by these fields is consistent.
        self.resource_group = resource_group.lower()
        self.name = name.lower()
        self.private_ip, self.public_ip, self.size = private_ip, public_ip, size

    def __str__(self) -> str:
        """Return all fields as a single ``key = value`` comma-separated line."""
        shown = ('id', 'type', 'resource_group', 'name',
                 'size', 'private_ip', 'public_ip')
        return ', '.join(f'{attr} = {getattr(self, attr)}' for attr in shown)


def filter_list(data, field, filter):
    """Return the items of ``data`` whose ``field`` does NOT match ``filter``.

    This is an exclusion filter: an item is dropped when the string form of
    its ``field`` attribute matches the regular expression. Matching uses
    ``re.match``, so the pattern is anchored at the start of the value only.

    Args:
        data: Iterable of objects exposing ``field`` as an attribute.
        field: Name of the attribute to test.
        filter: Regular expression of values to exclude; a falsy value
            (``None`` or ``''``) disables filtering.

    Returns:
        A new list in the original order. A fresh list is returned even when
        no filter is given, so callers may sort or mutate the result without
        touching ``data``.
    """
    if not filter:
        # Copy rather than alias the input: callers sort the result in place.
        return list(data)
    pattern = re.compile(filter)
    return [
        item for item in data
        if not pattern.match(str(getattr(item, field)))
    ]


def filter_fields(data, fields):
    """Project each item onto ``fields`` as a dict of stringified values.

    Args:
        data: Iterable of objects exposing every name in ``fields``.
        fields: Attribute names to keep, in the desired key order.

    Returns:
        One ``{field: str(value)}`` dict per item, in input order.
    """
    return [
        {attr: str(getattr(record, attr)) for attr in fields}
        for record in data
    ]


def filter_fields_as_array(data, fields):
    """Project each item onto ``fields`` as a list of stringified values.

    Args:
        data: Iterable of objects exposing every name in ``fields``.
        fields: Attribute names to keep; they fix the column order.

    Returns:
        One list of ``str(value)`` per item, in input order.
    """
    return [
        [str(getattr(record, attr)) for attr in fields]
        for record in data
    ]


def write_to_csv(output, data, fields):
    """Write rows to ``<output>.csv`` with a human-readable header row.

    Header cells are derived from ``fields`` by replacing underscores with
    spaces and title-casing (``resource_group`` -> ``Resource Group``).

    Args:
        output: Target path without the ``.csv`` extension. Missing parent
            directories are created.
        data: Iterable of row sequences, aligned with ``fields``.
        fields: Field names used to build the header row.
    """
    import os  # local import: this cell does not import os at module level

    path = output + '.csv'
    parent = os.path.dirname(path)
    if parent:
        # Without this, open() raises FileNotFoundError on a fresh checkout
        # where the output directory has not been created yet.
        os.makedirs(parent, exist_ok=True)

    headers = [s.replace('_', ' ').title() for s in fields]
    with open(path, 'w', newline='') as file:
        writer = csv.writer(file, dialect='excel')
        writer.writerow(headers)
        writer.writerows(data)


def write_dict_to_csv(output, data, fields):
    """Write dict rows to ``<output>.csv`` using ``fields`` as the header.

    Unlike ``write_to_csv`` the header row contains the raw field names.
    Keys missing from a row are written as empty cells.

    Args:
        output: Target path without the ``.csv`` extension. Missing parent
            directories are created.
        data: Iterable of dicts keyed by the names in ``fields``.
        fields: Column names, in output order.
    """
    import os  # local import: this cell does not import os at module level

    path = output + '.csv'
    parent = os.path.dirname(path)
    if parent:
        # Without this, open() raises FileNotFoundError when the output
        # directory has not been created yet.
        os.makedirs(parent, exist_ok=True)

    with open(path, 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fields,
                                restval='', dialect='excel')
        writer.writeheader()
        writer.writerows(data)

In [79]:
import os

from azure.identity import DefaultAzureCredential


class AzCredentials:
    """Namespace for the shared Azure credential and client construction.

    The class is never instantiated; its helpers are static so they behave
    the same whether called on the class or on an instance.
    """

    # Created once, when the class body runs, and shared by every client.
    credential = DefaultAzureCredential()

    @staticmethod
    def subscriptionId():
        """Return ``AZURE_SUBSCRIPTION_ID`` from the environment, or None if unset."""
        return os.environ.get("AZURE_SUBSCRIPTION_ID", None)

    @staticmethod
    def instance(class_name):
        """Instantiate the management client class named ``class_name``.

        The class is looked up by name in this module's globals, so it must
        already be imported when this is called.

        Args:
            class_name: Name of a client class accepting ``credential`` and
                ``subscription_id`` keyword arguments.

        Returns:
            A new instance of the named class.

        Raises:
            KeyError: If no global with that name exists.
        """
        clazz = globals()[class_name]
        return clazz(
            credential=AzCredentials.credential,
            subscription_id=AzCredentials.subscriptionId()
        )


# Show which subscription the clients will target; the value is None when
# AZURE_SUBSCRIPTION_ID is not set in the environment.
print("Using Azure Subscription: {}".format(AzCredentials.subscriptionId()))
# NOTE(review): prints the same environment value again, without a label —
# looks like leftover debugging output; confirm whether it is still wanted.
print(os.environ.get("AZURE_SUBSCRIPTION_ID", None))



Extract VM info from all VMs in the subscription

In [80]:
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.network import NetworkManagementClient

# One management client per Azure service used below. AzCredentials.instance
# resolves each class by name from module globals, so the imports above must
# have run first.
resource_client = AzCredentials.instance("ResourceManagementClient")
compute_client = AzCredentials.instance("ComputeManagementClient")
network_client = AzCredentials.instance("NetworkManagementClient")


def list_vms_in_subscription():
    """Collect VM records from every resource group of the subscription.

    Returns:
        A flat list holding the results of ``list_vms_in_groups`` for each
        resource group, in the order the groups are listed.
    """
    collected = []
    # Materialise the group listing first, then walk it group by group.
    groups = list(resource_client.resource_groups.list())
    for group in groups:
        collected.extend(list_vms_in_groups(group.name))
    return collected


def _resource_id_segment(resource_id, key):
    """Return the path segment following ``key`` in an Azure resource ID.

    The comparison is case-insensitive; returns None when ``key`` is absent.
    """
    segments = resource_id.split('/')
    wanted = key.lower()
    for position, segment in enumerate(segments[:-1]):
        if segment.lower() == wanted:
            return segments[position + 1]
    return None


def _vm_info(resource_group, resource):
    """Build the VMInfo record for one standalone virtual machine."""
    vm = compute_client.virtual_machines.get(resource_group, resource.name)
    private_ips = []
    public_ips = []
    for interface in vm.network_profile.network_interfaces:
        # A NIC can live in a different resource group than its VM, so take
        # the group from the NIC's own ID rather than assuming the VM's.
        nic_group = _resource_id_segment(
            interface.id, 'resourceGroups') or resource_group
        nic_name = interface.id.split('/')[-1]
        nic = network_client.network_interfaces.get(nic_group, nic_name)

        for ip_config in nic.ip_configurations or []:
            if ip_config.private_ip_address:
                private_ips.append(ip_config.private_ip_address)

            public_ref = ip_config.public_ip_address
            if not public_ref:
                continue
            # The NIC only carries a reference to the public IP resource;
            # fetch the resource itself when the address is not inlined.
            address = public_ref.ip_address
            if not address:
                pip_group = _resource_id_segment(
                    public_ref.id, 'resourceGroups') or nic_group
                pip_name = public_ref.id.split('/')[-1]
                address = network_client.public_ip_addresses.get(
                    pip_group, pip_name).ip_address
            if address:
                public_ips.append(address)

    return VMInfo(resource.id, VMType.VM,
                  resource_group, resource.name, private_ips, public_ips,
                  vm.hardware_profile.vm_size)


def _scale_set_infos(resource_group, resource):
    """Build one VMInfo record per instance of a virtual machine scale set."""
    records = []
    nodes = compute_client.virtual_machine_scale_set_vms.list(
        resource_group, resource.name)
    for node in nodes:
        private_ips = []
        public_ips = []
        node_nics = network_client.network_interfaces.list_virtual_machine_scale_set_vm_network_interfaces(
            resource_group, resource.name, node.instance_id)
        for nic in node_nics:
            for ip_config in nic.ip_configurations or []:
                if ip_config.private_ip_address:
                    private_ips.append(ip_config.private_ip_address)

                public_ref = ip_config.public_ip_address
                if not public_ref:
                    continue
                address = public_ref.ip_address
                if not address:
                    # Scale-set public IPs are child resources of the
                    # instance and need the scale-set specific getter.
                    address = network_client.public_ip_addresses.get_virtual_machine_scale_set_public_ip_address(
                        resource_group, resource.name, node.instance_id,
                        nic.name, ip_config.name,
                        public_ref.id.split('/')[-1]).ip_address
                if address:
                    public_ips.append(address)

        # NOTE(review): scale-set instances may expose their size only via
        # `sku` — confirm against the compute SDK; the fallback is harmless
        # when `hardware_profile.vm_size` is populated.
        profile = node.hardware_profile
        size = profile.vm_size if profile else None
        if not size and getattr(node, 'sku', None):
            size = node.sku.name

        records.append(VMInfo(node.id, VMType.VMSS,
                              resource_group, node.name, private_ips,
                              public_ips, size))
    return records


def list_vms_in_groups(resource_group):
    """List every VM and scale-set instance found in one resource group.

    Args:
        resource_group: Name of the resource group to scan.

    Returns:
        A list of VMInfo records. ``private_ip`` and ``public_ip`` are lists
        of address strings gathered from all IP configurations of all NICs;
        public IPs without an assigned address are omitted.
    """
    result = []
    for resource in resource_client.resources.list_by_resource_group(resource_group):
        if resource.type == "Microsoft.Compute/virtualMachines":
            result.append(_vm_info(resource_group, resource))
        elif resource.type == "Microsoft.Compute/virtualMachineScaleSets":
            result.extend(_scale_set_infos(resource_group, resource))
    return result


def sort_by_name_field(item):
    """Sort key: the item's ``name`` attribute converted to a string."""
    return str(item.name)


def print_results(data, fields, exclude, output='out/vm_info'):
    """Filter, sort and export VM records to a CSV file.

    Args:
        data: List of records exposing a ``name`` attribute and every
            attribute named in ``fields``. It is not modified.
        fields: Attribute names to export, in column order.
        exclude: Regular expression of names to leave out, or a falsy value
            to keep every record.
        output: Target path without the ``.csv`` extension.
    """
    remaining = filter_list(data, 'name', exclude)
    # sorted() rather than list.sort(): with no exclude pattern the filtered
    # list can be the caller's own list, which must not be reordered.
    ordered = sorted(remaining, key=sort_by_name_field)

    rows = filter_fields_as_array(ordered, fields)
    write_to_csv(output, rows, fields)


# Entry point of the notebook: scan the whole subscription and export the
# listed columns; passing None as the exclude pattern keeps every record.
print_results(list_vms_in_subscription(), [
              'type', 'resource_group', 'name', 'size', 'private_ip', 'public_ip'], None)