Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private PatternResult calculateMatch(
boolean partialQuery) {
PatternCalculationResult pointsMatchRes =
calculatePointsMatch(querySections, matchedSections, partialQuery);
if (pointsMatchRes == null) {
if (pointsMatchRes == null || pointsMatchRes.getMatchedPoints().isEmpty()) {
return null;
}
if (pointsMatchRes.getMatch() > queryCtx.getThreshold()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.udf.api.customizer.config.UDAFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException;
import org.apache.iotdb.udf.api.type.Type;
import org.apache.iotdb.udf.api.utils.ResultValue;

Expand All @@ -40,9 +41,14 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

public class UDAFPatternMatch implements UDAF {

static final String THRESHOLD_PARAM = "threshold";
static final String TIME_PATTERN_PARAM = "timePattern";
static final String VALUE_PATTERN_PARAM = "valuePattern";

private Long[] timePattern;
private Double[] valuePattern;
private float threshold;
Expand All @@ -52,19 +58,7 @@ public class UDAFPatternMatch implements UDAF {
public void beforeStart(UDFParameters udfParameters, UDAFConfigurations udafConfigurations) {
udafConfigurations.setOutputDataType(Type.TEXT);
Map<String, String> attributes = udfParameters.getAttributes();
if (!attributes.containsKey("threshold")) {
threshold = 100;
} else {
threshold = Float.parseFloat(attributes.get("threshold"));
}
timePattern =
Arrays.stream(attributes.get("timePattern").split(","))
.map(Long::valueOf)
.toArray(Long[]::new);
valuePattern =
Arrays.stream(attributes.get("valuePattern").split(","))
.map(Double::valueOf)
.toArray(Double[]::new);
threshold = Float.parseFloat(attributes.get(THRESHOLD_PARAM));
}

@Override
Expand Down Expand Up @@ -136,13 +130,49 @@ public void outputFinal(State state, ResultValue resultValue) {

@Override
public void validate(UDFParameterValidator validator) {

try {
String timePatternStr = validator.getParameters().getStringOrDefault(TIME_PATTERN_PARAM, "");
timePattern =
Arrays.stream(timePatternStr.split(",")).map(Long::valueOf).toArray(Long[]::new);

} catch (Exception e) {
throw new UDFParameterNotValidException(
"Illegal parameter, timePattern must be long,long...");
}
try {
String valuePatternStr =
validator.getParameters().getStringOrDefault(VALUE_PATTERN_PARAM, "");
valuePattern =
Arrays.stream(valuePatternStr.split(",")).map(Double::valueOf).toArray(Double[]::new);
} catch (Exception e) {
throw new UDFParameterNotValidException(
"Illegal parameter, valuePattern must be double,double...");
}
validator
.validateInputSeriesNumber(1)
.validateInputSeriesDataType(
0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE, Type.BOOLEAN)
.validateRequiredAttribute("timePattern")
.validateRequiredAttribute("valuePattern")
.validateRequiredAttribute("threshold");
.validateRequiredAttribute(THRESHOLD_PARAM)
.validateRequiredAttribute(TIME_PATTERN_PARAM)
.validateRequiredAttribute(VALUE_PATTERN_PARAM)
.validate(
(UDFParameterValidator.SingleObjectValidationRule)
payload -> ((Long[]) payload).length > 1,
"Illegal parameter, timePattern size must larger 1.",
timePattern)
.validate(
(UDFParameterValidator.SingleObjectValidationRule)
payload ->
IntStream.range(1, ((Long[]) payload).length)
.allMatch(i -> ((Long[]) payload)[i] > ((Long[]) payload)[i - 1]),
"Illegal parameter, timePattern value must be in ascending order.",
timePattern)
.validate(
payload -> ((Long[]) payload[0]).length == ((Double[]) payload[1]).length,
"Illegal parameter, timePattern size must equals valuePattern size.",
timePattern,
valuePattern);
}

private double getValue(Column column, int i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@
package org.apache.iotdb.library;

import org.apache.iotdb.library.match.PatternExecutor;
import org.apache.iotdb.library.match.UDAFPatternMatch;
import org.apache.iotdb.library.match.model.PatternContext;
import org.apache.iotdb.library.match.model.PatternResult;
import org.apache.iotdb.library.match.model.Point;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.exception.UDFAttributeNotProvidedException;
import org.apache.iotdb.udf.api.exception.UDFInputSeriesNumberNotValidException;
import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException;
import org.apache.iotdb.udf.api.type.Type;

import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
Expand All @@ -33,7 +40,9 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UDAFPatternTest {
private final PatternExecutor executor = new PatternExecutor();
Expand Down Expand Up @@ -85,4 +94,65 @@ public void testPatternExecutor() {
Assert.assertNotNull(results);
Assert.assertEquals(1, results.size());
}

@Test
public void testParameterValidator() {
UDAFPatternMatch patternMatch = new UDAFPatternMatch();
List<String> stringList = new ArrayList<>();
List<Type> typeList = new ArrayList<>();
Map<String, String> userAttributes = new HashMap<>();
userAttributes.put("timePattern", "1,2,3");
userAttributes.put("valuePattern", "1.0,2.0");
userAttributes.put("threshold", "100");

UDFParameterValidator validator =
new UDFParameterValidator(new UDFParameters(stringList, typeList, userAttributes));

Assert.assertThrows(
UDFInputSeriesNumberNotValidException.class, () -> patternMatch.validate(validator));

stringList.add("s1");
typeList.add(Type.FLOAT);
userAttributes.clear();
Assert.assertThrows(
"Illegal parameter, timePattern must be long,long...",
UDFParameterNotValidException.class,
() -> patternMatch.validate(validator));

userAttributes.put("timePattern", "1,3,2");
Assert.assertThrows(
"Illegal parameter, valuePattern must be double,double...",
UDFParameterNotValidException.class,
() -> patternMatch.validate(validator));

userAttributes.put("valuePattern", "1.0,2.0");
Assert.assertThrows(
"Illegal parameter, timePattern size must equals valuePattern size",
UDFParameterNotValidException.class,
() -> patternMatch.validate(validator));

userAttributes.remove("valuePattern");
userAttributes.put("valuePattern", "1.0,2.0,3.0");

Assert.assertThrows(
"Illegal parameter, timePattern value must be in ascending order.",
UDFParameterNotValidException.class,
() -> patternMatch.validate(validator));

userAttributes.remove("timePattern");
userAttributes.put("timePattern", "1,2,3");

Assert.assertThrows(
"attribute threshold is required but was not provided.",
UDFAttributeNotProvidedException.class,
() -> patternMatch.validate(validator));

userAttributes.put("threshold", "100");

try {
patternMatch.validate(validator);
} catch (Exception e) {
Assert.fail("Should not throw exception");
}
}
}