# Artnet Tests

### Packages

In [1]:
import asyncio
from multiprocess import Process
#import argparse
#from PIL import ImageColor
from pyartnet import ArtNetNode
from pyartnet.base import universe as Universe
from pyartnet.base import channel as Channel

In [2]:
import pygame
import sys
import pygame_gui
import PySimpleGUI as sg
from pygame_gui.elements import UIButton
from pygame_gui.windows import UIColourPickerDialog

pygame-ce 2.3.0 (SDL 2.26.4, Python 3.8.17)


### Constants

In [3]:
# Setup Constants
DEFAULT_PORT = 6454
DEFAULT_UNIVERSE_ID = 1
DEFAULT_CHANNEL_START_ID = 1
DEFAULT_CHANNEL_WIDTH = 11
# Control Constants
DIMMER_ID = 0
DIMMER_FINE_ID = 1
STROBE_ID = 2
RED_ID = 3
GREEN_ID = 4
BLUE_ID = 5
WHITE_ID = 6
AMBER_ID = 7
UV_ID = 8
PRESET_ID = 9
SOUND_ID = 10
DEFAULT_LIGHT_VALUE = [255,255]+[0]*(DEFAULT_CHANNEL_WIDTH-2)
RESET_VALUE = [0]*DEFAULT_CHANNEL_WIDTH
LIGHT_OFF_VALUE = RESET_VALUE
# Mappings
FIXTURE_TO_ID_DICT = {'Dimmer':DIMMER_ID,'Dimmer_Fine':DIMMER_FINE_ID,
                    'Strobe':STROBE_ID,'Red':RED_ID,'Green':GREEN_ID,
                    'Blue':BLUE_ID,'White':WHITE_ID,'Amber':AMBER_ID,
                    'UV':UV_ID,'Preset':PRESET_ID,'Sound':SOUND_ID}
ID_TO_FIXTURE_DICT = dict([(i,fixture) for fixture,i in FIXTURE_TO_ID_DICT.items()])

### Code

#### Helpers

In [4]:
def init_channels(ip:str,channel_list:list,port=DEFAULT_PORT,
                  universe_id=DEFAULT_UNIVERSE_ID,
                  channel_start_id=DEFAULT_CHANNEL_START_ID,
                  channel_width=DEFAULT_CHANNEL_WIDTH) -> tuple:
    """
    Create the connection and channels with the given parameters
    and turn all lights off.

    :param ip: String encoding of device IP.
    :param channel_list: Ordered list containing the names of the channel to be created.
    :param port: Destination port for communication. (int, default=DEFAULT_PORT).
    :param universe_id: Id of the universe of the setup. (int, default=DEFAULT_UNIVERSE_ID).
    :param channel_start_id: Channel start id. (int, default=DEFAULT_CHANNEL_START_ID).
    :param channel_width: Channel width. (int, default=DEFAULT_CHANNEL_WIDTH).

    :return: Tuple of dictionnaries containing the channel and their states.
    
    """
    # Init connections
    node, universe = create_connection(ip,port=port,universe_id=universe_id)
    # Init Channels
    channel_dict = dict()
    state_dict = dict()
    for idx, channel_name in enumerate(channel_list):
        create_channel(universe,channel_dict,channel_name,
                       channel_start_id+channel_width*idx,
                       channel_width)
        turn_off_channel(channel_dict,channel_name)
        state_dict[channel_name] = DEFAULT_LIGHT_VALUE.copy()
    return channel_dict, state_dict
    
    

def create_connection(ip:str,port=DEFAULT_PORT,universe_id=DEFAULT_UNIVERSE_ID) -> tuple:
    """
    Create instance of ArtNetNode and assign universe id.

    :param ip: String encoding of device IP.
    :param port: Destination port for communication. (int, default=DEFAULT_PORT).
    :param universe_id: Id of the universe of the setup. (int, default=DEFAULT_UNIVERSE_ID).

    :return: Tuple in the form (node, universe).
    
    """
    node = ArtNetNode(ip,port)
    universe = node.add_universe(universe_id)
    return node, universe

