In [None]:
%pip install pycryptodome
from Crypto.Cipher import AES
from IPython.display import clear_output

BD_ADDR = "2D4DE701-9160-935E-C351-707F30C5709C"
BD_NAME = "GLASSES-02FB6E"
AES_KEY = b'\x32\x67\x2f\x79\x74\xad\x43\x45\x1d\x9c\x6c\x89\x4a\x0e\x87\x64'
AES_MODE = AES.MODE_ECB
DEFAULT_ATTEMPTS = 3

cipher = AES.new(AES_KEY, AES_MODE)
def encrypt(value):
    return cipher.encrypt(value)

EXAMPLE_MSGS = [
    encrypt(b'\x06PLAY\x01\x00;\x97\xf2\xf3U\xa9r\x13\x8b'),
    encrypt(b'\x06PLAY\x01\x04;\x97\xf2\xf3U\xa9r\x13\x8b'),
    encrypt(b'\x06PLAY\x01\x03;\x97\xf2\xf3U\xa9r\x13\x8b')]


clear_output()


In [6]:
import simplepyble
print(f"Running on {simplepyble.get_operating_system()}")
adapters = simplepyble.Adapter.get_adapters()

if len(adapters) == 0:
    print("No adapters found")

for adapter in adapters:
    print(f"Adapter: {adapter.identifier()} [{adapter.address()}]")

if len(adapters) != 1:
    print("Please connect only one adapter")
    exit(1)

adapter = adapters[0]
adapter.set_callback_on_scan_start(lambda: print("Scan started."))
adapter.set_callback_on_scan_stop(lambda: print("Scan complete."))
def ble_log(peripheral):
    return print(f"Found {peripheral.address()} [{peripheral.identifier()}]")
adapter.set_callback_on_scan_found(ble_log)


def ble_scan(adapter, device_name, timeout=5000):
    device = None
    def on_receive(scan_entry):
        nonlocal device
        if scan_entry.identifier() == device_name and device == None:
            device = scan_entry
            print(f"Found {device.address()} [{device.identifier()}]")
    adapter.set_callback_on_scan_found(on_receive)
    adapter.scan_for(timeout)
    adapter.set_callback_on_scan_found(ble_log)

    return device

clear_output()

In [7]:
driver = None
if driver is not None:
    driver.device.disconnect()

class ShiningGlassesDriver:
    device = None
    callbacks = {}
    cipher = None
    _connected = False
    def __init__(self, adapter):
        self.adapter = adapter
        self.cipher = AES.new(AES_KEY, AES_MODE)

    def align(self, value):
        return value.ljust(16, b'\x00')

    def encrypt(self, value):
        return cipher.encrypt(self.align(value))

    def connect(self):
        device = None
        for i in range(DEFAULT_ATTEMPTS):
            device = ble_scan(adapter, "GLASSES-02FB6E")
            if device is not None:
                break
        if not device:
            print("Device not found")
            exit(1)

        print(f"Connected to {device.identifier()} [{device.address()}]")
        device.connect()

        self.device = device
        self._connected = True

    def write(self, suid=None, write_characteristic=None, value=None, show_command=False):
        if not self._connected:
            print("Not connected to device")
            return

        show_command and print(f"[Write] {value}")
        value = self.encrypt(value)
        self.device.write_command(
            suid, write_characteristic, value)

    def request(self, suid=None, write_characteristic=None, value=None, show_command=False):
        if not self._connected:
            print("Not connected to device")
            return

        show_command and print(f"[Request] {value}")
        value = self.encrypt(value)
        return self.device.write_request(
            suid, write_characteristic, value)

    def notify(self, suid=None, write_characteristic=None, value=None, show_command=False):
        if not self._connected:
            print("Not connected to device")
            return

        unique_key = f"{suid}-{write_characteristic}"
        if unique_key not in self.callbacks:
            self.callbacks[unique_key] = set([lambda data: print(f"[Notify] {data}")])

        def callback(payload):
            for cb in list(self.callbacks[unique_key]):
                cb(payload)

        return self.device.notify(suid, write_characteristic, callback)

driver = ShiningGlassesDriver(adapter)
driver.connect()
driver.write(BD_SUID, BD_WRITE, b'\x06PLAY\x01\x03;\x97\xf2\xf3U\xa9r\x13\x8b')

Scan started.
Scan complete.
Scan started.
Scan complete.
Scan started.
Scan complete.
Device not found


AttributeError: 'NoneType' object has no attribute 'identifier'

In [4]:
def Service(suid):
    def inner(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs, suid=suid)
        return wrapper
    return inner

def WriteCharacteristic(write_characteristic):
    def inner(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs, write_characteristic=write_characteristic)
        return wrapper
    return inner

def ReadCharacteristic(read_characteristic):
    def inner(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs, read_characteristic=read_characteristic)
        return wrapper
    return inner

def assert_type(value, expected_type):
    if not isinstance(value, expected_type):
        raise ValueError(f"Expected {expected_type}, got {type(value)}")

def assert_int_range(value, min_value, max_value):
    if not min_value <= value <= max_value:
        raise ValueError(f"Expected {min_value} <= value <= {max_value}, got {value}")

