In [None]:
from datetime import datetime, time, timedelta
from typing import Optional, Set, List
from pydantic import BaseModel
import whendo.core.util as util

## Actions

This section computes all of the Actions that will be executed on
the Pis specified in the Workbench below.

In [None]:
import whendo.core.actions.file_action as file_x
import whendo.core.actions.logic_action as logic_x
import whendo.core.actions.sys_action as sys_x
import whendo_gpio.action as gpio_x

def compute_actions():
    """
    Build the named Actions that are installed on every pivot.

    Returns a dict mapping action name -> action instance. The composite
    actions ("start_pivot", "pause_gpio_clear") reuse the very same instances
    that are registered under the simple names.
    """
    pin_a, pin_b = 27, 25

    # Simple GPIO actions.
    # NOTE(review): the "*_state" actions are configured exactly like the
    # "*_off" actions (SetPin with on=False) -- confirm this is intended.
    pin_a_on = gpio_x.SetPin(pin=pin_a, on=True)
    pin_a_off = gpio_x.SetPin(pin=pin_a, on=False)
    pin_a_state = gpio_x.SetPin(pin=pin_a, on=False)
    pin_b_on = gpio_x.SetPin(pin=pin_b, on=True)
    pin_b_off = gpio_x.SetPin(pin=pin_b, on=False)
    pin_b_state = gpio_x.SetPin(pin=pin_b, on=False)
    gpio_clear = gpio_x.CleanupPins()

    # File / system actions.
    file_heartbeat = file_x.FileHeartbeat(file="gpio_beat.txt")
    system_info = sys_x.SysInfo()
    pause1 = sys_x.Pause()
    pause2 = sys_x.Pause(seconds=2)

    # Composite actions built from the simple ones above.
    start_pivot = logic_x.All(
        action_list=[pin_a_on, pin_b_on, pause1, pin_b_off]
    )
    pause_gpio_clear = logic_x.All(action_list=[pause2, gpio_clear])

    # Key order matches the order in which the actions are registered.
    return {
        "pinA_on": pin_a_on,
        "pinA_off": pin_a_off,
        "pinA_state": pin_a_state,
        "pinB_on": pin_b_on,
        "pinB_off": pin_b_off,
        "pinB_state": pin_b_state,
        "gpio_clear": gpio_clear,
        "file_heartbeat": file_heartbeat,
        "system_info": system_info,
        "pause1": pause1,
        "pause2": pause2,
        "start_pivot": start_pivot,
        "pause_gpio_clear": pause_gpio_clear,
    }

## Schedulers

This section computes the schedulers that will determine when Actions are
executed.

In [None]:
import whendo.core.schedulers.cont_scheduler as sched_x

def compute_schedulers():
    """
    Build the named Schedulers that are installed on every pivot.

    Returns a dict mapping scheduler name -> scheduler instance.
    """
    return {
        "timely": sched_x.Timely(interval=10),
        "immediately": sched_x.Immediately(),
    }

## Programs

This section computes the Programs that the Pis will run for
fixed time periods.

In [None]:
import whendo.core.program as prog_x

def compute_programs():
    """
    Build the named Programs that are installed on every pivot.

    Returns a dict mapping program name -> Program. The Program refers to
    actions and schedulers by the names used in compute_actions() and
    compute_schedulers().
    """
    pivot_program = (
        prog_x.Program()
        .prologue("start_pivot")
        .epilogue("pause_gpio_clear")
        .body_element("timely", "pinA_on")
    )
    return {"pivot_program": pivot_program}

In [None]:
from whendo.sdk.client import Client

## Pivots

In [None]:
class Pivot(BaseModel):
    """
    A Pivot pairs a whendo Client with the program it should run and for how long.
    """
    # Identifier used to look the pivot up (see Flock.get_pivot).
    name: str
    # Client through which programs are scheduled for this pivot.
    client: Client
    # Name of the program passed to client.schedule_program for this pivot.
    program: str
    # Length of this pivot's time slot; compute_flock copies it into Chain.durations.
    duration: timedelta
    # Optional free-form description; not read by the code in this notebook.
    doc: Optional[str]=None

