Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSH for Postgres Source #5742

Merged
merged 25 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ jobs:
MIXPANEL_INTEGRATION_TEST_CREDS: ${{ secrets.MIXPANEL_INTEGRATION_TEST_CREDS }}
MSSQL_RDS_TEST_CREDS: ${{ secrets.MSSQL_RDS_TEST_CREDS }}
PAYPAL_TRANSACTION_CREDS: ${{ secrets.SOURCE_PAYPAL_TRANSACTION_CREDS }}
POSTGRES_SSH_KEY_TEST_CREDS: ${{ secrets.POSTGRES_SSH_KEY_TEST_CREDS }}
POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ jobs:
MIXPANEL_INTEGRATION_TEST_CREDS: ${{ secrets.MIXPANEL_INTEGRATION_TEST_CREDS }}
MSSQL_RDS_TEST_CREDS: ${{ secrets.MSSQL_RDS_TEST_CREDS }}
PAYPAL_TRANSACTION_CREDS: ${{ secrets.SOURCE_PAYPAL_TRANSACTION_CREDS }}
POSTGRES_SSH_KEY_TEST_CREDS: ${{ secrets.POSTGRES_SSH_KEY_TEST_CREDS }}
POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
Expand Down
87 changes: 73 additions & 14 deletions airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.stream.MoreStreams;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class Jsons {
Expand All @@ -49,42 +53,42 @@ public class Jsons {
private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper();
private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writer(new JsonPrettyPrinter());

public static <T> String serialize(T object) {
public static <T> String serialize(final T object) {
try {
return OBJECT_MAPPER.writeValueAsString(object);
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public static <T> T deserialize(final String jsonString, final Class<T> klass) {
try {
return OBJECT_MAPPER.readValue(jsonString, klass);
} catch (IOException e) {
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static JsonNode deserialize(final String jsonString) {
try {
return OBJECT_MAPPER.readTree(jsonString);
} catch (IOException e) {
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static <T> Optional<T> tryDeserialize(final String jsonString, final Class<T> klass) {
try {
return Optional.of(OBJECT_MAPPER.readValue(jsonString, klass));
} catch (IOException e) {
} catch (final IOException e) {
return Optional.empty();
}
}

public static Optional<JsonNode> tryDeserialize(final String jsonString) {
try {
return Optional.of(OBJECT_MAPPER.readTree(jsonString));
} catch (IOException e) {
} catch (final IOException e) {
return Optional.empty();
}
}
Expand All @@ -108,15 +112,15 @@ public static <T> T object(final JsonNode jsonNode, final TypeReference<T> typeR
public static <T> Optional<T> tryObject(final JsonNode jsonNode, final Class<T> klass) {
try {
return Optional.of(OBJECT_MAPPER.convertValue(jsonNode, klass));
} catch (Exception e) {
} catch (final Exception e) {
return Optional.empty();
}
}

public static <T> Optional<T> tryObject(final JsonNode jsonNode, final TypeReference<T> typeReference) {
try {
return Optional.of(OBJECT_MAPPER.convertValue(jsonNode, typeReference));
} catch (Exception e) {
} catch (final Exception e) {
return Optional.empty();
}
}
Expand All @@ -126,30 +130,85 @@ public static <T> T clone(final T object) {
return (T) deserialize(serialize(object), object.getClass());
}

public static byte[] toBytes(JsonNode jsonNode) {
public static byte[] toBytes(final JsonNode jsonNode) {
return serialize(jsonNode).getBytes(Charsets.UTF_8);
}

public static Set<String> keys(JsonNode jsonNode) {
public static Set<String> keys(final JsonNode jsonNode) {
if (jsonNode.isObject()) {
return Jsons.object(jsonNode, new TypeReference<Map<String, Object>>() {}).keySet();
} else {
return new HashSet<>();
}
}

public static List<JsonNode> children(JsonNode jsonNode) {
public static List<JsonNode> children(final JsonNode jsonNode) {
return MoreStreams.toStream(jsonNode.elements()).collect(Collectors.toList());
}

public static String toPrettyString(JsonNode jsonNode) {
public static String toPrettyString(final JsonNode jsonNode) {
try {
return OBJECT_WRITER.writeValueAsString(jsonNode) + "\n";
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public static JsonNode navigateTo(JsonNode node, final List<String> keys) {
for (final String key : keys) {
node = node.get(key);
}
return node;
}

public static void replaceNestedString(final JsonNode json, final List<String> keys, final String replacement) {
replaceNested(json, keys, (node, finalKey) -> node.put(finalKey, replacement));
}

public static void replaceNestedInt(final JsonNode json, final List<String> keys, final int replacement) {
replaceNested(json, keys, (node, finalKey) -> node.put(finalKey, replacement));
}

private static void replaceNested(final JsonNode json, final List<String> keys, final BiConsumer<ObjectNode, String> typedReplacement) {
Preconditions.checkArgument(keys.size() > 0, "Must pass at least one key");
final JsonNode nodeContainingFinalKey = navigateTo(json, keys.subList(0, keys.size() - 1));
typedReplacement.accept((ObjectNode) nodeContainingFinalKey, keys.get(keys.size() - 1));
}

public static Optional<JsonNode> getOptional(final JsonNode json, final String... keys) {
return getOptional(json, Arrays.asList(keys));
}

public static Optional<JsonNode> getOptional(JsonNode json, final List<String> keys) {
for (final String key : keys) {
if (json == null) {
return Optional.empty();
}

json = json.get(key);
}

return Optional.ofNullable(json);
}

public static String getStringOrNull(final JsonNode json, final String... keys) {
return getStringOrNull(json, Arrays.asList(keys));
}

public static String getStringOrNull(final JsonNode json, final List<String> keys) {
final Optional<JsonNode> optional = getOptional(json, keys);
return optional.map(JsonNode::asText).orElse(null);
}

public static int getIntOrZero(final JsonNode json, final String... keys) {
return getIntOrZero(json, Arrays.asList(keys));
}

public static int getIntOrZero(final JsonNode json, final List<String> keys) {
final Optional<JsonNode> optional = getOptional(json, keys);
return optional.map(JsonNode::asInt).orElse(0);
}

/**
* By the Jackson DefaultPrettyPrinter prints objects with an extra space as follows: {"name" :
* "airbyte"}. We prefer {"name": "airbyte"}.
Expand All @@ -165,7 +224,7 @@ public DefaultPrettyPrinter createInstance() {

// override the method that inserts the extra space.
@Override
public DefaultPrettyPrinter withSeparators(Separators separators) {
public DefaultPrettyPrinter withSeparators(final Separators separators) {
_separators = separators;
_objectFieldValueSeparatorWithSpaces = separators.getObjectFieldValueSeparator() + " ";
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@

public class Strings {

public static String join(Iterable<?> iterable, CharSequence separator) {
public static String join(final Iterable<?> iterable, final CharSequence separator) {
return Streams.stream(iterable)
.map(Object::toString)
.collect(Collectors.joining(separator));
}

public static String addRandomSuffix(String base, String separator, int suffixLength) {
public static String addRandomSuffix(final String base, final String separator, final int suffixLength) {
return base + separator + RandomStringUtils.randomAlphabetic(suffixLength).toLowerCase();
}

public static String safeTrim(final String string) {
return string == null ? null : string.trim();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -241,6 +242,32 @@ void testToPrettyString() {
assertEquals(expectedOutput, Jsons.toPrettyString(jsonNode));
}

@Test
void testGetOptional() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": {}, \"mno\": \"pqr\", \"stu\": null }");

assertEquals(Optional.of(Jsons.jsonNode("ghi")), Jsons.getOptional(json, "abc", "def"));
assertEquals(Optional.of(Jsons.emptyObject()), Jsons.getOptional(json, "jkl"));
assertEquals(Optional.of(Jsons.jsonNode("pqr")), Jsons.getOptional(json, "mno"));
assertEquals(Optional.of(Jsons.jsonNode(null)), Jsons.getOptional(json, "stu"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "xyz"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "abc", "xyz"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "abc", "def", "xyz"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "abc", "jkl", "xyz"));
assertEquals(Optional.empty(), Jsons.getOptional(json, "stu", "xyz"));
}

@Test
void testGetStringOrNull() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": \"mno\", \"pqr\": 1 }");

assertEquals("ghi", Jsons.getStringOrNull(json, "abc", "def"));
assertEquals("mno", Jsons.getStringOrNull(json, "jkl"));
assertEquals("1", Jsons.getStringOrNull(json, "pqr"));
assertNull(Jsons.getStringOrNull(json, "abc", "def", "xyz"));
assertNull(Jsons.getStringOrNull(json, "xyz"));
}

private static class ToClass {

@JsonProperty("str")
Expand All @@ -254,21 +281,21 @@ private static class ToClass {

public ToClass() {}

public ToClass(String str, Integer num, long numLong) {
public ToClass(final String str, final Integer num, final long numLong) {
this.str = str;
this.num = num;
this.numLong = numLong;
}

@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ToClass toClass = (ToClass) o;
final ToClass toClass = (ToClass) o;
return numLong == toClass.numLong
&& Objects.equals(str, toClass.str)
&& Objects.equals(num, toClass.num);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.10
dockerImageTag: 0.3.11
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
- sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
Expand Down
6 changes: 6 additions & 0 deletions airbyte-integrations/bases/base-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ plugins {

dependencies {
implementation 'commons-cli:commons-cli:1.4'
implementation 'org.apache.sshd:sshd-mina:2.7.0'
// bouncycastle is pinned to version-match the transitive dependency from kubernetes client-java
// because a version conflict causes "parameter object not a ECParameterSpec" on ssh tunnel initiation
implementation 'org.bouncycastle:bcprov-jdk15on:1.66'
implementation 'org.bouncycastle:bcpkix-jdk15on:1.66'
implementation 'org.bouncycastle:bctls-jdk15on:1.66'

implementation project(':airbyte-protocol:models')
implementation project(":airbyte-json-validation")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.base.ssh;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.IOException;

public class SshHelpers {

public static ConnectorSpecification getSpecAndInjectSsh() throws IOException {
final ConnectorSpecification originalSpec = Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
return injectSshIntoSpec(originalSpec);
}

public static ConnectorSpecification injectSshIntoSpec(final ConnectorSpecification connectorSpecification) throws IOException {
final ConnectorSpecification originalSpec = Jsons.clone(connectorSpecification);
final ObjectNode propNode = (ObjectNode) originalSpec.getConnectionSpecification().get("properties");
propNode.set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json")));
return originalSpec;
}

}
Loading