From 10363c1412b8f9b5b16b3e2e075b895e0cc9a293 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 24 May 2022 08:17:15 -0400 Subject: [PATCH] [HUDI-4132] Fixing determining target table schema for delta sync with empty batch (#5648) --- .../hudi/utilities/deltastreamer/DeltaSync.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index a1a804b9ed12..a4a7e10abc00 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -840,8 +840,15 @@ private Schema getSchemaForWriteConfig(Schema targetSchema) { && SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) { // target schema is null. fetch schema from commit metadata and use it HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build(); - TableSchemaResolver schemaResolver = new TableSchemaResolver(meta); - newWriteSchema = schemaResolver.getTableAvroSchema(false); + int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + if (totalCompleted > 0) { + try { + TableSchemaResolver schemaResolver = new TableSchemaResolver(meta); + newWriteSchema = schemaResolver.getTableAvroSchema(false); + } catch (IllegalArgumentException e) { + LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider"); + } + } } } return newWriteSchema;