In [1]:
from typing import Callable, Any, List
import multiprocessing as mp
from multiprocessing import Queue as MQueue
from queue import Queue
from threading import Thread
import pyring as pr

%load_ext autoreload
%autoreload 2

from qubx import logger
from qubx.core.basics import Trade
from qubx.utils import Stopwatch

 >  [[32mdev[0m] [31minstalled cython rebuilding hook[0m


# Test throughput

In [11]:
sw = Stopwatch()

def run_in_thread(method: Callable, arguments: List[Any]=list()) -> Thread:
    t = Thread(target=method, args=arguments)
    t.start()
    return t

class DataPumper:
    queue: Queue
    def __init__(self, type='q', ring_size=10000):
        self.type = type
        if type == 'q':
            self.queue = Queue()
        else:
            self.queue = pr.SingleProducerDisruptor(ring_size)

    def pump(self, n):
        sw.start(f'{self.type.upper()}:pump_batch')
        self.queue.put('S')
        for i in range(n):
            self.queue.put(Trade(100, i + 0.1, i * 100))
        self.queue.put('F')
        sw.stop(f'{self.type.upper()}:pump_batch')

class DataReader:
    def __init__(self, pumper: DataPumper):
        self.queue = pumper.queue
        self.type = pumper.type
        self._run = True

    def _read_queue(self):
        print("START reading queue")
        while self._run:
            data = self.queue.get()
            if isinstance(data, str):
                if data == 'S':
                    sw.start('Q:read_batch')
                    print("batch reading started ...")
                elif data == 'F':
                    sw.stop('Q:read_batch')
                    print("batch finished")

    def _read_disr(self):
        print("START reading disruptor")
        sbscr = self.queue.subscribe()
        while self._run:
            idx, data = sbscr.next()
            if isinstance(data, str):
                if data == 'S':
                    sw.start('D:read_batch')
                    print("batch reading started ...")
                elif data == 'F':
                    sw.stop('D:read_batch')
                    print("batch finished")
        sbscr.unregister()

    def read(self):
        if self.type=='q':
            self._read_queue()
        else:
            self._read_disr()

    def run(self):
        return run_in_thread(self.read)

    def stop(self):
        self._run = False
        self.queue.put(None)

In [12]:
# PyRing
dp = DataPumper('d')
rd = DataReader(dp)
rd.run()
dp.pump(1_000_000)
dp.pump(1_000_000)
dp.pump(1_000_000)
rd.stop()
# dp.pump(1)

START reading disruptor
batch reading started ...
batch finished
batch reading started ...
batch finished
batch reading started ...


In [13]:
# Standard Queue
dp = DataPumper('q')
rd = DataReader(dp)
rd.run()
dp.pump(1_000_000)
dp.pump(1_000_000)
dp.pump(1_000_000)
rd.stop()

START reading queue
batch reading started ...
batch finished
batch reading started ...
batch finished
batch reading started ...


In [14]:
logger.info('Tests' + str(sw))

[32m2024-03-18 10:53:58.319[0m [ [1mℹ️[0m ] [1mTests
	[37mD:pump_batch[0m[1m took [31m2.0618683[0m[1m secs
	[37mD:read_batch[0m[1m took [31m2.0830948[0m[1m secs
	[37mQ:pump_batch[0m[1m took [31m4.1221722[0m[1m secs
	[37mQ:read_batch[0m[1m took [31m4.3047418[0m[1m secs[0m


In [7]:
sw.reset()

# Test pyring


In [None]:
disruptor = pr.SingleProducerDisruptor()

subscriber = disruptor.subscribe()
# subscriber_two = disruptor.subscribe()

for i in range(100):
    disruptor.put(i ** 2)
    sequence_one, res_one = subscriber.next()
    # sequence_two, res_two = subscriber_two.next()

# releases the subscribers barriers and allows disruptor to continue
subscriber.unregister()
# subscriber_two.unregister()

In [10]:
# create ring buffer
ring_buffer = pr.RingBuffer()

# add to ring
ring_buffer.put("Something new!")

# get latest from ring
sequence, value = ring_buffer.get_latest()
print(sequence, value) # 0 Something new!

0 Something new!
