In [None]:
import asyncio
import nest_asyncio
import websockets
import json

# Connection settings for the remote websocket server.
ip, port = "192.168.7.2", 5555
data_add, control_add = "gui_data", "gui_control"

# Full websocket URLs: one endpoint for control messages, one for data.
ws_control_add = "ws://{}:{}/{}".format(ip, port, control_add)
ws_data_add = "ws://{}:{}/{}".format(ip, port, data_add)

In [None]:
# Patch asyncio so that event loops can be nested: the helpers below call
# run_until_complete(), which otherwise conflicts with the event loop Jupyter is already running.
nest_asyncio.apply()

async def send_msg_callback(ws_address,msg):
    """Open a connection to `ws_address`, send `msg` as JSON and close it again.

    Args:
        ws_address (str): Websocket URL to connect to.
        msg: JSON-serialisable object to send.
    """
    async with websockets.connect(ws_address) as connection:
        payload = json.dumps(msg)
        await connection.send(payload)
        print(f"Sent: {msg}")

async def rec_msg_callback(ws_address):
    """Connect to `ws_address` and print every message received, forever.

    The loop has no exit condition of its own; it only ends when `recv()`
    raises or the surrounding task is cancelled.

    Args:
        ws_address (str): Websocket URL to listen on.
    """
    async with websockets.connect(ws_address) as connection:
        while True:
            incoming = await connection.recv()
            print(f"Received:{incoming}") # TODO filter according to msg type

async def start_listener_callback(ws_address):
    """Coroutine wrapper around `rec_msg_callback`, scheduled as a task by `start_listener`.

    Args:
        ws_address (str): Websocket URL to listen on.
    """
    receiver = rec_msg_callback(ws_address)
    await receiver

def send_msg(ws_address,msg):
    """Blocking helper: send `msg` (as JSON) to `ws_address` and return once it is sent.

    Runs `send_msg_callback` to completion on the current event loop.

    Args:
        ws_address (str): Websocket URL to send to.
        msg: JSON-serialisable object to send.
    """
    asyncio.get_event_loop().run_until_complete(send_msg_callback(ws_address, msg))

# Strong references to the background listener tasks. The registry is only
# created once so that re-running this cell does not drop tasks that are
# already running.
try:
    _listener_tasks
except NameError:
    _listener_tasks = set()

def start_listener(ws_address):
    """Start a background task that prints every message received on `ws_address`.

    Args:
        ws_address (str): Websocket URL to listen on.

    Returns:
        asyncio.Task: The listener task. Call `.cancel()` on it to stop listening.
    """
    loop = asyncio.get_event_loop()
    # create_task() is needed so that the listener runs in the background and
    # prints messages as received without blocking the cell
    task = loop.create_task(start_listener_callback(ws_address))
    # Keep a strong reference until the task finishes, as the asyncio
    # documentation recommends for tasks created with create_task().
    _listener_tasks.add(task)
    task.add_done_callback(_listener_tasks.discard)
    return task

### Connect to the control websocket

In [None]:
# Start the background listener on the control websocket.
# NOTE: this gets a "connection" json response each time it is run, so run this cell only once
# (TODO: filter that response out).
start_listener(ws_control_add)


In [None]:
# Send the watcher "hi" command over the control websocket.
# NOTE(review): presumably a greeting/handshake; its reply is not handled here, confirm against the server.
send_msg(ws_control_add,  {"watcher":[{"cmd":"hi"}]})

In [None]:
# Ask the watcher to stop watching 'myvar2'.
send_msg(ws_control_add,{"watcher":[{"cmd": "unwatch", "watchers":['myvar2']}]})

In [None]:
# Request the watcher list; the reply is printed by the control listener if it is running.
send_msg(ws_control_add,{"watcher":[{"cmd":"list"}]})

### Connect to the data websocket

In [None]:
# Start the background listener on the data websocket; received messages are printed as they arrive.
start_listener(ws_data_add)

In [None]:
# Ask the watcher to start watching 'myvar2'.
send_msg(ws_control_add,{"watcher":[{"cmd": "watch", "watchers":['myvar2']}]})

In [None]:
# Ask the watcher to stop watching 'myvar2' again.
send_msg(ws_control_add,{"watcher":[{"cmd": "unwatch", "watchers":['myvar2']}]})

