In [1]:
import ctypes
import multiprocessing
import numpy as np
import os
import time
from pynq.lib.pmod import Pmod_IO
from pynq.lib import Pmod_PWM

In [2]:
from pynq.overlays.base import BaseOverlay
# Instantiate the overlay from "base.bit". Later cells reach the hardware
# through this object (base.PMODA, base.PMODB, base.btns_gpio).
base = BaseOverlay("base.bit")

In [12]:
# 6-bit pattern that marks the start of a frame; it is also the top six bits
# of the header word that IR_Message.encode writes.
sync_msg=0b101101

class receiver_data:
    """Bit-serial receiver state machine; each handle_rx() call consumes one sample.

    While idle, samples are shifted into a 6-bit window (``strtMsg``) until the
    window equals ``sync_msg``. After that, samples are packed MSB-first into
    16-bit words and stored into ``rx_bufs[rx_counter.value]``. When the frame
    is complete, ``rx_counter.value`` advances to the next buffer (wrapping).

    Parameters
    ----------
    inpin : object with ``read()``; the sample is inverted (``1 - read()``).
    rx_bufs : sequence of indexable, mutable word buffers.
    rx_counter : object with a read/write ``value`` attribute selecting the
        buffer currently being filled (a ``multiprocessing.Value`` in this
        notebook).
    """

    SYNC_BITS = 6
    WORD_BITS = 16
    WORD_MASK = 0xFFFF
    SYNC_MASK = 0x3F
    DEFAULT_LENGTH = 128

    def __init__ (self, inpin, rx_bufs, rx_counter):
        self.inpin = inpin
        self.rx_bufs = rx_bufs
        self.rx_counter = rx_counter
        self.strtMsg = 0b000000      # sliding window used to search for sync
        self.rx_started=False        # True while a frame is being captured
        self.rx_ctr = 0              # index of the next word to store
        self.rx_bit_ctr = 0          # bits accumulated in rx_word so far
        self.rx_word = 0             # word currently being assembled
        self.msg_length = self.DEFAULT_LENGTH

    def handle_rx (self):
        """Read one sample from the pin and advance the receive state machine."""
        bit = (1 - self.inpin.read()) & 1

        if not self.rx_started:
            self.strtMsg = ((self.strtMsg << 1) | bit) & self.SYNC_MASK
            if self.strtMsg == sync_msg:
                self.rx_started = True
                # The sync pattern is the top six bits of word 0, so seed the
                # word accumulator with it; otherwise every stored word would
                # be shifted by six bits relative to what was transmitted.
                self.rx_word = sync_msg
                self.rx_bit_ctr = self.SYNC_BITS
                self.rx_ctr = 0
                # Clear the window so leftover bits cannot contribute to a
                # false sync once this frame has ended.
                self.strtMsg = 0
                self.msg_length = self.DEFAULT_LENGTH
                print("Rx: Sync Acquired")
            return

        self.rx_word = ((self.rx_word << 1) | bit) & self.WORD_MASK
        self.rx_bit_ctr += 1
        if self.rx_bit_ctr < self.WORD_BITS:
            return

        # A full word has been assembled: store it in the active buffer.
        buf = self.rx_bufs[self.rx_counter.value]
        buf[self.rx_ctr] = self.rx_word
        if self.rx_ctr == 0:
            # Low byte of the header word is used as the frame length, counted
            # in 16-bit words including the header.
            # NOTE(review): IR_Message.encode stores the text's character count
            # in this byte - confirm which unit is intended.
            self.msg_length = self.rx_word & 0xFF
        self.rx_word = 0
        self.rx_bit_ctr = 0
        self.rx_ctr += 1

        # ">=" plus the buffer-size cap keeps a zero or oversized length byte
        # from running past the end of the buffer.
        if self.rx_ctr >= min(self.msg_length, len(buf)):
            self.rx_started = False
            self.rx_ctr = 0
            # Update the shared counter in place; rebinding the attribute to an
            # int would hide the change from other holders of the counter.
            self.rx_counter.value = (self.rx_counter.value + 1) % len(self.rx_bufs)
            print("Rx: Message Captured")
                           
