In [None]:
"""
Centralizes filesystem paths, BLE identifier patterns, and other constants
used throughout the BLE data processing workflow.
"""

from __future__ import annotations

import re
from pathlib import Path

# Location of the MQTT capture exported as CSV (adjust per environment)
DEFAULT_CSV: Path = Path("data", "mqtt_input.csv")

# MAC address of the gateway (adjust per deployment); stored upper-cased so it
# compares equal to the upper-cased MACs extracted from scan text
GATEWAY_MAC: str = "AA:BB:CC:DD:EE:FF".upper()

# Building blocks shared by the BLE regular expressions below
_HEX_DIGITS = r"[0-9A-Fa-f]"
_HEX_PAIR = _HEX_DIGITS + "{2}"

# Captures a colon-separated MAC address (six hex pairs) anywhere in a string
MAC_RE = re.compile("(" + _HEX_PAIR + "(?::" + _HEX_PAIR + "){5})")

# Captures a 128-bit UUID written as hyphen-separated 8-4-4-4-12 hex groups
UUID128_RE = re.compile(
    "("
    + "-".join(_HEX_DIGITS + "{%d}" % width for width in (8, 4, 4, 4, 12))
    + ")"
)

# Matches a token that is exactly one hexadecimal byte (two hex digits)
HEX_BYTE_RE = re.compile("^" + _HEX_PAIR + "$")


In [None]:
"""
Loads MQTT-generated CSV file and parses nested JSON payloads into a structured
form for BLE analysis.
"""

import json
import logging
from pathlib import Path
from typing import Any

import pandas as pd

logger = logging.getLogger(__name__)


def configure_logging(level: int = logging.INFO) -> None:
    """Set up message-only console logging unless the root logger already has handlers."""
    already_configured = bool(logging.getLogger().handlers)
    if not already_configured:
        logging.basicConfig(level=level, format="%(message)s")


def load_csv(file_path: str | Path) -> pd.DataFrame:
    """Reads the CSV at `file_path` into a DataFrame and logs its row count and columns."""
    logger.info("Loading CSV: %s", file_path)
    frame = pd.read_csv(file_path)
    row_count, column_names = len(frame), frame.columns.tolist()
    logger.info("Loaded %d rows and columns: %s", row_count, column_names)
    return frame


def safe_json_load(text: Any) -> dict:
    """
    Parse a JSON object from `text`, returning {} whenever that is not possible.

    An empty dict is returned when `text` is not a string, when it is not valid
    JSON, or when it is valid JSON whose top-level value is not an object
    (for example a list, a number, or null). This guarantees the declared
    `dict` return type, so callers can use `.get()` on the result directly.
    """
    if not isinstance(text, str):
        return {}

    try:
        decoded = json.loads(text)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically deep nesting.
        return {}

    # Valid JSON is not necessarily an object: "[1, 2]" or "null" decode fine
    # but would break dict-style access downstream.
    return decoded if isinstance(decoded, dict) else {}


def parse_payload(df: pd.DataFrame) -> pd.DataFrame:
    """
    Returns a copy of `df` with the JSON in its 'payload' column decoded and
    three columns added:
      - payload_json     (decoded payload)
      - log_timestamp    (payload's "timestamp" entry)
      - revelations_raw  (payload's "revelations" entry)

    Raises KeyError when the 'payload' column is absent.
    """
    if "payload" not in df.columns:
        raise KeyError("CSV does not contain the required 'payload' column.")

    decoded = df["payload"].apply(safe_json_load)

    result = df.copy()
    result["payload_json"] = decoded
    # Each derived column is a single key looked up in the decoded payload
    for column, key in (
        ("log_timestamp", "timestamp"),
        ("revelations_raw", "revelations"),
    ):
        result[column] = decoded.apply(lambda payload, key=key: payload.get(key))
    return result

In [None]:
# Enable message-only console logging before the loader emits anything.
configure_logging()

# Read the CSV and expand each JSON payload into dedicated columns.
df = parse_payload(load_csv(DEFAULT_CSV))

# Print the leading payload timestamps as a quick sanity check.
print(df["log_timestamp"].head().to_string(index=False))

