diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java index c1a1b4405795a..80f4c8e5f19ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.pipe.extractor; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor; @@ -40,6 +42,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +53,8 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE; @@ -60,6 +65,7 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY; @@ -84,6 +90,14 @@ public IoTDBDataRegionExtractor() { @Override public void validate(PipeParameterValidator validator) throws Exception { + // Check whether the pattern is legal + validatePattern( + validator + .getParameters() + .getStringOrDefault( + Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY), + EXTRACTOR_PATTERN_DEFAULT_VALUE)); + // Validate extractor.history.enable and extractor.realtime.enable validator .validateAttributeValueRange( @@ -134,6 +148,39 @@ public void validate(PipeParameterValidator validator) throws Exception { realtimeExtractor.validate(validator); } + private void validatePattern(String pattern) { + if (!pattern.startsWith("root")) { + throw new IllegalArgumentException( + "The argument `extractor.pattern` or `source.pattern` is an illegal path."); + } + + try { + PathUtils.isLegalPath(pattern); + } catch (IllegalPathException e) { + try { + if ("root".equals(pattern) || "root.".equals(pattern)) { + return; + } + + // Split the pattern to nodes. + String[] pathNodes = StringUtils.splitPreserveAllTokens(pattern, "\\."); + + // Check whether the pattern without last node is legal. + PathUtils.splitPathToDetachedNodes( + String.join(".", Arrays.copyOfRange(pathNodes, 0, pathNodes.length - 1))); + String lastNode = pathNodes[pathNodes.length - 1]; + + // Check whether the last node is legal. + if (!"".equals(lastNode)) { + Double.parseDouble(lastNode); + } + } catch (Exception ignored) { + throw new IllegalArgumentException( + "The argument `extractor.pattern` or `source.pattern` is an illegal path."); + } + } + } + private void constructHistoricalExtractor() { // Enable historical extractor by default historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java index 1914316fd2820..cdbff585ac601 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java @@ -53,4 +53,52 @@ public void testIoTDBDataRegionExtractor() { Assert.fail(); } } + + @Test + public void testIoTDBDataRegionExtractorWithPattern() { + Assert.assertEquals( + testIoTDBDataRegionExtractorWithPattern("root.a-b").getClass(), + IllegalArgumentException.class); + Assert.assertEquals( + testIoTDBDataRegionExtractorWithPattern("root.1.a").getClass(), + IllegalArgumentException.class); + Assert.assertEquals( + testIoTDBDataRegionExtractorWithPattern("r").getClass(), IllegalArgumentException.class); + Assert.assertEquals( + testIoTDBDataRegionExtractorWithPattern("").getClass(), IllegalArgumentException.class); + Assert.assertEquals( + testIoTDBDataRegionExtractorWithPattern("123").getClass(), IllegalArgumentException.class); + Assert.assertEquals( + testIoTDBDataRegionExtractorWithPattern("root.a b").getClass(), + IllegalArgumentException.class); + Assert.assertEquals( + testIoTDBDataRegionExtractorWithPattern("root.a+b").getClass(), + IllegalArgumentException.class); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.ab.")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a#b")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.一二三")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.一二。三")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.ab")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a.b.c.1e2")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.`a-b`")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.1")); + } + + public Exception testIoTDBDataRegionExtractorWithPattern(String pattern) { + try (IoTDBDataRegionExtractor extractor = new IoTDBDataRegionExtractor()) { + extractor.validate( + new PipeParameterValidator( + new PipeParameters( + new HashMap() { + { + put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern); + } + }))); + } catch (Exception e) { + return e; + } + return null; + } }