Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃帀Destination-clickhouse: added custom jdbc parameters field #16444

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -56,6 +56,12 @@
"airbyte_secret": true,
"order": 5
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 6
},
"tunnel_method": {
"type": "object",
"title": "SSH Tunnel Method",
Expand Down
Expand Up @@ -21,6 +21,7 @@ dependencies {

// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
testImplementation libs.connectors.destination.testcontainers.clickhouse
testImplementation project(":airbyte-json-validation")

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-clickhouse')
Expand Down
Expand Up @@ -41,7 +41,6 @@ public class ClickhouseDestination extends AbstractJdbcDestination implements De
static final List<String> DEFAULT_PARAMETERS = ImmutableList.of(
"socket_timeout=3000000");


public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new ClickhouseDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}
Expand Down Expand Up @@ -74,6 +73,10 @@ public JsonNode toJdbcConfig(final JsonNode config) {
configBuilder.put(JdbcUtils.PASSWORD_KEY, config.get(JdbcUtils.PASSWORD_KEY).asText());
}

if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
}

return Jsons.jsonNode(configBuilder.build());
}

Expand Down
Expand Up @@ -56,12 +56,18 @@
"airbyte_secret": true,
"order": 5
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 6
},
"ssl": {
"title": "SSL Connection",
"description": "Encrypt data using SSL.",
"type": "boolean",
"default": false,
"order": 6
"order": 7
}
}
}
Expand Down
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.clickhouse;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
* Tests that the clickhouse spec passes JsonSchema validation. While this may seem like overkill,
* we are doing it because there are some gotchas in correctly configuring the oneOf.
*/
public class ClickhouseDestinationSpecTest {

private static final String CONFIGURATION = "{ "
+ "\"password\" : \"pwd\", "
+ "\"username\" : \"clickhouse\", "
+ "\"database\" : \"clickhouse_db\", "
+ "\"port\" : 8123, "
+ "\"tcp-port\" : 9000, "
+ "\"host\" : \"localhost\", "
+ "\"jdbc_url_params\" : \"property1=pValue1&property2=pValue2\", "
+ "\"ssl\" : true "
+ "}";

private static JsonNode schema;
private static JsonSchemaValidator validator;

@BeforeAll
static void init() throws IOException {
final String spec = MoreResources.readResource("spec.json");
final File schemaFile = IOs.writeFile(Files.createTempDirectory(Path.of("/tmp"), "cl-spec-test"), "schema.json", spec).toFile();
schema = JsonSchemaValidator.getSchema(schemaFile).get("connectionSpecification");
validator = new JsonSchemaValidator();
}

@Test
void testDatabaseMissing() {
final JsonNode config = Jsons.deserialize(CONFIGURATION);
((ObjectNode) config).remove(JdbcUtils.DATABASE_KEY);
assertFalse(validator.test(schema, config));
}

@Test
void testSchemaMissing() {
final JsonNode config = Jsons.deserialize(CONFIGURATION);
((ObjectNode) config).remove("schemas");
assertTrue(validator.test(schema, config));
}

@Test
void testWithoutReplicationMethod() {
final JsonNode config = Jsons.deserialize(CONFIGURATION);
((ObjectNode) config).remove("replication_method");

assertTrue(validator.test(schema, config));
}

@Test
void testWithReplicationMethodWithReplicationSlot() {
final JsonNode config = Jsons.deserialize(CONFIGURATION);
assertTrue(validator.test(schema, config));
}

@Test
void testWithJdbcAdditionalProperty() {
final JsonNode config = Jsons.deserialize(CONFIGURATION);
assertTrue(validator.test(schema, config));
}

@Test
void testJdbcAdditionalProperty() throws Exception {
final ConnectorSpecification spec = new ClickhouseDestination().spec();
assertNotNull(spec.getConnectionSpecification().get("properties").get(JdbcUtils.JDBC_URL_PARAMS_KEY));
}

}