# UART Demo

This demo highlights the usefulness of using a more complex MMIO driver wrapper by implementing a wrapper to interact with UART hardware. This wrapper is included in the notebook as an example of how to create a more complicated MMIO driver, including how to interact with interrupts.

In [1]:
import asyncio
import time
from pynq import Interrupt

class UART:
    """Polling / interrupt driver for a UART reached through an MMIO window.

    The register offsets and bit masks below are the ones this driver reads
    and writes; they follow the layout of an AXI UART Lite style core
    (RX FIFO, TX FIFO, status register, control register).
    """

    # Register offsets (bytes from the base of the MMIO window)
    RX_OFFSET = 0x00
    TX_OFFSET = 0x04
    STATUS_OFFSET = 0x08
    CONTROL_OFFSET = 0x0C

    # Status register bits
    RX_AVAIL_BIT = 0x01
    RX_FULL_BIT = 0x02
    TX_EMPTY_BIT = 0x04
    TX_FULL_BIT = 0x08

    # Control register value that sets both low bits (used by resetFIFOs)
    RST_FIFO_BIT = 0x03

    # Control register interrupt-enable bit and the mask that clears it
    CTRL_BIT_EN_INT = 0x10
    CTRL_BIT_DIS_INT = 0XEF

    def __init__(self, pr_region, name=None, parent_overlay=None):
        """Bind the driver to a partial-reconfiguration region.

        Parameters
        ----------
        pr_region :
            Region object exposing ``S_AXI.mmio`` and a ``description``
            dictionary with a ``'fullpath'`` entry.
        name : str, optional
            Label used in printed messages. Defaults to
            ``"UART_" + <region fullpath>``.
        parent_overlay : optional
            Overlay whose ``interrupt_pins`` mapping contains the region's
            ``axi_uartlite_0/interrupt`` pin. When omitted, the notebook
            global ``overlay`` is used, so that global must exist before a
            UART is constructed without this argument.
        """
        if parent_overlay is None:
            # Backward-compatible fallback to the notebook-level global.
            parent_overlay = overlay

        self._mmio = pr_region.S_AXI.mmio
        region_path = str(pr_region.description.get('fullpath'))
        self.name = "UART_" + region_path if name is None else name

        pin_name = region_path + "/axi_uartlite_0/interrupt"
        pin_path = parent_overlay.interrupt_pins[pin_name]['fullpath']
        self.interrupt = Interrupt(pin_path)

    def txReady(self):
        """Return True when the TX-full status bit is clear."""
        status = self._mmio.read(self.STATUS_OFFSET)
        return not (status & self.TX_FULL_BIT)

    def rxAvail(self):
        """Return True when the RX-available status bit is set."""
        status = self._mmio.read(self.STATUS_OFFSET)
        return (status & self.RX_AVAIL_BIT) == self.RX_AVAIL_BIT

    def enableInterrupts(self, enable):
        """Set (``enable`` truthy) or clear the interrupt-enable control bit.

        The control register is read, modified and written back.
        """
        # NOTE(review): on AXI UART Lite the control register is documented
        # as write-only; confirm that reading it back returns a usable value.
        ctrl = self._mmio.read(self.CONTROL_OFFSET)
        if enable:
            ctrl |= self.CTRL_BIT_EN_INT
        else:
            ctrl &= self.CTRL_BIT_DIS_INT
        self._mmio.write(self.CONTROL_OFFSET, ctrl)

    def write(self, msg):
        """Transmit every element of ``msg``, blocking while TX is full.

        ``msg`` may be ``bytes``, ``bytearray``, any iterable of integers,
        or a ``str`` (which is encoded with the default codec first).
        """
        if isinstance(msg, str):
            msg = msg.encode()
        for b in msg:
            # writeTxByte() already waits for space in the transmitter.
            self.writeTxByte(b)

    def readRxByte(self):
        """Read the RX register once and return its low 8 bits."""
        return self._mmio.read(self.RX_OFFSET) & 0xff

    def writeTxByte(self, byte):
        """Busy-wait until the transmitter is not full, then send ``byte``."""
        while not self.txReady():
            pass
        self._mmio.write(self.TX_OFFSET, byte)

    def read(self, size=1, timeout_secs=1):
        """Receive up to ``size`` bytes and return them as a list of ints.

        ``timeout_secs`` bounds the total time spent waiting; ``None``
        disables the timeout and ``0`` makes the call non-blocking. Bytes
        that are already available are always collected, so the timeout only
        ends the call when no byte is waiting. A short (possibly empty)
        list is returned when the timeout ends the call.
        """
        recvd = []
        timeout = _Timeout(timeout_secs)
        while len(recvd) < size:
            # Give up only when there is nothing to read AND time is up;
            # checking availability first keeps a byte that arrives right
            # at the deadline (or any byte in non-blocking mode).
            while not self.rxAvail():
                if timeout.expired():
                    return recvd
            recvd.append(self.readRxByte())
        return recvd

    def printStatus(self):
        """Print a decoded view of the status register."""
        status = self._mmio.read(self.STATUS_OFFSET)
        print(self.name + " status:")
        # The interrupt flag is tested in the status word with the same
        # mask value as the control register's enable bit.
        flags = (
            ("RX Available", self.RX_AVAIL_BIT),
            ("RX Full", self.RX_FULL_BIT),
            ("TX Empty", self.TX_EMPTY_BIT),
            ("TX Full", self.TX_FULL_BIT),
            ("Interrupts Enabled", self.CTRL_BIT_EN_INT),
        )
        for label, mask in flags:
            print("\t" + label + ": " + str((status & mask) == mask))

    def resetFIFOs(self):
        """Write ``RST_FIFO_BIT`` to the control register.

        The whole register is overwritten, so the interrupt-enable bit is
        written as 0 by this call.
        """
        self._mmio.write(self.CONTROL_OFFSET, self.RST_FIFO_BIT)

    async def isr_recv(self, msg_size=0):
        """Collect received bytes, one per interrupt wake-up.

        Waits on ``self.interrupt``; each time it fires and a byte is
        available, the byte is read, stored and reported with ``print``.

        Parameters
        ----------
        msg_size : int
            Number of bytes to wait for. When greater than 0 the collected
            list is returned once it holds ``msg_size`` bytes; when 0 the
            coroutine runs forever.
        """
        recvd_msg = []
        while True:
            await self.interrupt.wait()
            if not self.rxAvail():
                # Woken without pending data; wait for the next interrupt.
                continue

            recvd = self.readRxByte()
            recvd_msg.append(recvd)
            prefix = self.name + " isr received byte #" + str(len(recvd_msg))

            if msg_size > 0:
                print(prefix + " of " + str(msg_size) + ": " + hex(recvd))
                if len(recvd_msg) == msg_size:
                    return recvd_msg
            else:
                print(prefix + ": " + hex(recvd))



