Skip to content

Commit

Permalink
storage test
Browse files Browse the repository at this point in the history
  • Loading branch information
Florimond committed Nov 14, 2023
1 parent 35784f2 commit 705518c
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 83 deletions.
1 change: 0 additions & 1 deletion internal/provider/storage/ssd.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func encodeFrame(msgs message.Frame) []*badger.Entry {
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
func (s *SSD) Query(ssid message.Ssid, from, untilTime time.Time, untilID message.ID, limiter Limiter) (message.Frame, error) {

// Construct a query and lookup locally first
query := newLookupQuery(ssid, from, untilTime, untilID, limiter)
match := s.lookup(query)
Expand Down
6 changes: 6 additions & 0 deletions internal/provider/storage/ssd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) {
})
}

func TestSSD_QueryUntilID(t *testing.T) {
runSSDTest(func(store *SSD) {
testUntilID(t, store)
})
}

func TestSSD_QueryRetained(t *testing.T) {
runSSDTest(func(store *SSD) {
testRetained(t, store)
Expand Down
20 changes: 20 additions & 0 deletions internal/provider/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@ func testRetained(t *testing.T, store Storage) {
assert.Equal(t, "9", string(f[0].Payload))
}

func testUntilID(t *testing.T, store Storage) {
var fourth message.ID
for i := int64(0); i < 10; i++ {
msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), []byte(fmt.Sprintf("%d", i)))
msg.TTL = message.RetainedTTL
msg.ID.SetTime(msg.ID.Time() + (i * 10000))
assert.NoError(t, store.Store(msg))
if i == 4 {
fourth = msg.ID
}
}

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, fourth, NewMessageNumberLimiter(100))
assert.NoError(t, err)

assert.Len(t, f, 4)
}

func testRange(t *testing.T, store Storage) {
var t0, t1 int64
for i := int64(0); i < 100; i++ {
Expand Down
73 changes: 17 additions & 56 deletions internal/service/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,13 @@ type Message struct {
type Response struct {
Request uint16 `json:"req,omitempty"` // The corresponding request ID.
Messages []Message `json:"messages"` // The history of messages.
//Messages message.Frame `json:"messages"`
}

// ForRequest sets the request ID in the response for matching
func (r *Response) ForRequest(id uint16) {
r.Request = id
}

type limiter struct {
maxCount int64
currentCount int64
totalSize int64
}

func (l *limiter) CanAddMessage(m message.Message) bool {
if l.currentCount >= l.maxCount {
return false
}
return true
}

// OnRequest handles a request of historical messages.
func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, bool) {
var request Request
Expand All @@ -80,13 +66,12 @@ func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, b
return errors.ErrUnauthorized, false
}

limit := int64(3)
// if v, ok := channel.Last(); ok {
// limit = v
// }
// messageLimiter := &limiter{
// maxCount: limit,
// }
// Use limit = 1 if not specified, otherwise use the limit option. The limit now
// defaults to one as per MQTT spec we always need to send retained messages.
limit := int64(1)
if v, ok := channel.Last(); ok {
limit = v
}

ssid := message.NewSsid(key.Contract(), channel.Query)
t0, t1 := channel.Window() // Get the window
Expand All @@ -98,40 +83,16 @@ func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, b
return errors.ErrServerError, false
}

// This request is answered either by resending all messages on their
// original channel, potentially triggering mutliple handlers on the client
// side, or by responding with all messages in one big response here.
// Can be both, but the latter is the default behavior.
withResponse, okResponse := channel.GetOption("response")
withResend, okResend := channel.GetOption("resend")
doResend := okResend && withResend == 1
doRespond := (okResponse && withResponse == 1) || !doResend

// Resend every messages again like they were originally.
/*
if doResend {
// Range over the messages in the channel and forward them
for _, m := range msgs {
msg := m // Copy message
c.Send(&msg)
}
}*/

// Send all messages in the payload of the response to this request.
if doRespond {
resp := &Response{
Messages: make([]Message, 0, len(msgs)),
}
for _, m := range msgs {
msg := m
resp.Messages = append(resp.Messages, Message{
ID: msg.ID,
Topic: string(msg.Channel), // The channel for this message.
Payload: string(msg.Payload), // The payload for this message.
})
}
return resp, true
resp := &Response{
Messages: make([]Message, 0, len(msgs)),
}

return nil, true
for _, m := range msgs {
msg := m
resp.Messages = append(resp.Messages, Message{
ID: msg.ID,
Topic: string(msg.Channel), // The channel for this message.
Payload: string(msg.Payload), // The payload for this message.
})
}
return resp, true
}
68 changes: 42 additions & 26 deletions internal/service/history/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/stretchr/testify/assert"
)

// TestHistory tests the history service and its default Limiter implementation
// which limits the number of messages that can be retrieved. This Limiter's
// purpose is to reproduce historical behavior of Emitter.
func TestHistory(t *testing.T) {
//assert.True(t, true)
ssid := message.Ssid{1, 3238259379, 500706888, 1027807523}
store := storage.NewInMemory(nil)
store.Configure(nil)
Expand All @@ -35,38 +37,52 @@ func TestHistory(t *testing.T) {
Contract: uint32(1),
ExtraPerm: security.AllowLoad,
}
// Create new service
service := New(auth, store)
connection := &fake.Conn{}

// The most basic request, on an empty store.
request := &Request{
Key: "key",
Channel: "key/a/b/c/",
}

// Prepare the request
b, _ := json.Marshal(request)
if request == nil {
b = []byte("invalid")
} else {
auth.Target = request.Channel
}

// Create new service
s := New(auth, store)
c := &fake.Conn{}

// Store a message
for i := 0; i < 1; i++ {
store.Store(&message.Message{
ID: message.NewID(ssid),
Channel: []byte("a/b/c/"),
Payload: []byte("hello"),
TTL: 30,
})
}

// Issue a request
response, ok := s.OnRequest(c, b)
println(response)
// Store 2 messages
firstSSID := message.NewID(ssid)
store.Store(&message.Message{
ID: firstSSID,
Channel: []byte("a/b/c/"),
Payload: []byte("hello"),
TTL: 30,
})
store.Store(&message.Message{
ID: message.NewID(ssid),
Channel: []byte("a/b/c/"),
Payload: []byte("hello"),
TTL: 30,
})
reqBytes, _ := json.Marshal(request)

// Issue the same request
response, ok := service.OnRequest(connection, reqBytes)
// The request should have succeeded and returned a response.
assert.Equal(t, true, ok)
// The response should have returned the last message as per MQTT spec.
assert.Equal(t, 1, len(response.(*Response).Messages))

store.Store(&message.Message{
ID: message.NewID(ssid),
Channel: []byte("a/b/c/"),
Payload: []byte("hello"),
TTL: 30,
})
request.Channel = "key/a/b/c/?last=2"
reqBytes, _ = json.Marshal(request)
response, ok = service.OnRequest(connection, reqBytes)
// The request should have succeeded and returned a response.
assert.Equal(t, true, ok)
// The response should have returned the last 2 messages.
assert.Equal(t, 2, len(response.(*Response).Messages))
}

// ------------------------------------------------------------------------------------
Expand Down

0 comments on commit 705518c

Please sign in to comment.