# Queue Primitive

### Main Goals

- Bi-directional asynchronous messaging
- The abstraction should work with the in-memory Python Worker and be switchable to a larger containerized stack
- Distribution of work across multiple nodes / workers
- Exception handling, with errors reported when a task fails
- APIs for querying, interacting with, and clearing queues
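
The goals above can be sketched as a small backend interface with an in-memory implementation. This is a minimal illustration under stated assumptions, not the syft implementation: `QueueBackend`, `InMemoryBackend`, `TaskResult`, and `run_task` are hypothetical names introduced here. A containerized broker would, under this assumption, implement the same four methods.

```python
import queue
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class TaskResult:
    """Outcome of one task; `error` is set when the handler raised."""
    ok: bool
    value: object = None
    error: Optional[str] = None


class QueueBackend(ABC):
    """Hypothetical interface shared by in-memory and external backends."""

    @abstractmethod
    def put(self, item: bytes) -> None: ...

    @abstractmethod
    def get(self, timeout: Optional[float] = None) -> bytes: ...

    @abstractmethod
    def size(self) -> int: ...

    @abstractmethod
    def clear(self) -> int: ...


class InMemoryBackend(QueueBackend):
    """Backend built on the standard library's thread-safe queue."""

    def __init__(self) -> None:
        self._q: queue.Queue = queue.Queue()

    def put(self, item: bytes) -> None:
        self._q.put(item)

    def get(self, timeout: Optional[float] = None) -> bytes:
        # Raises queue.Empty if nothing arrives within `timeout` seconds.
        return self._q.get(timeout=timeout)

    def size(self) -> int:
        return self._q.qsize()

    def clear(self) -> int:
        # Drain the queue and report how many items were dropped.
        removed = 0
        while True:
            try:
                self._q.get_nowait()
            except queue.Empty:
                return removed
            removed += 1


def run_task(handler: Callable[[bytes], object], backend: QueueBackend) -> TaskResult:
    """Take one item and run the handler, turning exceptions into error results."""
    item = backend.get(timeout=1.0)
    try:
        return TaskResult(ok=True, value=handler(item))
    except Exception as exc:
        return TaskResult(ok=False, error=f"{type(exc).__name__}: {exc}")
```

Because callers only see `QueueBackend`, swapping the in-memory queue for a networked one would not change the worker code, which is the property the second goal asks for.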

### Current Architecture

![current_queue_system.png](current_queue_system.png)

### Single Queue Pub Sub

![single-queue-pub-sub.png](single-queue-pub-sub.png)

### Multi Queue Pub-Sub

![multi-queue-pub-sub.png](multi-queue-pub-sub.png)
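
One plausible reading of the multi-queue layout is that each subscriber owns its own queue and a router fans messages out by topic. The sketch below illustrates that idea with the standard library only. `TopicRouter` and the topic names are hypothetical and are not part of syft.

```python
import queue
from collections import defaultdict
from typing import DefaultDict, List


class TopicRouter:
    """Illustrative in-memory router: one inbox queue per subscriber, grouped by topic."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[queue.Queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> queue.Queue:
        # Each subscriber gets a private queue, so slow consumers do not block others.
        inbox: queue.Queue = queue.Queue()
        self._subscribers[topic].append(inbox)
        return inbox

    def publish(self, topic: str, message: bytes) -> int:
        # Copy the message to every inbox on the topic; return how many received it.
        inboxes = self._subscribers.get(topic, [])
        for inbox in inboxes:
            inbox.put(message)
        return len(inboxes)


router = TopicRouter()
jobs_a = router.subscribe("jobs")
jobs_b = router.subscribe("jobs")
delivered = router.publish("jobs", b"My Message")
```

In the single-queue variant, every subscriber would instead read from one shared queue and each message would reach only one of them.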

In [1]:
import syft as sy
from syft.service.queue.queue_stash import QueueItem
from syft.service.queue.queue import QueueServer, Publisher, Subscriber



### Initialize the Proxy/Router Server

In [2]:
pub_addr = "tcp://127.0.0.1:5000"
sub_addr = "tcp://127.0.0.1:5001"

In [3]:
queue_server = QueueServer.create(pub_addr=pub_addr, sub_addr=sub_addr)

In [4]:
queue_server.start()

Logging...


In [5]:
queue_server.check_logs(timeout=0.1)


### Attach a subscriber

In [6]:
subscriber = Subscriber(sub_addr)

pub 



### Attach a publisher

In [7]:
publisher = Publisher(address=pub_addr)

In [8]:
my_message = b"My Message"
publisher.send(my_message)

Message Send:  b'My Message'
sub My Message


### Listen for messages on the Subscriber

In [9]:
# Listen for a message, blocking until one arrives
subscriber.receive(blocking=True)

HEllo message received:  [b'My Message']


In [10]:
my_message = b"Super secret message !!"
publisher.send(my_message)

Message Send:  b'Super secret message !!'


In [11]:
# Listen for messages in a background thread (non-blocking call)
subscriber.receive()

HEllo message received:  [b'Super secret message !!']
sub Super secret message !!
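
The non-blocking call above listens in a thread rather than holding up the notebook cell. A minimal sketch of that pattern, using only the standard library, is shown below. `listen_in_thread` is a hypothetical helper and does not reflect how `Subscriber.receive` is actually implemented.

```python
import queue
import threading
from typing import Callable


def listen_in_thread(
    inbox: queue.Queue,
    on_message: Callable[[bytes], None],
    stop: threading.Event,
) -> threading.Thread:
    """Start a daemon thread that passes each queued message to `on_message`."""

    def loop() -> None:
        while not stop.is_set():
            try:
                # A short timeout lets the loop notice the stop flag promptly.
                message = inbox.get(timeout=0.05)
            except queue.Empty:
                continue
            on_message(message)
            inbox.task_done()

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


inbox: queue.Queue = queue.Queue()
stop = threading.Event()
received = []

listener = listen_in_thread(inbox, received.append, stop)
inbox.put(b"Super secret message !!")
inbox.join()          # wait until the listener has handled the message
stop.set()            # ask the loop to exit
listener.join(timeout=1.0)
```

The stop event gives the caller a clean way to shut the listener down, which is the role a `close()` call would play on a real server object.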


In [12]:
my_message = b"Super secret message !!"
publisher.send(my_message)

Message Send:  b'Super secret message !!'
sub Super secret message !!


In [13]:
queue_server.check_logs(timeout=0.5)

### Close server

In [14]:
queue_server.close()

### Simple integration with the current Worker

In [15]:
domain = sy.Worker(name="Domain")

> Worker: Domain - bec0dca254ce406297e6ae3f3f6c9927 - NodeType.DOMAIN

Services:
ActionService
DataSubjectMemberService
DataSubjectService
DatasetService
MessageService
MetadataService
NetworkService
PolicyService
ProjectService
RequestService
UserCodeService
UserService
Queue is Online 🟢
Logging...
pub 


In [16]:
domain.queue_proxy_server

<syft.service.queue.queue.QueueServer at 0x7f3a59831640>

In [17]:
domain.subscriber

<syft.service.queue.queue.Subscriber at 0x7f3a596c55b0>

In [18]:
domain.publisher

<syft.service.queue.queue.Publisher at 0x7f3a59831610>

In [19]:
my_message = b"My Message"
domain.publisher.send(my_message)

Message Send:  b'My Message'


In [20]:
domain.queue_proxy_server.check_logs(timeout=0.5)

HEllo message received:  [b'My Message']
sub My Message


In [21]:
domain.queue_proxy_server.close()