Skip to content

Commit

Permalink
Adds the new history API (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
Florimond committed Jan 28, 2024
1 parent 374de74 commit 4418034
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 33 deletions.
2 changes: 2 additions & 0 deletions internal/broker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/security/license"
"github.com/emitter-io/emitter/internal/service/cluster"
"github.com/emitter-io/emitter/internal/service/history"
"github.com/emitter-io/emitter/internal/service/keyban"
"github.com/emitter-io/emitter/internal/service/keygen"
"github.com/emitter-io/emitter/internal/service/link"
Expand Down Expand Up @@ -189,6 +190,7 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error)
s.pubsub.Handle("keyban", keyban.New(s, s.keygen, s.cluster).OnRequest)
s.pubsub.Handle("link", link.New(s, s.pubsub).OnRequest)
s.pubsub.Handle("me", me.New().OnRequest)
s.pubsub.Handle("history", history.New(s, s.storage).OnRequest)

// Addresses and things
logging.LogTarget("service", "configured node name", nodeName)
Expand Down
8 changes: 4 additions & 4 deletions internal/provider/storage/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestInMemory_Query(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit)
out, err := s.Query(tc.query, zero, zero, nil, tc.limit)
assert.NoError(t, err)

count := 0
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestInMemory_lookup(t *testing.T) {
}

for _, tc := range tests {
matches := s.lookup(newLookupQuery(tc.query, zero, zero, tc.limit))
matches := s.lookup(newLookupQuery(tc.query, zero, zero, nil, tc.limit))
assert.Equal(t, tc.count, len(matches))
}
}
Expand All @@ -172,13 +172,13 @@ func TestInMemory_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10),
expectOk: true,
expectCount: 2,
},
Expand Down
18 changes: 14 additions & 4 deletions internal/provider/storage/ssd.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ func encodeFrame(msgs message.Frame) []*badger.Entry {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
func (s *SSD) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {

// Construct a query and lookup locally first
query := newLookupQuery(ssid, from, until, limit)
query := newLookupQuery(ssid, from, until, startFromID, limit)
match := s.lookup(query)

// Issue the message survey to the cluster
Expand Down Expand Up @@ -184,11 +184,21 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {

// Since we're starting backwards, seek to the 'until' position first and then
// we'll iterate forward but have reverse time ('until' -> 'from')
prefix := message.NewPrefix(q.Ssid, q.Until)
var prefix message.ID
if len(q.StartFromID) == 0 {
prefix = message.NewPrefix(q.Ssid, q.Until)
it.Seek(prefix)
} else {
it.Seek(q.StartFromID)
if !it.Valid() {
return nil
}
it.Next()
}

matchesSize := 0
// Seek the prefix and check the key so we can quickly exit the iteration.
for it.Seek(prefix); it.Valid() &&
for ; it.Valid() &&
message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) &&
len(matches) < q.Limit; it.Next() {
if !message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) {
Expand Down
16 changes: 11 additions & 5 deletions internal/provider/storage/ssd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestSSD_Query(t *testing.T) {
assert.NoError(t, err)

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, 5)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, nil, 5)
assert.NoError(t, err)
assert.Len(t, f, 1)
})
Expand All @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) {
})
}

func TestSSD_QueryStartFromID(t *testing.T) {
runSSDTest(func(store *SSD) {
testStartFromID(t, store)
})
}

func TestSSD_MaxResponseSizeReached(t *testing.T) {
runSSDTest(func(store *SSD) {
testMaxResponseSizeReached(t, store)
Expand Down Expand Up @@ -127,7 +133,7 @@ func TestSSD_QuerySurveyed(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit)
out, err := s.Query(tc.query, zero, zero, nil, tc.limit)
assert.NoError(t, err)
count := 0
for range out {
Expand All @@ -152,13 +158,13 @@ func TestSSD_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10),
expectOk: true,
expectCount: 2,
},
Expand Down Expand Up @@ -322,7 +328,7 @@ func benchmarkQuery(b *testing.B, store *SSD, last int, m *stats.Metric) {
return

default:
store.Query(ssid, t0, t1, last)
store.Query(ssid, t0, t1, nil, last)
m.Update(int32(last))
}
}
Expand Down
24 changes: 13 additions & 11 deletions internal/provider/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Storage interface {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error)
Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error)
}

