Skip to content

Commit

Permalink
feat(gateway): separate message queues for client messages in/out
Browse files Browse the repository at this point in the history
  • Loading branch information
xendarboh committed Sep 20, 2023
1 parent b87e867 commit 4042404
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 21 deletions.
52 changes: 36 additions & 16 deletions cmd/xtrellis/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,52 @@ import (
)

// A gateway:
// - messages to transmit through the mix-net are sent here
// - clients retrieve messages to transmit from here
// - receives messages to transmit through the mix-net for a particular client
// - clients retrieve messages to transmit
// - receives final messages from the mix-net

// Enable gateway? Used externally to conditionally use the gateway
var Enable bool = false

// Total message size including 8*2 bytes for serialized meta; initialization required
var messageSize int64 = 0

// A message queue for each client is used to store messages in real-time as they are received
// and made available for each client round
var messageQueue = make(map[int]*list.List)

func Init(s int64, enable bool) {
messageSize = s
Enable = enable
}

// A message queue for each client
type MessageQueue struct {
items map[int]*list.List
}

func NewMessageQueue() *MessageQueue {
return &MessageQueue{
items: make(map[int]*list.List),
}
}

// Incoming message queue; stores messages as they are received then made available for mix-net xfer each client round
var msgQueueIn = NewMessageQueue()

// Outgoing message queue; stores final messages as they arrive out of the mix-net
var msgQueueOut = NewMessageQueue()

// enqueue data into the queue at the given index
func enqueue(index int, val []byte) {
func (q *MessageQueue) Enqueue(index int, val []byte) {
// Initialize the queue if it doesn't exist
if _, exists := messageQueue[index]; !exists {
messageQueue[index] = list.New()
if _, exists := q.items[index]; !exists {
q.items[index] = list.New()
}
// Add to the queue
messageQueue[index].PushBack(val)
q.items[index].PushBack(val)
}

// dequeue data from the queue at the given index
func dequeue(index int) ([]byte, error) {
func (q *MessageQueue) Dequeue(index int) ([]byte, error) {
// pop a message from the queue for a specific client
if queue, exists := messageQueue[index]; exists {
if queue, exists := q.items[index]; exists {
if element := queue.Front(); element != nil {
// Pop the front element from the queue
message := element.Value.([]byte)
Expand Down Expand Up @@ -117,7 +131,7 @@ func messageUnserialize(message []byte) (uint64, []byte, error) {

// Input a message into the gateway for a client
func PutMessageForClient(clientId int64, message []byte) error {
enqueue(int(clientId), message)
msgQueueIn.Enqueue(int(clientId), message)

return nil
}
Expand All @@ -126,11 +140,11 @@ func PutMessageForClient(clientId int64, message []byte) error {
// if no messages in the client's message queue, then use a default
func GetMessageForClient(i *coord.RoundInfo, clientId int64) ([]byte, error) {
// get next message data queued for this client
data, err := dequeue(int(clientId))
data, err := msgQueueIn.Dequeue(int(clientId))

// if no message found in queue, use default data
if err != nil {
data = []byte("---")
data = []byte("")
}

// use clientId as message id
Expand Down Expand Up @@ -166,8 +180,14 @@ func CheckFinalMessages(messages [][]byte, numExpected int) bool {
}
}

// for each final message
for i, s := range messageData {
utils.DebugLog("messageData[%d] = %x", i, s)
// if the message data has length, then it was fed to the client
if len(s) > 0 {
// add the data to the client's out queue
msgQueueOut.Enqueue(int(i), s)
}
utils.DebugLog("messageData[%d] = '%x'", i, s)
}

return len(messageData) == numExpected
Expand Down
12 changes: 7 additions & 5 deletions cmd/xtrellis/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,24 @@ func TestMessageQueue(t *testing.T) {
v1 := []byte("test1")
v2 := []byte("test2")

enqueue(id, v1)
enqueue(id, v2)
q := NewMessageQueue()

_, err := dequeue(4)
q.Enqueue(id, v1)
q.Enqueue(id, v2)

_, err := q.Dequeue(4)
if err == nil {
t.Log("expected index to not have a queue")
t.FailNow()
}

m1, err := dequeue(id)
m1, err := q.Dequeue(id)
if !bytes.Equal(m1, v1) {
t.Log("bytes not equal")
t.FailNow()
}

m2, err := dequeue(id)
m2, err := q.Dequeue(id)
if !bytes.Equal(m2, v2) {
t.Log("bytes not equal")
t.FailNow()
Expand Down

0 comments on commit 4042404

Please sign in to comment.