Skip to content

Commit

Permalink
Extract native wrappers into a Native API module for internal use
Browse files Browse the repository at this point in the history
  • Loading branch information
intrueder committed Jun 3, 2022
1 parent aefc79b commit 351c44f
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 183 deletions.
134 changes: 60 additions & 74 deletions src/cuesdk/api.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,65 @@
from ctypes import c_bool, c_int32, byref
import os
import platform
from ctypes import c_bool, c_int32, c_void_p, byref, sizeof
from typing import Any, Optional, Union, Callable, Dict, Sequence, Tuple

from .capi import CorsairNativeApi
from .enums import (CorsairAccessMode, CorsairError, CorsairDevicePropertyId,
CorsairDevicePropertyType, CorsairLedId, CorsairEventId)
from .helpers import ColorRgb
from .structs import (CorsairProtocolDetails, CorsairDeviceInfo,
CorsairLedPosition, CorsairLedPositions, CorsairLedColor,
CorsairEvent)
from .utils import bytes_to_str_or_default
from .structs import (CorsairEvent, CorsairLedPositions,
CorsairProtocolDetails, CorsairDeviceInfo)
from .native import CorsairNativeApi, CorsairLedColor

__all__ = ['CueSdk']

native = None
napi: CorsairNativeApi = None


class Device(object):
def __init__(self, id, type, model, caps_mask, led_count, channels):
self.id = id
self.type = type
self.model = model
self.caps_mask = caps_mask
self.led_count = led_count
self.channels = channels
def get_library_path(lib_name):
return os.path.join(os.path.dirname(__file__), 'bin', lib_name)

def __repr__(self):
return self.model

def get_library_path_windows():
suffix = '.x64' if sizeof(c_void_p) == 8 else ''
lib_name = 'CUESDK' + suffix + '_2017.dll'
return get_library_path(lib_name)

class Channel(object):
def __init__(self, total_led_count, devices):
self.total_led_count = total_led_count
self.devices = devices


class ChannelDevice(object):
def __init__(self, type, led_count):
self.type = type
self.led_count = led_count
def get_library_path_mac():
lib_name = 'libCUESDK.dylib'
return get_library_path(lib_name)


class CueSdk(object):

def __init__(self, sdk_path: Optional[str] = None) -> None:
global native
native = CorsairNativeApi(sdk_path)
global napi
if sdk_path is None:
system = platform.system()
if system == "Windows":
sdk_path = get_library_path_windows()
elif system == "Darwin":
sdk_path = get_library_path_mac()
napi = CorsairNativeApi(sdk_path)
self._protocol_details = None

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
pass

def connect(self):
self.protocol_details = native.CorsairPerformProtocolHandshake()
err = native.CorsairGetLastError()
def connect(self) -> bool:
details = napi.CorsairPerformProtocolHandshake()
self._protocol_details = CorsairProtocolDetails.create(details)
err = napi.CorsairGetLastError()
return err == CorsairError.Success

@property
def protocol_details(self) -> CorsairProtocolDetails:
return self._protocol_details