In [None]:
"""
Inspects the raw MQTT metadata prior to processing. The inspection summarizes
the structure of the timestamp, topic, and payload columns to ensure that incoming
messages follow the expected format before deeper parsing occurs.
"""


def describe_column(label: str, series: pd.Series, sample_size: int = 5) -> None:
    """
    Prints a short profile of one column: its label, dtype, number of distinct
    values, and the first `sample_size` entries (shown without the index).
    """
    print(
        f"\n{label}",
        f"Type: {series.dtype}",
        f"Unique values: {series.nunique()}",
        "\nSample values:",
        series.head(sample_size).to_string(index=False),
        sep="\n",
    )


# Profile the client-side timestamp column
describe_column("TIMESTAMP_CLIENT", df["timestamp_client"], sample_size=10)

# Profile the MQTT topic column: distinct count, full list, short sample
print("\nTOPIC column")
print("Unique topics:", df["topic"].nunique())

print("\nList of distinct topics:")
for topic_name in df["topic"].unique():
    print(" -", topic_name)

print("\nSample topic values:")
print(df["topic"].iloc[:3].to_string(index=False))

# Dump the first few raw payload strings exactly as received
print("\nPAYLOAD column ")
print("Showing first five payload entries:\n")

for idx, value in zip(df.index[:5], df["payload"].iloc[:5]):
    print(f"[PAYLOAD {idx}]", value, "-" * 60, sep="\n")

In [None]:
"""
Cleans raw BLE scan output by removing ANSI escape sequences and control
characters. The cleaned content is stored separately to simplify downstream
parsing and analysis.
"""


def clean_ansi(text: Any) -> Any:
    """
    Strips ANSI escape sequences, then the \\x01 and \\x02 control characters,
    from a scan string. Anything that is not a string is handed back as-is.
    """
    if isinstance(text, str):
        without_escapes = re.sub(r"\x1B\[[0-?]*[ -/]*[@-~]", "", text)
        # Mapping a code point to None deletes that character
        return without_escapes.translate({0x01: None, 0x02: None})
    return text


# Store the ANSI-free scan text in its own column
df["revelations_clean"] = df["revelations_raw"].apply(clean_ansi)
print("Derived 'revelations_clean' by stripping ANSI and control codes.")

# Print the first few rows side by side to confirm the cleaning worked
print("\nREVELATIONS: RAW vs CLEAN \n")
for idx, raw_text, clean_text in (
    df[["revelations_raw", "revelations_clean"]]
    .head(3)
    .itertuples(index=True, name=None)
):
    print(
        f"Row {idx}",
        "RAW:",
        raw_text,
        "\nCLEAN:",
        clean_text,
        "=" * 40,
        sep="\n",
    )

In [None]:
"""
Identifies and counts controller-related state changes detected in cleaned BLE
scan output. The results summarise controller behaviour across all MQTT messages.
"""

# Marker text -> running count; every counter starts at zero
controller_patterns = dict.fromkeys(
    (
        "[CHG] Controller",
        "[NEW] Controller",
        "[DEL] Controller",
        "Discovering: yes",
        "Discovering: no",
        "Powered: yes",
        "Powered: no",
        "Pairable: yes",
        "Pairable: no",
    ),
    0,
)

# Number of MQTT messages (rows) in the dataset
total_rows = df.shape[0]


def controller_event_present(text: Any) -> bool:
    """
    Reports whether a scan string mentions the controller, i.e. contains
    "Controller " or one of the Discovering/Powered/Pairable state labels.
    Non-string input always yields False.
    """
    markers = ("Controller ", "Discovering:", "Powered:", "Pairable:")
    return isinstance(text, str) and any(marker in text for marker in markers)


# Reset counters so re-running the cell does not accumulate earlier totals
for key in controller_patterns:
    controller_patterns[key] = 0

# Tally every occurrence of each controller marker across all messages.
# str.count() is used instead of a membership test because a single message
# can contain the same marker several times, and the report below labels
# these totals as occurrences rather than as messages.
for text in df["revelations_clean"]:
    if not isinstance(text, str):
        continue

    for pattern in controller_patterns:
        controller_patterns[pattern] += text.count(pattern)

