Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
grro committed May 22, 2024
1 parent 7a1168b commit ae13eac
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 118 deletions.
129 changes: 85 additions & 44 deletions pi_awning_webthing/awning.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,39 @@
import sys
import time
from datetime import datetime
from typing import List
from abc import ABC, abstractmethod
from threading import Thread, Lock




class Awning(ABC):

def __init__(self, name: str):
self.name = name
self.__listeners = set()

@abstractmethod
def is_target_reached(self) -> bool:
pass

@abstractmethod
def get_position(self) -> int:
pass

@abstractmethod
def set_position(self, new_position: int):
pass

def add_listener(self, listener):
self.__listeners.add(listener)

def _notify_listeners(self):
for listener in self.__listeners:
listener()


class Motor(ABC):

@abstractmethod
Expand All @@ -31,18 +60,6 @@ def sec_per_step(self) -> float:
pass


class AwningPropertyListener:

def on_current_pos_updated(self, current_position: int):
pass

def on_retracting_updated(self, retracting: bool):
pass

def on_extenting_updated(self, extenting: bool):
pass


class Movement:
SLOT_TOLERANCE = 7

Expand Down Expand Up @@ -84,7 +101,7 @@ def process(self):
if self.is_target_reached():
return Idling(self.motor, self.get_target_pos(), self.sec_per_slot, self.awning)
else:
self.awning.listener.on_current_pos_updated(self.get_current_pos())
self.awning.on_updated()
return self

def drive_to(self, new_position: int):
Expand All @@ -109,8 +126,7 @@ class Idling(Movement):
def __init__(self, motor: Motor, start_pos: int, sec_per_slot: float, awning):
Movement.__init__(self, motor, start_pos, 0, sec_per_slot, True, awning)
self.motor.stop()
self.awning.listener.on_extenting_updated(False)
self.awning.listener.on_retracting_updated(False)
self.awning.on_updated()

def get_pause_sec(self):
pause_sec = int(self.SLOT_TOLERANCE * self.sec_per_slot * 1.4)
Expand All @@ -127,79 +143,76 @@ class Forward(Movement):
def __init__(self, motor: Motor, start_pos: int, new_position: int, sec_per_slot: float, awning):
Movement.__init__(self, motor, start_pos, new_position - start_pos, sec_per_slot, True, awning)
self.motor.forward()
self.awning.listener.on_extenting_updated(True)
self.awning.listener.on_retracting_updated(False)
self.awning.on_updated()


class Backward(Movement):

def __init__(self, motor: Motor, start_pos: int, new_position: int, sec_per_slot: float, awning):
Movement.__init__(self, motor, start_pos, start_pos - new_position, sec_per_slot, False, awning)
self.motor.backward()
self.awning.listener.on_retracting_updated(True)
self.awning.listener.on_extenting_updated(False)
self.awning.on_updated()


class Awning:
class PiAwning(Awning):
PERIODIC_CALIBRATE_ON_HOUR = 3
PERIODIC_CALIBRATE_ON_MINUTE = 10

def __init__(self, motor: Motor):
self.sec_per_slot = motor.sec_per_step
self.listener = AwningPropertyListener()
self.motor = motor
self.__lock = Lock()
self.movement = Idling(self.motor, 0, self.sec_per_slot, self)
self.set_target_position(0)
self.set_position(0)
super().__init__(self.motor.name)
Thread(name=self.name + "_move", target=self.__process_move, daemon=False).start()
Thread(target=self.__periodic_calibrate, daemon=True).start()

@property
def name(self) -> str:
return self.motor.name
def on_updated(self):
self._notify_listeners()

def __periodic_calibrate(self):
time.sleep(60)
self.calibrate()
already_scheduled = False
while True:
now = datetime.now()
if self.PERIODIC_CALIBRATE_ON_HOUR <= now.hour < (self.PERIODIC_CALIBRATE_ON_HOUR + 1) and now.minute >= self.PERIODIC_CALIBRATE_ON_MINUTE:
if not already_scheduled:
self.calibrate()
already_scheduled = True
else:
already_scheduled = False
try:
now = datetime.now()
if self.PERIODIC_CALIBRATE_ON_HOUR <= now.hour < (self.PERIODIC_CALIBRATE_ON_HOUR + 1) and now.minute >= self.PERIODIC_CALIBRATE_ON_MINUTE:
if not already_scheduled:
self.calibrate()
already_scheduled = True
else:
already_scheduled = False
except Exception as e:
logging.warning("error occurred on calibrating " + str(e))
time.sleep(10 * 60)

def register_listener(self, listener: AwningPropertyListener):
self.listener = listener

def calibrate(self):
saved_target_pos = self.get_target_position()
saved_target_pos = self.get_position()
logging.info("calibrating")
self.movement = Idling(self.motor, 100, self.sec_per_slot, self) # set position to 100%
self.set_target_position(0) # and backward to position 0. This ensures that the awning is calibrated with position 0
self.set_position(0) # and backward to position 0. This ensures that the awning is calibrated with position 0
# wait until completed
for i in range (0, 60):
if self.is_target_reached():
break
else:
time.sleep(5)
if self.get_current_position() != saved_target_pos:
if self.__get_current_position() != saved_target_pos:
logging.info("move to previous target position " + str(saved_target_pos))
self.set_target_position(saved_target_pos)
self.set_position(saved_target_pos)

def __get_current_position(self) -> int:
return self.movement.get_current_pos()

def is_target_reached(self) -> bool:
return self.movement.is_target_reached()

def get_current_position(self) -> int:
return self.movement.get_current_pos()

def get_target_position(self) -> int:
def get_position(self) -> int:
return self.movement.get_target_pos()

def set_target_position(self, new_position: int):
def set_position(self, new_position: int):
with self.__lock:
self.movement = self.movement.drive_to(new_position)