def create_channel(universe:Universe, channel_dict:dict,
                   channel_name:str,channel_start_id=DEFAULT_CHANNEL_START_ID,
                   channel_width=DEFAULT_CHANNEL_WIDTH,return_=False): 
    """
    Create channel in the given universe of size
    'channel_width' and starting at 'channel_start'.
    Then add it in the dictionnary of all channels for
    the universe.

    :param universe: ArtNet Universe object in which the channel should be created.
    :param channel_dict: Dictionnary with all the channels for the universe.
    :param channel_name: String to be used as key in the channel dictionnary.
    :param channel_start_id: Channel start id. (int, default=DEFAULT_CHANNEL_START_ID).
    :param channel_width: Channel width. (int, default=DEFAULT_CHANNEL_WIDTH).

    :return: ArtNet Channel with the given specifications.
    
    """
    if channel_start_id < 1 or channel_width < 1 :
        raise ValueError('The channel start id and channel width should be greater or equal than 1')
    channel = universe.add_channel(start=channel_start_id, width=channel_width)
    channel_dict[channel_name] = channel
    if return_:
        return channel

def send_rgb(channel_dict:dict, channel_name:str,
             values:list, state_dict:dict):
    """
    Send RGB data to the channel, while keeping other parameters fixed.

    :param channel_dict: Dictionnary with all the channels for the universe.
    :param channel_name: Name of the channel to which the signal should be transmitted.
    :param values: List of RGB values in format: [red_value, green_value, blue_value].
                   The values of each color should be contained in [0,255].
    :param state_dict: Dictionnary with the DMX states of all channels.
    
    """
    if len(values) != 3:
        raise ValueError('The list of values should be of length 3')
    current_state = state_dict[channel_name]
    new_state = current_state.copy()
    new_state[RED_ID:BLUE_ID+1] = values
    state_dict[channel_name] = new_state
    channel = channel_dict[channel_name]
    channel.set_values(new_state)

def send_amber(channel_dict:dict, channel_name:str,
             value:int, state_dict:dict):
    """
    Send RGB data to the channel, while keeping other parameters fixed.

    :param channel_dict: Dictionnary with all the channels for the universe.
    :param channel_name: Name of the channel to which the signal should be transmitted.
    :param value: The value to which should be set the amber channel.
                   The value should be contained in [0,255].
    :param state_dict: Dictionnary with the DMX states of all channels.
    
    """
    current_state = state_dict[channel_name]
    new_state = current_state.copy()
    new_state[AMBER_ID] = value
    state_dict[channel_name] = new_state
    channel = channel_dict[channel_name]
    channel.set_values(new_state)

def send_uv(channel_dict:dict, channel_name:str,
             value:int, state_dict:dict):
    """
    Send RGB data to the channel, while keeping other parameters fixed.

    :param channel_dict: Dictionnary with all the channels for the universe.
    :param channel_name: Name of the channel to which the signal should be transmitted.
    :param value: The value to which should be set the amber channel.
                   The value should be contained in [0,255].
    :param state_dict: Dictionnary with the DMX states of all channels.
    
    """
    current_state = state_dict[channel_name]
    new_state = current_state.copy()
    new_state[UV_ID] = value
    state_dict[channel_name] = new_state
    channel = channel_dict[channel_name]
    channel.set_values(new_state)

def turn_off_channel(channel_dict:dict, channel_name:str):
    """
    Turn off the lights of the given channel by setting all values to zero.

    :param channel_dict: Dictionnary with all the channels for the universe.
    :param channel_name: Name of the channel to which the signal should be transmitted.
    
    """
    channel_dict[channel_name].set_values(LIGHT_OFF_VALUE)

#### Objects

