Skip to content

Commit

Permalink
offset sorter
Browse files Browse the repository at this point in the history
supports three domains: claimed, received and feed sequence(*)

also supports createFeedStream and messagesByType sorting by claimed timestamp

*: this will become important later once feeds are received out of
order.
  • Loading branch information
cryptix committed Nov 13, 2020
1 parent b068ca4 commit 61c5c03
Show file tree
Hide file tree
Showing 16 changed files with 904 additions and 433 deletions.
3 changes: 2 additions & 1 deletion cmd/go-sbot/main.go
Expand Up @@ -414,7 +414,8 @@ func runSbot() error {
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
return nil
err := sbot.Close()
return err
default:
}
}
Expand Down
7 changes: 5 additions & 2 deletions cmd/sbotcli/main.go
Expand Up @@ -82,8 +82,10 @@ var app = cli.App{
blockCmd,
friendsCmd,
logStreamCmd,
sortedStreamCmd,
typeStreamCmd,
historyStreamCmd,
partialStreamCmd,
replicateUptoCmd,
callCmd,
connectCmd,
Expand Down Expand Up @@ -111,7 +113,6 @@ func check(err error) {
}

func main() {

cli.VersionPrinter = func(c *cli.Context) {
fmt.Printf("%s (rev: %s, built: %s)\n", c.App.Version, Version, Build)
}
Expand All @@ -127,7 +128,7 @@ func todo(ctx *cli.Context) error {

func initClient(ctx *cli.Context) error {
longctx = context.Background()
longctx, shutdownFunc = context.WithTimeout(longctx, 10*time.Second)
longctx, shutdownFunc = context.WithTimeout(longctx, 45*time.Second)
signalc := make(chan os.Signal)
signal.Notify(signalc, os.Interrupt, syscall.SIGTERM)
go func() {
Expand Down Expand Up @@ -199,6 +200,8 @@ func getStreamArgs(ctx *cli.Context) message.CreateHistArgs {
AsJSON: ctx.Bool("asJSON"),
}
args.Limit = ctx.Int64("limit")
args.Gt = ctx.Int64("gt")
args.Lt = ctx.Int64("lt")
args.Reverse = ctx.Bool("reverse")
args.Live = ctx.Bool("live")
args.Keys = ctx.Bool("keys")
Expand Down
47 changes: 46 additions & 1 deletion cmd/sbotcli/streams.go
Expand Up @@ -9,6 +9,8 @@ import (
"io"
"os"

"go.cryptoscope.co/ssb/message"

"github.com/pkg/errors"
"go.cryptoscope.co/luigi"
"go.cryptoscope.co/muxrpc"
Expand All @@ -19,6 +21,8 @@ import (
var streamFlags = []cli.Flag{
&cli.IntFlag{Name: "limit", Value: -1},
&cli.IntFlag{Name: "seq", Value: 0},
&cli.IntFlag{Name: "gt"},
&cli.IntFlag{Name: "lt"},
&cli.BoolFlag{Name: "reverse"},
&cli.BoolFlag{Name: "live"},
&cli.BoolFlag{Name: "keys", Value: false},
Expand All @@ -27,7 +31,7 @@ var streamFlags = []cli.Flag{

type mapMsg map[string]interface{}

var typeStreamCmd = &cli.Command{
var partialStreamCmd = &cli.Command{
Name: "partial",
Flags: append(streamFlags, &cli.StringFlag{Name: "id"}, &cli.BoolFlag{Name: "asJSON"}),
Action: func(ctx *cli.Context) error {
Expand Down Expand Up @@ -111,6 +115,47 @@ var logStreamCmd = &cli.Command{
},
}

var sortedStreamCmd = &cli.Command{
Name: "sorted",
Flags: streamFlags,
Action: func(ctx *cli.Context) error {
client, err := newClient(ctx)
if err != nil {
return err
}

var args = getStreamArgs(ctx)
src, err := client.Source(longctx, mapMsg{}, muxrpc.Method{"createFeedStream"}, args)
if err != nil {
return errors.Wrap(err, "source stream call failed")
}
err = luigi.Pump(longctx, jsonDrain(os.Stdout), src)
return errors.Wrap(err, "log failed")
},
}

var typeStreamCmd = &cli.Command{
Name: "bytype",
Flags: streamFlags,
Action: func(ctx *cli.Context) error {
client, err := newClient(ctx)
if err != nil {
return err
}
var targs message.MessagesByTypeArgs
arg := getStreamArgs(ctx)
targs.CommonArgs = arg.CommonArgs
targs.StreamArgs = arg.StreamArgs
targs.Type = ctx.Args().First()
src, err := client.Source(longctx, mapMsg{}, muxrpc.Method{"messagesByType"}, targs)
if err != nil {
return errors.Wrap(err, "source stream call failed")
}
err = luigi.Pump(longctx, jsonDrain(os.Stdout), src)
return errors.Wrap(err, "byType failed")
},
}

var privateReadCmd = &cli.Command{
Name: "read",
Flags: streamFlags,
Expand Down
13 changes: 7 additions & 6 deletions message/requests.go
Expand Up @@ -49,12 +49,8 @@ func NewCreateHistArgsFromMap(argMap map[string]interface{}) (*CreateHistArgs, e
if err != nil {
return nil, errors.Wrapf(err, "ssb/message: not a feed ref")
}

// TODO:
// case "type":
// qry.Type = val
}
case "seq", "limit":
case "seq", "limit", "gt", "lt":
n, ok := v.(float64)
if !ok {
return nil, errors.Errorf("ssb/message: not a float64(%T) for %s", v, k)
Expand All @@ -64,6 +60,10 @@ func NewCreateHistArgsFromMap(argMap map[string]interface{}) (*CreateHistArgs, e
qry.Seq = int64(n)
case "limit":
qry.Limit = int64(n)
case "gt":
qry.Gt = int64(n)
case "lt":
qry.Lt = int64(n)
}
}
}
Expand Down Expand Up @@ -94,7 +94,8 @@ type CommonArgs struct {
type StreamArgs struct {
Limit int64 `json:"limit,omitempty"`

Gt int64 `json:"gt"`
Gt int64 `json:"gt,omitempty"`
Lt int64 `json:"lt,omitempty"`

Reverse bool `json:"reverse,omitempty"`
}
Expand Down
30 changes: 27 additions & 3 deletions multilogs/combined.go
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/keks/persist"
"github.com/pkg/errors"
Expand All @@ -27,8 +28,9 @@ import (
// Compared to the "old" fatbot approach of just having 4 independant indexes,
// this one updates all 4 of them, resulting in less read-overhead
// while also being able to index private massages by tangle and type.
func NewCombinedIndex(repoPath string, box *private.Manager, self *refs.FeedRef, u, p, bt, tan *roaring.MultiLog) (librarian.SinkIndex, error) {
statePath := repo.New(repoPath).GetPath(repo.PrefixMultiLog, "combined-state.json")
func NewCombinedIndex(repoPath string, box *private.Manager, self *refs.FeedRef, res *repo.SequenceResolver, u, p, bt, tan *roaring.MultiLog) (*combinedIndex, error) {
r := repo.New(repoPath)
statePath := r.GetPath(repo.PrefixMultiLog, "combined-state.json")
mode := os.O_RDWR | os.O_EXCL
if _, err := os.Stat(statePath); os.IsNotExist(err) {
mode |= os.O_CREATE
Expand All @@ -48,12 +50,16 @@ func NewCombinedIndex(repoPath string, box *private.Manager, self *refs.FeedRef,
byType: bt,
tangles: tan,

seqresolver: res,

file: idxStateFile,
l: &sync.Mutex{},
}
return idx, nil
}

var _ librarian.SinkIndex = (*combinedIndex)(nil)

type combinedIndex struct {
self *refs.FeedRef
boxer *private.Manager
Expand All @@ -63,6 +69,8 @@ type combinedIndex struct {
byType *roaring.MultiLog
tangles *roaring.MultiLog

seqresolver *repo.SequenceResolver

file *os.File
l *sync.Mutex
}
Expand All @@ -88,6 +96,10 @@ func (slog *combinedIndex) Pour(ctx context.Context, swv interface{}) error {

if isNulled, ok := v.(error); ok {
if margaret.IsErrNulled(isNulled) {
err = slog.seqresolver.Append(seq.Seq(), 0, time.Now(), time.Now())
if err != nil {
return errors.Wrap(err, "error updating sequence resolver (nulled message)")
}
return nil
}
return isNulled
Expand All @@ -98,6 +110,11 @@ func (slog *combinedIndex) Pour(ctx context.Context, swv interface{}) error {
return errors.Errorf("error casting message. got type %T", v)
}

err = slog.seqresolver.Append(seq.Seq(), abstractMsg.Seq(), abstractMsg.Claimed(), abstractMsg.Received())
if err != nil {
return errors.Wrap(err, "error updating sequence resolver")
}

author := abstractMsg.Author()

authorLog, err := slog.users.Get(author.StoredAddr())
Expand Down Expand Up @@ -188,7 +205,9 @@ func (slog *combinedIndex) Pour(ctx context.Context, swv interface{}) error {
}

// Close does nothing.
func (slog *combinedIndex) Close() error { return nil }
func (slog *combinedIndex) Close() error {
return nil
}

// QuerySpec returns the query spec that queries the next needed messages from the log
func (slog *combinedIndex) QuerySpec() margaret.QuerySpec {
Expand All @@ -205,6 +224,11 @@ func (slog *combinedIndex) QuerySpec() margaret.QuerySpec {
seq = margaret.SeqEmpty
}

if resN := slog.seqresolver.Seq() - 1; resN != seq.Seq() {
err := fmt.Errorf("combined idx (has:%d, will: %d)", resN, seq.Seq())
return margaret.ErrorQuerySpec(err)
}

return margaret.MergeQuerySpec(
margaret.Gt(seq),
margaret.SeqWrap(true),
Expand Down
21 changes: 17 additions & 4 deletions plugins/gossip/feed_manager.go
Expand Up @@ -225,12 +225,25 @@ func (m *FeedManager) CreateStreamHistory(

// Make query
limit := nonliveLimit(arg, latest)
resolved := mutil.Indirect(m.ReceiveLog, userLog)
src, err := resolved.Query(
margaret.Gte(margaret.BaseSeq(arg.Seq)),
qryArgs := []margaret.QuerySpec{
margaret.Limit(int(limit)),
margaret.Reverse(arg.Reverse),
)
}

if arg.Seq > 0 {
qryArgs = append(qryArgs, margaret.Gte(margaret.BaseSeq(arg.Seq)))
}

if arg.Lt > 0 {
qryArgs = append(qryArgs, margaret.Lt(margaret.BaseSeq(arg.Lt)))
}

if arg.Gt > 0 {
qryArgs = append(qryArgs, margaret.Gt(margaret.BaseSeq(arg.Gt)))
}

resolved := mutil.Indirect(m.ReceiveLog, userLog)
src, err := resolved.Query(qryArgs...)
if err != nil {
return errors.Wrapf(err, "invalid user log query")
}
Expand Down
3 changes: 1 addition & 2 deletions plugins/gossip/handler.go
Expand Up @@ -220,8 +220,7 @@ func (g *handler) HandleCall(
// } else {
// dbgLog.Log("msg", "feed access granted")
}
// query.Limit = 50
// spew.Dump(query)

err = g.feedManager.CreateStreamHistory(ctx, req.Stream, query)
if err != nil {
if luigi.IsEOS(err) {
Expand Down

0 comments on commit 61c5c03

Please sign in to comment.