class transmitter_data:
    """Bit-serial transmitter state machine; each handle_tx() call is one tick.

    States used by handle_tx():
      OFF  - output stopped; waits for ``tx_flag.value`` to become true.
      PREP - calls ``outpwm.generate(26, 50)`` for PREP_CYCLES ticks.
      MSG  - sends ``tx_buf`` word by word, bit 15 first; a 1 bit calls
             ``outpwm.generate(26, 50)``, a 0 bit calls ``outpwm.stop()``.
    After the last word the machine returns to OFF and clears ``tx_flag``.

    The SYNC constant and ``sync_ctr`` attribute exist but handle_tx() does
    not use them.

    NOTE(review): generate(26, 50) is presumably a 26 us period at 50% duty
    (about 38 kHz) - confirm against the Pmod_PWM documentation.
    """

    PREP_CYCLES=64

    OFF=0
    PREP=1
    SYNC=2
    MSG=3

    def __init__(self, outpwm, tx_buf, tx_flag):
        self.outpwm, self.tx_buf, self.tx_flag = outpwm, tx_buf, tx_flag
        self.tx_state = self.OFF
        self.tx_ctr = 0        # PREP: ticks remaining; MSG: index of current word
        self.tx_bit_ctr = 16   # bits of the current word still to send
        self.sync_ctr = 0

    def handle_tx(self):
        """Advance the transmitter by one tick according to the current state."""
        state = self.tx_state
        if state == self.MSG:
            self._emit_next_bit()
        elif state == self.PREP:
            self._emit_preamble()
        elif state == self.OFF:
            self._idle()
        # Any other state value (e.g. SYNC) is a no-op.

    def _idle(self):
        # Keep the output stopped; begin the preamble once a send is requested.
        self.outpwm.stop()
        if not self.tx_flag.value:
            return
        print("TX: Starting prep")
        self.tx_state = self.PREP
        self.tx_ctr = self.PREP_CYCLES

    def _emit_preamble(self):
        # One tick of continuous output; switch to MSG when the count runs out.
        self.outpwm.generate(26,50)
        self.tx_ctr -= 1
        if self.tx_ctr == 0:
            self.tx_state = self.MSG
            self.tx_bit_ctr = 16

    def _emit_next_bit(self):
        # Send the next bit of the current word, most significant bit first.
        self.tx_bit_ctr -= 1
        current_word = self.tx_buf[self.tx_ctr]
        if (current_word >> self.tx_bit_ctr) & 1:
            self.outpwm.generate(26,50)
        else:
            self.outpwm.stop()

        word_finished = self.tx_bit_ctr == 0
        if word_finished:
            self.tx_bit_ctr = 16
            self.tx_ctr += 1

        if self.tx_ctr >= len(self.tx_buf):
            # Whole buffer sent: go idle and acknowledge the request.
            self.tx_state = self.OFF
            self.tx_ctr = 0
            self.tx_flag.value = False