# Count how many messages contain any controller-related activity
rows_with_controller = sum(
    controller_event_present(txt) for txt in df["revelations_clean"]
)

# Report controller-level summary
print("\nCONTROLLER-LEVEL EVENTS\n")
print(f"Total MQTT messages: {total_rows}")
print(f"Messages with controller activity: {rows_with_controller}\n")

print("Controller event counts:")
for pattern, count in controller_patterns.items():
    print(f"  {pattern:<18} -> {count} occurrences")

In [None]:
"""
Collects all BLE MAC addresses observed in cleaned scan output, aggregates their
occurrences across MQTT messages, and computes device-level statistics.
"""

from collections import Counter


def extract_mac_list(block_text: Any) -> list[str]:
    """
    Lists every MAC address matched in one cleaned scan block, upper-cased and
    in order of appearance (duplicates kept). The gateway MAC is not filtered
    out here. Non-string input yields an empty list.
    """
    if isinstance(block_text, str):
        return [found.group(1).upper() for found in MAC_RE.finditer(block_text)]
    return []


# The cleaned scan text must exist before MACs can be extracted
if "revelations_clean" not in df.columns:
    raise KeyError(
        "Expected 'revelations_clean' column not found. Run the cleaning step first."
    )

# One list of MACs per message
df["mac_list"] = df["revelations_clean"].apply(extract_mac_list)

# Concatenate the per-message lists into a single flat list
all_macs = []
for macs in df["mac_list"]:
    if isinstance(macs, list):
        all_macs.extend(macs)

# Occurrence count per MAC; its keys are exactly the distinct MACs
mac_event_counts = Counter(all_macs)
unique_macs = sorted(mac_event_counts)

# Split the gateway from the remaining devices
device_macs = [mac for mac in unique_macs if mac != GATEWAY_MAC]
gateway_event_count = mac_event_counts[GATEWAY_MAC]

print("\nBLE DEVICE IDENTIFICATION\n")
print(f"Distinct MAC addresses observed: {len(unique_macs)}")
print(f"Distinct devices: {len(device_macs)}")
print(f"Gateway MAC ({GATEWAY_MAC}) associated with {gateway_event_count} events\n")

# List the five most frequently seen MACs
print("MAC event frequencies :\n")
for mac, count in mac_event_counts.most_common(5):
    print(f"{mac:<18} -> {count} events")

In [None]:
"""
Scans the cleaned bluetoothctl output line by line, finds device-related events 
([NEW], [CHG], [DEL]), extracts the MAC address from each event, and groups all 
events belonging to the same device together.
"""


def is_device_event_line(line: str) -> bool:
    """
    Tells whether a log line begins with one of the device event tags
    "[NEW] Device ", "[CHG] Device " or "[DEL] Device ".
    """
    # str.startswith accepts a tuple and succeeds if any prefix matches
    return line.startswith(("[NEW] Device ", "[CHG] Device ", "[DEL] Device "))


# Fail early when the cleaned scan text has not been produced yet
if "revelations_clean" not in df.columns:
    raise KeyError(
        "Expected 'revelations_clean' column not found."
    )

mac_events_by_device: dict[str, list[str]] = {}

for text in df["revelations_clean"]:
    if not isinstance(text, str):
        continue

    # Only whitespace-stripped lines tagged as device events are considered
    for line in filter(is_device_event_line, map(str.strip, text.splitlines())):
        match = MAC_RE.search(line)
        if match is None:
            continue

        mac = match.group(1).upper()
        if mac != GATEWAY_MAC:  # gateway events are left out
            mac_events_by_device.setdefault(mac, []).append(line)

print("\nDEVICE EVENTS GROUPED BY MAC \n")
print(f"Devices with device-level events: {len(mac_events_by_device)}")

# Five devices with the most events; ties keep first-seen order
top_devices = sorted(
    mac_events_by_device.items(),
    key=lambda item: -len(item[1]),
)[:5]

for mac, events in top_devices:
    print(f"MAC: {mac} -> {len(events)} events")

print("\nUse show_device_events_only(<mac>) to inspect full event history for a device.")

