Skip to content

Commit

Permalink
Adds TLS check to mongodb destination and migrates util constants (#1…
Browse files Browse the repository at this point in the history
…8892)

* Adds TLS check to mongodb destination and migrates util constants

* Migrates MongodbSourceUitls to general purprose Utils file

* Updates expected_spec.json to include SSH tunnel

* Bumps connector version and removes connector from being hidden in UI

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
ryankfu and octavia-squidington-iii committed Nov 8, 2022
1 parent 5c9e5d9 commit 804af5d
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@
- name: MongoDB
destinationDefinitionId: 8b746512-8c2e-6ac1-4adc-b59faafd473c
dockerRepository: airbyte/destination-mongodb
dockerImageTag: 0.1.8
dockerImageTag: 0.1.9
documentationUrl: https://docs.airbyte.com/integrations/destinations/mongodb
icon: mongodb.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3338,7 +3338,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-mongodb:0.1.8"
- dockerImage: "airbyte/destination-mongodb:0.1.9"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/mongodb"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@

import static java.util.Arrays.asList;
import static org.bson.BsonType.ARRAY;
import static org.bson.BsonType.DATE_TIME;
import static org.bson.BsonType.DECIMAL128;
import static org.bson.BsonType.DOCUMENT;
import static org.bson.BsonType.DOUBLE;
import static org.bson.BsonType.INT32;
import static org.bson.BsonType.INT64;
import static org.bson.BsonType.OBJECT_ID;
import static org.bson.BsonType.STRING;
import static org.bson.BsonType.TIMESTAMP;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -20,6 +28,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.TreeNode;
Expand All @@ -28,6 +37,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.bson.BsonBinary;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
Expand All @@ -53,6 +63,29 @@ public class MongoUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoUtils.class);

// Shared constants
public static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=admin&ssl=%s";
public static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true";
public static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=admin&directConnection=false&ssl=true";
public static final String USER = "user";
public static final String INSTANCE_TYPE = "instance_type";
public static final String INSTANCE = "instance";
public static final String CLUSTER_URL = "cluster_url";
public static final String SERVER_ADDRESSES = "server_addresses";
public static final String REPLICA_SET = "replica_set";

// MongodbDestination specific constants
public static final String AUTH_TYPE = "auth_type";
public static final String AUTHORIZATION = "authorization";
public static final String LOGIN_AND_PASSWORD = "login/password";
public static final String AIRBYTE_DATA_HASH = "_airbyte_data_hash";

// MongodbSource specific constants
public static final String AUTH_SOURCE = "auth_source";
public static final String PRIMARY_KEY = "_id";
public static final Set<BsonType> ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME,
INT32, TIMESTAMP, INT64, DECIMAL128);

private static final String MISSING_TYPE = "missing";
private static final String NULL_TYPE = "null";
public static final String AIRBYTE_SUFFIX = "_aibyte_transform";
Expand Down Expand Up @@ -136,6 +169,14 @@ private static ObjectNode readDocument(final BsonReader reader, final ObjectNode
return jsonNodes;
}

/**
* Determines whether TLS/SSL should be enabled for a standalone instance of MongoDB.
*/
public static boolean tlsEnabledForStandaloneInstance(final JsonNode config, final JsonNode instanceConfig) {
return config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean()
: (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true);
}

public static void transformToStringIfMarked(final ObjectNode jsonNodes, final List<String> columnNames, final String fieldName) {
if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) {
final JsonNode data = jsonNodes.get(fieldName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/destination-mongodb-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

package io.airbyte.integrations.destination.mongodb;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.mongodb.MongoUtils;
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -21,6 +26,17 @@ public MongodbDestinationStrictEncrypt() {
super(MongodbDestination.sshWrappedDestination());
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE);
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText());
// If the MongoDb destination connector is not set up to use a TLS connection, then check should fail
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
throw new ConfigErrorException("TLS connection must be used to read from MongoDB.");
}
return super.check(config);
}

@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) throws Exception {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@
package io.airbyte.integrations.destination.mongodb;

import static com.mongodb.client.model.Projections.excludeId;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.db.mongodb.MongoUtils;
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.io.IOException;
Expand All @@ -21,6 +27,7 @@
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class MongodbDestinationStrictEncryptAcceptanceTest extends DestinationAcceptanceTest {

Expand Down Expand Up @@ -102,9 +109,27 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
return result;
}

@Test
void testCheck() throws Exception {
final JsonNode instanceConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("instance", MongoInstanceType.STANDALONE.getType())
.put("tls", false)
.build());

final JsonNode invalidStandaloneConfig = getConfig();

((ObjectNode) invalidStandaloneConfig).put(MongoUtils.INSTANCE_TYPE, instanceConfig);

final Throwable throwable = catchThrowable(() -> new MongodbDestinationStrictEncrypt().check(invalidStandaloneConfig));
assertThat(throwable).isInstanceOf(ConfigErrorException.class);
assertThat(((ConfigErrorException) throwable)
.getDisplayMessage()
.contains("TLS connection must be used to read from MongoDB."));
}

@Override
protected void setup(final TestDestinationEnv testEnv) {
var credentials = String.format("%s:%s@", config.get(AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
final var credentials = String.format("%s:%s@", config.get(AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
config.get(AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText());
final String connectionString = String.format("mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true",
credentials,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,120 @@
}
}
]
},
"tunnel_method": {
"type": "object",
"title": "SSH Tunnel Method",
"description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.",
"oneOf": [
{
"title": "No Tunnel",
"required": ["tunnel_method"],
"properties": {
"tunnel_method": {
"description": "No ssh tunnel needed to connect to database",
"type": "string",
"const": "NO_TUNNEL",
"order": 0
}
}
},
{
"title": "SSH Key Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"ssh_key"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and ssh key",
"type": "string",
"const": "SSH_KEY_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host.",
"type": "string",
"order": 3
},
"ssh_key": {
"title": "SSH Private Key",
"description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )",
"type": "string",
"airbyte_secret": true,
"multiline": true,
"order": 4
}
}
},
{
"title": "Password Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"tunnel_user_password"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and password authentication",
"type": "string",
"const": "SSH_PASSWORD_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host",
"type": "string",
"order": 3
},
"tunnel_user_password": {
"title": "Password",
"description": "OS-level password for logging into the jump server host",
"type": "string",
"airbyte_secret": true,
"order": 4
}
}
}
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/destination-mongodb
Loading

0 comments on commit 804af5d

Please sign in to comment.