Skip to content

Commit

Permalink
Fix JdbcSource handling of tables with same names in different schemas (
Browse files Browse the repository at this point in the history
#1724)

* Fix JdbcSource handling of tables with same names in different schemas

* Previously the JdbcSource was combining the columns of any tables with the same name across different schemas into a single stream in the catalog.

* This was caught because in those tables there were columns of the same name with different types which triggered a precondition to check for this.

* The fix makes sure we group by both schema name and table name.

* Adds test to the standard jdbc tests to catch this case.

* This test does NOT run for mysql as, mysql has no concept of schemas.
  • Loading branch information
cgardens committed Jan 20, 2021
1 parent 4d7a5e2 commit 3670545
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.1.8",
"dockerImageTag": "0.1.9",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.1.9",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e87ffa8e-a3b5-f69c-9076-6011339de1f6",
"name": "Redshift",
"dockerRepository": "airbyte/source-redshift",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-redshift"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
- sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
name: Microsoft SQL Server (MSSQL)
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.1.8
dockerImageTag: 0.1.9
documentationUrl: https://hub.docker.com/r/airbyte/source-mssql
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Expand All @@ -51,7 +51,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
name: Salesforce
Expand Down Expand Up @@ -96,7 +96,7 @@
- sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
name: Redshift
dockerRepository: airbyte/source-redshift
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-redshift
- sourceDefinitionId: 932e6363-d006-4464-a9f5-102b82e07c06
name: Twilio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class PostgresDestination extends AbstractJdbcDestination implements Dest
public static final String DRIVER_CLASS = "org.postgresql.Driver";

public PostgresDestination() {
super("org.postgresql.Driver", new PostgresSQLNameTransformer(), new DefaultSqlOperations());
super(DRIVER_CLASS, new PostgresSQLNameTransformer(), new DefaultSqlOperations());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -277,7 +278,7 @@ private List<TableInfo> getTables(final JdbcDatabase database,
// some databases return multiple copies of the same record for a column (e.g. redshift) because
// they have at least once delivery guarantees. we want to dedupe these, but first we check that the
// records are actually the same and provide a good error message if they are not.
assertColumnsWithSameNameAreSame(t.getName(), t.getFields());
assertColumnsWithSameNameAreSame(t.getSchemaName(), t.getName(), t.getFields());
final List<Field> fields = t.getFields()
.stream()
.map(f -> Field.of(f.getColumnName(), JdbcUtils.getType(f.getColumnType())))
Expand All @@ -289,7 +290,7 @@ private List<TableInfo> getTables(final JdbcDatabase database,
.collect(Collectors.toList());
}

private static void assertColumnsWithSameNameAreSame(String tableName, List<ColumnInfo> columns) {
private static void assertColumnsWithSameNameAreSame(String schemaName, String tableName, List<ColumnInfo> columns) {
columns.stream()
.collect(Collectors.groupingBy(ColumnInfo::getColumnName))
.values()
Expand All @@ -298,8 +299,8 @@ private static void assertColumnsWithSameNameAreSame(String tableName, List<Colu
columnsWithSameName.forEach(column -> {
if (!column.equals(comparisonColumn)) {
throw new RuntimeException(
String.format("Found multiple columns with same name: %s in table: %s but the columns are not the same. columns: %s",
comparisonColumn.getColumnName(), tableName, columns));
String.format("Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. columns: %s",
comparisonColumn.getColumnName(), schemaName, tableName, columns));
}
});
});
Expand Down Expand Up @@ -328,32 +329,30 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database,
.build()))
.stream()
.filter(t -> !internalSchemas.contains(t.get(INTERNAL_SCHEMA_NAME).asText()))
.collect(Collectors.groupingBy(t -> t.get(INTERNAL_TABLE_NAME).asText()))
.entrySet()
// group by schema and table name to handle the case where a table with the same name exists in
// multiple schemas.
.collect(Collectors.groupingBy(t -> ImmutablePair.of(t.get(INTERNAL_SCHEMA_NAME).asText(), t.get(INTERNAL_TABLE_NAME).asText())))
.values()
.stream()
.map(e -> {
final String tableName = e.getKey();
final List<JsonNode> fields = e.getValue();
return new TableInfoInternal(
fields.get(0).get(INTERNAL_SCHEMA_NAME).asText(),
tableName,
fields.stream()
.map(f -> {
JDBCType jdbcType;
try {
jdbcType = JDBCType.valueOf(f.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
f.get(INTERNAL_COLUMN_NAME),
f.get(INTERNAL_SCHEMA_NAME),
f.get(INTERNAL_TABLE_NAME),
f.get(INTERNAL_COLUMN_TYPE)));
jdbcType = JDBCType.VARCHAR;
}
return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType);
})
.collect(Collectors.toList()));
})
.map(fields -> new TableInfoInternal(
fields.get(0).get(INTERNAL_SCHEMA_NAME).asText(),
fields.get(0).get(INTERNAL_TABLE_NAME).asText(),
fields.stream()
.map(f -> {
JDBCType jdbcType;
try {
jdbcType = JDBCType.valueOf(f.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
f.get(INTERNAL_COLUMN_NAME),
f.get(INTERNAL_SCHEMA_NAME),
f.get(INTERNAL_TABLE_NAME),
f.get(INTERNAL_COLUMN_TYPE)));
jdbcType = JDBCType.VARCHAR;
}
return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType);
})
.collect(Collectors.toList())))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -57,6 +58,7 @@
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -128,7 +130,7 @@ public void setup() throws Exception {
getDriverClass());

database.execute(connection -> {
connection.createStatement().execute(String.format("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), updated_at DATE);"));
connection.createStatement().execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), updated_at DATE);");
connection.createStatement().execute(
"INSERT INTO id_and_name (id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');");
});
Expand Down Expand Up @@ -165,6 +167,34 @@ void testDiscover() throws Exception {
assertEquals(getCatalog(), actual);
}

@Test
void testDiscoverWithMultipleSchemas() throws Exception {
// mysql does not have a concept of schemas, so this test does not make sense for it.
if (getDriverClass().toLowerCase().contains("mysql")) {
return;
}

// add table and data to a separate schema.
database.execute(connection -> {
connection.createStatement().execute("CREATE SCHEMA public2;");
connection.createStatement().execute("CREATE TABLE public2.id_and_name(id VARCHAR(200), name VARCHAR(200));");
connection.createStatement().execute(
"INSERT INTO public2.id_and_name (id, name) VALUES ('1','picard'), ('2', 'crusher'), ('3', 'vash');");
});

final AirbyteCatalog actual = source.discover(config);

final AirbyteCatalog expected = getCatalog();
expected.getStreams().add(CatalogHelpers.createAirbyteStream("public2.id_and_name",
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)));
// sort streams by name so that we are comparing lists with the same order.
expected.getStreams().sort(Comparator.comparing(AirbyteStream::getName));
actual.getStreams().sort(Comparator.comparing(AirbyteStream::getName));
assertEquals(expected, actual);
}

@Test
void testReadSuccess() throws Exception {
final List<AirbyteMessage> actualMessages = source.read(config, getConfiguredCatalog(), null).collect(Collectors.toList());
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/source-mssql
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-mysql
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-postgres
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-redshift/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-redshift

0 comments on commit 3670545

Please sign in to comment.