Skip to content

Commit

Permalink
first pass
Browse files Browse the repository at this point in the history
  • Loading branch information
cynthiaxyin committed Apr 29, 2023
1 parent 5751fb9 commit 072b38c
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 470 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outpu
outputRecordCollector,
onStartFunction(database, sqlOperations, writeConfigs),
new InMemoryRecordBufferingStrategy(recordWriterFunction(database, sqlOperations, writeConfigs, catalog), DEFAULT_MAX_BATCH_SIZE_BYTES),
onCloseFunction(database, sqlOperations, writeConfigs),
onCloseFunction(),
catalog,
sqlOperations::isValidData);
}
Expand Down Expand Up @@ -189,21 +189,12 @@ private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final Jdb
}

/**
* Closes connection to JDBC database and other tear down functionality
* Tear down functionality
*
* @param database JDBC database to connect to
* @param sqlOperations interface used to execute SQL queries
* @param writeConfigs settings for each stream
* @return
*/
private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs) {
return (hasFailed) -> {
if (!hasFailed) {
sqlOperations.onDestinationCloseOperations(database, writeConfigs);
}
};
private static OnCloseFunction onCloseFunction() {
return (hasFailed) -> {};
}

private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,4 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
*/
boolean isSchemaRequired();

/**
* The method is responsible for executing some specific DB Engine logic in onClose method. We can
* override this method to execute specific logic e.g. to handle any necessary migrations in the
* destination, etc.
* <p>
* In next example you can see how migration from VARCHAR to SUPER column is handled for the
* Redshift destination:
*
* @param database - Database that the connector is interacting with
* @param writeConfigs - schemas and tables (streams) will be discovered
* @see io.airbyte.integrations.destination.redshift.RedshiftSqlOperations#onDestinationCloseOperations
*/
default void onDestinationCloseOperations(final JdbcDatabase database, final List<WriteConfig> writeConfigs) {
// do nothing
LOGGER.info("No onDestinationCloseOperations required for this destination.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,6 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
final List<WriteConfig> writeConfigs,
final boolean purgeStagingData) {
return (hasFailed) -> {
if (!hasFailed) {
stagingOperations.onDestinationCloseOperations(database, writeConfigs);
LOGGER.info("Finalizing tables in destination completed.");
}
// After moving data from staging area to the target table (airybte_raw) clean up the staging
// area (if user configured)
LOGGER.info("Cleaning up destination started for {} streams", writeConfigs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,6 @@ public class RedshiftSqlOperations extends JdbcSqlOperations {
public static final int REDSHIFT_VARCHAR_MAX_BYTE_SIZE = 65535;
public static final int REDSHIFT_SUPER_MAX_BYTE_SIZE = 1000000;

private static final String SELECT_ALL_TABLES_WITH_NOT_SUPER_TYPE_SQL_STATEMENT =
"""
select tablename, schemaname
from pg_table_def
where tablename in (
select tablename as tablename
from pg_table_def
where schemaname = '%1$s'
and tablename in ('%5$s')
and tablename like '%%airbyte_raw%%'
and tablename not in (select table_name
from information_schema.views
where table_schema in ('%1$s'))
and "column" in ('%2$s', '%3$s', '%4$s')
group by tablename
having count(*) = 3)
and schemaname = '%1$s'
and type <> 'super'
and "column" = '_airbyte_data' """;

private static final String ALTER_TMP_TABLES_WITH_NOT_SUPER_TYPE_TO_SUPER_TYPE =
"""
ALTER TABLE %1$s ADD COLUMN %2$s_super super;
ALTER TABLE %1$s ADD COLUMN %3$s_reserve TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP;
UPDATE %1$s SET %2$s_super = JSON_PARSE(%2$s);
UPDATE %1$s SET %3$s_reserve = %3$s;
ALTER TABLE %1$s DROP COLUMN %2$s CASCADE;
ALTER TABLE %1$s DROP COLUMN %3$s CASCADE;
ALTER TABLE %1$s RENAME %2$s_super to %2$s;
ALTER TABLE %1$s RENAME %3$s_reserve to %3$s;
""";

public RedshiftSqlOperations() {}

@Override
Expand Down Expand Up @@ -126,115 +94,4 @@ public boolean isValidData(final JsonNode data) {
return isValid;
}

/**
* In case of redshift we need to discover all tables with not super type and update them after to
* SUPER type. This would be done once.
*
* @param database - Database object for interacting with a JDBC connection.
* @param writeConfigs - list of write configs.
*/

@Override
public void onDestinationCloseOperations(final JdbcDatabase database, final List<WriteConfig> writeConfigs) {
LOGGER.info("Executing operations for Redshift Destination DB engine...");
if (writeConfigs.isEmpty()) {
LOGGER.warn("Write config list is EMPTY.");
return;
}
final Map<String, List<String>> schemaTableMap = getTheSchemaAndRelatedStreamsMap(writeConfigs);
final List<String> schemaAndTableWithNotSuperType = schemaTableMap
.entrySet()
.stream()
// String.join() we use to concat tables from list, in query, as follows: SELECT * FROM some_table
// WHERE smt_column IN ('test1', 'test2', etc)
.map(e -> discoverNotSuperTables(database, e.getKey(), join("', '", e.getValue())))
.flatMap(Collection::stream)
.collect(Collectors.toList());

if (!schemaAndTableWithNotSuperType.isEmpty()) {
updateVarcharDataColumnToSuperDataColumn(database, schemaAndTableWithNotSuperType);
}
LOGGER.info("Executing operations for Redshift Destination DB engine completed.");
}

/**
* The method is responsible for building the map which consists from: Keys - Schema names, Values -
* List of related tables (Streams)
*
* @param writeConfigs - write configs from which schema-related tables map will be built
* @return map with Schemas as Keys and with Tables (Streams) as values
*/
private Map<String, List<String>> getTheSchemaAndRelatedStreamsMap(final List<WriteConfig> writeConfigs) {
final Map<String, List<String>> schemaTableMap = new HashMap<>();
for (final WriteConfig writeConfig : writeConfigs) {
if (schemaTableMap.containsKey(writeConfig.getOutputSchemaName())) {
schemaTableMap.get(writeConfig.getOutputSchemaName()).add(writeConfig.getOutputTableName());
} else {
schemaTableMap.put(writeConfig.getOutputSchemaName(), new ArrayList<>(Collections.singletonList(writeConfig.getOutputTableName())));
}
}
return schemaTableMap;
}

/**
* @param database - Database object for interacting with a JDBC connection.
* @param schemaName - schema to update.
* @param tableName - tables to update.
*/
private List<String> discoverNotSuperTables(final JdbcDatabase database, final String schemaName, final String tableName) {

final List<String> schemaAndTableWithNotSuperType = new ArrayList<>();

try {
LOGGER.info("Discovering NOT SUPER table types...");
database.execute(String.format("set search_path to %s", schemaName));
final List<JsonNode> tablesNameWithoutSuperDatatype = database.bufferedResultSetQuery(
connection -> connection.createStatement().executeQuery(String.format(SELECT_ALL_TABLES_WITH_NOT_SUPER_TYPE_SQL_STATEMENT,
schemaName,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
JavaBaseConstants.COLUMN_NAME_AB_ID,
tableName)),
getDefaultSourceOperations()::rowToJson);
if (tablesNameWithoutSuperDatatype.isEmpty()) {
return Collections.emptyList();
} else {
tablesNameWithoutSuperDatatype
.forEach(e -> schemaAndTableWithNotSuperType.add(e.get("schemaname").textValue() + "." + e.get("tablename").textValue()));
return schemaAndTableWithNotSuperType;
}
} catch (final SQLException e) {
LOGGER.error("Error during discoverNotSuperTables() appears: ", e);
throw new RuntimeException(e);
}
}

/**
* We prepare one query for all tables with not super type for updating.
*
* @param database - Database object for interacting with a JDBC connection.
* @param schemaAndTableWithNotSuperType - list of tables with not super type.
*/
private void updateVarcharDataColumnToSuperDataColumn(final JdbcDatabase database, final List<String> schemaAndTableWithNotSuperType) {
LOGGER.info("Updating VARCHAR data column to SUPER...");
final StringBuilder finalSqlStatement = new StringBuilder();
// To keep the previous data, we need to add next columns: _airbyte_data, _airbyte_emitted_at
// We do such workflow because we can't directly CAST VARCHAR to SUPER column. _airbyte_emitted_at
// column recreated to keep
// the COLUMN order. This order is required to INSERT the values in correct way.
schemaAndTableWithNotSuperType.forEach(schemaAndTable -> {
LOGGER.info("Altering table {} column _airbyte_data to SUPER.", schemaAndTable);
finalSqlStatement.append(String.format(ALTER_TMP_TABLES_WITH_NOT_SUPER_TYPE_TO_SUPER_TYPE,
schemaAndTable,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT));
});
try {
database.execute(finalSqlStatement.toString());
} catch (final SQLException e) {
LOGGER.error("Error during updateVarcharDataColumnToSuperDataColumn() appears: ", e);
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,167 +4,20 @@

package io.airbyte.integrations.destination.redshift;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.Record;
import org.jooq.Result;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
* Integration test testing the {@link RedshiftInsertDestination}. As the Redshift test credentials
* contain S3 credentials by default, we remove these credentials.
*/
public class RedshiftInsertDestinationAcceptanceTest extends RedshiftStagingS3DestinationAcceptanceTest {

public static final String DATASET_ID = Strings.addRandomSuffix("airbyte_tests", "_", 8);
private static final String TYPE = "type";
private ConfiguredAirbyteCatalog catalog;

private static final Instant NOW = Instant.now();
private static final String USERS_STREAM_NAME = "users_" + RandomStringUtils.randomAlphabetic(5);
private static final String BOOKS_STREAM_NAME = "books_" + RandomStringUtils.randomAlphabetic(5);

private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build()))
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "susan").put("id", "30").build()))
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage USER_IN_THE_DB = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "Alex").put("id", "1").build()))
.withEmittedAt(NOW.toEpochMilli()));

