# Monad pipeline
For BP purposes

Norbert Matuška

In [1]:
import base64
import os
import zlib
from collections import defaultdict

import clickhouse_connect
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from dotenv import load_dotenv
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler

# Parser

Decodes and parses 802.11 wireless frame headers, extracting details like frame type, MAC addresses, and sequence information. It categorizes management, control, and data frames.

In [13]:
# Human-readable names for the 4-bit 802.11 subtype field, one table per frame
# type. Each table maps subtype value (0-15) -> name; the list position below is
# the subtype value.
management_subtypes = dict(enumerate([
    "Association Request",          # 0
    "Association Response",         # 1
    "Reassociation Request",        # 2
    "Reassociation Response",       # 3
    "Probe Request",                # 4
    "Probe Response",               # 5
    "Timing Advertisement (11v)",   # 6
    "Reserved",                     # 7
    "Beacon",                       # 8
    "ATIM",                         # 9
    "Disassociation",               # 10
    "Authentication",               # 11
    "Deauthentication",             # 12
    "Action",                       # 13
    "Action No Ack (11e)",          # 14
    "Reserved",                     # 15
]))

control_subtypes = dict(enumerate([
    "Reserved",                     # 0
    "Reserved",                     # 1
    "Trigger",                      # 2
    "TACK",                         # 3
    "Beamforming Report Poll",      # 4
    "VHT/HE NDP Announcement",      # 5
    "Reserved",                     # 6
    "Control Wrapper",              # 7
    "Block Ack Request",            # 8
    "Block Ack",                    # 9
    "PS-Poll",                      # 10
    "RTS",                          # 11
    "CTS",                          # 12
    "ACK",                          # 13
    "CF-End",                       # 14
    "CF-End + CF-Ack",              # 15
]))

data_subtypes = dict(enumerate([
    "Data",                         # 0
    "Data + CF-Ack",                # 1
    "Data + CF-Poll",               # 2
    "Data + CF-Ack + CF-Poll",      # 3
    "Null Function (No Data)",      # 4
    "CF-Ack (No Data)",             # 5
    "CF-Poll (No Data)",            # 6
    "CF-Ack + CF-Poll (No Data)",   # 7
    "QoS Data",                     # 8
    "QoS Data + CF-Ack",            # 9
    "QoS Data + CF-Poll",           # 10
    "QoS Data + CF-Ack + CF-Poll",  # 11
    "QoS Null",                     # 12
    "Reserved",                     # 13
    "Reserved",                     # 14
    "Reserved",                     # 15
]))

# Names for the 2-bit frame type field (0-3).
frame_type_map = dict(enumerate(["Management", "Control", "Data", "Extension"]))

# Ordered field names for a decoded header.
# NOTE(review): these names differ from the keys of the dict that
# translate_header returns (it uses address1..address4 and has no
# frame_control_raw / sequence_control_raw entries) -- keep in mind before
# using this list as a column schema.
TRANSLATED_HEADER_COLUMNS = [
    # Frame Control word and its type fields
    "frame_control_raw", "protocol_version", "frame_type", "subtype",
    # Frame Control flag bits
    "to_ds", "from_ds", "more_frag", "retry",
    "power_mgmt", "more_data", "protected_frame", "order_flag",
    # Duration/ID
    "duration_id",
    # Address fields
    "destination_mac", "source_mac", "bssid_mac", "address4_mac",
    # Sequence Control
    "sequence_control_raw", "fragment_number", "sequence_number",
    # Optional QoS / HT control fields
    "qos_control_raw", "ht_control_raw",
    # Remaining payload
    "frame_body",
]


def decode_header(header_b64):
    """Base64-decode a captured header.

    Args:
        header_b64: base64 text (or bytes) as stored in the capture table.

    Returns:
        The decoded bytes, or None if decoding raised (the error is printed).
    """
    try:
        raw = base64.b64decode(header_b64)
    except Exception as err:
        print(f"Error decoding header: {err}")
        return None
    return raw

def parse_probe_request(frame_body):
    """Parse the information elements (IEs) of a Probe Request frame body.

    The body is walked as a sequence of elements, each a 1-byte element ID,
    a 1-byte length, then `length` bytes of data. Parsing stops at the first
    truncated element.

    Args:
        frame_body: bytes that follow the MAC header.

    Returns:
        dict with keys:
          - "ssid": SSID text from element ID 0, decoded as UTF-8 with invalid
            bytes replaced by U+FFFD; None if no such element was seen, "" for
            a zero-length one.
          - "supported_rates": rates in Mbps from element ID 1.
          - "extended_rates": rates in Mbps from element ID 50.
          - "vendor_specific": raw payloads of every element ID 221.
        Other element IDs are ignored.
    """
    ies = {
        "ssid": None,
        "supported_rates": [],
        "extended_rates": [],
        "vendor_specific": []
    }

    end = len(frame_body)
    pos = 0
    # Need at least the 2-byte (id, length) element header.
    while pos + 1 < end:
        element_id = frame_body[pos]
        element_len = frame_body[pos + 1]
        pos += 2

        if pos + element_len > end:
            break  # truncated element: stop rather than read past the body

        element_data = frame_body[pos : pos + element_len]
        pos += element_len

        if element_id == 0:
            # SSIDs are arbitrary octets and are commonly UTF-8; decoding them
            # as ASCII with errors="ignore" silently dropped every non-ASCII
            # character.
            ies["ssid"] = element_data.decode("utf-8", errors="replace")
        elif element_id == 1:
            ies["supported_rates"] = _decode_supported_rates(element_data)
        elif element_id == 50:
            ies["extended_rates"] = _decode_supported_rates(element_data)
        elif element_id == 221:
            ies["vendor_specific"].append(element_data)

    return ies

def _decode_supported_rates(rate_bytes):

    rates = []
    for r in rate_bytes:
        rate_val = r & 0x7F  # strip off the 'basic rate' bit
        # each unit = 500 kbps => multiply by 0.5 to get Mbps
        rates.append(rate_val * 0.5)
    return rates

def translate_header(header_bytes):
    """Parse a raw 802.11 MAC frame into a flat dict of header fields.

    The fixed 24-byte part (Frame Control, Duration/ID, Address 1-3,
    Sequence Control) is always read. Optional fields follow in this order:
      - Address 4, when a data frame has both To DS and From DS set;
      - a 2-byte QoS Control field, for data subtypes 8-15;
      - a 4-byte HT Control field, when such a QoS data frame also has the
        Order flag set.
    Everything after that is returned as the frame body. Each optional field
    is read only if enough bytes remain.

    Args:
        header_bytes: raw frame bytes (e.g. the output of decode_header);
            may be None.

    Returns:
        dict with keys:
          - "protocol_version";
          - "frame_type" / "subtype" (human-readable names);
          - the eight Frame Control flags as bools;
          - "duration_id";
          - "address1".."address4" ("aa:bb:.." strings; "address4" is None
            unless present);
          - "fragment_number", "sequence_number";
          - "qos_control_raw" (int or None);
          - "ht_control_raw" (hex string or None);
          - "frame_body" (hex string).
        Returns None when the input is empty, shorter than 24 bytes, or
        parsing raises (the error is printed).

    Note:
        Probe-request information elements are not included in the result.
        They can be parsed with
        parse_probe_request(bytes.fromhex(result["frame_body"])).
    """
    # Frames shorter than the 24-byte three-address header (short control
    # frames, truncated captures) are not parsed. Return None -- the same
    # sentinel as the error path below -- because a placeholder list would be
    # truthy and slip past callers' `if not parsed` checks, after which
    # parsed["frame_type"] raises.
    if not header_bytes or len(header_bytes) < 24:
        return None

    try:
        # -----------------------------
        # Frame Control
        # -----------------------------
        frame_control_raw = int.from_bytes(header_bytes[0:2], byteorder="little")

        protocol_version =  frame_control_raw & 0b11                # bits 0-1
        frame_type       = (frame_control_raw >> 2) & 0b11          # bits 2-3
        subtype          = (frame_control_raw >> 4) & 0b1111        # bits 4-7

        # Flags (high byte of Frame Control)
        flags = (frame_control_raw >> 8) & 0xFF

        to_ds           = bool(flags & 0b00000001)  # bit 8
        from_ds         = bool(flags & 0b00000010)  # bit 9
        more_frag       = bool(flags & 0b00000100)  # bit 10
        retry           = bool(flags & 0b00001000)  # bit 11
        power_mgmt      = bool(flags & 0b00010000)  # bit 12
        more_data       = bool(flags & 0b00100000)  # bit 13
        protected_frame = bool(flags & 0b01000000)  # bit 14
        order_flag      = bool(flags & 0b10000000)  # bit 15

        # -----------------------------
        # Type/Subtype Description
        # -----------------------------
        type_description = frame_type_map.get(frame_type, "Reserved")

        if frame_type == 0:
            subtype_description = management_subtypes.get(subtype, "Unknown")
        elif frame_type == 1:
            subtype_description = control_subtypes.get(subtype, "Unknown")
        elif frame_type == 2:
            subtype_description = data_subtypes.get(subtype, "Unknown")
        else:
            # Extension frames: no subtype table here.
            subtype_description = "Extension/Reserved"

        # -----------------------------
        # Duration/ID
        # -----------------------------
        duration_id = int.from_bytes(header_bytes[2:4], byteorder="little")

        # -----------------------------
        # Addresses
        # -----------------------------
        address1 = _format_mac(header_bytes[4:10])
        address2 = _format_mac(header_bytes[10:16])
        address3 = _format_mac(header_bytes[16:22])

        # Sequence Control
        sequence_control_raw = int.from_bytes(header_bytes[22:24], byteorder="little")
        fragment_number = sequence_control_raw & 0x000F         # bits 0-3
        sequence_number = (sequence_control_raw >> 4) & 0x0FFF  # bits 4-15

        offset = 24
        address4 = None

        # Data frames with both To DS and From DS set carry a 4th address.
        if frame_type == 2 and to_ds and from_ds:
            if len(header_bytes) >= offset + 6:
                address4 = _format_mac(header_bytes[offset:offset+6])
                offset += 6

        # -----------------------------
        # QoS / HT Control
        # -----------------------------
        qos_control_raw = None
        ht_control_raw = None

        # Data subtypes 8-15 are the QoS variants (see data_subtypes).
        if frame_type == 2 and 8 <= subtype <= 15:
            if len(header_bytes) >= offset + 2:
                qos_control_raw = int.from_bytes(header_bytes[offset:offset+2], byteorder="little")
                offset += 2

            if order_flag and len(header_bytes) >= offset + 4:
                # HT Control is parsed as 4 bytes.
                ht_control_raw = header_bytes[offset:offset+4]
                offset += 4

        frame_body = header_bytes[offset:]

        return {
            # Protocol/Type/Subtype
            "protocol_version": protocol_version,
            "frame_type":       type_description,
            "subtype":          subtype_description,

            # Flags
            "to_ds":            to_ds,
            "from_ds":          from_ds,
            "more_frag":        more_frag,
            "retry":            retry,
            "power_mgmt":       power_mgmt,
            "more_data":        more_data,
            "protected_frame":  protected_frame,
            "order_flag":       order_flag,

            # Others
            "duration_id":      duration_id,

            # Addresses
            "address1": address1,
            "address2": address2,
            "address3": address3,
            "address4": address4,

            # Sequence
            "fragment_number":  fragment_number,
            "sequence_number":  sequence_number,

            # QoS/HT/Body
            "qos_control_raw":  qos_control_raw,
            "ht_control_raw":   ht_control_raw.hex() if ht_control_raw else None,
            "frame_body":       frame_body.hex()
        }

    except Exception as e:
        # Best effort per frame: report and let the caller skip it.
        print(f"Error translating header: {e}")
        return None

def _format_mac(mac_bytes):
    if len(mac_bytes) < 6:
        return None
    return ":".join(f"{b:02x}" for b in mac_bytes)

Extracts the OUI (the first three bytes) from MAC addresses and maps it to a vendor name using the IEEE OUI database (`oui.csv`).

In [3]:
def process_mac(mac):
    """Return the OUI key of a colon-separated MAC: its first 6 hex digits, uppercased."""
    hex_digits = "".join(mac.split(":"))
    return hex_digits.upper()[:6]
# IEEE OUI registry export -> {OUI hex string: organization name}.
# Read every column as text: an all-digit OUI such as "002272" must stay a
# zero-padded string to match process_mac(); numeric type inference could turn
# it into an int and silently break the lookup. Keys are uppercased because
# process_mac() returns uppercase.
oui_df = pd.read_csv("oui.csv", dtype=str)
oui_vendor_mapping = dict(zip(oui_df["Assignment"].str.upper(), oui_df["Organization Name"]))


ClickHouse connection

In [15]:
# Pull variables from a local .env file into the environment, then read the
# ClickHouse password (None when it is not set).
load_dotenv()
password = os.getenv("CLICKHOUSE_PASSWORD")

[output redacted — this cell output exposed the ClickHouse password; rotate that credential]


In [6]:
# Connect to the local ClickHouse HTTP interface. The password is taken from
# the CLICKHOUSE_PASSWORD environment variable (populated from .env by
# load_dotenv) rather than a hard-coded literal; an empty password is sent if
# the variable is unset.
client = clickhouse_connect.get_client(
    host='localhost',
    port=8123,
    username='monad',
    password=os.environ.get("CLICKHOUSE_PASSWORD", ""),
)

NameError: name 'clickhouse_connect' is not defined

In [14]:
# Raw capture table (read in chunks by the ingestion loop) and the table that
# receives one decoded row per captured frame.
source_table = "monadcount.l2pk_v2"
parsed_table = "monadcount.l2pk_v2_struct"

# DDL for the decoded-header table. The columns mirror the keys of the dict
# returned by translate_header, plus `id` (copied from the source row) and
# `vendor` (OUI lookup of the frame's source MAC).
# Because of IF NOT EXISTS, editing this DDL does not alter a table that
# already exists.
# ReplacingMergeTree collapses rows sharing the ORDER BY key (`id`) during
# background merges (per the ClickHouse docs), so re-processing a chunk should
# not leave permanent duplicates; duplicates can remain visible until a merge
# runs -- use FINAL if exact counts matter.
# NOTE(review): qos_control_raw (UInt16) and ht_control_raw (String) are
# non-Nullable, while translate_header yields None for them on many frames and
# the insert loop passes those Nones through -- confirm what the driver stores.
create_sql = f"""
CREATE TABLE IF NOT EXISTS {parsed_table}
(
    id UUID,
    protocol_version UInt8,
    frame_type String,
    subtype String,
    to_ds UInt8,
    from_ds UInt8,
    more_frag UInt8,
    retry UInt8,
    power_mgmt UInt8,
    more_data UInt8,
    protected_frame UInt8,
    order_flag UInt8,

    duration_id UInt16,

    address1 String,
    address2 String,
    address3 String,
    address4 String,

    fragment_number UInt16,
    sequence_number UInt16,

    qos_control_raw UInt16,
    ht_control_raw String,
    frame_body String,

    vendor String
)
ENGINE = ReplacingMergeTree
ORDER BY id
"""

In [15]:
# Create the decoded-header table if it is missing (CREATE TABLE IF NOT EXISTS).
client.command(create_sql)

<clickhouse_connect.driver.summary.QuerySummary at 0x265abe00080>

In [23]:
CHUNK_SIZE = 50_000
# Row offset to resume from (continues an earlier, interrupted run).
OFFSET = 113460000

# Target columns of parsed_table, in the order values are inserted.
INSERT_COLUMNS = [
    "id",
    "protocol_version",
    "frame_type",
    "subtype",
    "to_ds",
    "from_ds",
    "more_frag",
    "retry",
    "power_mgmt",
    "more_data",
    "protected_frame",
    "order_flag",
    "duration_id",
    "address1",
    "address2",
    "address3",
    "address4",
    "fragment_number",
    "sequence_number",
    "qos_control_raw",
    "ht_control_raw",
    "frame_body",
    "vendor",
]


def _source_mac(parsed):
    """Return the source MAC of a translate_header() dict.

    For data frames the source address depends on the DS bits:
    To DS + From DS (4-address) -> address4, From DS only -> address3.
    Every other frame uses address2. This matches extract_source_mac().
    """
    if parsed["frame_type"] == "Data":
        if parsed["to_ds"] and parsed["from_ds"]:
            return parsed["address4"]
        if parsed["from_ds"]:
            return parsed["address3"]
    return parsed["address2"]


def _to_insert_row(row_id, parsed, vendor):
    """Build one insert tuple ordered like INSERT_COLUMNS (missing address4 -> "")."""
    values = dict(parsed, id=row_id, vendor=vendor, address4=parsed["address4"] or "")
    return tuple(values[column] for column in INSERT_COLUMNS)


while True:
    # NOTE(review): LIMIT/OFFSET without ORDER BY does not guarantee a stable
    # row order between queries, so chunks may overlap or skip rows; each query
    # also re-scans OFFSET rows. Keyset paging (WHERE id > last ORDER BY id)
    # would fix both, but changes the meaning of the resume OFFSET above.
    query = f"""
        SELECT id, header
        FROM {source_table}
        LIMIT {CHUNK_SIZE} OFFSET {OFFSET}
    """
    # named_results() returns a generator, which is always truthy; materialize
    # it so an empty page actually ends the loop instead of spinning forever.
    rows = list(client.query(query).named_results())
    if not rows:
        print("No more rows to process.")
        break

    insert_data = []
    for row in rows:
        decoded = decode_header(row["header"])
        parsed = translate_header(decoded) if decoded else None
        if not parsed:
            continue  # undecodable / unparsable frame

        oui = process_mac(_source_mac(parsed) or "")
        vendor = oui_vendor_mapping.get(oui, "Unknown")
        insert_data.append(_to_insert_row(row["id"], parsed, vendor))

    if not insert_data:
        print(f"No valid rows in chunk (offset={OFFSET}).")
        OFFSET += CHUNK_SIZE
        continue

    client.insert(parsed_table, insert_data, column_names=INSERT_COLUMNS)

    print(f"Inserted {len(insert_data)} decoded rows (offset={OFFSET}).")
    OFFSET += CHUNK_SIZE

Inserted 50000 decoded rows (offset=113460000).
Inserted 50000 decoded rows (offset=113510000).
Inserted 50000 decoded rows (offset=113560000).
Inserted 50000 decoded rows (offset=113610000).
Inserted 50000 decoded rows (offset=113660000).
Inserted 50000 decoded rows (offset=113710000).
Inserted 50000 decoded rows (offset=113760000).
Inserted 50000 decoded rows (offset=113810000).
Inserted 50000 decoded rows (offset=113860000).
Inserted 50000 decoded rows (offset=113910000).
Inserted 50000 decoded rows (offset=113960000).
Inserted 50000 decoded rows (offset=114010000).
Inserted 50000 decoded rows (offset=114060000).
Inserted 50000 decoded rows (offset=114110000).
Inserted 50000 decoded rows (offset=114160000).
Inserted 50000 decoded rows (offset=114210000).
Inserted 50000 decoded rows (offset=114260000).
Inserted 50000 decoded rows (offset=114310000).
Inserted 50000 decoded rows (offset=114360000).
Inserted 50000 decoded rows (offset=114410000).
Inserted 50000 decoded rows (offset=1144

Unexpected Http Driver Exception


OperationalError: Error HTTPConnectionPool(host='localhost', port=8123): Max retries exceeded with url: /?date_time_input_format=best_effort&session_id=d28b7853-b6b8-45bf-80c4-deb508b1ab8a&client_protocol_version=54405&enable_http_compression=1&wait_end_of_query=1&send_progress_in_http_headers=1&http_headers_progress_interval_ms=120000 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x00000265D01F4EC0>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it')) executing HTTP request attempt 2 (http://localhost:8123)

Processes a large CSV file in chunks, decoding and translating headers, then combines the results into a final DataFrame for analysis.

In [None]:
translated_headers = []
data = []
for chunk in pd.read_csv("D:/monadcount_l2pk_v2_10000.csv", chunksize=100000):
    # Base64-decode each header, then parse the raw bytes; both stages are
    # kept as columns. Failed rows hold whatever decode_header /
    # translate_header return on failure.
    decoded = chunk["header"].apply(decode_header)
    chunk["decoded_headers"] = decoded
    chunk["translated_header"] = decoded.apply(translate_header)
    data.append(chunk)

data_df = pd.concat(data, ignore_index=True)
data_df

# Analysis

For each capture device, builds a heatmap of data presence by date and minute of the day (from `happened_at`, year 2024 onward) to check data continuity.

In [19]:
def minute_to_hhmm(m):
    """Format a minute-of-day count as zero-padded "HH:MM"."""
    hours, minutes = divmod(m, 60)
    return f"{hours:02d}:{minutes:02d}"

# GET ALL DEVICES (DISTINCT) FOR YEAR >= 2024
devices_df = client.query_df(f"""
    SELECT DISTINCT device
    FROM {source_table}
    WHERE toYear(happened_at) >= 2024
""")
devices = devices_df['device'].tolist()

# BUILD A MASTER LIST OF ALL POSSIBLE DATES
# (union over all devices, so every heatmap has the same rows)
dates_df = client.query_df(f"""
    SELECT DISTINCT toDate(happened_at) AS date
    FROM {source_table}
    WHERE toYear(happened_at) >= 2024
    ORDER BY date
""")
all_dates = [d.date() for d in pd.to_datetime(dates_df['date'])]

# One heatmap column per minute of the day (0..1439).
all_minutes = np.arange(1440)

# X-axis ticks every 5 minutes; identical for every device, so built once.
# seaborn draws heatmap cell i over [i, i + 1), so each label goes at the cell
# centre (i + 0.5) -- integer positions would sit on the cell edges.
ticks_step = 5
xtick_minutes = range(0, 1440, ticks_step)
xtick_locs = [m + 0.5 for m in xtick_minutes]
xtick_labels = [minute_to_hhmm(m) for m in xtick_minutes]

for device in devices:
    print(f"Processing device: {device}")

    # Per-minute event counts for this device.
    # NOTE(review): `device` is interpolated into the SQL unescaped; it comes
    # from the table itself here, but query parameters would be safer.
    query = f"""
        SELECT
            toDate(happened_at) AS date,
            toHour(happened_at)*60 + toMinute(happened_at) AS minute_of_day,
            count(*) AS event_count
        FROM {source_table}
        WHERE toYear(happened_at) >= 2024
          AND device = '{device}'
        GROUP BY date, minute_of_day
        ORDER BY date, minute_of_day
    """
    agg_df = client.query_df(query)
    if agg_df.empty:
        print(f"No data for device {device} in year >= 2024. Skipping.")
        continue

    agg_df['date'] = pd.to_datetime(agg_df['date']).dt.date
    agg_df['has_data'] = (agg_df['event_count'] > 0).astype(int)

    # Date x minute presence matrix, padded to the full date/minute grid.
    heatmap_pivot = agg_df.pivot(
        index='date',
        columns='minute_of_day',
        values='has_data'
    ).fillna(0)
    heatmap_pivot = heatmap_pivot.reindex(columns=all_minutes, fill_value=0)
    heatmap_pivot = heatmap_pivot.reindex(index=all_dates, fill_value=0)

    # PLOT THE HEATMAP
    plt.figure(figsize=(40, 10))
    sns.heatmap(
        heatmap_pivot,
        cmap="Reds",
        cbar=False,
        linewidths=0.5,
        linecolor="black"
    )
    plt.xlabel("Time (HH:MM)")
    plt.ylabel("Date")
    plt.title(f"Heatmap of Data Presence by Minute of the Day\nDevice: {device}")

    plt.xticks(xtick_locs, xtick_labels, rotation=90)
    plt.yticks(rotation=0)

    # Make the device id safe for use in a file name.
    safe_device = str(device).replace(":", "_").replace("/", "_")

    plt.savefig(f"heatmap_{safe_device}.png", dpi=300, bbox_inches='tight')
    plt.close()

    print(f"Saved heatmap for device: {device}")

Processing device: C8:2E:18:8D:90:A4
Saved heatmap for device: C8:2E:18:8D:90:A4
Processing device: CC:7B:5C:27:9E:30
Saved heatmap for device: CC:7B:5C:27:9E:30
Processing device: 10:06:1C:86:69:30
Saved heatmap for device: 10:06:1C:86:69:30
Processing device: C8:2E:18:8D:B5:64
Saved heatmap for device: C8:2E:18:8D:B5:64
Processing device: 10:06:1C:86:5A:A4
Saved heatmap for device: 10:06:1C:86:5A:A4
Processing device: C8:2E:18:8D:94:74
Saved heatmap for device: C8:2E:18:8D:94:74


In [None]:
# Frame counts per MAC vendor, plotted on a log scale.
# NOTE(review): assumes data_df has a "vendor" column; the CSV-loading cell does
# not add one, so it must come from the CSV itself -- confirm.
vendor_counts = data_df["vendor"].value_counts()
print(vendor_counts)

fig, ax = plt.subplots(figsize=(15, 6))
ax.bar(vendor_counts.index, vendor_counts.values, log=True)
ax.set_xlabel("Vendor")
ax.set_ylabel("Count")
ax.set_title("MAC Address Vendor Distribution (log scale)")
ax.tick_params(axis="x", labelrotation=90)
plt.show()

In [None]:
# Flatten the parsed headers into columns. translate_header returns a flat dict
# (keys like "frame_type", "subtype"), so json_normalize produces top-level
# columns -- there is no "frame_control.frame_type". Only dict entries are
# normalized; failed/short headers (None or non-dict placeholders) are skipped.
parsed_headers = [h for h in data_df["translated_header"] if isinstance(h, dict)]
data_norm = pd.json_normalize(parsed_headers)
data_norm["frame_type"].value_counts()

In [None]:
# Frame count per (type, subtype), most frequent first. Column names follow the
# flat keys produced by translate_header.
group_counts = (
    data_norm.groupby(["frame_type", "subtype"])
    .size()
    .reset_index(name="count")
    .sort_values("count", ascending=False)
)

group_counts

# Crowd counter

This function identifies static devices from RSSI variance, grouping packets by source MAC (`source_mac`) and then by sniffer. A device is classified as static when its RSSI variance is below the threshold at every sniffer that detected it.
TODO: this needs an additional condition.

In [ ]:
def detect_static_devices(datadf, rssi_threshold=1, chunksize=100000):
    """Find source MACs whose RSSI is (nearly) constant at every sniffer.

    Rows are grouped by `source_mac`, then by sniffer (`device`). RSSI values
    are accumulated across slices of `chunksize` rows. A MAC counts as static
    when, for every sniffer that saw it, the sample variance of its RSSI is
    defined (at least two samples) and strictly below `rssi_threshold`.

    Args:
        datadf: DataFrame with `source_mac`, `device` and `rssi` columns.
            Rows with a missing `source_mac` are dropped by groupby.
        rssi_threshold: exclusive upper bound on per-sniffer RSSI variance.
        chunksize: number of rows processed per slice of `datadf`.

    Returns:
        DataFrame with one row per static MAC and columns:
          - `source_mac`;
          - `rssi_variance` (largest per-sniffer variance);
          - `detected_by_device` (comma-joined sniffer ids, in first-seen
            order);
          - `packet_count`.
        The columns are present even when no device qualifies.
    """
    columns = ["source_mac", "rssi_variance", "detected_by_device", "packet_count"]

    # source_mac -> {"sniffer_rssi": {sniffer_id: [rssi, ...]}, "total_packet_count": int}
    aggregator = {}
    for start in range(0, len(datadf), chunksize):
        chunk = datadf.iloc[start : start + chunksize]
        for source_mac, group in chunk.groupby("source_mac"):
            entry = aggregator.get(source_mac)
            if entry is None:
                entry = aggregator[source_mac] = {
                    "sniffer_rssi": defaultdict(list),
                    "total_packet_count": 0,
                }
            for sniffer_id, sniffer_group in group.groupby("device"):
                entry["sniffer_rssi"][sniffer_id].extend(sniffer_group["rssi"].tolist())
            entry["total_packet_count"] += len(group)

    static_devices = []
    for source_mac, entry in aggregator.items():
        variances = []
        for rssi_values in entry["sniffer_rssi"].values():
            variance = pd.Series(rssi_values).var()
            # NaN = a single sample at this sniffer: not enough evidence.
            if pd.isna(variance) or variance >= rssi_threshold:
                break
            variances.append(variance)
        else:  # no sniffer broke out: low variance everywhere
            static_devices.append({
                "source_mac": source_mac,
                "rssi_variance": max(variances, default=0.0),
                "detected_by_device": ",".join(str(s) for s in entry["sniffer_rssi"]),
                "packet_count": entry["total_packet_count"],
            })
    return pd.DataFrame(static_devices, columns=columns)

def extract_source_mac(row_dict):
    """Return the source (originating) MAC address of a parsed 802.11 header.

    Accepts either of two shapes:
      - the flat dict produced by translate_header, with keys `to_ds`,
        `from_ds` and `address1`..`address4` at the top level;
      - a nested dict with `frame_control` and `addresses` sub-dicts.

    The source address depends on the To DS / From DS flags:
      - To DS 0, From DS 0 -> address2
      - To DS 1, From DS 0 -> address2
      - To DS 0, From DS 1 -> address3
      - To DS 1, From DS 1 -> address4 (four-address / WDS)

    Returns:
        The MAC string, or None for missing/unparsed headers (None, NaN,
        non-dict placeholders) or when the selected address is absent.
    """
    if not isinstance(row_dict, dict) or not row_dict:
        return None

    if "frame_control" in row_dict and "addresses" in row_dict:
        flags, addrs = row_dict["frame_control"], row_dict["addresses"]
    elif "address2" in row_dict:
        # Flat translate_header output: flags and addresses share one dict.
        flags = addrs = row_dict
    else:
        return None

    to_ds = flags.get("to_ds", False)
    from_ds = flags.get("from_ds", False)

    if to_ds and from_ds:
        return addrs.get("address4")
    if from_ds:
        return addrs.get("address3")
    return addrs.get("address2")

# Derive each frame's source MAC from its parsed header, then flag devices
# whose RSSI stays nearly constant at every sniffer.
data_df["source_mac"] = data_df["translated_header"].map(extract_source_mac)

static_devices = detect_static_devices(data_df)
static_devices

Crowd counter, version 1 (partially inspired by doi:10.1109/JIOT.2020.2972062).

In [ ]:
def sensor_id_to_numeric(sensor_id: str) -> int:
    """Map a sensor id to a stable unsigned 32-bit integer (CRC-32 of its UTF-8 text).

    The built-in hash() is salted per interpreter process for str
    (PYTHONHASHSEED), so it would give every sensor a different number after
    each kernel restart, making the clustering features -- and therefore the
    counts -- irreproducible. CRC-32 is deterministic.

    NOTE(review): after scaling, these arbitrary codes still act as a numeric
    distance between sensors; a categorical (e.g. one-hot) encoding may be
    more meaningful -- TODO evaluate.
    """
    return zlib.crc32(str(sensor_id).encode("utf-8")) & 0xFFFFFFFF

def counter_v1(start_time: str, end_time: str):
    """Estimate the crowd size in a time window with DBSCAN (version 1).

    Loads every captured frame whose `happened_at` lies between the two
    bounds, joined with its parsed header. It then clusters the frames on
    (unix timestamp, RSSI, numeric sensor id) after standardization.

    Args:
        start_time, end_time: bounds for `happened_at`, inserted into the SQL
            as quoted literals (e.g. "2024-05-01 08:00:00"). They are not
            escaped -- pass trusted values only.

    Returns:
        int: the number of clusters plus the number of noise points (each
        noise point counts as one). Returns 0 when the window has no frames.
    """
    # NOTE(review): this reads monadcount.parsed_header, not the parsed_table
    # (l2pk_v2_struct) created earlier -- confirm which table is intended.
    # computed_mac is selected but not used as a clustering feature; its
    # 'Data' branch also ignores the four-address (address4) case.
    query = f"""
    SELECT 
      v.happened_at,
      v.rssi,
      v.device as sensor_id,
      if(p.frame_type = 'Data', 
         if(p.from_ds = 1, p.address3, p.address2),
         p.address2) as computed_mac
    FROM monadcount.parsed_header p
    JOIN monadcount.l2pk_v2 v ON p.id = v.id
    WHERE v.happened_at BETWEEN '{start_time}' AND '{end_time}'
    """

    # clickhouse_connect clients expose query_df (as used elsewhere in this
    # notebook); query_dataframe is not part of that API.
    df = client.query_df(query)
    if df.empty:
        # StandardScaler / DBSCAN reject zero-sample input.
        return 0

    # Seconds since the epoch.
    # NOTE(review): astype(np.int64) // 10**9 assumes nanosecond-resolution
    # datetimes -- confirm the dtype returned by the driver.
    df['timestamp'] = pd.to_datetime(df['happened_at']).astype(np.int64) // 10**9
    df['sensor_numeric'] = df['sensor_id'].apply(sensor_id_to_numeric)

    features = df[['timestamp', 'rssi', 'sensor_numeric']].values
    features_scaled = StandardScaler().fit_transform(features)

    # NOTE: params need tuning
    dbscan = DBSCAN(eps=0.5, min_samples=2)
    df['cluster'] = dbscan.fit_predict(features_scaled)

    is_noise = df['cluster'] == -1
    valid_cluster_count = df.loc[~is_noise, 'cluster'].nunique()
    noise_count = int(is_noise.sum())

    return valid_cluster_count + noise_count

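- Dáta bez static devices — dať kam, ak ich vôbec dávať niekam? (Where should the data with static devices removed go, if anywhere?)
- Zamerať sa na PR? (Focus on PRs — presumably probe requests?)
- Ground truth kedy bude? (When will ground truth be available?)
- Výstup z countera dať do DB na vizualizáciu v Grafane? (Store the counter output in the DB for visualization in Grafana?)
- Nové dáta when? (When will new data arrive?)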