In [5]:
class Light:
    """
    Light object class. A light is model by the channel to which
    it is linked and a state. The latter is a list containing the 
    values for each of the fixture in the channel. 
    
    """
    
    def __init__(self, name:str, channel:Channel):
        """
        Create a Light instance.

        :param name: String identifier for the light. If the light is used in a Group,
                     please be sure to enter unique identifiers.
        :param channel: ArtNet channel object to which the light is bound. 

        """
        self.name = name
        self.channel = channel
        self.state = DEFAULT_LIGHT_VALUE.copy()

    def reset(self):
        """
        Reset the light to its default state, i.e. zero value for each fixture.
        """
        new_state = DEFAULT_LIGHT_VALUE
        self.channel.set_values(new_state)
        self.state = new_state

    def set_fixture_value(self, fixture_id:int, value:int):
        """
        Set the fixture to the given value.

        :param fixture_id: Id of the fixture to be set.
        :param value: Integer value of the fixture, should be contained in [0,255].
        
        """
        if value < 0 or value > 255:
            raise ValueError(f'The value for {ID_TO_FIXTURE_DICT[fixture_id]} should be contained in [0,255]')
        new_state = self.state.copy()
        new_state[fixture_id] = value
        print(new_state)
        print(self.channel)
        self.channel.set_values(new_state)
        self.state = new_state

    def turn_off(self):
        """
        Turn off the light by setting dimmer to 0.
        """
        self.set_fixture_value(DIMMER_ID,0)
    
    def set_rgb(self, values:list):
        """
        Set the RGB fixtures to the color code given in values.

        :param values: List containing the values for the red, green and blue fixtures.
        
        """
        for idx, fixture_id in enumerate([RED_ID,GREEN_ID,BLUE_ID]):
            self.set_fixture_value(fixture_id,values[idx])

In [6]:
class Group:
    """
    Group object class. A group is modeled by a list
    of lights and a name. Every action applied to the group
    will be executed on all lights present. Each light in the 
    group should be unique and have a unique name.
    
    """

    def __init__(self, name:str, lights=[]):
        """
        Create a Group instance.

        :param name: String identifier for the group of lights.
        :param lights: List containing the initial lights of the group,
                       empty by default. All lights should be unique.
        
        """
        self.name = name
        self.lights = lights
        self.light_names = list(set([l.name for l in lights]))
        if len(self.light_names) != len(self.lights):
            raise ValueError('Duplicate names in the list of lights provided to the Group constructor')
    
    def add_light(self,light):
        """
        Add a light to the existing pool. Must be a new unique light.

        :param light. Light object to add to the existing pool of lights in the group.
        
        """
        if light.name in self.light_name:
            raise ValueError('Tried to add a light whose name is already present in the group')
        self.lights.append(light)
        
    def reset(self):
        """
        Reset all lights in the group to their default states, i.e. zero value for each fixture.
        """
        for l in self.lights:
            l.reset()

    def set_fixture_value(self, fixture_id:int, value:int):
        """
        Set the given fixture to 'value' for all lights in the group.

        :param fixture_id: Id of the fixture to be set.
        :param value: Integer value of the fixture, should be contained in [0,255].
        
        """
        for l in self.lights:
            l.set_fixture_value(fixture_id,value)

    def turn_off(self):
        """
        Turn off all lights in the group by setting dimmer to 0.
        """
        for l in self.lights:
            l.turn_off()
    
    def set_rgb(self, values:list):
        """
        Set the RGB fixtures to the color code given in values for all lights in the group.

        :param values: List containing the values for the red, green and blue fixtures.
        
        """
        for l in self.lights:
            print(l,values)
            l.set_rgb(values)

### UI

In [7]:
EVENTS_SET = {'group_0','light_0','group_1','light_1','light_2','light_3','light_4','light_5',}