## Chains

In [None]:
class Chain(BaseModel):
    """
    A Chain corresponds to a list of Pivots where the stop time of one pivot is equal
    to the start time of the next Pivot.

    durations[i] is the length of the time slot given to pivots[i]. If the two
    lists differ in length, only the leading entries that have both a pivot and
    a duration are scheduled.
    """
    name: str
    pivots: List[Pivot]
    durations: List[timedelta]

    def compute_stop(self, start: datetime):
        """
        Returns the datetime at which the chain ends, i.e. start plus the sum
        of all durations.
        """
        # Sum the durations (not the pivots); the timedelta() start value keeps
        # the result well-defined for an empty chain.
        return start + sum(self.durations, timedelta())

    def compute_times(self, start: datetime):
        """
        Returns the slot boundaries as a list of len(durations) + 1 datetimes:
        the start of the chain followed by the end of each slot.
        """
        boundaries = [start]
        for duration in self.durations:
            boundaries.append(boundaries[-1] + duration)
        return boundaries

    def compute_pivot_times(self, start: datetime):
        """
        Returns a list of tuples of the form: (pivot, program, datetime2), where
        program is the pivot's program name and datetime2 is a util.DateTime2
        holding the start (dt1) and stop (dt2) of the pivot's slot.
        """
        times = self.compute_times(start=start)
        # Consecutive boundaries form each slot: one pivot's stop is the next pivot's start.
        windows = [
            util.DateTime2(dt1=dt1, dt2=dt2)
            for dt1, dt2 in zip(times[:-1], times[1:])
        ]
        return [
            (pivot, pivot.program, window)
            for pivot, window in zip(self.pivots, windows)
        ]

    def schedule_programs(self, start: datetime):
        """
        Schedules each pivot's program on that pivot's client for its slot,
        with the first slot beginning at start.
        """
        for pivot, program, datetime2 in self.compute_pivot_times(start=start):
            pivot.client.schedule_program(program_name=program, datetime2=datetime2)
    
class ChainRepetition:
    """
    Placeholder for consecutive invocations of a Chain.

    No fields or behavior are defined yet.
    """

## Flocks

In [None]:
class Flock(BaseModel):
    """
    A Flock is the collection of Pivots and Chains managed from this notebook.
    """
    pivots: List[Pivot] = []
    chains: List[Chain] = []

    def get_pivot(self, name: str):
        """
        Returns the first pivot with the given name, or None if there is none.
        """
        return next((pivot for pivot in self.pivots if pivot.name == name), None)

    def add_pivot(self, pivot: Pivot):
        """Appends pivot to the flock."""
        self.pivots.append(pivot)

    def delete_pivot(self, pivot: Pivot):
        """Removes pivot from the flock; raises ValueError if it is not present."""
        self.pivots.remove(pivot)

    def get_chain(self, name: str):
        """
        Returns the first chain with the given name, or None if there is none.
        """
        return next((chain for chain in self.chains if chain.name == name), None)

    def add_chain(self, chain: Chain):
        """Appends chain to the flock."""
        self.chains.append(chain)

    def delete_chain(self, chain: Chain):
        """Removes chain from the flock; raises ValueError if it is not present."""
        self.chains.remove(chain)

    def schedule_chain(self, name: str, start: datetime):
        """
        Schedules the programs of the chain called name, with the first pivot's
        slot beginning at start.

        Raises ValueError if the flock has no chain with that name.
        """
        chain = self.get_chain(name)
        if chain is None:
            raise ValueError(f"no chain named {name!r} in this flock")
        # Delegate to Chain so scheduling goes through each pivot's client
        # and actually receives the start time.
        chain.schedule_programs(start=start)

## Initialization of the Flock, Pivots, Chains, Actions, Schedulers, Programs

Run this section before updating the pivot Pis.

