In [1]:
import pandas as pd
import numpy as np

from typing import Tuple, Dict, List

In [2]:
INPUT_FILE = "inventory_raw.csv"

In [3]:
raw_data = pd.read_csv(INPUT_FILE)

In [4]:
raw_data

Unnamed: 0,source_row_id,ip,hostname,fqdn,mac,owner,device_type,site,notes
0,1,192.168.010.005,HOST01,,AA-BB-CC-DD-EE-FF,priya (platform) priya@corp.example.com,server,BLR Campus,db host
1,2,10.0.1.300,host-02,host-02.local,11-22-33-44-55-66,ops,,HQ Bldg 1,edge gw?
2,3,10.0.1,host03,,aabb.ccdd.eeff,jane@corp.example.com,switch,HQ-BUILDING-1,
3,4,10.0.1.1.2,printer-01,,00:11:22:33:44:55,Facilities,printer,HQ,
4,5,fe80::1%eth0,iot-cam01,,00:aa:bb:cc:dd:ee,sec,iot,Lab-1,camera PoE on port 3
5,6,127.0.0.1,local-test,,,,,,
6,7,169.254.10.20,host-apipa,,,,,,
7,8,10.10.10.10,srv-10,,,platform,server,BLR campus,
8,9,abc.def.ghi.jkl,badhost,,,,,,
9,10,192.168.1.-1,neg,,,,,,


In [5]:
print(list(raw_data.columns))

['source_row_id', 'ip', 'hostname', 'fqdn', 'mac', 'owner', 'device_type', 'site', 'notes']


In [6]:
# Set index to source_row_id
raw_data = raw_data.set_index("source_row_id")

In [7]:
raw_data

Unnamed: 0_level_0,ip,hostname,fqdn,mac,owner,device_type,site,notes
source_row_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1,192.168.010.005,HOST01,,AA-BB-CC-DD-EE-FF,priya (platform) priya@corp.example.com,server,BLR Campus,db host
2,10.0.1.300,host-02,host-02.local,11-22-33-44-55-66,ops,,HQ Bldg 1,edge gw?
3,10.0.1,host03,,aabb.ccdd.eeff,jane@corp.example.com,switch,HQ-BUILDING-1,
4,10.0.1.1.2,printer-01,,00:11:22:33:44:55,Facilities,printer,HQ,
5,fe80::1%eth0,iot-cam01,,00:aa:bb:cc:dd:ee,sec,iot,Lab-1,camera PoE on port 3
6,127.0.0.1,local-test,,,,,,
7,169.254.10.20,host-apipa,,,,,,
8,10.10.10.10,srv-10,,,platform,server,BLR campus,
9,abc.def.ghi.jkl,badhost,,,,,,
10,192.168.1.-1,neg,,,,,,


# IP Validation

In [8]:
def trim_ip_str(ip: str) -> str:
    try:
        return ip.strip()
    except AttributeError:
        return ""

In [9]:
def validate_and_label_ipv4(ip: str) -> Tuple[str, str]:
    if ip == "":
        return ip, "empty_string"
    if ":" in ip:
        return ip, "ipv6_or_mixed_non_ipv4"
    if "." not in ip:
        return ip, "no_octet_separation"
    parts = ip.split(".")
    if len(parts) != 4:
        return ip, "wrong_part_count"
    canonical_parts = []
    for part in parts:
        part = part.strip()
        if part == "":
            return ip, "empty_octet"
        if not (part.lstrip("+").isdigit() and not part.startswith("-")):
            return ip, "non_numeric_or_negative"
        try:
            v = int(part, 10)
        except ValueError:
            return ip, "non_decimal_format"
        if v < 0 or v > 255:
            return ip, "octet_out_of_range"
        canonical_parts.append(str(v))
    return '.'.join(canonical_parts), "ok"

In [10]:
def classify_ipv4(ip: str, validation_label: str) -> str:
    if validation_label != "ok":
        return "unclassified"
    a, b, c, d = list(map(int, ip.split(".")))
    classification = ""
    if ip == "0.0.0.0" :
        classification = "unspecified"
    elif ip == "255.255.255.255":
        classification = "limited_broadcast"
    elif a == 127:
        classification = "loopback"
    elif a == 169 and b == 254:
        classification = "link_local_apipa"
    elif 224 <= a <= 239:
        classification = "multicast"
    elif 240 <= a <= 255 and ip != "255.255.255.255":
        classification = "reserved"
    elif a == 10 or (a == 172 and 16 <= b <= 31) or (a == 192 and b == 168):
        classification = "private"
    else:
        classification = "public_or_other"
    return classification

