Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source support primary keys #2488

Merged
merged 5 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e87ffa8e-a3b5-f69c-9076-6011339de1f6",
"name": "Redshift",
"dockerRepository": "airbyte/source-redshift",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-redshift"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
- sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
name: Microsoft SQL Server (MSSQL)
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://hub.docker.com/r/airbyte/source-mssql
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Expand All @@ -51,7 +51,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
name: Salesforce
Expand Down Expand Up @@ -96,7 +96,7 @@
- sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
name: Redshift
dockerRepository: airbyte/source-redshift
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-redshift
- sourceDefinitionId: 932e6363-d006-4464-a9f5-102b82e07c06
name: Twilio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -135,7 +136,9 @@ public AirbyteCatalog discover(JsonNode config) throws Exception {
Optional.ofNullable(config.get("schema")).map(JsonNode::asText))
.stream()
.map(t -> CatalogHelpers.createAirbyteStream(t.getName(), t.getFields())
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList())))
Comment on lines +140 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

styling nit.

Suggested change
.withSourceDefinedPrimaryKey(
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList())))
.withSourceDefinedPrimaryKey(t.getPrimaryKeys()
.stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList())))

.collect(Collectors.toList()));
}
}
Expand Down Expand Up @@ -290,7 +293,7 @@ private List<TableInfo> getTables(final JdbcDatabase database,
.distinct()
.collect(Collectors.toList());

return new TableInfo(JdbcUtils.getFullyQualifiedTableName(t.getSchemaName(), t.getName()), fields);
return new TableInfo(JdbcUtils.getFullyQualifiedTableName(t.getSchemaName(), t.getName()), fields, t.getPrimaryKeys());
})
.collect(Collectors.toList());
}
Expand Down Expand Up @@ -354,6 +357,16 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database,
return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType);
})
.collect(Collectors.toList())))
.peek(t -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be better done in a for loop after the sequence of stream operations for readability

try {
final List<String> primaryKeys = database.bufferedResultSetQuery(
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), t.getSchemaName(), t.getName()),
resultSet -> resultSet.getString(JDBC_COLUMN_COLUMN_NAME));
t.addPrimaryKeys(primaryKeys);
} catch (SQLException e) {
LOGGER.warn(String.format("Could not find primary keys for %s.%s: %s", t.getSchemaName(), t.getName(), e));
}
})
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -440,10 +453,12 @@ protected static class TableInfo {

private final String name;
private final List<Field> fields;
private final List<String> primaryKeys;

public TableInfo(String name, List<Field> fields) {
public TableInfo(String name, List<Field> fields, List<String> primaryKeys) {
this.name = name;
this.fields = fields;
this.primaryKeys = primaryKeys;
}

public String getName() {
Expand All @@ -454,18 +469,24 @@ public List<Field> getFields() {
return fields;
}

public List<String> getPrimaryKeys() {
return primaryKeys;
}

}

protected static class TableInfoInternal {

private final String schemaName;
private final String name;
private final List<ColumnInfo> fields;
private final List<String> primaryKeys;

public TableInfoInternal(String schemaName, String tableName, List<ColumnInfo> fields) {
this.schemaName = schemaName;
this.name = tableName;
this.fields = fields;
this.primaryKeys = new ArrayList<>();
}

public String getSchemaName() {
Expand All @@ -480,6 +501,14 @@ public List<ColumnInfo> getFields() {
return fields;
}

public void addPrimaryKeys(List<String> primaryKeys) {
this.primaryKeys.addAll(primaryKeys);
}

public List<String> getPrimaryKeys() {
return primaryKeys;
}

}

protected static class ColumnInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public void setup() throws Exception {
database.execute(connection -> {

connection.createStatement()
.execute(String.format("CREATE TABLE %s(id INTEGER, name VARCHAR(200), updated_at DATE);", getFullyQualifiedTableName(TABLE_NAME)));
.execute(String.format("CREATE TABLE %s(id INTEGER, name VARCHAR(200), updated_at DATE, PRIMARY KEY (id));",
getFullyQualifiedTableName(TABLE_NAME)));
connection.createStatement().execute(
String.format(
"INSERT INTO %s(id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');",
Expand Down Expand Up @@ -587,7 +588,8 @@ private static AirbyteCatalog getCatalog() {
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("updated_at", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))));
}

private static List<AirbyteMessage> getTestMessages() {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -54,7 +55,8 @@ class MssqlSourceTest {
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("born", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))));

private JsonNode configWithoutDbName;
private JsonNode config;
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class PostgresSourceTest {
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("power", JsonSchemaPrimitive.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG);
private static final Set<AirbyteMessage> ASCII_MESSAGES = Sets.newHashSet(
createRecord(STREAM_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null)),
Expand Down Expand Up @@ -100,7 +101,8 @@ void setup() throws Exception {
final JsonNode config = getConfig(PSQL_DB, dbName);
final Database database = getDatabaseFromConfig(config);
database.query(ctx -> {
ctx.fetch("CREATE TABLE id_and_name(id NUMERIC(20, 10), name VARCHAR(200), power double precision);");
ctx.fetch("CREATE TABLE id_and_name(id NUMERIC(20, 10), name VARCHAR(200), power double precision, PRIMARY KEY (id));");
ChristopheDuong marked this conversation as resolved.
Show resolved Hide resolved
ctx.fetch("CREATE INDEX i1 ON id_and_name (id);");
ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (1,'goku', 'Infinity'), (2, 'vegeta', 9000.1), ('NaN', 'piccolo', '-Infinity');");
return null;
});
Expand Down Expand Up @@ -168,6 +170,12 @@ private static void setEmittedAtToNull(Iterable<AirbyteMessage> messages) {
}
}

@Test
void testDiscoverWithPk() throws Exception {
final AirbyteCatalog actual = new PostgresSource().discover(getConfig(PSQL_DB, dbName));
assertEquals(CATALOG, actual);
}

@Test
void testReadSuccess() throws Exception {
final Set<AirbyteMessage> actualMessages = MoreIterators.toSet(new PostgresSource().read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null));
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-redshift/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/source-redshift