In [1]:
# Move two directories up before anything else runs; the relative paths used
# in the following cells are resolved against the resulting working directory.
# NOTE(review): presumably this is the repository root, which would also make
# the `mc_openapi` package importable - confirm.
%cd ../..

/home/andrea/Projects/piacere-model-checker


In [2]:
# Path of the DOML model (.domlx) to analyse, relative to the working directory
# set in the first cell. Exactly one assignment should be left uncommented.
# NOTE(review): the trailing notes presumably list which network elements each
# model contains - confirm against the models themselves.
# XMI_PATH = "./tests/doml/CaseStudies/posidonia.domlx"
XMI_PATH = "../piacere-workspace/Playground/NetValidation/test1.domlx" # Subnets, Ifaces, Gateway
# XMI_PATH = "./tests/doml/CaseStudies/posidonia_IOP.domlx" # Subnets, no Gateway
# XMI_PATH = "../piacere-workspace/Playground/CaseStudies/nio3_test_exec_env.domlx" # No subnets, Gateway
# XMI_PATH = "../piacere-workspace/Playground/CaseStudies/posidonia.domlx"

# Read the model as raw bytes; `doml_xmi` is handed to `init_model` in the next cell.
with open(XMI_PATH, 'rb') as f:
    doml_xmi = f.read()

In [3]:
from mc_openapi.doml_mc.main import init_model

# Build `dmc` from the raw model bytes; it is later passed to
# `validate_network_address`, which reads its `intermediate_model`.
# NOTE(review): `doml_ver=None` presumably lets `init_model` pick the DOML
# version on its own - confirm against `mc_openapi.doml_mc.main`.
dmc = init_model(doml_xmi, doml_ver=None)

In [4]:
from pprint import pprint
from mc_openapi.doml_mc.intermediate_model import DOMLElement
from mc_openapi.doml_mc.mc import ModelChecker

from ipaddress import IPv4Address, IPv4Network

# Address prepended by `fix_invalid_address` to address ranges that consist
# only of a prefix length (e.g. '/24'), so that they can be parsed as a CIDR.
BASE_ADDR = '0.0.0.0'

# Association ids, used as keys of `DOMLElement.associations` (see `get_assocs`).
ASSOC_SUBNETS = 'infrastructure_Network::subnets'
ASSOC_GATEWAYS = 'infrastructure_Network::gateways'
ASSOC_IFACE_NET = 'infrastructure_NetworkInterface::belongsTo'

# Attribute ids, used as keys of `DOMLElement.attributes` (see `get_attr`).
ATTR_NET_ADDRESS = 'infrastructure_Network::addressRange'
ATTR_GATEWAY_ADDRESS = 'infrastructure_InternetGateway::address'
ATTR_IFACE_ADDRESS = 'infrastructure_NetworkInterface::endPoint'

def get_attr(elem: DOMLElement, attr_id: str):
    """Return the first value stored under `attr_id` in `elem.attributes`.

    Returns None when the attribute is absent or its value list is empty.
    """
    values = elem.attributes.get(attr_id)
    return values[0] if values else None
    
def get_assocs(elem: DOMLElement, assoc_id: str):
    """Return what `elem.associations` holds for `assoc_id`, or an empty list if the key is absent."""
    no_targets = []
    return elem.associations.get(assoc_id, no_targets)

