Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Supported sensor brands
- iNode
- Jinou
- Kegtron
- KKM
- Moat
- Oral-B
- Qingping
Expand Down Expand Up @@ -66,7 +67,7 @@ ble_parser = BleParser(

**report_unknown**

Report unknown sensors. Can be set to `ATC`, `b-parasite`, `BlueMaestro`, `Brifit`, `Govee`, `Inkbird`, `iNode`, `Jinou`, `Kegtron`, `Moat`, `Mi Scale`, `Oral-B`, `Qingping`, `Ruuvitag`, `SensorPush`, `Sensirion`, `Teltonika`, `Thermoplus`, `Xiaogui` or `Xiaomi` to report unknown sensors of a specific brand to the logger. You can set it to `Other` to report all unknown advertisements to the logger. Default: `False`
Report unknown sensors. Can be set to `ATC`, `b-parasite`, `BlueMaestro`, `Brifit`, `Govee`, `Inkbird`, `iNode`, `Jinou`, `Kegtron`, `KKM`, `Moat`, `Mi Scale`, `Oral-B`, `Qingping`, `Ruuvitag`, `SensorPush`, `Sensirion`, `Teltonika`, `Thermoplus`, `Xiaogui` or `Xiaomi` to report unknown sensors of a specific brand to the logger. You can set it to `Other` to report all unknown advertisements to the logger. Default: `False`

**discovery**

Expand Down
12 changes: 9 additions & 3 deletions package/bleparser/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .inode import parse_inode
from .jinou import parse_jinou
from .kegtron import parse_kegtron
from .kkm import parse_kkm
from .laica import parse_laica
from .miscale import parse_miscale
from .moat import parse_moat
Expand Down Expand Up @@ -170,9 +171,14 @@ def parse_data(self, data):
sensor_data = parse_xiaomi(self, service_data, mac, rssi)
break
elif uuid16 == 0xFEAA:
# UUID16 = Google (used by Ruuvitag V2/V4)
sensor_data = parse_ruuvitag(self, service_data, mac, rssi)
break
if len(service_data) == 19:
# UUID16 = Google (used by KKM)
sensor_data = parse_kkm(self, service_data, mac, rssi)
break
elif len(service_data) >= 23:
# UUID16 = Google (used by Ruuvitag V2/V4)
sensor_data = parse_ruuvitag(self, service_data, mac, rssi)
break
elif uuid16 == 0xFFF9:
# UUID16 = FIDO (used by Cleargrass)
sensor_data = parse_qingping(self, service_data, mac, rssi)
Expand Down
81 changes: 81 additions & 0 deletions package/bleparser/kkm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Parser for KKM beacon BLE advertisements"""
import logging
import math
from struct import unpack
from .helpers import (
to_mac,
to_unformatted_mac,
)

_LOGGER = logging.getLogger(__name__)


def parse_kkm(self, data, source_mac, rssi):
"""Parser for KKM sensors."""
kkm_mac = source_mac
device_type = "K6 Sensor Beacon"
result = {
"mac": to_unformatted_mac(kkm_mac),
"type": device_type,
"rssi": rssi,
"data": False,
}
if len(data) == 19:
# K6 Sensor Beacon
(frame_type, version, control_byte, volt, temp, temp_frac, humi, humi_frac, accx, accy, accz) = unpack(
">BBBHbBBBhhh", data[4:19]
)
if frame_type == 0x21 and version == 1:
if temp < 0:
temperature = -(temp + 128 + temp_frac / 100)
else:
temperature = temp + temp_frac / 100
humidity = humi + humi_frac / 100
result.update(
{
"temperature": temperature,
"humidity": humidity,
"acceleration": round(math.sqrt(accx ** 2 + accy ** 2 + accz ** 2), 1),
"acceleration_x": accx,
"acceleration_y": accy,
"acceleration_z": accz,
"voltage": volt / 1000,
"firmware": "KKM",
"packet": "no packet id",
"data": True,
}
)
else:
result = None
else:
result = None
if result is None:
if self.report_unknown == "KKM":
_LOGGER.info(
"BLE ADV from UNKNOWN KKM DEVICE: RSSI: %s, MAC: %s, ADV: %s",
rssi,
to_mac(source_mac),
data.hex(),
)
return None
# reformat battery info to match BLE monitor format
if "voltage" in result:
voltage = result["voltage"]
# calculate battery in %
if voltage >= 3.00:
batt = 100
elif voltage >= 2.60:
batt = 60 + (voltage - 2.60) * 100
elif voltage >= 2.50:
batt = 40 + (voltage - 2.50) * 200
elif voltage >= 2.45:
batt = 20 + (voltage - 2.45) * 400
else:
batt = 0
result["battery"] = round(batt, 1)
# check for MAC presence in sensor whitelist, if needed
if self.discovery is False and kkm_mac.lower() not in self.sensor_whitelist:
_LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(kkm_mac))
return None

return result
50 changes: 35 additions & 15 deletions package/bleparser/oral_b.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Parser for Moat BLE advertisements."""
"""Parser for Oral-B BLE advertisements."""
import logging
from struct import unpack

Expand All @@ -12,22 +12,17 @@
4: "charging",
5: "setup",
6: "flight menu",
8: "selection menu",
113: "final test",
114: "pcb test",
115: "sleeping",
116: "transport"
}

