-
Notifications
You must be signed in to change notification settings - Fork 1k
/
GeoIpResolverFilter.java
175 lines (144 loc) · 6.48 KB
/
GeoIpResolverFilter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package org.graylog.plugins.map.geoip.filter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.record.Location;
import org.graylog.plugins.map.config.GeoIpResolverConfig;
import org.graylog2.cluster.ClusterConfigChangedEvent;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.cluster.ClusterConfigService;
import org.graylog2.plugin.filters.MessageFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Named;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.codahale.metrics.MetricRegistry.name;
import static com.google.common.base.Strings.isNullOrEmpty;
/**
 * Message filter that searches the string fields of a message for IPv4 addresses and, for every
 * address the GeoIP database can resolve to coordinates, adds a {@code <field>_geolocation} field
 * containing those coordinates as a {@code "lat,long"} string.
 *
 * <p>The active {@link GeoIpResolverConfig} and the {@link FilterEngine} built from it are held in
 * {@link AtomicReference}s and are replaced whenever a {@link ClusterConfigChangedEvent} for
 * {@link GeoIpResolverConfig} is received.
 */
public class GeoIpResolverFilter implements MessageFilter {
    private static final Logger LOG = LoggerFactory.getLogger(GeoIpResolverFilter.class);

    // TODO: Match also IPv6 addresses
    private static final Pattern IP_PATTERN = Pattern.compile("(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})");

    // MAGIC NUMBER: 10 is the priority of the ExtractorFilter, we either run before or after it,
    // depending on what the user wants.
    private static final int EXTRACTOR_FILTER_PRIORITY = 10;

    private final ClusterConfigService clusterConfigService;
    private final ScheduledExecutorService scheduler;
    private final MetricRegistry metricRegistry;

    private final AtomicReference<GeoIpResolverConfig> config;
    private final AtomicReference<FilterEngine> filterEngine;

    /**
     * Loads the current resolver configuration (or the default one), builds the initial
     * {@link FilterEngine} from it and registers this filter on the cluster event bus.
     *
     * @param clusterConfigService source of the {@link GeoIpResolverConfig}
     * @param scheduler            executor on which configuration reloads are run
     * @param clusterEventBus      event bus this filter registers itself on
     * @param metricRegistry       registry that provides the resolve timer
     */
    @Inject
    public GeoIpResolverFilter(ClusterConfigService clusterConfigService,
                               @Named("daemonScheduler") ScheduledExecutorService scheduler,
                               @ClusterEventBus EventBus clusterEventBus,
                               MetricRegistry metricRegistry) {
        this.clusterConfigService = clusterConfigService;
        this.scheduler = scheduler;
        this.metricRegistry = metricRegistry;

        final GeoIpResolverConfig config = clusterConfigService.getOrDefault(GeoIpResolverConfig.class,
                GeoIpResolverConfig.defaultConfig());

        this.config = new AtomicReference<>(config);
        this.filterEngine = new AtomicReference<>(new FilterEngine(config, metricRegistry));

        clusterEventBus.register(this);
    }

    /**
     * Event bus callback. Ignores events for other configuration types; for a
     * {@link GeoIpResolverConfig} change it schedules an immediate {@link #reload()} on the
     * scheduler instead of reloading on the calling thread.
     *
     * @param event the cluster configuration change notification
     */
    @Subscribe
    @SuppressWarnings("unused")
    public void updateConfig(ClusterConfigChangedEvent event) {
        if (!GeoIpResolverConfig.class.getCanonicalName().equals(event.type())) {
            return;
        }

        // The explicit cast selects the Runnable overload of schedule().
        scheduler.schedule((Runnable) this::reload, 0, TimeUnit.SECONDS);
    }

    /**
     * Re-reads the configuration and swaps in a new {@link FilterEngine} built from it.
     */
    private void reload() {
        final GeoIpResolverConfig newConfig = clusterConfigService.getOrDefault(GeoIpResolverConfig.class,
                GeoIpResolverConfig.defaultConfig());

        LOG.debug("Updating GeoIP filter engine - {}", newConfig);
        config.set(newConfig);
        // NOTE(review): the DatabaseReader held by the replaced engine is not closed here, and
        // in-flight filter() calls may still be using it. Confirm whether explicit cleanup is needed.
        filterEngine.set(new FilterEngine(newConfig, metricRegistry));
    }

    /**
     * Delegates to the currently active {@link FilterEngine}.
     *
     * @param message the message to enrich
     * @return whatever the engine returns, which is always {@code false}
     */
    @Override
    public boolean filter(Message message) {
        return filterEngine.get().filter(message);
    }

    @Override
    public String getName() {
        return "GeoIP resolver";
    }

    /**
     * @return one less than the ExtractorFilter priority when the configuration asks to run before
     * the extractors, one more otherwise
     */
    @Override
    public int getPriority() {
        return config.get().runBeforeExtractors()
                ? EXTRACTOR_FILTER_PRIORITY - 1
                : EXTRACTOR_FILTER_PRIORITY + 1;
    }

