# Data structures for fast infinite batching or streaming request processing

> Here we discuss one of the coolest uses of a data structure: handling the very natural scenario of a server processing streaming requests from clients in order. Processing these requests usually involves a pipeline of operations applied to each request, with multiple threads in charge of the stages of the pipeline. The threads share access to the requests, and the threads that perform the later stages of the pipeline have to wait for the earlier threads to finish their work.

The usual way to ensure correctness when multiple threads handle the same data concurrently is to use locks. The problem is framed as a producer/consumer problem: one thread finishes its operation and becomes the producer of the data to be worked on by another thread, the consumer. These two threads need to be synchronized.


> Note: In this blog we will discuss a "lock-free" circular queue data structure called the disruptor. It was designed to be an efficient concurrent message-passing data structure. The official implementation and other discussions are available [here](https://lmax-exchange.github.io/disruptor/#_discussion_blogs_other_useful_links). This blog intends to summarise its use case and show the points where the design of the disruptor scores big.

# LOCKS ARE BAD

Whenever multiple concurrently running threads contend on a shared data structure, we need to manage that contention and ensure visibility of changes (i.e. a consumer thread can only get its hands on the data after the producer has processed it and put it up for further processing). The usual and most common way to ensure these two requirements is to use a lock.
Locks need the operating system to arbitrate which thread is responsible for a shared piece of data. The operating system might schedule other processes in the meantime, leaving the software's thread waiting in a queue. Moreover, if other threads get scheduled on the CPU, the cached data of the software's thread may be evicted, and when it finally gets the CPU back it may have to go as far as main memory to fetch the data it needs. All this adds a lot of overhead, as a simple experiment of incrementing a single shared variable shows. In the experiment below we increment a shared variable in three different ways. In the first case, a single thread increments the variable. In the second case, two threads increment it and synchronize using a lock.
In the third case, two threads increment the variable and synchronize using atomic operations.


## SINGLE PROCESS INCREMENTING A SINGLE VARIABLE

In [38]:
import time 
def single_thread():
    start = time.time()
    x = 0
    for i in range(500000000):
        x += 1
    end = time.time()
    return(end-start)
print(single_thread())

28.66362190246582


In [37]:
# Another way to do a single-threaded increment: a counter class
class SingleThreadedCounter():
    def __init__(self):
        self.val = 0
    def increment(self):
        self.val += 1


## TWO THREADS INCREMENTING A SINGLE VARIABLE

In [40]:
import time
from threading import Thread, Lock
mutex = Lock()
x = 0
def thread_fcn():
    global x
    # The lock is held for the whole loop, so the two threads run one after the other.
    with mutex:
        for i in range(250000000):
            x += 1

def mutex_increment():
    start = time.time()
    t1 = Thread(target=thread_fcn)
    t2 = Thread(target=thread_fcn)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    end = time.time()
    return (end-start)
print(mutex_increment())

36.418396949768066


> Note: As we can see, the time taken for the increment operation has gone up substantially, when we might have expected it to take half the time.
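
The run above takes the lock once per thread. As a hedged variation, the sketch below takes the lock around every single increment, which is closer to how a contended shared counter is usually protected. The iteration count `N` and the helper names `count_single` and `count_locked` are assumptions made for illustration, and `N` is far smaller than the counts above so that it finishes quickly. Timings depend on the machine and the Python build, so none are quoted here.

```python
import time
from threading import Thread, Lock

N = 200_000  # hypothetical count, much smaller than in the experiment above


def count_single(n):
    """Increment a local counter n times on one thread; return (count, seconds)."""
    x = 0
    start = time.perf_counter()
    for _ in range(n):
        x += 1
    return x, time.perf_counter() - start


def count_locked(n, n_threads=2):
    """Split n increments across threads, locking around each increment."""
    lock = Lock()
    counter = [0]  # mutable cell shared by all worker threads

    def worker(iterations):
        for _ in range(iterations):
            with lock:  # acquire and release the lock for every increment
                counter[0] += 1

    threads = [Thread(target=worker, args=(n // n_threads,)) for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter[0], time.perf_counter() - start


if __name__ == "__main__":
    print("single thread:", count_single(N)[1])
    print("two threads, lock per increment:", count_locked(N)[1])
```

Both functions arrive at the same final count; only the elapsed time differs.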

> Important: In the rest of the blog we will consider a very common scenario in streaming request processing.

A client sends requests to a server in a streaming fashion. The server needs to process each request, possibly in multiple stages. For example, imagine the client sends the server a stream of requests in JSON format. The probable first task the server needs to perform is to parse the JSON request. Imagine a thread assigned to this parsing task. It parses requests one after another and hands each parsed request, in some form, to another thread that may be responsible for performing the business logic for that client. In a streaming scenario, this message passing and flow control is usually handled by a queue data structure. The producer thread (the parser thread) puts parsed data into the queue, from which the consumer thread (the business logic thread) reads it. Because two threads work concurrently on a single data structure (the queue), we can expect contention to kick in.
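
The two-stage pipeline described above can be sketched with Python's standard `queue.Queue`, which uses locks internally. This is only an illustration: the stage names `parser_stage` and `business_stage`, the `"amount"` field, the doubling "business logic" and the end-of-stream sentinel are all assumptions, not part of any real system.

```python
import json
import queue
import threading

SENTINEL = object()  # hypothetical end-of-stream marker


def parser_stage(raw_requests, parsed_queue):
    """Producer: parse each JSON request and hand it to the next stage."""
    for raw in raw_requests:
        parsed_queue.put(json.loads(raw))
    parsed_queue.put(SENTINEL)


def business_stage(parsed_queue, results):
    """Consumer: take parsed requests off the queue and apply the business logic."""
    while True:
        request = parsed_queue.get()
        if request is SENTINEL:
            break
        results.append(request["amount"] * 2)  # stand-in for real business logic


def run_pipeline(raw_requests):
    parsed_queue = queue.Queue(maxsize=4)  # bounded, so the parser blocks when it is full
    results = []
    stages = [
        threading.Thread(target=parser_stage, args=(raw_requests, parsed_queue)),
        threading.Thread(target=business_stage, args=(parsed_queue, results)),
    ]
    for stage in stages:
        stage.start()
    for stage in stages:
        stage.join()
    return results


if __name__ == "__main__":
    print(run_pipeline(['{"amount": 1}', '{"amount": 5}']))
```

Every `put` and `get` here goes through the queue's internal lock, which is exactly the contention point the rest of the post tries to remove.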

## WHY QUEUES ARE FLAWED

The queue may seem an obvious choice for handling data communication between multiple threads, but the queue data structure is fundamentally flawed for this purpose. Imagine the first two threads of a system using a queue for data communication: the listener thread and the parser thread. The listener thread listens for bytes from the wire and puts them in a queue, and the parser thread picks up bytes from the queue and parses them. Typically, a queue data structure has a head field, a tail field and a size field (to tell an empty queue from a full one). The head field is modified by the parser thread and the tail field by the listener thread. The size field, though, is modified by both threads, which effectively gives the queue data structure two writers.

![](my_icons/queue.png)
Moreover, the entire data structure is likely to fall in the same cache line, so when, say, the listener thread modifies the tail field, the copy of the head field cached on another core is also invalidated and has to be fetched again from a level 2 cache.
![](my_icons/false_sharing.png)
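
The head, tail and size fields described above can be sketched as a small bounded queue. The class name `BoundedQueue` and its method names are hypothetical; the point is only to show that `size` is written by both sides, which is why a lock is needed. Cache-line effects cannot be shown from Python, so the sketch covers only the two-writer problem.

```python
import threading


class BoundedQueue:
    """Array-backed queue with the classic head / tail / size fields."""

    def __init__(self, capacity):
        self.items = [None] * capacity
        self.head = 0  # next slot to read; advanced by the consumer
        self.tail = 0  # next slot to write; advanced by the producer
        self.size = 0  # written by BOTH threads, hence the lock
        self.lock = threading.Lock()

    def put(self, item):
        """Producer side: returns False when the queue is full."""
        with self.lock:
            if self.size == len(self.items):
                return False
            self.items[self.tail] = item
            self.tail = (self.tail + 1) % len(self.items)
            self.size += 1  # producer writes size
            return True

    def get(self):
        """Consumer side: returns None when the queue is empty."""
        with self.lock:
            if self.size == 0:
                return None
            item = self.items[self.head]
            self.head = (self.head + 1) % len(self.items)
            self.size -= 1  # consumer writes size too
            return item
```

Without the `size` field, `head == tail` would be ambiguous between "empty" and "full", which is why the field exists in the first place.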

## CAN WE AVOID LOCKS ?

So, using a queue for inter-thread communication, with its expensive locks, could cost any system a lot of performance. Hence, we move to a better data structure that solves the synchronization issues among threads.
The data structure we use doesn't use locks.
The main components of the data structure are:

A. A circular buffer.

B. A sequence number field, whose value indicates a specific slot in the circular buffer.

C. A separate sequence number for each of the worker threads.

The circular buffer is written to by the producer. In each case, the producer updates the sequence number of the circular buffer it writes to. The worker threads (consumer threads) each have their own sequence number indicating the slots they have consumed so far from the circular buffer.
![](my_icons/lock_free_ds.png)

>Note: In this design, each element has a SINGLE WRITER. The producer thread of the circular buffer writes to the ring buffer and to its sequence number. The worker (consumer) threads write only their own local sequence numbers. No field or piece of data has more than one writer in this data structure.
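
A minimal sketch of these components follows. It is not the official disruptor code: the class names `Sequence` and `RingBuffer`, the starting value of -1 and the consumer name in the usage are assumptions for illustration. As a further assumption, the sequence numbers here grow without wrapping and are mapped to a slot with a modulo, which makes it easy to tell a full buffer from an empty one.

```python
class Sequence:
    """A counter with exactly one writer thread; any thread may read it."""

    def __init__(self, initial=-1):
        self.value = initial  # -1 means nothing published or consumed yet


class RingBuffer:
    """Circular buffer plus one producer sequence and one sequence per consumer."""

    def __init__(self, size, consumer_names):
        self.size = size
        self.slots = [None] * size  # written only by the producer
        self.cursor = Sequence()    # written only by the producer
        # Each consumer owns (and is the only writer of) its own sequence.
        self.consumer_sequences = {name: Sequence() for name in consumer_names}

    def slot_for(self, sequence):
        """Map an ever-growing sequence number onto a slot index."""
        return sequence % self.size

    def slowest_consumer(self):
        """The lowest consumer sequence; the producer must not lap it."""
        return min(seq.value for seq in self.consumer_sequences.values())
```

For example, `RingBuffer(8, ["business_logic"])` creates eight empty slots, a producer cursor and one consumer sequence, all starting at -1.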

## WRITE OPERATION ON THE LOCK-FREE DATA STRUCTURE

Before writing to a slot in the circular buffer, the producer thread has to make sure that it doesn't overwrite old bytes that have not yet been processed by the consumer thread. The consumer thread also maintains a sequence number, which indicates the slots that have already been processed. So, before writing, the producer thread grabs the circular buffer's sequence number and adds one to it (mod the size of the circular buffer) to get the next eligible slot for writing. But before putting the bytes in that slot, it checks with the dependent consumer threads (by reading their local sequence numbers) whether they have processed this slot. If a consumer has not yet processed the slot, the producer thread busy-waits until the slot is available to write to. Once the slot has been overwritten, the producer thread updates the circular buffer's sequence number. This indicates to the consumer threads that they have a new slot to consume.
![](my_icons/writing1.png)

Writing to the circular buffer is a two-phase commit. In the first phase, we check out a slot from the circular buffer. We can only check out a slot if it has already been consumed, which is ensured by the logic described above. Once the slot is checked out, the producer writes the next byte to it. In the second phase, it commits the entry by updating the circular buffer's sequence number to its next logical value (+1 mod the size of the circular buffer).
![](my_icons/writing2.png)
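
The two phases can be sketched as follows, for a single producer and a single consumer. The names `SingleProducerRing`, `try_claim` and `commit` are hypothetical. Two deliberate simplifications: sequence numbers grow without wrapping and are reduced modulo the size only when indexing a slot, and `try_claim` returns `None` where a real producer would busy-wait.

```python
class SingleProducerRing:
    def __init__(self, size):
        self.size = size
        self.slots = [None] * size
        self.cursor = -1        # last committed sequence; written only by the producer
        self.consumer_seq = -1  # last consumed sequence; written only by the consumer

    def try_claim(self):
        """Phase 1: check out the next slot, or return None if it is still unread."""
        next_seq = self.cursor + 1
        # The slot for next_seq previously held sequence (next_seq - size);
        # it is free only once the consumer has moved past that sequence.
        if next_seq - self.size > self.consumer_seq:
            return None  # a real producer would busy-wait here instead
        return next_seq

    def commit(self, seq, value):
        """Phase 2: write the slot, then publish it by advancing the cursor."""
        self.slots[seq % self.size] = value
        self.cursor = seq  # the slot write has to be visible before this update


if __name__ == "__main__":
    ring = SingleProducerRing(2)
    for byte in (b"a", b"b", b"c"):
        seq = ring.try_claim()
        if seq is None:
            print("buffer full, producer would wait")
        else:
            ring.commit(seq, byte)
```

The third claim in the usage fails because neither of the two slots has been consumed yet.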

## READ OPERATION ON THE LOCK-FREE DATA STRUCTURE

The consumer thread reads slots from the circular buffer in order. Before reading the next slot, it checks (reads) the buffer's sequence number. This number indicates the slot up to which the consumer can read.
![](my_icons/reading.png)
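
A sketch of the read side, under the same assumptions as before (sequence numbers that start at -1, grow without wrapping and are reduced modulo the buffer size to find a slot). The function name `drain_available` is hypothetical. It reads every slot up to the buffer's sequence number in one go and returns the consumer's new sequence number.

```python
def drain_available(slots, cursor, consumer_seq):
    """Read every slot published so far.

    slots        -- the circular buffer (a list)
    cursor       -- the buffer's sequence number (last published slot)
    consumer_seq -- this consumer's sequence number (last consumed slot)

    Returns (items, new_consumer_seq).
    """
    size = len(slots)
    items = []
    while consumer_seq < cursor:  # anything up to the cursor is safe to read
        consumer_seq += 1
        items.append(slots[consumer_seq % size])
    return items, consumer_seq


if __name__ == "__main__":
    slots = ["a", "b", "c", None]
    print(drain_available(slots, cursor=2, consumer_seq=-1))
```

The consumer only ever writes its own sequence number, so the single-writer rule still holds.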

## ENSURING THAT THE READS HAPPEN IN PROGRAM ORDER


There is just one more detail that needs to be addressed for the above data structure to work. Compilers and CPUs take the liberty of reordering independent instructions as an optimization. This causes no issues in the single-threaded case, where the integrity of the program's logic is maintained. But it can break the logic when multiple threads are involved.
Imagine a typical simplified read/write to the circular buffer described above:
Say the publisher thread's sequence of operations is shown in black and the consumer thread's in brown. The publisher checks out a slot, but its update to the sequence number is reordered ahead of its write to the slot. The consumer thread then reads the (prematurely updated) sequence number of the buffer and goes on to access a slot that is yet to be written.
![](my_icons/ordering1.png)

We can solve this by putting memory fences around the shared variables, which tell the compiler and the CPU not to reorder reads and writes across them. In that way the integrity of the program's logic is maintained.
![](my_icons/memory_fence.png)
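
Python does not expose explicit memory fences, so the ordering requirement can only be illustrated indirectly. The sketch below runs one producer and one consumer over a single-writer ring and marks, in comments, the two places where the order of operations matters. It assumes a standard CPython build with the global interpreter lock, which keeps each thread's writes visible to the other in program order; in a language such as C++ or Java the sequence numbers would need atomic or volatile semantics instead. The names `SpscRing` and `run_pipeline` are hypothetical.

```python
import threading
import time


class SpscRing:
    """Single-producer, single-consumer ring; every field has one writer."""

    def __init__(self, size):
        self.size = size
        self.slots = [None] * size
        self.cursor = -1    # last published sequence (producer writes)
        self.consumed = -1  # last consumed sequence (consumer writes)

    def publish(self, value):
        next_seq = self.cursor + 1
        while next_seq - self.size > self.consumed:
            time.sleep(0)  # busy-wait, yielding so the consumer can run
        self.slots[next_seq % self.size] = value  # 1. write the slot
        self.cursor = next_seq                    # 2. only then publish it

    def consume(self):
        next_seq = self.consumed + 1
        while self.cursor < next_seq:
            time.sleep(0)  # busy-wait until the producer publishes
        value = self.slots[next_seq % self.size]  # 1. read the slot
        self.consumed = next_seq                  # 2. only then release it
        return value


def run_pipeline(n_items=1000, size=8):
    ring = SpscRing(size)
    received = []

    def producer():
        for i in range(n_items):
            ring.publish(i)

    def consumer():
        for _ in range(n_items):
            received.append(ring.consume())

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received


if __name__ == "__main__":
    print(run_pipeline(10, size=4))
```

If step 2 in `publish` could become visible before step 1, the consumer would read a slot that has not been written yet, which is the failure described above.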