/
http_processor.go
110 lines (93 loc) · 2.83 KB
/
http_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package web
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"github.com/annchain/BlockDB/backends"
"github.com/annchain/BlockDB/ogws"
"github.com/annchain/BlockDB/processors"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
)
// HttpListenerConfig holds the settings used to construct an HttpListener.
type HttpListenerConfig struct {
	// Port is the TCP port the HTTP server listens on.
	Port int
	// EnableAudit, when true, makes NewHttpListener register the /audit,
	// /query and /queryGrammar routes.
	EnableAudit bool
	// EnableHealth is presumably meant to toggle the /health route.
	// NOTE(review): NewHttpListener registers /health without consulting this
	// flag — confirm whether that is intended.
	EnableHealth bool
	// MaxContentLength is the size limit, in bytes, for /audit requests:
	// requests whose Content-Length exceeds it are answered with 413.
	// NewHttpListener replaces a zero value with 1e7.
	MaxContentLength int64
}
// HttpListener exposes the HTTP endpoints of the service: it parses audit
// requests with a data processor and hands the resulting events to a ledger
// writer.
type HttpListener struct {
	// config is the (defaulted) configuration the listener was built with.
	config HttpListenerConfig
	// ledgerWriter receives every parsed event through EnqueueSendToLedger.
	ledgerWriter backends.LedgerWriter
	// dataProcessor turns a raw request body into events via ParseCommand.
	dataProcessor processors.DataProcessor
	// NOTE(review): wg is not used by any method visible alongside this type.
	wg sync.WaitGroup
	// stopped is set to true by Stop.
	// NOTE(review): no visible code reads it back.
	stopped bool
	// router holds the registered routes and is the handler passed to
	// http.ListenAndServe.
	router *mux.Router
	// auditWriter is stored by NewHttpListener.
	// NOTE(review): presumably used by the query handlers defined elsewhere.
	auditWriter ogws.AuditWriter
}
// Name returns the fixed identifier of this component, "HttpListener".
func (l *HttpListener) Name() string {
	const componentName = "HttpListener"
	return componentName
}
// NewHttpListener wires a listener together from its configuration and
// collaborators and registers the HTTP routes on a fresh mux router.
//
// A zero config.MaxContentLength is replaced by 1e7 bytes. The /audit, /query
// and /queryGrammar routes are only registered when config.EnableAudit is
// set; /health is always registered.
func NewHttpListener(config HttpListenerConfig, dataProcessor processors.DataProcessor, ledgerWriter backends.LedgerWriter, auditWriter ogws.AuditWriter) *HttpListener {
	const defaultMaxContentLength = 1e7
	if config.MaxContentLength == 0 {
		config.MaxContentLength = defaultMaxContentLength
	}

	router := mux.NewRouter()
	listener := &HttpListener{
		config:        config,
		ledgerWriter:  ledgerWriter,
		dataProcessor: dataProcessor,
		router:        router,
		auditWriter:   auditWriter,
	}

	// Audit-related endpoints are opt-in.
	if config.EnableAudit {
		router.Methods("POST").Path("/audit").HandlerFunc(listener.Handle)
		router.Methods("GET", "POST").Path("/query").HandlerFunc(listener.Query)
		router.Methods("GET", "POST").Path("/queryGrammar").HandlerFunc(listener.QueryGrammar)
	}

	// The health probe is registered unconditionally.
	router.Methods("GET", "POST").Path("/health").HandlerFunc(listener.Health)

	return listener
}
// Start launches the HTTP server on a background goroutine and returns
// immediately after logging that the listener has started.
func (l *HttpListener) Start() {
	go func() {
		l.doListen()
	}()
	logrus.Info("HttpListener started")
}
// Stop marks the listener as stopped and logs the fact.
//
// NOTE(review): only the flag is changed here; nothing in this method shuts
// down the server started by Start.
func (l *HttpListener) Stop() {
	defer logrus.Info("HttpListener stopped")
	l.stopped = true
}
// Handle serves the audit endpoint. It reads the request body, parses it into
// events with the data processor and enqueues every event on the ledger
// writer.
//
// Responses:
//   - 413 when the declared Content-Length, or the number of bytes actually
//     sent, exceeds config.MaxContentLength
//   - 400 "miss content" when the body is empty or cannot be read
//   - 400 with the parser's message when ParseCommand fails
//   - 500 with the writer's message when an event cannot be enqueued; events
//     enqueued before the failure are not rolled back
//   - 200 with the JSON body "{}" otherwise
func (l *HttpListener) Handle(rw http.ResponseWriter, req *http.Request) {
	limit := l.config.MaxContentLength
	if req.ContentLength > limit {
		http.Error(rw, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
		return
	}

	// ContentLength is -1 when the client does not declare a length (e.g.
	// chunked uploads), which slips past the check above, so the bytes that
	// are actually read are capped as well.
	data, err := ioutil.ReadAll(http.MaxBytesReader(rw, req.Body, limit))
	if err != nil && int64(len(data)) >= limit {
		// The capped reader yields the allowed number of bytes before it
		// fails, so a full buffer plus an error means the body was too large.
		http.Error(rw, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
		return
	}
	if err != nil || len(data) == 0 {
		http.Error(rw, "miss content", http.StatusBadRequest)
		return
	}
	logrus.Tracef("get audit request data: %s", string(data))

	events, err := l.dataProcessor.ParseCommand(data)
	if err != nil {
		http.Error(rw, err.Error(), http.StatusBadRequest)
		return
	}

	for _, event := range events {
		logrus.Tracef("write event to ledger: %s", event.String())
		if err := l.ledgerWriter.EnqueueSendToLedger(event); err != nil {
			logrus.WithError(err).Warn("send to ledger err")
			http.Error(rw, err.Error(), http.StatusInternalServerError)
			// The error response has been written; stop so that no success
			// payload is appended after it.
			return
		}
	}

	// The parser may legitimately return no events; only log the first key
	// when there is one.
	if len(events) > 0 {
		logrus.Tracef("write to ledger ends, data: %s", events[0].PrimaryKey)
	}

	rw.Header().Set("Content-Type", "application/json")
	rw.WriteHeader(http.StatusOK)
	rw.Write([]byte("{}"))
}
// Health is the liveness probe handler: it always answers 200 with the plain
// body "ok" and does not inspect the request.
func (l *HttpListener) Health(rw http.ResponseWriter, req *http.Request) {
	const body = "ok"
	rw.WriteHeader(http.StatusOK)
	rw.Write([]byte(body))
}
// doListen serves the router on the configured port (all interfaces). It
// blocks for as long as the server runs; whatever error ListenAndServe
// returns is passed to logrus.Fatal.
func (l *HttpListener) doListen() {
	addr := ":" + fmt.Sprintf("%d", l.config.Port)
	err := http.ListenAndServe(addr, l.router)
	logrus.Fatal(err)
}