    /**
     * Immutable snapshot of one configuration: holds the opened GeoIP database (if any) and does
     * the per-message work.
     */
    protected static class FilterEngine {
        private final Timer resolveTime;
        // null when the resolver is disabled or the database could not be opened
        private final DatabaseReader databaseReader;
        private final boolean enabled;

        /**
         * Opens the database named by {@code config.dbPath()} when the resolver is enabled. If the
         * resolver is disabled the database is not touched at all, so a disabled installation
         * without a database file does not log an error on every (re)load. If opening fails the
         * error is logged and the engine is left disabled.
         *
         * @param config         resolver configuration to build the engine from
         * @param metricRegistry registry that provides the {@code resolveTime} timer
         */
        public FilterEngine(GeoIpResolverConfig config, MetricRegistry metricRegistry) {
            this.resolveTime = metricRegistry.timer(name(GeoIpResolverFilter.class, "resolveTime"));

            DatabaseReader reader = null;
            boolean usable = false;
            if (config.enabled()) {
                try {
                    final File database = new File(config.dbPath());
                    reader = new DatabaseReader.Builder(database).build();
                    usable = true;
                } catch (IOException e) {
                    LOG.error("Could not open GeoIP database {}", config.dbPath(), e);
                }
            }
            this.databaseReader = reader;
            this.enabled = usable;
        }

        /**
         * Adds a {@code <field>_geolocation} field for every field of the message whose value
         * yields coordinates. Does nothing when the engine is disabled.
         *
         * @param message the message to enrich
         * @return always {@code false}
         */
        public boolean filter(Message message) {
            if (!enabled) {
                return false;
            }

            // NOTE(review): fields are added while iterating getFields(); this relies on
            // getFields() returning a snapshot rather than a live view - confirm against Message.
            for (Map.Entry<String, Object> field : message.getFields().entrySet()) {
                final List<Double> coordinates = extractGeoLocationInformation(field.getValue());
                if (coordinates.size() == 2) {
                    // The list is [longitude, latitude]; we store the coordinates as a "lat,long" string
                    final String stringGeoPoint = coordinates.get(1) + "," + coordinates.get(0);
                    message.addField(field.getKey() + "_geolocation", stringGeoPoint);
                }
            }

            return false;
        }

        /**
         * Looks up the coordinates of the first IPv4 address found in {@code fieldValue}.
         *
         * @param fieldValue a message field value; anything but a non-empty string is ignored
         * @return a list holding longitude then latitude, or an empty list when the value has no
         * usable address, no database is open, the lookup fails or the database entry has no
         * coordinates
         */
        protected List<Double> extractGeoLocationInformation(Object fieldValue) {
            final List<Double> coordinates = Lists.newArrayList();

            if (databaseReader == null) {
                return coordinates;
            }
            if (!(fieldValue instanceof String) || isNullOrEmpty((String) fieldValue)) {
                return coordinates;
            }

            final String ip = this.getIpFromFieldValue((String) fieldValue);
            // An out-of-range dotted quad is not an IP literal; InetAddress.getByName would treat
            // it as a host name and trigger a name-service lookup, so skip it up front.
            if (isNullOrEmpty(ip) || isOutOfRangeIpv4(ip)) {
                return coordinates;
            }

            try (Timer.Context ignored = resolveTime.time()) {
                final InetAddress ipAddress = InetAddress.getByName(ip);
                final CityResponse response = databaseReader.city(ipAddress);
                final Location location = response.getLocation();
                if (location != null) {
                    final Double longitude = location.getLongitude();
                    final Double latitude = location.getLatitude();
                    // Only report a position when both parts are known, otherwise the caller
                    // would store a "null,null" geolocation string.
                    if (longitude != null && latitude != null) {
                        coordinates.add(longitude);
                        coordinates.add(latitude);
                    }
                }
            } catch (Exception e) {
                // Best effort: unknown addresses and lookup failures simply yield no coordinates.
                LOG.debug("Could not get location from IP {}", ip, e);
            }

            return coordinates;
        }

        /**
         * Returns the first substring of {@code fieldValue} that looks like a dotted-quad IPv4
         * address (octet ranges are not checked here).
         *
         * @param fieldValue the text to search
         * @return the matched text, or {@code null} when nothing matches
         */
        protected String getIpFromFieldValue(String fieldValue) {
            final Matcher matcher = IP_PATTERN.matcher(fieldValue);
            return matcher.find() ? matcher.group(1) : null;
        }

        /**
         * Tells whether {@code ip} is a dotted quad with at least one octet above 255. Text that
         * is not a dotted quad at all is reported as {@code false} and left for
         * {@link InetAddress} to judge.
         */
        private static boolean isOutOfRangeIpv4(String ip) {
            if (!IP_PATTERN.matcher(ip).matches()) {
                return false;
            }
            for (String octet : ip.split("\\.")) {
                // At most three ASCII digits per octet (guaranteed by the pattern), so parseInt is safe.
                if (Integer.parseInt(octet) > 255) {
                    return true;
                }
            }
            return false;
        }
    }
}