-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
399 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,3 +102,6 @@ venv.bak/ | |
|
||
# mypy | ||
.mypy_cache/ | ||
|
||
# VS Code | ||
.vscode |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
class Colour: | ||
|
||
def __init__(self, red, green, blue): | ||
self._red = red | ||
self._green = green | ||
self._blue = blue | ||
|
||
def serialise(self): | ||
return [self._red, self._green, self._blue] | ||
|
||
@property | ||
def red(self): | ||
return self._red | ||
|
||
@red.setter | ||
def red(self, value): | ||
self._red = int(max(0, min(value, 255))) | ||
|
||
@property | ||
def green(self): | ||
return self._green | ||
|
||
@green.setter | ||
def green(self, value): | ||
self._green = int(max(0, min(value, 255))) | ||
|
||
@property | ||
def blue(self): | ||
return self._red | ||
|
||
@blue.setter | ||
def blue(self, value): | ||
self._blue = int(max(0, min(value, 255))) | ||
|
||
def __add__(self, other): | ||
if isinstance(other, Colour): | ||
self.red += other.red | ||
self.green += other.green | ||
self.blue += other.blue | ||
elif isinstance(other, (int, float)): | ||
self.red += other | ||
self.green += other | ||
self.blue += other | ||
|
||
def __sub__(self, other): | ||
if isinstance(other, Colour): | ||
self.red -= other.red | ||
self.green -= other.green | ||
self.blue -= other.blue | ||
elif isinstance(other, (int, float)): | ||
self.red -= other | ||
self.green -= other | ||
self.blue -= other | ||
|
||
def __mul__(self, other): | ||
if isinstance(other, Colour): | ||
self.red *= other.red | ||
self.green *= other.green | ||
self.blue *= other.blue | ||
elif isinstance(other, (int, float)): | ||
self.red *= other | ||
self.green *= other | ||
self.blue *= other | ||
|
||
def __truediv__(self, other): | ||
if isinstance(other, Colour): | ||
self.red /= other.red | ||
self.green /= other.green | ||
self.blue /= other.blue | ||
elif isinstance(other, (int, float)): | ||
self.red /= other | ||
self.green /= other | ||
self.blue /= other | ||
|
||
def __floordiv__(self, other): | ||
if isinstance(other, Colour): | ||
self.red //= other.red | ||
self.green //= other.green | ||
self.blue //= other.blue | ||
elif isinstance(other, (int, float)): | ||
self.red //= other | ||
self.green //= other | ||
self.blue //= other | ||
|
||
|
||
RED = Colour(255, 0, 0) | ||
GREEN = Colour(0, 255, 0) | ||
BLUE = Colour(0, 0, 255) | ||
WHITE = Colour(255, 255, 255) | ||
BLACK = Colour(0, 0, 0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from abc import ABC, abstractmethod | ||
from functools import partial | ||
from importlib import import_module | ||
from os import listdir, path | ||
from typing import Dict | ||
|
||
|
||
__ALL__ = ["DMXDriver", "get_drivers"] | ||
|
||
DRIVER_PATH = path.abspath(path.dirname(__file__)) | ||
|
||
add_driver_path = partial(path.join, DRIVER_PATH) | ||
|
||
|
||
class DMXDriver(ABC): | ||
|
||
@abstractmethod | ||
def open(self): | ||
"""Open the driver.""" | ||
|
||
@abstractmethod | ||
def close(self): | ||
"""Close the driver.""" | ||
|
||
@abstractmethod | ||
def write(self): | ||
"""Write 512 bytes or less of DMX data.""" | ||
|
||
@property | ||
@abstractmethod | ||
def closed(self): | ||
"""Is the driver closed.""" | ||
|
||
@staticmethod | ||
def get_driver_name(self): | ||
"""Get driver name.""" | ||
return "ABD" | ||
|
||
|
||
def get_drivers() -> Dict[str, DMXDriver]: | ||
driver_files = map(add_driver_path, listdir(DRIVER_PATH)) | ||
driver_files = filter(path.isfile, driver_files) | ||
driver_files = filter(lambda x: x.endswith(".py"), driver_files) | ||
driver_files = filter(lambda x: not path.basename(x).startswith("__"), driver_files) | ||
driver_names = map(path.basename, driver_files) | ||
driver_names = map(lambda x: path.splitext(x)[0], driver_names) | ||
drivers = {} | ||
for driver_name in driver_names: | ||
driver_module = import_module("." + driver_name, "dmx.drivers") | ||
if hasattr(driver_module, "DRIVER_CLASS"): | ||
driver_class = getattr(driver_module, "DRIVER_CLASS") | ||
drivers[driver_class.get_driver_name()] = driver_class | ||
return drivers |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
#!/usr/bin/env python3 | ||
from ctypes import * | ||
|
||
from pylibftdi import Device, Driver | ||
|
||
from dmx.drivers import DMXDriver | ||
|
||
Driver._lib_search[ | ||
"libftdi"] = tuple(["./libftdi.so", "./libftdi.so.1", "./libftdi1.so"] + | ||
list(Driver._lib_search["libftdi"])) | ||
|
||
|
||
_LIBC = cdll.LoadLibrary("libc.so.6") | ||
|
||
class timespec(Structure): | ||
"""A timespec.""" | ||
_fields_ = [("tv_sec", c_long), ("tv_nsec", c_long)] | ||
|
||
|
||
def wait_ms(milliseconds): | ||
"""Wait for a specified number of milliseconds.""" | ||
dummy = timespec() | ||
sleeper = timespec() | ||
sleeper.tv_sec = int(milliseconds / 1000) | ||
sleeper.tv_nsec = (milliseconds % 1000) * 1000000 | ||
_LIBC.nanosleep(byref(sleeper), byref(dummy)) | ||
|
||
|
||
def wait_us(nanoseconds): | ||
"""Wait for a specified number of nanoseconds.""" | ||
dummy = timespec() | ||
sleeper = timespec() | ||
sleeper.tv_sec = int(nanoseconds / 1000000) | ||
sleeper.tv_nsec = (nanoseconds % 1000) * 1000 | ||
_LIBC.nanosleep(byref(sleeper), byref(dummy)) | ||
|
||
|
||
class FT232R(Device, DMXDriver): | ||
"""A DMX driver design for the University of York Serial-to-DMX usb adapter based on the FT232R.""" | ||
|
||
_BITS_8 = 8 | ||
_STOP_BITS_2 = 2 | ||
_PARITY_NONE = 0 | ||
_BREAK_OFF = 0 | ||
_BREAK_ON = 1 | ||
|
||
def __init__(self, *o, **k): | ||
Device.__init__(self, *o, mode="b", **k) | ||
self.baudrate = 250000 | ||
self.ftdi_fn.ftdi_set_line_property( | ||
FT232R._BITS_8, FT232R._STOP_BITS_2, FT232R._PARITY_NONE) | ||
|
||
def write(self, data): | ||
try: | ||
byte_data = bytes(data) | ||
except TypeError: | ||
byte_data = self.encoder.encode(data) | ||
# Break | ||
self._set_break_on() | ||
wait_ms(10) | ||
# Mark after break | ||
self._set_break_off() | ||
wait_us(8) | ||
# Frame body | ||
Device.write(self, b"\x00" + byte_data) | ||
# Idle | ||
wait_ms(15) | ||
|
||
def _set_break_on(self): | ||
self.ftdi_fn.ftdi_set_line_property2( | ||
FT232R._BITS_8, FT232R._STOP_BITS_2, FT232R._PARITY_NONE, | ||
FT232R._BREAK_ON) | ||
|
||
def _set_break_off(self): | ||
self.ftdi_fn.ftdi_set_line_property2( | ||
FT232R._BITS_8, FT232R._STOP_BITS_2, FT232R._PARITY_NONE, | ||
FT232R._BREAK_OFF) | ||
|
||
@staticmethod | ||
def get_driver_name(): | ||
return "FT232R" | ||
|
||
|
||
DRIVER_CLASS = FT232R | ||
|
||
__ALL__ = ["DRIVER_CLASS"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from dmx.drivers import get_drivers, DMXDriver | ||
from typing import Dict | ||
|
||
DMX_MAX_ADDRESS = 512 | ||
|
||
class DMXInterface: | ||
"""Represents the interface between the DMX device and a frame generation source.""" | ||
|
||
def __init__(self, driver_name, *args, **kwards): | ||
self._device = None | ||
self._frame_state = [] | ||
self.clear_state() | ||
self._set_device_driver(driver_name, *args, **kwards) | ||
|
||
def _set_device_driver(self, driver_name: str, *args, **kwards): | ||
drivers = get_drivers() # type: Dict[str, DMXDriver] | ||
if driver_name in drivers: | ||
self._device = drivers[driver_name](*args, **kwards) | ||
else: | ||
raise Exception("Unknown driver") | ||
|
||
def open(self): | ||
self._device.open() | ||
|
||
def send_update(self): | ||
if not self._device.closed: | ||
self._device.write(self._frame_state) | ||
|
||
def set_frame(self, frame): | ||
if not self._device.closed: | ||
self._frame_state = frame[:DMX_MAX_ADDRESS] + ([0] * (DMX_MAX_ADDRESS - len(frame))) | ||
|
||
def clear_state(self): | ||
self._frame_state = [0] * DMX_MAX_ADDRESS | ||
|
||
def close(self): | ||
if not self._device.closed: | ||
self._device.close() | ||
|
||
def __del__(self): | ||
if hasattr(self._device, "closed") and not self._device.closed: | ||
self._device.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
from dmx.colour import BLACK | ||
|
||
DMX_MAX_ADDRESS = 512 | ||
DMX_MIN_ADDRESS = 1 | ||
|
||
|
||
class DMXLight: | ||
"""Represents a DMX light.""" | ||
|
||
def __init__(self, address=1): | ||
self._address = int(max(0, min(address, DMX_MAX_ADDRESS))) | ||
|
||
def serialise(self): | ||
return [] | ||
|
||
@property | ||
def start_address(self): | ||
return self._address | ||
|
||
@property | ||
def end_address(self): | ||
end_address = self._address + self.slot_count - 1 | ||
if end_address > DMX_MAX_ADDRESS or end_address < DMX_MIN_ADDRESS: | ||
return (end_address % DMX_MAX_ADDRESS) + DMX_MIN_ADDRESS | ||
return end_address | ||
|
||
@property | ||
def slot_count(): | ||
return 0 | ||
|
||
|
||
class DMXLight3Slot(DMXLight): | ||
"""Represents a DMX light with RGB.""" | ||
|
||
def __init__(self, address=1): | ||
super().__init__(address=address) | ||
self._colour = BLACK | ||
|
||
@property | ||
def slot_count(self): | ||
return 3 | ||
|
||
def set_colour(self, colour): | ||
self._colour | ||
|
||
def serialise(self): | ||
return self._colour.serialise() | ||
|
||
|
||
class DMXLight7Slot(DMXLight3Slot): | ||
"""Represents an DMX light with RGB, rotation, and opacity.""" | ||
|
||
def __init__(self, address: int=1): | ||
super().__init__(address=address) | ||
self._opacity = 255 | ||
self._coords = (0, 0, 0) | ||
|
||
def set_rotation(self, pitch: int , roll: int, yaw: int): | ||
pitch = int(max(0, min(pitch, 255))) | ||
roll = int(max(0, min(roll, 255))) | ||
yaw = int(max(0, min(yaw, 255))) | ||
self._coords = (pitch, roll, yaw) | ||
|
||
def set_opacity(self, value): | ||
self._opacity = int(max(0, min(value, 255))) | ||
|
||
@property | ||
def slot_count(self): | ||
return 7 | ||
|
||
def serialise(self): | ||
return super().serialise() + list(self._coords) + [self._opacity] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from typing import Set | ||
|
||
from dmx.light import DMXLight | ||
|
||
DMX_MAX_ADDRESS = 512 | ||
|
||
class DMXUniverse: | ||
"""Represents a DMX universe.""" | ||
|
||
def __init__(self, universe_id=1): | ||
self._lights = set() # type: Set[DMXLight] | ||
self._id = 1 | ||
|
||
def add_light(self, light: DMXLight): | ||
self._lights.add(light) | ||
|
||
def remove_light(self, light: DMXLight): | ||
self._lights.remove(light) | ||
|
||
def has_light(self, light: DMXLight): | ||
return light in self._lights | ||
|
||
def serialise(self): | ||
frame = [0] * DMX_MAX_ADDRESS | ||
for light in self._lights: | ||
serialised_light = light.serialise() | ||
for address in range(light.start_address, light.end_address + 1): | ||
frame[address - 1] |= serialised_light[address - light.start_address] | ||
return frame |
Oops, something went wrong.