/
zabbix-trapper-lookup.go
249 lines (235 loc) · 6.53 KB
/
zabbix-trapper-lookup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
package forwarder
import (
"context"
"database/sql"
"github.com/bangunindo/trap2json/helper"
"github.com/bangunindo/trap2json/snmp"
"github.com/georgysavva/scany/v2/sqlscan"
_ "github.com/go-sql-driver/mysql"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"sync"
"time"
)
// QueryResult is one row returned by the host cache queries. The `db` tags
// match the column aliases those queries select.
type QueryResult struct {
	// IPOrDNS is the interface address: the IP when the interface is
	// configured to use it, otherwise the DNS name.
	IPOrDNS string `db:"ip_or_dns"`

	// Hostname is the host's technical name (hosts.host).
	Hostname string `db:"hostname"`

	// ProxyHostname is the name of the proxy the host is assigned to. It is
	// NULL (Valid == false) when the query yields no proxy name for the host.
	ProxyHostname sql.NullString `db:"proxy_hostname"`
}
// LookupResult is the outcome of resolving a trap to a Zabbix host.
type LookupResult struct {
	// Server is the proxy configuration matched for the host's proxy name
	// during a cache refresh. It is nil when no proxy matched, and always nil
	// for results produced without the database-backed cache.
	Server *ProxyConf

	// Hostname is the host name to report the trap under. With the cache
	// enabled it is the Zabbix host name; otherwise it is the raw OID value
	// or address taken from the trap.
	Hostname string
}
// hostCacheQueryPost60 is the host cache query used when the version probe
// reports dbversion.mandatory >= 6000000. For every host with status 0 that
// has an item of type 2 with the given key and an interface of type 2, it
// returns the interface address, the host name and the proxy name. When the
// host has no proxy row, the proxy name falls back to the name of the ha_node
// row with status 3, or NULL if that name is empty.
//
// NOTE(review): ha_node is inner-joined, so no rows are returned if no node has
// status 3 (presumably "active" — confirm against the Zabbix schema).
//
// The item key is bound as the single positional parameter `$1`. That is
// PostgreSQL syntax; drivers that expect `?`, such as MySQL, need the
// placeholder rewritten before execution.
const hostCacheQueryPost60 = `
select case when i.useip = 1 then i.ip else i.dns end as ip_or_dns,
h.host as hostname,
coalesce(hp.host, case when z.name = '' then null else z.name end) as proxy_hostname
from hosts h
join interface i on i.hostid = h.hostid
join items i2 on i2.hostid = h.hostid
join ha_node z on z.status = 3
left join hosts hp on hp.hostid = h.proxy_hostid
where i2.key_ = $1
-- item type is Zabbix trapper
and i2.type = 2
-- host is active and monitored
and h.status = 0
-- ip of snmp interface
and i.type = 2`
// hostCacheQueryPre60 is the host cache query used when the version probe
// reports dbversion.mandatory < 6000000. It selects the same columns as
// hostCacheQueryPost60 but does not touch the ha_node table, so the proxy name
// is NULL for hosts without a proxy row.
//
// The item key is bound as the single positional parameter `$1`. That is
// PostgreSQL syntax; drivers that expect `?`, such as MySQL, need the
// placeholder rewritten before execution.
const hostCacheQueryPre60 = `
select case when i.useip = 1 then i.ip else i.dns end as ip_or_dns,
h.host as hostname,
hp.host as proxy_hostname
from hosts h
join interface i on i.hostid = h.hostid
join items i2 on i2.hostid = h.hostid
left join hosts hp on hp.hostid = h.proxy_hostid
where i2.key_ = $1
-- item type is Zabbix trapper
and i2.type = 2
-- host is active and monitored
and h.status = 0
-- ip of snmp interface
and i.type = 2`
// isPost60PostgresQuery is the PostgreSQL version probe. It yields 1 when
// dbversion.mandatory is at least 6000000 and 0 otherwise; the boolean is cast
// to int so it can be scanned into an integer.
const isPost60PostgresQuery = `
select (mandatory >= 6000000)::int
from dbversion
`
// isPost60MysqlQuery is the MySQL version probe. It compares
// dbversion.mandatory against 6000000; the caller scans the result into an
// integer and expects 0 or 1.
const isPost60MysqlQuery = `
select mandatory >= 6000000
from dbversion
`
// ZabbixLookup resolves traps to Zabbix hosts. When the advanced configuration
// is present, it answers from two in-memory caches that are rebuilt from the
// Zabbix database; otherwise it passes the raw lookup key through.
type ZabbixLookup struct {
	// conf is the trapper configuration: item key, lookup OID and the
	// optional advanced (database) settings.
	conf *ZabbixTrapperConfig

	// cacheMutex guards cacheByAddress and cacheByHostname.
	cacheMutex *sync.RWMutex

	// logger receives refresh progress and failure messages.
	logger zerolog.Logger

	// ctx bounds database calls and stops the background refresh loop.
	ctx context.Context

	// cacheByAddress maps an interface IP or DNS name to its lookup result.
	cacheByAddress map[string]*LookupResult

	// cacheByHostname maps a Zabbix host name to its lookup result.
	cacheByHostname map[string]*LookupResult
}
// refresh rebuilds both lookup caches from the Zabbix database and swaps them
// in under the cache mutex.
//
// It opens a fresh connection using conf.Advanced.DBUrl, probes the schema
// version to choose the matching host query, runs that query with
// conf.ItemKey, and indexes the rows by address and by host name. A single
// timeout of conf.Advanced.DBQueryTimeout covers the ping and both queries.
//
// Every failure is logged and leaves the existing caches untouched. The one
// exception is an unrecognised driver name, which is logged at Fatal level.
func (z *ZabbixLookup) refresh() {
	z.logger.Info().Msg("starting background cache refresh")
	now := time.Now()
	defer func() {
		dur := time.Since(now)
		z.logger.Info().Str("duration", dur.String()).Msg("background cache refresh done")
	}()

	driver, dsn, err := helper.ParseDSN(z.conf.Advanced.DBUrl)
	if err != nil {
		z.logger.Warn().Err(err).Msg("failed reading db_url")
		return
	}

	db, err := sql.Open(driver, dsn)
	if err != nil {
		z.logger.Warn().Err(err).Msg("failed initializing db")
		return
	}
	defer db.Close()

	ctx, cancel := context.WithTimeout(z.ctx, z.conf.Advanced.DBQueryTimeout.Duration)
	defer cancel()

	if err = db.PingContext(ctx); err != nil {
		z.logger.Warn().Err(err).Msg("failed connecting to db")
		return
	}

	// The version probe differs per driver because PostgreSQL needs an
	// explicit cast to scan a boolean into an int.
	var isPost60 int
	switch driver {
	case "pgx":
		err = sqlscan.Get(ctx, db, &isPost60, isPost60PostgresQuery)
	case "mysql":
		err = sqlscan.Get(ctx, db, &isPost60, isPost60MysqlQuery)
	default:
		// zerolog's Fatal level exits the process once the message is
		// written; the return only guards against a logger that does not.
		z.logger.Fatal().Msgf("unknown driver: %s", driver)
		return
	}
	if err != nil {
		z.logger.Warn().Err(err).Msg("cannot determine zabbix version")
		return
	}

	var hostQuery string
	switch isPost60 {
	case 0:
		hostQuery = hostCacheQueryPre60
	case 1:
		hostQuery = hostCacheQueryPost60
	default:
		z.logger.Error().Msg("unexpected error, incorrect isPost60 result")
		return
	}

	// The query constants use the PostgreSQL "$1" placeholder, which the
	// MySQL driver does not understand, so rewrite it for that driver.
	var results []QueryResult
	err = sqlscan.Select(ctx, db, &results, rebindHostCacheQuery(driver, hostQuery), z.conf.ItemKey)
	if err != nil {
		z.logger.Warn().Err(err).Msg("failed executing lookup query")
		return
	}

	cacheByAddress := make(map[string]*LookupResult, len(results))
	cacheByHostname := make(map[string]*LookupResult, len(results))
	for _, r := range results {
		lookupResult := LookupResult{
			Hostname: r.Hostname,
		}
		if proxy, ok := z.conf.Advanced.getProxy(r.ProxyHostname.String); ok && r.ProxyHostname.Valid {
			lookupResult.Server = &proxy
		}
		// Both indexes share one entry per row.
		cacheByAddress[r.IPOrDNS] = &lookupResult
		cacheByHostname[r.Hostname] = &lookupResult
	}

	// Swap whole maps so readers never observe a partially built cache.
	z.cacheMutex.Lock()
	z.cacheByAddress = cacheByAddress
	z.cacheByHostname = cacheByHostname
	z.cacheMutex.Unlock()
}