In [None]:
"""
Extracts iBeacon identifiers from cleaned BLE scan output. For each device, the
block derives (MAC, UUID, major, minor) tuples, builds a flat table, and reports
how many devices have iBeacon data.
"""


def _collect_hex_dump(lines: list[str], start: int) -> tuple[list[str], int]:
    """
    Gathers hex-byte tokens from consecutive dump lines beginning at `start`.
    Returns the tokens and the index of the first line that was not consumed.
    """
    collected: list[str] = []
    cursor = start

    while cursor < len(lines):
        dump_line = lines[cursor].strip()

        # A blank line ends the dump. So does a line carrying a MAC address:
        # that is the next event line, and a short value on it (e.g. "12")
        # must not be mistaken for a payload byte. Leaving it unconsumed also
        # lets the caller process that line normally.
        if not dump_line or MAC_RE.search(dump_line):
            break

        # NOTE(review): every two-hex-digit token on the line is accepted, so
        # text printed beside the bytes could add tokens - confirm against
        # real captures.
        hex_tokens = [
            token for token in dump_line.split() if HEX_BYTE_RE.fullmatch(token)
        ]
        if not hex_tokens:
            break

        collected.extend(hex_tokens)
        cursor += 1

    return collected, cursor


def _decode_ibeacon(hex_bytes: list[str]) -> tuple[str, int, int] | None:
    """
    Decodes (beacon_uuid, major_dec, minor_dec) from manufacturer-data bytes,
    or returns None when the bytes do not look like an iBeacon payload.
    """
    # Expected layout: "02" "15" prefix, 16 UUID bytes, 2 major bytes and
    # 2 minor bytes. At least 23 bytes are required, although only the first
    # 22 are decoded.
    if len(hex_bytes) < 23:
        return None
    if hex_bytes[0].lower() != "02" or hex_bytes[1].lower() != "15":
        return None

    uuid_hex = "".join(hex_bytes[2:18]).lower()
    beacon_uuid = "-".join(
        (uuid_hex[:8], uuid_hex[8:12], uuid_hex[12:16], uuid_hex[16:20], uuid_hex[20:])
    )

    # Major and minor are two bytes each, most significant byte first
    major_dec = int("".join(hex_bytes[18:20]), 16)
    minor_dec = int("".join(hex_bytes[20:22]), 16)
    return beacon_uuid, major_dec, minor_dec


def extract_ibeacon_tuples(block_text: Any) -> list[tuple[str, str, int, int]]:
    """
    Returns all iBeacon tuples found in a single cleaned scan block.
    Each tuple has the form (mac, beacon_uuid, major_dec, minor_dec).

    The block is read line by line. The most recent MAC address seen is
    remembered, and each "ManufacturerData Value:" line triggers a read of
    the hex dump on the following lines, which is decoded and attributed to
    that MAC. Non-string input yields an empty list.
    """
    results: list[tuple[str, str, int, int]] = []

    if not isinstance(block_text, str):
        return results

    lines = block_text.splitlines()
    current_mac: str | None = None
    idx = 0

    while idx < len(lines):
        line = lines[idx].strip()

        match = MAC_RE.search(line)
        if match:
            current_mac = match.group(1).upper()

        if "ManufacturerData Value:" not in line or current_mac is None:
            idx += 1
            continue

        # Resume scanning at the first line the dump reader did not consume
        hex_bytes, idx = _collect_hex_dump(lines, idx + 1)

        beacon = _decode_ibeacon(hex_bytes)
        if beacon is not None:
            results.append((current_mac, *beacon))

    return results


# Run the iBeacon extractor over every cleaned message
df["mac_beacon_tuples"] = df["revelations_clean"].apply(extract_ibeacon_tuples)

# Collapse the per-message lists into one flat list of tuples
flat_rows: list[tuple[str, str, int, int]] = [
    beacon
    for tuples in df["mac_beacon_tuples"]
    if isinstance(tuples, list)
    for beacon in tuples
]

# De-duplicated observations, minus the gateway MAC so the table matches the
# device-level analysis
mac_beacon_df = pd.DataFrame(
    flat_rows, columns=["mac", "beacon_uuid", "major", "minor"]
).drop_duplicates()
mac_beacon_df = mac_beacon_df.loc[
    mac_beacon_df["mac"].ne(GATEWAY_MAC)
].reset_index(drop=True)

