Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source support primary keys #2488

Merged
merged 5 commits into from
Mar 17, 2021
Merged

Conversation

ChristopheDuong
Copy link
Contributor

@ChristopheDuong ChristopheDuong commented Mar 16, 2021

What

Closes #2461

How

JDBC Abstract Sources are filling up the sourceDefinedPrimaryKey field

Pre-merge Checklist

  • Run integration tests
  • Publish Docker images

Recommended reading order

  1. airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java
  2. the rest

Copy link
Contributor

@sherifnada sherifnada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add more test cases to cover this feature more thoroughly? Also one style comment

@@ -354,6 +357,16 @@ private static void assertColumnsWithSameNameAreSame(String schemaName, String t
return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType);
})
.collect(Collectors.toList())))
.peek(t -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be better done in a for loop after the sequence of stream operations for readability

@ChristopheDuong
Copy link
Contributor Author

ChristopheDuong commented Mar 17, 2021

/publish connector=connectors/source-postgres

🕑 connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/660941824
✅ connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/660941824

@ChristopheDuong
Copy link
Contributor Author

ChristopheDuong commented Mar 17, 2021

/publish connector=connectors/source-mysql

🕑 connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/660942166
✅ connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/660942166

@ChristopheDuong
Copy link
Contributor Author

ChristopheDuong commented Mar 17, 2021

/publish connector=connectors/source-mssql

🕑 connectors/source-mssql https://github.com/airbytehq/airbyte/actions/runs/660942406
✅ connectors/source-mssql https://github.com/airbytehq/airbyte/actions/runs/660942406

@ChristopheDuong
Copy link
Contributor Author

ChristopheDuong commented Mar 17, 2021

/publish connector=connectors/source-redshift

🕑 connectors/source-redshift https://github.com/airbytehq/airbyte/actions/runs/660942490
❌ connectors/source-redshift https://github.com/airbytehq/airbyte/actions/runs/660942490
🕑 connectors/source-redshift https://github.com/airbytehq/airbyte/actions/runs/660942490
✅ connectors/source-redshift https://github.com/airbytehq/airbyte/actions/runs/660942490

Copy link
Contributor

@sherifnada sherifnada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to sleep so well tonight with all these amazing test cases!!! Looks great.

@ChristopheDuong ChristopheDuong merged commit 41e8b6a into master Mar 17, 2021
@ChristopheDuong ChristopheDuong deleted the chris/source-primary-keys branch March 17, 2021 18:28
Comment on lines +140 to +141
.withSourceDefinedPrimaryKey(
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList())))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

styling nit.

Suggested change
.withSourceDefinedPrimaryKey(
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList())))
.withSourceDefinedPrimaryKey(t.getPrimaryKeys()
.stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList())))

result.forEach(t -> {
try {
final List<String> primaryKeys = database.bufferedResultSetQuery(
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), t.getSchemaName(), t.getName()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to do this query once per table? or if we leave table null, can we just run this query once for the whole database?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried to run the function without table names.

It seems like for MySQL this is not allowed and is throwing an exception about not having a table name but it does work for some other Databases... I made a new PR for this

See also:

The method getTables takes a like-pattern for the tableNamePattern parameter, so "%" matches all table names.
The method getPrimaryKeys and getExportedKeys do not take a pattern, so you will need to loop over the result of
getTables and execute those methods for each row of the getTables result set.

https://stackoverflow.com/a/40841486

final List<String> primaryKeys = database.bufferedResultSetQuery(
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), t.getSchemaName(), t.getName()),
resultSet -> resultSet.getString(JDBC_COLUMN_COLUMN_NAME));
t.addPrimaryKeys(primaryKeys);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One convention that we try to follow in our java code is to keep objects immutable wherever possible. This pattern of setting fields after initialization should only be used if absolutely necessary (e.g. it represents different stages of the object's lifecycle). You'll notice that in almost all of our java classes all of the fields are final. Here it is not necessary to set the field late. The integrity of this object is compromised by because we happen to use two separate queries to construct it.

I would suggest the following you do both queries 1. get all columns. 2 get all primary keys. and then construct the table info one time (no need for addPrimaryKeys).

LMK if any of this is unclear or if the motivation doesn't make sense.

connection.createStatement().execute(
String.format(
"INSERT INTO %s(id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');",
getFullyQualifiedTableName(TABLE_NAME)));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call adding these test cases here. glad that this could be added without having to rewrite all of the tests!

@@ -87,6 +88,8 @@
private static final Set<String> TEST_SCHEMAS = ImmutableSet.of(SCHEMA_NAME, SCHEMA_NAME2);

private static final String TABLE_NAME = "id_and_name";
private static final String TABLE_NAME_WITHOUT_PK = "id_and_name_without_pk";
private static final String TABLE_NAME_FULL_NAMES = "full_names";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call this multi column pk or something? i had trouble parsing this name. wasn't until i read the create table query that i understood it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Sources support destination sync modes
3 participants