Skip to content

Commit

Permalink
handle foreign key violation when expected record doesnt exist (#9016)
Browse files Browse the repository at this point in the history
* handle foreign key violation when expected record doesnt exist

* add test

* missed this one

* update spec for build to pass
  • Loading branch information
subodh1810 committed Dec 21, 2021
1 parent 6316cb9 commit a6923b2
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3722,7 +3722,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.3.21"
- dockerImage: "airbyte/destination-snowflake:0.3.22"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
Expand Down Expand Up @@ -3770,7 +3770,9 @@
order: 3
schema:
description: "The default Snowflake schema tables are written to if the\
\ source does not specify a namespace."
\ source does not specify a namespace. Schema name would be transformed\
\ to allowed by Snowflake if it not follow Snowflake Naming Conventions\
\ https://docs.airbyte.io/integrations/destinations/snowflake#notes-about-snowflake-naming-conventions "
examples:
- "AIRBYTE_SCHEMA"
type: "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3171,7 +3171,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-kafka:0.1.1"
- dockerImage: "airbyte/source-kafka:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/kafka"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import static org.jooq.impl.DSL.currentOffsetDateTime;
import static org.jooq.impl.DSL.foreignKey;
import static org.jooq.impl.DSL.primaryKey;
import static org.jooq.impl.DSL.select;
import static org.jooq.impl.DSL.table;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.enums.Enums;
Expand Down Expand Up @@ -267,9 +269,22 @@ private static void createAndPopulateActor(DSLContext ctx) {
ConfigSchema.SOURCE_CONNECTION,
SourceConnection.class,
ctx);

long sourceRecords = 0L;
for (final ConfigWithMetadata<SourceConnection> configWithMetadata : sourcesWithMetadata) {
final SourceConnection sourceConnection = configWithMetadata.getConfig();
if (workspaceDoesNotExist(sourceConnection.getWorkspaceId(), ctx)) {
LOGGER.warn(
"Skipping source connection " + sourceConnection.getSourceId() + " because the specified workspace " + sourceConnection.getWorkspaceId()
+ " doesn't exist and violates foreign key constraint.");
continue;
} else if (actorDefinitionDoesNotExist(sourceConnection.getSourceDefinitionId(), ctx)) {
LOGGER.warn(
"Skipping source connection " + sourceConnection.getSourceId() + " because the specified source definition "
+ sourceConnection.getSourceDefinitionId()
+ " doesn't exist and violates foreign key constraint.");
continue;
}

ctx.insertInto(DSL.table("actor"))
.set(id, sourceConnection.getSourceId())
.set(workspaceId, sourceConnection.getWorkspaceId())
Expand All @@ -281,16 +296,31 @@ private static void createAndPopulateActor(DSLContext ctx) {
.set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
sourceRecords++;
}
LOGGER.info("actor table populated with " + sourcesWithMetadata.size() + " source records");
LOGGER.info("actor table populated with " + sourceRecords + " source records");

final List<ConfigWithMetadata<DestinationConnection>> destinationsWithMetadata = listConfigsWithMetadata(
ConfigSchema.DESTINATION_CONNECTION,
DestinationConnection.class,
ctx);

long destinationRecords = 0L;
for (final ConfigWithMetadata<DestinationConnection> configWithMetadata : destinationsWithMetadata) {
final DestinationConnection destinationConnection = configWithMetadata.getConfig();
if (workspaceDoesNotExist(destinationConnection.getWorkspaceId(), ctx)) {
LOGGER.warn(
"Skipping destination connection " + destinationConnection.getDestinationId() + " because the specified workspace "
+ destinationConnection.getWorkspaceId()
+ " doesn't exist and violates foreign key constraint.");
continue;
} else if (actorDefinitionDoesNotExist(destinationConnection.getDestinationDefinitionId(), ctx)) {
LOGGER.warn(
"Skipping destination connection " + destinationConnection.getDestinationId() + " because the specified source definition "
+ destinationConnection.getDestinationDefinitionId()
+ " doesn't exist and violates foreign key constraint.");
continue;
}

ctx.insertInto(DSL.table("actor"))
.set(id, destinationConnection.getDestinationId())
.set(workspaceId, destinationConnection.getWorkspaceId())
Expand All @@ -302,8 +332,49 @@ private static void createAndPopulateActor(DSLContext ctx) {
.set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
destinationRecords++;
}
LOGGER.info("actor table populated with " + destinationsWithMetadata.size() + " destination records");
LOGGER.info("actor table populated with " + destinationRecords + " destination records");
}

@VisibleForTesting
static boolean workspaceDoesNotExist(UUID workspaceId, DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
return !ctx.fetchExists(select()
.from(table("workspace"))
.where(id.eq(workspaceId)));
}

@VisibleForTesting
static boolean actorDefinitionDoesNotExist(UUID definitionId, DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
return !ctx.fetchExists(select()
.from(table("actor_definition"))
.where(id.eq(definitionId)));
}

@VisibleForTesting
static boolean actorDoesNotExist(UUID actorId, DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
return !ctx.fetchExists(select()
.from(table("actor"))
.where(id.eq(actorId)));
}

@VisibleForTesting
static boolean connectionDoesNotExist(UUID connectionId, DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
return !ctx.fetchExists(select()
.from(table("connection"))
.where(id.eq(connectionId)));
}

@VisibleForTesting
static boolean operationDoesNotExist(UUID operationId, DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
return !ctx.fetchExists(select()
.from(table("operation"))
.where(id.eq(operationId)));
}

private static void crateAndPopulateActorOauthParameter(final DSLContext ctx) {
Expand Down Expand Up @@ -336,9 +407,22 @@ private static void crateAndPopulateActorOauthParameter(final DSLContext ctx) {
ConfigSchema.SOURCE_OAUTH_PARAM,
SourceOAuthParameter.class,
ctx);

long sourceOauthParamRecords = 0L;
for (final ConfigWithMetadata<SourceOAuthParameter> configWithMetadata : sourceOauthParamsWithMetadata) {
final SourceOAuthParameter sourceOAuthParameter = configWithMetadata.getConfig();
if (workspaceDoesNotExist(sourceOAuthParameter.getWorkspaceId(), ctx)) {
LOGGER.warn(
"Skipping source oauth parameter " + sourceOAuthParameter.getOauthParameterId() + " because the specified workspace "
+ sourceOAuthParameter.getWorkspaceId()
+ " doesn't exist and violates foreign key constraint.");
continue;
} else if (actorDefinitionDoesNotExist(sourceOAuthParameter.getSourceDefinitionId(), ctx)) {
LOGGER.warn(
"Skipping source oauth parameter " + sourceOAuthParameter.getSourceDefinitionId() + " because the specified source definition "
+ sourceOAuthParameter.getSourceDefinitionId()
+ " doesn't exist and violates foreign key constraint.");
continue;
}
ctx.insertInto(DSL.table("actor_oauth_parameter"))
.set(id, sourceOAuthParameter.getOauthParameterId())
.set(workspaceId, sourceOAuthParameter.getWorkspaceId())
Expand All @@ -348,17 +432,32 @@ private static void crateAndPopulateActorOauthParameter(final DSLContext ctx) {
.set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
sourceOauthParamRecords++;
}

LOGGER.info("actor_oauth_parameter table populated with " + sourceOauthParamsWithMetadata.size() + " source oauth params records");
LOGGER.info("actor_oauth_parameter table populated with " + sourceOauthParamRecords + " source oauth params records");

final List<ConfigWithMetadata<DestinationOAuthParameter>> destinationOauthParamsWithMetadata = listConfigsWithMetadata(
ConfigSchema.DESTINATION_OAUTH_PARAM,
DestinationOAuthParameter.class,
ctx);

long destinationOauthParamRecords = 0L;
for (final ConfigWithMetadata<DestinationOAuthParameter> configWithMetadata : destinationOauthParamsWithMetadata) {
final DestinationOAuthParameter destinationOAuthParameter = configWithMetadata.getConfig();
if (workspaceDoesNotExist(destinationOAuthParameter.getWorkspaceId(), ctx)) {
LOGGER.warn(
"Skipping destination oauth parameter " + destinationOAuthParameter.getOauthParameterId() + " because the specified workspace "
+ destinationOAuthParameter.getWorkspaceId()
+ " doesn't exist and violates foreign key constraint.");
continue;
} else if (actorDefinitionDoesNotExist(destinationOAuthParameter.getDestinationDefinitionId(), ctx)) {
LOGGER.warn(
"Skipping destination oauth parameter " + destinationOAuthParameter.getOauthParameterId()
+ " because the specified destination definition "
+ destinationOAuthParameter.getDestinationDefinitionId()
+ " doesn't exist and violates foreign key constraint.");
continue;
}
ctx.insertInto(DSL.table("actor_oauth_parameter"))
.set(id, destinationOAuthParameter.getOauthParameterId())
.set(workspaceId, destinationOAuthParameter.getWorkspaceId())
Expand All @@ -368,9 +467,10 @@ private static void crateAndPopulateActorOauthParameter(final DSLContext ctx) {
.set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
destinationOauthParamRecords++;
}

LOGGER.info("actor_oauth_parameter table populated with " + destinationOauthParamsWithMetadata.size() + " destination oauth params records");
LOGGER.info("actor_oauth_parameter table populated with " + destinationOauthParamRecords + " destination oauth params records");
}

private static void createAndPopulateOperation(final DSLContext ctx) {
Expand Down Expand Up @@ -406,9 +506,16 @@ private static void createAndPopulateOperation(final DSLContext ctx) {
ConfigSchema.STANDARD_SYNC_OPERATION,
StandardSyncOperation.class,
ctx);

long standardSyncOperationRecords = 0L;
for (final ConfigWithMetadata<StandardSyncOperation> configWithMetadata : configsWithMetadata) {
final StandardSyncOperation standardSyncOperation = configWithMetadata.getConfig();
if (workspaceDoesNotExist(standardSyncOperation.getWorkspaceId(), ctx)) {
LOGGER.warn(
"Skipping standard sync operation " + standardSyncOperation.getOperationId() + " because the specified workspace "
+ standardSyncOperation.getWorkspaceId()
+ " doesn't exist and violates foreign key constraint.");
continue;
}
ctx.insertInto(DSL.table("operation"))
.set(id, standardSyncOperation.getOperationId())
.set(workspaceId, standardSyncOperation.getWorkspaceId())
Expand All @@ -421,9 +528,10 @@ private static void createAndPopulateOperation(final DSLContext ctx) {
.set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
standardSyncOperationRecords++;
}

LOGGER.info("operation table populated with " + configsWithMetadata.size() + " records");
LOGGER.info("operation table populated with " + standardSyncOperationRecords + " records");
}

private static void createConnectionOperation(final DSLContext ctx) {
Expand Down Expand Up @@ -496,8 +604,20 @@ private static void createAndPopulateConnection(final DSLContext ctx) {
ConfigSchema.STANDARD_SYNC,
StandardSync.class,
ctx);
long standardSyncRecords = 0L;
for (final ConfigWithMetadata<StandardSync> configWithMetadata : configsWithMetadata) {
final StandardSync standardSync = configWithMetadata.getConfig();
if (actorDoesNotExist(standardSync.getSourceId(), ctx)) {
LOGGER.warn(
"Skipping standard sync " + standardSync.getConnectionId() + " because the specified source " + standardSync.getSourceId()
+ " doesn't exist and violates foreign key constraint.");
continue;
} else if (actorDoesNotExist(standardSync.getDestinationId(), ctx)) {
LOGGER.warn(
"Skipping standard sync " + standardSync.getConnectionId() + " because the specified destination " + standardSync.getDestinationId()
+ " doesn't exist and violates foreign key constraint.");
continue;
}
ctx.insertInto(DSL.table("connection"))
.set(id, standardSync.getConnectionId())
.set(namespaceDefinition, standardSync.getNamespaceDefinition() == null ? null
Expand All @@ -515,10 +635,11 @@ private static void createAndPopulateConnection(final DSLContext ctx) {
.set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
standardSyncRecords++;
populateConnectionOperation(ctx, configWithMetadata);
}

LOGGER.info("connection table populated with " + configsWithMetadata.size() + " records");
LOGGER.info("connection table populated with " + standardSyncRecords + " records");
}

private static void createAndPopulateState(final DSLContext ctx) {
Expand Down Expand Up @@ -546,19 +667,26 @@ private static void createAndPopulateState(final DSLContext ctx) {
ConfigSchema.STANDARD_SYNC_STATE,
StandardSyncState.class,
ctx);

long standardSyncStateRecords = 0L;
for (final ConfigWithMetadata<StandardSyncState> configWithMetadata : configsWithMetadata) {
final StandardSyncState standardSyncState = configWithMetadata.getConfig();
if (connectionDoesNotExist(standardSyncState.getConnectionId(), ctx)) {
LOGGER.warn(
"Skipping standard sync state because the specified standard sync " + standardSyncState.getConnectionId()
+ " doesn't exist and violates foreign key constraint.");
continue;
}
ctx.insertInto(DSL.table("state"))
.set(id, UUID.randomUUID())
.set(connectionId, standardSyncState.getConnectionId())
.set(state, JSONB.valueOf(Jsons.serialize(standardSyncState.getState())))
.set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
standardSyncStateRecords++;
}

LOGGER.info("state table populated with " + configsWithMetadata.size() + " records");
LOGGER.info("state table populated with " + standardSyncStateRecords + " records");
}

private static void populateConnectionOperation(final DSLContext ctx,
Expand All @@ -572,16 +700,31 @@ private static void populateConnectionOperation(final DSLContext ctx,
DSL.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));

final StandardSync standardSync = standardSyncWithMetadata.getConfig();

if (connectionDoesNotExist(standardSync.getConnectionId(), ctx)) {
LOGGER.warn(
"Skipping connection_operations because the specified standard sync " + standardSync.getConnectionId()
+ " doesn't exist and violates foreign key constraint.");
return;
}
long connectionOperationRecords = 0L;
for (final UUID operationIdFromStandardSync : standardSync.getOperationIds()) {
if (operationDoesNotExist(operationIdFromStandardSync, ctx)) {
LOGGER.warn(
"Skipping connection_operations because the specified standard sync operation " + operationIdFromStandardSync
+ " doesn't exist and violates foreign key constraint.");
continue;
}
ctx.insertInto(DSL.table("connection_operation"))
.set(id, UUID.randomUUID())
.set(connectionId, standardSync.getConnectionId())
.set(operationId, operationIdFromStandardSync)
.set(createdAt, OffsetDateTime.ofInstant(standardSyncWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(standardSyncWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
connectionOperationRecords++;
}
LOGGER.info("connection_operation table populated with " + standardSync.getOperationIds().size() + " records");
LOGGER.info("connection_operation table populated with " + connectionOperationRecords + " records");
}

private static <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConfig airbyteConfigType,
Expand Down

0 comments on commit a6923b2

Please sign in to comment.