In [8]:
def create_UI_layout():
    """
    """
    layout = [[sg.Text("Control Center Group 1",justification="center",s=(47,1))],
    [sg.Button("Main",button_color="black on SkyBlue1",key="group_0",s=(10,5)),
        sg.Button("Light 1",button_color="black on SkyBlue1",key="light_0",s=(10,5)),
        sg.Button("Light 2",button_color="black on SkyBlue1",key="light_1",s=(10,5)),
        sg.Button("Light 3",button_color="black on SkyBlue1",key="light_2",s=(10,5))],
    [sg.Text("Control Center Group 2",justification="center",s=(47,1))],
    [sg.Button("Ambiance",button_color="black on gold",key="group_1",s=(10,5)),
     sg.Button("Light 4",button_color="black on gold",key="light_3",s=(10,5)),
     sg.Button("Light 5",button_color="black on gold",key="light_4",s=(10,5)),
     sg.Button("Light 6",button_color="black on RoyalBlue1",key="light_5",s=(10,5))],]
    return layout

In [17]:
def UI_process(light_object_dict:dict):
    """
    """
    layout = create_UI_layout()
    window = sg.Window("Delta Control", layout, background_color='black', resizable=True)
    while True:
        # Update GUI
        event, values = window.read(timeout=1000)
        if event == sg.WIN_CLOSED:
            break
        elif event in EVENTS_SET:
            light_object = light_object_dict[event]
            color_popup(light_object)
            try:
                color_popup(light_object)
            except:
                print('UUU')
                continue
                
    window.close();

### PyGame

In [18]:
def color_popup(light_object):
    """
    """
    pygame.init()

    pygame.display.set_caption('Colour Picking App')
    SCREEN = pygame.display.set_mode((800, 600))
    
    ui_manager = pygame_gui.UIManager((800, 600))
    background = pygame.Surface((800, 600))
    background.fill("#3a3b3c")
    colour_picker_button = UIButton(relative_rect=pygame.Rect(-180, -60, 150, 30),
                                    text='Pick Colour',
                                    manager=ui_manager,
                                    anchors={'left': 'right',
                                            'right': 'right',
                                            'top': 'bottom',
                                            'bottom': 'bottom'})
    colour_picker = None                                    
    current_colour = pygame.Color(0, 0, 0)
    picked_colour_surface = pygame.Surface((400, 400))
    picked_colour_surface.fill(current_colour)
    
    clock = pygame.time.Clock()
    
    while True:
        time_delta = clock.tick(60) / 1000
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()
            if event.type == pygame_gui.UI_BUTTON_PRESSED and event.ui_element == colour_picker_button:
                colour_picker = UIColourPickerDialog(pygame.Rect(160, 50, 420, 400),
                                                    ui_manager,
                                                    window_title="Change Colour...",
                                                    initial_colour=current_colour,
                                                    light_object=light_object)
                colour_picker_button.disable()
            if event.type == pygame_gui.UI_COLOUR_PICKER_COLOUR_PICKED:
                current_colour = event.colour
                picked_colour_surface.fill(current_colour)
            if event.type == pygame_gui.UI_WINDOW_CLOSE:
                colour_picker_button.enable()
                colour_picker = None
            
            ui_manager.process_events(event)
            
        ui_manager.update(time_delta)
    
        SCREEN.blit(background, (0, 0))
        SCREEN.blit(picked_colour_surface, (200, 100))
    
        ui_manager.draw_ui(SCREEN)
    
        pygame.display.update()
    
    pygame.quit()

