# Rolling the DICE: 
## A Device Identification and Classification Engine to detect vulnerable devices facing the Internet

This repository contains an example implementation of DICE's classifier and identifier modules for 8 common protocols in IoT and OT.
As a proof of concept, this code does not represent the final product; it only illustrates how scanning records, identifiers and classifiers work together.
The content of this repository was submitted to the Network Traffic Measurement and Analysis (TMA) conference to support a poster with the same title.

In [None]:
from abc import ABC
from datetime import datetime, timezone
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from packaging.version import parse
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pyarrow.parquet as pq
import pandas as pd
import pyarrow as pa

import os
import base64
import struct
import re
import itertools
import duckdb

DICE's structure is divided into three components: scanner, classifier, and identifier.
Each component is composed of one or more (sometimes) linked modules.
This notebook includes example implementations of identifier and classifier modules.

In [None]:
class Classifier(ABC):
    """Base type for modules that turn one record into classification labels."""

    def label(self, row: pd.Series) -> list[str]:
        """Return the labels for ``row``; this base version does nothing and yields ``None``."""
        return None
    
class Identifier(ABC):
    """Base type for modules that attach a fingerprint to one record."""

    def fingerprint(self, row: pd.Series) -> pd.Series:
        """Return the fingerprinted ``row``; this base version does nothing and yields ``None``."""
        return None

class Signature(ABC):
    """Interface sketch of a signature: something that can classify a tagged dataset."""

    def classify(self, id, tag):
        """Classify the records selected by ``id`` and ``tag``; this stub yields ``None``."""
        return None

This minimal example exists to show how classifiers can be linked together to work with common data.
Here, DICE only implements one command, `classify`, which takes a dataset and a signature (a set of identifiers and classifiers).
The signature acts on each of the records as if they were `scan` events, fingerprints them, and assigns classification labels.
In this example there will be only one identifier per signature.

The implementation we are aiming to cover should match the following command:

```bash
dice classify --signatures * -f zgrab2_* -s zgrab2
```

This command makes DICE classify previously gathered records using ZGrab2 `-f zgrab2_* -s zgrab2`. Then, `--signatures *` represents a glob, where we are trying to load all existing signatures.

In [None]:
class DICE:
    """Applies one identifier and a set of classifiers to a dataset."""

    @staticmethod
    def classify(df: pd.DataFrame, fp: Identifier, clss: list[Classifier]) -> pd.DataFrame:
        """Fingerprint every record of ``df`` and attach its classification labels.

        Args:
            df: records to process, one per row.
            fp: identifier whose ``fingerprint`` is applied to each row.
            clss: classifiers whose ``label`` results are concatenated, in order.

        Returns:
            The fingerprinted frame with an extra ``labels`` column holding one
            list of labels per row.
        """
        df_post = df.apply(fp.fingerprint, axis=1)

        # Build the label lists explicitly instead of through DataFrame.apply:
        # on an empty frame apply() hands back a DataFrame, which cannot be
        # assigned to a single column.
        labels = [
            list(itertools.chain.from_iterable(cls.label(row) for cls in clss))
            for _, row in df_post.iterrows()
        ]
        df_post["labels"] = pd.Series(labels, index=df_post.index, dtype=object)
        return df_post

In [None]:
from typing import Protocol
class DatasetLoader(Protocol):
    """Structural type of a callable that loads the records of one scan/protocol pair."""

    def __call__(self, scan_id: str, protocol: str) -> pd.DataFrame:
        """Load the records for ``protocol`` from the scan identified by ``scan_id``."""
        return None

class DatasetStore:
    """Reads and writes scan records stored as files below ``root_path``."""

    root_path: str = "data"

    def normalize_df(self, df: pd.DataFrame, protocol: str) -> pd.DataFrame:
        """Lift the per-protocol fields out of the ``data`` column.

        Args:
            df: frame with a ``data`` column of dicts keyed by protocol name.
            protocol: key to read inside each ``data`` dict.

        Returns:
            A re-indexed frame where ``data`` is replaced by the columns
            ``error``, ``protocol``, ``result``, ``status`` and ``timestamp``
            (the latter converted with ``pd.to_datetime``).
        """
        cols = ["error", "protocol", "result", "status", "timestamp"]

        def explode(data: dict) -> pd.Series:
            # A record may carry no entry for this protocol; yield empty
            # fields for it instead of failing on ``None.get``.
            proto_data = data.get(protocol) if isinstance(data, dict) else None
            if not isinstance(proto_data, dict):
                proto_data = {}
            return pd.Series({key: proto_data.get(key, None) for key in cols})

        ndf = pd.DataFrame(df["data"].apply(explode))
        norm = (
            pd.concat([df, ndf], axis=1)
            .drop(columns=["data"])
            .reset_index(drop=True)
        )
        norm['timestamp'] = pd.to_datetime(norm['timestamp'])
        return norm

    def to_parquet(self, scan_id: str, protocol: str, schema: pa.Schema, dfs, prefix: str | None = None, separator: str = '_'):
        """Write ``dfs`` to ``<root>/<scan_id>/protocol=<protocol>/<prefix><sep><protocol>.parquet``.

        Args:
            scan_id: scan directory name below ``root_path``.
            protocol: protocol name, used for the partition directory and file name.
            schema: Arrow schema; every frame is re-indexed to its column names.
            dfs: iterable of DataFrames written one after another.
            prefix: optional file-name prefix; omitted from the name when empty.
            separator: string placed between ``prefix`` and ``protocol``.
        """
        # filepath and filename
        fpath = os.path.join(self.root_path, scan_id, f'protocol={protocol}')
        fname = separator.join(filter(None, [prefix, protocol]))

        # The writer does not create missing directories.
        os.makedirs(fpath, exist_ok=True)

        # Store dfs to parquet
        # NOTE: the writer truncates the file!
        with pq.ParquetWriter(os.path.join(fpath, f'{fname}.parquet'), schema) as writer:
            for df in dfs:
                df = df.reindex(columns=schema.names)
                table = pa.Table.from_pandas(df, schema=schema)
                writer.write_table(table=table)

    def read_parquet(self, scan_id: str, protocol: str) -> pd.DataFrame:
        """Load ``<protocol>.parquet`` for a scan and flatten its ``result`` column.

        Returns:
            The records with ``result`` expanded through ``pd.json_normalize``
            and with duplicated (``ip``, ``status``) rows removed.
        """
        fpath = os.path.join(self.root_path, scan_id, f'protocol={protocol}', f'{protocol}.parquet')
        df = pd.read_parquet(fpath, engine='pyarrow')
        ndf = (
            pd.json_normalize(df["result"])
            .rename(columns=lambda x: x.removeprefix("result."))
        )
        df = (
            pd.concat([df, ndf], axis=1)
            .drop(columns=["result"])
            .reset_index(drop=True)
        )

        # Remove duplicated rows based on the IP and status
        df = df.drop_duplicates(subset=["ip", "status"])
        return df