# Keep a single tuple per device: the first one in sorted order
if mac_beacon_df.empty:
    mac_beacon_agg = pd.DataFrame(
        columns=["mac", "beacon_uuid", "major", "minor"]
    )
else:
    mac_beacon_agg = (
        mac_beacon_df
        .sort_values(by=["mac", "beacon_uuid", "major", "minor"])
        .groupby("mac", as_index=False)
        .first()
    )

print("\niBeacon IDENTIFIERS\n")
print(f"Total iBeacon records: {len(mac_beacon_df)}")
print(f"Devices with iBeacon tuples: {mac_beacon_df['mac'].nunique()}")
print(f"Unique beacon UUIDs: {mac_beacon_df['beacon_uuid'].nunique()}\n")

if not mac_beacon_df.empty:
    print("Sample iBeacon tuples:")
    print(mac_beacon_df.head(10).to_string(index=False))

# Coverage: how many of the known devices expose an iBeacon tuple
total_devices = len(device_macs)
devices_with_tuple = mac_beacon_agg["mac"].nunique()
devices_without_tuple = total_devices - devices_with_tuple

print("\nDevice coverage summary\n")
print(f"Total devices : {total_devices}")
print(f"Devices exposing (UUID, major, minor): {devices_with_tuple}")
print(f"Devices without iBeacon tuple: {devices_without_tuple}\n")

In [None]:
"""
Computes each device's presence window (first seen, last seen, duration) using MQTT
timestamps and links any available iBeacon identifiers.

"""

# Validate required inputs
if "timestamp_client" not in df.columns:
    raise ValueError("Expected 'timestamp_client' column not found in DataFrame.")
if "mac_list" not in df.columns:
    raise ValueError("Expected 'mac_list' column not found in DataFrame.")

# Normalize timestamp column for time-based aggregation; unparseable values
# become NaT and are skipped below
df["timestamp_client"] = pd.to_datetime(df["timestamp_client"], errors="coerce")
time_column = "timestamp_client"

# Build a per-observation event table: (mac, timestamp).
# Only two columns are needed, so iterate them directly rather than
# materializing a Series per row with iterrows().
event_records: list[tuple[str, pd.Timestamp]] = []

for macs, ts in zip(df["mac_list"], df[time_column]):
    if not isinstance(macs, list) or not macs or pd.isna(ts):
        continue
    event_records.extend((mac.upper(), ts) for mac in macs)

mac_events = pd.DataFrame(event_records, columns=["mac", "timestamp"])

# Derive first/last sighting times per MAC
if mac_events.empty:
    # The placeholder needs real datetime columns. With untyped (object)
    # columns the subtraction below is not datetimelike and the .dt accessor
    # raises AttributeError, so an empty dataset would crash the cell.
    mac_spans = pd.DataFrame(
        {
            "mac": pd.Series(dtype="object"),
            "enter_time": pd.Series(dtype="datetime64[ns]"),
            "exit_time": pd.Series(dtype="datetime64[ns]"),
        }
    )
else:
    mac_spans = (
        mac_events
        .groupby("mac")
        .agg(
            enter_time=("timestamp", "min"),
            exit_time=("timestamp", "max"),
        )
        .reset_index()
    )

# Exclude gateway MAC from device presence intervals
mac_spans = mac_spans[mac_spans["mac"] != GATEWAY_MAC].reset_index(drop=True)

# Attach iBeacon identifiers; the left merge keeps devices that have none
devices_full = (
    mac_spans
    .merge(mac_beacon_agg, on="mac", how="left")
    .sort_values("mac")
    .reset_index(drop=True)
)

# Presence duration in seconds between first and last sighting
devices_full["duration_seconds"] = (
    devices_full["exit_time"] - devices_full["enter_time"]
).dt.total_seconds()

# Fix the column order for reporting
devices_full = devices_full[
    ["mac", "enter_time", "exit_time", "duration_seconds", "beacon_uuid", "major", "minor"]
]