def get_devices(self):
cnt = native.CorsairGetDeviceCount()
cnt = napi.CorsairGetDeviceCount()
if cnt > 0:
return [self.get_device_info(i) for i in range(cnt)]
return []
Expand All @@ -76,7 +78,7 @@ def get_device_count(self) -> int:
Use `CueSdk.get_device_info` to get information about a certain device.
"""
return native.CorsairGetDeviceCount()
return napi.CorsairGetDeviceCount()

def get_device_info(self, device_index: int):
"""Returns information about a device based on provided index.
Expand All @@ -85,25 +87,14 @@ def get_device_info(self, device_index: int):
device_index: zero-based index of device. Should be strictly
less than a value returned by `CueSdk.get_device_count`
"""
p = native.CorsairGetDeviceInfo(device_index)
s = p.contents
channels = []
for chi in range(s.channels.channelsCount):
ch = s.channels.channels[chi]
devices = [
ChannelDevice(ch.devices[i].type, ch.devices[i].deviceLedCount)
for i in range(ch.devicesCount)
]
channels.append(Channel(ch.totalLedsCount, devices))
return Device(bytes_to_str_or_default(s.deviceId), s.type,
bytes_to_str_or_default(s.model), s.capsMask,
s.ledsCount, channels)
p = napi.CorsairGetDeviceInfo(device_index)
return CorsairDeviceInfo.create(p)

def get_last_error(self) -> CorsairError:
"""Returns last error that occurred in this thread while using
any of SDK functions.
"""
return native.CorsairGetLastError()
return napi.CorsairGetLastError()

def request_control(self) -> bool:
"""Requests exclusive control over lighting.
Expand All @@ -112,12 +103,12 @@ def request_control(self) -> bool:
no need to call `CueSdk.request_control` unless a client
requires exclusive control.
"""
return native.CorsairRequestControl(
return napi.CorsairRequestControl(
CorsairAccessMode.ExclusiveLightingControl)

def release_control(self) -> bool:
"""Releases previously requested exclusive control."""
return native.CorsairReleaseControl(
return napi.CorsairReleaseControl(
CorsairAccessMode.ExclusiveLightingControl)

def set_layer_priority(self, priority: int) -> bool:
Expand All @@ -135,7 +126,7 @@ def set_layer_priority(self, priority: int) -> bool:
to check the reason of failure. If this function is called in
exclusive mode then it will return True
"""
return native.CorsairSetLayerPriority(priority)
return napi.CorsairSetLayerPriority(priority)

def get_led_id_for_key_name(self, key_name: str) -> CorsairLedId:
"""Retrieves led id for key name taking logical layout into account.
Expand All @@ -155,7 +146,7 @@ def get_led_id_for_key_name(self, key_name: str) -> CorsairLedId:
str) or len(key_name) != 1 or not key_name.isalpha():
return CorsairLedId(CorsairLedId.Invalid)

return native.CorsairGetLedIdForKeyName(key_name.encode())
return napi.CorsairGetLedIdForKeyName(key_name.encode())

def set_led_colors_buffer_by_device_index(
self, device_index: int,
Expand Down Expand Up @@ -188,7 +179,7 @@ def set_led_colors_buffer_by_device_index(
for i, led in enumerate(led_colors):
rgb = dict(zip(rgb_keys, led_colors[led]))
data[i] = CorsairLedColor(ledId=led, **rgb)
return native.CorsairSetLedsColorsBufferByDeviceIndex(
return napi.CorsairSetLedsColorsBufferByDeviceIndex(
device_index, sz, data)

def set_led_colors_flush_buffer(self) -> bool:
Expand All @@ -198,13 +189,13 @@ def set_led_colors_flush_buffer(self) -> bool:
This function executes synchronously, if you are concerned about
delays consider using `CueSdk.set_led_colors_flush_buffer_async`
"""
return native.CorsairSetLedsColorsFlushBuffer()
return napi.CorsairSetLedsColorsFlushBuffer()

def set_led_colors_flush_buffer_async(self) -> bool:
"""Same as `CueSdk.set_led_colors_flush_buffer` but returns control to
the caller immediately
"""
return native.CorsairSetLedsColorsFlushBufferAsync(None, None)
return napi.CorsairSetLedsColorsFlushBufferAsync(None, None)

