Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7441ad9
start
yschengzi Mar 29, 2023
e4ebcd4
matcher
yschengzi Apr 6, 2023
920f56a
spotless
yschengzi Apr 6, 2023
c635196
start tsfile epoch
yschengzi Apr 14, 2023
31a2bcc
finish real time collect
yschengzi Apr 17, 2023
75167f9
move to evnet
yschengzi Apr 17, 2023
4b05d1f
fix concurrent and multi tsfile epoch
yschengzi Apr 17, 2023
4261f37
add test
yschengzi Apr 18, 2023
8c2d5fb
rename DataRegionChangeDataCache to DataRegionChangeDataAssigner
yschengzi Apr 18, 2023
2ca352b
add rat
yschengzi Apr 19, 2023
8bb0611
add timeout and result count in PipeRealtimeCollectTest
yschengzi Apr 24, 2023
95c65d4
Merge remote-tracking branch 'apache/master' into IOTDB-5739
yschengzi Apr 25, 2023
ebea67f
add logback and turn alive into AtomicBoolean
yschengzi Apr 25, 2023
3e7333e
rename api package name
SteveYurongSu Apr 25, 2023
94bbc48
pom.xml: disrupter.version
SteveYurongSu Apr 25, 2023
8109de3
Merge branch 'master' of github.com:apache/iotdb into pr/9479
SteveYurongSu Apr 25, 2023
5dcbb3d
fix test: add wait time to 10 min
yschengzi Apr 25, 2023
ec80da2
merge branch 'IOTDB-5739' of github.com:yschengzi/iotdb into IOTDB-5739
yschengzi Apr 25, 2023
724f0f2
rename test package
SteveYurongSu Apr 25, 2023
0df653c
make config a seperate package
SteveYurongSu Apr 25, 2023
093f829
add some TODOs
SteveYurongSu Apr 25, 2023
78c03cc
refactor realtime collect process
SteveYurongSu Apr 26, 2023
d783f2d
fix: invoke PipeEngine in TsFileProcessor
SteveYurongSu Apr 26, 2023
a650638
optimize tests
SteveYurongSu Apr 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;