In [None]:
# Display the per-device presence table: first/last sighting, duration in
# seconds, and iBeacon identifiers where available.
devices_full

In [None]:
# Print the full presence table as plain text, without the index column.
print(devices_full.to_string(index=False))

In [None]:
"""
Computes activity metrics for each BLE device based on its appearance in the
per-message event log. For every MAC, the block derives event counts, presence
duration, and event rates over various time scales.
"""

if not mac_events.empty:
    # Event count plus first/last timestamp for every MAC
    per_mac = (
        mac_events
        .groupby("mac")
        .agg(
            n_events=("timestamp", "size"),
            enter_time=("timestamp", "min"),
            exit_time=("timestamp", "max"),
        )
        .reset_index()
    )

    # The gateway is not a device, so it is dropped from the metrics
    per_mac = per_mac.loc[per_mac["mac"].ne(GATEWAY_MAC)].reset_index(drop=True)

    # Seconds elapsed between first and last sighting
    per_mac["duration_seconds"] = (
        per_mac["exit_time"].sub(per_mac["enter_time"]).dt.total_seconds()
    )

    # A zero duration would divide by zero below, so those rows are removed
    per_mac = per_mac.loc[per_mac["duration_seconds"].gt(0)].copy()

    # Event rates at three time scales, all derived from events per second
    per_mac["events_per_second"] = per_mac["n_events"].div(per_mac["duration_seconds"])
    per_mac["events_per_minute"] = per_mac["events_per_second"].mul(60.0)
    per_mac["events_500ms"] = per_mac["events_per_second"].mul(0.5)

    # Report-ready table: ordered by MAC with a fixed column layout
    device_freq_df = (
        per_mac
        .sort_values("mac")[
            [
                "mac",
                "n_events",
                "duration_seconds",
                "events_500ms",
                "events_per_second",
                "events_per_minute",
            ]
        ]
        .reset_index(drop=True)
    )
else:
    # No events at all: produce an empty table with the same columns
    device_freq_df = pd.DataFrame(
        columns=[
            "mac",
            "n_events",
            "duration_seconds",
            "events_500ms",
            "events_per_second",
            "events_per_minute",
        ]
    )

In [None]:
# Display the per-device activity metrics as a DataFrame.
device_freq_df

In [None]:
# Print the activity metrics as plain text, without the index column.
print(device_freq_df.to_string(index=False))

In [None]:
def show_device_events_only(mac: str) -> None:
    """
    Displays bluetoothctl lines involving a single MAC address, based on the
    ANSI-cleaned scan text stored in the 'revelations_clean' column.

    The MAC is matched case-insensitively: both the requested address and each
    line are compared in upper case, matching how MAC addresses are normalized
    elsewhere in this notebook. For every matching line the row index, the
    row's 'timestamp_client' value (None if that column is absent), and the
    line itself are printed, preceded by row and line totals.

    Raises KeyError when the 'revelations_clean' column is missing.
    """
    if "revelations_clean" not in df.columns:
        raise KeyError("Expected 'revelations_clean' column is missing.")

    mac_norm = mac.strip().upper()
    events: list[tuple[int, object, str]] = []

    for row in df.itertuples(index=True):
        block = getattr(row, "revelations_clean", None)
        # Cheap whole-block check first; skip rows that cannot match
        if not isinstance(block, str) or mac_norm not in block.upper():
            continue

        timestamp = getattr(row, "timestamp_client", None)
        for line in block.splitlines():
            # Upper-case the line so a lower-case MAC in the scan text is
            # still found; the original line is what gets reported.
            if mac_norm in line.upper():
                events.append((row.Index, timestamp, line))

    row_count = len({idx for idx, _, _ in events})
    line_count = len(events)

    print(f"\nEvents for device {mac_norm}")
    print(f"Rows containing this MAC: {row_count}")
    print(f"Event lines for this MAC: {line_count}\n")

    for idx, ts, line in events:
        print(f"Row index: {idx}")
        print("timestamp_client:", ts)
        print("event:", line)
        print()


In [None]:
show_device_events_only("AA:BB:CC:DD:EE:FF") # Replace with the MAC address of the device to inspect