// ------------------------------------------------------------------------------------
Expand All @@ -65,20 +65,22 @@ func window(from, until time.Time) (int64, int64) {

// The lookup query to send out to the cluster.
type lookupQuery struct {
Ssid message.Ssid // The ssid to match.
From int64 // The beginning of the time window.
Until int64 // The end of the time window.
Limit int // The maximum number of elements to return.
Ssid message.Ssid // The ssid to match.
From int64 // The beginning of the time window.
Until int64 // The end of the time window.
StartFromID message.ID // The ID to start from when retrieving message, used for pagination.
Limit int // The maximum number of elements to return.
}

// newLookupQuery creates a new lookup query
func newLookupQuery(ssid message.Ssid, from, until time.Time, limit int) lookupQuery {
func newLookupQuery(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) lookupQuery {
t0, t1 := window(from, until)
return lookupQuery{
Ssid: ssid,
From: t0,
Until: t1,
Limit: limit,
Ssid: ssid,
From: t0,
Until: t1,
StartFromID: startFromID,
Limit: limit,
}
}

Expand Down Expand Up @@ -128,7 +130,7 @@ func (s *Noop) Store(m *message.Message) error {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
func (s *Noop) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
func (s *Noop) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {
return nil, nil
}

Expand Down
41 changes: 36 additions & 5 deletions internal/provider/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestNoop_Store(t *testing.T) {
func TestNoop_Query(t *testing.T) {
s := new(Noop)
zero := time.Unix(0, 0)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, 10)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, nil, 10)
assert.NoError(t, err)
for range r {
t.Errorf("Should be empty")
Expand Down Expand Up @@ -81,7 +81,7 @@ func testOrder(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 5)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 5)
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand All @@ -104,7 +104,7 @@ func testRetained(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 1)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 1)
assert.NoError(t, err)

assert.Len(t, f, 1)
Expand All @@ -127,7 +127,7 @@ func testRange(t *testing.T, store Storage) {
}

// Issue a query
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), 5)
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), nil, 5)
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand All @@ -153,6 +153,37 @@ func Test_configUint32(t *testing.T) {
assert.Equal(t, uint32(99999999), v)
}

// Test the StartFromID option for pagination purposes.
func testStartFromID(t *testing.T, store Storage) {
var fourth message.ID
for i := int64(0); i < 10; i++ {
payload := make([]byte, 1)
payload[0] = byte(i)
msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), payload)
msg.TTL = message.RetainedTTL
msg.ID.SetTime(msg.ID.Time() + (i * 10000))
assert.NoError(t, store.Store(msg))
if i == 4 {
fourth = msg.ID
}
}

// Issue a query, starting at the fourth message ID and going back 2.
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, fourth, 2)
assert.NoError(t, err)

assert.Len(t, f, 2)
assert.Equal(t, 2, int(f[0].Payload[0]))
assert.Equal(t, 3, int(f[1].Payload[0]))

// Issue a query, starting at the first message ID and going back 2.
f, err = store.Query([]uint32{0, 1, 2}, zero, zero, f[0].ID, 2)
assert.NoError(t, err)
assert.Equal(t, 0, int(f[0].Payload[0]))
assert.Equal(t, 1, int(f[1].Payload[0]))
}

func testMaxResponseSizeReached(t *testing.T, store Storage) {
for i := int64(0); i < 10; i++ {
payload := make([]byte, mqtt.MaxMessageSize/5)
Expand All @@ -163,7 +194,7 @@ func testMaxResponseSizeReached(t *testing.T, store Storage) {
}

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 10)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 10)
assert.NoError(t, err)

assert.Len(t, f, 4)
Expand Down
95 changes: 95 additions & 0 deletions internal/service/history/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**********************************************************************************
* Copyright (c) 2009-2020 Misakai Ltd.
* This program is free software: you can redistribute it and/or modify it under the
* terms of the GNU Affero General Public License as published by the Free Software
* Foundation, either version 3 of the License, or(at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program. If not, see<http://www.gnu.org/licenses/>.
************************************************************************************/

package history

import (
"encoding/json"

"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/service"
)

// Request represents a historical messages request.
type Request struct {
Key string `json:"key"` // The channel key for this request.
Channel string `json:"channel"` // The target channel for this request.
StartFromID message.ID `json:"startFromID,omitempty"`
}

type Message struct {
ID message.ID `json:"id"`
Topic string `json:"topic"` // The channel of the message
Payload string `json:"payload"` // The payload of the message
}
type Response struct {
Request uint16 `json:"req,omitempty"` // The corresponding request ID.
Messages []Message `json:"messages"` // The history of messages.
}

// ForRequest sets the request ID in the response for matching
func (r *Response) ForRequest(id uint16) {
r.Request = id
}

// OnRequest handles a request of historical messages.
func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, bool) {
var request Request
if err := json.Unmarshal(payload, &request); err != nil {
return errors.ErrBadRequest, false
}

channel := security.ParseChannel([]byte(request.Channel))
if channel.ChannelType == security.ChannelInvalid {
return errors.ErrBadRequest, false
}

// Check the authorization and permissions
_, key, allowed := s.auth.Authorize(channel, security.AllowLoad)
if !allowed {
return errors.ErrUnauthorized, false
}

// Use limit = 1 if not specified, otherwise use the limit option. The limit now
// defaults to one as per MQTT spec we always need to send retained messages.
limit := int64(1)
if v, ok := channel.Last(); ok {
limit = v
}

ssid := message.NewSsid(key.Contract(), channel.Query)
t0, t1 := channel.Window() // Get the window

msgs, err := s.store.Query(ssid, t0, t1, request.StartFromID, int(limit))
if err != nil {
logging.LogError("conn", "query last messages", err)
return errors.ErrServerError, false
}

resp := &Response{
Messages: make([]Message, 0, len(msgs)),
}
for _, m := range msgs {
msg := m
resp.Messages = append(resp.Messages, Message{
ID: msg.ID,
Topic: string(msg.Channel), // The channel for this message.
Payload: string(msg.Payload), // The payload for this message.
})
}
return resp, true
}
Loading

0 comments on commit 4418034

Please sign in to comment.