/
main.go
95 lines (84 loc) · 2.19 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/bartke/tributary"
"github.com/bartke/tributary/event/standardevent"
"github.com/bartke/tributary/example/advanced/event"
"github.com/bartke/tributary/filter/gormfilter"
"github.com/bartke/tributary/module"
"github.com/bartke/tributary/network"
"github.com/bartke/tributary/runtime"
"github.com/bartke/tributary/sink/handler"
"github.com/bartke/tributary/window/gormwindow"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/schema"
)
// main assembles the example pipeline: it builds a gorm-backed window and
// filter on a MySQL database, registers the nodes on a network, loads two Lua
// scripts through the runtime, prints the graph before and after the scripts
// ran, runs the network for 20 seconds, stops it, and then blocks forever.
//
// The MySQL host defaults to "localhost" and may be overridden by the first
// command-line argument. Any setup error terminates the process via log.Fatal.
func main() {
	host := "localhost"
	if args := os.Args; len(args) > 1 {
		host = args[1]
	}
	dsn := fmt.Sprintf("root:root@tcp(%s:3306)/tb_test", host)
	dialector := mysql.Open(dsn)

	cfg := &gorm.Config{
		// isolate multiple instances on the same DB
		NamingStrategy: schema.NamingStrategy{
			TablePrefix: "example_",
		},
		// performance
		SkipDefaultTransaction: true,
		PrepareStmt:            true,
	}

	// check aborts the program on the first non-nil error.
	check := func(err error) {
		if err != nil {
			log.Fatal(err)
		}
	}

	win, err := gormwindow.New(dialector, cfg, standardevent.New, &event.Bet{}, &event.Selection{})
	check(err)

	dedup, err := gormfilter.New(dialector, cfg, standardevent.New)
	check(err)

	// build the network and register its nodes
	graph := network.New()
	graph.AddNode("streaming_ingest", NewStream())
	graph.AddNode("liability_printer", handler.New(out))
	graph.AddNode("stake_printer", handler.New(out))

	// show the graph before the scripts have wired anything together
	fmt.Println("unconnected:")
	fmt.Println(tributary.Graphviz(graph))

	mod := module.New(graph)
	mod.AddWindowExports(win, &event.Bet{})
	mod.AddFilterExport(dedup)

	rt := runtime.New()
	rt.LoadModule(mod.Loader)
	check(rt.Run("./network.lua"))

	// The second script is compiled first to see whether it is valid. It
	// depends on the first script and can only be loaded afterwards;
	// otherwise it will error out.
	compiled, err := rt.Compile("./network2.lua")
	check(err)
	check(rt.Execute(compiled))

	fmt.Println("connected:")
	fmt.Println(tributary.Graphviz(graph))

	graph.Start()
	log.Println("running")
	time.Sleep(20 * time.Second)
	log.Println("stopping")
	graph.Stop()
	log.Println("stopped.")

	// blocking wait: keep the process alive after the network has stopped
	select {}
}
// out is the handler callback used by the printer nodes: it writes the
// event's payload, converted to a string, as one line on standard output.
func out(e tributary.Event) {
	payload := e.Payload()
	fmt.Println(string(payload))
}