In [19]:
def live_color_picker(ip:str, num_lights:int, groups_mapping:dict,
                      port=DEFAULT_PORT, universe_id=DEFAULT_UNIVERSE_ID,
                      channel_start_id=DEFAULT_CHANNEL_START_ID,
                      channel_width=DEFAULT_CHANNEL_WIDTH):
    """
    """
    # Init connections
    node, universe = create_connection(ip,port=port,universe_id=universe_id)
    # Init channels
    channel_dict = dict()
    for i in range(num_lights):
        create_channel(universe,channel_dict,str(i),
                       channel_start_id+channel_width*i,
                       channel_width)
    # Lights
    lights = []
    for i in range(num_lights):
        lights.append(Light(name=str(i),channel=channel_dict[str(i)]))
    # Groups
    groups = []
    groups_mapping = {'group_0':lights[:3],'group_1':lights[3:]}
    for group_id, group_lights in groups_mapping.items():
        groups.append(Group(name=group_id, lights=group_lights))

    # light Object Mapping
    light_object_dict = [('light_'+str(i),lights[i]) for i in range(num_lights)]
    light_object_dict.extend([('group_'+str(i),groups[i]) for i in range(len(groups))])
    light_object_dict = dict(light_object_dict)
    # UI Loop
    print(light_object_dict)
    UI_process(light_object_dict)

In [20]:
live_color_picker('169.254.79.148',6,{'group_1':[]})

{'light_0': <__main__.Light object at 0x000001F0B0F65FD0>, 'light_1': <__main__.Light object at 0x000001F0B0F65E20>, 'light_2': <__main__.Light object at 0x000001F0B0F65F40>, 'light_3': <__main__.Light object at 0x000001F0B0F65F70>, 'light_4': <__main__.Light object at 0x000001F0B0F371C0>, 'light_5': <__main__.Light object at 0x000001F0B0F37FD0>, 'group_0': <__main__.Group object at 0x000001F0B0F37FA0>, 'group_1': <__main__.Group object at 0x000001F0B0F37BE0>}
96
27
27


TypeError: cannot pickle '_asyncio.Task' object

In [None]:
await live_color_picker('169.254.79.148',6,{'group_1':[]})

{'light_0': <__main__.Light object at 0x0000021E79965550>, 'light_1': <__main__.Light object at 0x0000021E79965D90>, 'light_2': <__main__.Light object at 0x0000021E799658B0>, 'light_3': <__main__.Light object at 0x0000021E79965D00>, 'light_4': <__main__.Light object at 0x0000021E79965940>, 'light_5': <__main__.Light object at 0x0000021E79965850>, 'group_0': <__main__.Group object at 0x0000021E79965D30>, 'group_1': <__main__.Group object at 0x0000021E799659D0>}
234
63
63


In [18]:
asyncio.run(live_color_picker('169.254.79.148',6,{'group_1':[]}))

RuntimeError: asyncio.run() cannot be called from a running event loop

### Artnet

In [10]:
channel_dict, state_dict = init_channels('169.254.79.148',['main','ambiance'])

In [27]:
send_rgb(channel_dict,'main',[120,0,50],state_dict)

In [28]:
send_rgb(channel_dict,'ambiance',[20,0,120],state_dict)

In [29]:
send_amber(channel_dict,'main',50,state_dict)

In [19]:
send_uv(channel_dict,'main',0,state_dict)

In [20]:
send_amber(channel_dict,'ambiance',255,state_dict)

---

In [13]:
turn_off_channel(channel_dict,'main')
turn_off_channel(channel_dict,'ambiance')

### Async

In [None]:
async def main_async():
    # Run this code in your async function
    node = ArtNetNode('169.254.79.148',6454)

    # Create universe 0
    universe = node.add_universe(1)

    # Add a channel to the universe which consists of 3 values
    # Default size of a value is 8Bit (0..255) so this would fill
    # the DMX values 1..3 of the universe
    channel = universe.add_channel(start=1, width=11)

    # Fade channel to 255,0,0 in 5s
    # The fade will automatically run in the background
    channel.add_fade([255,0,0,0,255,0,0,0,0,0,0],1000)

    # this can be used to wait till the fade is complete
    await channel

### Process

In [3]:
node = ArtNetNode('169.254.79.148',6454)

In [4]:
universe = node.add_universe(1)

In [5]:
channel = universe.add_channel(start=1, width=11)

In [16]:
channel.set_values([255,0,0,120,40,10,80,250,0,0,0])

<Channel 1/11 8bit>