class Results:
    """Outcome of classifying one scan/protocol pair."""

    id = ""
    protocol = ""
    df = None

    def __init__(self, id: str, protocol: str, cls_df: pd.DataFrame):
        """Store the scan ``id``, the ``protocol`` and the classified frame ``cls_df``."""
        self.id = id
        self.protocol = protocol
        self.df = cls_df

    def summary(self) -> dict:
        """Return the counts of processed rows (``flagged``) and rows with at least one label (``classified``)."""
        return {
            'id': self.id,
            'protocol': self.protocol,
            'flagged': len(self.df.index),
            'classified': len(self.df[self.df.labels.astype(bool)].index)
        }

    def labels(self) -> pd.Series:
        """Count how many records carry each label.

        A label is counted once per record. The stored frame is left
        untouched: the de-duplication works on a temporary Series, so the
        ``labels`` column keeps holding lists.
        """
        unique_per_row = self.df["labels"].apply(set)
        return unique_per_row.explode().value_counts()

    def to_parquet(self, prefix: str = "dice", schema: pa.Schema | None = None, mode: str = 'labelled'):
        """Persist the results through the module-level ``ds`` store.

        Args:
            prefix: file-name prefix handed to the store.
            schema: Arrow schema; inferred from the full frame when not given.
            mode: ``"labelled"`` writes only rows with at least one label,
                any other value writes every row.
        """
        df = self.df

        if mode == "labelled":
            df = df[df.labels.astype(bool)]

        if not schema:
            # Infer from the full frame so the schema does not depend on
            # which rows happen to be labelled.
            schema = pa.Schema.from_pandas(self.df)

        ds.to_parquet(
            scan_id=self.id,
            protocol=self.protocol,
            schema=schema,
            # Write the selected rows; the subset used to be computed and then ignored.
            dfs=[df],
            prefix=prefix,
        )

class Signature:
    """Bundles a loader, a filter, one identifier and its classifiers.

    All setters return ``self`` so that they can be chained.
    """

    def __init__(self):
        # Per-instance state: a class-level list would be shared by every Signature.
        self.idf = None
        self.classifiers = []
        self.filter = None
        self.loader = None

    def set_classifiers(self, clss: Classifier | list[Classifier]):
        """Set the classifiers; a single classifier is wrapped in a list."""
        if isinstance(clss, list):
            self.classifiers = clss
        else:
            self.classifiers = [clss]
        return self

    def set_identifier(self, idf: Identifier):
        """Set the identifier used to fingerprint the records."""
        self.idf = idf
        return self

    def set_filter(self, f):
        """Set the callable that selects the rows to classify."""
        self.filter = f
        return self

    def set_loader(self, loader):
        """Set the callable that loads the records, called as ``loader(scan_id=..., protocol=...)``."""
        self.loader = loader
        return self

    def classify(self, id: str, protocol: str) -> Results:
        """Load, filter, fingerprint and label the records of one scan.

        Args:
            id: scan identifier passed to the loader as ``scan_id``.
            protocol: protocol name passed to the loader.

        Returns:
            A ``Results`` object wrapping the classified frame.
        """
        df = self.loader(scan_id=id, protocol=protocol)
        # The filter is optional: without one every loaded record is classified.
        if self.filter is not None:
            df = self.filter(df)

        return Results(
            id=id,
            protocol=protocol,
            cls_df=DICE.classify(df, self.idf, self.classifiers)
        )

# Shared store: its read_parquet is the default loader of classify_records,
# and Results.to_parquet writes through it.
ds = DatasetStore()

def classify_records(
        id: str, 
        protocol: str,
        filter: callable,
        fp: Identifier, 
        clss: list[Classifier],
        loader: DatasetLoader = ds.read_parquet
    ) -> 'Results':
    """Assemble a one-off signature and run it over a stored scan.

    Args:
        id: scan identifier.
        protocol: protocol whose records are classified.
        filter: callable that selects the rows to classify.
        fp: identifier used to fingerprint each row.
        clss: classifier, or list of classifiers, applied to each row.
        loader: callable that loads the records; defaults to ``ds.read_parquet``.

    Returns:
        The ``Results`` produced by the signature.
    """
    signature = Signature()
    signature.set_loader(loader)
    signature.set_identifier(fp)
    signature.set_classifiers(clss)
    signature.set_filter(filter)
    return signature.classify(id, protocol)

## Modules

In [None]:
# Helper
def parse_certificate(certificate: str) -> dict:
    """Parse a PEM X.509 certificate, adding the PEM armor when it is missing.

    Args:
        certificate: the certificate, with or without the
            ``BEGIN``/``END CERTIFICATE`` lines.

    Returns:
        On success a dict with ``raw``, ``subject``, ``issuer``, ``pkey``,
        ``signature_algorithm``, ``not_before``, ``not_after`` and ``error``
        (the exception raised while reading the public key, if any).
        When the certificate cannot be loaded, a dict with only ``raw`` and
        ``error``.
    """
    header = "-----BEGIN CERTIFICATE-----"
    footer = "-----END CERTIFICATE-----"

    if not certificate.startswith(header):
        certificate = f"{header}\n{certificate}"

    # Append the footer only when it is really absent. The previous test
    # ("not ends with footer OR not ends with footer+newline") was always
    # true and added a second END line to complete certificates.
    if not certificate.rstrip().endswith(footer):
        certificate = f"{certificate}\n{footer}\n"

    b = certificate.encode()
    try:
        cert = x509.load_pem_x509_certificate(b)

        pkey = None
        err = None
        try:
            pkey = cert.public_key()
        except Exception as e:
            # Keep the rest of the certificate even if the key is unreadable.
            err = e

        not_before = cert.not_valid_before_utc.replace(tzinfo=timezone.utc)
        not_after = cert.not_valid_after_utc.replace(tzinfo=timezone.utc)

        return dict(
            raw=certificate,
            subject=cert.subject,
            issuer=cert.issuer,
            pkey=pkey,
            signature_algorithm=cert.signature_algorithm_oid._name,
            not_before=not_before,
            not_after=not_after,
            error=err
        )
    except Exception as e:
        # Best effort: a bad certificate is reported, never raised.
        return {
            'raw': certificate,
            'error': e
        }

### Certificates