Expand All @@ -214,3 +227,31 @@ def __process_move(self):
finally:
pause_sec = self.movement.get_pause_sec()
time.sleep(pause_sec)



class Awnings(Awning):

def __init__(self, name: str, awnings: List[Awning]):
self.__awnings = awnings
[awning.add_listener(self._notify_listeners) for awning in awnings]
super().__init__(name)

def is_target_reached(self) -> bool:
for awning in self.__awnings:
if not awning.is_target_reached():
return False
return True

def get_position(self) -> int:
positions = [awning.get_position() for awning in self.__awnings]
total = sum(positions)
if total == 0:
return 0
else:
return int(total/len(positions))

def set_position(self, target_position: int):
[awning.set_position(target_position) for awning in self.__awnings]


102 changes: 28 additions & 74 deletions pi_awning_webthing/awning_webthing.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,13 @@
from webthing import (MultipleThings, Property, Thing, Value, WebThingServer)
from pi_awning_webthing.awning import Awning, AwningPropertyListener
from pi_awning_webthing.awning import Awning, PiAwning, Awnings
from pi_awning_webthing.switch import Switch
from pi_awning_webthing.motor_tb6612Fng import load_tb6612fng
from time import sleep
import logging
import tornado.ioloop


class WebThingAwningPropertyListener(AwningPropertyListener):

def __init__(self, anwing_webthing):
self.anwing_webthing = anwing_webthing

def on_current_pos_updated(self, current_position: int):
self.anwing_webthing.ioloop.add_callback(self.anwing_webthing.set_current_position, current_position)

def on_retracting_updated(self, retracting: bool):
self.anwing_webthing.ioloop.add_callback(self.anwing_webthing.set_retracting, retracting)

def on_extenting_updated(self, extenting: bool):
self.anwing_webthing.ioloop.add_callback(self.anwing_webthing.set_extending, extenting)


class AnwingWebThing(Thing):
class AwningWebThing(Thing):

# regarding capabilities refer https://iot.mozilla.org/schemas
# there is also another schema registry http://iotschema.org/docs/full.html not used by webthing
Expand All @@ -31,95 +16,64 @@ def __init__(self, description: str, awning: Awning):
Thing.__init__(
self,
'urn:dev:ops:anwing-TB6612FNG',
'Awning ' + awning.name + " Controller",
'Awning_' + awning.name,
['MultiLevelSensor'],
description
)
self.awning = awning
self.awning.register_listener(WebThingAwningPropertyListener(self))
self.awning.add_listener(self.on_value_changed)

self.target_position = Value(0, self.__target_position)
self.position = Value(self.awning.get_position(), self.awning.set_position)
self.add_property(
Property(self,
'target_position',
self.target_position,
'position',
self.position,
metadata={
'@type': 'LevelProperty',
'title': 'Awning target position',
'title': 'Awning position',
"type": "number",
"minimum": 0,
"maximum": 100,
"unit": "percent",
'description': 'awning target position'
'description': 'awning position',
'readOnly': False
}))

self.current_position = Value(0)
self.is_target_reached = Value(self.awning.is_target_reached())
self.add_property(
Property(self,
'current_position',
self.current_position,
'is_target_reached',
self.is_target_reached,
metadata={
'@type': 'LevelProperty',
'title': 'Awning current position',
"type": "number",
'minimum': 0,
'maximum': 100,
"unit": "percent",
'readOnly': True,
'description': 'awning current position'
'title': 'is_target_reached',
"type": "boolean",
'description': 'true, if target position is reached',
'readOnly': True
}))

self.retracting = Value(0)
self.add_property(
Property(self,
'retracting',
self.retracting,
metadata={
'@type': 'BooleanProperty',
'title': 'Awning is retracting',
"type": "boolean",
'readOnly': True,
'description': 'Awning is retracting'
}))

self.extending = Value(0)
self.add_property(
Property(self,
'extending',
self.extending,
metadata={
'@type': 'BooleanProperty',
'title': 'Awning is extending',
"type": "boolean",
'readOnly': True,
'description': 'Awning is extending'
}))

self.ioloop = tornado.ioloop.IOLoop.current()

def __target_position(self, new_postion):
self.awning.set_target_position(new_postion)
def on_value_changed(self):
self.ioloop.add_callback(self._on_value_changed)

def set_current_position(self, value):
self.current_position.notify_of_external_update(value)
logging.debug(self.awning.name + " position " + str(value) + " reached (target=" + str(self.target_position.get()) + ")")

def set_retracting(self, value):
self.retracting.notify_of_external_update(value)

def set_extending(self, value):
self.extending.notify_of_external_update(value)
def _on_value_changed(self):
self.position.notify_of_external_update(self.awning.get_position())
self.is_target_reached.notify_of_external_update(self.awning.is_target_reached())



def run_server(port: int, filename: str, switch_pin_forward: int, switch_pin_backward: int, description: str):

while True:
awnings = [Awning(motor) for motor in load_tb6612fng(filename)]
awning_webthings = [AnwingWebThing(description, anwing) for anwing in awnings]
awnings = [PiAwning(motor) for motor in load_tb6612fng(filename)]
if len(awnings) > 2:
awnings = [Awnings("all", awnings)] + awnings
awning_webthings = [AwningWebThing(description, anwing) for anwing in awnings]
server = WebThingServer(MultipleThings(awning_webthings, 'Awnings'), port=port, disable_host_validation=True)

if switch_pin_forward > 0 and switch_pin_backward > 0:
Switch(switch_pin_forward, switch_pin_backward, awnings= awnings)

try:
logging.info('starting the server')
server.start()
Expand Down

0 comments on commit ae13eac

Please sign in to comment.