## DMX Receiver

In [1]:
# devices = %mpy --list
# ports = devices.fields(0)

# for port in ports:
#     s = %mpy --info --select {port}
#     print(s)

PICO = 'COM15'
PICO_W = 'COM4'

SENDER = PICO
RECEIVER = PICO_W


%mpy --select {RECEIVER}

In [2]:
from typing_extensions import TYPE_CHECKING  # type: ignore

if TYPE_CHECKING:
    from rp2.asm_pio import *

In [3]:
# %%micropython --select {RECEIVER} --reset
from rp2 import PIO
from machine import Pin

# remove all programs from both PIOs
for n in (0, 1):
    pio = PIO(n)
    pio.remove_program()

led = Pin("LED", Pin.OUT)
led.on()

True


In [4]:
# %%micropython
from machine import Pin, Signal

sig_max485_send = Signal(Pin(13, Pin.OUT, Pin.PULL_DOWN))

sig_max485_send.off()

In [5]:
# %%micropython

from machine import Pin, Signal
from rp2 import PIO, StateMachine, asm_pio

# -----------------------------------------------
# Wiring schema for the DMX TX

pin_dmx_tx = Pin(15, Pin.OUT)  # send data to the DMX bus
pin_dmx_rx = Pin(14, Pin.IN, pull=Pin.PULL_DOWN)  # receive data from the DMX bus
sig_max485_send = Pin(12, Pin.OUT, Pin.PULL_DOWN)  # enable/disable the MAX485 chip

# fmt: off
@asm_pio()
def dmx_receive():
    """PIO program to receive a DMX Universe frame of 512 channels."""
    # Constants
    dmx_bit = 4  # As DMX has a baudrate of 250.000kBaud, a single bit is 4us

    # Break loop
    # Receiver DMX break signal is 88us, so we need to loop 22 times to get 88us
    label("break_reset")
    set(x, 29)                              # 0

    label("break_loop")                     # BREAK = low for 88us
    jmp(pin, "break_reset")                 # 1 | Go back to start if pin goes high during BREAK
    jmp(x_dec, "break_loop")        [1]     # 2 | wait until BREAK time over (22 loops * 4us = 88us)
    
    wait(1, pin, 0)                         # 3 | wait for the Mark-After-Break (MAB)
    # TODO check if MAB is at least 8us long

    # Data loop
    label("wrap_target")                    # Start of a byte
    wait(1, pin, 0)                     [1] # 4 | Wait for START bit (low) + 1+1us - measure halfway through the bit
    set(x, 7)                   [dmx_bit-1] # 5 | 7 more bits 

    label("bitloop")
    in_(pins, 1)                            # 6 Shift data bit into ISR
    jmp(x_dec, "bitloop")    [dmx_bit - 2]  # 7 Loop 8 times, each loop iteration is 4us

    # Stop bits
    wait(1, pin, 0)                         # 8 Wait for pin to go high for stop bits
    # TODO check if STOP bits are at 8us long
    # if longer than 8 us then we are at the end of the frame  MARK Time after Slot 512
    # this can be up to  1 second - which may be too long for a PIO program to count
    
    push()                                  # 9

# fmt: on



In [8]:
# This is a Q&D way to get the PIO opcodes from the PIO assembler using an attached rp2 with micropython
from array import array
x = %mpy print(dmx_receive)
asm_pio = eval(str(x[0])) # first line of the output is the PIO program, with parameters
assembled = asm_pio[0] # just the opcodes 
print(f"{len(assembled)} opcodes assembled")

", ".join([hex(opcode) for opcode in assembled])

10 opcodes assembled


'0xe03d, 0xc0, 0x141, 0x20a0, 0x21a0, 0xe327, 0x4001, 0x246, 0x20a0, 0x8020'

In [None]:

machine_nr = 1
sm_dmx_rx = StateMachine(
    machine_nr, dmx_receive, freq=1_000_000, in_base=pin_dmx_rx, jmp_pin=pin_dmx_rx
)
sm_dmx_rx.active(1)

In [None]:
# %%micropython
# create buffer to receive the DMX data into
size = 512
from array import array

universe = array("B", [0] + [0] * (size))  # 1 start code + 512 channels

In [None]:
# %%micropython

sm_dmx_rx.restart()

for _ in range(512):
    r = sm_dmx_rx.get(None, 24)  # get the first byte, which is the start code
    print(f"{r=:X}")

In [None]:
CAPTURE = False

if CAPTURE:
    # start capture

    # setup logic capture
    from saleae import automation


    manager = automation.Manager.connect()

    # setup logic capture on all channels

    device_configuration = automation.LogicDeviceConfiguration(

        enabled_digital_channels=[0, 1, 2, 3, 4, 5, 6, 7],

        digital_sample_rate=12_000_000,

    )

    capture = manager.start_capture(device_configuration=device_configuration)

In [None]:
# %%micropython
p1.on()
import time

size = 512
from array import array

universe = array("B", [0] + [255] * (size))  # 1 start code + 512 channels

# print(f"{len(universe)=} {universe=}")

for i in range(len(universe)):
    universe[i] = i % 256

universe[0] = 123  # test Start Code

for n in range(2):
    sig_max485_send.on()
    sm_dmx_tx.restart()
    sm_dmx_tx.active(1)

    sm_dmx_tx.put(universe)

    time.sleep_us(4 * 50)  # wait for the last 4 frames to be sent before switching  the 485 driver
    sig_max485_send.off()

p1.off()

In [None]:
CAPTURE = False

if CAPTURE:
    # End capture

    capture.stop()

    # "Serial", "Accept DMX-1986 4us MAB"

    # Serial = input channel

    capture.add_analyzer("DMX-512", settings={"Serial": 5, "Accept DMX-1986 4us MAB": True})

    capture.add_analyzer(

        "DMX-512", settings={"Serial": 3, "Accept DMX-1986 4us MAB": True}, label="Tx-Shifter"

    )

    capture.add_analyzer(

        "DMX-512", settings={"Serial": 6, "Accept DMX-1986 4us MAB": True}, label="Tx-Shifter-2"

    )

    # capture.add_analyzer("DMX-512-RX", settings={"Serial": 4, "Accept DMX-1986 4us MAB": True})

    # "Input Channel", "Bit Rate (Bits/s)", "Bits per Frame", "Stop Bits", "Parity Bit", "Significant Bit", "Signal inversion", "Mode"

    # capture.add_analyzer("Async Serial", settings={"Input Channel": 6, "Bit Rate (Bits/s)": 115200})