# Overlapping Communication and Compute

The *Intro to Compression* notebook introduced the key features for interacting with the data compression kernels. In this notebook we are going to explore how to maximize total system performance using the techniques for overlapping compute and communication first introduced in the *Kernel Optimization* suite of notebooks.

Building on that example, here we want to implement the higher-level LZ4 *frame* compression format, which encapsulates the compressed data with block and file-level headers. The full specification is [available online](https://android.googlesource.com/platform/external/lz4/+/HEAD/doc/lz4_Frame_format.md) but here we'll only touch on the parts needed to create a file that can be decompressed by the software implementation.

First we are going to import pynq and get a handle to the compression kernel we plan on using:

In [1]:
import pynq

ol = pynq.Overlay('compression.xclbin')

compress_kernel = ol.xilLz4Compress_1

One piece of terminology that appears in the data compression example designs is the *brick*, which is the set of 8 blocks that are processed together. For this example we are going to stick with the 1 MB block size from the introduction, which gives a brick size of 8 MB.

Before starting to try overlapping the compute and communication we want to write the non-overlapped case, both to ensure that the algorithm is correct and to provide a benchmark for future optimization. To begin with we need to allocate the buffers as before.

In [2]:
BLOCK_SIZE = 1024*1024
BRICK_SIZE = 8*BLOCK_SIZE

in_buffers = pynq.allocate((8,BLOCK_SIZE), 'u1', target=ol.bank0)
out_buffers = pynq.allocate((8,BLOCK_SIZE), 'u1', target=ol.bank0)
compressed_size = pynq.allocate((8,), 'u4', target=ol.bank0)
uncompressed_size = pynq.allocate((8,), 'u4', target=ol.bank0)

To simplify the core loop we can split the computation into three phases:

 1. Prepare the input buffers with uncompressed data
 2. Call the accelerator
 3. Retrieve the compressed data from the output buffers
 
The `write_uncompressed` function below encapsulates all of the steps needed to prepare the input buffers as described in the previous notebook. The only change needed is some logic to account for the final brick of the file, which will usually be shorter than a full brick. Otherwise the process of reshaping the input buffer and setting the size of each block in the brick is the same as before.

In [3]:
def write_uncompressed(data, buffers, sizes):
    # Copy at most one brick of input into the device-visible buffer
    total_size = min(len(data), BRICK_SIZE)
    buffers.reshape((BRICK_SIZE,))[0:total_size] = data[0:total_size]
    buffers.sync_to_device()
    if total_size == BRICK_SIZE:
        sizes[:] = BLOCK_SIZE
        blocks = 8
    else:
        # Partial brick: full blocks followed by one final, possibly short, block
        blocks = ((total_size - 1) // BLOCK_SIZE) + 1
        sizes[0:blocks-1] = BLOCK_SIZE
        # total_size % BLOCK_SIZE would give 0 when the data ends on a block boundary
        sizes[blocks-1] = total_size - (blocks - 1) * BLOCK_SIZE
        sizes[blocks:8] = 0
    sizes.sync_to_device()
    return total_size, blocks
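
The block-size bookkeeping for a partial brick can be checked without any hardware. The following is a minimal sketch that reproduces only the arithmetic, using a plain Python list in place of the `sizes` buffer; the helper name `block_sizes` and the constant `BLOCKS_PER_BRICK` are hypothetical and not part of the original notebook.

```python
BLOCK_SIZE = 1024 * 1024
BLOCKS_PER_BRICK = 8  # hypothetical name for the literal 8 used in the notebook
BRICK_SIZE = BLOCKS_PER_BRICK * BLOCK_SIZE

def block_sizes(data_len):
    """Return (total_size, blocks, sizes) for the next brick of data_len bytes."""
    total_size = min(data_len, BRICK_SIZE)
    # Ceiling division: number of blocks holding at least one byte
    blocks = -(-total_size // BLOCK_SIZE)
    sizes = [0] * BLOCKS_PER_BRICK
    for i in range(blocks):
        # Every block is full except possibly the last one
        sizes[i] = min(BLOCK_SIZE, total_size - i * BLOCK_SIZE)
    return total_size, blocks, sizes

# A brick that ends exactly on a block boundary still has a full final block
total, blocks, sizes = block_sizes(2 * BLOCK_SIZE)
print(blocks, sizes[:3])
```

The case worth watching is input that ends exactly on a block boundary: the last used block must report a full `BLOCK_SIZE` rather than zero.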

The `read_compressed` function reads the size of each compressed block, retrieves the data from the card and adds it to a stream. Again, most of this code is taken directly from the previous notebook, with the added capability to process only a partial brick. Another change is the addition of a *block header* to the output stream. This is simply the length of the block, written as a 4-byte little-endian integer, and is used by the decompressor to quickly find multiple blocks for parallel decompression.

In [4]:
import struct

def read_compressed(stream, buffers, sizes, blocks):
    sizes.sync_from_device()
    for size, buffer in zip(sizes[0:blocks], buffers[0:blocks]):
        subbuf = buffer[0:size]
        subbuf.sync_from_device()
        stream.write(struct.pack('<I', size))
        stream.write(subbuf)
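
To illustrate how a reader can use the block headers to jump from block to block, here is a minimal sketch that walks the body of a frame. It assumes the frame header has already been stripped and that no block checksums are present, which matches the header used in this notebook. The function name `iter_blocks` is hypothetical, and the payloads in the usage lines are dummy bytes rather than real LZ4 data.

```python
import struct

def iter_blocks(body):
    """Yield the payload of each block in an LZ4 frame body."""
    pos = 0
    while True:
        (word,) = struct.unpack_from('<I', body, pos)
        pos += 4
        if word == 0:
            # A zero size is the end mark that terminates the frame
            return
        # The top bit flags a block stored uncompressed; the rest is the length
        size = word & 0x7FFFFFFF
        yield body[pos:pos + size]
        pos += size

# Dummy body: two blocks followed by the end mark
body = (struct.pack('<I', 3) + b'abc' +
        struct.pack('<I', 2) + b'de' +
        struct.pack('<I', 0))
print([len(block) for block in iter_blocks(body)])
```

Because each header gives the length of the block that follows, a decompressor can locate every block boundary without decoding any compressed data.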

We are now in a position to create the non-overlapped reference implementation of the whole-file compressor. An LZ4 frame consists of the following parts:

 * A header consisting of a magic string and some details of the compressed format
 * The compressed blocks, each with a size header
 * An empty block to mark the end of the file
 
For this example we have a fixed block size of 1 MB which is encoded in the header and no additional checksums or data lengths - for the details of how this is encoded see the specification linked at the top of the notebook.
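
As a rough guide to what the three bytes after the magic number mean, the sketch below decodes the flag (FLG) and block descriptor (BD) bytes according to the bit layout in the frame specification. The function name `describe_header` and the dictionary keys are hypothetical. The final byte is a header checksum, which is only reported here and not verified, since verifying it requires an xxHash implementation.

```python
LZ4_MAGIC = b'\x04\x22\x4D\x18'
LZ4_HEADER = LZ4_MAGIC + b'\x60\x60\x51'

# Block maximum size codes defined by the LZ4 frame format
BLOCK_MAX_SIZES = {4: 64 * 1024, 5: 256 * 1024,
                   6: 1024 * 1024, 7: 4 * 1024 * 1024}

def describe_header(header):
    """Decode the FLG and BD bytes of a minimal 7-byte LZ4 frame header."""
    if header[:4] != LZ4_MAGIC:
        raise ValueError('not an LZ4 frame')
    flg, bd = header[4], header[5]
    return {
        'version': flg >> 6,                    # bits 7-6
        'block_independent': bool(flg & 0x20),  # bit 5
        'block_checksum': bool(flg & 0x10),     # bit 4
        'content_size': bool(flg & 0x08),       # bit 3
        'content_checksum': bool(flg & 0x04),   # bit 2
        'dict_id': bool(flg & 0x01),            # bit 0
        'block_max_size': BLOCK_MAX_SIZES[(bd >> 4) & 0x7],
        'header_checksum': header[6],           # reported, not verified
    }

info = describe_header(LZ4_HEADER)
print(info['block_max_size'], info['block_independent'])
```

For the header used in this notebook this reports independent blocks with a 1 MB maximum size and no checksums or content size.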

Our `compress_hw` function works its way through the input data, splitting it into bricks and processing each brick in sequence. For ease of use we return the compressed data as an `io.BytesIO` stream rewound to the start, so the caller can simply call `read()` to get the contents as a binary string.

In [5]:
import io

LZ4_MAGIC = b'\x04\x22\x4D\x18'
LZ4_HEADER = LZ4_MAGIC + b'\x60\x60\x51'

def compress_hw(raw_data):
    view = memoryview(raw_data)
    offset = 0
    
    stream = io.BytesIO()
    stream.write(LZ4_HEADER)
    
    while offset < len(view):
        brick_size, blocks = write_uncompressed(
            view[offset:], in_buffers, uncompressed_size)
        
        compress_kernel.call(in_buffers, out_buffers,
                             compressed_size, uncompressed_size,
                             1024, brick_size)
        
        read_compressed(stream, out_buffers, compressed_size, blocks)
        offset += brick_size
    stream.write(b'\x00\x00\x00\x00')
    stream.seek(0)
    return stream

Again we'll use our *test_data.bin* consisting of an xclbin file as the input for the compressor:

In [6]:
with open('test_data.bin', 'rb') as f:
    test_data = f.read()

compressed_data = compress_hw(test_data).read()

This time we need to use the `lz4.frame` module to decompress our data.

In [7]:
import lz4.frame
uncompressed_data = lz4.frame.decompress(compressed_data)

Finally we can make sure that the uncompressed data is the same as what we started with.

In [8]:
uncompressed_data == test_data

True

## Profiling the Compressor

Before we start optimizing our loop we need to get a baseline for the compressor. Jupyter provides a `%%timeit` magic which offers a simple way to see how long a function takes:

In [9]:
%%timeit
compressed_data = compress_hw(test_data)

158 ms ± 6.48 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


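For a more programmatic interface we can use the `timeit` module. Using the `number` and `repeat` parameters we can mirror the loop and run counts chosen by the `%%timeit` magic above. Each entry returned by `timeit.repeat` is the total time for `number` calls, so we divide by 10 to get the time per call.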

In [10]:
import timeit
runtimes = timeit.repeat('compress_hw(test_data)', number=10, 
                        repeat=7, globals=globals())

hw_average = sum(runtimes) / len(runtimes) / 10
hw_average

0.13324313652036446

We can use this to find the throughput in MB/s:

In [11]:
len(test_data) / hw_average / (1024*1024)

531.031172128888

## Performing the Overlap

To conceptualize what we are trying to achieve by overlapping the compute and communication it's worth considering what the activity diagram of our non-overlapped case looks like and how we would like to optimize it.

![Non-overlapped execution](img/nonoverlapped.png)

As we can see, the accelerator remains idle for large periods while we transfer data to and from the device. Instead we would like something that looks more like:

![overlapped execution](img/overlapped.png)

where the communication for neighboring iterations of the loop happens while the accelerator is busy. To achieve this we need to duplicate each buffer so that one is being processed by the accelerator while the other is being used for data transfer. We'll ping-pong between the two buffers until all of the data is processed. We will represent this by adding another dimension of size 2 to the front of all the buffer shapes:

In [12]:
in_buffers = pynq.allocate((2,8,BLOCK_SIZE), 'u1', target=ol.bank0)
out_buffers = pynq.allocate((2,8,BLOCK_SIZE), 'u1', target=ol.bank0)
compressed_size = pynq.allocate((2,8), 'u4', target=ol.bank0)
uncompressed_size = pynq.allocate((2,8), 'u4', target=ol.bank0)

Looking at our desired execution trace we can sketch out the form our revised function should take - namely

 * We need to preload the first brick onto the device before starting our processing loop
 * We need to retrieve the last brick result after the end of the processing loop
 * The loop should begin by starting the accelerator and end by waiting for it to finish

With that structure in place, and an `active` variable to keep track of which set of buffers is being processed, we can write the overlapped compressor:

In [13]:
def compress_overlapped(raw_data):
    view = memoryview(raw_data)
    offset = 0
    active = 0
    previous_blocks = 0
    
    stream = io.BytesIO()
    stream.write(LZ4_HEADER)
    
    brick_size, blocks = write_uncompressed(
            view[offset:], in_buffers[0], uncompressed_size[0])
    
    while True:
        wh = compress_kernel.start(
            in_buffers[active], out_buffers[active],
            compressed_size[active], uncompressed_size[active],
            1024, brick_size)
        
        active ^= 1
        offset += brick_size
        if previous_blocks != 0:
            read_compressed(stream, out_buffers[active],
                            compressed_size[active], previous_blocks)
        previous_blocks = blocks
        if offset < len(view):
            brick_size, blocks = write_uncompressed(
                view[offset:], in_buffers[active], uncompressed_size[active])
        else:
            break
        wh.wait()
    wh.wait()
    active ^= 1
    read_compressed(stream, out_buffers[active],
                    compressed_size[active], previous_blocks)
    stream.write(b'\x00\x00\x00\x00')
    stream.seek(0)
    return stream
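
The same ping-pong structure can be exercised without an accelerator. The sketch below is an illustration under stated assumptions rather than PYNQ code: a single-worker `ThreadPoolExecutor` stands in for the kernel, `submit()` plays the role of `start()` and `result()` the role of `wait()`. The names `run_overlapped`, `load`, `compute` and `store` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def run_overlapped(bricks, load, compute, store):
    """Load brick i+1 and store brick i-1 while brick i is being computed."""
    if not bricks:
        return
    slots = [None, None]    # two input buffer sets
    results = [None, None]  # two output buffer sets
    active = 0
    slots[active] = load(bricks[0])  # preload the first brick
    with ThreadPoolExecutor(max_workers=1) as accelerator:
        for i in range(len(bricks)):
            # Start the stand-in kernel on the active buffer set
            handle = accelerator.submit(compute, slots[active])
            active ^= 1
            if i > 0:
                store(results[active])             # drain the previous brick
            if i + 1 < len(bricks):
                slots[active] = load(bricks[i + 1])  # stage the next brick
            results[active ^ 1] = handle.result()  # wait for the kernel
        store(results[active ^ 1])                 # drain the final brick

out = []
run_overlapped([b'aa', b'bbb', b'c'], load=bytes,
               compute=lambda buf: buf.upper(), store=out.append)
print(out)
```

While the worker is busy with one slot, the main thread only touches the other slot, which is the property that makes the overlap safe.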

We can check the output of `compress_overlapped` using the same round-trip check as before:

In [14]:
compressed_data = compress_overlapped(test_data).read()
test_data == lz4.frame.decompress(compressed_data)

True

Timing the overlapped version shows that the run time is roughly half that of the non-overlapped baseline measured earlier:

In [15]:
%%timeit
compress_overlapped(test_data)

79.3 ms ± 2.38 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
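
One way to reason about an improvement of this kind is a simple timing model of the two loops. The sketch below is an idealized model, assuming every brick takes the same time in each phase and that host transfers and kernel execution overlap perfectly. The function names and the per-brick durations in the usage lines are hypothetical and are not measurements from this system.

```python
def sequential_time(n, t_in, t_kernel, t_out):
    """Total time when each brick is written, compressed and read in turn."""
    return n * (t_in + t_kernel + t_out)

def overlapped_time(n, t_in, t_kernel, t_out):
    """Total time when transfers for neighboring bricks hide behind the kernel."""
    if n == 0:
        return 0
    total = t_in  # preload the first brick
    for i in range(n):
        # Host work done during this kernel run: read the previous result
        # (if any) and write the next input (if any)
        host = (t_out if i > 0 else 0) + (t_in if i < n - 1 else 0)
        total += max(t_kernel, host)
    return total + t_out  # retrieve the last result

# Made-up durations in arbitrary units, for illustration only
print(sequential_time(4, 2, 5, 1), overlapped_time(4, 2, 5, 1))
```

In this model the overlapped loop approaches the cost of whichever is slower per brick, the kernel or the combined host transfers, plus one unhidden transfer at each end.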


Compared with the software `lz4.frame` compressor on our test machine, configured with settings that match the output of our Python code, this is a speedup of roughly 2x:

In [16]:
%%timeit
lz4.frame.compress(test_data, block_linked=False, store_size=False)

181 ms ± 118 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


At this point we are bound by the limitations of our data-movement code. The Vitis Libraries have [examples and benchmarks](https://github.com/Xilinx/Vitis_Libraries/tree/b658aa5cd262d080048526ce931d4570cb931a36/data_compression/L3/benchmarks/lz4_p2p_compress) that use P2P memory transfers between the device and an SSD along with both compute kernels to drastically improve performance. For more details on the library consult the [repository](https://github.com/Xilinx/Vitis_Libraries/tree/master/data_compression).

### Cleaning up

You might want to *shutdown* this notebook at this point to ensure that all of the resources used are freed.


Copyright 2020 (C) Xilinx, Inc