In [None]:
class CertificateClassifier(Classifier):
    '''Label parsed X.509 certificates by validity period, signature and public key.

    Reuse detection is stateful: an instance remembers every certificate and
    public key it has evaluated.
    '''

    labels = dict(
        # Validity
        expired="certificate:validity:expired",
        negative="certificate:validity:negative",
        long="certificate:validity:long-lasting",
        future="certificate:validity:future",
        # Encryption
        weak_hash="certificate:hash:weak",
        weak_encryption="certificate:encryption:weak",
        # Cert
        reused="certificate:reused",
        # Public Key
        short_key="certificate:key:short",
        reused_key="certificate:key:reused"
    )

    # Lower-cased parts of a signature-algorithm name that mark it as weak.
    weak_signature_parts = frozenset({"md5", "sha1", "dsa"})

    def __init__(self):
        # Per-instance memory. As class attributes these containers were
        # shared by every instance, so certificates seen by one classifier
        # were reported as reused by another. Sets give O(1) membership tests.
        self.seen_certs: set[str] = set()
        self.raw_keys: set[bytes] = set()

    def eval_times(self, cert, timestamp: datetime) -> list[str]:
        '''Compare the validity window of ``cert`` against the scan ``timestamp``.'''
        labs = []

        not_before = cert.get("not_before")
        not_after = cert.get("not_after")

        if not (not_before and not_after):
            return labs

        if (not_before > timestamp):
            labs.append(self.labels.get("future"))

        if (not_after < timestamp):
            labs.append(self.labels.get("expired"))

        lifetime = not_after - not_before
        if lifetime <= pd.Timedelta(days=0):
            labs.append(self.labels.get("negative"))

        # Longer than 20 years
        if lifetime > pd.Timedelta(days=20*365.25):
            labs.append(self.labels.get("long"))
        return labs

    def eval_crypto(self, cert) -> list[str]:
        '''Flag signature algorithms that contain a weak component.'''
        labs = []

        # signature, e.g., sha256WithRSAEncryption or ecdsa-with-SHA1:
        # includes the hash function and the encryption algorithm names
        signature: str = cert.get("signature_algorithm", "")
        if not signature:
            return labs

        separator = "With" if "With" in signature else "-with-"
        # Inspect both sides of the separator. In "<hash>With<algo>" names the
        # hash comes first, in "<algo>-with-<hash>" names it comes last, so
        # looking only at the first part missed e.g. "ecdsa-with-SHA1".
        parts = {part.lower() for part in signature.split(separator)}

        # SHA-1: deprecated in 2011
        if parts & self.weak_signature_parts:
            labs.append(self.labels.get("weak_hash"))

        return labs

    def eval_key(self, cert) -> list[str]:
        '''Flag short public keys and public keys already seen by this instance.'''
        labs = []
        key = cert.get("pkey")
        if pd.isna(key):
            return labs

        # Not every public-key type in `cryptography` exposes key_size
        # (e.g. Ed25519), so read it defensively.
        # NOTE(review): elliptic-curve keys are measured against the same
        # 2048-bit threshold as RSA, so they are always labelled short — confirm intent.
        key_size = getattr(key, "key_size", None)
        if key_size is not None and key_size < 2048:
            labs.append(self.labels.get("short_key"))

        raw_key = key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        if raw_key in self.raw_keys:
            labs.append(self.labels.get("reused_key"))
        self.raw_keys.add(raw_key)
        return labs

    def eval_certificate(self, cert, timestamp) -> list[str]:
        '''Run every check on one parsed certificate (a dict with at least ``raw``).'''
        labs = []
        if pd.isna(cert) or not cert.get("raw"):
            return labs

        labs.extend(self.eval_times(cert, timestamp))
        labs.extend(self.eval_crypto(cert))
        labs.extend(self.eval_key(cert))

        raw = cert.get("raw")
        if raw in self.seen_certs:
            labs.append(self.labels.get("reused"))
        else:
            self.seen_certs.add(raw)

        return labs

    def label(self, row: pd.Series) -> list[str]:
        '''Label the certificate stored in ``row["fingerprint"]["certificate"]``.'''
        cert = row["fingerprint"]["certificate"]
        timestamp = row["timestamp"]
        return self.eval_certificate(cert, timestamp)

### BacNet

In [None]:
class BacNetFingerprinter(Identifier):
    """Builds a fingerprint from the vendor, model and firmware fields of a record."""

    def fingerprint(self, row: pd.Series) -> pd.Series:
        """Store the three identifying fields under ``row["fingerprint"]`` and return the row."""
        fields = ("vendor_name", "model_name", "firmware_revision")
        row["fingerprint"] = {field: row.get(field) for field in fields}
        return row
    
class BacNetClassifier(Classifier):
    """Labels records whose fingerprint exposes at least one device property."""

    def eval_access(self, row) -> list[str]:
        """Return the read-access label when any fingerprint field is set."""
        # Any of the fields would work
        values = row.get("fingerprint").values()
        readable = any(pd.notna(value) for value in values)
        return ["bacnet:access:read"] if readable else []

    def label(self, row: pd.Series) -> list[str]:
        """Return every label that applies to ``row``."""
        return list(self.eval_access(row))

In [None]:
def bacnet_filter(df):
    """Keep the rows whose ``is_bacnet`` value is truthy."""
    mask = df["is_bacnet"].astype(bool)
    return df[mask]
# Classify the BACnet records of scan 'tma-2025' with the default loader.
# A single classifier is passed; Signature.set_classifiers wraps it in a list.
bacnet_ds = classify_records(
    'tma-2025', 
    'bacnet',
    bacnet_filter,
    BacNetFingerprinter(),
    BacNetClassifier()
)

### CoAP

In [None]:
class CoAPIdentifier(Identifier):
    def get_str_rep(self, path):
        msgs = path.get("messages", [])
        if len(msgs) == 0:
            return
        
        payload = msgs[0].get("payload")
        if not payload:
            return
        return payload.get("StringRep", "")

    def get_capabilities(self, path):
        caps = []
        if rep := self.get_str_rep(path):
            matches = re.findall(r'<([^>]+)>(?:;([^=]+)="([^"]*)")*', rep)
            for match in matches:
                resource_path = match[0]
                # NOTE: This one does not work, attributes are empty in all cases
                # TODO FIXME: transform the attributes to an object, and put it as a string
                attributes = re.findall(r'([^=]+)="([^"]*)"', ";".join(match[1:]))
                caps.append([("path", resource_path), ("attributes", attributes)])
        return caps
    
    def get_home_payload(self, path):
        return self.get_str_rep(path)

    def fingerprint(self, row: pd.Series) -> pd.Series:
        endpoints = row.get("result", [])
        fp = {}

        # in reality we only have 2 endpoints, the core and the home,
        # the core goes first, and then the home.
        for ep in endpoints:
            match ep.get("path"):
                case ".well-known/core":
                    fp["capabilities"] = self.get_capabilities(ep)
                case "/":
                    fp["payload"] = self.get_home_payload(ep)
        row['fingerprint'] = fp
        return row
    