In [None]:
def compute_flock():
    """
    Build the Flock: pivots "pi3" and "pi4", plus a single chain "the_band"
    that runs them back to back using each pivot's own duration.
    """
    # (name, host, slot length in seconds)
    pivot_specs = [
        ("pi3", "192.168.0.46", 30),
        ("pi4", "192.168.0.45", 45),
    ]
    pivots = [
        Pivot(
            name=name,
            client=Client(host=host),
            program="pivot_program",
            duration=timedelta(seconds=seconds),
        )
        for name, host, seconds in pivot_specs
    ]
    the_band = Chain(
        name="the_band",
        pivots=pivots,
        durations=[pivot.duration for pivot in pivots],
    )
    return Flock(pivots=pivots, chains=[the_band])

def reset(client:Client):
    """
    Stop the client's jobs and clear its jobs, deferred actions, expired
    actions and dispatcher, in that order.
    """
    teardown_steps = (
        "stop_jobs",
        "clear_jobs",
        "clear_deferred_actions",
        "clear_expired_actions",
        "clear_dispatcher",
    )
    for step in teardown_steps:
        getattr(client, step)()

def add_actions(client:Client):
    """Register every action from compute_actions() on the client, by name."""
    for name, action in compute_actions().items():
        client.add_action(name, action)

def add_schedulers(client:Client):
    """Register every scheduler from compute_schedulers() on the client, by name."""
    for name, scheduler in compute_schedulers().items():
        client.add_scheduler(name, scheduler)

def add_programs(client:Client):
    """Register every program from compute_programs() on the client, by name."""
    for name, program in compute_programs().items():
        client.add_program(name, program)

def initialize_pivots(flock:Flock):
    """
    For each pivot in the flock: reset its client, install the actions,
    schedulers and programs, then start the client's jobs.
    """
    for pivot in flock.pivots:
        client = pivot.client
        # Order matters: wipe first, then install definitions, then run.
        for setup_step in (reset, add_actions, add_schedulers, add_programs):
            setup_step(client)
        client.run_jobs()

def schedule_programs(flock:Flock):
    """
    Schedule the programs of every chain in the flock, each chain starting
    5 seconds after the moment it is scheduled.
    """
    lead_time = timedelta(seconds=5)
    for chain in flock.chains:
        # The start time is taken per chain, not once for the whole flock.
        start = util.Now.dt() + lead_time
        chain.schedule_programs(start=start)

# Build the Flock once; the cells below pass it to initialize_pivots and schedule_programs.
flock = compute_flock()

## Initialize all pivot pi's

Add Actions, Schedulers and Programs to pivot pi's for future scheduling.

In [None]:
# Reset every pivot's client, install actions/schedulers/programs, and start its jobs.
initialize_pivots(flock)

## Run the lights

In [None]:
# Schedule every chain's programs, starting 5 seconds from now.
schedule_programs(flock)


## Widgets

import ipywidgets as widgets
from IPython.display import display

# Scratch cells exploring ipywidgets; nothing in the visible Flock code uses these names.
w = widgets.IntSlider()
# The same widget object is displayed twice.
display(w)
display(w)

w.close()

# NOTE(review): this assignment happens after w.close() -- confirm that is intended.
w.value=50

w.description

# A text box and a slider, displayed together.
a = widgets.FloatText()
b = widgets.FloatSlider()
display(a,b)

# Link the 'value' attributes of a and b (presumably kept in sync in the browser -- TODO confirm).
mylink = widgets.jslink((a, 'value'), (b, 'value'))

# w is rebound here; the IntSlider above is no longer referenced by this name.
w = widgets.DatePicker(description="foo")

display(w)

w.value

from IPython.display import display
# Scratch cell: a button whose click handler prints into an Output widget.
button = widgets.Button(description="Click Me!")
output = widgets.Output()

display(button, output)

def on_button_clicked(b):
    """
    Click handler: prints "Button clicked." inside the `output` context.

    b is the argument supplied by the caller of the handler (presumably the
    clicked Button -- TODO confirm); it is not used. It is a local name and
    does not affect the module-level `b` slider.
    """
    with output:
        print("Button clicked.")

# Register the handler on the button.
button.on_click(on_button_clicked)