In [11]:
def determine_subnet(ip: str, classification: str) -> str:
    if classification == "unclassified":
        return ""
    if classification in ["limited_broadcast", "unspecified", "unclassified", "multicast", "reserved"]:
        return ""
    elif classification == "loopback":
        mask = "8"
        subnet_ip = f"{'.'.join(ip.split(".")[:3])}.0/{mask}"
        return subnet_ip
    elif classification == "private":
        mask = "24"
        subnet_ip = f"{ip}/{mask}"
        return subnet_ip
    elif classification == "link_local_apipa":
        mask = "16"
        subnet_ip = f"{'.'.join(ip.split(".")[:2])}.0.0/{mask}"
        return subnet_ip
    else:
        return ""

In [12]:
def process_ipv4(ip: str) -> Dict:
    steps = []
    notes = []
    trimmed_ip = trim_ip_str(ip)
    steps.append("ip_trim")
    trimmed_ip, validation_label = validate_and_label_ipv4(trimmed_ip)
    if validation_label == "ok":
        steps.append("ip_parse")
        steps.append("ip_normalize")
        classification = classify_ipv4(trimmed_ip, validation_label)
        steps.append("ip_classify")
        subnet = determine_subnet(trimmed_ip, classification)
        steps.append("ip_subnet_determine")
        ip_out = trimmed_ip
        ip_valid = "True"
        ip_version = "4"
        subnet_cidr = subnet
    else:
        ip_out = ip.strip()
        ip_valid = "False"
        ip_version = ""
        subnet_cidr = ""
    return {
        "ip": ip_out,
        "ip_valid": ip_valid,
        "ip_version": ip_version,
        "subnet_cidr": subnet_cidr,
        "ip_normalization_steps": "|".join(steps),
        # Add row ID
    }

In [13]:
# Test
ip1 = "192.168.1.1"
ip2 = " 192.168.1.1"
ip3 = " 192. 168.1.1"
ip4 = "192. 168"
ip5 = "0.0.0.0"
ip6 = "130.203.12.23"

# trimmed_ip = trim_ip_str(ip3)
# print(f"Trimmed IP: {trimmed_ip}")
# ip, validation_label = validate_and_label_ipv4(trimmed_ip)
# print(f"Validated IP: {ip}, {validation_label}")
# classification = classify_ipv4(ip, validation_label)
# print(f"Classification of IP: {classification}")
# subnet = determine_subnet(ip, classification)
# print(f"IP with subnet mask: {subnet}")

print(process_ipv4(ip6))

{'ip': '130.203.12.23', 'ip_valid': 'True', 'ip_version': '4', 'subnet_cidr': '', 'normalization_steps': 'ip_trim|ip_parse|ip_normalize|ip_classify|ip_subnet_determine'}


# MAC Validation

In [14]:
def trim_mac_str(mac: str) -> str:
    try:
        return str(mac).strip()
    except Exception:
        return ""

In [15]:
def is_valid_hex(s: str) -> bool:
    HEX = set("01223456789abcdefABCDEF")
    return s != "" and all(c in HEX for c in s)