private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));

public JsonNode getStaticConfig() throws IOException {
return Jsons.deserialize(Files.readString(Path.of("secrets/config.json")));
}

void setup() {
MESSAGE_USERS1.getRecord().setNamespace(DATASET_ID);
MESSAGE_USERS2.getRecord().setNamespace(DATASET_ID);
catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, DATASET_ID,
io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
io.airbyte.protocol.models.Field.of("id", JsonSchemaType.STRING))
.withDestinationSyncMode(DestinationSyncMode.APPEND)));
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", DATASET_ID)));
super.tearDown(testEnv);
}

@Disabled // temporary resolution before issue #25519
@Test
void testIfSuperTmpTableWasCreatedAfterVarcharTmpTable() throws Exception {
setup();
final Database database = getDatabase();
final String usersStream = getNamingResolver().getRawTableName(USERS_STREAM_NAME);
final String booksStream = getNamingResolver().getRawTableName(BOOKS_STREAM_NAME);
createTmpTableWithVarchar(database, usersStream);
createTmpTableWithVarchar(database, booksStream);

assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, usersStream, "character varying"));

final Destination destination = new RedshiftDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);
consumer.start();
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_USERS2);
consumer.accept(MESSAGE_STATE);
consumer.close();

assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, usersStream, "super"));
assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, booksStream, "character varying"));