### watcher class

In [None]:
import asyncio
import nest_asyncio
import websockets
import json
import array
from collections import deque # circular buffers 

#nest_asyncio.apply() # needed for running event loops inside of jupyter event loop

In [None]:
class Watcher:
    """Websocket client for a remote "watcher" exposing a control and a data endpoint.

    Control messages are JSON objects sent to the control endpoint; data is
    received on the data endpoint and accumulated per variable in a
    fixed-size streaming buffer. Listeners run as background asyncio tasks.
    """

    def __init__(self, ip="192.168.7.2", port=5555, data_add="gui_data", control_add="gui_control"):
        """Store the connection settings and initialise the internal state.

        No connection is opened here; listeners are created by `start()`.

        Args:
            ip (str, optional): Remote address IP. Defaults to "192.168.7.2".
            port (int, optional): Remote address port. Defaults to 5555.
            data_add (str, optional): Data endpoint. Defaults to "gui_data".
            control_add (str, optional): Control endpoint. Defaults to "gui_control".
        """
        self.ip = ip
        self.port = port
        self.data_add = data_add
        self.control_add = control_add
        self.ws_control_add = f"ws://{self.ip}:{self.port}/{self.control_add}"
        self.ws_data_add = f"ws://{self.ip}:{self.port}/{self.data_add}"

        # Not assigned anywhere else in this class: every send/receive opens its own connection.
        self.ws_control = None
        self.ws_data = None

        # Background listener tasks (None while not running).
        self.__ctrl_listener = None
        self.__data_listener = None

        self.__watcher_vars = [] # updates every time start is called
        self.__list_response_available = asyncio.Event()
        self.__list_response = None

        self.__streaming_buffer_size = 1000
        self.__streaming_buffer = None
        # None until a streaming method is called, then "OFF", "FOREVER" or "N_FRAMES".
        # Only "OFF" stops the data listener from writing into the streaming buffer.
        self.__streaming_mode = None
        self.__streaming_vars = [] # variables an N_FRAMES capture waits for
        self.__streaming_buffer_available = asyncio.Event()

        nest_asyncio.apply() # event loop needs to be nested - otherwise it conflicts with jupyter's event loop

    @property
    def streaming_buffer_size(self):
        """Maximum number of samples kept per variable in the streaming buffer."""
        return self.__streaming_buffer_size

    @streaming_buffer_size.setter
    def streaming_buffer_size(self, value):
        """Set the per-variable buffer size. This discards the buffered data."""
        self.__streaming_buffer_size = value
        self.__reset_streaming_buffer() # resize streaming buffer

    @property
    def streaming_buffer(self):
        """Snapshot of the streaming buffer as a dict mapping variable name to a list of samples.

        Returns an empty dict if the buffer has not been created yet
        (i.e. before the first call to `start()`).
        """
        if self.__streaming_buffer is None:
            return {}
        return {key: list(value) for key, value in self.__streaming_buffer.items()} # convert dict of deque to dict of list

    @property
    def watcher_vars(self):
        """Names of all variables reported by the last `start()` call."""
        return self.__watcher_vars # updates every time start is called

    @property
    def watched_vars(self):
        """Names of the variables the remote list currently reports as watched."""
        return [var["name"] for var in self.list() if var["watched"]]

    @property
    def unwatched_vars(self):
        """Names of the variables the remote list currently reports as not watched."""
        return [var["name"] for var in self.list() if not var["watched"]]

    # public methods

    def start(self): # TODO check Bela project_name ?
        """Start both listeners (if not running) and refresh the variable list and buffer."""
        if self.__ctrl_listener is None: # avoid duplicate listeners
            self.start_ctrl_listener()
        if self.__data_listener is None:
            self.start_data_listener()

        # populate watcher vars so that variables received in data stream can be identified
        self.__watcher_vars = [var["name"] for var in self.list()] # TODO add error handling
        self.__reset_streaming_buffer()

    def stop(self):
        """Cancel the control and data listener tasks, if they are running."""
        if self.__ctrl_listener is not None:
            self.__ctrl_listener.cancel()
            self.__ctrl_listener = None # empty the listener
        if self.__data_listener is not None:
            self.__data_listener.cancel()
            self.__data_listener = None

    def list(self):
        """Request the watcher list and block until the response arrives.

        Starts the control listener if it is not running.

        Returns:
            The "watchers" entry of the response (a list of dicts with at
            least "name" and "watched" keys, as used by the properties above).
        """
        async def list_coroutine():

            if self.__ctrl_listener is None: # start listener if listener is not running
                self.start_ctrl_listener()

            # Drop any response left over from an earlier, unconsumed "list"
            # command so that a stale result is never returned.
            self.__list_response_available.clear()
            self.send_ctrl_msg({"watcher": [{"cmd": "list"}]})

            # Wait for the list response to be available
            await self.__list_response_available.wait()
            self.__list_response_available.clear()  # Reset the event for the next call

            return self.__list_response

        return asyncio.run(list_coroutine())

    def start_streaming(self, variables=None): # stream forever until stopped
        """Start the listeners and ask the watcher to stream `variables` until stopped.

        Args:
            variables (list of str, optional): Variables to watch. Defaults to none.
        """
        variables = [] if variables is None else variables
        self.start()
        self.__streaming_mode = "FOREVER"
        self.send_ctrl_msg({"watcher": [{"cmd": "watch", "watchers":variables}]})

    def stop_streaming(self, variables=None):
        """Stop the listeners, unwatch `variables` and return the buffered data.

        Args:
            variables (list of str, optional): Variables to unwatch. Defaults to none.

        Returns:
            dict: Variable name -> list of buffered samples.
        """
        variables = [] if variables is None else variables
        self.stop()
        self.__streaming_mode = "OFF"
        self.send_ctrl_msg({"watcher": [{"cmd": "unwatch", "watchers":variables}]})
        return self.streaming_buffer

    def stream_n_frames(self, variables=None, n_frames=1000, delay=0):
        """Capture the first `n_frames` samples of each requested variable and return them.

        Resizes the streaming buffer to `n_frames` and blocks until it is
        full for every requested variable. If `variables` is empty (or none of
        them is known to the watcher) it waits for all known variables.

        Args:
            variables (list of str, optional): Variables to watch. Defaults to none.
            n_frames (int, optional): Samples to capture per variable. Defaults to 1000.
            delay (int, optional): Currently unused. Defaults to 0.

        Returns:
            dict: Variable name -> list of buffered samples.
        """
        # FIX now frames are counted as received by python -- once data comes timestamped, frames should be counted with respect to timestamps
        # TODO implement delay once data comes timestamped
        variables = [] if variables is None else variables

        async def stream_n_frames_coroutine():
            self.start()
            # Wait only for the requested variables: a known but unrequested
            # variable may never be streamed, which would block forever.
            self.__streaming_vars = [var for var in variables if var in self.__watcher_vars]
            self.__streaming_buffer_available.clear()
            self.__streaming_mode = "N_FRAMES" # flag cleared in __store_samples
            self.streaming_buffer_size = n_frames # using setter to automatically resize buffer
            self.send_ctrl_msg({"watcher": [{"cmd": "watch", "watchers":variables}]}) # FIX if a variable was already set as watched it will be streamed here too -- TODO set variables as unwatched if not requested

            # return when finished filling buffer
            await self.__streaming_buffer_available.wait()
            self.__streaming_buffer_available.clear()
            self.stop_streaming(variables) # turns off listener, unwatches variables

            return self.streaming_buffer

        return asyncio.run(stream_n_frames_coroutine())

    def send_ctrl_msg(self, msg):
        """Send `msg` (a JSON-serialisable object) to the control endpoint."""
        self.__send_msg(self.ws_control,self.ws_control_add, msg)

    def start_ctrl_listener(self):
        """Start the background listener on the control endpoint."""
        self.__ctrl_listener = self.__start_listener(self.ws_control,self.ws_control_add)

    def start_data_listener(self):
        """Start the background listener on the data endpoint."""
        self.__data_listener = self.__start_listener(self.ws_data, self.ws_data_add)

    # __private methods

    def __reset_streaming_buffer(self):
        """(Re)create one empty fixed-size buffer per known variable."""
        self.__streaming_buffer = {var: deque(maxlen=self.__streaming_buffer_size) for var in self.__watcher_vars}

    @staticmethod
    def __parse_header(msg):
        """Split a 3-character data header into (channel index, array type code).

        The first character is the channel digit and the third one the type
        code, e.g. b"1 f" -> (1, "f").
        """
        header = msg.decode() if isinstance(msg, (bytes, bytearray)) else msg
        return int(header[0]), header[2]

    def __store_samples(self, channel, samples):
        """Append `samples` to the buffer of the variable at index `channel`.

        In "N_FRAMES" mode only as many samples as still fit are stored, so the
        buffer keeps the first n frames; once every awaited variable is full
        the mode switches to "OFF" and the completion event is set.
        """
        buffer = self.__streaming_buffer[self.__watcher_vars[channel]]
        if self.__streaming_mode != "N_FRAMES":
            buffer.extend(samples) # the deque discards the oldest samples when full
            return

        remaining = max(self.__streaming_buffer_size - len(buffer), 0)
        buffer.extend(samples[:remaining])
        awaited = self.__streaming_vars or self.__watcher_vars
        if all(len(self.__streaming_buffer[var]) == self.__streaming_buffer_size for var in awaited):
            self.__streaming_mode = "OFF" # prevent updating the streaming buffer
            self.__streaming_buffer_available.set()

    ## start listener

    def __start_listener(self,ws, ws_address):
        """Schedule a listener on `ws_address` as a background task and return the task."""
        loop = asyncio.get_event_loop()
        listener_task = loop.create_task(self.__start_listener_callback(ws,ws_address)) # create_task() is needed so that the listener runs in the background and prints messages as received without blocking the cell
        return listener_task

    async def __start_listener_callback(self, ws, ws_address):
        """Coroutine run by the listener task."""
        await self.__rec_msg_callback(ws, ws_address)

    ## send message

    def __send_msg(self, ws, ws_address, msg):
        """Send `msg` to `ws_address`, blocking until it has been sent."""
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__send_msg_callback(ws,ws_address, msg))

    async def __send_msg_callback(self,ws, ws_address, msg):
        """Open a connection to `ws_address` and send `msg` as JSON.

        The `ws` argument is not used: a new connection is opened per message.
        Errors are reported with print() rather than raised.
        """
        try:
            async with websockets.connect(ws_address) as ws: # here you can use the same websocket for multiple messages -- but avoid using the same one for sending and receiving
                await ws.send(json.dumps(msg))
                #print(f"Sent: {msg}")
        except Exception as e:
            print(f"Error while sending message: {e}")

    ## receive message

    async def __rec_msg_callback(self, ws, ws_address):
        """Receive and dispatch messages from `ws_address` until cancelled or an error occurs.

        Data endpoint: a 3-character header announces the channel and type of
        the body that follows; bodies are decoded and stored in the streaming
        buffer. Control endpoint: responses to the "list" command are stored
        and signalled to `list()`. Anything else is printed.
        The `ws` argument is not used: the listener opens its own connection.
        """
        try:
            async with websockets.connect(ws_address) as ws:
                _channel = None
                _type = None

                while True:
                    msg = await ws.recv()

                    if ws_address == self.ws_data_add and self.__streaming_mode != "OFF": # --> data websocket parsing
                        if len(msg) == 3: # header message with channel and type info
                            _channel, _type = self.__parse_header(msg)
                        elif len(msg)>3: # body message with array of data for channel n, parse to list
                            if _type is None:
                                continue # body received before any header: channel and type unknown, cannot decode
                            _msg = array.array(_type, msg).tolist()
                            self.__store_samples(_channel, _msg)

                    elif ws_address == self.ws_control_add: # --> control websocket parsing
                        _msg = json.loads(msg)
                        # Only dict-shaped payloads can be a "list" response; other
                        # control messages are ignored instead of raising and
                        # terminating the listener.
                        _watcher = _msg.get("watcher") if isinstance(_msg, dict) else None
                        if isinstance(_watcher, dict) and "watchers" in _watcher: # cmd "list" response
                            self.__list_response = _watcher["watchers"]
                            self.__list_response_available.set()

                    else:
                        print(msg)

        except Exception as e:
            print(f"Error while receiving message: {e}")

    def __del__(self):
        """Stop the listeners when the instance is garbage collected."""
        try:
            self.stop() # stop websockets
        except (AttributeError, RuntimeError):
            # __init__ did not complete, or the event loop is already closed:
            # there is nothing left to cancel.
            pass