// rebindHostCacheQuery returns query with its "$1" placeholder rewritten to
// the "?" marker when driver is "mysql"; for any other driver the query is
// returned unchanged. It only handles the single "$1" parameter used by the
// host cache queries.
func rebindHostCacheQuery(driver, query string) string {
	if driver != "mysql" {
		return query
	}
	rebound := make([]byte, 0, len(query))
	for i := 0; i < len(query); i++ {
		if query[i] == '$' && i+1 < len(query) && query[i+1] == '1' {
			rebound = append(rebound, '?')
			i++ // skip the digit that belonged to "$1"
			continue
		}
		rebound = append(rebound, query[i])
	}
	return string(rebound)
}
// lookupByAddress returns a copy of the cached result for the given interface
// address (IP or DNS name). It fails with "address lookup failed" when the
// address is not in the cache.
func (z *ZabbixLookup) lookupByAddress(addr string) (LookupResult, error) {
	z.cacheMutex.RLock()
	defer z.cacheMutex.RUnlock()

	entry, found := z.cacheByAddress[addr]
	if !found {
		return LookupResult{}, errors.New("address lookup failed")
	}
	return *entry, nil
}
// lookupByHostname returns a copy of the cached result for the given Zabbix
// host name. It fails with "host lookup failed" when the name is not in the
// cache.
func (z *ZabbixLookup) lookupByHostname(host string) (LookupResult, error) {
	z.cacheMutex.RLock()
	defer z.cacheMutex.RUnlock()

	entry, found := z.cacheByHostname[host]
	if !found {
		return LookupResult{}, errors.New("host lookup failed")
	}
	return *entry, nil
}
// Lookup resolves the trap message m to a host using the given strategy.
//
// The strategy selects the lookup key:
//   - LookupFromOID: the first payload value whose OID has the prefix
//     conf.OIDLookup and whose value is a string;
//   - LookupFromAgentAddress: the payload's agent address, if set;
//   - LookupFromSourceAddress: the payload's source address, if non-empty.
//
// When the advanced configuration is present, the key is resolved through the
// caches (by host name for the OID strategy, by address otherwise) and any
// cache miss is returned as an error. Without it, the key itself is returned
// as the Hostname with no Server.
//
// If the strategy yields no key, or the strategy is not one of the above,
// Lookup returns the error "lookup failed".
func (z *ZabbixLookup) Lookup(m *snmp.Message, strategy LookupStrategy) (LookupResult, error) {
	useCache := z.conf.Advanced != nil

	switch strategy {
	case LookupFromOID:
		for _, v := range m.Payload.Values {
			if !v.HasOIDPrefix(z.conf.OIDLookup) {
				continue
			}
			name, isString := v.Value.(string)
			if !isString {
				continue
			}
			if useCache {
				return z.lookupByHostname(name)
			}
			return LookupResult{Hostname: name}, nil
		}
	case LookupFromAgentAddress:
		if agent := m.Payload.AgentAddress; agent != nil {
			if useCache {
				return z.lookupByAddress(*agent)
			}
			return LookupResult{Hostname: *agent}, nil
		}
	case LookupFromSourceAddress:
		if src := m.Payload.SrcAddress; src != "" {
			if useCache {
				return z.lookupByAddress(src)
			}
			return LookupResult{Hostname: src}, nil
		}
	}

	return LookupResult{}, errors.New("lookup failed")
}
// Refresh runs the periodic cache refresh loop. It waits
// conf.Advanced.DBRefreshInterval, rebuilds the caches, and repeats; the wait
// restarts only after a rebuild has finished. It blocks until the lookup's
// context is done.
func (z *ZabbixLookup) Refresh() {
	for {
		wait := time.After(z.conf.Advanced.DBRefreshInterval.Duration)
		select {
		case <-z.ctx.Done():
			return
		case <-wait:
			z.refresh()
		}
	}
}
// NewZabbixLookup builds a ZabbixLookup with empty caches.
//
// When c.Advanced is set, it also initialises the proxy map, fills the caches
// with one synchronous refresh, and starts a goroutine that keeps refreshing
// them until ctx is done. When c.Advanced is nil, no database access happens.
func NewZabbixLookup(
	c *ZabbixTrapperConfig,
	logger zerolog.Logger,
	ctx context.Context,
) *ZabbixLookup {
	lookup := &ZabbixLookup{
		conf:            c,
		cacheMutex:      &sync.RWMutex{},
		logger:          logger,
		ctx:             ctx,
		cacheByAddress:  map[string]*LookupResult{},
		cacheByHostname: map[string]*LookupResult{},
	}
	if c.Advanced == nil {
		return lookup
	}

	c.Advanced.initProxyMap()
	// Populate the caches before returning so the first lookups can hit.
	lookup.refresh()
	go lookup.Refresh()
	return lookup
}