class CoAPClassifier:
    """Labels CoAP records from the capabilities and root payload in their fingerprint."""

    def eval_payload_libcoap(self, p):
        """Return a product label when the libcoap banner ends its year range before 2023."""
        found = re.search(r"\d+--(\d+)", p)
        if found is None:
            return None

        # lower than v4.3 is vulnerable
        # https://nvd.nist.gov/vuln/search/results?form_type=Basic&results_type=overview&query=libcoap&search_type=all&isCpeNameSearch=true
        # cpe:2.3:a:libcoap:libcoap:*:*:*:*:*:*:*:*
        last_year = found.group(1)
        if int(last_year) >= 2023:
            return None
        return f"coap:product:libcoap:{last_year}"

    def eval_payload_californium(self, p):
        """Return the outdated label when the copyright notice names no year from 2023 on."""
        # Anything below 3.13.0 is not supported, released in 2024
        notice = re.search(r"\(c\)(.*?)Institute for Pervasive Computing", p)
        if notice is None:
            return None

        years = [int(token) for token in re.findall(r"\d+", notice.group(1))]
        for year in years:
            if year >= 2023:
                return None
        return "coap:product:eclipse-californium:outdated"

    def eval_payload(self, payload: str):
        """Return the product labels derived from the root payload (at most one)."""
        if pd.isna(payload):
            return []

        # The libcoap check takes precedence when both banners are present.
        if "libcoap" in payload:
            verdict = self.eval_payload_libcoap(payload)
        elif "Eclipse Californium" in payload:
            verdict = self.eval_payload_californium(payload)
        else:
            verdict = None
        return [verdict] if verdict else []

    def eval_capabilities(self, caps: list):
        """Return the read label when the resource listing is a non-empty list or array."""
        if isinstance(caps, np.ndarray):
            listed = caps.size > 0
        elif isinstance(caps, list):
            listed = len(caps) > 0
        else:
            listed = False
        return ['coap:access:read'] if listed else []

    def eval_fingerprint(self, fingerprint: dict):
        """Return the capability labels followed by the payload labels."""
        from_caps = self.eval_capabilities(fingerprint.get("capabilities"))
        from_payload = self.eval_payload(fingerprint.get("payload"))
        return [*from_caps, *from_payload]

    def label(self, row: pd.Series) -> list[str]:
        """Return every label that applies to ``row['fingerprint']``."""
        return list(self.eval_fingerprint(row.get('fingerprint')))

In [None]:
def coap_loader(scan_id, protocol):
    """Load the raw parquet records of a scan, without flattening ``result``.

    Duplicated (``ip``, ``status``) rows are dropped.
    """
    parquet_file = os.path.join('data', scan_id, f'protocol={protocol}', f'{protocol}.parquet')
    records = pd.read_parquet(parquet_file, engine='pyarrow')
    return records.drop_duplicates(subset=["ip", "status"])

def coap_filter(df):
    """Keep the rows that carry a ``result`` value."""
    has_result = df["result"].notna()
    return df[has_result]
# Classify the CoAP records of scan 'tma-2025'. coap_loader replaces the
# default loader: it reads the parquet file without flattening `result`,
# which CoAPIdentifier.fingerprint reads directly.
coap_ds = classify_records(
    "tma-2025",
    "coap",
    coap_filter,
    CoAPIdentifier(),
    CoAPClassifier(),
    coap_loader
)

### DNP3

