Skip to content

Commit

Permalink
Added ability to obtaining properties
Browse files Browse the repository at this point in the history
  • Loading branch information
breitburg committed Aug 10, 2020
1 parent 586b3b1 commit bcd5da8
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 38 deletions.
28 changes: 0 additions & 28 deletions .github/workflows/publish.yml

This file was deleted.

26 changes: 26 additions & 0 deletions .github/workflows/pythonpublish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: Upload Python Package

on:
release:
types: [created]

jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up Python
uses: actions/setup-python@v1
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools wheel twine
- name: Build and publish
env:
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
run: |
python setup.py sdist
twine upload dist/*
50 changes: 42 additions & 8 deletions am43/blind.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from bluepy import btle
from munch import munchify
from munch import Munch


class Blind:
def __init__(self, address: bytes, auto_connect: bool = True, reconnect: bool = True):
self.address = address
self.device = [device for device in btle.Scanner().scan() if device.addr == address][0]
self.identifiers = munchify(dict(move=0x0d, stop=0x0a, battery=0xa2, light=0xaa, position=0xa7))
self.reconnect = reconnect
self.device = [device for device in btle.Scanner().scan() if device.addr == address][0]
self.identifiers = Munch(move=0x0d, stop=0x0a, battery=0xa2, light=0xaa, position=0xa7)

self._battery = self._position = self._light = None

if auto_connect:
self.connect(retry=True)
Expand All @@ -16,27 +18,59 @@ def connect(self, retry: bool = False, service: str = 'fe50', characteristic: st
while True:
try:
self.blind = btle.Peripheral(self.device)

delegate = btle.DefaultDelegate()
delegate.handleNotification = lambda handle, data: self._update_data(data=data)
self.blind.setDelegate(delegate)

self.characteristic = self.blind.getServiceByUUID(service).getCharacteristics(characteristic)[0]
except btle.BTLEDisconnectError:
if retry:
continue
break

def send(self, data: any, identifier: bytearray) -> dict:
def send(self, data: any, identifier: bytearray, wait_notification: bool = False) -> dict:
while True:
try:
message = bytearray({0x9a}) + bytearray({identifier}) + bytearray({len([data])}) + bytearray([data])
message += self._calculate_checksum(data=message)

if wait_notification and self.blind.waitForNotifications(10):
pass

return self.characteristic.write(message)
except btle.BTLEDisconnectError:
self.connect(retry=True)

def _update_data(self, data: bytearray) -> None:
identifier = data[1]

if identifier == self.identifiers.battery:
self._battery = data[7]
elif identifier == self.identifiers.position:
self._position = data[5]
elif identifier == self.identifiers.light:
self._light = data[4] * 12.5

def get_properties(self) -> Munch:
if self.reconnect: self.connect()

message = bytearray({0x9a}) + bytearray({identifier}) + bytearray({len([data])}) + bytearray([data])
message += self.calculate_checksum(data=message)
return self.characteristic.write(message)
self.send(data=0x01, identifier=self.identifiers.battery, wait_notification=True)
self.send(data=0x01, identifier=self.identifiers.position, wait_notification=True)
self.send(data=0x01, identifier=self.identifiers.light, wait_notification=True)

return Munch(battery=self._battery, position=self._position, light=self._light)

def set_position(self, percentage: int) -> None:
if self.reconnect: self.connect()
self.send(data=percentage, identifier=self.identifiers.move)

def stop(self) -> None:
if self.reconnect: self.connect()
self.send(data=0xcc, identifier=self.identifiers.stop)

@staticmethod
def calculate_checksum(data: bytearray) -> bytearray:
def _calculate_checksum(data: bytearray) -> bytearray:
checksum = 0
for byte in data:
checksum = checksum ^ byte
Expand Down
16 changes: 15 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ You can stop the blinds using `Blind.stop` method:
blind.stop() # Stops blinds
```

Also, you can access the motor properties:

```python
properties = blind.get_properties()

properties.battery # 95 <int>
properties.position # 100 <int>
properties.light # 23 <int>
```

Light property will be always return zero if the sensor don't plugged into the blind motor.

## Installation

To install the latest version you can use `pip` by executing that command:
Expand All @@ -30,6 +42,7 @@ To install the latest version you can use `pip` by executing that command:
$ pip install am43
```

All requirements will be installed automatically.

## Requirements

Expand All @@ -38,4 +51,5 @@ Full list of all dependencies you can find in `setup.py` file:
- `bluepy` >= 1.3.0
- `munch` >= 2.5.0

And you need to have bluetooth module on your machine. On Raspberry Pi (>=3) one is already on the board.
And you need to have bluetooth module on your machine. On Raspberry Pi (>=3) one is already on the board.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='am43',
version='0.1.1',
version='0.1.2',
author='Ilya Breitburg',
author_email='me@breitburg.com',
description='AM43 blinds motor API implementation written on Python',
Expand Down

0 comments on commit bcd5da8

Please sign in to comment.