# This class is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2001-2016 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier:    BSD-3-Clause
class _Timeout(object):
    """Deadline helper for polling loops.

    A timeout is created from a duration in seconds. A duration of ``None``
    never expires (``is_infinite`` is set) and a duration of ``0`` is
    expired from the start (``is_non_blocking`` is set), which lets callers
    express fully blocking and non-blocking operations with the same object.
    """

    # Clock used for all measurements: time.monotonic() when the interpreter
    # provides it (not affected by system clock adjustments), otherwise
    # time.time(), which misbehaves if the clock is changed while a timeout
    # is running.
    TIME = time.monotonic if hasattr(time, 'monotonic') else time.time

    def __init__(self, duration):
        """Start a timeout lasting ``duration`` seconds (``None`` = never)."""
        self.is_infinite = (duration is None)
        self.is_non_blocking = (duration == 0)
        self.duration = duration
        self.target_time = None if duration is None else self.TIME() + duration

    def expired(self):
        """Tell whether the deadline has passed (always False when infinite)."""
        if self.target_time is None:
            return False
        return self.time_left() <= 0

    def time_left(self):
        """Return the seconds remaining: 0 if non-blocking, None if infinite."""
        if self.is_non_blocking:
            return 0
        if self.is_infinite:
            return None

        remaining = self.target_time - self.TIME()
        if remaining > self.duration:
            # More time left than the full duration means the clock jumped;
            # re-arm the deadline from the current time.
            self.target_time = self.TIME() + self.duration
            return self.duration
        return max(0, remaining)

    def restart(self, duration):
        """Re-arm with a new duration.

        Only supported when the timeout was created with an actual duration.
        """
        self.duration = duration
        self.target_time = self.TIME() + duration

## Download the static bitstream
We first need to download the static or full bitstream before any partial bitstreams can be downloaded. Note that if the bitstream is not in the same directory as the notebook then the full path needs to be provided.

In [2]:
from prio.prio import PrIoOverlay

# Directories holding the full bitstream and the partial bitstreams.
# Both are absolute paths, so the notebook can be run from any directory.
FULL_BITSTREAM_PATH = "/usr/local/lib/python3.6/dist-packages/prio/"
PARTIAL_BITSTREAM_PATH = "/usr/local/lib/python3.6/dist-packages/prio/partial/"

# Create the overlay from the full bitstream file.
full_bitfile = FULL_BITSTREAM_PATH + "prio.bit"
overlay = PrIoOverlay(full_bitfile)

## Set up the reconfigurable region
Notice that as with the full bitstream, the full path to the partial bitstream must be provided when it is located outside of the current notebook's directory.

We will download a partial bitstream into each region and initialize a UART driver for it.

In [3]:
# Region pr_1: download its UART partial bitstream, then create the driver.
pr1_bitfile = PARTIAL_BITSTREAM_PATH + "pr_1_uart.bit"
overlay.pr_download("pr_1", pr1_bitfile)
uart1 = UART(pr_region=overlay.pr_1)

# Region pr_3: same steps with its own partial bitstream.
pr3_bitfile = PARTIAL_BITSTREAM_PATH + "pr_3_uart.bit"
overlay.pr_download("pr_3", pr3_bitfile)
uart3 = UART(pr_region=overlay.pr_3)