In [None]:
class DNP3Fingerprinter(Identifier):
    """Splits a base64-encoded DNP3 response into its data-link frames."""

    # Start (2) + length (1) + control (1) + destination (2) + source (2) + CRC (2).
    LINK_HEADER_SIZE: int = 10
    # Octets counted by the length field that belong to the header itself:
    # control, destination and source.
    LINK_FIXED_FIELDS: int = 5
    # User data travels in blocks of up to 16 octets, each followed by a 2-octet CRC.
    DATA_BLOCK_SIZE: int = 16
    CRC_SIZE: int = 2

    # (primary bit, function code) -> link-layer function name.
    _FUNCTION_NAMES = {
        (1, 0): "RESET_LINK_STATES",
        (1, 4): "UNCONFIRMED_USER_DATA",
        (1, 9): "REQUEST_LINK_STATUS",
        (0, 0): "ACK",
        (0, 11): "LINK_STATUS",
        (0, 15): "NOT_SUPPORTED",
    }

    def map_function_code(self, primary, fcode):
        """Return the name of a link-layer function code, or ``"UNKNOWN"``."""
        return self._FUNCTION_NAMES.get((primary, fcode), "UNKNOWN")

    def parse_data_link_layer(self, buffer: bytes) -> dict | None:
        """Decode the 10-octet link header at the start of ``buffer``.

        Returns:
            The decoded header, or ``None`` when the buffer is too short or
            does not begin with the start marker.
        """
        # Error: payload too small to contain a valid DNP3 frame
        if len(buffer) < self.LINK_HEADER_SIZE:
            return

        # Decode data link layer (little-endian)
        start, length, control, destination, source, crc = struct.unpack(
            "<HBBHHH", buffer[:self.LINK_HEADER_SIZE]
        )

        # Error: invalid start of frame marker (octets 0x05 0x64 read little-endian)
        if start != 0x6405:
            return

        control_header = dict(
            direction=(control >> 7) & 0x01, # Bit 7
            primary=(control >> 6) & 0x01, # Bit 6
            fcb_fcv_or_dfc = (control >> 4) & 0x03, # Bits 5-4
            function_code = control & 0x0F, # Bits 3-0
        )

        return dict(
            start=hex(start),
            length=length,
            control=control_header,
            destination=destination,
            source=source,
            crc=hex(crc),
            function_name=self.map_function_code(control_header["primary"], control_header["function_code"]),
        )

    def parse_frame(self, buffer: bytes) -> tuple[dict | None, bytes | None]:
        """Parse the first frame of ``buffer``.

        Returns:
            ``(frame, rest)`` where ``rest`` is the unparsed remainder, or
            ``(None, None)`` when ``buffer`` does not start with a valid header.
        """
        dlink_layer = self.parse_data_link_layer(buffer)
        if not dlink_layer:
            return None, None

        # The length field excludes the CRCs, so add one CRC per data block to
        # find where the frame ends. Without them the next frame would be
        # read from the middle of this one.
        user_len = max(dlink_layer["length"] - self.LINK_FIXED_FIELDS, 0)
        block_count = -(-user_len // self.DATA_BLOCK_SIZE)  # ceiling division
        frame_size = self.LINK_HEADER_SIZE + user_len + block_count * self.CRC_SIZE
        frame_data = buffer[:frame_size]

        frame = {"data_link_layer": dlink_layer}
        payload_start = self.LINK_HEADER_SIZE
        if user_len > 0 and len(frame_data) > payload_start:
            # The transport octet is the first octet after the header. Read
            # it before moving the offset: reading after the increment
            # returned the following octet and could index past the buffer.
            frame["transport_layer"] = dict(
                transport_control=bin(frame_data[payload_start]),
            )
            payload_start += 1

        # NOTE: the block CRCs are not stripped from the application bytes.
        frame["application_layer"] = frame_data[payload_start:]

        return (frame, buffer[frame_size:])

    def parse_response_frames(self, response: str) -> list[dict]:
        """Decode ``response`` from base64 and parse frames until one is invalid."""
        frames = []
        if not response:
            return frames

        # A DNP3 response comes encoded as base64, so the first thing is to convert it to a byte string
        try:
            buffer = base64.b64decode(response)
        except (ValueError, TypeError):
            # Undecodable response: report no frames instead of aborting the whole run.
            return frames

        while buffer:
            frame, buffer = self.parse_frame(buffer)
            if not frame:
                break

            frames.append(frame)
        return frames

    def fingerprint(self, row: pd.Series) -> pd.Series:
        """Store the frames parsed from ``row.raw_response`` under ``row['fingerprint']``."""
        frames = self.parse_response_frames(row.raw_response)
        row['fingerprint'] = {"frames": frames}
        return row
    
class DNP3Classifier(Classifier):
    """Labels DNP3 records that answered with parseable frames."""

    def is_unexpected(self, frames: list) -> bool:
        '''
        Tell whether any frame carries a request-link-status function code.
        A response holding such frames is an unexpected behavior.
        '''
        for frame in frames:
            dlink = frame.get("data_link_layer")
            if dlink and dlink.get("function_name") == "REQUEST_LINK_STATUS":
                return True
        return False

    def label(self, row: pd.Series) -> list[str]:
        """Return the read-access label unless the frames are missing or unexpected."""
        frames = row.get("fingerprint").get("frames", [])

        # No frames, or an echo response: nothing to report
        if not frames or self.is_unexpected(frames):
            return []
        return ["dnp3:access:read"]

In [None]:
def dnp3_filter(df):
    """Keep the rows flagged as DNP3 that also carry a raw response."""
    flagged = df["is_dnp3"].astype(bool)
    answered = df["raw_response"].notna()
    return df[flagged & answered]

# Classify the DNP3 records of scan 'tma-2025' with the default loader.
# A single classifier is passed; Signature.set_classifiers wraps it in a list.
dnp_ds = classify_records(
    f'tma-2025',
    'dnp3',
    dnp3_filter,
    DNP3Fingerprinter(), 
    DNP3Classifier(),
)

### Modbus

In [None]:
class ModbusIdentifier(Identifier):
    """Fingerprints a Modbus record from its MEI response objects."""

    def get_object(self, objects, key):
        """Return the stripped string value stored under ``key``.

        ``objects`` is iterated as (key, value) pairs. The result is ``None``
        when there are no objects, the key is absent, or its value is falsy.
        """
        if objects is np.nan or not objects:
            return None

        for name, value in objects:
            if name != key:
                continue
            if not value:
                return None
            return str(value).strip()

    def fingerprint(self, row: pd.Series) -> pd.Series:
        """Store vendor, product code and version under ``row["fingerprint"]``."""
        mei_objects = row.get("mei_response.objects", [])
        row["fingerprint"] = {
            "vendor": self.get_object(mei_objects, "vendor"),
            "product_code": self.get_object(mei_objects, "product_code"),
            "version": self.get_object(mei_objects, "revision"),
        }
        return row
    
class ModbusClassifier(Classifier):
    """Labels Modbus records that disclosed a vendor."""

    def label(self, row: pd.Series):
        """Return the anonymous-access label when the fingerprint has a vendor."""
        vendor = row["fingerprint"]["vendor"]
        return ['modbus:access:anonymous'] if vendor else []

In [None]:
def modbus_filter(df):
    """Keep the successful rows whose ``mei_response`` is truthy."""
    succeeded = df["status"] == 'success'
    has_mei = df["mei_response"].astype(bool)
    return df[succeeded & has_mei]

# Classify the Modbus records of scan 'tma-2025' with the default loader.
# A single classifier is passed; Signature.set_classifiers wraps it in a list.
modbus_ds = classify_records(
    f'tma-2025',
    'modbus',
    modbus_filter,
    ModbusIdentifier(), 
    ModbusClassifier(),
)

### MQTT

In [None]:
class MQTTIdentifier(Identifier):
    def find_hub_info(self, topics):
        name, version = (None, None)
        for topic, values in topics:
            if name and version:
                return (name, version)
            
            val = values[0]
            match topic:
                case t if topic.endswith('/sysdescr'):
                    name = val
                    continue

                case t if topic.endswith('/version'):
                    version = val
        return (name, version)

    def get_broker(self, row: pd.Series) -> dict:
        # NOTE: Homeassistants and bridges are not brokers, these are separate
        # devices using the broker to communicate. Do not confuse them
        broker = {
            "broker": None,
            "version": None,
            "cpe": None,
            "topics": row.get("topics", []),
        }

        if not broker['topics']:
            return broker
        
        for topic, values in (topics := broker['topics']):
            match topic:
                # hub
                case "$SYS/brokers":
                    broker["broker"], broker["version"] = self.find_hub_info(topics)
                    return broker
                # case-specific
                case s if s.startswith("$SYS/VerneMQ"):
                    broker["broker"] = "VerneMQ"
                    broker["cpe"] = "cpe:2.3:a:octavolabs:vernemq:*:*:*:*:*:*:*:*"
                    return broker
                case s if s.startswith(("$SYS/ActiveMQ", "ActiveMQ/")):
                    broker["broker"] = "ActiveMQ"
                    broker["cpe"] = "cpe:2.3:a:apache:activemq:*:*:*:*:*:*:*:*"
                    return broker
                
                # regular mqtt broker
                case "$SYS/broker/version":
                    version = values[0] if len(values) else ""
                    # mosquitto
                    if "mosquitto" in version:
                        broker["broker"] = "mosquitto"
                        broker["version"] = parse(version.split("mosquitto version")[1])
                        broker["cpe"] = "cpe:2.3:a:eclipse:mosquitto:*:*:*:*:*:*:*:*"
                        return broker

                    try:
                        # unknown, we only know the version
                        b = version.split("version")
                        version = parse(b[-1])
                        broker["broker"] = b[0]
                    except Exception:
                        broker["broker"] = version
                        broker["version"] = None

                    return broker
                
        return broker
    
    def get_fingerprint(self, row):
        cert = None
        if (c := row.get("certificate", None)) is not None:
            cert = parse_certificate(c[0])

        fp = self.get_broker(row)
        fp.update({'certificate': cert})
        return fp

    def fingerprint(self, row: pd.Series) -> pd.Series:
        row["fingerprint"] = self.get_fingerprint(row)
        return row
    
class MQTTClassifier(Classifier):
    """Labels MQTT records by connection scheme and by the topics they expose."""

    def label(self, row: pd.Series) -> list[str]:
        """Return the access labels for ``row``."""
        found = []

        scheme = row.get("scheme")
        if scheme == "tcp":
            found.append("mqtt:access:anonymous")
        elif scheme == "ssl":
            found.append("mqtt:access:self-signed-certificate")

        topics = row.get("fingerprint").get("topics", [])
        if topics:
            found.append("mqtt:access:read")

        # A single internal ($SYS/) topic is enough for the internal-read label.
        if any(topic.startswith("$SYS/") for topic, _ in topics):
            found.append("mqtt:access:read-internal")
        return found

In [None]:
def mqtt_filter_topics(df):
    """Keep the rows whose ``topics`` value is truthy."""
    has_topics = df["topics"].astype(bool)
    return df[has_topics]
# Classify the MQTT records of scan 'tma-2025' with the default loader.
# Two classifiers run on every record: the MQTT access checks and the
# certificate checks on the certificate parsed by MQTTIdentifier.
mqtt_ds = classify_records(
    'tma-2025',
    'mqtt',
    mqtt_filter_topics,
    MQTTIdentifier(),
    [MQTTClassifier(), CertificateClassifier()]
)

### OPC UA

In [None]:
class OPCUAIdentifier(Identifier):
    def parse_security_policy_URI(self, uri, default="None") -> str:
        if len(sp := uri.split("SecurityPolicy#")) > 1:
            return sp[-1]
        return default

    def get_security_policies(self, endpoint) -> list[tuple[str, str]]:
        ttype = {
            0: "ANONYMOUS_0",
            1: "USERNAME_1",
            2: "CERTIFICATE_2",
            3: "ISSUEDTOKEN_3",
        }

        default_policy = self.parse_security_policy_URI(endpoint["SecurityPolicyURI"])

        # Some descriptors do not have identity tokens,
        # this value is stored as None and it exists in the object
        utokens = endpoint.get("UserIdentityTokens")
        if utokens is None:
            return []

        policies = []
        for utoken in utokens:
            spolicy = self.parse_security_policy_URI(utoken["SecurityPolicyURI"], default_policy)
            ttoken = ttype.get(utoken["TokenType"], None)
            policies.append((ttoken, spolicy))
        return policies
    
    def get_system_info(self, nodes) -> dict:
        build_nodes = list(filter(lambda x: x["browse_name"] in ["ServerStatus", "BuildInfo"], nodes))
        for node in build_nodes:
            match node["browse_name"]:
                case "ServerStatus":
                    if v := node.get("value"):
                        return v["BuildInfo"]
                case "BuildInfo":
                    if v := node.get("value"):
                        return v
                
        # Otherwise try to figure out the manufacturer and other info
        info_names = [
            "ProductURI", 
            "ManufacturerName", 
            "ProductName", 
            "SoftwareVersion", 
            "BuildNumber", 
            "BuildDate",
            "SerialNumber",
            # alternatives
            "Model",
            "SoftwareRevision",
            "DeviceRevision",
            "HardwareRevision"
        ]
        sys_info = {}
        for node in nodes:
            if n := node["browse_name"] in info_names:
                sys_info[n] = node.get("value")
        return sys_info

    def fingerprint(self, row: pd.Series) -> pd.Series:
        ends = []
        nodes = n if pd.notna(n := row.get("nodes")) else {}
        if np.all(pd.notnull(eps := row.get("endpoints", []))):
            for endpoint in eps:
                ends.append({
                    "endpoint": endpoint,
                    "info": self.get_system_info(nodes.get(endpoint.get("EndpointURL"), [])),
                    "policies": self.get_security_policies(endpoint),
                    "mode": endpoint.get("SecurityMode"),
                    "certificate": parse_certificate(cert) if pd.notna(cert := endpoint.get("ServerCertificate")) else None
                })

        row["fingerprint"] = dict(
            endpoints=ends
        )
        return row

class OPCUAClassifier(Classifier):
    """Labels OPC UA records by the access policies of their endpoints."""

    def eval_policy(self, policies, mode):
        """Return the access labels for one endpoint.

        ``policies`` holds ``(token type, security policy)`` pairs and
        ``mode`` is the security mode of the endpoint.
        """
        found = []

        # Anonymous + None = no access control
        if ("ANONYMOUS_0", "None") in policies:
            found.append("opcua:access:anonymous")

        # broken signing and encryptions
        offered = {policy for _, policy in policies}
        if offered & {"Basic256", "Basic128Rsa15"}:
            found.append("opcua:access:weak-policies")

        weak_modes = ("None", "Sign")
        if mode in weak_modes:
            found.append("opcua:access:weak-mode")

        return found

    def label(self, row: pd.Series) -> list[str]:
        """Return the labels of every endpoint of ``row``, in endpoint order."""
        collected = []
        for endpoint in row.get("fingerprint").get("endpoints", []):
            collected.extend(self.eval_policy(endpoint["policies"], endpoint["mode"]))
        return collected
    
class OPCUACertificateClassifier():
    """Runs the certificate checks on every endpoint of an OPC UA record."""

    def __init__(self):
        # Delegate that evaluates a single certificate.
        self.cls = CertificateClassifier()

    def label(self, row):
        """Return the certificate labels of all endpoints, in endpoint order."""
        endpoints = row["fingerprint"]["endpoints"]
        scanned_at = row["timestamp"]

        collected = []
        for endpoint in endpoints:
            verdict = self.cls.eval_certificate(endpoint["certificate"], scanned_at)
            if verdict:
                collected += verdict
        return collected

In [None]:
def s2_opcua_flatten(df: pd.DataFrame) -> pd.DataFrame:
    """Replace the ``result`` column by the OPC UA fields it contains.

    Args:
        df: records with a ``result`` column of dicts.

    Returns:
        A re-indexed frame without ``result``, with the known OPC UA fields
        as columns and a constant ``port`` column set to 4840.
    """
    opcua_cols = ["is_opcua", "endpoints", "discovery_servers", "nodes", "endpoint_sec_used", "rdns", "endpoint_cert_test"]

    ndf = (
        pd.json_normalize(df["result"], max_level=0)
        .rename(columns=lambda x: x.removeprefix("result."))
    )
    ndf = ndf.drop(columns=ndf.columns.difference(opcua_cols))

    # concat(axis=1) aligns on the index. The normalized frame may come back
    # with a fresh 0..n-1 index while `df` can have gaps (e.g. after
    # drop_duplicates), which would pair fields with the wrong records.
    ndf.index = df.index

    df = (
        pd.concat([df, ndf], axis=1)
        .drop(columns=["result"])
        .reset_index(drop=True)
    )
    df["port"] = 4840
    return df

def opcua_tma_loader(scan_id: str, protocol: str) -> pd.DataFrame:
    """Load the OPC UA records of a scan from its JSON-lines file.

    The records are normalized through the shared store, de-duplicated on
    (``ip``, ``status``) and flattened with ``s2_opcua_flatten``.
    """
    source = f'data/{scan_id}/protocol={protocol}/{protocol}.jsonl'
    with open(source, 'rb') as handle:
        raw = pd.read_json(handle, lines=True, encoding='utf-8',)

    normalized = ds.normalize_df(raw, protocol)
    unique = normalized.drop_duplicates(subset=["ip", "status"])
    return s2_opcua_flatten(unique)

def opcua_filter(df):
    """Keep the rows whose ``is_opcua`` value is truthy."""
    mask = df["is_opcua"].astype(bool)
    return df[mask]
# Classify the OPC UA records of scan 'tma-2025'. opcua_tma_loader replaces
# the default loader because these records are read from a JSON-lines file.
# Two classifiers run on every record: access policies and endpoint certificates.
s2_opcua = classify_records(
    'tma-2025', 
    'opcua',
    opcua_filter,
    OPCUAIdentifier(),
    [OPCUAClassifier(), OPCUACertificateClassifier()],
    opcua_tma_loader
)

### RTPS DDS

In [None]:
import struct

@dataclass
class Vendor:
    """A vendor entry: identifier ``v``, product name and owning company."""

    v: str
    product: str
    company: str

    def to_dict(self):
        """Return the entry as a dict with keys ``vendor``, ``product`` and ``company``."""
        return dict(vendor=self.v, product=self.product, company=self.company)

class RTPSIdentifier(Identifier):
    """Fingerprint RTPS responses from the first 20 bytes of the raw message.

    The fingerprint is a dict holding the decoded magic, the protocol version,
    the vendor id and the three trailing 4-byte fields. ``product`` and
    ``company`` are filled in from ``vendors`` when the vendor id is listed.
    """

    # Known vendors; ``Vendor.v`` is the two-hex-digit form of the second byte
    # of the vendor id found in the response.
    vendors = [
        Vendor("01", "RTI Connext DDS", "Real-Time Innovations, Inc. (RTI)"),
        Vendor("02", "OpenSplice DDS", "ADLink Ltd."),
        Vendor("03", "OpenDDS", "Object Computing Inc. (OCI)"),
        Vendor("04", "Mil-DDS", "MilSoft"),
        Vendor("05", "InterCOM DDS", "Kongsberg"),
        Vendor("06", "CoreDX DDS", "Twin Oaks Computing"),
        Vendor("07", "Not Active", "Lakota Technical Solutions, Inc."),
        Vendor("08", "Not Active", "ICOUP Consulting"),
        Vendor("09", "Diamond DDS", "Electronic and Telecommunication Research Institute (ETRI)"),
        Vendor("0a", "RTI Connext DDS Micro", "Real-Time Innovations, Inc. (RTI)"),
        Vendor("0b", "Vortex Cafe", "ADLink Ltd."),
        Vendor("0c", "Not Active", "PrismTech Ltd."),
        Vendor("0d", "Vortex Lite", "ADLink Ltd."),
        Vendor("0e", "Qeo (Not Active)", "Technicolor"),
        Vendor("0f", "FastRTPS, FastDDS", "eProsima"),
        Vendor("10", "Eclipse Cyclone DDS", "EclipseFondation"),
        Vendor("11", "GurumDDS", "Gurum Networks, Inc."),
        Vendor("12", "RustDDS", "Atostek"),
        Vendor("13", "Zhenrong Data Distribution Service (ZRDDS)", "Nanjing Zhenrong Software Technology Co."),
        Vendor("14", "Dust DDS", "S2E Software System B.V.")
    ]

    def get_vendor(self, vendor):
        """Return the ``vendors`` entry whose id equals ``vendor`` as a dict.

        Returns ``None`` when the id is not listed.
        """
        for ven in self.vendors:
            if ven.v == vendor:
                return ven.to_dict()
        return None

    def parse_response(self, response: str) -> dict:
        """Decode a base64 response and extract the header fields.

        Args:
            response: Base64-encoded raw message.

        Returns:
            The fingerprint dict, or ``{}`` when ``response`` is missing, is
            not valid base64, or decodes to fewer than 20 bytes.
        """
        try:
            data = base64.b64decode(response)
        except (TypeError, ValueError):
            # TypeError: None/NaN or another non-string payload.
            # ValueError: malformed base64 (binascii.Error subclasses it).
            # Treated like a too-short message: nothing to fingerprint.
            return {}
        if len(data) < 20:
            return {}

        # Network byte order: 4-byte magic, 16-bit version, 2-byte vendor id,
        # then three 4-byte fields kept as raw bytes.
        magic, version, vendor, host, app, instance = struct.unpack('!4sH2s4s4s4s', data[:20])
        vmajor = version >> 8
        vminor = version & 0xFF
        # Only the second byte of the vendor id is used for the lookup.
        vendor = format(vendor[1], "02x")

        fp = {
            "magic": magic.decode('utf-8', 'ignore'),
            "version": f"{vmajor}.{vminor}",
            "vendor": vendor,
            "host": host,
            "app": app,
            "instance": instance,
            "product": None,
            "company": None,
        }

        if v := self.get_vendor(vendor):
            fp.update(v)

        return fp

    def fingerprint(self, row):
        """Store the parsed response in ``row["fingerprint"]`` and return ``row``.

        The payload is read from ``raw_response``; ``raw-message`` is used only
        when ``raw_response`` is absent from the row.
        """
        row["fingerprint"] = self.parse_response(row.get("raw_response", row.get("raw-message")))
        return row
    
class RTPSClassifier(Classifier):
    """Label records whose fingerprint carries the ``RTPS`` magic."""

    def eval_fingerprint(self, fingerprint):
        """Return the access labels for ``fingerprint``.

        Returns ``None`` when the magic is not ``"RTPS"``, an empty list when
        the fingerprint has no vendor value, and both access labels otherwise.
        """
        if fingerprint.get("magic") != "RTPS":
            return None

        if fingerprint.get("vendor") is None:
            return []
        return ["rtps:access:read", "rtps:access:leak"]

    def label(self, row) -> list[str]:
        """Return the labels for ``row["fingerprint"]`` as a new list."""
        found = self.eval_fingerprint(row.get("fingerprint"))
        return list(found) if found else []

In [None]:
def rtps_filter(df):
    """Keep the rows that have a non-null ``raw-message`` value."""
    has_message = df["raw-message"].notna()
    return df[has_message]
# Run the RTPS pipeline and keep its result in `s2_rtps`.
# NOTE(review): the classifier is passed as a single instance here, while the
# OPC UA and XMPP calls pass a list; confirm classify_records accepts both.
s2_rtps = classify_records(
    'tma-2025', 
    'rtps',
    rtps_filter,
    RTPSIdentifier(),
    RTPSClassifier(),
)

### XMPP

In [None]:
class XMPPIdentifier:
    """Fingerprint XMPP banners and attach the certificate known for the IP.

    ``certs`` is a frame that is looked up by its ``ip`` column and read from
    its ``certificate`` column.
    """

    certs: pd.DataFrame = None

    AUTH_P = re.compile(r"<auth xmlns=(\'|\"+)(?P<auth>.*?)(\'|\"+)")
    MECHANISMS_P = re.compile(r"<mechanism>(?P<mech>.*?)<\/mechanism>")
    TLS_P = re.compile(r"<starttls xmlns=(\'|\"+)(?P<tls>.*?)(\'|\"+)")

    def __init__(self, certs):
        self.certs = certs

    def get_certificate(self, ip):
        """Return the ``certificate`` of the first ``certs`` row matching ``ip``.

        Returns ``None`` when no row matches.
        """
        matches = self.certs[self.certs.ip == ip]
        if len(matches) == 0:
            return None
        return matches.iloc[0]['certificate']

    def parse_banner(self, banner: str):
        """Extract the auth namespace, STARTTLS namespace and mechanisms.

        The returned dict also carries the raw banner and ``is_xmpp``, which is
        true when the banner contains ``jabber:client``.
        """
        auth_match = self.AUTH_P.search(banner)
        tls_match = self.TLS_P.search(banner)
        mechanism_names = self.MECHANISMS_P.findall(banner)

        return {
            "is_xmpp": 'jabber:client' in banner,
            "banner": banner,
            "authentication": auth_match.group("auth") if auth_match else None,
            "tls": tls_match.group("tls") if tls_match else None,
            "mechanisms": mechanism_names,
        }

    def fingerprint(self, row: pd.Series) -> pd.Series:
        """Store the parsed banner plus certificate in ``row["fingerprint"]``."""
        parsed = self.parse_banner(row.get("banner"))
        parsed["certificate"] = self.get_certificate(row.get('ip'))
        row["fingerprint"] = parsed
        return row
    
class XMPPClassifier:
    """Label XMPP fingerprints that advertise deprecated authentication."""

    # SASL mechanisms that trigger the "mechanisms:deprecated" label.
    WEAK_MECHANISMS = frozenset({"PLAIN", "DIGEST-MD5", "CRAM-MD5", "ANONYMOUS"})

    def eval_auth(self, fp):
        """Return the authentication-related labels for fingerprint ``fp``.

        Combines the mechanism check with a check for the ``iq-auth`` feature
        in the ``authentication`` entry.
        """
        labs = list(self.eval_mechanisms(fp.get("mechanisms")) or [])

        # IQ-auth is deprecated, servers must use SASL instead
        auth = fp.get("authentication")
        if auth and "iq-auth" in auth:
            labs.append("xmpp:stanza:auth:deprecated")
        return labs

    def eval_mechanisms(self, mechanisms: list[str]) -> list[str]:
        """Return the mechanism label when a weak mechanism is offered.

        Always returns a list; it is empty when ``mechanisms`` is missing or
        empty, or contains none of ``WEAK_MECHANISMS``.
        """
        if not mechanisms:
            return []

        if self.WEAK_MECHANISMS.intersection(mechanisms):
            return ["xmpp:stanza:mechanisms:deprecated"]
        return []

    def label(self, row: pd.Series) -> list[str]:
        """Return the labels for ``row["fingerprint"]``.

        A row without a fingerprint yields no labels.
        """
        fp = row.get("fingerprint")
        if not fp:
            return []
        return self.eval_auth(fp)

In [None]:
def xmpp_filter(df):
    """Keep the rows whose ``banner`` is neither null nor the empty string."""
    banner = df.banner
    keep = banner.notna() & (banner != "")
    return df[keep]

# ANONYMIZED: the date of the scan was omitted.
# The placeholder does not match "%Y-%m-%d", so strptime raises ValueError
# until a real date is filled in.
xmpp_date = datetime.strptime("XXXX-XX-XX", "%Y-%m-%d").replace(tzinfo=timezone.utc)
# Run the XMPP pipeline and keep its result in `xmpp_ds`.
xmpp_ds = classify_records(
    'tma-2025', 
    'xmpp',
    xmpp_filter,
    # ANONYMIZED: the identifier takes a dataframe with two columns: [ip, certificate].
    # It was omitted here; the literal `...` must be replaced with that dataframe.
    XMPPIdentifier(...), 
    [XMPPClassifier(), CertificateClassifier(xmpp_date)],
)

### Summary

In [None]:
# Collect every labelled record written under data/tma-2025/ and export one
# (id, label) pair per label to labelled.json.
con = duckdb.connect(database=':memory:')

# Execute the query on the parquet files using a glob pattern
labelled_df = con.execute("""
    SELECT 
        protocol, 
        ip,
        labels
    FROM read_parquet('data/tma-2025/*/labelled_*.parquet', filename=true)
    WHERE labels IS NOT NULL AND len(labels) > 0
""").fetchdf()


def prepend_if_needed(s, prefix):
    """Return ``s`` unchanged if it already starts with ``prefix``, else prefixed."""
    return s if s.startswith(prefix) else prefix + s


# Replace each IP with a category code so the export carries no addresses.
labelled_df["id"] = labelled_df["ip"].astype("category").cat.codes
exp_labdf = labelled_df.explode("labels")
# Prefix every label with its protocol unless it already starts with it.
# Built with a comprehension instead of DataFrame.apply(axis=1): on an empty
# frame apply returns a DataFrame, which cannot be assigned to one column.
exp_labdf["label"] = [
    lab if lab.startswith(proto) else f"{proto}:{lab}"
    for lab, proto in zip(exp_labdf["labels"], exp_labdf["protocol"])
]
exp_labdf = exp_labdf.sort_values("label")
exp_labdf[["id", "label"]].to_json("labelled.json", orient='records')