diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/.dockerignore b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile new file mode 100644 index 000000000000..4f4ea189e065 --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile @@ -0,0 +1,20 @@ +FROM airbyte/integration-base-java:dev AS build + +WORKDIR /airbyte + +ENV APPLICATION destination-elasticsearch-strict-encrypt + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar + +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte + +ENV APPLICATION destination-elasticsearch-strict-encrypt + +COPY --from=build /airbyte /airbyte + +LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.name=airbyte/destination-elasticsearch-strict-encrypt diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/README.md b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/README.md new file mode 100644 index 000000000000..0ce2b25e3b35 --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/README.md @@ -0,0 +1,5 @@ +# Elasticsearch Strict Encrypt Test Configuration + +In order to test the Elasticsearch destination, you need to have the up and running Elasticsearch that has xpack.security.enabled. + +This connector inherits the Elasticsearch destination, but support authorized connections only. diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/build.gradle b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/build.gradle new file mode 100644 index 000000000000..ffc4c797814a --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/build.gradle @@ -0,0 +1,49 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.elasticsearch.ElasticsearchStrictEncryptDestination' + applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] +} + +dependencies { + implementation project(':airbyte-config:config-models') + implementation project(':airbyte-protocol:protocol-models') + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + + implementation 'co.elastic.clients:elasticsearch-java:7.15.0' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3' + + // EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + // https://eclipse-ee4j.github.io/jsonp/ + implementation 'jakarta.json:jakarta.json-api:2.0.1' + + // Needed even if using Jackson to have an implementation of the Jsonp object model + // EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + // https://github.com/eclipse-ee4j/jsonp + implementation 'org.glassfish:jakarta.json:2.0.1' + implementation project(':airbyte-integrations:connectors:destination-elasticsearch') + + // MIT + // https://www.testcontainers.org/ + testImplementation libs.connectors.testcontainers.elasticsearch + integrationTestJavaImplementation libs.connectors.testcontainers.elasticsearch + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-elasticsearch') +} + +repositories { + maven { + name = "ESSnapshots" + url = "https://snapshots.elastic.co/maven/" + } + maven { + name = "ESJavaGithubPackages" + url = "https://maven.pkg.github.com/elastic/elasticsearch-java" + } +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestination.java b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestination.java new file mode 100644 index 000000000000..44a7bb080e32 --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestination.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.elasticsearch; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination; +import io.airbyte.protocol.models.ConnectorSpecification; +import java.util.stream.IntStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticsearchStrictEncryptDestination extends SpecModifyingDestination implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchStrictEncryptDestination.class); + + public ElasticsearchStrictEncryptDestination() { + super(new ElasticsearchDestination()); + } + + public static void main(String[] args) throws Exception { + final var destination = new ElasticsearchStrictEncryptDestination(); + LOGGER.info("starting destination: {}", ElasticsearchStrictEncryptDestination.class); + new IntegrationRunner(destination).run(args); + LOGGER.info("completed destination: {}", ElasticsearchStrictEncryptDestination.class); + } + + @Override + public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception { + final ConnectorSpecification spec = Jsons.clone(originalSpec); + ArrayNode authMethod = (ArrayNode) spec.getConnectionSpecification().get("properties").get("authenticationMethod").get("oneOf"); + IntStream.range(0, authMethod.size()).filter(i -> authMethod.get(i).get("title").asText().equals("None")).findFirst() + .ifPresent(authMethod::remove); + return spec; + } + +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestinationAcceptanceTest.java new file mode 100644 index 000000000000..31232025fcbe --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestinationAcceptanceTest.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.elasticsearch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +public class ElasticsearchStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest { + + private final ObjectMapper mapper = new ObjectMapper(); + private static ElasticsearchContainer container; + + @BeforeAll + public static void beforeAll() { + + container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1") + .withPassword("MagicWord"); + + container.start(); + } + + @AfterAll + public static void afterAll() { + container.stop(); + container.close(); + } + + @Override + protected String getImageName() { + return "airbyte/destination-elasticsearch-strict-encrypt:dev"; + } + + @Override + protected int getMaxRecordValueLimit() { + return 2000000; + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected boolean supportsNormalization() { + return false; + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return false; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + + @Override + protected TestDataComparator getTestDataComparator() { + return new AdvancedTestDataComparator(); + } + + @Override + protected JsonNode getConfig() { + + final JsonNode authConfig = Jsons.jsonNode(Map.of( + "method", "basic", + "username", "elastic", + "password", "MagicWord")); + + return Jsons.jsonNode(ImmutableMap.builder() + .put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200))) + .put("authenticationMethod", authConfig) + .build()); + } + + @Override + protected JsonNode getFailCheckConfig() { + // should result in a failed connection check + return mapper.createObjectNode(); + } + + @Override + protected List retrieveRecords(DestinationAcceptanceTest.TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException { + // Records returned from this method will be compared against records provided to the connector + // to verify they were written correctly + final String indexName = new ElasticsearchWriteConfig() + .setNamespace(namespace) + .setStreamName(streamName) + .getIndexName(); + + ElasticsearchConnection connection = new ElasticsearchConnection(mapper.convertValue(getConfig(), ConnectorConfiguration.class)); + return connection.getRecords(indexName); + } + + @Override + protected void setup(DestinationAcceptanceTest.TestDestinationEnv testEnv) {} + + @Override + protected void tearDown(DestinationAcceptanceTest.TestDestinationEnv testEnv) { + ElasticsearchConnection connection = new ElasticsearchConnection(mapper.convertValue(getConfig(), ConnectorConfiguration.class)); + connection.allIndices().forEach(connection::deleteIndexIfPresent); + } + +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationStrictEncryptTest.java b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationStrictEncryptTest.java new file mode 100644 index 000000000000..2682c162ba74 --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationStrictEncryptTest.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.elasticsearch; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.ConnectorSpecification; +import org.junit.jupiter.api.Test; + +public class ElasticsearchDestinationStrictEncryptTest { + + @Test + void testSpec() throws Exception { + final ConnectorSpecification actual = new ElasticsearchStrictEncryptDestination().spec(); + final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class); + + assertEquals(expected, actual); + } + +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/resources/expected_spec.json new file mode 100644 index 000000000000..e8e769b20e28 --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/resources/expected_spec.json @@ -0,0 +1,81 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/elasticsearch", + "supportsIncremental": true, + "supportsNormalization": false, + "supportsNamespaces": true, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Elasticsearch Connection Configuration", + "type": "object", + "required": ["endpoint"], + "additionalProperties": false, + "properties": { + "endpoint": { + "title": "Server Endpoint", + "type": "string", + "description": "The full url of the Elasticsearch server" + }, + "upsert": { + "type": "boolean", + "title": "Upsert Records", + "description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.", + "default": true + }, + "authenticationMethod": { + "title": "Authentication Method", + "type": "object", + "description": "The type of authentication to be used", + "oneOf": [ + { + "title": "Api Key/Secret", + "additionalProperties": false, + "description": "Use a api key and secret combination to authenticate", + "required": ["method", "apiKeyId", "apiKeySecret"], + "properties": { + "method": { + "type": "string", + "const": "secret" + }, + "apiKeyId": { + "title": "API Key ID", + "description": "The Key ID to used when accessing an enterprise Elasticsearch instance.", + "type": "string" + }, + "apiKeySecret": { + "title": "API Key Secret", + "description": "The secret associated with the API Key ID.", + "type": "string", + "airbyte_secret": true + } + } + }, + { + "title": "Username/Password", + "additionalProperties": false, + "description": "Basic auth header with a username and password", + "required": ["method", "username", "password"], + "properties": { + "method": { + "type": "string", + "const": "basic" + }, + "username": { + "title": "Username", + "description": "Basic auth username to access a secure Elasticsearch server", + "type": "string" + }, + "password": { + "title": "Password", + "description": "Basic auth password to access a secure Elasticsearch server", + "type": "string", + "airbyte_secret": true + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchWriteConfig.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchWriteConfig.java index f76aee42265c..90b36591c0d1 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchWriteConfig.java +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchWriteConfig.java @@ -19,7 +19,7 @@ public class ElasticsearchWriteConfig { private List> primaryKey; private boolean upsert; - ElasticsearchWriteConfig() {} + public ElasticsearchWriteConfig() {} ElasticsearchWriteConfig( String namespace,