## Demo: Print UART Status
Prints the status of both of the UART modules.

In [4]:
# Reset both UARTs first, then report each one's status in the same order.
for uart in (uart1, uart3):
    uart.resetFIFOs()
for uart in (uart1, uart3):
    uart.printStatus()

UART_pr_1 status:
	RX Available: False
	RX Full: False
	TX Empty: True
	TX Full: False
	Interrupts Enabled: False
UART_pr_3 status:
	RX Available: False
	RX Full: False
	TX Empty: True
	TX Full: False
	Interrupts Enabled: False


## Demo: Bidirectional UART Messages

This cell will transmit a message back and forth between partial region 1 and partial region 3. After running the cell you will see output showing the message that was sent and the message that was received, in both directions.

**Hardware setup:** For this demo, connect one wire between the top-right pin of **`PMOD 1`** (uart1 RX) and the second-from-right bottom pin of **`PMOD 1`** (uart3 TX), and a second wire between the bottom-right pin of **`PMOD 1`** (uart1 TX) and the second-from-right top pin of **`PMOD 1`** (uart3 RX). (The two wires should criss-cross.)

![](images/uart_hw_setup.JPG)

In [5]:
# Note: The FIFO registers hold 16 bytes, so pushing a whole 32-byte message
# before the receiver is drained may occasionally lose data. To transmit
# safely, each message is sent 16 bytes at a time and every chunk is read
# back before the next one is written.
FIFO_DEPTH = 16

uart1.resetFIFOs()
uart3.resetFIFOs()

# Run the same exchange in both directions: (sender, receiver, text).
for sender, receiver, message in (
        (uart1, uart3, "Sending Data from uart1 to uart3"),
        (uart3, uart1, "Sending Data from uart3 to uart1")):
    print(message)
    payload = message.encode()
    size = len(payload)

    collected = []
    for start in range(0, size, FIFO_DEPTH):
        chunk = payload[start:start + FIFO_DEPTH]
        sender.write(chunk)
        collected.extend(receiver.read(len(chunk)))

    # errors="replace" keeps corrupted bytes from raising, so a bad
    # transfer is reported through the "Failure" branch below.
    recieved = bytes(collected).decode(errors="replace")
    if recieved == message:
        print("Success! Message Recieved: " + recieved)
    else:
        print("Failure: Message Recieved: " + recieved)

Sending Data from uart1 to uart3
Success! Message Recieved: Sending Data from uart1 to uart3
Sending Data from uart3 to uart1
Success! Message Recieved: Sending Data from uart3 to uart1


## Demo: Bidirectional UART Messages with Interrupts

This demo is similar to the demo above, but this time it uses the interrupt functionality present in the PR regions. For the ZCU104, an `interrupt` attribute is automatically created on each instance of the UART class, so we can use the same UART objects in the cell below.

**Hardware setup:** _(Same as previous demo)_ For this demo, connect one wire between the top-right pin of **`PMOD 1`** (uart1 RX) and the second-from-right bottom pin of **`PMOD 1`** (uart3 TX), and a second wire between the bottom-right pin of **`PMOD 1`** (uart1 TX) and the second-from-right top pin of **`PMOD 1`** (uart3 RX). (The two wires should criss-cross.)

In [6]:
msg = [0xde, 0xad, 0xbe, 0xef]

uart1.resetFIFOs()
uart3.resetFIFOs()

# Each entry is (text printed before the banner, sender, receiver):
# first uart 1 -> uart 3, then uart 3 -> uart 1.
for lead, sender, receiver in (("", uart1, uart3), ("\n", uart3, uart1)):
    print(lead + "***** Sending message: " + "[" + ", ".join(map(hex, msg)) + "]" + "*****")

    # Interrupts are enabled on the receiving side before the data is sent.
    receiver.enableInterrupts(True)
    sender.write(msg)

    # Run the receive coroutine until it has collected len(msg) bytes.
    loop = asyncio.get_event_loop()
    recvd = loop.run_until_complete(receiver.isr_recv(len(msg)))

    if recvd != msg:
        print("Failure: message received: (" + "[" + ", ".join(map(hex, recvd)) + "]" + ")")
    else:
        print("Success: correct message received")


***** Sending message: [0xde, 0xad, 0xbe, 0xef]*****
UART_pr_3 isr received byte #1 of 4: 0xde
UART_pr_3 isr received byte #2 of 4: 0xad
UART_pr_3 isr received byte #3 of 4: 0xbe
UART_pr_3 isr received byte #4 of 4: 0xef
Success: correct message received

***** Sending message: [0xde, 0xad, 0xbe, 0xef]*****
UART_pr_1 isr received byte #1 of 4: 0xde
UART_pr_1 isr received byte #2 of 4: 0xad
UART_pr_1 isr received byte #3 of 4: 0xbe
UART_pr_1 isr received byte #4 of 4: 0xef
Success: correct message received
