From 8b60f677fa6cda34d0f27d92b854b8b188dfcfeb Mon Sep 17 00:00:00 2001 From: Navina Ramesh Date: Thu, 7 Sep 2023 12:13:37 -0700 Subject: [PATCH] Use streamType for parsing and validating protobuf decoder config --- .../segment/local/utils/TableConfigUtils.java | 17 +++++++++++------ .../local/utils/TableConfigUtilsTest.java | 4 ++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index e8dfef485aaf..6d87cb04e8ea 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -156,7 +156,8 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N } catch (Exception e) { throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e); } - validateDecoder(streamConfig); + validateDecoder(streamConfig.getStreamConfigsMap().getOrDefault(StreamConfigProperties.STREAM_TYPE, + "kafka"), streamConfig); } validateTierConfigList(tableConfig.getTierConfigsList()); validateIndexingConfig(tableConfig.getIndexingConfig(), schema); @@ -515,14 +516,18 @@ public static void validateIngestionAggregation(AggregationFunctionType function } @VisibleForTesting - static void validateDecoder(StreamConfig streamConfig) { + static void validateDecoder(String streamType, StreamConfig streamConfig) { if (streamConfig.getDecoderClass().equals("org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder")) { + String descriptorFileKey = String.format("stream.%s.decoder.prop.descriptorFile", streamType); // check the existence of the needed decoder props - if (!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.descriptorFile")) { - throw new IllegalStateException("Missing property of descriptorFile for ProtoBufMessageDecoder"); + if (!streamConfig.getDecoderProperties().containsKey(descriptorFileKey)) { + throw new IllegalStateException("Missing property of descriptorFile for ProtoBufMessageDecoder: " + + descriptorFileKey); } - if (!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.protoClassName")) { - throw new IllegalStateException("Missing property of protoClassName for ProtoBufMessageDecoder"); + String protoClassName = String.format("stream.%s.decoder.prop.protoClassName", streamType); + if (!streamConfig.getDecoderProperties().containsKey(protoClassName)) { + throw new IllegalStateException("Missing property of protoClassName for ProtoBufMessageDecoder: " + + protoClassName); } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index c68b77c553dd..e05eb5711de2 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -622,14 +622,14 @@ public void ingestionStreamConfigsTest() { "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder"); streamConfigs.put("stream.kafka.decoder.prop.descriptorFile", "file://test"); try { - TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs)); + TableConfigUtils.validateDecoder("kafka", new StreamConfig("test", streamConfigs)); } catch (IllegalStateException e) { // expected } streamConfigs.remove("stream.kafka.decoder.prop.descriptorFile"); streamConfigs.put("stream.kafka.decoder.prop.protoClassName", "test"); try { - TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs)); + TableConfigUtils.validateDecoder("kafka", new StreamConfig("test", streamConfigs)); } catch (IllegalStateException e) { // expected }