def get_led_colors_by_device_index(
self, device_index: int, led_identifiers: Sequence[CorsairLedId]
Expand Down Expand Up @@ -239,7 +230,7 @@ def get_led_colors_by_device_index(
data = (CorsairLedColor * sz)()
for i in range(sz):
data[i].ledId = led_identifiers[i]
ok = native.CorsairGetLedsColorsByDeviceIndex(device_index, sz, data)
ok = napi.CorsairGetLedsColorsByDeviceIndex(device_index, sz, data)
if not ok:
return None
ret = {}
Expand Down Expand Up @@ -273,14 +264,10 @@ def get_led_positions_by_device_index(
If error occurred, the function returns None
"""
leds = native.CorsairGetLedPositionsByDeviceIndex(device_index)
leds = napi.CorsairGetLedPositionsByDeviceIndex(device_index)
if not leds:
return None
positions = {}
for i in range(leds.contents.numberOfLed):
p = leds.contents.pLedPosition[i]
positions[p.ledId] = (p.left, p.top)
return positions
return CorsairLedPositions.create(leds)

def subscribe_for_events(
self, handler: Callable[[CorsairEventId, Any], None]) -> bool:
Expand All @@ -302,14 +289,11 @@ def subscribe_for_events(
return False

def raw_handler(ctx, e):
id = e[0].id
if (id == CorsairEventId.DeviceConnectionStatusChangedEvent):
handler(id, e[0].deviceConnectionStatusChangedEvent[0])
elif (id == CorsairEventId.KeyEvent):
handler(id, e[0].keyEvent[0])
evt = CorsairEvent.create(e)
handler(evt.id, evt.data)

self.event_handler = native.EventHandler(raw_handler)
return native.CorsairSubscribeForEvents(self.event_handler, None)
self.event_handler = napi.EventHandler(raw_handler)
return napi.CorsairSubscribeForEvents(self.event_handler, None)

def unsubscribe_from_events(self):
"""Unregisters callback previously registered by
Expand All @@ -320,18 +304,20 @@ def unsubscribe_from_events(self):
to check the reason of failure.
"""
self.event_handler = None
return native.CorsairUnsubscribeFromEvents()
return napi.CorsairUnsubscribeFromEvents()

def get_property(self, device_index, property_id):
def get_property(self, device_index: int,
property_id: CorsairDevicePropertyId):
if (property_id & CorsairDevicePropertyType.Boolean) != 0:
prop = c_bool()
success = native.CorsairGetBoolPropertyValue(
device_index, property_id, byref(prop))
success = napi.CorsairGetBoolPropertyValue(device_index,
property_id,
byref(prop))
if success:
return prop.value
elif (property_id & CorsairDevicePropertyType.Int32) != 0:
prop = c_int32()
success = native.CorsairGetInt32PropertyValue(
success = napi.CorsairGetInt32PropertyValue(
device_index, property_id, byref(prop))
if success:
return prop.value
Expand Down
2 changes: 2 additions & 0 deletions src/cuesdk/native/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .structs import *
from .capi import *
30 changes: 5 additions & 25 deletions src/cuesdk/capi.py → src/cuesdk/native/capi.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,14 @@
import os
import platform
import sys
from ctypes import (CDLL, CFUNCTYPE, POINTER, sizeof, c_bool, c_char, c_int32,
from ctypes import (CDLL, CFUNCTYPE, POINTER, c_bool, c_char, c_int32,
c_void_p)
from .enums import (CorsairAccessMode, CorsairError, CorsairLedId,
CorsairEventId, CorsairDevicePropertyId)
from .structs import (CorsairProtocolDetails, CorsairDeviceInfo,
CorsairLedPosition, CorsairLedPositions, CorsairLedColor,
CorsairEvent)
from ..enums import (CorsairAccessMode, CorsairError, CorsairLedId,
CorsairDevicePropertyId)
from . import (CorsairProtocolDetails, CorsairDeviceInfo, CorsairLedPositions,
CorsairLedColor, CorsairEvent)

__all__ = ['CorsairNativeApi']


def get_library_path_windows():
suffix = '.x64' if sizeof(c_void_p) == 8 else ''
lib_name = 'CUESDK' + suffix + '_2017.dll'
return os.path.join(sys.prefix, 'bin', lib_name)


def get_library_path_mac():
lib_name = 'libCUESDK.dylib'
return os.path.join(sys.prefix, 'bin', lib_name)


def load_library(library_path):
try:
return CDLL(library_path)
Expand All @@ -34,12 +20,6 @@ def load_library(library_path):
class CorsairNativeApi():

def __init__(self, libpath):
if libpath is None:
system = platform.system()
if system == "Windows":
libpath = get_library_path_windows()
elif system == "Darwin":
libpath = get_library_path_mac()
lib = load_library(libpath)

def create_func(fn, restype, argtypes):
Expand Down
83 changes: 83 additions & 0 deletions src/cuesdk/native/structs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import ctypes
from ..enums import (CorsairEventId, CorsairKeyId, CorsairLedId,
CorsairDeviceType, CorsairChannelDeviceType,
CorsairPhysicalLayout, CorsairLogicalLayout)

__all__ = [
'CorsairProtocolDetails', 'CorsairChannelDeviceInfo', 'CorsairChannelInfo',
'CorsairChannelsInfo', 'CorsairDeviceInfo', 'CorsairLedPosition',
'CorsairLedPositions', 'CorsairLedColor', 'CorsairEvent',
'CorsairKeyEvent', 'CorsairDeviceConnectionStatusChangedEvent'
]

CORSAIR_DEVICE_ID_MAX = 128


class CorsairProtocolDetails(ctypes.Structure):
_fields_ = [('sdkVersion', ctypes.c_char_p),
('serverVersion', ctypes.c_char_p),
('sdkProtocolVersion', ctypes.c_int32),
('serverProtocolVersion', ctypes.c_int32),
('breakingChanges', ctypes.c_bool)]


class CorsairChannelDeviceInfo(ctypes.Structure):
_fields_ = [('type', CorsairChannelDeviceType),
('deviceLedCount', ctypes.c_int32)]


class CorsairChannelInfo(ctypes.Structure):
_fields_ = [('totalLedsCount', ctypes.c_int32),
('devicesCount', ctypes.c_int32),
('devices', ctypes.POINTER(CorsairChannelDeviceInfo))]


class CorsairChannelsInfo(ctypes.Structure):
_fields_ = [('channelsCount', ctypes.c_int32),
('channels', ctypes.POINTER(CorsairChannelInfo))]


class CorsairDeviceInfo(ctypes.Structure):
_fields_ = [('type', CorsairDeviceType), ('model', ctypes.c_char_p),
('physicalLayout', CorsairPhysicalLayout),
('logicalLayout', CorsairLogicalLayout),
('capsMask', ctypes.c_int32), ('ledsCount', ctypes.c_int32),
('channels', CorsairChannelsInfo),
('deviceId', ctypes.c_char * CORSAIR_DEVICE_ID_MAX)]


class CorsairLedPosition(ctypes.Structure):
_fields_ = [('ledId', CorsairLedId), ('top', ctypes.c_double),
('left', ctypes.c_double), ('height', ctypes.c_double),
('width', ctypes.c_double)]


class CorsairLedPositions(ctypes.Structure):
_fields_ = [('numberOfLed', ctypes.c_int32),
('pLedPosition', ctypes.POINTER(CorsairLedPosition))]


class CorsairLedColor(ctypes.Structure):
_fields_ = [('ledId', CorsairLedId), ('r', ctypes.c_int32),
('g', ctypes.c_int32), ('b', ctypes.c_int32)]


class CorsairDeviceConnectionStatusChangedEvent(ctypes.Structure):
_fields_ = [('deviceId', ctypes.c_char * CORSAIR_DEVICE_ID_MAX),
('isConnected', ctypes.c_bool)]


class CorsairKeyEvent(ctypes.Structure):
_fields_ = [('deviceId', ctypes.c_char * CORSAIR_DEVICE_ID_MAX),
('keyId', CorsairKeyId), ('isPressed', ctypes.c_bool)]


class CorsairEventPayload(ctypes.Union):
_fields_ = [('deviceConnectionStatusChangedEvent',
ctypes.POINTER(CorsairDeviceConnectionStatusChangedEvent)),
('keyEvent', ctypes.POINTER(CorsairKeyEvent))]


class CorsairEvent(ctypes.Structure):
_anonymous_ = ("payload", )
_fields_ = [('id', CorsairEventId), ('payload', CorsairEventPayload)]
Loading

0 comments on commit 351c44f

Please sign in to comment.