/
elasticsearch.go
209 lines (178 loc) · 4.6 KB
/
elasticsearch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package logstasher
import (
"bytes"
"crypto/rand"
"encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
)
// ID is a 128-bit ID for an elasticsearch document.
// Textually, it is base64-encoded.
// The Next() method increments the ID.
type ID struct {
High uint64
Low uint64
}
// Text converts the ID into a base64 string.
func (id ID) String() string {
var buf bytes.Buffer
buf.Grow(21)
enc := base64.NewEncoder(base64.StdEncoding, &buf)
var bytes [16]byte
binary.LittleEndian.PutUint64(bytes[0:8], id.High)
binary.LittleEndian.PutUint64(bytes[8:16], id.Low)
enc.Write(bytes[:])
enc.Close()
return buf.String()
}
// Next increments the ID and returns the prior state.
// Overflow is not checked because it's a uint64, do you really expect me to overflow that
func (id *ID) Next() ID {
ret := ID{
High: id.High,
Low: id.Low,
}
id.Low++
return ret
}
var idPool = sync.Pool{New: func() interface{} {
var bytes [16]byte
n, err := rand.Reader.Read(bytes[:])
if n != 16 || err != nil {
panic(fmt.Errorf("Short read from crypto/rand: %v", err))
}
return &ID{
High: binary.LittleEndian.Uint64(bytes[0:8]),
Low: binary.LittleEndian.Uint64(bytes[8:16]),
}
}}
func ExampleID_Next() {
id := idPool.Get().(*ID).Next()
fmt.Println(id)
idPool.Put(id)
}
// Report is the interface presented to the Submit() function.
// FillReport() is satisfied by ReportBasic, but ReportType must always be specified.
type Report interface {
FillReport() error
ReportType() string
GetID() string
GetTimestamp() time.Time
}
// ReportBasic is the essential fields of any report.
type ReportBasic struct {
ID string
Timestamp time.Time
Host string
}
// FillReport sets the Host and Timestamp fields.
func (report *ReportBasic) FillReport() error {
report.Host = hostMarker
report.Timestamp = time.Now()
id := idPool.Get().(*ID).Next()
report.ID = id.String()
idPool.Put(id)
return nil
}
func (report *ReportBasic) GetID() string {
return report.ID
}
func (report *ReportBasic) GetTimestamp() time.Time {
return report.Timestamp
}
type ConnectionReport struct {
ReportBasic
ConnectTime time.Time
DisconnectTime time.Time
// calculated
ConnectionDuration time.Duration
DisconnectCode int
DisconnectReason string
UsernameWasValidated bool
RemoteAddr net.Addr `json:"-"` // not transmitted until I can figure out data minimization
TwitchUsername string `json:"-"` // also not transmitted
}
// FillReport sets all the calculated fields, and calls esReportBasic.FillReport().
func (report *ConnectionReport) FillReport() error {
report.ReportBasic.FillReport()
report.ConnectionDuration = report.DisconnectTime.Sub(report.ConnectTime)
return nil
}
func (report *ConnectionReport) ReportType() string {
return "conn"
}
var serverPresent bool
var esClient http.Client
var submitChan chan Report
var serverBase, indexPrefix, hostMarker string
func checkServerPresent() {
if serverBase == "" {
serverBase = "http://localhost:9200"
}
if indexPrefix == "" {
indexPrefix = "sockreport"
}
urlHealth := fmt.Sprintf("%s/_cluster/health", serverBase)
resp, err := esClient.Get(urlHealth)
if err == nil {
resp.Body.Close()
serverPresent = true
submitChan = make(chan Report, 8)
fmt.Println("elasticsearch reports enabled")
go submissionWorker()
} else {
serverPresent = false
}
}
// Setup sets up the global variables for the package.
func Setup(ESServer, ESIndexPrefix, ESHostname string) {
serverBase = ESServer
indexPrefix = ESIndexPrefix
hostMarker = ESHostname
checkServerPresent()
}
// Submit inserts a report into elasticsearch (this is basically a manual logstash).
func Submit(report Report) {
if !serverPresent {
return
}
report.FillReport()
submitChan <- report
}
func submissionWorker() {
for report := range submitChan {
time := report.GetTimestamp()
rType := report.ReportType()
// prefix-type-date
indexName := fmt.Sprintf("%s-%s-%d-%d-%d", indexPrefix, rType, time.Year(), time.Month(), time.Day())
// base/index/type/id
putUrl, err := url.Parse(fmt.Sprintf("%s/%s/%s/%s", serverBase, indexName, rType, report.GetID()))
if err != nil {
panic(fmt.Errorf("logstash: cannot parse url: %v", err))
}
body, err := json.Marshal(report)
if err != nil {
panic(fmt.Errorf("logstash: cannot marshal json: %v", err))
}
req := &http.Request{
Method: "PUT",
URL: putUrl,
Body: ioutil.NopCloser(bytes.NewReader(body)),
}
resp, err := esClient.Do(req)
if err != nil {
// ignore, the show must go on
} else {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
}
}