public class DoNothingConnector implements PipeConnector {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.api;

import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.Event;

/**
* PipeCollector
*
* <p>PipeCollector is responsible for capturing events from sources.
*
* <p>Various data sources can be supported by implementing different PipeCollector classes.
*
* <p>The lifecycle of a PipeCollector is as follows:
*
* <ul>
* <li>When a collaboration task is created, the KV pairs of `WITH COLLECTOR` clause in SQL are
* parsed and the validation method {@link PipeCollector#validate(PipeParameterValidator)}
* will be called to validate the parameters.
* <li>Before the collaboration task starts, the method {@link
* PipeCollector#customize(PipeParameters, PipeCollectorRuntimeConfiguration)} will be called
* to config the runtime behavior of the PipeCollector.
* <li>Then the method {@link PipeCollector#start()} will be called to start the PipeCollector.
* <li>While the collaboration task is in progress, the method {@link PipeCollector#supply()} will
* be called to capture events from sources and then the events will be passed to the
* PipeProcessor.
* <li>The method {@link PipeCollector#close()} will be called when the collaboration task is
* cancelled (the `DROP PIPE` command is executed).
* </ul>
*/
public interface PipeCollector extends PipePlugin {

/**
* This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
* PipeCollector#customize(PipeParameters, PipeCollectorRuntimeConfiguration)} is called.
*
* @param validator the validator used to validate {@link PipeParameters}
* @throws Exception if any parameter is not valid
*/
void validate(PipeParameterValidator validator) throws Exception;

/**
* This method is mainly used to customize PipeCollector. In this method, the user can do the
* following things:
*
* <ul>
* <li>Use PipeParameters to parse key-value pair attributes entered by the user.
* <li>Set the running configurations in PipeCollectorRuntimeConfiguration.
* </ul>
*
* <p>This method is called after the method {@link
* PipeCollector#validate(PipeParameterValidator)} is called.
*
* @param parameters used to parse the input parameters entered by the user
* @param configuration used to set the required properties of the running PipeCollector
* @throws Exception the user can throw errors if necessary
*/
void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
throws Exception;

/**
* Start the collector. After this method is called, events should be ready to be supplied by
* {@link PipeCollector#supply()}. This method is called after {@link
* PipeCollector#customize(PipeParameters, PipeCollectorRuntimeConfiguration)} is called.
*
* @throws Exception the user can throw errors if necessary
*/
void start() throws Exception;

/**
* Supply single event from the collector and the caller will send the event to the processor.
* This method is called after {@link PipeCollector#start()} is called.
*
* @return the event to be supplied. the event may be null if the collector has no more events at
* the moment, but the collector is still running for more events.
* @throws Exception the user can throw errors if necessary
*/
Event supply() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;

/**
* PipeConnector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;

/**
* PipeProcessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.apache.iotdb.pipe.api.collector;

import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.iotdb.pipe.api.collector;

import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;

import java.io.IOException;
import java.util.function.BiConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
* under the License.
*/

package org.apache.iotdb.db.pipe.core.collector.realtime;
package org.apache.iotdb.pipe.api.customizer.collector;

public class PipeRealtimeCollector {}
import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
import org.apache.iotdb.pipe.api.exception.PipeException;

// TODO: complete this class
public class PipeCollectorRuntimeConfiguration implements PipeRuntimeConfiguration {

@Override
public void check() throws PipeException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@
package org.apache.iotdb.pipe.api.event;

/** This interface is used to abstract events in collaboration tasks. */
public interface Event {}
public interface Event {

/** @return the type of the event */
EventType getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
* under the License.
*/

package org.apache.iotdb.db.pipe.core.collector;
package org.apache.iotdb.pipe.api.event;

public class PipeCollectorEventPendingQueue {}
public enum EventType {
TABLET_INSERTION,
TSFILE_INSERTION,
DELETION,
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
* under the License.
*/

package org.apache.iotdb.pipe.api.event.deletion;
package org.apache.iotdb.pipe.api.event.dml.deletion;

import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.EventType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.TimeRange;

Expand All @@ -39,4 +40,9 @@ public interface DeletionEvent extends Event {
* @return TimeRange
*/
TimeRange getTimeRange();

@Override
default EventType getType() {
return EventType.DELETION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
* under the License.
*/

package org.apache.iotdb.pipe.api.event.insertion;
package org.apache.iotdb.pipe.api.event.dml.insertion;

import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.EventType;
import org.apache.iotdb.tsfile.write.record.Tablet;

import java.util.Iterator;
Expand Down Expand Up @@ -53,4 +54,9 @@ public interface TabletInsertionEvent extends Event {
* RowCollector
*/
TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer);

@Override
default EventType getType() {
return EventType.TABLET_INSERTION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
* under the License.
*/

package org.apache.iotdb.pipe.api.event.insertion;
package org.apache.iotdb.pipe.api.event.dml.insertion;

import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.EventType;

/**
* TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
Expand All @@ -41,4 +42,9 @@ public interface TsFileInsertionEvent extends Event {
* @return TsFileInsertionEvent
*/
TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable);

@Override
default EventType getType() {
return EventType.TSFILE_INSERTION;
}
}
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
<thrift.version>0.14.1</thrift.version>
<airline.version>0.8</airline.version>
<jackson.version>2.13.5</jackson.version>
<disrupter.version>3.4.2</disrupter.version>
<jackson.databind.version>2.13.4.2</jackson.databind.version>
<antlr4.version>4.8-1</antlr4.version>
<common.cli.version>1.3.1</common.cli.version>
Expand Down Expand Up @@ -283,6 +284,11 @@
<artifactId>guava</artifactId>
<version>[${guava.version},)</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disrupter.version}</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
Expand Down
6 changes: 4 additions & 2 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand All @@ -222,7 +221,10 @@
<groupId>org.apache.iotdb</groupId>
<artifactId>node-commons</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
Expand Down Expand Up @@ -278,6 +279,11 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException {
workMemTable.insert(insertRowNode);
}

// collect plan node in pipe
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);

// update start time of this memtable
tsFileResource.updateStartTime(
insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
Expand Down Expand Up @@ -379,13 +385,17 @@ public void insertTablet(
}
throw new WriteProcessException(e);
}

for (int i = start; i < end; i++) {
results[i] = RpcUtils.SUCCESS_STATUS;
}

// collect plan node in pipe
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);

tsFileResource.updateStartTime(
insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[start]);

// for sequence tsfile, we update the endTime only when the file is prepared to be closed.
// for unsequence tsfile, we have to update the endTime for each insertion.
if (!sequence) {
Expand Down Expand Up @@ -843,6 +853,9 @@ void asyncClose() {
.getOrCreateSyncManager(dataRegionInfo.getDataRegion().getDataRegionId())) {
syncManager.syncRealTimeTsFile(tsFileResource.getTsFile());
}
PipeInsertionDataNodeListener.getInstance()
.listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource);

// When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
// flushing memTable in System module.
addAMemtableIntoFlushingList(tmpMemTable);
Expand Down
Loading