This notebook utilizes the Workbench and Client classes, which allows for convenient, notebook-based use of the sdk.

In [None]:
from datetime import time, timedelta
from whendo.sdk.client import Client
from whendo.sdk.workbench import Workbench
import whendo.core.util as util
import whendo.core.actions.file_action as file_x
import whendo.core.actions.logic_action as logic_x
import whendo.core.scheduler as sched_x
import whendo.core.actions.gpio_action as gpio_x
import whendo.core.actions.sys_action as sys_x

"""
define the actions
"""
green_on = gpio_x.SetPin(pin=27, on=True)
green_off = gpio_x.SetPin(pin=27, on=False)
green_toggle = gpio_x.TogglePin(pin=27)
red_on = gpio_x.SetPin(pin=25, on=True)
red_off = gpio_x.SetPin(pin=25, on=False)
red_toggle = gpio_x.TogglePin(pin=25)
gpio_clear = gpio_x.Cleanup()
toggle_toggle = logic_x.ListAction(op_mode=logic_x.ListOpMode.ALL, action_list=[green_toggle, red_toggle])
on_on = logic_x.ListAction(op_mode=logic_x.ListOpMode.ALL, action_list=[green_on, red_on])
off_off = logic_x.ListAction(op_mode=logic_x.ListOpMode.ALL, action_list=[green_off, red_off])
file_heartbeat = file_x.FileHeartbeat(file="gpio_beat.txt")
system_info = sys_x.SystemInfoAction()

"""
define the schedulers
"""
often = sched_x.TimelyScheduler(interval=1)
morning, evening = time(6,0,0), time(18,0,0)
daily_often = sched_x.TimelyScheduler(start=morning, stop=evening, interval=1)
nightly_often = sched_x.TimelyScheduler(start=evening, stop=morning, interval=1)
randomly_often = sched_x.RandomlyScheduler(time_unit=util.TimeUnit.second, low=2, high=5)
timely_at_00_sec = sched_x.TimelyScheduler(interval=1, second=00)
timely_at_30_sec = sched_x.TimelyScheduler(interval=1, second=30)
immediately = sched_x.Immediately()


def add_actions(client):
    """
    add actions to the 'client' dispatcher
    """
    [ client.add_action(*action) for action in [
        ('green_on', green_on),
        ('green_off', green_off),
        ('green_toggle', green_toggle),
        ('red_on', red_on),
        ('red_off', red_off),
        ('red_toggle', red_toggle),
        ('gpio_clear', gpio_clear),
        ('toggle_toggle', toggle_toggle),
        ('on_on', on_on),
        ('off_off', off_off),
        ('file_heartbeat', file_heartbeat),
        ('sys_info', system_info)
        ]
    ]

def add_schedulers(client):
    """
    add schedulers to the 'client' dispatcher
    """
    [ client.add_scheduler(*scheduler) for scheduler in [
        ('often', often),
        ('daily_often', daily_often),
        ('nightly_often', nightly_often),
        ('randomly_often', randomly_often),
        ('timely_at_00_sec', timely_at_00_sec),
        ('timely_at_30_sec', timely_at_30_sec),
        ('immediately', immediately)
        ]
    ]

def schedule_actions(client):
    """
    schedule the actions of interest
    """
    if True:
        dt1 = util.DateTime(date_time=util.Now.dt() + timedelta(seconds=10))
        dt2 = util.DateTime(date_time=util.Now.dt() + timedelta(seconds=30))
        client.defer_action("immediately", "on_on", dt1)
        client.defer_action("immediately", "off_off", dt2)
    if False:
        dt1 = util.DateTime(date_time=util.Now.dt() + timedelta(seconds=20))
        dt2 = util.DateTime(date_time=util.Now.dt() + timedelta(seconds=40))
        dt3 = util.DateTime(date_time=util.Now.dt() + timedelta(seconds=60))
        client.schedule_action("often", "toggle_toggle")
        client.expire_action("often", "toggle_toggle", dt1)
        client.defer_action("often", "on_on", dt1)
        client.expire_action("often", "on_on", dt2)
        client.defer_action("often", "toggle_toggle", dt2)
        client.expire_action("often", "toggle_toggle", dt3)
    elif False:
        [ client.schedule_action(*stuff) for stuff in [
            ('randomly_often', 'red_toggle'),
            ('daily_often', 'green_toggle'),
            ('nightly_often', 'green_toggle')
            # ('randomly_often', 'file_heartbeat')
            # ('nightly_often', 'toggle_toggle')
            ]
    ]

def unschedule_schedulers(client):
    """
    unschedule the schedulers
    """
    [ client.unschedule_scheduler(scheduler) for scheduler in [ 
        'randomly_often', 'daily_often', 'nightly_often', 'often'
        ]
    ]
def all_on(client):
    client.execute_action('green_on')
    client.execute_action('red_on')

def all_off(client):
    client.execute_action('green_off')
    client.execute_action('red_off')

def sys_info(client):
    print(client)
    PP.pprint(client.execute_action('sys_info'))

"""
set up workbench
"""
workbench = Workbench()
workbench.add_client('pi4', Client(host='192.168.0.45'))
workbench.add_client('pi3', Client(host='192.168.0.46'))
local = workbench.get_client('local')
pi3 = workbench.get_client('pi3')
pi4 = workbench.get_client('pi4')

"""
convenience functions applied to a list of Clients
"""
def run(client_list):
    for c in client_list:
        c.clear_dispatcher()
        add_actions(c)
        add_schedulers(c)
        schedule_actions(c)
        c.run_jobs()
def stop(client_list):
    for c in client_list:
        c.stop_jobs()
def pprint(client_list):
    for c in client_list:
        print(f"job count={c.job_count()} for ({c})")
        c.load_dispatcher().pprint()
def on(client_list):
    for c in client_list:
        all_on(c)
def off(client_list):
    for c in client_list:
        all_off(c)
def info(client_list):
    for c in client_list:
        sys_info(c)


In [None]:
pprint([pi3, pi4])

In [None]:
run([pi3, pi4])

In [None]:
stop([pi3, pi4])

In [None]:
on([pi3, pi4])

In [None]:
off([pi3, pi4])

In [None]:
info([pi3, pi4])