UDF function: MasterRepair (#6892)
Husaimawx committed Aug 11, 2022
1 parent 68dedba commit 4abfa2f32d74c5eb6df4a2844dd29e14acb4f7b5
Showing 8 changed files with 1,008 additions and 2 deletions.
@@ -352,4 +352,65 @@ Output series:
|2020-01-01T00:00:28.000+08:00| 126.0|
|2020-01-01T00:00:30.000+08:00| 128.0|
+-----------------------------+-------------------------------------------------+
```

## MasterRepair

### Usage

This function repairs dirty values in time series by using master data.

**Name:** MasterRepair

**Input Series:** Supports multiple input series. The types are INT32 / INT64 / FLOAT / DOUBLE.

**Parameters:**

+ `omega`: The window size. It is a non-negative integer whose unit is milliseconds. By default, it is estimated from the distances between pairs of tuples with various time differences.
+ `eta`: The distance threshold. It is a positive number. By default, it is estimated from the distance distribution of the tuples in each window.
+ `k`: The number of neighbors in the master data. It is a positive integer. By default, it is estimated from the tuple distance of the k-th nearest neighbor in the master data.
+ `output_column`: The index of the repaired column to output. It defaults to 1, which outputs the repair result of the first column.

**Output Series:** Outputs a single series. Its type is the same as that of the input. The series is the selected input column after repair.
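
To make the idea of repairing against master data more concrete, here is a deliberately simplified Java sketch. It is not the UDF's implementation: it ignores `omega`, `eta` and `k` entirely and simply replaces each tuple with its nearest master tuple under plain Euclidean distance, which is an assumption made only for illustration. The class and method names (`NearestMasterSketch`, `repairColumn`, and so on) are hypothetical. The data are the values used in the example of this section.

```java
import java.util.Arrays;

// Simplified illustration only: nearest-master replacement, not the MasterRepair algorithm itself.
class NearestMasterSketch {
  // Master tuples (m1, m2, m3) taken from the example table.
  static final double[][] MASTER = {
    {1704, 1154.55, 0.195}, {1702, 1152.30, 0.193}, {1702, 1148.65, 0.192},
    {1701, 1145.20, 0.194}, {1703, 1150.55, 0.195}, {1704, 1151.55, 0.193},
    {1705, 1153.55, 0.194}, {1706, 1152.30, 0.190}
  };

  // Observed tuples (t1, t2, t3); only the sixth row differs from the master data.
  static final double[][] DATA = {
    {1704, 1154.55, 0.195}, {1702, 1152.30, 0.193}, {1702, 1148.65, 0.192},
    {1701, 1145.20, 0.194}, {1703, 1150.55, 0.195}, {1694, 1151.55, 0.193},
    {1705, 1153.55, 0.194}, {1706, 1152.30, 0.190}
  };

  // Squared Euclidean distance between two tuples of equal length.
  static double squaredDistance(double[] a, double[] b) {
    double sum = 0.0;
    for (int i = 0; i < a.length; i++) {
      double d = a[i] - b[i];
      sum += d * d;
    }
    return sum;
  }

  // Returns a copy of the master tuple closest to the given tuple.
  static double[] nearestMaster(double[] tuple, double[][] master) {
    double[] best = master[0];
    double bestDistance = squaredDistance(tuple, best);
    for (double[] candidate : master) {
      double distance = squaredDistance(tuple, candidate);
      if (distance < bestDistance) {
        best = candidate;
        bestDistance = distance;
      }
    }
    return best.clone();
  }

  // Repairs every tuple and returns one column (1-based, mirroring output_column).
  static double[] repairColumn(double[][] data, double[][] master, int outputColumn) {
    double[] repaired = new double[data.length];
    for (int i = 0; i < data.length; i++) {
      repaired[i] = nearestMaster(data[i], master)[outputColumn - 1];
    }
    return repaired;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(repairColumn(DATA, MASTER, 1)));
  }
}
```

In this sketch the sixth tuple `(1694, 1151.55, 0.193)` is closest to the master tuple `(1702, 1152.30, 0.193)`, so its first column becomes 1702. The real function can select a different master tuple depending on `omega`, `eta` and `k`.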

### Examples

Input series:

```
+-----------------------------+------------+------------+------------+------------+------------+------------+
| Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3|
+-----------------------------+------------+------------+------------+------------+------------+------------+
|2021-07-01T12:00:01.000+08:00| 1704| 1154.55| 0.195| 1704| 1154.55| 0.195|
|2021-07-01T12:00:02.000+08:00| 1702| 1152.30| 0.193| 1702| 1152.30| 0.193|
|2021-07-01T12:00:03.000+08:00| 1702| 1148.65| 0.192| 1702| 1148.65| 0.192|
|2021-07-01T12:00:04.000+08:00| 1701| 1145.20| 0.194| 1701| 1145.20| 0.194|
|2021-07-01T12:00:07.000+08:00| 1703| 1150.55| 0.195| 1703| 1150.55| 0.195|
|2021-07-01T12:00:08.000+08:00| 1694| 1151.55| 0.193| 1704| 1151.55| 0.193|
|2021-07-01T12:01:09.000+08:00| 1705| 1153.55| 0.194| 1705| 1153.55| 0.194|
|2021-07-01T12:01:10.000+08:00| 1706| 1152.30| 0.190| 1706| 1152.30| 0.190|
+-----------------------------+------------+------------+------------+------------+------------+------------+
```

SQL for query:

```sql
select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test
```
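
The optional parameters are passed as `'key'='value'` attribute pairs after the series arguments, as the integration test in this commit does. The following Java sketch assembles such a query string. The helper class `MasterRepairQuerySketch` and its `buildQuery` method are hypothetical and exist only for illustration; the function name `master_repair`, the attribute values and the device path are the ones used in the test.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that builds a master_repair query string; not part of IoTDB.
class MasterRepairQuerySketch {
  // Joins the series names and the quoted 'key'='value' attributes with commas.
  static String buildQuery(String device, String[] series, Map<String, String> attributes) {
    StringJoiner arguments = new StringJoiner(",");
    for (String name : series) {
      arguments.add(name);
    }
    for (Map.Entry<String, String> attribute : attributes.entrySet()) {
      arguments.add("'" + attribute.getKey() + "'='" + attribute.getValue() + "'");
    }
    return "select master_repair(" + arguments + ") from " + device;
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps the attributes in insertion order.
    Map<String, String> attributes = new LinkedHashMap<>();
    attributes.put("omega", "2");
    attributes.put("eta", "3.0");
    attributes.put("k", "5");
    attributes.put("output_column", "2");
    String[] series = {"s1", "s2", "s3", "m1", "m2", "m3"};
    System.out.println(buildQuery("root.testMasterRepair.d1", series, attributes));
  }
}
```

With these inputs the helper produces the same statement that the test runs to fetch the repaired second column.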

Output series:


```
+-----------------------------+-------------------------------------------------------------------------------------------+
| Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)|
+-----------------------------+-------------------------------------------------------------------------------------------+
|2021-07-01T12:00:01.000+08:00| 1704|
|2021-07-01T12:00:02.000+08:00| 1702|
|2021-07-01T12:00:03.000+08:00| 1702|
|2021-07-01T12:00:04.000+08:00| 1701|
|2021-07-01T12:00:07.000+08:00| 1703|
|2021-07-01T12:00:08.000+08:00| 1704|
|2021-07-01T12:01:09.000+08:00| 1705|
|2021-07-01T12:01:10.000+08:00| 1706|
+-----------------------------+-------------------------------------------------------------------------------------------+
```
@@ -343,4 +343,66 @@ select valuerepair(s1,'method'='LsGreedy') from root.test.d2
|2020-01-01T00:00:28.000+08:00| 126.0|
|2020-01-01T00:00:30.000+08:00| 128.0|
+-----------------------------+-------------------------------------------------+
```

## MasterRepair

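### Usage

This function repairs time series data based on master data.

**Name:** MasterRepair

**Input Series:** Supports multiple input series. The types are INT32 / INT64 / FLOAT / DOUBLE.

**Parameters:**

- `omega`: The window size of the algorithm, a non-negative integer (in milliseconds). By default, the algorithm estimates it automatically from the distances between pairs of tuples with different time differences.
- `eta`: The distance threshold of the algorithm, a positive number. By default, the algorithm estimates it automatically from the distance distribution of the tuples in each window.
- `k`: The number of neighbors in the master data, a positive integer. By default, the algorithm estimates it automatically from the tuple distances of the k nearest neighbors in the master data.
- `output_column`: The index of the column to output. By default, the repair result of the first column is output.

**Output Series:** Outputs a single series. Its type is the same as that of the corresponding column in the input data. The series is the input column after repair.

### Examples

Input series: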

```
+-----------------------------+------------+------------+------------+------------+------------+------------+
| Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3|
+-----------------------------+------------+------------+------------+------------+------------+------------+
|2021-07-01T12:00:01.000+08:00| 1704| 1154.55| 0.195| 1704| 1154.55| 0.195|
|2021-07-01T12:00:02.000+08:00| 1702| 1152.30| 0.193| 1702| 1152.30| 0.193|
|2021-07-01T12:00:03.000+08:00| 1702| 1148.65| 0.192| 1702| 1148.65| 0.192|
|2021-07-01T12:00:04.000+08:00| 1701| 1145.20| 0.194| 1701| 1145.20| 0.194|
|2021-07-01T12:00:07.000+08:00| 1703| 1150.55| 0.195| 1703| 1150.55| 0.195|
|2021-07-01T12:00:08.000+08:00| 1694| 1151.55| 0.193| 1704| 1151.55| 0.193|
|2021-07-01T12:01:09.000+08:00| 1705| 1153.55| 0.194| 1705| 1153.55| 0.194|
|2021-07-01T12:01:10.000+08:00| 1706| 1152.30| 0.190| 1706| 1152.30| 0.190|
+-----------------------------+------------+------------+------------+------------+------------+------------+
```

SQL for query:

```sql
select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test
```

Output series:


```
+-----------------------------+-------------------------------------------------------------------------------------------+
| Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)|
+-----------------------------+-------------------------------------------------------------------------------------------+
|2021-07-01T12:00:01.000+08:00| 1704|
|2021-07-01T12:00:02.000+08:00| 1702|
|2021-07-01T12:00:03.000+08:00| 1702|
|2021-07-01T12:00:04.000+08:00| 1701|
|2021-07-01T12:00:07.000+08:00| 1703|
|2021-07-01T12:00:08.000+08:00| 1704|
|2021-07-01T12:01:09.000+08:00| 1705|
|2021-07-01T12:01:10.000+08:00| 1706|
+-----------------------------+-------------------------------------------------------------------------------------------+
```
@@ -74,7 +74,8 @@ public enum BuiltinTimeSeriesGeneratingFunctionEnum {
EQUAL_SIZE_BUCKET_AGG_SAMPLE("EQUAL_SIZE_BUCKET_AGG_SAMPLE"),
EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE"),
EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE("EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE"),
- JEXL("JEXL");
+ JEXL("JEXL"),
+ MASTER_REPAIR("MASTER_REPAIR");

private final String functionName;

@@ -1232,6 +1232,176 @@ private void testStrLength(Statement statement) {
}
}

@Test
public void testMasterRepair() {
// create time series with master data
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute("SET STORAGE GROUP TO root.testMasterRepair");
statement.execute(
"CREATE TIMESERIES root.testMasterRepair.d1.s1 with datatype=FLOAT,encoding=PLAIN");
statement.execute(
"CREATE TIMESERIES root.testMasterRepair.d1.s2 with datatype=FLOAT,encoding=PLAIN");
statement.execute(
"CREATE TIMESERIES root.testMasterRepair.d1.s3 with datatype=FLOAT,encoding=PLAIN");
statement.execute(
"CREATE TIMESERIES root.testMasterRepair.d1.m1 with datatype=FLOAT,encoding=PLAIN");
statement.execute(
"CREATE TIMESERIES root.testMasterRepair.d1.m2 with datatype=FLOAT,encoding=PLAIN");
statement.execute(
"CREATE TIMESERIES root.testMasterRepair.d1.m3 with datatype=FLOAT,encoding=PLAIN");
} catch (SQLException throwable) {
fail(throwable.getMessage());
}

String[] INSERT_SQL = {
"insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (1,1704,1154.55,0.195,1704,1154.55,0.195)",
"insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (2,1702,1152.30,0.193,1702,1152.30,0.193)",
"insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (3,1702,1148.65,0.192,1702,1148.65,0.192)",
"insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (4,1701,1145.20,0.194,1701,1145.20,0.194)",
"insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (7,1703,1150.55,0.195,1703,1150.55,0.195)",
"insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (8,1694,1151.55,0.193,1704,1151.55,0.193)",
"insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (9,1705,1153.55,0.194,1705,1153.55,0.194)",
"insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (10,1706,1152.30,0.190,1706,1152.30,0.190)",
};

try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
for (String dataGenerationSql : INSERT_SQL) {
statement.execute(dataGenerationSql);
}
} catch (SQLException throwable) {
fail(throwable.getMessage());
}

try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
int[] timestamps = {1, 2, 3, 4, 7, 8, 9, 10};

// test 1
double[] r1 = {1704.0, 1702.0, 1702.0, 1701.0, 1703.0, 1702.0, 1705.0, 1706.0};
try (ResultSet resultSet =
statement.executeQuery(
"select master_repair(s1,s2,s3,m1,m2,m3) from root.testMasterRepair.d1")) {
int columnCount = resultSet.getMetaData().getColumnCount();
assertEquals(1 + 1, columnCount);
for (int i = 0; i < timestamps.length; i++) {
resultSet.next();
long expectedTimestamp = timestamps[i];
long actualTimestamp = Long.parseLong(resultSet.getString(1));
assertEquals(expectedTimestamp, actualTimestamp);
double expectedResult = r1[i];
double actualResult = resultSet.getDouble(2);
double delta = 0.001;
assertEquals(expectedResult, actualResult, delta);
}
}

// test 2
double[] r2 = {1154.55, 1152.30, 1148.65, 1145.20, 1150.55, 1152.30, 1153.55, 1152.30};

try (ResultSet resultSet =
statement.executeQuery(
"select master_repair(s1,s2,s3,m1,m2,m3,'output_column'='2') from root.testMasterRepair.d1")) {
int columnCount = resultSet.getMetaData().getColumnCount();
assertEquals(1 + 1, columnCount);
for (int i = 0; i < timestamps.length; i++) {
resultSet.next();
long expectedTimestamp = timestamps[i];
long actualTimestamp = Long.parseLong(resultSet.getString(1));
assertEquals(expectedTimestamp, actualTimestamp);

double expectedResult = r2[i];
double actualResult = resultSet.getDouble(2);
double delta = 0.001;
assertEquals(expectedResult, actualResult, delta);
}
}

// test 3
double[] r3 = {0.195, 0.193, 0.192, 0.194, 0.195, 0.193, 0.194, 0.190};
try (ResultSet resultSet =
statement.executeQuery(
"select master_repair(s1,s2,s3,m1,m2,m3,'output_column'='3') from root.testMasterRepair.d1")) {
int columnCount = resultSet.getMetaData().getColumnCount();
assertEquals(1 + 1, columnCount);
for (int i = 0; i < timestamps.length; i++) {
resultSet.next();
long expectedTimestamp = timestamps[i];
long actualTimestamp = Long.parseLong(resultSet.getString(1));
assertEquals(expectedTimestamp, actualTimestamp);

double expectedResult = r3[i];
double actualResult = resultSet.getDouble(2);
double delta = 0.001;
assertEquals(expectedResult, actualResult, delta);
}
}

// test 4
double[] r4 = {1704.0, 1702.0, 1702.0, 1701.0, 1703.0, 1704.0, 1705.0, 1706.0};
try (ResultSet resultSet =
statement.executeQuery(
"select master_repair(s1,s2,s3,m1,m2,m3,'omega'='2','eta'='3.0','k'='5') from root.testMasterRepair.d1")) {
int columnCount = resultSet.getMetaData().getColumnCount();
assertEquals(1 + 1, columnCount);
for (int i = 0; i < timestamps.length; i++) {
resultSet.next();
long expectedTimestamp = timestamps[i];
long actualTimestamp = Long.parseLong(resultSet.getString(1));
assertEquals(expectedTimestamp, actualTimestamp);

double expectedResult = r4[i];
double actualResult = resultSet.getDouble(2);
double delta = 0.001;
assertEquals(expectedResult, actualResult, delta);
}
}

// test 5
double[] r5 = {1154.55, 1152.30, 1148.65, 1145.20, 1150.55, 1151.55, 1153.55, 1152.30};
try (ResultSet resultSet =
statement.executeQuery(
"select master_repair(s1,s2,s3,m1,m2,m3,'omega'='2','eta'='3.0','k'='5','output_column'='2') from root.testMasterRepair.d1")) {
int columnCount = resultSet.getMetaData().getColumnCount();
assertEquals(1 + 1, columnCount);
for (int i = 0; i < timestamps.length; i++) {
resultSet.next();
long expectedTimestamp = timestamps[i];
long actualTimestamp = Long.parseLong(resultSet.getString(1));
assertEquals(expectedTimestamp, actualTimestamp);

double expectedResult = r5[i];
double actualResult = resultSet.getDouble(2);
double delta = 0.001;
assertEquals(expectedResult, actualResult, delta);
}
}

// test 6
double[] r6 = {0.195, 0.193, 0.192, 0.194, 0.195, 0.193, 0.194, 0.190};
try (ResultSet resultSet =
statement.executeQuery(
"select master_repair(s1,s2,s3,m1,m2,m3,'omega'='2','eta'='3.0','k'='5','output_column'='3') from root.testMasterRepair.d1")) {
int columnCount = resultSet.getMetaData().getColumnCount();
assertEquals(1 + 1, columnCount);
for (int i = 0; i < timestamps.length; i++) {
resultSet.next();
long expectedTimestamp = timestamps[i];
long actualTimestamp = Long.parseLong(resultSet.getString(1));
assertEquals(expectedTimestamp, actualTimestamp);

double expectedResult = r6[i];
double actualResult = resultSet.getDouble(2);
double delta = 0.001;
assertEquals(expectedResult, actualResult, delta);
}
}
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
}

@Test
public void testDeDup() {
String[] createSQLs =
@@ -88,7 +88,8 @@ public enum BuiltinTimeSeriesGeneratingFunction {
EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE", UDTFEqualSizeBucketM4Sample.class),
EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE(
"EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE", UDTFEqualSizeBucketOutlierSample.class),
- JEXL("JEXL", UDTFJexl.class);
+ JEXL("JEXL", UDTFJexl.class),
+ MASTER_REPAIR("MASTER_REPAIR", UDTFMasterRepair.class);

private final String functionName;
private final Class<?> functionClass;
