
Kudu Connector rework #78

Merged: 7 commits, merged on May 19, 2020

Conversation

@mbalassi (Contributor):

Based on our proposal on the developer mailing list, @gyfora and @thebalu have reworked the Kudu connector and added a Table API connector for it. The proposed changes are documented in the following design doc.

The initial commit by @gyfora contains the API rework, followed by the Table API addition and the respective README updates by @thebalu.

@mbalassi (Contributor, Author):

The checks seem to be failing because we also bumped the Flink version to 1.10.0 but missed updating the .travis.yml accordingly. I will propose that change in a separate pull request.

@mbalassi (Contributor, Author):

I have opened the PR bumping the underlying Flink version in #79. That will fix the CI failures for this PR. Let me know if you prefer that I also push that commit to this branch for visibility.

@granthenke (Member) left a comment:

Just passing through with a quick scan while I had some time.

@@ -30,24 +30,38 @@
<packaging>jar</packaging>

<properties>
-<kudu.version>1.10.0</kudu.version>
+<kudu.version>1.11.1</kudu.version>
@granthenke (Member):

Depending on when this lands, the Apache Kudu community is currently in the process of releasing 1.12.0. The relevant features that might impact this integration are support for DATE and VARCHAR types.

Contributor:

I agree, we did not consider this 👍

Contributor:

I took a look at Kudu 1.12.0. Date support seems to be an easy addition, but VARCHAR types are not supported by the Blink planner.
For this PR, I suggest we stay on 1.11.1 and add Date support after the Kudu release is final.

} else {
    Type type = column.getType();
    switch (type) {
        case BINARY:
-           values.setField(pos, name, row.getBinary(name));
+           values.setField(pos, row.getBinary(name));
@granthenke (Member):

Given that values.setField takes an Object, you can probably remove this switch and call row.getObject(name).

Contributor:

Yes, let's simplify that!
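
A minimal sketch of the agreed simplification (the helper name is hypothetical; values, row, pos and name mirror the snippet above):

import org.apache.flink.types.Row;
import org.apache.kudu.client.RowResult;

// Sketch only: Row.setField accepts an Object and RowResult.getObject(name)
// returns the column value boxed, so the whole per-type switch collapses
// into a single call.
static void setRowField(Row values, RowResult row, int pos, String name) {
    values.setField(pos, row.isNull(name) ? null : row.getObject(name));
}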

-    values.setField(pos, name, row.getLong(name) / 1000);
+    try {
+        values.setField(pos, row.getTimestamp(name));
+    } catch (Exception e) {
@granthenke (Member):

What exception is this handling?

Contributor:

That is beyond me; I will ask Balazs if he knows why it was there.

Contributor:

This is actually not necessary, especially once the simplification suggested in the comment above is applied.

* Catalog for reading and creating Kudu tables.
*/
@PublicEvolving
public class KuduCatalog extends AbstractReadOnlyCatalog {
@granthenke (Member):

nit: extending AbstractReadOnlyCatalog to make it not read-only is a bit confusing. Maybe call it AbstractCatalog and note in the documentation that it's read-only by default?

Contributor:

I just added this as a helper class to factor out the unsupported features of the Catalog. Originally the intention was to make it read-only in the sense that it would only grant access to tables already present in Kudu.

Now that we have added support for creating Kudu tables, the user is still not allowed to create other kinds of tables (as you normally would in the Hive or in-memory catalog), but at this point the naming might be unfortunate.

}

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
@granthenke (Member):

This API is a bit unusual, given that the call to Kudu isn't actually filtering by "database" at all.

Kudu doesn't have a strict first-class database concept yet, though it does have a table name syntax to help loosely represent a database. The format is effectively <database>.<tablename>.

Handling this is likely out of scope for this patch, but I wanted to provide some context.

Contributor:

For now we decided not to introduce any database concept in this catalog, for the reasons you highlighted, and to treat everything as a single database.

In the future we could improve this and adopt the convention you mentioned; a sketch follows. In that case the default database could be the global namespace, as it is now.
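
For illustration only (a hypothetical helper, not part of this PR):

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the "<database>.<tablename>" convention: a future
// listTables(databaseName) could keep only the tables carrying the database
// prefix and strip that prefix from the returned names.
static List<String> filterTablesByDatabase(List<String> allKuduTables, String databaseName) {
    String prefix = databaseName + ".";
    return allKuduTables.stream()
            .filter(name -> name.startsWith(prefix))
            .map(name -> name.substring(prefix.length()))
            .collect(Collectors.toList());
}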

}

private DescriptorProperties getValidatedProps(Map<String, String> properties) {
checkNotNull(properties.get(KUDU_MASTERS), "Missing required property " + KUDU_MASTERS);
@granthenke (Member):

Other integrations have provided an application-level configuration to use a default if a per-table override is not provided. Does that make sense here too?

Contributor:

The KuduCatalog basically provides the default for this, so as long as you use the KuduCatalog (and specify the masters there) you wouldn't have to specify them for individual tables.

On the other hand, when you create Kudu tables in other catalogs you end up in this code path and have to specify the Kudu masters.
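
To illustrate the catalog-level default (a sketch; the KuduCatalog constructor arguments and import path are assumptions, not verified against this PR):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.connectors.kudu.table.KuduCatalog; // assumed package

public class KuduCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Masters are configured once, at the catalog level...
        tEnv.registerCatalog("kudu", new KuduCatalog("master1:7051,master2:7051"));
        tEnv.useCatalog("kudu");
        // ...so tables resolved through this catalog need no per-table
        // masters property; only tables defined in other catalogs do.
    }
}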

public void setKeyFields(String[] keyFields) { /* this has no effect */}

@Override
public void setIsAppendOnly(Boolean isAppendOnly) { /* this has no effect */}
@granthenke (Member):

I guess this could enforce that only the INSERT operation is used? Is that useful?

Contributor:

This method is called to indicate that the sink (which supports upserts) doesn't actually need to upsert, only append, which might be cheaper in certain cases.

In our case upsert and append are the same thing (we cannot append like in Kafka), so we should not do anything here.


public static KuduTableInfo createTableInfo(String tableName, TableSchema schema, Map<String, String> props) {

boolean createIfMissing = props.containsKey(KUDU_HASH_COLS);
@granthenke (Member):

I don't understand this, what does KUDU_HASH_COLS have to do with createIfMissing?

Contributor:

When the user defines a table, we try to infer from the provided properties whether the table already exists in Kudu or not.

The logic is that if the user provides the hash columns, the table should be created (hash columns are only used at creation time).

ColumnSchema.ColumnSchemaBuilder builder = new ColumnSchema
.ColumnSchemaBuilder(t.f0, KuduTypeUtils.toKuduType(t.f1))
.key(keyColumns.contains(t.f0))
.nullable(!keyColumns.contains(t.f0) && t.f1.getLogicalType().isNullable());
@granthenke (Member):

Should this just use t.f1.getLogicalType().isNullable() and let Kudu complain if it's a key column? Silently dropping nullable seems like it could be problematic.

Contributor:

The Flink/Calcite type checking for relational queries takes nullability a bit too seriously at this point.

It is almost impossible to use NOT NULL types in queries or with other connectors. Due to this limitation we decided not to treat Kudu key columns as non-nullable on the Flink side.

What this means is that we don't require Kudu key columns to be NOT NULL in Flink, and rely only on the key-column property to make a column a Kudu key column (which we have to set as non-nullable in the Kudu schema).

}

@Override
public Type visit(VarCharType varCharType) {
@granthenke (Member):

Kudu 1.12.0 will have VARCHAR

Contributor:

👍

Contributor:

VARCHAR does not seem to be supported by the Blink planner yet.

@thebalu (Contributor) commented Apr 22, 2020:

We have addressed @granthenke's comments and implemented the recommended simplification. We have also rebased, so the checks now pass.
Please let us know if you have any additional suggestions.
cc @lresende

@mbalassi (Contributor, Author):

Hi @lresende, we would greatly appreciate it if you could review this and merge it if you are satisfied. Let us know if you need further input from us.

@@ -73,7 +73,8 @@ void testInputFormatWithProjection() throws Exception {
private List<Row> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
    String masterAddresses = harness.getMasterAddressesAsString();
    KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
-   KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(), Arrays.asList(fieldProjection));
+   KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(),
+           fieldProjection == null ? null : Arrays.asList(fieldProjection));
Contributor:

We should also add a test that covers this; a sketch follows.
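
Something along these lines, perhaps (a hypothetical sketch reusing the readRows helper and a tableInfo field from the surrounding test class; the expected arity of 5 is illustrative):

import java.util.List;
import org.apache.flink.types.Row;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

// Hypothetical test for the null-projection branch: passing null for the
// projection should read every column of the table.
@Test
public void testInputFormatWithNullProjection() throws Exception {
    List<Row> rows = readRows(tableInfo, (String[]) null); // null = all columns
    for (Row row : rows) {
        assertEquals(5, row.getArity()); // no projection: full row arity
    }
}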

@lresende (Member) commented May 7, 2020:

I see some comments still being addressed; please let me know when it's ready for final review/merge.

@gyfora (Contributor) commented May 7, 2020:

We have made some improvements and bug fixes (after extensive testing) but I think this is ready for final review in any case :)

@nutony111:

I added the flink-connector-kudu_2.11 dependency to my project, but when I run mvn install I get this error message:
Could not transfer metadata org.apache.bahir:flink-connector-kudu_2.11:1.1-SNAPSHOT/maven-metadata.xml from/to ,!cloudera (https://repository.cloudera.com/artifactory/cloudera-repos/): D:\mavencangku\org\apache\bahir\flink-connector-kudu_2.11\1.1-SNAPSHOT\maven-metadata-,!cloudera.xml.part.lock (Incorrect file name, directory name, or volume label syntax.)
How can I deal with this?

@mbalassi (Contributor, Author):

@nutony111 This seems completely unrelated to this pull request. We have not modified the repository information, and that repository is not included in bahir-flink. It looks like a temporary connection issue on your end, or a strange local Maven setup.

@nutony111 commented May 12, 2020 via email.

@gyfora (Contributor) commented May 19, 2020:

@lresende, the PR is ready for your review when you have some time :)