In [None]:
# Create a Watcher with the default connection settings; no connection is opened yet.
watcher = Watcher()

In [None]:
# Blocking capture: returns the streaming buffer once it holds 2000 samples per variable.
watcher.stream_n_frames(variables=["myvar","myvar2"], n_frames=2000)

In [None]:
# Start streaming "myvar" in the background until stop_streaming() is called.
watcher.start_streaming(["myvar"])

In [None]:
# NOTE(review): this bare expression is not the last statement of the cell, so it displays nothing.
watcher.streaming_buffer

# Number of samples currently buffered for each variable.
print(len(watcher.streaming_buffer["myvar"]), len(watcher.streaming_buffer["myvar2"]))

In [None]:
# Stop the listeners and return the buffered data. With no arguments the "unwatch" command
# is sent with an empty variable list.
watcher.stop_streaming()

In [None]:
# Explicitly unwatch both variables through a raw control message.
watcher.send_ctrl_msg({"watcher":[{"cmd": "unwatch", "watchers":['myvar','myvar2']}]})

In [None]:
# Send a raw "list" command. The response is only captured if the control listener is running;
# use watcher.list() to get the result back.
watcher.send_ctrl_msg({"watcher":[{"cmd": "list"}]})

In [None]:

# Stop the listeners again and return whatever is in the streaming buffer.
watcher.stop_streaming()

