Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.extractor;

import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
Expand All @@ -40,6 +42,7 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,6 +53,8 @@

import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE;
Expand All @@ -60,6 +65,7 @@
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;

Expand All @@ -84,6 +90,14 @@ public IoTDBDataRegionExtractor() {

@Override
public void validate(PipeParameterValidator validator) throws Exception {
// Check whether the pattern is legal
validatePattern(
validator
.getParameters()
.getStringOrDefault(
Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
EXTRACTOR_PATTERN_DEFAULT_VALUE));

// Validate extractor.history.enable and extractor.realtime.enable
validator
.validateAttributeValueRange(
Expand Down Expand Up @@ -134,6 +148,39 @@ public void validate(PipeParameterValidator validator) throws Exception {
realtimeExtractor.validate(validator);
}

private void validatePattern(String pattern) {
if (!pattern.startsWith("root")) {
throw new IllegalArgumentException(
"The argument `extractor.pattern` or `source.pattern` is an illegal path.");
}

try {
PathUtils.isLegalPath(pattern);
} catch (IllegalPathException e) {
try {
if ("root".equals(pattern) || "root.".equals(pattern)) {
return;
}

// Split the pattern to nodes.
String[] pathNodes = StringUtils.splitPreserveAllTokens(pattern, "\\.");

// Check whether the pattern without last node is legal.
PathUtils.splitPathToDetachedNodes(
String.join(".", Arrays.copyOfRange(pathNodes, 0, pathNodes.length - 1)));
String lastNode = pathNodes[pathNodes.length - 1];

// Check whether the last node is legal.
if (!"".equals(lastNode)) {
Double.parseDouble(lastNode);
}
} catch (Exception ignored) {
throw new IllegalArgumentException(
"The argument `extractor.pattern` or `source.pattern` is an illegal path.");
}
}
}

private void constructHistoricalExtractor() {
// Enable historical extractor by default
historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,52 @@ public void testIoTDBDataRegionExtractor() {
Assert.fail();
}
}

@Test
public void testIoTDBDataRegionExtractorWithPattern() {
Assert.assertEquals(
testIoTDBDataRegionExtractorWithPattern("root.a-b").getClass(),
IllegalArgumentException.class);
Assert.assertEquals(
testIoTDBDataRegionExtractorWithPattern("root.1.a").getClass(),
IllegalArgumentException.class);
Assert.assertEquals(
testIoTDBDataRegionExtractorWithPattern("r").getClass(), IllegalArgumentException.class);
Assert.assertEquals(
testIoTDBDataRegionExtractorWithPattern("").getClass(), IllegalArgumentException.class);
Assert.assertEquals(
testIoTDBDataRegionExtractorWithPattern("123").getClass(), IllegalArgumentException.class);
Assert.assertEquals(
testIoTDBDataRegionExtractorWithPattern("root.a b").getClass(),
IllegalArgumentException.class);
Assert.assertEquals(
testIoTDBDataRegionExtractorWithPattern("root.a+b").getClass(),
IllegalArgumentException.class);
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.ab."));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a#b"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.一二三"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.一二。三"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root."));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.ab"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a.b.c.1e2"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.`a-b`"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.1"));
}

public Exception testIoTDBDataRegionExtractorWithPattern(String pattern) {
try (IoTDBDataRegionExtractor extractor = new IoTDBDataRegionExtractor()) {
extractor.validate(
new PipeParameterValidator(
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern);
}
})));
} catch (Exception e) {
return e;
}
return null;
}
}