def assert_equal(value, expected_value):
    if value != expected_value:
        raise ValueError(f"Expected {expected_value}, got {value}")

In [10]:
from enum import Enum

class ShiningGlasses():
    def __init__(self, driver):
        self.driver = driver

    @Service("0000fff0-0000-1000-8000-00805f9b34fb")
    @WriteCharacteristic("d44bc439-abfd-45a2-b575-925416129601")
    def add_callback(self, callback, **k):
        unique_key = f"{k['suid']}-{k['write_characteristic']}"
        if unique_key not in self.driver.callbacks:
            self.driver.callbacks[unique_key] = set()
        self.driver.callbacks[unique_key].add(callback)

    @Service("0000fff0-0000-1000-8000-00805f9b34fb")
    @WriteCharacteristic("d44bc439-abfd-45a2-b575-925416129600")
    def play(self, n_imgs = 1, img_idcs = [0], **k):
        assert_type(n_imgs, int)
        assert_type(img_idcs, list)
        assert_int_range(n_imgs, 1, 255)
        assert_int_range(len(img_idcs), 1, 255)
        assert_equal(len(img_idcs), n_imgs)

        command = b'\x06PLAY' + bytes([n_imgs]) + bytes(img_idcs)
        return self.driver.write(value=command, **k)

    @Service("0000fff0-0000-1000-8000-00805f9b34fb")
    @WriteCharacteristic("d44bc439-abfd-45a2-b575-925416129601")
    def subscribe_to_notifications(self, **k):
        status = self.driver.notify(**k)
        print(f"Subscribed: {status}")
        return status

    @Service("0000fff0-0000-1000-8000-00805f9b34fb")
    @WriteCharacteristic("d44bc439-abfd-45a2-b575-925416129601")
    def check(self, **k):
        self.driver.request(value=b'\x06CHEC', **k)

    @Service("0000fff0-0000-1000-8000-00805f9b34fb")
    @WriteCharacteristic("d44bc439-abfd-45a2-b575-925416129600")
    def set_light(self, brightness=0, **k):
        assert_type(brightness, int)
        assert_int_range(brightness, 0, 100)

        command = b'\x06LIGHT' + bytes([brightness])
        return self.driver.write(value=command, **k)

    @Service("0000fff0-0000-1000-8000-00805f9b34fb")
    @WriteCharacteristic("d44bc439-abfd-45a2-b575-925416129600")
    def set_image(self, img_idx=0, **k):
        assert_type(img_idx, int)
        assert_int_range(img_idx, 0, 21)
        command = b'\x06IMAG' + bytes([img_idx])
        return self.driver.write(value=command, **k)

    @Service("0000fff0-0000-1000-8000-00805f9b34fb")
    @WriteCharacteristic("d44bc439-abfd-45a2-b575-925416129600")
    def set_animation(self, img_idx=0, **k):
        assert_type(img_idx, int)
        assert_int_range(img_idx, 0, 21)
        command = b'\x06ANIM' + bytes([img_idx])
        return self.driver.write(value=command, **k)

    @Service("0000fff0-0000-1000-8000-00805f9b34fb")
    @WriteCharacteristic("d44bc439-abfd-45a2-b575-925416129600")
    def request_upload(self, image_length, **k):
        assert_type(image_length, int)
        assert_int_range(image_length, 0, 65535)
        command = b'\x06DATS' + image_length.to_bytes(2, 'big')
        return self.driver.write(value=command, **k)

    class GlassesMode(Enum):
        TEXT_STATIC = 1
        TEXT_BLINK = 2
        TEXT_SCROLL_RTL = 3
        TEXT_SCROLL_LTR = 4

    @Service("0000fff0-0000-1000-8000-00805f9b34fb")
    @WriteCharacteristic("d44bc439-abfd-45a2-b575-925416129600")
    def set_mode(self, mode, **k):
        if isinstance(mode, ShiningGlasses.GlassesMode):
            mode = mode.value
        assert_type(mode, int)
        assert_int_range(mode, 0, 5)
        command = b'\x06MODE' + bytes([mode])
        return self.driver.write(value=command, **k)

In [6]:
for service in driver.device.services():
    print(f"Service: {service.uuid()}")
    for characteristic in service.characteristics():
        print("~" * 10)
        print(f"Characteristic: {characteristic.uuid()}")
        print(f"\tRead: {characteristic.can_read()}. Write: {characteristic.can_write_command()}.")
        print(f"\tRequest: {characteristic.can_write_request()}. Notify: {characteristic.can_notify()}")
    print("–" * 20)

clear_output()

In [4]:
import time

glasses = ShiningGlasses(driver)
glasses.subscribe_to_notifications()
glasses.add_callback(lambda data: print(f"Received: {data}"))


# glasses.play(1, [0], show_command=True)
# glasses.set_light(90, show_command=True)
# glasses.set_image(0, show_command=True)
# glasses.request_upload(1000, show_command=True)
# glasses.set_image(5, show_command=True)
# glasses.set_mode(glasses.GlassesMode.TEXT_SCROLL_RTL, show_command=True)
# glasses.anim(21, show_command=True)
# glasses.chec(show_command=True)

NameError: name 'ShiningGlasses' is not defined