In [None]:
# code
import time
import socket
import struct

from threading import Thread
from enum      import Enum

import cv2    # conda install opencv -c conda-forge
import numpy         as np 
import holoviews     as hv # conda install holoviews -c conda-forge
import ipywidgets    as widgets
import websockets # conda install websockets -c conda-forge

from IPython         import get_ipython
from IPython.display import display
from tornado.ioloop  import PeriodicCallback, IOLoop

hv.extension('bokeh')

class Streamed:
    def __init__(self, period = 100):
        self._callback = PeriodicCallback(self._stream, period)
        
    def start(self):
        self._callback.start()

    def stop(self):
        self._callback.stop()

    def toggle(self):
        if self._callback.is_running():
            self.stop()
            return False
        self.start()
        return True
    
    @classmethod
    def displayhook(cls, item = None):
        "displays an item"
        shell = get_ipython()
        if shell is None:
            return item
        
        fmt   = shell.display_formatter.formatters['text/html']   
        if item is None:
            fmt.for_type(cls, cls.displayhook)
            return
    
        disp  = item.display()
        try:
            fcn   = fmt.lookup_by_type(type(disp))
        except KeyError:
            display(disp)
            return None
        return fcn(disp)

    def buttons(self):
        startstop = widgets.Button(description="Start")
        def _startstop(_):
            startstop.description = "Stop" if self.toggle() else "Start"
        startstop.on_click(_startstop)
        step = widgets.Button(description="Step")
        step.on_click(self.step)
        return widgets.HBox([startstop, step])

    def step(self):
        if self._callback.is_running():
            self.stop()

        return self._stream()

    def withbuttons(self):
        display(self.display())
        display(self.buttons())

Streamed.displayhook()

