Skip to content

Commit

Permalink
Oracle destination implementation (#3498)
Browse files Browse the repository at this point in the history
Working implementation of Oracle destination

Co-authored-by: cgardens <giardina.charles@gmail.com>
  • Loading branch information
masonwheeler and cgardens committed Jun 3, 2021
1 parent a262f46 commit 8dadd1c
Show file tree
Hide file tree
Showing 19 changed files with 827 additions and 11 deletions.
4 changes: 4 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public static Database createSqlServerDatabase(String username, String password,
return createDatabase(username, password, jdbcConnectionString, "com.microsoft.sqlserver.jdbc.SQLServerDriver", SQLDialect.DEFAULT);
}

public static Database createOracleDatabase(String username, String password, String jdbcConnectionString) {
return createDatabase(username, password, jdbcConnectionString, "oracle.jdbc.OracleDriver", SQLDialect.DEFAULT);
}

public static Database createDatabase(final String username,
final String password,
final String jdbcConnectionString,
Expand Down
15 changes: 15 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,19 @@ <T> Stream<T> query(CheckedFunction<Connection, PreparedStatement, SQLException>
CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException;

default int queryInt(String sql, String... params) throws SQLException {
try (Stream<Integer> q = query(c -> {
PreparedStatement statement = c.prepareStatement(sql);
int i = 1;
for (String param : params) {
statement.setString(i, param);
++i;
}
return statement;
},
rs -> rs.getInt(1))) {
return q.findFirst().get();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public String getRawTableName(String streamName) {

@Override
public String getTmpTableName(String streamName) {
return convertStreamName("_airbyte_" + Instant.now().toEpochMilli() + "_" + streamName);
return convertStreamName("_airbyte_" + Instant.now().toEpochMilli() + "_" + getRawTableName(streamName));
}

protected String convertStreamName(String input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ plugins {
}
dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-workers')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,10 @@ private StandardCheckConnectionOutput runCheck(JsonNode config) throws WorkerExc
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot);
}

protected AirbyteDestination getDestination() {
return new DefaultAirbyteDestination(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory));
}

private void runSyncAndVerifyStateOutput(JsonNode config, List<AirbyteMessage> messages, ConfiguredAirbyteCatalog catalog) throws Exception {
final List<AirbyteMessage> destinationOutput = runSync(config, messages, catalog);
final AirbyteMessage expectedStateMessage = MoreLists.reversed(messages)
Expand Down Expand Up @@ -852,8 +856,7 @@ private List<AirbyteMessage> runSync(JsonNode config, List<AirbyteMessage> messa
.withCatalog(catalog)
.withDestinationConnectionConfiguration(config);

final AirbyteDestination destination =
new DefaultAirbyteDestination(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory));
final AirbyteDestination destination = getDestination();

destination.start(targetConfig, jobRoot);
messages.forEach(message -> Exceptions.toRuntime(() -> destination.accept(message)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.standardtest.destination;

import io.airbyte.config.StandardTargetConfig;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.protocols.airbyte.AirbyteDestination;
import java.nio.file.Path;
import java.util.Optional;

// Simple class to host a Destination in-memory rather than spinning up a container for it.
// For debugging and testing purposes only; not recommended to use this for real code
public class LocalAirbyteDestination implements AirbyteDestination {

private Destination dest;
private AirbyteMessageConsumer consumer;
private boolean isClosed = false;

public LocalAirbyteDestination(Destination dest) {
this.dest = dest;
}

@Override
public void start(StandardTargetConfig targetConfig, Path jobRoot) throws Exception {
consumer =
dest.getConsumer(targetConfig.getDestinationConnectionConfiguration(), targetConfig.getCatalog(), Destination::defaultOutputRecordCollector);
consumer.start();
}

@Override
public void accept(AirbyteMessage message) throws Exception {
consumer.accept(message);
}

@Override
public void notifyEndOfStream() {
// nothing to do here
}

@Override
public void close() throws Exception {
consumer.close();
isClosed = true;
}

@Override
public void cancel() {
// nothing to do here
}

@Override
public boolean isFinished() {
return isClosed;
}

@Override
public Optional<AirbyteMessage> attemptRead() {
return Optional.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static void attemptSQLCreateAndDropTableOperations(String outputSchema,

// verify we have write permissions on the target schema by creating a table with a random name,
// then dropping that table
String outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
sqlOps.createSchemaIfNotExists(database, outputSchema);
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName);
sqlOps.dropTableIfExists(database, outputSchema, outputTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.text.Names;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
Expand Down Expand Up @@ -109,8 +108,8 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
final String outputSchema = getOutputSchema(abStream, defaultSchemaName);

final String streamName = abStream.getName();
final String tableName = Names.concatQuotedNames("_airbyte_raw_", namingResolver.getIdentifier(streamName));
String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);
final String tableName = namingResolver.getRawTableName(streamName);
String tmpTableName = namingResolver.getTmpTableName(streamName);

// TODO (#2948): Refactor into StandardNameTransformed , this is for MySQL destination, the table
// names can't have more than 64 characters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,37 @@ public static void insertRawRecordsInSingleQuery(String insertQueryComponent,
JdbcDatabase jdbcDatabase,
List<AirbyteRecordMessage> records)
throws SQLException {
insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID);
insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID, true);
}

/**
* Inserts "raw" records in a single query. The purpose of helper to abstract away database-specific
* SQL syntax from this query.
*
* This version does not add a semicolon at the end of the INSERT statement.
*
* @param insertQueryComponent the first line of the query e.g. INSERT INTO public.users (ab_id,
* data, emitted_at)
* @param recordQueryComponent query template for a full record e.g. (?, ?::jsonb ?)
* @param jdbcDatabase jdbc database
* @param records records to write
* @throws SQLException exception
*/
public static void insertRawRecordsInSingleQueryNoSem(String insertQueryComponent,
String recordQueryComponent,
JdbcDatabase jdbcDatabase,
List<AirbyteRecordMessage> records)
throws SQLException {
insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID, false);
}

@VisibleForTesting
static void insertRawRecordsInSingleQuery(String insertQueryComponent,
String recordQueryComponent,
JdbcDatabase jdbcDatabase,
List<AirbyteRecordMessage> records,
Supplier<UUID> uuidSupplier)
Supplier<UUID> uuidSupplier,
boolean sem)
throws SQLException {
if (records.isEmpty()) {
return;
Expand All @@ -80,7 +101,7 @@ static void insertRawRecordsInSingleQuery(String insertQueryComponent,
final StringBuilder sql = new StringBuilder(insertQueryComponent);
records.forEach(r -> sql.append(recordQueryComponent));
final String s = sql.toString();
final String s1 = s.substring(0, s.length() - 2) + ";";
final String s1 = s.substring(0, s.length() - 2) + (sem ? ";" : "");

try (final PreparedStatement statement = connection.prepareStatement(s1)) {
// second loop: bind values to the SQL string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void testInsertRawRecordsInSingleQuery() throws SQLException {
.withEmittedAt(NOW.toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.of("name", "mississippi", "width", 20))));

SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, database, records, uuidSupplier);
SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, database, records, uuidSupplier, true);

final List<JsonNode> actualRecords = database.bufferedResultSetQuery(
connection -> connection.createStatement().executeQuery("SELECT * FROM RIVERS"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
12 changes: 12 additions & 0 deletions airbyte-integrations/connectors/destination-oracle/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION destination-oracle

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-oracle
31 changes: 31 additions & 0 deletions airbyte-integrations/connectors/destination-oracle/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.oracle.OracleDestination'
}

dependencies {

// required so that log4j uses a standard xml parser instead of an oracle one (that gets pulled in by the oracle driver)
implementation group: 'xerces', name: 'xercesImpl', version: '2.12.1'

implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:destination-jdbc')

implementation "com.oracle.database.jdbc:ojdbc8-production:19.7.0.0"

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.testcontainers:oracle-xe:1.15.2'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-oracle')

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.oracle;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OracleDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(OracleDestination.class);

public static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";

public static final String COLUMN_NAME_AB_ID = JavaBaseConstants.COLUMN_NAME_AB_ID.substring(1).toUpperCase();
public static final String COLUMN_NAME_DATA = JavaBaseConstants.COLUMN_NAME_DATA.substring(1).toUpperCase();
public static final String COLUMN_NAME_EMITTED_AT = JavaBaseConstants.COLUMN_NAME_EMITTED_AT.substring(1).toUpperCase();

public OracleDestination() {
super(DRIVER_CLASS, new OracleNameTransformer(), new OracleOperations("users"));
System.setProperty("oracle.jdbc.timezoneAsRegion", "false");
}

@Override
public JsonNode toJdbcConfig(JsonNode config) {
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()));

if (config.has("password")) {
configBuilder.put("password", config.get("password").asText());
}

return Jsons.jsonNode(configBuilder.build());
}

public static void main(String[] args) throws Exception {
final Destination destination = new OracleDestination();
LOGGER.info("starting destination: {}", OracleDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", OracleDestination.class);
}

}

0 comments on commit 8dadd1c

Please sign in to comment.