Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artnet driver #638

Merged
merged 3 commits into from
Mar 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions bibliopixel/animation/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@
from . matrix import BaseMatrixAnim
from .. util import colors

BASE_COLORS = [colors.Red, colors.Green, colors.Green,
colors.Blue, colors.Blue, colors.Blue]
CYCLE_COLORS = [colors.Red, colors.Green, colors.Blue, colors.White]


class StripChannelTest(BaseStripAnim):

def __init__(self, layout):
super().__init__(layout)
self.internal_delay = 0.500
self.colors = [colors.Red, colors.Green, colors.Blue, colors.White]
self.colors = CYCLE_COLORS

def step(self, amt=1):

self.layout.set(0, colors.Red)
self.layout.set(1, colors.Green)
self.layout.set(2, colors.Green)
self.layout.set(3, colors.Blue)
self.layout.set(4, colors.Blue)
self.layout.set(5, colors.Blue)
for i, c in enumerate(BASE_COLORS):
self.layout.set(i, c)

color = self.cur_step % 4
self.layout.fill(self.colors[color], 7, 9)
Expand Down
Empty file.
32 changes: 32 additions & 0 deletions bibliopixel/drivers/artnet/artnet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import copy, ctypes, enum
from .. server_driver import ServerDriver
from ... util import log, server_cache, udp
from . import dmx_message

UDP_PORT = 0x1936 # 6454


class ArtNet(ServerDriver):
SERVER_CLASS = udp.Sender

def __init__(self, *args, ip_address, **kwds):
"""
:param dict channel_map: maps DMX channels to positions in
the color_list
"""
self.msg = dmx_message.dmx_message()

address = ip_address, UDP_PORT
super().__init__(*args, address=address, **kwds)

def _make_buffer(self):
return self.msg.data

def _send_packet(self):
# Regrettably, we need a copy here, because we can't be sure that the
# next _render won't come before this message has been sent.
# This is avoidable but not easily...
self.server.send(copy.copy(self.msg))

def _on_positions(self):
pass
65 changes: 65 additions & 0 deletions bibliopixel/drivers/artnet/dmx_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import ctypes

DMX_LENGTH = 512
ARTNET_DMX = 0x5000
ARTNET_VERSION = 14
NAME = 'Art-Net\x00'
MAX_NET = 0xFF
MAX_SUBNET = 0xF
MAX_UNIVERSE = 0xF


def dmx_message(data=None, length=None, net=0, subnet=0, universe=0,
sequence=1):
if length is None:
if data is None:
length = DMX_LENGTH
else:
length = len(data)

assert length % 2 == 0, 'artnet only takes messages of even length'
assert 0 <= length <= DMX_LENGTH
assert 0 <= sequence <= DMX_LENGTH
assert 0 <= net <= MAX_NET
assert 0 <= subnet <= MAX_SUBNET
assert 0 <= universe <= MAX_UNIVERSE

Char, Int8, Int16 = ctypes.c_char, ctypes.c_ubyte, ctypes.c_ushort

class ArtnetDMXMessage(ctypes.Structure):
# http://artisticlicence.com/WebSiteMaster/User%20Guides/art-net.pdf p47
_fields_ = [
('id', Char * 8),
('opCode', Int16),
('protVerHi', Int8),
('protVerLo', Int8),
('sequence', Int8),
('physical', Int8),
('subUni', Int8),
('net', Int8),
('lengthHi', Int8),
('length', Int8),
('data', Int8 * length), # At position 18
]

# http://artisticlicence.com/WebSiteMaster/User%20Guides/art-net.pdf p5
subUni = (subnet << 4) + universe
hi, lo = divmod(length, 256)

msg = ArtnetDMXMessage(
id=NAME.encode(),
opCode=ARTNET_DMX,
protVerLo=ARTNET_VERSION,
sequence=sequence,
net=net,
subUni=subUni,
lengthHi=hi,
length=lo)

if data is not None:
msg.data[:] = data

return msg


Message = type(dmx_message())
7 changes: 5 additions & 2 deletions bibliopixel/drivers/driver_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, num=0, width=0, height=0, c_order="RGB",
self.width = width
self.height = height
self.maker = maker
self._buf = maker.bytes(self.bufByteCount())
self._buf = self._make_buffer()

self.lastUpdate = 0
self.brightness_lock = threading.Lock()
Expand Down Expand Up @@ -175,6 +175,9 @@ def _render(self):
else:
level = self._brightness / 255.0
gam, (r, g, b) = self.gamma.get, self.c_order
for i in range(self.numLEDs):
for i in range(min(self.numLEDs, len(self._buf) / 3)):
c = [int(level * x) for x in self._colors[i + self._pos]]
self._buf[i * 3:(i + 1) * 3] = gam(c[r]), gam(c[g]), gam(c[b])

