# BTSnoop Log Sniffer

This notebook demonstrates the process of parsing bluetooth packet data from android hci logs.

In [1]:
# !pip install fpdf
# !pip install kaleido
# !pip install dataframe_image

In [2]:
import sys
import json
import time
import requests
from constants import *

import fpdf
from fpdf.enums import XPos, YPos
from fpdf import FPDF
import pandas as pd
import numpy as np
from pprint import pprint
from decimal import Decimal
import dataframe_image as dfi
from datetime import datetime

# import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots


pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)


In [3]:
# filepath = sys.argv[1]

# with open(filepath, 'r') as file:
with open('./output1.json', 'r') as file:
    data = [x["_source"]["layers"] for x in json.load(file)]

[source](https://github.com/nccgroup/BLE-Replay/blob/master/btsnoop/btsnoop/bt/hci_evt.py)

The HCI LE Meta Event is used to encapsulate all LE Controller specific events.
The Event Code of all LE Meta Events shall be 0x3E. The Subevent_Code is
the first octet of the event parameters. The Subevent_Code shall be set to one
of the valid Subevent_Codes from an LE specific event


HCI inherently cannot differentiate between packet types. Hence a common physical interface is used with the indicators that are sent right before the packet is sent. These indicators are as follows

| HCI Packet Type | HCI Packet Indicator |
| --------------- | -------------------- |
| HCI Command Packet | 0x01 |
| HCI ACL Data Packet | 0x02 |
| HCI Synchronous Data Packet | 0x03 |
| HCI Event Packet | 0x04 |
| HCI ISO Data Packet | 0x05 |

LE Meta events encapsulate all LE Controller events and have a code of 0x03

protocols = {
    "bluetooth:hci_h4:bthci_acl:btl2cap": 'ACL-L2CAP Event',
    "bluetooth:hci_h4:bthci_acl:btl2cap:btatt": 'ACL-L2CAP - ATT (Attribute) Protocol',
    "bluetooth:hci_h4:bthci_acl:btl2cap:btavctp:btavrcp": 'ACL-L2CAP - AVCTP/AVR Protocol',
    "bluetooth:hci_h4:bthci_acl:btl2cap:btavdtp": 'ACL-L2CAP - AVDTP Protocol',
    "bluetooth:hci_h4:bthci_acl:btl2cap:btrfcomm": 'ACL-L2CAP - RF Communication protocol',
    "bluetooth:hci_h4:bthci_acl:btl2cap:btrfcomm:bthfp": 'ACL-L2CAP - Hands Free Protocol',
    "bluetooth:hci_h4:bthci_acl:btl2cap:btrfcomm:data": 'ACL-L2CAP - Data Transfer',
    "bluetooth:hci_h4:bthci_acl:btl2cap:btsdp": 'ACL-L2CAP - Service Discovery Protocol (SDP)',
    "bluetooth:hci_h4:bthci_cmd": 'HCI Command',
    "bluetooth:hci_h4:bthci_cmd:btcommon": 'Bluetooth Advertising Event',
    "bluetooth:hci_h4:bthci_cmd:bthci_vendor.broadcom": 'Vendor-Specific Broadcom Packet',
    "bluetooth:hci_h4:bthci_evt": 'HCI Event',
    "bluetooth:hci_h4:bthci_evt:btcommon": 'Bluetooth Advertising Event',
    "bluetooth:hci_h4:bthci_evt:bthci_vendor.broadcom": 'Vendor-Specific Broadcom Packet'
    }

In [4]:
def format_packet_data(data):
    data_input = data
    for packet in data_input:
        packet["Sequence Number"] = packet["frame.number"][0] if "frame.number" in packet else None
        packet["Epoch Timestamp"] = packet["frame.time_epoch"][0] if "frame.time_epoch" in packet else None
        packet["Timestamp"] = datetime.fromtimestamp(int(Decimal(packet["frame.time_epoch"][0]))) if "frame.time_epoch" in packet else None
        packet["Packet Length"] = packet["frame.len"][0] if "frame.len" in packet else None
        packet["LE Protocol"] = PROTOCOL_TYPES[packet["frame.protocols"][0]] if "frame.protocols" in packet else None
        packet["LE Event Code"] = packet["hci_h4.type"][0] if "hci_h4.type" in packet else None
        packet["LE Event Type"] = HCI_LE_EVENT[packet["hci_h4.type"][0]] if "hci_h4.type" in packet else None
        packet["Direction"] = EVENT_DIRECTION[packet["hci_h4.direction"][0]] if "hci_h4.direction" in packet else None
        packet["HCI Command"] = packet["bthci_cmd"][0] if "bthci_cmd" in packet else None
        packet["HCI Event Code"] = packet["bthci_evt.code"][0] if "bthci_evt.code" in packet else None
        packet["HCI Event"] = packet["bthci_evt"][0] if "bthci_evt" in packet else None
        packet["HCI Event Command Packet Count"] = packet["bthci_evt.num_command_packets"][0] if "bthci_evt.num_command_packets" in packet else None
        packet["HCI Event Status"] = packet["bthci_evt.status"][0] if "bthci_evt.status" in packet else None
        packet["HCI Event Command in Frame"] = packet["bthci_evt.command_in_frame"][0] if "bthci_evt.command_in_frame" in packet else None
        packet["HCI ACL chandle"] = packet["bthci_acl.chandle"][0] if "bthci_acl.chandle" in packet else None
        packet["HCI ACL Length"] = packet["bthci_acl.length"][0] if "bthci_acl.length" in packet else None
        packet["Source Device MAC"] = packet["bthci_acl.src.bd_addr"][0] if "bthci_acl.src.bd_addr" in packet else None
        packet["Source Device Name"] = packet["bthci_cmd.device_name"][0] if "bthci_cmd.device_name" in packet else (packet["bthci_acl.src.name"][0] if "bthci_acl.src.name" in packet else None)
        packet["Destination Device MAC"] = packet["bthci_acl.dst.bd_addr"][0] if "bthci_acl.dst.bd_addr" in packet else None
        packet["Destination Device Name"] = packet["bthci_acl.dst.name"][0] if "bthci_acl.dst.name" in packet else None
        packet["L2CAP CID"] = packet["btl2cap.cid"][0] if "btl2cap.cid" in packet else None
        packet["L2CAP Length"] = packet["btl2cap.length"][0] if "btl2cap.length" in packet else None


        if packet["HCI Command"]:
            packet["HCI Command"] = packet["HCI Command"].replace("Bluetooth HCI Command - ", "")

        if packet["HCI Event"]:
            packet["HCI Event"] = packet["HCI Event"].replace("Bluetooth HCI Event - ", "")

        if packet["L2CAP CID"] is not None:
            if packet["L2CAP CID"] in L2CAP_CID_VALUES:
                packet["L2CAP Type"] = L2CAP_CID_VALUES[packet["L2CAP CID"]]
            else:
                packet["L2CAP Type"] = 'Dynamically Allocated'

        for key in PACKET_KEYS:
            if key in packet:
                del packet[key]

        keys_to_delete = [key for key, value in packet.items() if value is None]
        for key in keys_to_delete:
            del packet[key]

    df = pd.DataFrame(data)

    # handling nan values in device name if name already present in the dataframe 
    src_mac_device_dict = df.dropna(subset=['Source Device MAC', 'Source Device Name']).set_index('Source Device MAC')['Source Device Name'].to_dict()
    df['Source Device Name'] = df['Source Device MAC'].map(src_mac_device_dict).fillna(df['Source Device Name'])

    dst_mac_device_dict = df.dropna(subset=['Destination Device MAC', 'Destination Device Name']).set_index('Destination Device MAC')['Destination Device Name'].to_dict()
    df['Destination Device Name'] = df['Destination Device MAC'].map(dst_mac_device_dict).fillna(df['Destination Device Name'])

    df['Source Device Name'] = df['Source Device Name'].replace('', 'Unknown Device')
    df['Source Device Name'] = df['Source Device Name'].fillna('Unknown Device')
    df['Destination Device Name'] = df['Destination Device Name'].replace('', 'Unknown Device')
    df['Destination Device Name'] = df['Destination Device Name'].fillna('Unknown Device')

    return df

def generate_device_piecharts(df, names, title, export_filename):
    fig = px.pie(df, names=names, title=title, hole=0.3, width=750, height=400)
    fig.update_layout(title_font_size=24)
    fig.update_traces(textinfo='percent', texttemplate='%{percent:.2%}', textfont_size=18)
    fig.update_layout(legend=dict(x=0.85, y=1.1))
    fig.update_layout(margin=dict(t=50, b=30, l=0, r=220))
    fig.write_image(f'./generated_images/{export_filename}', scale=2)
    # fig.show()

def generate_hci_cmdevt_plot(df_hci_cmd, df_hci_event, export_filename):
    fig = make_subplots(specs=[[{"secondary_y": True}]])

    fig.add_trace(
        go.Scatter(
            x=df_hci_cmd['Timestamp'],
            y=df_hci_cmd['Sequence Number'],
            mode='lines+markers',
            name='HCI Commands',
            line=dict(color='red')),
            secondary_y=False
        )

    fig.add_trace(
        go.Scatter(
            x=df_hci_event['Timestamp'],
            y=df_hci_event['Sequence Number'],
            mode='lines+markers',
            name='HCI Events',
            line=dict(color='blue')),
            secondary_y=True
        )

    fig.update_yaxes(title_text="Sequence Number (HCI Commands)", dtick=200, tickmode='linear', secondary_y=False)
    fig.update_yaxes(title_text="Sequence Number (HCI Events)", dtick=200, tickmode='linear', secondary_y=True)

    fig.update_layout(title='HCI Events and Commands over Time',
                      title_font_size=24,
                      xaxis_title='Timestamp (UTC)',
                      xaxis=dict(tickangle=45),
                      legend=dict(x=0.01, y=0.99),
                      template='plotly_white',
                      width=900,
                      height=600)
    fig.write_image(f'./generated_images/{export_filename}', scale=2)

def get_mac_info(mac_address):
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    url = VENDOR_LOOKUP_API + mac_address
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()[0]
    else:
        return None

def generate_mac_info_dataframe(unique_mac_list):
    unique_mac_info_list = []
    for mac in unique_mac_list:
        info = get_mac_info(mac)
        info['mac_address'] = mac
        if info:
            unique_mac_info_list.append(info)
    mac_info_df = pd.DataFrame(unique_mac_info_list)[['mac_address', 'company', 'addressL1', 'addressL2', 'addressL3', 'country']]
    return mac_info_df

In [5]:
df_data = format_packet_data(data)

In [6]:
df_data.head()

Unnamed: 0,Sequence Number,Epoch Timestamp,Timestamp,Packet Length,LE Protocol,LE Event Code,LE Event Type,Direction,HCI Command,HCI Event Code,HCI Event,HCI Event Command Packet Count,HCI Event Status,HCI Event Command in Frame,Source Device Name,HCI ACL chandle,HCI ACL Length,Source Device MAC,Destination Device MAC,Destination Device Name,L2CAP CID,L2CAP Length,L2CAP Type
0,1,1727925135.575141,2024-10-02 23:12:15,4,HCI Command,0x01,Command (HCI_CMD),Host > Controller,Reset,,,,,,Unknown Device,,,,,Unknown Device,,,
1,2,1727925135.580693,2024-10-02 23:12:15,7,HCI Command,0x04,Event (HCI_EVT),Controller > Host,,0x0e,Command Complete,1.0,0x00,1.0,Unknown Device,,,,,Unknown Device,,,
2,3,1727925135.580795,2024-10-02 23:12:15,12,HCI Command,0x01,Command (HCI_CMD),Host > Controller,Set Event Mask,,,,,,Unknown Device,,,,,Unknown Device,,,
3,4,1727925135.581154,2024-10-02 23:12:15,7,HCI Command,0x04,Event (HCI_EVT),Controller > Host,,0x0e,Command Complete,1.0,0x00,3.0,Unknown Device,,,,,Unknown Device,,,
4,5,1727925135.581234,2024-10-02 23:12:15,6,HCI Command,0x01,Command (HCI_CMD),Host > Controller,Write LE Host Supported,,,,,,Unknown Device,,,,,Unknown Device,,,


In [7]:
source_mac_info = df_data['Source Device Name'] + "\n(" + df_data['Source Device MAC'] + ")"
source_mac_info = source_mac_info.dropna()
df_sources = pd.DataFrame(source_mac_info, columns=['Source Devices'])

generate_device_piecharts(df_sources, 'Source Devices', 'Distribution of Source Devices', 'source_devices_distribution.png')

In [8]:
destination_device_info = df_data['Destination Device Name'] + "\n(" + df_data['Destination Device MAC'] + ")"
destination_device_info = destination_device_info.dropna()
df_destinations = pd.DataFrame(destination_device_info, columns=['Destination Devices'])

generate_device_piecharts(df_destinations, 'Destination Devices', 'Distribution of Destination Devices', 'destination_devices_distribution.png')

In [9]:
df_hci_cmd = df_data[df_data['LE Event Type'] == 'Command (HCI_CMD)'].dropna(axis=1, how='all')
df_hci_event = df_data[df_data['LE Event Type'] == 'Event (HCI_EVT)'].dropna(axis=1, how='all')
df_hci_event_paired = df_hci_event[df_hci_event['HCI Event Command in Frame'].isin(df_hci_cmd['Sequence Number'])]

generate_hci_cmdevt_plot(df_hci_cmd, df_hci_event_paired, export_filename="hci_cmd_event_communication.png")

In [10]:
mac_addresses = pd.DataFrame({
    'MAC Address': np.unique(
        np.concatenate(
                (
                df_data['Source Device MAC'].dropna().unique(),
                df_data['Destination Device MAC'].dropna().unique()
                )
            )
        )
})
mac_vendor_info = generate_mac_info_dataframe(mac_addresses['MAC Address'].tolist())

In [11]:
df_acl_le_events = df_data[df_data['LE Event Type'] == 'ACL (HCI_ACL)']
df_acl_le_events = df_acl_le_events.dropna(axis=1, how='all')

In [22]:
# Summary statistics
first_timestamp = df_data['Timestamp'].min()
last_timestamp = df_data['Timestamp'].max()

host_mac_address = df_data[df_data['Direction'] == 'Host > Controller']['Source Device MAC'].dropna().unique()[0]
host_device_name = df_data[df_data['Source Device MAC'] == host_mac_address]['Source Device Name'].dropna().unique()[0]
host_device_company = mac_vendor_info[mac_vendor_info['mac_address'] == host_mac_address]['company'].values[0]
controller_devices = df_data[df_data['Direction'] == 'Controller > Host'][['Source Device MAC', 'Source Device Name']].dropna().drop_duplicates()

unique_hci_commands = df_hci_cmd['HCI Command'].unique()
unique_hci_events = df_hci_event['HCI Event'].unique()
hci_command_counts = df_hci_cmd['HCI Command'].value_counts().reset_index()
hci_command_counts.columns = ['HCI Command', 'Count']
hci_event_counts = df_hci_event['HCI Event'].value_counts().reset_index()
hci_event_counts.columns = ['HCI Event', 'Count']

outgoing_commands = df_hci_cmd[df_hci_cmd['Direction'] == 'Host > Controller'].shape[0]
incoming_events = df_hci_event[df_hci_event['Direction'] == 'Controller > Host'].shape[0]


summary_stats = {
    'Capture Start Time': first_timestamp,
    'Capture End Time': last_timestamp,
    'Host Device Name': host_device_name,
    'Host Device MAC Address': host_mac_address,
    'Host Device Vendor': host_device_company,
    'Controller Devices Interacted With': len(controller_devices),
    'Total Number of Packets': len(df_data),
    'HCI Command Packets': len(df_hci_cmd),
    'HCI Event Packets': len(df_hci_event),
    'ACL Packets': len(df_acl_le_events)
}

hci_command_event_stats = {
    'Unique HCI Commands': len(unique_hci_commands),
    'Unique HCI Events': len(unique_hci_events),
    'Outgoing Commands': outgoing_commands,
    'Incoming Events': incoming_events
}

hci_command_event_info = {
    'HCI Command Counts': hci_command_counts,
    'HCI Event Counts': hci_event_counts
}

hci_acl_stats = {
    'ACL LE Protocols Used': df_acl_le_events['LE Protocol'].nunique()
}


In [13]:
class PDF(FPDF):
    def footer(self):
        self.set_y(-15)
        self.set_font('Helvetica', 'I', 8)
        self.set_text_color(128)
        self.cell(0, 10, f'Page {self.page_no()}', XPos.RIGHT, new_y=YPos.TOP, align='C')

def create_title(pdf, title):
    
    # PDF Title
    pdf.set_font('Helvetica', 'b', 26)  
    pdf.ln(10)
    pdf.write(5, title)
    pdf.ln(10)
    
    # Report Date
    pdf.set_font('Helvetica', '', 20)
    pdf.set_text_color(r=128,g=128,b=128)
    today = time.strftime("%m/%d/%Y")
    pdf.write(4, f'{today}')
    pdf.ln(10)

def create_heading(pdf, title):
    pdf.set_left_margin(10)
    pdf.set_font('Helvetica', 'b', 22)
    pdf.set_text_color(r=0,g=0,b=0)
    pdf.write(5, title)
    pdf.ln(12)

def create_subheading(pdf, title):
    pdf.set_font('Helvetica', 'b', 18)
    pdf.set_text_color(r=0,g=0,b=0)
    pdf.write(5, title)
    pdf.ln(8)

def write_to_pdf(pdf, words, bold=False):
    pdf.set_text_color(r=0, g=0, b=0)
    if bold:
        pdf.set_font('Helvetica', 'B', 14)
    else:
        pdf.set_font('Helvetica', '', 14)
    pdf.write(5, words)


In [23]:
pdf = PDF(orientation="P", unit="mm", format="A4")

pdf.add_page()

create_title(pdf, PDF_TITLE)

create_heading(pdf, "General Information")

pdf.set_left_margin(15)
for key, value in summary_stats.items():
    write_to_pdf(pdf, f"{key}: ", True)
    write_to_pdf(pdf, f"{value}")
    pdf.ln(6)
pdf.set_left_margin(10)
pdf.ln(6)

write_to_pdf(pdf, "Bluetooth communication is done through a standardized protocol known as the HCI interface which allows for sending and recieving commands, events and data between hosts and controllers. We will be looking at the breakdown of these between the current host device and the controllers below.")
pdf.ln(10)

create_heading(pdf, "HCI Command and Event Packet Details")

pdf.set_left_margin(15)
for key, value in hci_command_event_stats.items():
    write_to_pdf(pdf, f"{key}: ", True)
    write_to_pdf(pdf, f"{value}")
    pdf.ln(6)
pdf.ln(4)

for key, value in hci_command_event_info.items():
    if isinstance(value, pd.DataFrame):
        create_subheading(pdf, f"{key}: ")
        with pdf.table(text_align="C", padding=-2, borders_layout="SINGLE_TOP_LINE", cell_fill_color=220, cell_fill_mode="ROWS") as table:
            pdf.set_font('Helvetica', 'B', 14)
            row = table.row()
            for col in value.columns:
                row.cell(col)
            pdf.set_font('Helvetica', '', 12)
            for df_row in value.itertuples():
                row = table.row()
                row.cell(df_row[1])
                row.cell(str(df_row[2]))
    pdf.ln(10)

pdf.set_left_margin(10)
write_to_pdf(pdf, "The chart below/on the next page shows the variation of HCI commands and events over the timeframe of packet capture.")
pdf.ln(10)

pdf.set_left_margin(10)
pdf.image("./generated_images/hci_cmd_event_communication.png", x=(PDF_WIDTH - 170)/2, w=170)
pdf.ln(10)

pdf.add_page()

pdf.ln(10)

create_heading(pdf, "ACL Packet Summary")

write_to_pdf(pdf, "Bluetooth ACL Packets, which is short for Bluetooth Asynchronous Connection-Less protocol is used for general data transfer between the host and controller when it is not in real-time. It is provided alongside the SCO (Synchronous Connection Oriented) protocol which finds its use in audio and video data transfer in real-time.")
pdf.ln(10)
write_to_pdf(pdf, "These packets usually contain idetifying information about the devices in use such as the device name and mac addresses.")

pdf.ln(10)

pdf.set_left_margin(15)
for key, value in hci_acl_stats.items():
    write_to_pdf(pdf, f"{key}: ", True)
    write_to_pdf(pdf, f"{value}")
    pdf.ln(6)
pdf.set_left_margin(10)
pdf.ln(4)

create_heading(pdf, "MAC Vendor Information")


pdf.output("./outputs/output.pdf")

The Bluetooth Asynchronous Connection-oriented Logical transport (ACL) is a method of transfer of asynchronous (data that is not in real time) data over a bluetooth connection.

Looking at some of the profiles under ACL we see the following:

- *AVDTP*: This is the Audio/Video Distribution Transport Protocol (AVDTP) signifies the streaming of music to a bluetooth speaker device such as a set of headsets over the L2CAP protocol.
- *HFP*: The hands free profile (HFP) can be seen as a mode of remote control between the phone and bluetooth device.
- *AVRCP*: The Audio/Video Remote Control Profile is used along with the A2DP (Advanced Audio Distribution Profile) profile to allow a single remote device to control a bluetooth device
- *SDP*: The Service Discovert Protocol is a manner of communication to discover the available bluetooth devices nearby and their types.

[Reference](https://datatracker.ietf.org/doc/html/rfc1761)

Android bluetooth logs come in the **Snoop Version 1 Packet Capture File Format** which is similar to the second version developed by Sun Microsystems in 1995. When we capture logs of this format, we obtain arrays of octets (8 bit packets of information), with each array item corresponding to a packet record.
