Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -44,19 +45,25 @@

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER;
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.IPINFO_TOKEN_SETTING;
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.MAXMIND_LICENSE_KEY_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase {

private static final String DATABASE_TYPE = "GeoIP2-City";
private static final String MAXMIND_DATABASE_TYPE = "GeoIP2-City";
private static final String IPINFO_DATABASE_TYPE = "asn";

@ClassRule
public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture(DATABASE_TYPE);
public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture(
List.of(MAXMIND_DATABASE_TYPE),
List.of(IPINFO_DATABASE_TYPE)
);

protected String getEndpoint() {
return fixture.getAddress();
Expand All @@ -66,6 +73,7 @@ protected String getEndpoint() {
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString(MAXMIND_LICENSE_KEY_SETTING.getKey(), "license_key");
secureSettings.setString(IPINFO_TOKEN_SETTING.getKey(), "token");
Settings.Builder builder = Settings.builder();
builder.setSecureSettings(secureSettings)
.put(super.nodeSettings(nodeOrdinal, otherSettings))
Expand All @@ -92,29 +100,44 @@ public void testEnterpriseDownloaderTask() throws Exception {
* Note that the "enterprise database" is actually just a geolite database being loaded by the GeoIpHttpFixture.
*/
EnterpriseGeoIpDownloader.DEFAULT_MAXMIND_ENDPOINT = getEndpoint();
final String pipelineName = "enterprise_geoip_pipeline";
EnterpriseGeoIpDownloader.DEFAULT_IPINFO_ENDPOINT = getEndpoint();
final String indexName = "enterprise_geoip_test_index";
final String geoipPipelineName = "enterprise_geoip_pipeline";
final String iplocationPipelineName = "enterprise_iplocation_pipeline";
final String sourceField = "ip";
final String targetField = "ip-city";
final String targetField = "ip-result";

startEnterpriseGeoIpDownloaderTask();
configureDatabase(DATABASE_TYPE);
createGeoIpPipeline(pipelineName, DATABASE_TYPE, sourceField, targetField);
configureMaxmindDatabase(MAXMIND_DATABASE_TYPE);
configureIpinfoDatabase(IPINFO_DATABASE_TYPE);
waitAround();
createPipeline(geoipPipelineName, "geoip", MAXMIND_DATABASE_TYPE, sourceField, targetField);
createPipeline(iplocationPipelineName, "ip_location", IPINFO_DATABASE_TYPE, sourceField, targetField);

/*
* We know that the databases index has been populated (because we waited around, :wink:), but we don't know for sure that
* the databases have been pulled down and made available on all nodes. So we run these ingest-and-check steps in assertBusy blocks.
*/
assertBusy(() -> {
/*
* We know that the .geoip_databases index has been populated, but we don't know for sure that the database has been pulled
* down and made available on all nodes. So we run this ingest-and-check step in an assertBusy.
*/
logger.info("Ingesting a test document");
String documentId = ingestDocument(indexName, pipelineName, sourceField);
String documentId = ingestDocument(indexName, geoipPipelineName, sourceField, "89.160.20.128");
GetResponse getResponse = client().get(new GetRequest(indexName, documentId)).actionGet();
Map<String, Object> returnedSource = getResponse.getSource();
assertNotNull(returnedSource);
Object targetFieldValue = returnedSource.get(targetField);
assertNotNull(targetFieldValue);
assertThat(((Map<String, Object>) targetFieldValue).get("organization_name"), equalTo("Bredband2 AB"));
});
assertBusy(() -> {
logger.info("Ingesting another test document");
String documentId = ingestDocument(indexName, iplocationPipelineName, sourceField, "12.10.66.1");
GetResponse getResponse = client().get(new GetRequest(indexName, documentId)).actionGet();
Map<String, Object> returnedSource = getResponse.getSource();
assertNotNull(returnedSource);
Object targetFieldValue = returnedSource.get(targetField);
assertNotNull(targetFieldValue);
assertThat(((Map<String, Object>) targetFieldValue).get("organization_name"), equalTo("OAKLAWN JOCKEY CLUB, INC."));
});
}

private void startEnterpriseGeoIpDownloaderTask() {
Expand All @@ -133,29 +156,46 @@ private void startEnterpriseGeoIpDownloaderTask() {
);
}

private void configureDatabase(String databaseType) throws Exception {
private void configureMaxmindDatabase(String databaseType) {
admin().cluster()
.execute(
PutDatabaseConfigurationAction.INSTANCE,
new PutDatabaseConfigurationAction.Request(
TimeValue.MAX_VALUE,
TimeValue.MAX_VALUE,
new DatabaseConfiguration("test", databaseType, new DatabaseConfiguration.Maxmind("test_account"))
new DatabaseConfiguration("test-1", databaseType, new DatabaseConfiguration.Maxmind("test_account"))
)
)
.actionGet();
}

private void configureIpinfoDatabase(String databaseType) {
admin().cluster()
.execute(
PutDatabaseConfigurationAction.INSTANCE,
new PutDatabaseConfigurationAction.Request(
TimeValue.MAX_VALUE,
TimeValue.MAX_VALUE,
new DatabaseConfiguration("test-2", databaseType, new DatabaseConfiguration.Ipinfo())
)
)
.actionGet();
}

private void waitAround() throws Exception {
ensureGreen(GeoIpDownloader.DATABASES_INDEX);
assertBusy(() -> {
SearchResponse searchResponse = client().search(new SearchRequest(GeoIpDownloader.DATABASES_INDEX)).actionGet();
try {
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
assertThat(searchResponse.getHits().getHits().length, equalTo(2));
} finally {
searchResponse.decRef();
}
});
}

private void createGeoIpPipeline(String pipelineName, String databaseType, String sourceField, String targetField) throws IOException {
private void createPipeline(String pipelineName, String processorType, String databaseType, String sourceField, String targetField)
throws IOException {
final BytesReference bytes;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
Expand All @@ -165,7 +205,7 @@ private void createGeoIpPipeline(String pipelineName, String databaseType, Strin
{
builder.startObject();
{
builder.startObject("geoip");
builder.startObject(processorType);
{
builder.field("field", sourceField);
builder.field("target_field", targetField);
Expand All @@ -183,11 +223,11 @@ private void createGeoIpPipeline(String pipelineName, String databaseType, Strin
assertAcked(clusterAdmin().putPipeline(new PutPipelineRequest(pipelineName, bytes, XContentType.JSON)).actionGet());
}

private String ingestDocument(String indexName, String pipelineName, String sourceField) {
private String ingestDocument(String indexName, String pipelineName, String sourceField, String value) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(
new IndexRequest(indexName).source("{\"" + sourceField + "\": \"89.160.20.128\"}", XContentType.JSON).setPipeline(pipelineName)
);
bulkRequest.add(new IndexRequest(indexName).source(Strings.format("""
{ "%s": "%s"}
""", sourceField, value), XContentType.JSON).setPipeline(pipelineName));
BulkResponse response = client().bulk(bulkRequest).actionGet();
BulkItemResponse[] bulkItemResponses = response.getItems();
assertThat(bulkItemResponses.length, equalTo(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.BoolQueryBuilder;
Expand All @@ -39,6 +38,8 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.io.Closeable;
Expand All @@ -57,6 +58,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.IPINFO_SETTINGS_PREFIX;
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.MAXMIND_SETTINGS_PREFIX;

/**
Expand All @@ -72,6 +74,9 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
// a sha256 checksum followed by two spaces followed by an (ignored) file name
private static final Pattern SHA256_CHECKSUM_PATTERN = Pattern.compile("(\\w{64})\\s\\s(.*)");

// an md5 checksum
private static final Pattern MD5_CHECKSUM_PATTERN = Pattern.compile("(\\w{32})");

// for overriding in tests
static String DEFAULT_MAXMIND_ENDPOINT = System.getProperty(
MAXMIND_SETTINGS_PREFIX + "endpoint.default", //
Expand All @@ -80,6 +85,14 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
// n.b. a future enhancement might be to allow for a MAXMIND_ENDPOINT_SETTING, but
// at the moment this is an unsupported system property for use in tests (only)

// for overriding in tests
static String DEFAULT_IPINFO_ENDPOINT = System.getProperty(
IPINFO_SETTINGS_PREFIX + "endpoint.default", //
"https://ipinfo.io/data"
);
// n.b. a future enhancement might be to allow for an IPINFO_ENDPOINT_SETTING, but
// at the moment this is an unsupported system property for use in tests (only)

static final String DATABASES_INDEX = ".geoip_databases";
static final int MAX_CHUNK_SIZE = 1024 * 1024;

Expand Down Expand Up @@ -444,16 +457,15 @@ private void scheduleNextRun(TimeValue time) {
}
}

@Nullable
private ProviderDownload downloaderFor(DatabaseConfiguration database) {
if (database.provider() instanceof DatabaseConfiguration.Maxmind) {
return new MaxmindDownload(database.name(), (DatabaseConfiguration.Maxmind) database.provider());
} else if (database.provider() instanceof DatabaseConfiguration.Ipinfo) {
// as a temporary implementation detail, null here means 'not actually supported *just yet*'
return null;
if (database.provider() instanceof DatabaseConfiguration.Maxmind maxmind) {
return new MaxmindDownload(database.name(), maxmind);
} else if (database.provider() instanceof DatabaseConfiguration.Ipinfo ipinfo) {
return new IpinfoDownload(database.name(), ipinfo);
} else {
assert false : "Attempted to use database downloader with unsupported provider type [" + database.provider().getClass() + "]";
return null;
throw new IllegalArgumentException(
Strings.format("Unexpected provider [%s] for configuration [%s]", database.provider().getClass(), database.id())
);
}
}

Expand Down Expand Up @@ -488,7 +500,7 @@ public HttpClient.PasswordAuthenticationHolder buildCredentials() {

@Override
public boolean validCredentials() {
return auth.get() != null;
return auth != null && auth.get() != null;
}

@Override
Expand Down Expand Up @@ -529,7 +541,101 @@ public CheckedSupplier<InputStream, IOException> download() {

@Override
public void close() throws IOException {
auth.close();
if (auth != null) auth.close();
}
}

class IpinfoDownload implements ProviderDownload {

final String name;
final DatabaseConfiguration.Ipinfo ipinfo;
HttpClient.PasswordAuthenticationHolder auth;

IpinfoDownload(String name, DatabaseConfiguration.Ipinfo ipinfo) {
this.name = name;
this.ipinfo = ipinfo;
this.auth = buildCredentials();
}

@Override
public HttpClient.PasswordAuthenticationHolder buildCredentials() {
final char[] tokenChars = tokenProvider.apply("ipinfo");

// if the token is missing or empty, return null as 'no auth'
if (tokenChars == null || tokenChars.length == 0) {
return null;
}

// ipinfo uses the token as the username component of basic auth, see https://ipinfo.io/developers#authentication
return new HttpClient.PasswordAuthenticationHolder(new String(tokenChars), new char[] {});
}

@Override
public boolean validCredentials() {
return auth != null && auth.get() != null;
}

private static final Set<String> FREE_DATABASES = Set.of("asn", "country", "country_asn");

@Override
public String url(String suffix) {
// note: the 'free' databases are in the sub-path 'free/' in terms of the download endpoint
final String internalName;
if (FREE_DATABASES.contains(name)) {
internalName = "free/" + name;
} else {
internalName = name;
}

// reminder, we're passing the ipinfo token as the username part of http basic auth,
// see https://ipinfo.io/developers#authentication

String endpointPattern = DEFAULT_IPINFO_ENDPOINT;
if (endpointPattern.contains("%")) {
throw new IllegalArgumentException("Invalid endpoint [" + endpointPattern + "]");
}
if (endpointPattern.endsWith("/") == false) {
endpointPattern += "/";
}
endpointPattern += "%s.%s";

// at this point the pattern looks like this (in the default case):
// https://ipinfo.io/data/%s.%s
// also see https://ipinfo.io/developers/database-download,
// and https://ipinfo.io/developers/database-filename-reference for more

return Strings.format(endpointPattern, internalName, suffix);
}

@Override
public Checksum checksum() throws IOException {
final String checksumJsonUrl = this.url("mmdb/checksums"); // a minor abuse of the idea of a 'suffix', :shrug:
byte[] data = httpClient.getBytes(auth.get(), checksumJsonUrl); // this throws if the auth is bad
Map<String, Object> checksums;
try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, data)) {
checksums = parser.map();
}
@SuppressWarnings("unchecked")
String md5 = ((Map<String, String>) checksums.get("checksums")).get("md5");
logger.trace("checksum was [{}]", md5);

var matcher = MD5_CHECKSUM_PATTERN.matcher(md5);
boolean match = matcher.matches();
if (match == false) {
throw new RuntimeException("Unexpected md5 response from [" + checksumJsonUrl + "]");
}
return Checksum.md5(md5);
}

@Override
public CheckedSupplier<InputStream, IOException> download() {
final String mmdbUrl = this.url("mmdb");
return () -> httpClient.get(auth.get(), mmdbUrl);
}

@Override
public void close() throws IOException {
if (auth != null) auth.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,15 @@ public class EnterpriseGeoIpDownloaderTaskExecutor extends PersistentTasksExecut

static final String MAXMIND_SETTINGS_PREFIX = "ingest.geoip.downloader.maxmind.";

static final String IPINFO_SETTINGS_PREFIX = "ingest.ip_location.downloader.ipinfo.";

public static final Setting<SecureString> MAXMIND_LICENSE_KEY_SETTING = SecureSetting.secureString(
MAXMIND_SETTINGS_PREFIX + "license_key",
null
);

public static final Setting<SecureString> IPINFO_TOKEN_SETTING = SecureSetting.secureString(IPINFO_SETTINGS_PREFIX + "token", null);

private final Client client;
private final HttpClient httpClient;
private final ClusterService clusterService;
Expand Down Expand Up @@ -106,6 +110,10 @@ private char[] getSecureToken(final String type) {
if (cachedSecureSettings.getSettingNames().contains(MAXMIND_LICENSE_KEY_SETTING.getKey())) {
token = cachedSecureSettings.getString(MAXMIND_LICENSE_KEY_SETTING.getKey()).getChars();
}
} else if (type.equals("ipinfo")) {
if (cachedSecureSettings.getSettingNames().contains(IPINFO_TOKEN_SETTING.getKey())) {
token = cachedSecureSettings.getString(IPINFO_TOKEN_SETTING.getKey()).getChars();
}
}
return token;
}
Expand Down Expand Up @@ -166,7 +174,7 @@ public synchronized void reload(Settings settings) {
// `SecureSettings` are available here! cache them as they will be needed
// whenever dynamic cluster settings change and we have to rebuild the accounts
try {
this.cachedSecureSettings = extractSecureSettings(settings, List.of(MAXMIND_LICENSE_KEY_SETTING));
this.cachedSecureSettings = extractSecureSettings(settings, List.of(MAXMIND_LICENSE_KEY_SETTING, IPINFO_TOKEN_SETTING));
} catch (GeneralSecurityException e) {
// rethrow as a runtime exception, there's logging higher up the call chain around ReloadablePlugin
throw new ElasticsearchException("Exception while reloading enterprise geoip download task executor", e);
Expand Down
Loading