Skip to content

Commit

Permalink
Adding the "connect a USB keyboard as input device when the keypad is…
Browse files Browse the repository at this point in the history
… offline" fallback
  • Loading branch information
CRImier committed Jan 27, 2019
1 parent c7e6729 commit 5c03f43
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 3 deletions.
Empty file.
79 changes: 79 additions & 0 deletions apps/keyboard_fallback/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from time import sleep
from threading import Event

from ui import Canvas, Refresher
from ui.base_ui import BaseUIElement
from apps import ZeroApp
from helpers import ExitHelper, setup_logger, remove_left_failsafe, read_or_create_config, local_path_gen, save_config_method_gen

logger = setup_logger(__name__, "info")

from __main__ import input_device_manager as device_manager

from zerophone_hw import USB_DCDC

class KeyboardFallbackApp(ZeroApp):

do_not_activate_events = ["usb_keyboard_connected"]

def __init__(self, *args, **kwargs):
ZeroApp.__init__(self, *args, **kwargs)
self.active = Event()
self.pop_on_event = Event()
self.pop_on_event.set()
self.c = Canvas(self.o)
device_manager.register_monitor_callback(self.process_dm_event)
self.i.set_streaming(self.deactivate)
self.state = None
self.status_image = "No image"
self.r = Refresher(self.get_status_image, self.i, self.o, name="Keyboard fallback status refresher")

def deactivate(self, keyname, *args, **kwargs):
# Upon receiving *any* key when active, it's our hint from the user
# to not appear again ;-P
# Unless the keyboard was just connected, of course
if self.state == "usb_keyboard_connected":
self.r.deactivate()
self.context.signal_background()
else:
self.pop_on_event.clear()

def go_into_foreground(self):
if not self.pop_on_event.isSet():
return False
return self.context.request_switch()

def process_dm_event(self, event):
self.c.clear()
self.state = event
if not self.context.is_active() and self.pop_on_event.isSet() \
and event not in self.do_not_activate_events:
if not self.go_into_foreground():
return
if event == "usb_keyboard_connected":
self.c.centered_text("USB kb connected")
elif event == "custom_i2c_disconnected":
self.c.centered_text("Keypad not found!")
elif event == "looking_for_usb_keyboard":
self.c.centered_text("Keypad not found!", ch=16)
self.c.centered_text("Looking for USB kb")
elif event == "usb_keyboard_found":
self.c.centered_text("USB keyboard found")
elif event == "usb_keyboard_disconnected":
self.c.centered_text("USB kb disconnected!")
self.status_image = self.c.get_image()
if self.state == "usb_keyboard_connected":
sleep(2)
if self.state == "usb_keyboard_connected":
self.r.deactivate()
self.context.signal_background()

def set_context(self, c):
self.context = c
c.set_target(self.show_status)

def get_status_image(self):
return self.status_image

def show_status(self):
self.r.activate()
105 changes: 102 additions & 3 deletions input/hotplug.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,53 @@
from ui import Menu
from threading import Thread
from time import sleep

from helpers import setup_logger

# If evdev is not available, mark it as 'None' so that it's known as unavailable
try:
import evdev
except ImportError:
evdev = None

logger = setup_logger(__name__, "warning")

def get_input_devices():
"""Returns list of all the available InputDevices"""
from evdev import list_devices
devices = []
for fn in list_devices():
for fn in evdev.list_devices():
try:
devices.append(HID(fn))
devices.append(evdev.InputDevice(fn))
except:
logger.exception("Error while listing a HID device {}".format(fn))
pass
return devices


class DeviceManager():
def __init__(self, i):
self.i = i
self.driver_storage = {}
self.monitor_cbs = []
self.start_monitor_loops()
self.unconnectable_keyboards = []

def register_monitor_callback(self, cb):
self.monitor_cbs.append(cb)

def remove_monitor_callback(self, cb):
self.monitor_cbs.remove(cb)

def notify_event(self, event):
if self.monitor_cbs:
logger.info("Event: {}, sending it to callbacks".format(event))
else:
logger.info("Event: {} (no monitor callbacks set)".format(event))
for cb in self.monitor_cbs:
try:
cb(event)
except:
logging.exception("Callback {} ({}) failed to receive event!".format(cb, cb.__name__))

def get_drivers(self):
return self.driver_storage
Expand All @@ -35,3 +66,71 @@ def remove_driver(self, name):
driver = self.driver_storage.pop(name)
self.i.detach_driver(name)
driver.atexit()

def start_monitor_loops(self):
# Only start the "wait for USB keyboard on custom i2c device" loop
# if custom i2c devices are actually present and evdev lib is installed
ci_drivers = self.get_custom_i2c_drivers()
if len(ci_drivers) > 0 and evdev is not None:
self.start_ukouk_thread()

def start_ukouk_thread(self):
self.ukouk_thread = Thread(target=self.connect_usb_keyboard_on_unresponsive_keypad_loop)
self.ukouk_thread.daemon = True
self.ukouk_thread.start()

driver_ok_sleep = 5
usb_keyboard_not_detected_sleep = 3
usb_keyboard_connected_sleep = 1

def connect_usb_keyboard_on_unresponsive_keypad_loop(self):
ci_drivers = self.get_custom_i2c_drivers()
driver = ci_drivers[0] # Can't wait until I think of a better heuristic lol
while True:
if driver.connected.isSet() is not True:
print(driver.connected.isSet())
self.notify_event("custom_i2c_disconnected")
usb_keyboard = None
while usb_keyboard is None and not driver.connected.isSet():
self.notify_event("looking_for_usb_keyboard")
usb_keyboard = self.detect_usb_keyboard()
if usb_keyboard is None:
sleep(self.usb_keyboard_not_detected_sleep)
if driver.connected.isSet(): # Driver is active again
self.notify_event("custom_i2c_connected_back")
continue
else: # Driver still not active and keyboard was found
self.notify_event("usb_keyboard_found")
driver_name = self.connect_usb_keyboard(usb_keyboard.name)
if driver_name:
self.notify_event("usb_keyboard_connected")
while self.i.drivers[driver_name].connected.isSet(): # idling while driver is connected
sleep(self.usb_keyboard_connected_sleep)
self.notify_event("usb_keyboard_disconnected")
self.remove_driver(driver_name)
else:
self.notify_event("usb_keyboard_failed_to_connect")
self.unconnectable_keyboards.append(usb_keyboard.name)
else: # driver OK
sleep(self.driver_ok_sleep)

def detect_usb_keyboard(self):
hid_devices = get_input_devices()
usb_keyboards = [hid_dev for hid_dev in hid_devices if hid_dev.name not in self.unconnectable_keyboards]
return usb_keyboards[0] if usb_keyboards else None # again, needs a better heuristic

def connect_usb_keyboard(self, usb_keyboard):
try:
driver_name = self.add_driver(name=usb_keyboard, dtype="hid")
except:
logger.exception("Failed to attach USB keyboard {}:".format(usb_keyboard))
return False
else:
return driver_name

def keypad_unresponsive(self):
self.notify_event("keypad_unresponsive")

def get_custom_i2c_drivers(self):
custom_i2c_drivers = [driver for name, driver in self.i.drivers.items() if name.startswith("custom_i2c")]
return custom_i2c_drivers

0 comments on commit 5c03f43

Please sign in to comment.