In [None]:
class IR_Message:
    """Free-text message carried in a 128-word frame of 16-bit words.

    Frame layout as produced by encode():
      word 0      - 0b101101 in the top six bits, zeros in bits 9-8 (free-text
                    message type), and the text length in the low byte.
      words 1-127 - two characters per word, first character in the high byte;
                    only the low 8 bits of each code point are kept.

    Parameters
    ----------
    msgType : int, must be 0 (free text is the only implemented type).
    text : str of 1 to 254 characters.

    Raises
    ------
    TypeError
        If either argument has the wrong type or an unsupported value.
    """

    def __init__(self, msgType, text):
        # msgType must be 0 for the current free-text-only implementation.
        if not isinstance(msgType,int):
            raise TypeError("msg type must be int")
        if msgType != 0:
            raise TypeError("msg type must be 0")
        self.type = msgType

        if not isinstance(text,str):
            raise TypeError("msg text must be str")
        if len(text) > 254 or len(text) == 0:
            raise TypeError("msg text must be 1-254")
        self.text = text
        self.length = len(self.text)

    def encode(self, txBuff):
        """Write this message into ``txBuff`` (at least 128 writable words).

        The first 128 entries are cleared, then the header word and the packed
        text are written. Nothing is returned.
        """
        # Zero the frame first so unused trailing words carry no stale data.
        for i in range(128):
            txBuff[i] = 0b0000000000000000
        # Header word: sync pattern, free-text type, text length in low byte.
        txBuff[0] = 0b1011010000000000 | (0x00FF & self.length)

        # Pack two characters per word into words 1-127. Text beyond 254
        # characters would not fit in the frame and is not written.
        for pos, ch in enumerate(self.text[:254]):
            shift = 8 if pos % 2 == 0 else 0
            txBuff[1 + pos // 2] |= (0x00FF & ord(ch)) << shift

    def decode(self, rxBuff):
        """Rebuild ``self.text`` and ``self.length`` from a received frame.

        The character count is taken from the low byte of word 0 (capped at
        254, the most that words 1-127 can hold), so the padding after the
        text is not turned into NUL characters. The sync and type bits of the
        header are not inspected; the frame is assumed to be free text.
        """
        count = min(rxBuff[0] & 0x00FF, 254)
        chars = []
        for pos in range(count):
            word = rxBuff[1 + pos // 2]
            byte = (word >> 8) if pos % 2 == 0 else word
            chars.append(chr(byte & 0x00FF))
        self.text = "".join(chars)
        self.length = count


In [13]:
def transceiver (tx_flag, tx_buf, rx_counter, rx_bufs, quit_flag):
    """Process body: run the receive and transmit state machines on a fixed tick.

    Each loop iteration calls ``handle_rx()`` then ``handle_tx()`` once and
    sleeps until the next multiple of ``period`` (3 ms), until
    ``quit_flag.value`` becomes true.

    Parameters
    ----------
    tx_flag : shared flag; true requests a transmission (cleared by the
        transmitter when the buffer has been sent).
    tx_buf : words to transmit.
    rx_counter : shared index of the receive buffer being filled.
    rx_bufs : sequence of receive buffers.
    quit_flag : shared flag; true ends the loop.
    """
    period=0.003
    inpin = Pmod_IO (base.PMODA, 0, 'in')
    outpwm = Pmod_PWM(base.PMODB, 0)

    rx = receiver_data (inpin, rx_bufs, rx_counter)
    tx = transmitter_data (outpwm, tx_buf, tx_flag)

    # Absolute deadlines (rather than a fixed sleep) keep the tick rate from
    # drifting with the time spent in the handlers.
    deadline = time.perf_counter() + period

    try:
        while not quit_flag.value:
            # Do RX stuff
            rx.handle_rx ()

            # Do TX stuff
            tx.handle_tx ()

            # Manage timer: if we are already late, skip the sleep and let the
            # following iterations catch up.
            deadline += period
            remaining = deadline - time.perf_counter()
            if remaining > 0:
                time.sleep(remaining)
    finally:
        # Always stop the PWM output, including when a handler raises;
        # otherwise the output would keep running after the process dies.
        outpwm.stop()

            
def data_processor (tx_flag, tx_buf, rx_counter, rx_bufs, quit_flag):
    """Process body: poll button 2 every 0.25 s and request a transmission.

    Sets ``tx_flag.value`` to True whenever ``base.btns_gpio[2].read()`` is
    truthy. Runs until ``quit_flag.value`` becomes true. ``tx_buf``,
    ``rx_counter`` and ``rx_bufs`` are accepted but not used here.
    """
    poll_interval = 0.25
    while True:
        if quit_flag.value:
            break
        # Do data stuff
        send_button = base.btns_gpio[2]
        if send_button.read():
            tx_flag.value = True
        time.sleep(poll_interval)
        
def button_handler (quit_flag):
    """Process body: set ``quit_flag.value`` once button 0 reads truthy.

    Polls ``base.btns_gpio[0]`` every 0.25 s. The wait also ends if some other
    party has already set ``quit_flag.value``; the flag is set to True on exit
    in either case.
    """
    buttons = base.btns_gpio
    while True:
        # Button is checked first; the flag is only consulted if it is not pressed.
        if buttons[0].read() or quit_flag.value:
            break
        time.sleep(0.25)
    quit_flag.value = True

In [14]:
# --- Shared state -----------------------------------------------------------
# Three receive buffers of 128 unsigned 16-bit words each.
# TODO: determine if we need the lock
rx_buffers = [
    multiprocessing.Array(ctypes.c_uint16, 128, lock=False)
    for _ in range(3)
]

# Index into rx_buffers, shared between processes.
rx_counter = multiprocessing.Value(ctypes.c_uint8, 0)

# Transmit buffer, pre-filled with a recognisable test pattern (555, 556, ...).
tx_buffer = multiprocessing.Array(ctypes.c_uint16, 128)
for i in range(len(tx_buffer)):
    tx_buffer[i] = 555 + i
# print(f"TX Buf: {[val for val in tx_buffer]}")

# tx_flag requests a transmission; quit_flag asks every process to stop.
tx_flag = multiprocessing.Value(ctypes.c_bool, False)
quit_flag = multiprocessing.Value(ctypes.c_bool, False, lock=False)

# --- Processes --------------------------------------------------------------
shared_args = (tx_flag, tx_buffer, rx_counter, rx_buffers, quit_flag)
p_xcvr = multiprocessing.Process(target=transceiver, args=shared_args)
p_data = multiprocessing.Process(target=data_processor, args=shared_args)
p_btn = multiprocessing.Process(target=button_handler, args=(quit_flag,))

# Start each process, then pin it with taskset: the transceiver gets core 1,
# the two polling processes go to core 0.
for proc, core in ((p_xcvr, 1), (p_data, 0), (p_btn, 0)):
    proc.start() # start the process
    os.system("taskset -p -c {} {} > /dev/null".format(core, proc.pid))

print("Started processes")

for proc in (p_xcvr, p_data, p_btn):
    proc.join()

print("All processes joined")

# Loopback comparison kept for debugging:
# results = [(tx_buffer[i], rx_buffers[0][i]) for i in range(len(tx_buffer))]
# errorcount = 0
# for data in results:
#     if data[0] != data[1]:
#         print(f"ERROR: {data[0]}, {data[1]}")
#         errorcount += 1
# print(f"Errorcount: {errorcount}")

# print(results)

Started processes
All processes joined


In [None]:
# Encode a message into a scratch buffer, dump the raw words, then decode the
# same buffer into a second message object and show the recovered text.
test_buffer = multiprocessing.Array(ctypes.c_uint16, 128)
msgTx = IR_Message(0, "Hello Word, are you out there")
msgTx.encode(test_buffer)

for word_index in range(len(test_buffer)):
    print(test_buffer[word_index])

# The placeholder text "a" only satisfies the constructor; decode() replaces it.
msgRx = IR_Message(0, "a")
msgRx.decode(test_buffer)
print(msgRx.text)

In [63]:
import ipywidgets as widgets
from ipywidgets import Button, Layout

# Small widget demo: a text area, a button, and an output pane. Clicking the
# button appends the text area's current contents to the output pane.
textarea_options = dict(
    value='Hello World',
    placeholder='Type something',
    description='Received:',
    disabled=False,
    overflow='scroll',
)
testtxt = widgets.Textarea(**textarea_options)

out = widgets.Output(layout={'border': '1px solid black', 'overflow': 'scroll'})

button = widgets.Button(
    description='Click me',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click me',
    icon='check', # (FontAwesome names without the `fa-` prefix)
)


def append_text(button):
    """Click callback: append the text area's value as one line to ``out``."""
    line = f"{testtxt.value}\n"
    out.append_stdout(line)


button.on_click(append_text)

# Display order determines the on-screen order: text area, button, output.
display(testtxt)
display(button)
display(out)

Textarea(value='Hello World', description='Received:', placeholder='Type something')

Button(description='Click me', icon='check', style=ButtonStyle(), tooltip='Click me')

Output(layout=Layout(border='1px solid black', overflow='scroll'))