In [7]:
import asyncio, time
from pynq import Interrupt

class UART() :   
    RX_OFFSET = 0x00
    TX_OFFSET = 0x04
    STATUS_OFFSET = 0x08
    CONTROL_OFFSET = 0x0C


    RX_AVAIL_BIT = 0x01
    RX_FULL_BIT = 0x02
    TX_EMPTY_BIT = 0x04
    TX_FULL_BIT = 0x08

    RST_FIFO_BIT = 0x02

    CTRL_BIT_EN_INT = 0x10
    CTRL_BIT_DIS_INT = 0XEF

    def __init__(self, pr_region, interrupt = None, name=None):
        self._mmio = pr_region.S_AXI.mmio
        
        if name is None:
            self.name = "UART_" + str(pr_region.description.get('fullpath'))
        else:
            self.name = name
        
        if interrupt is not None:
            self.interrupt = Interrupt(interrupt)

    def txReady(self):
        cur_val = self._mmio.read(self.STATUS_OFFSET)
        return not (cur_val & self.TX_FULL_BIT)

    def rxAvail(self):
        cur_val = self._mmio.read(self.STATUS_OFFSET)
        return  (cur_val & self.RX_AVAIL_BIT) == self.RX_AVAIL_BIT

    def enableInterrupts(self, enable):
        ctrl = self._mmio.read(self.CONTROL_OFFSET)
        if enable:
            ctrl |= self.CTRL_BIT_EN_INT
        else:
            ctrl &= self.CTRL_BIT_DIS_INT
        self._mmio.write(self.CONTROL_OFFSET, ctrl)

    def write(self, msg):
        for b in msg:
            # Wait for ready to send
            while not self.txReady():
                pass

            # Send data
            self._mmio.write(self.TX_OFFSET, b)
        
    def readRxByte(self):
        byte = self._mmio.read(self.RX_OFFSET)
        return byte

    def WriteTxByte(self, byte):
        # Wait for ready to send
        while not self.txReady():
            pass

        self._mmio.write(self.TX_OFFSET, byte)

    #timeout_secs can be initialized to None to disable timeout
    def read(self, size=1, timeout_secs=1):
        recvd = []
        timeout = _Timeout(timeout_secs)
        while len(recvd) < size:
            #waits for data to be available
            while not self.rxAvail() and not timeout.expired():
                pass

            #exits if time has expired.
            if timeout.expired():
                break

            recvd.append(self._mmio.read(self.RX_OFFSET))
        

        return recvd
        
        
    def printStatus(self):
        status = self._mmio.read(self.STATUS_OFFSET)
        print(self.name + " status:")
        print("\tRX Available: " + str((status & self.RX_AVAIL_BIT) == self.RX_AVAIL_BIT))
        print("\tRX Full: " + str((status & self.RX_FULL_BIT) == self.RX_FULL_BIT))
        print("\tTX Empty: " + str((status & self.TX_EMPTY_BIT) == self.TX_EMPTY_BIT))
        print("\tTX Full: " + str((status & self.TX_FULL_BIT) == self.TX_FULL_BIT))
        print("\tInterrupts Enabled: " + str((status & self.CTRL_BIT_EN_INT) == self.CTRL_BIT_EN_INT))
       
       
    def resetFIFOs(self):
        self._mmio.write(self.CONTROL_OFFSET, self.RST_FIFO_BIT) 

    
    # Run this interrupt handler until all messages have been received
    # msg_size - Number of bytes to wait for (if 0, run forever)
    async def isr_recv(self, msg_size = 0):
        recvd_msg = []
        while True:
            await self.interrupt.wait()
            if self.rxAvail():
                recvd = self.readRxByte()
                recvd_msg.append(recvd)                

                if msg_size > 0:
                    print(self.name  + " isr received byte #" + str(len(recvd_msg)) + " of " + str(msg_size) + ": " + hex(recvd))                
                    if (len(recvd_msg) == msg_size):                        
                        return recvd_msg
                else:
                    print(self.name + " isr received byte #" + str(len(recvd_msg)) + ": " + hex(recvd))                



