Skip to content

Commit

Permalink
New connector: Destination ElasticSearch strict encrypt (#16862)
Browse files Browse the repository at this point in the history
  • Loading branch information
VitaliiMaltsev committed Sep 22, 2022
1 parent 7f2c8be commit 083b5d1
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM airbyte/integration-base-java:dev AS build

WORKDIR /airbyte

ENV APPLICATION destination-elasticsearch-strict-encrypt

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar

FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION destination-elasticsearch-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-elasticsearch-strict-encrypt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Elasticsearch Strict Encrypt Test Configuration

In order to test the Elasticsearch destination, you need to have the up and running Elasticsearch that has xpack.security.enabled.

This connector inherits the Elasticsearch destination, but support authorized connections only.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.elasticsearch.ElasticsearchStrictEncryptDestination'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

implementation 'co.elastic.clients:elasticsearch-java:7.15.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3'

// EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// https://eclipse-ee4j.github.io/jsonp/
implementation 'jakarta.json:jakarta.json-api:2.0.1'

// Needed even if using Jackson to have an implementation of the Jsonp object model
// EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// https://github.com/eclipse-ee4j/jsonp
implementation 'org.glassfish:jakarta.json:2.0.1'
implementation project(':airbyte-integrations:connectors:destination-elasticsearch')

// MIT
// https://www.testcontainers.org/
testImplementation libs.connectors.testcontainers.elasticsearch
integrationTestJavaImplementation libs.connectors.testcontainers.elasticsearch

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-elasticsearch')
}

repositories {
maven {
name = "ESSnapshots"
url = "https://snapshots.elastic.co/maven/"
}
maven {
name = "ESJavaGithubPackages"
url = "https://maven.pkg.github.com/elastic/elasticsearch-java"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.elasticsearch;

import com.fasterxml.jackson.databind.node.ArrayNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchStrictEncryptDestination extends SpecModifyingDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchStrictEncryptDestination.class);

public ElasticsearchStrictEncryptDestination() {
super(new ElasticsearchDestination());
}

public static void main(String[] args) throws Exception {
final var destination = new ElasticsearchStrictEncryptDestination();
LOGGER.info("starting destination: {}", ElasticsearchStrictEncryptDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", ElasticsearchStrictEncryptDestination.class);
}

@Override
public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
ArrayNode authMethod = (ArrayNode) spec.getConnectionSpecification().get("properties").get("authenticationMethod").get("oneOf");
IntStream.range(0, authMethod.size()).filter(i -> authMethod.get(i).get("title").asText().equals("None")).findFirst()
.ifPresent(authMethod::remove);
return spec;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.elasticsearch;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticsearchStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private final ObjectMapper mapper = new ObjectMapper();
private static ElasticsearchContainer container;

@BeforeAll
public static void beforeAll() {

container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1")
.withPassword("MagicWord");

container.start();
}

@AfterAll
public static void afterAll() {
container.stop();
container.close();
}

@Override
protected String getImageName() {
return "airbyte/destination-elasticsearch-strict-encrypt:dev";
}

@Override
protected int getMaxRecordValueLimit() {
return 2000000;
}

@Override
protected boolean implementsNamespaces() {
return true;
}

@Override
protected boolean supportsNormalization() {
return false;
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return false;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new AdvancedTestDataComparator();
}

@Override
protected JsonNode getConfig() {

final JsonNode authConfig = Jsons.jsonNode(Map.of(
"method", "basic",
"username", "elastic",
"password", "MagicWord"));

return Jsons.jsonNode(ImmutableMap.builder()
.put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200)))
.put("authenticationMethod", authConfig)
.build());
}

@Override
protected JsonNode getFailCheckConfig() {
// should result in a failed connection check
return mapper.createObjectNode();
}

@Override
protected List<JsonNode> retrieveRecords(DestinationAcceptanceTest.TestDestinationEnv testEnv,
String streamName,
String namespace,
JsonNode streamSchema)
throws IOException {
// Records returned from this method will be compared against records provided to the connector
// to verify they were written correctly
final String indexName = new ElasticsearchWriteConfig()
.setNamespace(namespace)
.setStreamName(streamName)
.getIndexName();

ElasticsearchConnection connection = new ElasticsearchConnection(mapper.convertValue(getConfig(), ConnectorConfiguration.class));
return connection.getRecords(indexName);
}

@Override
protected void setup(DestinationAcceptanceTest.TestDestinationEnv testEnv) {}

@Override
protected void tearDown(DestinationAcceptanceTest.TestDestinationEnv testEnv) {
ElasticsearchConnection connection = new ElasticsearchConnection(mapper.convertValue(getConfig(), ConnectorConfiguration.class));
connection.allIndices().forEach(connection::deleteIndexIfPresent);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.elasticsearch;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.junit.jupiter.api.Test;

public class ElasticsearchDestinationStrictEncryptTest {

@Test
void testSpec() throws Exception {
final ConnectorSpecification actual = new ElasticsearchStrictEncryptDestination().spec();
final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class);

assertEquals(expected, actual);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/elasticsearch",
"supportsIncremental": true,
"supportsNormalization": false,
"supportsNamespaces": true,
"supportsDBT": false,
"supported_destination_sync_modes": ["overwrite", "append"],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Elasticsearch Connection Configuration",
"type": "object",
"required": ["endpoint"],
"additionalProperties": false,
"properties": {
"endpoint": {
"title": "Server Endpoint",
"type": "string",
"description": "The full url of the Elasticsearch server"
},
"upsert": {
"type": "boolean",
"title": "Upsert Records",
"description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.",
"default": true
},
"authenticationMethod": {
"title": "Authentication Method",
"type": "object",
"description": "The type of authentication to be used",
"oneOf": [
{
"title": "Api Key/Secret",
"additionalProperties": false,
"description": "Use a api key and secret combination to authenticate",
"required": ["method", "apiKeyId", "apiKeySecret"],
"properties": {
"method": {
"type": "string",
"const": "secret"
},
"apiKeyId": {
"title": "API Key ID",
"description": "The Key ID to used when accessing an enterprise Elasticsearch instance.",
"type": "string"
},
"apiKeySecret": {
"title": "API Key Secret",
"description": "The secret associated with the API Key ID.",
"type": "string",
"airbyte_secret": true
}
}
},
{
"title": "Username/Password",
"additionalProperties": false,
"description": "Basic auth header with a username and password",
"required": ["method", "username", "password"],
"properties": {
"method": {
"type": "string",
"const": "basic"
},
"username": {
"title": "Username",
"description": "Basic auth username to access a secure Elasticsearch server",
"type": "string"
},
"password": {
"title": "Password",
"description": "Basic auth password to access a secure Elasticsearch server",
"type": "string",
"airbyte_secret": true
}
}
}
]
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class ElasticsearchWriteConfig {
private List<List<String>> primaryKey;
private boolean upsert;

ElasticsearchWriteConfig() {}
public ElasticsearchWriteConfig() {}

ElasticsearchWriteConfig(
String namespace,
Expand Down

0 comments on commit 083b5d1

Please sign in to comment.