In [18]:
# ChatGPT prompt:
# The functions above I want formatted something like this: def trim_ip_str(ip: str) -> str: try: return ip.strip() except AttributeError: return "" def validate_and_label_ipv4(ip: str) -> Tuple[str, str]: if ip == "": return ip, "empty_string" if ":" in ip: return ip, "ipv6_or_mixed_non_ipv4" if "." not in ip: return ip, "no_octet_separation" parts = ip.split(".") if len(parts) != 4: return ip, "wrong_part_count" canonical_parts = [] for part in parts: part = part.strip() if part == "": return ip, "empty_octet" if not (part.lstrip("+").isdigit() and not part.startswith("-")): return ip, "non_numeric_or_negative" try: v = int(part, 10) except ValueError: return ip, "non_decimal_format" if v < 0 or v > 255: return ip, "octet_out_of_range" canonical_parts.append(str(v)) return '.'.join(canonical_parts), "ok" def classify_ipv4(ip: str, validation_label: str) -> str: if validation_label != "ok": return "unclassified" a, b, c, d = list(map(int, ip.split("."))) classification = "" if ip == "0.0.0.0" : classification = "unspecified" elif ip == "255.255.255.255": classification = "limited_broadcast" elif a == 127: classification = "loopback" elif a == 169 and b == 254: classification = "link_local_apipa" elif 224 <= a <= 239: classification = "multicast" elif 240 <= a <= 255 and ip != "255.255.255.255": classification = "reserved" elif a == 10 or (a == 172 and 16 <= b <= 31) or (a == 192 and b == 168): classification = "private" else: classification = "public_or_other" return classification The trim function should just trim the string, the validate function should check only for validity and return the validated and normalized MAC address and a label for any error encountered, the classify function should only return classifications like "eui64" or "eui48" The function to check valid hex string is: def is_valid_hex(s: str) -> bool: HEX = set("01223456789abcdefABCDEF") return s != "" and all(c in HEX for c in s)

def validate_and_label_mac(mac: str) -> Tuple[str, str]:
    """
    Returns (canonical_mac, label).
      - On success: ('aa:bb:cc:dd:ee:ff', 'ok') or EUI-64 equivalent.
      - On failure: (original_input, '<error_label>').
    Canonical form: lowercase, colon-separated. Accepts:
      - aa:bb:cc:dd:ee:ff  | aa-bb-cc-dd-ee-ff
      - aabb.ccdd.eeff     | aabb.ccdd.eeff.gghh
      - aabbccddeeff       | aabbccddeeffgghh
    """
    if mac is None:
        return "", "missing"
    s = trim_mac_str(mac)
    if s == "":
        return s, "empty_string"

    has_colon = ":" in s
    has_dash  = "-" in s
    has_dot   = "." in s

    # reject mixed separators
    if sum([has_colon, has_dash, has_dot]) > 1:
        return s, "mixed_separators"

    octets: List[str] = []

    if has_colon or has_dash:
        sep = ":" if has_colon else "-"
        parts = s.split(sep)
        if len(parts) not in (6, 8):
            return s, "wrong_group_count"
        for p in parts:
            p = p.strip()
            if len(p) != 2 or not is_valid_hex(p):
                return s, "bad_octet_hex"
            octets.append(p.lower())

    elif has_dot:
        # Cisco-style: aabb.ccdd.eeff(.gghh)
        parts = s.split(".")
        if len(parts) not in (3, 4):
            return s, "wrong_group_count_dot"
        for grp in parts:
            grp = grp.strip()
            if len(grp) != 4 or not is_valid_hex(grp):
                return s, "bad_group_hex_dot"
            octets.extend([grp[0:2].lower(), grp[2:4].lower()])

    else:
        # No separators: 12 (EUI-48) or 16 (EUI-64) hex chars
        if not is_valid_hex(s):
            return s, "non_hex_chars"
        if len(s) not in (12, 16):
            return s, "wrong_length_no_separators"
        octets = [s[i:i+2].lower() for i in range(0, len(s), 2)]

    if len(octets) not in (6, 8):
        return s, "not_6_or_8_octets"

    canonical = ":".join(octets)
    return canonical, "ok"

def classify_mac(mac: str, validation_label: str) -> str:
    """
    Only returns 'eui48' or 'eui64' (or 'unclassified' if not ok).
    """
    if validation_label != "ok":
        return "unclassified"
    count = len(mac.split(":"))
    if count == 6:
        return "eui48"
    if count == 8:
        return "eui64"
    return "unclassified"

In [23]:
def process_mac(mac: str) -> Dict:
    steps = []
    notes = []
    trimmed_mac = trim_mac_str(mac)
    steps.append("mac_trim")
    trimmed_mac, validation_label = validate_and_label_mac(trimmed_mac)
    if validation_label == "ok":
        steps.append("mac_parse")
        steps.append("mac_normalize")
        classification = classify_mac(trimmed_mac, validation_label)
        steps.append("mac_classify")
        mac_out = trimmed_mac
        mac_valid = "True"
        mac_kind = classification
    else:
        mac_out = str(mac).strip()
        mac_valid = "False"
        mac_kind = ""
    return {
        "mac": mac_out,
        "mac_valid": mac_valid,
        "mac_kind": mac_kind,
        "mac_normalization_steps": "|".join(steps),
        # Add row ID
    }

