From fa11e502e5bb03980dd0e7a011fd0a04748e6e44 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Fri, 20 May 2022 12:11:43 -0400 Subject: [PATCH] Fixing determining target table schema for delta sync with empty batch --- .../hudi/utilities/deltastreamer/DeltaSync.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 8f44b8b7d0b3..84626d9d265a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -840,8 +840,15 @@ private Schema getSchemaForWriteConfig(Schema targetSchema) { && SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) { // target schema is null. fetch schema from commit metadata and use it HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build(); - TableSchemaResolver schemaResolver = new TableSchemaResolver(meta); - newWriteSchema = schemaResolver.getTableAvroSchema(false); + int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + if (totalCompleted > 0) { + try { + TableSchemaResolver schemaResolver = new TableSchemaResolver(meta); + newWriteSchema = schemaResolver.getTableAvroSchema(false); + } catch (IllegalArgumentException e) { + LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider"); + } + } } } return newWriteSchema;