def _make_buffer(self):
return self.maker.bytes(self.bufByteCount())
Empty file.
57 changes: 57 additions & 0 deletions test/bibliopixel/drivers/artnet/artnet_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import struct, unittest
from bibliopixel.util import udp
from bibliopixel.util.colors import printer
from bibliopixel.animation import tests as animation_tests
from test.bibliopixel import mark_tests
from test.bibliopixel.project import make
from test.bibliopixel.util import udp_test
from bibliopixel.drivers.artnet import artnet, dmx_message

LOCALHOST = '127.0.0.1'
ADDRESS = LOCALHOST, artnet.UDP_PORT
MAX_STEPS = 3

PROJECT = {
"driver": {
"typename": ".artnet.artnet",
"ip_address": LOCALHOST
},
"dimensions": 12,
"animation": ".tests.StripChannelTest",
"run": {
"max_steps": MAX_STEPS
}
}


class DMXMessageTest(unittest.TestCase):
@mark_tests.long_test
def test_blackout(self):
results = []
with udp_test.receive_udp(ADDRESS, results):
project = make.make_project(PROJECT)
project.start()

self.assertEquals(len(results), MAX_STEPS + 1)

make_msg = dmx_message.Message.from_buffer_copy

failures = []
blackout = make_msg(results.pop())
for i, result in enumerate(results):
actual = make_msg(result)
expected = dmx_message.dmx_message()

for j, color in enumerate(animation_tests.BASE_COLORS):
expected.data[3 * j:3 * (j + 1)] = color

for j in range(7, 10):
color = animation_tests.CYCLE_COLORS[i]
expected.data[3 * j:3 * (j + 1)] = color

for j, (e, a) in enumerate(zip(expected.data, actual.data)):
if e != a:
failures.append((i, j, e, a))

self.assertEquals(failures, [])
self.assertEquals(bytes(blackout), bytes(dmx_message.dmx_message()))
69 changes: 69 additions & 0 deletions test/bibliopixel/drivers/artnet/dmx_message_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import struct, unittest

from bibliopixel.drivers.artnet import dmx_message


class DMXMessageTest(unittest.TestCase):
def do_test(self, data, sequence=1, **kwds):
msg = dmx_message.dmx_message(data=data, sequence=sequence, **kwds)
original = Original(data, sequence, **kwds).broadcast()
self.assertEqual(len(bytes(msg)), len(original))

differences = []
for i, (a, b) in enumerate(zip(bytes(msg), original)):
if a != b:
differences.append((i, a, b))

self.assertEqual(bytes(msg), original)

def test_blackout(self):
self.do_test(bytes(512 * [0]))

def test_trivial(self):
self.do_test(bytes())

def test_ramp_and_a_half(self):
self.do_test(bytes(i % 256 for i in range(384)))


class Original:
def __init__(self, dmxdata, packet_counter=1, net=0, subnet=0, universe=0):
self.dmxdata = dmxdata
self.packet_counter = packet_counter
self.net = net
self.subnet = subnet
self.universe = universe

def broadcast(self):
# New Array
data = []
# Fix ID 7byte + 0x00
data.append("Art-Net\x00")
# OpCode = OpOutput / OpDmx -> 0x5000, Low Byte first
data.append(struct.pack('<H', 0x5000))
# ProtVerHi and ProtVerLo -> Protocol Version 14, High Byte first
data.append(struct.pack('>H', 14))
# Order 1 to 255
data.append(struct.pack('B', self.packet_counter))
self.packet_counter += 1
if self.packet_counter > 255:
self.packet_counter = 1
# Physical Input Port
data.append(struct.pack('B', 0))
# Artnet source address
data.append(
struct.pack('<H', self.net << 8 | self.subnet << 4 | self.universe))
# Length of DMX Data, High Byte First
data.append(struct.pack('>H', len(self.dmxdata)))
# DMX Data
for d in self.dmxdata:
data.append(struct.pack('B', d))
# convert from list to string
result = bytes()
for token in data:
try: # Handels all strings
result = result + token.encode('utf-8', 'ignore')
except: # Handels all bytes
result = result + token

return result
34 changes: 23 additions & 11 deletions test/bibliopixel/project/make.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,29 @@
from .. mark_tests import SKIP_LONG_TESTS


def make_project(data):
if isinstance(data, dict):
desc = data

elif not isinstance(data, str):
raise ValueError('Cannot understand data %s' % data)

else:
if '{' in data:
fp = tempfile.NamedTemporaryFile(mode='w')
fp.write(data)
fp.seek(0)
data = fp.name

desc = json.load(data)

return project.project(desc)


def make(data, run_start=not SKIP_LONG_TESTS):
if '{' in data:
fp = tempfile.NamedTemporaryFile(mode='w')
fp.write(data)
fp.seek(0)
data = fp.name

desc = json.load(data)
pr = project.project(desc)
animation = pr.animation
project = make_project(data)

if run_start:
animation.start()
project.animation.start()

return animation
return project.animation