In [24]:
# Test
for mac in raw_data["mac"]:
    print(f"Working on {mac}")
    print(process_mac(mac))

Working on AA-BB-CC-DD-EE-FF
{'mac': 'aa:bb:cc:dd:ee:ff', 'mac_valid': 'True', 'mac_kind': 'eui48', 'mac_normalization_steps': 'mac_trim|mac_parse|mac_normalize|mac_classify'}
Working on 11-22-33-44-55-66
{'mac': '11:22:33:44:55:66', 'mac_valid': 'True', 'mac_kind': 'eui48', 'mac_normalization_steps': 'mac_trim|mac_parse|mac_normalize|mac_classify'}
Working on aabb.ccdd.eeff
{'mac': 'aa:bb:cc:dd:ee:ff', 'mac_valid': 'True', 'mac_kind': 'eui48', 'mac_normalization_steps': 'mac_trim|mac_parse|mac_normalize|mac_classify'}
Working on 00:11:22:33:44:55
{'mac': '00:11:22:33:44:55', 'mac_valid': 'True', 'mac_kind': 'eui48', 'mac_normalization_steps': 'mac_trim|mac_parse|mac_normalize|mac_classify'}
Working on 00:aa:bb:cc:dd:ee
{'mac': '00:aa:bb:cc:dd:ee', 'mac_valid': 'True', 'mac_kind': 'eui48', 'mac_normalization_steps': 'mac_trim|mac_parse|mac_normalize|mac_classify'}
Working on nan
{'mac': 'nan', 'mac_valid': 'False', 'mac_kind': '', 'mac_normalization_steps': 'mac_trim'}
Working on nan
{

# GPT Client

In [47]:
from openai import OpenAI
from dotenv import load_dotenv
import os
import json

In [50]:
class GPTClient:
    def __init__(self, model="gpt-4o-mini", temperature=0.2):
        # Load environment variables from .env
        load_dotenv()
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY not found in .env")

        # Initialize the OpenAI client
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.temperature = temperature

    def generate(self, system_prompt: str, prompt: str) -> str:
        """Send a prompt and return the model's text output."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": system_prompt
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            temperature=self.temperature,
        )
        response = response.choices[0].message.content.strip()
        if response.startswith("```"):
            response = response.strip("`").replace("json", "", 1).strip()

        return json.loads(response)


In [51]:
gpt = GPTClient(model="gpt-4o-mini", temperature=0.2)

# Owner parsing

In [61]:
system_prompt = '''
You specialize in network analytics
'''
owner_prompt = '''
Given the following string, I want you to parse it to extract:
- An email address
- A name
- A team name

I want you to return the response to me in a JSON format containing:
- owner (Capitalize if possible, may be possible to obtain from email address too)
- owner_email
- owner_team

I want only the JSON and nothing else

Wherever impossible to do so, return empty strings within the JSON fields

String:
'''

In [62]:
def trim_owner_str(owner: str) -> str:
    try:
        return str(owner).strip()
    except Exception:
        return ""

In [63]:
# Test
for owner in raw_data["owner"]:
    print(f"Working on {owner}")
    owner_prompt_augmented = owner_prompt + trim_owner_str(owner)
    response_json = gpt.generate(system_prompt, owner_prompt_augmented)
    print(response_json)

Working on priya (platform) priya@corp.example.com
{'owner': 'Priya', 'owner_email': 'priya@corp.example.com', 'owner_team': 'platform'}
Working on ops
{'owner': '', 'owner_email': '', 'owner_team': ''}
Working on jane@corp.example.com
{'owner': 'Jane', 'owner_email': 'jane@corp.example.com', 'owner_team': ''}
Working on Facilities
{'owner': '', 'owner_email': '', 'owner_team': 'Facilities'}
Working on sec
{'owner': '', 'owner_email': '', 'owner_team': ''}
Working on nan
{'owner': '', 'owner_email': '', 'owner_team': ''}
Working on nan
{'owner': '', 'owner_email': '', 'owner_team': ''}
Working on platform
{'owner': '', 'owner_email': '', 'owner_team': ''}
Working on nan
{'owner': '', 'owner_email': '', 'owner_team': ''}
Working on nan
{'owner': '', 'owner_email': '', 'owner_team': ''}
Working on nan
{'owner': '', 'owner_email': '', 'owner_team': ''}
Working on nan
{'owner': '', 'owner_email': '', 'owner_team': ''}
Working on nan
{'owner': '', 'owner_email': '', 'owner_team': ''}
Workin

# Device type parsing

In [64]:
def trim_device_type_str(device_type: str) -> str:
    try:
        return str(device_type).strip()
    except Exception:
        return ""

In [74]:
system_prompt = '''
You specialize in network analytics
'''
device_prompt = '''
Given the following string, I want you to parse it to extract:
- Device Type (based on Hostname and Device Type and Notes)
- Confidence score (low, high, mid) based on your classification, be very critical of this

I want you to return the response to me in a JSON format containing:
- device_type
- device_type_confidence

I want only the JSON and nothing else

Wherever impossible to do so, return empty strings within the JSON fields

String:
'''

In [75]:
# Test
for hostname, device, notes in zip(raw_data["hostname"], raw_data["device_type"], raw_data["notes"]):
    print(f"Working on {hostname} and {device}")
    device_prompt_augmented = device_prompt + f"Hostname: {hostname} Device Type: {trim_device_type_str(device)} Notes: {notes}"
    response_json = gpt.generate(system_prompt, device_prompt_augmented)
    print(response_json)

Working on HOST01 and server
{'device_type': 'server', 'device_type_confidence': 'high'}
Working on host-02 and nan
{'device_type': 'edge gateway', 'device_type_confidence': 'mid'}
Working on host03 and switch
{'device_type': 'switch', 'device_type_confidence': 'high'}
Working on printer-01 and printer
{'device_type': 'printer', 'device_type_confidence': 'high'}
Working on iot-cam01 and iot
{'device_type': 'camera', 'device_type_confidence': 'high'}
Working on local-test and nan
{'device_type': '', 'device_type_confidence': ''}
Working on host-apipa and nan
{'device_type': '', 'device_type_confidence': ''}
Working on srv-10 and server
{'device_type': 'server', 'device_type_confidence': 'high'}
Working on badhost and nan
{'device_type': '', 'device_type_confidence': ''}
Working on neg and nan
{'device_type': '', 'device_type_confidence': ''}
Working on bcast and nan
{'device_type': 'broadcast', 'device_type_confidence': 'mid'}
Working on netid and nan
{'device_type': '', 'device_type_co

# Site normalization

In [76]:
import re

In [83]:
def normalize_site_name(name: str) -> str:
    if not name or not isinstance(name, str):
        return ""

    # Mapping of common abbreviations to full forms
    replacements = {
        r"\bBldg\b": "Building",
        r"\bBLR\b": "Bangalore",
        r"\bDC\b": "Datacenter",
        r"\bHQ\b": "Headquarters",
        r"\bLab\b": "Laboratory",
        r"\bCampus\b": "Campus",  # keep capitalization consistent
    }

    s = name.strip()

    # Apply replacements (case-insensitive)
    for pattern, full in replacements.items():
        s = re.sub(pattern, full, s, flags=re.IGNORECASE)

    # Replace spaces/underscores with hyphens
    s = re.sub(r"[ _]+", "-", s)

    # Remove duplicate hyphens
    s = re.sub(r"-{2,}", "-", s)

    # Normalize capitalization (title case or upper depending on your style)
    s = s.title()

    return s

In [84]:
for site in raw_data["site"]:
    print(f"Working on {site}")
    print(normalize_site_name(site))

Working on BLR Campus
Bangalore-Campus
Working on HQ Bldg 1
Headquarters-Building-1
Working on HQ-BUILDING-1
Headquarters-Building-1
Working on HQ
Headquarters
Working on Lab-1
Laboratory-1
Working on nan

Working on nan

Working on BLR campus
Bangalore-Campus
Working on nan

Working on nan

Working on nan

Working on nan

Working on DC-1
Datacenter-1
Working on nan

Working on nan



# Hostname and FQDN