In [None]:
# duplication testing: run a second Watcher instance alongside the first one

watcher2 = Watcher() # previously never created, so this cell raised NameError on a fresh kernel
watcher2.start()
watcher.start() # start() only creates listeners that are not already running, so calling it again does not duplicate prints

In [None]:
# Cancel the listener tasks of both instances.
watcher.stop() # not sure this is working properly
watcher2.stop()

### streamer class

In [None]:
class Streamer():
    """Convenience wrapper that drives a `Watcher` to start and stop streaming variables."""

    def __init__(self, ip="192.168.7.2", port=5555, data_add="gui_data", control_add="gui_control"):
        """Create the underlying Watcher with the given connection settings.

            Args:
                ip (str, optional): Remote address IP. Defaults to "192.168.7.2".
                port (int, optional): Remote address port. Defaults to 5555.
                data_add (str, optional): Data endpoint. Defaults to "gui_data".
                control_add (str, optional): Control endpoint. Defaults to "gui_control".
        """
        self.watcher = Watcher(ip, port, data_add, control_add) # do we need to inherit all methods? or just the send_msg() and start_listener()?

    @staticmethod
    def _as_list(variables):
        """Wrap a single variable name in a list; pass anything else through unchanged."""
        if isinstance(variables, str):
            return [variables]
        return variables

    def _send_watcher_cmd(self, cmd, variables):
        """Send the watcher command `cmd` for `variables` over the control websocket."""
        command = {"cmd": cmd, "watchers": self._as_list(variables)}
        self.watcher.send_ctrl_msg({"watcher": [command]})

    def stream_forever(self,variables=[]):
        """Ask the watcher to watch `variables` (a name or a list of names), then start the listeners."""
        self._send_watcher_cmd("watch", variables)
        self.watcher.start()

    def stop_streaming(self,variables):  # FIX for some reason this stops streaming all variables
        """Ask the watcher to unwatch `variables` (a name or a list of names)."""
        self._send_watcher_cmd("unwatch", variables)

    def stop_streaming_all(self):
        """Stop the underlying Watcher's listeners."""
        self.watcher.stop()
                                
        

In [None]:
# Create a Streamer (and its underlying Watcher) with the default connection settings.
streamer = Streamer()

In [None]:
# Watch both variables and start the listeners.
streamer.stream_forever(["myvar1","myvar2"])

In [None]:
# Unwatch a single variable; a bare string is wrapped into a list by stop_streaming().
streamer.stop_streaming("myvar2")

In [None]:
# Stop the underlying Watcher's listeners. No "unwatch" command is sent by this call.
streamer.stop_streaming_all()

In [None]:
# Rebinding the name drops the reference to the previous Streamer.
# This should delete the previous watcher so that ws are not duplicated
# (NOTE(review): this relies on Watcher.__del__ running when the old object is collected; confirm).
streamer = Streamer()