def validate_network_address(dmc: ModelChecker):
    """Print a report on the consistency of the addresses used in each network.

    For every `infrastructure_Network` element of the intermediate model:

    * the subnets reachable (recursively) through
      `infrastructure_Network::subnets` are checked to be subnets of the
      network address range;
    * every gateway of the network, and every network interface that belongs
      to the network or to one of its subnets, is checked to have an address
      that is a usable host of the network; interfaces are additionally
      checked against the address range of their direct owner.

    Expected layout of a network::

        Network/24
          Subnet/28
          Subnet/30
          Address
          Address

    The outcome is only printed; nothing is returned.

    Args:
        dmc: model checker whose `intermediate_model` maps element ids to
            `DOMLElement`s.

    Raises:
        ValueError: if an address or an address range cannot be parsed by
            `ipaddress`. This includes a network or subnet without an
            address range and a range with host bits set.
    """
    im = dmc.intermediate_model

    networks = [e for e in im.values() if e.class_ == 'infrastructure_Network']
    ifaces = [e for e in im.values() if e.class_ == 'infrastructure_NetworkInterface']

    def fix_invalid_address(address: object):
        """Prefix a prefix-length-only range such as '/24' with BASE_ADDR; return anything else unchanged."""
        if isinstance(address, str) and address.startswith('/'):
            return f"{BASE_ADDR}{address}"

        return address

    def visit_subnet(net: DOMLElement, acc: list):
        """Recursively navigate subnets to populate the `acc` list with all the subnets in a network."""
        for subnet in get_assocs(net, ASSOC_SUBNETS):
            subnet = im[subnet]
            subnet_addr = fix_invalid_address(get_attr(subnet, ATTR_NET_ADDRESS))
            acc.append((subnet, IPv4Network(subnet_addr)))
            visit_subnet(subnet, acc)

    def is_host_of(address: IPv4Address, cidr: IPv4Network) -> bool:
        """Return the same answer as `address in cidr.hosts()` in constant time.

        `hosts()` is a generator for most networks, so a membership test on it
        walks the hosts one by one (millions of them for short prefixes such
        as the '0.0.0.0/8' produced by `fix_invalid_address`).
        """
        if address not in cidr:
            return False
        # For /31 and /32 networks `hosts()` returns every address of the
        # network (for /32 this holds on Python 3.9+), so nothing is excluded.
        if cidr.prefixlen >= cidr.max_prefixlen - 1:
            return True
        # Otherwise the network and broadcast addresses are not usable hosts.
        return address not in (cidr.network_address, cidr.broadcast_address)

    for network in networks:
        # Tuple(elem, cidr)
        subnets: list[tuple[DOMLElement, IPv4Network]] = []
        # Tuple(elem, address, label of the element kind used in the report)
        addresses: list[tuple[DOMLElement, IPv4Address, str]] = []

        # Add subnets (Networks)
        visit_subnet(network, subnets)

        # Ids of the network and of all its subnets, computed once per network
        # instead of once per interface.
        member_ids = {s.id_ for (s, _) in subnets} | {network.id_}

        # Add addresses (gateways, ifaces)
        for gateway in get_assocs(network, ASSOC_GATEWAYS):
            gateway = im[gateway]
            if gateway_address := get_attr(gateway, ATTR_GATEWAY_ADDRESS):
                addresses.append((gateway, IPv4Address(gateway_address), 'InternetGateway'))

        for iface in ifaces:
            if owner_id := get_assocs(iface, ASSOC_IFACE_NET):
                # Only the first owner is considered.
                owner = im[next(iter(owner_id))]
                if owner.id_ in member_ids:
                    if ((iface_address := get_attr(iface, ATTR_IFACE_ADDRESS))
                    and (owner_address := get_attr(owner, ATTR_NET_ADDRESS))):
                        owner_address = fix_invalid_address(owner_address)
                        iface_address = IPv4Address(iface_address)
                        owner_address = IPv4Network(owner_address)
                        print(f"{iface.user_friendly_name} ({iface_address}) belongs to {owner.user_friendly_name} ({owner_address})? {is_host_of(iface_address, owner_address)}")

                        addresses.append((iface, iface_address, 'NetworkInterface'))
                else:
                    # The interface is attached to `owner`, which is neither
                    # this network nor one of its subnets: report it against
                    # the network being validated, not against its own owner.
                    print(f"NetworkInterface '{iface.user_friendly_name}' does not belong to net '{network.user_friendly_name}'.")

        # Validate Network and Subnets
        net_addr = fix_invalid_address(get_attr(network, ATTR_NET_ADDRESS))
        print(f"{net_addr}\t{network.user_friendly_name}")

        net = IPv4Network(net_addr)

        if subnets:
            for (obj, sn) in subnets:
                print(f"{sn}\t{obj.user_friendly_name} belongs? {sn.subnet_of(net)}")
        else:
            print("No subnets!")

        # Validate addresses (again), this time against the whole network
        for (obj, addr, kind) in addresses:
            print(f"{kind} '{obj.user_friendly_name}' ({addr}) belong to net '{network.user_friendly_name}' ({net_addr})? {is_host_of(addr, net)}")

  

# Run the address checks on the loaded model; the report is printed as cell output.
validate_network_address(dmc)

174326017 10.100.1.0/24
elasticsearch_iface (10.100.1.1) belongs to subnet1 (10.100.1.0/24)? True
3232235777 10.100.2.0/24
gestaut_iface (192.168.1.1) belongs to subnet2 (10.100.2.0/24)? False
174326018 10.100.3.0/24
edi_iface (10.100.1.2) belongs to subnet3 (10.100.3.0/24)? False
10.100.0.0/16	vpc
10.100.1.0/24	subnet1 belongs? True
10.100.2.0/24	subnet2 belongs? True
10.100.3.0/24	subnet3 belongs? True
NetworkInterface 'gw1' (10.100.0.1) belong to net 'vpc' (10.100.0.0/16)? True
NetworkInterface 'elasticsearch_iface' (10.100.1.1) belong to net 'vpc' (10.100.0.0/16)? True
NetworkInterface 'gestaut_iface' (192.168.1.1) belong to net 'vpc' (10.100.0.0/16)? False
NetworkInterface 'edi_iface' (10.100.1.2) belong to net 'vpc' (10.100.0.0/16)? True