MODES = {
0: "off",
1: "daily clean",
2: "sensitive",
3: "massage",
4: "whitening",
5: "deep clean",
6: "tongue cleaning",
7: "turbo",
255: "unknown"
PRESSURE = {
114: "normal",
118: "button pressed",
178: "high"
}


Expand All @@ -38,7 +33,6 @@ def parse_oral_b(self, data, source_mac, rssi):
oral_b_mac = source_mac
result = {"firmware": firmware}
if msg_length == 15:
device_type = "SmartSeries 7000"
(state, pressure, counter, mode, sector, sector_timer, no_of_sectors) = unpack(
">BBHBBBB", data[7:15]
)
Expand All @@ -48,8 +42,34 @@ def parse_oral_b(self, data, source_mac, rssi):
else:
result.update({"toothbrush": 0})

device_bytes = data[4:7]
if device_bytes == b'\x062k':
device_type = "IO Series 7"
MODES = {
0: "daily clean",
1: "sensitive",
2: "gum care",
3: "whiten",
4: "intense",
8: "settings"
}
else:
device_type = "SmartSeries 7000"
MODES = {
0: "off",
1: "daily clean",
2: "sensitive",
3: "massage",
4: "whitening",
5: "deep clean",
6: "tongue cleaning",
7: "turbo",
255: "unknown"
}

tb_state = STATES.get(state, "unknown state " + str(state))
tb_mode = MODES.get(mode, "unknown mode " + str(mode))
tb_pressure = PRESSURE.get(pressure, "unknown pressure " + str(pressure))

if sector == 254:
tb_sector = "last sector"
Expand All @@ -60,7 +80,7 @@ def parse_oral_b(self, data, source_mac, rssi):

result.update({
"toothbrush state": tb_state,
"pressure": pressure,
"pressure": tb_pressure,
"counter": counter,
"mode": tb_mode,
"sector": tb_sector,
Expand All @@ -71,7 +91,7 @@ def parse_oral_b(self, data, source_mac, rssi):
else:
if self.report_unknown == "Oral-B":
_LOGGER.info(
"BLE ADV from UNKNOWN Moat DEVICE: RSSI: %s, MAC: %s, ADV: %s",
"BLE ADV from UNKNOWN Oral-B DEVICE: RSSI: %s, MAC: %s, ADV: %s",
rssi,
to_mac(source_mac),
data.hex()
Expand All @@ -96,4 +116,4 @@ def parse_oral_b(self, data, source_mac, rssi):

def to_mac(addr: int):
"""Return formatted MAC address"""
return ':'.join(f'{i:02X}' for i in addr)
return ':'.join(f'{i:02X}' for i in addr)
29 changes: 29 additions & 0 deletions package/tests/test_kkm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""The tests for the KKM ble_parser."""
from bleparser import BleParser


class TestKKM:
"""Tests for the KKM parser"""
def test_kkm_k6(self):
"""Test KKM BLE parser for K6 sensors"""
data_string = "043E26020100016CD0060234DD1A0201060303AAFE1216AAFE21010F0E07192A224FFFFCFFEC03EBD3"
data = bytes(bytearray.fromhex(data_string))

# pylint: disable=unused-variable
ble_parser = BleParser()
sensor_msg, tracker_msg = ble_parser.parse_data(data)

assert sensor_msg["firmware"] == "KKM"
assert sensor_msg["type"] == "K6 Sensor Beacon"
assert sensor_msg["mac"] == "DD340206D06C"
assert sensor_msg["packet"] == "no packet id"
assert sensor_msg["data"]
assert sensor_msg["temperature"] == 25.42
assert sensor_msg["humidity"] == 34.79
assert sensor_msg["acceleration"] == 1003.2
assert sensor_msg["acceleration_x"] == -4
assert sensor_msg["acceleration_y"] == -20
assert sensor_msg["acceleration_z"] == 1003
assert sensor_msg["voltage"] == 3.591
assert sensor_msg["battery"] == 100
assert sensor_msg["rssi"] == -45
2 changes: 1 addition & 1 deletion package/tests/test_oral_b.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def test_oral_b_smartseries_7000(self):
assert sensor_msg["data"]
assert sensor_msg["toothbrush"] == 1
assert sensor_msg["toothbrush state"] == 'running'
assert sensor_msg["pressure"] == 40
assert sensor_msg["pressure"] == 'unknown pressure 40'
assert sensor_msg["counter"] == 1041
assert sensor_msg["mode"] == 'turbo'
assert sensor_msg["sector"] == 'sector 55'
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = bleparser
version = 3.0.0
version = 3.1.0
author = Ernst79
author_email = e.klamer@gmail.com
description = Parser for passive BLE advertisements
Expand Down