class StreamedVideo(Streamed):
    def __init__(self,
                 address    = "rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov",
                 bounds     = (0, 0, 1936, 1216),
                 period     = 100, #ms
                 downsample = 5,
                 roi        = None,
                 newcapture = True,
                ):
        super().__init__(period)
        self.image     = hv.Image([], bounds = bounds, vdims=[hv.Dimension('z', range=(0, 256))])
        self.address   = address
        self._pipe     = hv.streams.Pipe(data = [])
        self._map      = hv.DynamicMap(self._newimage, streams=[self._pipe])
        self._data     = []
        self._html     = widgets.HTML(value="", description = "Image count: ")
        self.downsample= downsample
        self.roi       = roi
        self.index     = 0

    def start(self):
        data = super().start()
        thread = Thread(target=self._update, args=())
        thread.daemon = True
        thread.start()
        return data

    def _newimage(self, data):
        self.image = self.image.clone(data = data)
        if len(data) == 0:
            return self.image

        if self.roi:
            x, y, width, height = self.roi
            step                = 1 if not self.downsample else self.downsample
            data = data[self.roi[x-width //2:x+width //2+1:step,
                                 y-height//2:y+height//2+1:step]]
        elif self.downsample:
            data = data[::self.downsample,::self.downsample]
        else:
            return self.image
        return self.image.clone(data = data)

    def step(self):
        if self._callback.is_running():
            self.stop()
        else:
            video = cv2.VideoCapture(self.address)
            self._data = video.read()[1]
            video.release()
        return self._stream()

    def _update(self):
        video = cv2.VideoCapture(self.address)
        video.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        good  = True
        while self._callback.is_running() and good:
            good, self._data = video.read()
            self.index += 1
            
        video.release()

    def _stream(self):
        data             = self._data
        self._html.value = f"{self.index}"
        self._pipe.send(data)
        return data

    def withbuttons(self):
        display(self.display())
        display(widgets.HBox([self.buttons(), self._html]))

class PointerStreamedVideo(StreamedVideo):
    def __init__(self, *args, window = 0, **kwa):
        super().__init__(*args, **kwa)
        self._pipex   = hv.streams.Pipe(data = [])
        self._pipey   = hv.streams.Pipe(data = [])
        self._tap     = hv.streams.Tap(source = self._map)        
        
        self._vline   = hv.DynamicMap(self._lines_op, streams=[self._tap])
        self._crossx  = hv.DynamicMap(self._cross_xop, streams=[self._pipex])
        self._crossy  = hv.DynamicMap(self._cross_yop, streams=[self._pipey])
        self.window   = window
        self.x        = None
        self.y        = None
    
    def display(self):
        return (self._map  * self._vline
                << self._crossx
                << self._crossy)

    def _lines_op(self, x, y):
        self.x = x
        self.y = y
        if x is not None and not self._callback.is_running():
            self._stream()
        return hv.VLine(x if x else 0)*hv.HLine(y if y else 0)

    def _cross_xop(self, data):
        x   = self.x
        img = self.image.clone(data = data)
        if x is None or self.window <= 1:
            self.xsample = img.sample(x=x if x else 0).redim.range(z = (0, 256))
        else:
            x            = x if x is not None else 0
            self.xsample = hv.Curve((img.sample(x=x).data["y"],
                                     np.mean(img[max(0,x-self.window):x+self.window,:].data, 1)),
                                    "y", "z").redim.range(z = (0, 256))
        return self.xsample

    def _cross_yop(self, data):
        y   = self.y
        img = self.image.clone(data = data)
        if y is None or self.window <= 1:
            self.ysample = img.sample(y=y if y else 0).redim.range(z = (0, 256))
        else:
            y            = y if y is not None else 0
            self.ysample = hv.Curve((img.sample(y=y).data["x"],
                                     np.mean(img[:, max(0,y-self.window):y+self.window].data, 0)),
                                    "x", "z").redim.range(z = (0, 256))
        return self.ysample

    def _stream(self):
        data = super()._stream()
        self._pipex.send(data)
        self._pipey.send(data)

class Teensy:
    FOVTYPE  = [("_l0", "f4"),
                ("time",  'i8'), ("zmag", 'f4'), ("vmag",    'f4'), ("zobj",    'f4'),
                ("x",     'f4'), ("y",    'f4'), ("tsample", 'f4'), ("tmagnet", 'f4'),
                ("tsink", 'f4'), ("led1", 'f4'), ("led2",    'f4'),
                ("_r0", "f4"), ("_r1", "f4"), ("_r2", "f4")]

    def __init__(self, address, port, multicast, length = 100, repeats = 3):
        self.address = address, port
        self.multicast = struct.pack('4sL',
                                    socket.inet_aton(multicast),
                                    socket.INADDR_ANY)

        self.data     = np.zeros(length*repeats, dtype = self.FOVTYPE)
        self.index    = 0
        self.repeats  = repeats

    def read(self):
        if self.index+self.repeats*2 >= len(self.data):
            self.index = 0
            self.data  = np.zeros_like(self.data)

        data = self.data[self.index+self.repeats: self.index+self.repeats*2]
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(socket.IPPROTO_IP,
                            socket.IP_ADD_MEMBERSHIP,
                            self.multicast)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.settimeout(.1)
            sock.bind(self.address)
            for i in range(self.repeats):
                sock.recv_into(data[i:i+1], data.dtype.itemsize)

        self.index += self.repeats
        return data

class StreamedTeensy(Streamed):
    def __init__(self, teensy, *names, period = 100):
        super().__init__(period)
        self.teensy  = teensy
        if len(names) == 0:
            names = [i for i, _ in Teensy.FOV if i[0] != '_']

        self.table   = hv.Table(tuple([] for i in names), vdims = names)
        self._pipe   = hv.streams.Pipe(data = [])
        self._map    = hv.DynamicMap(self._newtable, streams=[self._pipe])
        self.name    = name
    
    def display(self):
        return self._map

    def _newtable(self, data):
        self.table = self.table.clone(data = data)
        return self.table

    def _stream(self):
        data = self.teensy.read()
        self._pipe.send(data)
        return data

class TeensyReader:
    FOVTYPE  = [("_l0", "f4"),
                ("time",  'i8'), ("zmag", 'f4'), ("vmag",    'f4'), ("zobj",    'f4'),
                ("x",     'f4'), ("y",    'f4'), ("tsample", 'f4'), ("tmagnet", 'f4'),
                ("tsink", 'f4'), ("led1", 'f4'), ("led2",    'f4'),
                ("_r0", "f4"), ("_r1", "f4"), ("_r2", "f4")]

    def __init__(self, address, port, multicast, length = 100, repeats = 3):
        self.address = address, port
        self.multicast = struct.pack('4sL',
                                    socket.inet_aton(multicast),
                                    socket.INADDR_ANY)

        self.data     = np.zeros(length*repeats, dtype = self.FOVTYPE)
        self.index    = 0
        self.repeats  = repeats

    def read(self):
        if self.index+self.repeats*2 >= len(self.data):
            self.index = 0
            self.data  = np.zeros_like(self.data)

        data = self.data[self.index+self.repeats: self.index+self.repeats*2]
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(socket.IPPROTO_IP,
                            socket.IP_ADD_MEMBERSHIP,
                            self.multicast)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.settimeout(.1)
            sock.bind(self.address)
            for i in range(self.repeats):
                sock.recv_into(data[i:i+1], data.dtype.itemsize)

        self.index += self.repeats
        return data

class StreamedTeensy(Streamed):
    def __init__(self, teensy: TeensyReader, *names, period = 100):
        super().__init__(period)
        self.teensy  = teensy
        if len(names) == 0:
            names = [i for i, _ in Teensy.FOVTYPE if i[0] != '_']

        self.table   = hv.Table(tuple([] for i in names), vdims = names)
        self._pipe   = hv.streams.Pipe(data = [])
        self._map    = hv.DynamicMap(self._newtable, streams=[self._pipe])
    
    def display(self):
        return self._map

    def _newtable(self, data):
        if len(data) == 0:
            return self.table
        self.table = self.table.clone(data = tuple(data[i.name] for i in self.table.vdims))
        return self.table

    def _stream(self):
        data = self.teensy.read()
        self._pipe.send(data)
        return data    

class TeensyVar(Enum):
    zmag        = "zmag"
    zobj        = "zobj"
    vmag        = "vmag"
    x           = "x"
    y           = "y"
    startTime   = "startTime"
    tbox        = "tbox"
    intLed1     = "intLed1"
    intLed2     = "intLed2"

class Admin:
    _CMD    = "local {}=hidCommand.new();"
    _SCRIPT = "local {}=hidCommandScript.new();"
    _BEGIN  = "{}.beginPhase({});"
    _ADD    = "{}.addPhase({});"
    _SEND   = "daq_sendCommand({});"

    def __init__(self, address):
        self.address = address

    def send(self, text):
        "sends a Lua script to the DAQ"
        async def _run():
            async with websockets.connect(self.address) as websocket:
                await websocket.send(text)
        IOLoop.current().spawn_callback(_run)

    def setvalues(self, *args, test = False):
        "sets a list of values"
        msg = self._setvalues("t", *args)+self._SEND.format("t")
        return msg if test else self.send(msg)

    def textbox(self, name, value, minval, maxval, **kwa):
        return self._text(widgets.FloatText, name, value, minval, maxval, kwa)

    def slider(self, name, value, minval, maxval, **kwa):
        return self._text(widgets.FloatSlider, name, value, minval, maxval, kwa)

    def _text(self, tpe, name, value, minval, maxval, kwa):
        name = TeensyVar(name).value
        txt  = tpe(value       = value,
                   min         = minval,
                   max         = maxval,
                   description = name,
                   **kwa)
        txt.observe(lambda attr: self.setvalues(name, attr['new']), "value")
        return txt

    def _setvalues(self, name, *args) -> str:
        "sets a list of values"
        msg  = self._CMD.format(name)
        for i in range(0, len(args), 2):
            if args[i+1] is not None:
                msg += "{}.{}={};".format(name, TeensyVar(args[i]).value, args[i+1])
        return msg

In [None]:
%%opts Image[width=800, height=500 colorbar=True]
video = PointerStreamedVideo(address = "rtsp://192.168.1.76:8554/mystream",
                             window = 5,
                             period = 100,
                             downsample=10,
                             newcapture=False
                            )
video.withbuttons()

In [None]:
admin = Admin("ws://???")
display(admin.slider("zmag", .5, 0., 10.))
display(admin.textbox("zobj", 100, -1.5, 2000))

In [None]:
%%opts Table[width=1000 height=110]
zmag = StreamedTeensy(TeensyReader('jupyter.depixus.org', 7007, '224.0.0.7'))
zmag.withbuttons()