[FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector #21522
Conversation
@Aitozi Thanks for the contribution. Migrating to the new schema is a good improvement and valuable for future development. I'll definitely have a look when I'm free.
@Aitozi Thanks for the contribution. I left some comments. PTAL.
Also, I found some imports of TableSchema
(import org.apache.flink.table.api.TableSchema)
in some test classes, such as:
HiveDialectITCase
TableEnvHiveConnectorITCase
HiveInputFormatPartitionReaderITCase
HiveCatalogGenericMetadataTest
HiveCatalogHiveMetadataTest
HiveCatalogITCase
HiveCatalogTest
Can they all be removed?
  String[] formatNames = new String[formatFieldCount];
  LogicalType[] formatTypes = new LogicalType[formatFieldCount];
  for (int i = 0; i < formatFieldCount; i++) {
-     formatNames[i] = tableSchema.getFieldName(i).get();
-     formatTypes[i] = tableSchema.getFieldDataType(i).get().getLogicalType();
+     formatNames[i] = resolvedSchema.getColumn(i).get().getName();
nit:
resolvedSchema.getColumnNames().get(i);
fixed
-     formatNames[i] = tableSchema.getFieldName(i).get();
-     formatTypes[i] = tableSchema.getFieldDataType(i).get().getLogicalType();
+     formatNames[i] = resolvedSchema.getColumn(i).get().getName();
+     formatTypes[i] = resolvedSchema.getColumn(i).get().getDataType().getLogicalType();
ditto
fixed
-     formatConf, typeDescription.toString(), formatTypes));
+     formatConf,
+     typeDescription.toString(),
+     formatType.getFields().stream()
use formatTypes?
...ors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
  // Partition keys
  List<String> partitionKeys = new ArrayList<>();
  TableSchema tableSchema;
Can we also remove TableSchema here?
@@ -97,7 +102,7 @@ public class HiveTableUtil {

  private HiveTableUtil() {}

- public static TableSchema createTableSchema(
+ public static ResolvedSchema createResolvedTableSchema(
Why ResolvedSchema? As far as I'm concerned, it should be Schema.
@@ -2106,7 +2111,7 @@ public static CatalogBaseTable getCatalogBaseTable(
  public static class TableSpec {
      public ObjectIdentifier tableIdentifier;
      public String tableName;
-     public CatalogBaseTable table;
+     public ResolvedCatalogBaseTable<?> table;
Do we really need ResolvedCatalogBaseTable? What I mean is, can we avoid calling the method catalogManager.resolveCatalogBaseTable? It's an internal method; we always want to avoid calling it.
I think we need the table to be resolved, since we want to call validatePartColumnType on it, which requires the resolved type information.
- CatalogTable catalogTable =
-         getCatalogTable(tableIdentifier.asSummaryString(), qb);
+ ResolvedCatalogTable catalogTable =
+         catalogManager.resolveCatalogTable(
Ditto. Can we avoid calling catalogManager.resolveCatalogTable?
  }
- return builder.build();
+ return org.apache.flink.table.catalog.UniqueConstraint.primaryKey(
+         primaryKey.getName(), primaryKey.getColumns());
  }

  /** Create Hive columns from Flink TableSchema. */
Can this method be removed?
          oldTable.getComment(),
          oldTable.getPartitionKeys(),
          props),
  newSchema));
  }
Can the deprecated TableSchema be removed in the method convertAlterTableChangeCol?
Removed
@Aitozi FYI: about one year ago, I also reviewed a PR migrating TableSchema to Schema, fapaul@f8af0e9
@luoyuxia Thanks for your review. I will take a look and fix your comments.
I'm revisiting this PR now.
Force-pushed from b9ed21a to fc21901.
Hi @luoyuxia, I have addressed your comments. Please take a look again when you are free, thanks.
@flinkbot run azure
@Aitozi Thanks for updating. I left some comments again. PTAL. We're getting there.
  */
  public HiveSourceBuilder setProjectedFields(int[] projectedFields) {
Please don't change this method since it's a public interface. Also, I don't think we need to change it.
Reverted
...s/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
  if (isHiveTable) {
      pkConstraint = table.getSchema().getPrimaryKey().orElse(null);
      // TODO replace the deprecated UniqueConstraint
It'd be better to create a JIRA to track the TODO task.
@@ -763,7 +783,7 @@ CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
      tableSchemaProps.putProperties(properties);
      // try to get table schema with both new and old (1.10) key, in order to support tables
      // created in old version
-     tableSchema =
+     TableSchema tableSchema =
Can we remove TableSchema here, so that TableSchema is removed from our Hive connector entirely?
I think it doesn't matter, since we still use DescriptorProperties to serialize/deserialize the schema when storing/restoring it to the external metastore. So TableSchema is actually still used in the Hive connector. It can be entirely removed once we use the new way to serialize/deserialize the schema, but I think we can improve that as a follow-up. WDYT?
CatalogPropertiesUtil is the alternative to DescriptorProperties. It should not be major work to migrate.
Updated. Now import org.apache.flink.table.api.TableSchema has been entirely removed from the Hive connector, and DescriptorProperties has been migrated to CatalogPropertiesUtil.
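As a rough illustration of the property-based serialization both utilities perform (the class and method names below are illustrative stand-ins, not Flink's actual CatalogPropertiesUtil API), a schema is flattened into string properties with keys like schema.&lt;index&gt;.name:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaPropsDemo {
    // Hypothetical sketch: a schema is persisted to an external metastore
    // as flat string properties keyed by "schema.<index>.name" and
    // "schema.<index>.data-type".
    static Map<String, String> serialize(String[] names, String[] types) {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            props.put("schema." + i + ".name", names[i]);
            props.put("schema." + i + ".data-type", types[i]);
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props =
                serialize(new String[] {"id", "name"}, new String[] {"BIGINT", "STRING"});
        System.out.println(props.get("schema.1.name")); // name
    }
}
```

Restoring the schema then means scanning the property map for keys of this shape, which is exactly what the key-counting regex discussed later in this review has to get right.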
...rc/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -69,67 +69,75 @@ private OperationConverterUtils() {}
  public static Operation convertAddReplaceColumns(
Seems this method can be removed, along with the else if (sqlAlterTable instanceof SqlAddReplaceColumns) branch in SqlToOperationConverter.
Why?
Oh, I get it. SqlAddReplaceColumns is for the Hive dialect and is not used now. Will remove it.
@@ -157,12 +165,12 @@ public static Operation convertChangeColumn(
      // disallow changing partition columns
      throw new ValidationException("CHANGE COLUMN cannot be applied to partition columns");
  }
- TableSchema oldSchema = catalogTable.getSchema();
+ ResolvedSchema oldSchema = catalogTable.getResolvedSchema();
Ditto: seems we can also remove the method convertChangeColumn.
Removed
  * ResolvedExpression} back to its Unresolved state. This will enable direct comparison of the
  * schema.
  */
  public static Schema fromResolvedSchema(ResolvedSchema resolvedSchema) {
Can't Schema.newBuilder().fromResolvedSchema(resolvedSchema).build() meet our need?
The ResolvedExpression in resolvedSchema is different from the Expression in string format. For example, SqlCallExpression's format will have '[]'.
Refactored by introducing a TestSchemaResolver to ease this comparison.
Do you mean that for the method CatalogTestUtil#checkEquals(CatalogTable t1, CatalogTable t2), t1/t2 may be ResolvedCatalogTable or DefaultCatalogTable?
If it only happens in Hive, can we use CatalogManagerMocks.createEmptyCatalogManager().resolveCatalogTable()? That way, we won't need to add a new class TestSchemaResolver.
Hi @luoyuxia, most of your comments have been addressed. PTAL again.
@@ -258,8 +256,12 @@ private TableFunction<RowData> getLookupFunction(int[] keys) {
      jobConf,
      hiveVersion,
      tablePath,
-     getTableSchema().getFieldDataTypes(),
-     getTableSchema().getFieldNames(),
+     DataType.getFieldDataTypes(
Would it be better to use:
catalogTable.getResolvedSchema().getColumnDataTypes().toArray(new DataType[0]),
catalogTable.getResolvedSchema().getColumnNames().toArray(new String[0])
I think we should exclude the computed columns here, so I use DataType.getFieldDataTypes(catalogTable.getResolvedSchema().toPhysicalRowDataType()).toArray(new DataType[0]).
Makes sense.
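The point about excluding computed columns can be sketched with plain Java (the Column record and physicalNames method below are simplified, hypothetical stand-ins for Flink's Column/ResolvedSchema, not the real API):

```java
import java.util.List;
import java.util.stream.Collectors;

public class PhysicalColumnsDemo {
    // Hypothetical model: a computed column is derived from an expression
    // and has no stored data, so isPhysical is false.
    record Column(String name, String type, boolean isPhysical) {}

    // Mirrors the intent of resolvedSchema.toPhysicalRowDataType():
    // computed columns must not reach the Hive reader/writer.
    static List<String> physicalNames(List<Column> schema) {
        return schema.stream()
                .filter(Column::isPhysical)
                .map(Column::name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Column> schema = List.of(
                new Column("id", "BIGINT", true),
                new Column("name", "STRING", true),
                new Column("id_plus_one", "BIGINT", false)); // computed: id + 1
        System.out.println(physicalNames(schema)); // [id, name]
    }
}
```

This is why getColumnDataTypes()/getColumnNames() (which include computed columns) would hand the lookup function a wider row than the physical Hive data actually has.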
@@ -703,7 +704,7 @@ private CompactReader.Factory<RowData> createCompactReaderFactory(
      jobConf,
      catalogTable,
      hiveVersion,
-     (RowType) tableSchema.toRowDataType().getLogicalType(),
+     (RowType) resolvedSchema.toSinkRowDataType().getLogicalType(),
It should be resolvedSchema.toSourceRowDataType(), right?
Why? I think in the sink it should deal with the Column::isPersisted data types, so the compact reader factory should use toSinkRowDataType.
Since it's for a reader, shouldn't it use resolvedSchema#toPhysicalRowDataType?
Makes sense, updated.
...s/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
...ors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
...nnector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
...-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -472,7 +510,8 @@ private static int getCount(Map<String, String> map, String key, String suffix) {
      final String escapedSeparator = Pattern.quote(SEPARATOR);
      final Pattern pattern =
              Pattern.compile(
-                     escapedKey
+                     "^"
Why change this?
Before, both the key generic.schema.1.name and schema.1.name would pass this pattern, so the column count would mislead the key extractor. It actually expects a ^ anchor here. With it, generic.schema.1.name will return 0, and then we can use the fallback key instead.
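The anchoring issue can be reproduced with plain java.util.regex (the class and method below are an illustrative sketch, not Flink's actual getCount implementation):

```java
import java.util.regex.Pattern;

public class AnchorDemo {
    // Checks whether a property key matches the "<key>.<n>.<suffix>"
    // shape, anchored at the start of the string with "^".
    static boolean matchesKey(String key, String candidate, String suffix) {
        String escapedKey = Pattern.quote(key);
        String escapedSeparator = Pattern.quote(".");
        Pattern pattern = Pattern.compile(
                "^" + escapedKey + escapedSeparator + "(\\d+)" + escapedSeparator + suffix);
        // find() locates the pattern anywhere in the input; without the
        // "^" anchor it would also hit the substring "schema.1.name"
        // inside "generic.schema.1.name", inflating the column count.
        return pattern.matcher(candidate).find();
    }

    public static void main(String[] args) {
        System.out.println(matchesKey("schema", "schema.1.name", "name"));         // true
        System.out.println(matchesKey("schema", "generic.schema.1.name", "name")); // false
    }
}
```

With the anchor in place, keys under the generic. prefix fall through to the fallback-key lookup instead of being miscounted against the plain schema key.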
...e/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@Aitozi Thanks for updating. I left minor comments. PTAL. Should be ready to be merged in the next iteration.
@luoyuxia Thanks for your detailed review. I have addressed your comments.
For this question: yes, it currently only needs to resolve the schema for the Hive catalog.
...able/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java
...ink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
...k-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
...e/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@Aitozi Thanks for your patience. I left minor comments. PTAL.
@Aitozi Thanks for updating. Left minor comments again. PTAL.
Please remember not to call the method CatalogManager#resolveCatalogTable.
...ink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
...r-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
...ctor-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
...e/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
...able/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@Aitozi Still notice we are calling
All removed, PTAL again.
@Aitozi Thanks for the contribution. LGTM assuming tests pass.
Could you please rebase master? I will merge later.
@luoyuxia Done. Thanks a lot for your patient review.
What is the purpose of the change
This PR is meant to migrate TableSchema to Schema and ResolvedSchema. Most TableSchema usages have been moved out of the Hive connector module; only some remain in HiveCatalog-related code. I filed a discussion about the catalog's APIs regarding this.
Verifying this change
This change is a rework that should be covered by the existing tests.