# This class is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2001-2016 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier:    BSD-3-Clause
class _Timeout(object):
    """\
    Abstraction for timeout operations. Using time.monotonic() if available
    or time.time() in all other cases.
    The class can also be initialized with 0 or None, in order to support
    non-blocking and fully blocking I/O operations. The attributes
    is_non_blocking and is_infinite are set accordingly.
    """
    if hasattr(time, 'monotonic'):
        # Timeout implementation with time.monotonic(). This function is only
        # supported by Python 3.3 and above. It returns a time in seconds
        # (float) just as time.time(), but is not affected by system clock
        # adjustments.
        TIME = time.monotonic
    else:
        # Timeout implementation with time.time(). This is compatible with all
        # Python versions but has issues if the clock is adjusted while the
        # timeout is running.
        TIME = time.time

    def __init__(self, duration):
        """Initialize a timeout with given duration"""
        self.is_infinite = (duration is None)
        self.is_non_blocking = (duration == 0)
        self.duration = duration
        if duration is not None:
            self.target_time = self.TIME() + duration
        else:
            self.target_time = None

    def expired(self):
        """Return a boolean, telling if the timeout has expired"""
        return self.target_time is not None and self.time_left() <= 0

    def time_left(self):
        """Return how many seconds are left until the timeout expires"""
        if self.is_non_blocking:
            return 0
        elif self.is_infinite:
            return None
        else:
            delta = self.target_time - self.TIME()
            if delta > self.duration:
                # clock jumped, recalculate
                self.target_time = self.TIME() + self.duration
                return self.duration
            else:
                return max(0, delta)

    def restart(self, duration):
        """\
        Restart a timeout, only supported if a timeout was already set up
        before.
        """
        self.duration = duration
        self.target_time = self.TIME() + duration

# UART Demo

This demo highlights the usefulness of using a more complex MMIO driver wrapper. This wrapper can be found in **`/usr/local/lib/python3.6/dist-packages/prio/drivers/uart.py`**

## Download the static bitstream
Notice that the full path to the bitstream must be provided when the bitstream isn't in the same directory as the notebook.

In [2]:
from prio.prio import PrIoOverlay
# from prio.drivers.uart import UART

# constants refering to the full paths for both the full and partial bitstreams
FULL_BITSTREAM_PATH = "/usr/local/lib/python3.6/dist-packages/prio/"
PARTIAL_BITSTREAM_PATH = "/usr/local/lib/python3.6/dist-packages/prio/partial/"

# Returns the a static overlay for Partial Reconfiguration Input/Output
# and downloads the static bitstream to the FPGA
overlay = PrIoOverlay(FULL_BITSTREAM_PATH + "prio.bit")

## Set up the reconfigurable region
Notice that as with the full bitstream, the full path to the partial bitstream must be provided when is located outside of the current notebook's directory.


In [3]:
# Downloads the gpio bitstream in region 1
# The first two lines must be called together
overlay.set_partial_region('pr_1')
overlay.download(PARTIAL_BITSTREAM_PATH + 'pr_1_uart.bit')
# Unlike GPIO, this line can be called even after another partial region
# is set up since it is assigned the default MMIO driver
uart1 = UART(overlay.pr_1)

# Downloads the gpio bitstream in region 3
# The first two lines must be called together
overlay.set_partial_region('pr_3')
overlay.download(PARTIAL_BITSTREAM_PATH + 'pr_3_uart.bit')

uart3 = UART(overlay.pr_3)

## Demo: Print UART Status
Prints the status of both of the UART modules

In [4]:
uart1.resetFIFOs()
uart3.resetFIFOs()
uart1.printStatus()
uart3.printStatus()

UART_pr_1 status:
	RX Available: False
	RX Full: False
	TX Empty: True
	TX Full: False
	Interrupts Enabled: False
UART_pr_3 status:
	RX Available: False
	RX Full: False
	TX Empty: True
	TX Full: False
	Interrupts Enabled: False


## Demo: Bidirectional UART Messages

** Hardware setup:** For this demo you should connect a wire between the top right pin of **`PMOD 1`** (uart1 RX) and the second most right bottom pin of **`PMOD 1`** (uart3 TX), and a second wire between the bottom right pin of **`PMOD 1`** (uart1 TX) and the second most right top **`PMOD 1`** (uart3 RX). (the two wires should criss-cross)

In [5]:
import time


msg = [0xde, 0xad, 0xbe, 0xef] # The message to be tranmitted from pr_1 to pr_3
print("***** Sending message: " + str(msg) + "*****")
uart1.write(msg) # Sends message from pr_1
time.sleep(1.0)
recvd = uart3.read(4)# Reads message from pr_1

# Checks to see if the message sent correctly
if recvd == msg:
    print("Success: correct message received")
else: 
    print("Failure: message received: (" + str(recvd) + ")")
    
    
msg = [0xaa, 0xbb, 0x55, 0x33] # The message to be tranmitted from pr_3 to pr_1
print("\n***** Sending message: " + str(msg) + "*****")
uart3.write(msg) # Sends message from pr_3
time.sleep(1.0)
recvd = uart1.read(4) # Reads message from pr_1

# Checks to see if the message sent correctly
if recvd == msg:
    print("Success: correct message received")
else: 
    print("Failure: message received: (" + str(recvd) + ")")

