Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ public void do_test()
assertEquals(results.get(0), 10);
assertEquals(results.get(1), 0);
assertEquals(results.get(2), 0);
assertEquals(results.get(3), 1, "number of received files");
insert_data(System.currentTimeMillis());
// Unsubscribe
consumer.unsubscribe(topicName);
Expand All @@ -177,6 +176,5 @@ public void do_test()
"Unsubscribe and resubscribe, progress is not retained. Full synchronization.");
assertEquals(results.get(1), 0, "Subscribe again after unsubscribe," + database + ".d_1");
assertEquals(results.get(2), 0, "Unsubscribe and then subscribe again," + database2 + ".d_2");
assertEquals(results.get(3), 1, "Number of received files: resubscribe after unsubscription");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void do_test()

AWAIT.untilAsserted(
() -> {
assertEquals(onReceiveCount.get(), 1, "receive files 1");
assertGte(onReceiveCount.get(), 1, "receive files over 1");
assertEquals(rowCounts.get(0).get(), 10, device + ".s_0");
assertEquals(rowCounts.get(1).get(), 0, device + ".s_1");
assertEquals(rowCounts.get(2).get(), 0, database + ".d_1.s_0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;

import org.apache.iotdb.db.pipe.connector.util.builder.PipeTableModeTsFileBuilder;
import org.apache.iotdb.db.pipe.connector.util.builder.PipeTreeModelTsFileBuilder;
import org.apache.iotdb.db.pipe.connector.util.builder.PipeTreeModelTsFileBuilderV2;
import org.apache.iotdb.db.pipe.connector.util.builder.PipeTsFileBuilder;
import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTableModelTabletEventSorter;
import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTreeModelTabletEventSorter;
Expand Down Expand Up @@ -61,7 +61,7 @@ public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long requestMaxB
super(maxDelayInMs, requestMaxBatchSizeInBytes);

final AtomicLong tsFileIdGenerator = new AtomicLong(0);
treeModeTsFileBuilder = new PipeTreeModelTsFileBuilder(currentBatchId, tsFileIdGenerator);
treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator);
tableModeTsFileBuilder = new PipeTableModeTsFileBuilder(currentBatchId, tsFileIdGenerator);
}

Expand Down Expand Up @@ -132,7 +132,10 @@ private void bufferTreeModelTablet(
final boolean isAligned) {
new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();

totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
// TODO: Currently, PipeTreeModelTsFileBuilderV2 still uses PipeTreeModelTsFileBuilder as a
// fallback builder, so memory table writing and storing temporary tablets require double the
// memory.
totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2;

pipeName2WeightMap.compute(
new Pair<>(pipeName, creationTime),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.WriteUtils;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
Expand Down Expand Up @@ -142,7 +143,7 @@ List<Pair<String, File>> writeTableModelTabletsToTsFiles(
// Try making the tsfile size as large as possible
while (!linkedHashSet.isEmpty()) {
if (Objects.isNull(fileWriter)) {
createFileWriter();
fileWriter = new TsFileWriter(createFile());
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
Expand Down Expand Up @@ -143,7 +144,7 @@ private List<Pair<String, File>> writeTabletsToTsFiles()
// Try making the tsfile size as large as possible
while (!device2TabletsLinkedList.isEmpty()) {
if (Objects.isNull(fileWriter)) {
createFileWriter();
fileWriter = new TsFileWriter(createFile());
}
try {
tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList, device2Aligned);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.connector.util.builder;

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

public class PipeTreeModelTsFileBuilderV2 extends PipeTsFileBuilder {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeTreeModelTsFileBuilderV2.class);

private static final PlanNodeId PLACEHOLDER_PLAN_NODE_ID =
new PlanNodeId("PipeTreeModelTsFileBuilderV2");

private final List<Tablet> tabletList = new ArrayList<>();
private final List<Boolean> isTabletAlignedList = new ArrayList<>();

// TODO: remove me later if stable
private final PipeTreeModelTsFileBuilder fallbackBuilder;

public PipeTreeModelTsFileBuilderV2(
final AtomicLong currentBatchId, final AtomicLong tsFileIdGenerator) {
super(currentBatchId, tsFileIdGenerator);
fallbackBuilder = new PipeTreeModelTsFileBuilder(currentBatchId, tsFileIdGenerator);
}

@Override
public void bufferTableModelTablet(final String dataBase, final Tablet tablet) {
throw new UnsupportedOperationException(
"PipeTreeModelTsFileBuilderV2 does not support table model tablet to build TSFile");
}

@Override
public void bufferTreeModelTablet(final Tablet tablet, final Boolean isAligned) {
tabletList.add(tablet);
isTabletAlignedList.add(isAligned);
fallbackBuilder.bufferTreeModelTablet(tablet, isAligned);
}

@Override
public List<Pair<String, File>> convertTabletToTsFileWithDBInfo()
throws IOException, WriteProcessException {
try {
return writeTabletsToTsFiles();
} catch (final Exception e) {
LOGGER.warn(
"Exception occurred when PipeTreeModelTsFileBuilderV2 writing tablets to tsfile, use fallback tsfile builder: {}",
e.getMessage(),
e);
return fallbackBuilder.convertTabletToTsFileWithDBInfo();
}
}

@Override
public boolean isEmpty() {
return tabletList.isEmpty();
}

@Override
public void onSuccess() {
super.onSuccess();
tabletList.clear();
isTabletAlignedList.clear();
fallbackBuilder.onSuccess();
}

@Override
public synchronized void close() {
super.close();
tabletList.clear();
isTabletAlignedList.clear();
fallbackBuilder.close();
}

private List<Pair<String, File>> writeTabletsToTsFiles() throws WriteProcessException {
final IMemTable memTable = new PrimitiveMemTable(null, null);
final List<Pair<String, File>> sealedFiles = new ArrayList<>();
try (final RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(createFile())) {
writeTabletsIntoOneFile(memTable, writer);
sealedFiles.add(new Pair<>(null, writer.getFile()));
} catch (final Exception e) {
LOGGER.warn(
"Batch id = {}: Failed to write tablets into tsfile, because {}",
currentBatchId.get(),
e.getMessage(),
e);
// TODO: handle ex
throw new WriteProcessException(e);
} finally {
memTable.release();
}

return sealedFiles;
}

private void writeTabletsIntoOneFile(
final IMemTable memTable, final RestorableTsFileIOWriter writer) throws Exception {
for (int i = 0, size = tabletList.size(); i < size; ++i) {
final Tablet tablet = tabletList.get(i);

// convert date value to int
// refer to
// org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
final Object[] values = tablet.getValues();
for (int j = 0; j < tablet.getSchemas().size(); ++j) {
final IMeasurementSchema schema = tablet.getSchemas().get(j);
if (Objects.nonNull(schema) && Objects.equals(TSDataType.DATE, schema.getType())) {
final LocalDate[] dates = ((LocalDate[]) values[j]);
final int[] dateValues = new int[dates.length];
for (int k = 0; k < Math.min(dates.length, tablet.getRowSize()); k++) {
dateValues[k] = DateUtils.parseDateExpressionToInt(dates[k]);
}
values[j] = dateValues;
}
}

final InsertTabletNode insertTabletNode =
new InsertTabletNode(
PLACEHOLDER_PLAN_NODE_ID,
new PartialPath(tablet.getDeviceId()),
isTabletAlignedList.get(i),
tablet.getSchemas().stream()
.map(IMeasurementSchema::getMeasurementName)
.toArray(String[]::new),
tablet.getSchemas().stream()
.map(IMeasurementSchema::getType)
.toArray(TSDataType[]::new),
// TODO: cast
tablet.getSchemas().stream()
.map(schema -> (MeasurementSchema) schema)
.toArray(MeasurementSchema[]::new),
tablet.getTimestamps(),
tablet.getBitMaps(),
tablet.getValues(),
tablet.getRowSize());

final int start = 0;
final int end = insertTabletNode.getRowCount();

try {
if (insertTabletNode.isAligned()) {
memTable.insertAlignedTablet(insertTabletNode, start, end, null);
} else {
memTable.insertTablet(insertTabletNode, start, end);
}
} catch (final org.apache.iotdb.db.exception.WriteProcessException e) {
throw new WriteProcessException(e);
}
}

final MemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, null, null);
memTableFlushTask.syncFlushMemTable();

writer.endFile();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,16 @@ public synchronized void close() {
}
}

protected void createFileWriter() throws IOException {
fileWriter =
new TsFileWriter(
new File(
batchFileBaseDir,
TS_FILE_PREFIX
+ "_"
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+ "_"
+ currentBatchId.get()
+ "_"
+ tsFileIdGenerator.getAndIncrement()
+ TsFileConstant.TSFILE_SUFFIX));
protected File createFile() throws IOException {
return new File(
batchFileBaseDir,
TS_FILE_PREFIX
+ "_"
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+ "_"
+ currentBatchId.get()
+ "_"
+ tsFileIdGenerator.getAndIncrement()
+ TsFileConstant.TSFILE_SUFFIX);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class WritingMetrics implements IMetricSet {
private static final WritingMetrics INSTANCE = new WritingMetrics();
Expand Down Expand Up @@ -813,6 +814,9 @@ public void recordFlushTsFileSize(String storageGroup, long size) {
}

private DataRegionId getDataRegionIdFromStorageGroupStr(String storageGroup) {
if (Objects.isNull(storageGroup)) {
return null;
}
int idx = storageGroup.lastIndexOf('-');
if (idx == -1) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2948,6 +2948,9 @@ public static void operateClearCache() {
}

public static Optional<String> getNonSystemDatabaseName(String databaseName) {
if (Objects.isNull(databaseName)) {
return Optional.empty();
}
if (databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
return Optional.empty();
}
Expand Down
Loading