From 25bf200eca1cae494a5764a41c51799b259b9410 Mon Sep 17 00:00:00 2001 From: yschengzi Date: Fri, 3 Nov 2023 14:53:22 +0800 Subject: [PATCH 1/5] Add count point processor plugin in example --- example/count-point-processor/pom.xml | 43 +++++++++++++ .../org/apache/iotdb/CountPointProcessor.java | 64 +++++++++++++++++++ example/pom.xml | 1 + 3 files changed, 108 insertions(+) create mode 100644 example/count-point-processor/pom.xml create mode 100644 example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java diff --git a/example/count-point-processor/pom.xml b/example/count-point-processor/pom.xml new file mode 100644 index 0000000000000..78f7ab0a597d6 --- /dev/null +++ b/example/count-point-processor/pom.xml @@ -0,0 +1,43 @@ + + + 4.0.0 + + org.apache.iotdb + iotdb-examples + 1.3.0-SNAPSHOT + + count-point-processor-example + IoTDB: Example: CountPointProcessor + + 8 + 8 + UTF-8 + + + + org.apache.iotdb + pipe-api + ${project.version} + + + org.apache.iotdb + iotdb-server + ${project.version} + + + org.apache.iotdb + common-api + 1.3.0-SNAPSHOT + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.2.0 + + + + + diff --git a/example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java b/example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java new file mode 100644 index 0000000000000..4125a4e95a217 --- /dev/null +++ b/example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java @@ -0,0 +1,64 @@ +package org.apache.iotdb; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.collector.EventCollector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.tsfile.enums.TSDataType; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; + +public class CountPointProcessor implements PipeProcessor { + private static final String AGGREGATE_SERIES_KEY = "aggregate-series"; + private static AtomicLong writePointCount = new AtomicLong(0); + + private PartialPath aggregateSeries; + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY); + } + + @Override + public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) + throws Exception { + this.aggregateSeries = new PartialPath(parameters.getString(AGGREGATE_SERIES_KEY)); + } + + @Override + public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) + throws Exception { + tabletInsertionEvent.processTablet( + (tablet, rowCollector) -> { + writePointCount.addAndGet(tablet.rowSize); + }); + } + + @Override + public void process(Event event, EventCollector eventCollector) throws Exception { + if (event instanceof PipeHeartbeatEvent) { + Tablet tablet = + new Tablet( + aggregateSeries.getDevice(), + Collections.singletonList( + new MeasurementSchema(aggregateSeries.getMeasurement(), TSDataType.INT64)), + 1); + tablet.rowSize = 1; + tablet.addTimestamp(0, System.currentTimeMillis()); + tablet.addValue(aggregateSeries.getMeasurement(), 0, writePointCount.get()); + eventCollector.collect(new PipeRawTabletInsertionEvent(tablet, false, null, null, false)); + } + } + + @Override + public void close() throws Exception {} +} diff --git a/example/pom.xml b/example/pom.xml index 68e27f7c00908..a78a89f50a8d7 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -47,6 +47,7 @@ rest-java-example flink-sql schema + count-point-processor From 06886d504faf577167efa8a6e1b4e350603a4a60 Mon Sep 17 00:00:00 2001 From: yschengzi Date: Fri, 3 Nov 2023 15:07:32 +0800 Subject: [PATCH 2/5] add license --- example/count-point-processor/pom.xml | 22 ++++++++++++++++++- .../org/apache/iotdb/CountPointProcessor.java | 19 ++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/example/count-point-processor/pom.xml b/example/count-point-processor/pom.xml index 78f7ab0a597d6..3845e0697c014 100644 --- a/example/count-point-processor/pom.xml +++ b/example/count-point-processor/pom.xml @@ -1,4 +1,24 @@ + 4.0.0 @@ -27,7 +47,7 @@ org.apache.iotdb common-api - 1.3.0-SNAPSHOT + ${project.version} diff --git a/example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java b/example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java index 4125a4e95a217..331c5e3d997c4 100644 --- a/example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java +++ b/example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.iotdb; import org.apache.iotdb.commons.path.PartialPath; From 0b9a48ac230e4cc675fd64c34e5890d76a75f401 Mon Sep 17 00:00:00 2001 From: yschengzi Date: Fri, 3 Nov 2023 15:46:04 +0800 Subject: [PATCH 3/5] update version --- example/count-point-processor/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/count-point-processor/pom.xml b/example/count-point-processor/pom.xml index 3845e0697c014..5abb546d2e247 100644 --- a/example/count-point-processor/pom.xml +++ b/example/count-point-processor/pom.xml @@ -24,7 +24,7 @@ org.apache.iotdb iotdb-examples - 1.3.0-SNAPSHOT + 1.3.1-SNAPSHOT count-point-processor-example IoTDB: Example: CountPointProcessor From 9bb7408879d35c329b879c80ae4184e8ecc01d2b Mon Sep 17 00:00:00 2001 From: yschengzi Date: Fri, 3 Nov 2023 16:21:54 +0800 Subject: [PATCH 4/5] rename to pipe-count-point-processor & modify scope to provided --- .../pom.xml | 7 +++++-- .../main/java/org/apache/iotdb/CountPointProcessor.java | 0 example/pom.xml | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) rename example/{count-point-processor => pipe-count-point-processor}/pom.xml (91%) rename example/{count-point-processor => pipe-count-point-processor}/src/main/java/org/apache/iotdb/CountPointProcessor.java (100%) diff --git a/example/count-point-processor/pom.xml b/example/pipe-count-point-processor/pom.xml similarity index 91% rename from example/count-point-processor/pom.xml rename to example/pipe-count-point-processor/pom.xml index 5abb546d2e247..6b07cbdbdf45b 100644 --- a/example/count-point-processor/pom.xml +++ b/example/pipe-count-point-processor/pom.xml @@ -26,8 +26,8 @@ iotdb-examples 1.3.1-SNAPSHOT - count-point-processor-example - IoTDB: Example: CountPointProcessor + pipe-count-point-processor-example + IoTDB: Example: Pipe: CountPointProcessor 8 8 @@ -38,16 +38,19 @@ org.apache.iotdb pipe-api ${project.version} + provided org.apache.iotdb iotdb-server ${project.version} + provided org.apache.iotdb common-api ${project.version} + provided diff --git a/example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java similarity index 100% rename from example/count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java rename to example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java diff --git a/example/pom.xml b/example/pom.xml index baeb3af28859a..05cc683a62dca 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -47,7 +47,7 @@ rest-java-example flink-sql schema - count-point-processor + pipe-count-point-processor From 8b89bd9ebfb3ff04457fdbfabab41b4a43133958 Mon Sep 17 00:00:00 2001 From: yschengzi Date: Fri, 3 Nov 2023 17:08:08 +0800 Subject: [PATCH 5/5] rename to pipe-opc-ua-sink --- example/pipe-count-point-processor/pom.xml | 2 +- example/{opc-ua-sink => pipe-opc-ua-sink}/pom.xml | 9 +++------ .../main/java/org/apache/iotdb/opcua/ClientExample.java | 0 .../java/org/apache/iotdb/opcua/ClientExampleRunner.java | 0 .../src/main/java/org/apache/iotdb/opcua/ClientTest.java | 0 .../apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java | 0 example/pom.xml | 2 +- 7 files changed, 5 insertions(+), 8 deletions(-) rename example/{opc-ua-sink => pipe-opc-ua-sink}/pom.xml (81%) rename example/{opc-ua-sink => pipe-opc-ua-sink}/src/main/java/org/apache/iotdb/opcua/ClientExample.java (100%) rename example/{opc-ua-sink => pipe-opc-ua-sink}/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java (100%) rename example/{opc-ua-sink => pipe-opc-ua-sink}/src/main/java/org/apache/iotdb/opcua/ClientTest.java (100%) rename example/{opc-ua-sink => pipe-opc-ua-sink}/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java (100%) diff --git a/example/pipe-count-point-processor/pom.xml b/example/pipe-count-point-processor/pom.xml index 6b07cbdbdf45b..4689fcda2b8fb 100644 --- a/example/pipe-count-point-processor/pom.xml +++ b/example/pipe-count-point-processor/pom.xml @@ -27,7 +27,7 @@ 1.3.1-SNAPSHOT pipe-count-point-processor-example - IoTDB: Example: Pipe: CountPointProcessor + IoTDB: Example: Pipe: Count Point Processor 8 8 diff --git a/example/opc-ua-sink/pom.xml b/example/pipe-opc-ua-sink/pom.xml similarity index 81% rename from example/opc-ua-sink/pom.xml rename to example/pipe-opc-ua-sink/pom.xml index 0a68d2f6e49cb..a8550d26385f9 100644 --- a/example/opc-ua-sink/pom.xml +++ b/example/pipe-opc-ua-sink/pom.xml @@ -19,18 +19,15 @@ under the License. --> - - + org.apache.iotdb iotdb-examples 1.3.1-SNAPSHOT 4.0.0 - opc-ua-sink-example - IoTDB: Example: OPCUA Sink + pipe-opc-ua-sink-example + IoTDB: Example: Pipe: OPCUA Sink org.eclipse.milo diff --git a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java b/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java similarity index 100% rename from example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java rename to example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java diff --git a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java b/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java similarity index 100% rename from example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java rename to example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java diff --git a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java b/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java similarity index 100% rename from example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java rename to example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java diff --git a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java b/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java similarity index 100% rename from example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java rename to example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java diff --git a/example/pom.xml b/example/pom.xml index 05cc683a62dca..e3ed8c76f03d7 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -39,7 +39,7 @@ flink mqtt mqtt-customize - opc-ua-sink + pipe-opc-ua-sink pulsar udf trigger