final List<JsonNode> usersActual = retrieveRecords(testDestinationEnv, USERS_STREAM_NAME, DATASET_ID, config);
final List<JsonNode> expectedUsersJson = Lists.newArrayList(
MESSAGE_USERS1.getRecord().getData(),
MESSAGE_USERS2.getRecord().getData(),
USER_IN_THE_DB.getRecord().getData());
assertEquals(expectedUsersJson.size(), usersActual.size());
assertTrue(expectedUsersJson.containsAll(usersActual) && usersActual.containsAll(expectedUsersJson));
}

private void createTmpTableWithVarchar(final Database database, final String streamName) throws SQLException {
// As we don't care about the previous data we just simulate the flow when previous table exists.
database.query(q -> {
q.fetch(String.format("CREATE SCHEMA IF NOT EXISTS %s", DATASET_ID));
q.fetch(String.format(
"CREATE TABLE IF NOT EXISTS %s.%s (%s VARCHAR PRIMARY KEY, %s VARCHAR, %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)",
DATASET_ID,
streamName,
JavaBaseConstants.COLUMN_NAME_AB_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT));
// Simulate existing record
q.fetch(String.format("""
insert into %s.%s (_airbyte_ab_id, _airbyte_data, _airbyte_emitted_at) values
('9', '{\"id\":\"1\",\"name\":\"Alex\"}', '2022-02-09 12:02:13.322000 +00:00')""",
DATASET_ID,
streamName));
return null;
});
}

/**
* @param database - current database properties
* @param dataSet - current catalog
* @param streamName - table name
* @param expectedType - data type of _airbyte_data to expect
* @return if current datatype of _airbyte_data column is expectedType.
*
* PG_TABLE_DEF table Stores information about table columns. PG_TABLE_DEF only returns
* information about tables that are visible to the user.
*
* <a href=
* "https://docs.aws.amazon.com/redshift/latest/dg/r_PG_TABLE_DEF.html">PG_TABLE_DEF</a>
*
* @throws SQLException
*/
private boolean isTmpTableDataColumnInExpectedType(final Database database,
final String dataSet,
final String streamName,
final String expectedType)
throws SQLException {
Result<Record> query = database.query(q -> {
return q.fetch(String.format("""
set search_path to %s;
select type from pg_table_def where tablename = \'%s\' and "column" = \'%s\'""",
dataSet, streamName, JavaBaseConstants.COLUMN_NAME_DATA));
});
return query.get(0).getValue(TYPE).toString().trim().contains(expectedType);
}

}

0 comments on commit 072b38c

Please sign in to comment.