-
Notifications
You must be signed in to change notification settings - Fork 26
/
rxlog.go
108 lines (91 loc) · 2.5 KB
/
rxlog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// SPDX-License-Identifier: MIT
package rawread
import (
"context"
"encoding/json"
"fmt"
"os"
"go.cryptoscope.co/luigi"
"go.cryptoscope.co/margaret"
"go.cryptoscope.co/muxrpc/v2"
"go.cryptoscope.co/ssb"
"go.cryptoscope.co/ssb/internal/transform"
"go.cryptoscope.co/ssb/message"
)
// ~> sbot createLogStream --help
// (log) Fetch messages ordered by the time received.
// log [--live] [--gt index] [--gte index] [--lt index] [--lte index] [--reverse] [--keys] [--values] [--limit n]
type rxLogPlug struct {
h muxrpc.Handler
}
func NewRXLog(rootLog margaret.Log) ssb.Plugin {
plug := &rxLogPlug{}
plug.h = rxLogHandler{
root: rootLog,
}
return plug
}
func (lt rxLogPlug) Name() string { return "createLogStream" }
func (rxLogPlug) Method() muxrpc.Method {
return muxrpc.Method{"createLogStream"}
}
func (lt rxLogPlug) Handler() muxrpc.Handler {
return lt.h
}
type rxLogHandler struct {
root margaret.Log
}
func (rxLogHandler) Handled(m muxrpc.Method) bool { return m.String() == "createLogStream" }
func (g rxLogHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}
func (g rxLogHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
// fmt.Fprintln(os.Stderr, "createLogStream args:", string(req.RawArgs))
var qry message.CreateLogArgs
var args []message.CreateLogArgs
err := json.Unmarshal(req.RawArgs, &args)
if err != nil {
fmt.Fprintln(os.Stderr, "createLogStream err:", err)
req.CloseWithError(fmt.Errorf("bad request data: %w", err))
return
}
if len(args) == 1 {
qry = args[0]
} else {
// Defaults for no arguments
qry.Keys = true
qry.Limit = -1
}
// empty query doesn't make much sense...
if qry.Limit == 0 {
qry.Limit = -1
}
// only return message keys
// qry.Values = true
if qry.Gt == -1 {
qry.Seq = int64(g.root.Seq()) - 1
}
// start := time.Now()
src, err := g.root.Query(
margaret.SeqWrap(false),
margaret.Gte(int64(qry.Seq)),
margaret.Limit(int(qry.Limit)),
margaret.Live(qry.Live),
margaret.Reverse(qry.Reverse),
)
if err != nil {
req.CloseWithError(fmt.Errorf("logStream: failed to qry tipe: %w", err))
return
}
snk, err := req.ResponseSink()
if err != nil {
req.CloseWithError(err)
return
}
err = luigi.Pump(ctx, transform.NewKeyValueWrapper(snk, qry.Keys), src)
if err != nil {
fmt.Fprintln(os.Stderr, "createLogStream err:", err)
req.CloseWithError(fmt.Errorf("logStream: failed to pump msgs: %w", err))
return
}
snk.Close()
// fmt.Fprintln(os.Stderr, "createLogStream closed:", err, "after:", time.Since(start))
}