Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
jbfbell committed Mar 14, 2024
1 parent a895b20 commit 9d4d80d
Show file tree
Hide file tree
Showing 17 changed files with 349 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.2.0'
features = ['db-destinations']
useLocalCdk = false
cdkVersionRequired = '0.24.1'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
junitMethodExecutionTimeout = 30 m
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 3986776d-2319-4de9-8af8-db14c0996e72
dockerImageTag: 0.2.0
dockerImageTag: 1.0.0
dockerRepository: airbyte/destination-oracle-strict-encrypt
githubIssueLabel: destination-oracle
icon: oracle.svg
license: ELv2
name: Oracle
normalizationConfig:
normalizationIntegrationType: oracle
normalizationRepository: airbyte/normalization-oracle
normalizationTag: 0.4.1
releaseStage: alpha
releases:
breakingChanges:
1.0.0:
upgradeDeadline: "2024-03-15"
message: >
This version removes the option to use "normalization" with Oracle. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models.
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
documentationUrl: https://docs.airbyte.com/integrations/destinations/oracle
supportsDbt: true
tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.destination.oracle.OracleDestination;
import io.airbyte.integrations.destination.oracle.OracleNameTransformer;
import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -73,7 +73,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(
r.get(OracleDestination.COLUMN_NAME_DATA.replace("\"", "")).asText()))
r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -113,16 +113,15 @@ protected List<String> resolveIdentifier(final String identifier) {

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName)
throws SQLException {
final String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT);

try (final DSLContext dslContext = getDslContext(config)) {
final List<org.jooq.Record> result = getDatabase(dslContext).query(ctx -> ctx.fetch(query).stream().toList());
return result
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList());
}
final String query =
String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT.toUpperCase());
final DSLContext dslContext = getDslContext(config);
final List<org.jooq.Record> result = getDatabase(dslContext).query(ctx -> ctx.fetch(query).stream().toList());
return result
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList());
}

private static Database getDatabase(final DSLContext dslContext) {
Expand Down Expand Up @@ -151,15 +150,13 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TES
db.start();

config = getConfig(db);
final DSLContext dslContext = getDslContext(config);
final Database database = getDatabase(dslContext);
database.query(
ctx -> ctx.fetch(String.format("CREATE USER %s IDENTIFIED BY %s", schemaName, schemaName)));
database.query(ctx -> ctx.fetch(String.format("GRANT ALL PRIVILEGES TO %s", schemaName)));

try (final DSLContext dslContext = getDslContext(config)) {
final Database database = getDatabase(dslContext);
database.query(
ctx -> ctx.fetch(String.format("CREATE USER %s IDENTIFIED BY %s", schemaName, schemaName)));
database.query(ctx -> ctx.fetch(String.format("GRANT ALL PRIVILEGES TO %s", schemaName)));

((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, dbName);
}
((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, dbName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,126 @@
"examples": ["airbyte"],
"default": "airbyte",
"order": 6
},
"raw_data_schema": {
"type": "string",
"description": "The schema to write raw tables into (default: airbyte_internal)",
"title": "Raw Table Schema Name",
"order": 7
},
"tunnel_method": {
"type": "object",
"title": "SSH Tunnel Method",
"description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.",
"oneOf": [
{
"title": "No Tunnel",
"required": ["tunnel_method"],
"properties": {
"tunnel_method": {
"description": "No ssh tunnel needed to connect to database",
"type": "string",
"const": "NO_TUNNEL",
"order": 0
}
}
},
{
"title": "SSH Key Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"ssh_key"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and ssh key",
"type": "string",
"const": "SSH_KEY_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host.",
"type": "string",
"order": 3
},
"ssh_key": {
"title": "SSH Private Key",
"description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )",
"type": "string",
"airbyte_secret": true,
"multiline": true,
"order": 4
}
}
},
{
"title": "Password Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"tunnel_user_password"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and password authentication",
"type": "string",
"const": "SSH_PASSWORD_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host",
"type": "string",
"order": 3
},
"tunnel_user_password": {
"title": "Password",
"description": "OS-level password for logging into the jump server host",
"type": "string",
"airbyte_secret": true,
"order": 4
}
}
}
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.2.0'
features = ['db-destinations']
useLocalCdk = false
cdkVersionRequired = '0.24.1'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
junitMethodExecutionTimeout = 30 m
16 changes: 11 additions & 5 deletions airbyte-integrations/connectors/destination-oracle/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 3986776d-2319-4de9-8af8-db14c0996e72
dockerImageTag: 0.2.0
dockerImageTag: 1.0.0
dockerRepository: airbyte/destination-oracle
githubIssueLabel: destination-oracle
icon: oracle.svg
license: ELv2
name: Oracle
normalizationConfig:
normalizationIntegrationType: oracle
normalizationRepository: airbyte/normalization-oracle
normalizationTag: 0.4.3
registries:
cloud:
dockerRepository: airbyte/destination-oracle-strict-encrypt
Expand All @@ -21,6 +17,16 @@ data:
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/oracle
supportsDbt: true
releases:
breakingChanges:
1.0.0:
upgradeDeadline: "2024-03-15"
message: >
This version removes the option to use "normalization" with Oracle. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models.
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
tags:
- language:java
ab_internal:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,30 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,13 +39,6 @@ public class OracleDestination extends AbstractJdbcDestination implements Destin
private static final Logger LOGGER = LoggerFactory.getLogger(OracleDestination.class);
public static final String DRIVER_CLASS = DatabaseDriver.ORACLE.getDriverClassName();

public static final String COLUMN_NAME_AB_ID =
"\"" + JavaBaseConstants.COLUMN_NAME_AB_ID.toUpperCase() + "\"";
public static final String COLUMN_NAME_DATA =
"\"" + JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase() + "\"";
public static final String COLUMN_NAME_EMITTED_AT =
"\"" + JavaBaseConstants.COLUMN_NAME_EMITTED_AT.toUpperCase() + "\"";

protected static final String KEY_STORE_FILE_PATH = "clientkeystore.jks";
private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8);
public static final String ENCRYPTION_METHOD_KEY = "encryption_method";
Expand Down Expand Up @@ -134,6 +137,36 @@ private static void tryConvertAndImportCertificate(final String certificate) {
}
}

@Override
public boolean isV2Destination() {
return true;
}

@Override
protected boolean shouldAlwaysDisableTypeDedupe() {
return true;
}

@Override
protected JdbcSqlGenerator getSqlGenerator() {
return new RawOnlySqlGenerator(new OracleNameTransformer());
}

@Override
protected JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
final String rawTableSchema) {
return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT);
}

@Override
protected List<Migration> getMigrations(final JdbcDatabase database,
final String databaseName,
final SqlGenerator sqlGenerator,
final DestinationHandler destinationHandler) {
return List.of();
}

private static void convertAndImportCertificate(final String certificate)
throws IOException, InterruptedException {
final Runtime run = Runtime.getRuntime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public String applyDefaultCase(final String input) {
}

@Override
@Deprecated
public String getRawTableName(final String streamName) {
return convertStreamName("airbyte_raw_" + streamName);
}
Expand Down
Loading

0 comments on commit 9d4d80d

Please sign in to comment.