***** Sending message: [222, 173, 190, 239]*****
Success: correct message received

***** Sending message: [170, 187, 85, 51]*****
Success: correct message received


In [6]:
# Note: Because of the 16 byte nature of the FIFO registers, these 32 byte
# messages may fail to transmit properly. To ensure safe transmittion of data
# transmit 16 bytes at a time.

# Flushes the FIFO buffers, discarding all it’s contents.
uart1.resetFIFOs()
uart3.resetFIFOs()


# Sends a message from uart1 to uart3.
message = "Sending Data from uart1 to uart3"
print(message)
# Should return 32 as an int.
size = len(message)
# Sends message from uart1 to uart3.
uart1.write(message.encode())
# Returns 32 bytes recieved by uart3.
recieved = uart3.read(size)
# Decodes the bytes recieved by uart3.
recieved = bytes(recieved).decode()
# Checks to see if message was sent succesfully.
if recieved == message:
    print("Success! Message Recieved: " + recieved)
else:
    print("Failure: Message Recieved: " + recieved)

# Send a message from uart3 to uart1.
message = "Sending Data from uart3 to uart1"
print(message)
# Should return 32 as an int.
size = len(message)
# Sends message from uart3 to uart1.
uart3.write(message.encode())
# Returns 32 bytes recieved by uart1.
recieved = uart1.read(size)
# Decodes the bytes recieved by uart1.
recieved = bytes(recieved).decode()
# Checks to see if message was sent succesfully.
if recieved == message:
    print("Success! Message Recieved: " + recieved)
else:
    print("Failure: Message Recieved: " + recieved)

Sending Data from uart1 to uart3
Success! Message Recieved: Sending Data from uart1 to uart3
Sending Data from uart3 to uart1
Success! Message Recieved: Sending Data from uart3 to uart1


## Demo: Bidirectional UART Messages with Interrupts

** Hardware setup:** _(Same as previous demo)_ For this demo you should connect a wire between the top right pin of **`PMOD 1`** (uart1 RX) and the second most right bottom pin of **`PMOD 1`** (uart3 TX), and a second wire between the bottom right pin of **`PMOD 1`** (uart1 TX) and the second most right top **`PMOD 1`** (uart3 RX). (the two wires should criss-cross)

In [8]:
# Downloads the gpio bitstream in region 1
# The first two lines must be called together
overlay.set_partial_region('pr_1')
overlay.download(PARTIAL_BITSTREAM_PATH + 'pr_1_uart.bit')
# Unlike GPIO, this line can be called even after another partial region
# is set up since it is assigned the default MMIO driver
uart1 = UART(overlay.pr_1, overlay.interrupt_pins['pr_1/axi_uartlite_0/interrupt']['fullpath'])

# Downloads the gpio bitstream in region 3
# The first two lines must be called together
overlay.set_partial_region('pr_3')
overlay.download(PARTIAL_BITSTREAM_PATH + 'pr_3_uart.bit')

uart3 = UART(overlay.pr_3, overlay.interrupt_pins['pr_3/axi_uartlite_0/interrupt']['fullpath'])

In [9]:
import time
import asyncio

msg = [0xde, 0xad, 0xbe, 0xef]

uart1.resetFIFOs()
uart3.resetFIFOs()

# Send message from uart 1 to uart 3
print("***** Sending message: " + str(msg) + "*****")
uart3.enableInterrupts(True)
uart1.write(msg)

recvd = asyncio.get_event_loop().run_until_complete(uart3.isr_recv(len(msg)))
if recvd == msg:
    print("Success: correct message received")
else: 
    print("Failure: message received: (" + str(recvd) + ")")

# Send message from uart 3 to uart 1
print("\n***** Sending message: " + str(msg) + "*****")
uart1.enableInterrupts(True)
uart3.write(msg)

recvd = asyncio.get_event_loop().run_until_complete(uart1.isr_recv(len(msg)))
if recvd == msg:
    print("Success: correct message received")
else: 
    print("Failure: message received: (" + str(recvd) + ")")


***** Sending message: [222, 173, 190, 239]*****
UART_pr_3 isr received byte #1 of 4: 0xde
UART_pr_3 isr received byte #2 of 4: 0xad
UART_pr_3 isr received byte #3 of 4: 0xbe
UART_pr_3 isr received byte #4 of 4: 0xef
Success: correct message received

***** Sending message: [222, 173, 190, 239]*****
UART_pr_1 isr received byte #1 of 4: 0xde
UART_pr_1 isr received byte #2 of 4: 0xad
UART_pr_1 isr received byte #3 of 4: 0xbe
UART_pr_1 isr received byte #4 of 4: 0xef
Success: correct message received
