Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics;
import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementDataTypeConvertExecutionVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTableModelDataTypeConvertExecutionVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTreeModelDataTypeConvertExecutionVisitor;
import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
Expand Down Expand Up @@ -144,9 +145,6 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
new PipeStatementExceptionVisitor();
private static final PipeStatementPatternParseVisitor STATEMENT_PATTERN_PARSE_VISITOR =
new PipeStatementPatternParseVisitor();
private final PipeStatementDataTypeConvertExecutionVisitor
statementDataTypeConvertExecutionVisitor =
new PipeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel);
private final PipeStatementToBatchVisitor batchVisitor = new PipeStatementToBatchVisitor();

// Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster
Expand Down Expand Up @@ -770,18 +768,27 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
? executeStatementForTableModel(statement, dataBaseName)
: executeStatementForTreeModel(statement);

// The following code is used to handle the data type mismatch exception
// Data type conversion is not supported for table model statements
if (isTableModelStatement) {
return status;
}
// Try to convert data type if the statement is a tree model statement
// and the status code is not success
return shouldConvertDataTypeOnTypeMismatch
&& ((statement instanceof InsertBaseStatement
&& ((InsertBaseStatement) statement).hasFailedMeasurements())
|| status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
? statement.accept(statementDataTypeConvertExecutionVisitor, status).orElse(status)
? isTableModelStatement
? statement
.accept(
new PipeStatementTableModelDataTypeConvertExecutionVisitor(
tableConvertStatement ->
executeStatementForTableModel(tableConvertStatement, dataBaseName),
dataBaseName),
status)
.orElse(status)
: statement
.accept(
new PipeStatementTreeModelDataTypeConvertExecutionVisitor(
this::executeStatementForTreeModel),
status)
.orElse(status)
: status;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public PipeConvertedInsertRowStatement(final InsertRowStatement insertRowStateme
}

@Override
protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
public boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
LOGGER.info(
"Pipe: Inserting row to {}.{}. Casting type from {} to {}.",
devicePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
Expand All @@ -26,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipeConvertedInsertTabletStatement extends InsertTabletStatement {
public abstract class PipeConvertedInsertTabletStatement extends InsertTabletStatement {

private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConvertedInsertTabletStatement.class);
Expand All @@ -50,7 +50,7 @@ public PipeConvertedInsertTabletStatement(final InsertTabletStatement insertTabl
}

@Override
protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
public boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
LOGGER.info(
"Pipe: Inserting tablet to {}.{}. Casting type from {} to {}.",
devicePath,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.receiver.transform.statement;

import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;

public class PipeTableModelConvertedInsertRowStatement extends PipeConvertedInsertRowStatement {

public PipeTableModelConvertedInsertRowStatement(
final InsertRowStatement insertRowStatement, final String databaseName) {
super(insertRowStatement);
// InsertBaseStatement
columnCategories = insertRowStatement.getColumnCategories();
idColumnIndices = insertRowStatement.getIdColumnIndices();
attrColumnIndices = insertRowStatement.getAttrColumnIndices();
writeToTable = insertRowStatement.isWriteToTable();
logicalViewSchemaList = insertRowStatement.getLogicalViewSchemaList();

setDatabaseName(databaseName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.receiver.transform.statement;

import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;

public class PipeTableModelConvertedInsertTabletStatement
extends PipeConvertedInsertTabletStatement {

public PipeTableModelConvertedInsertTabletStatement(
final InsertTabletStatement insertTabletStatement, final String databaseName) {
super(insertTabletStatement);
// InsertBaseStatement
columnCategories = insertTabletStatement.getColumnCategories();
idColumnIndices = insertTabletStatement.getIdColumnIndices();
attrColumnIndices = insertTabletStatement.getAttrColumnIndices();
writeToTable = insertTabletStatement.isWriteToTable();
logicalViewSchemaList = insertTabletStatement.getLogicalViewSchemaList();

setDatabaseName(databaseName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.receiver.transform.statement;

import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;

public class PipeTreeModelConvertedInsertRowStatement extends PipeConvertedInsertRowStatement {

public PipeTreeModelConvertedInsertRowStatement(final InsertRowStatement insertRowStatement) {
super(insertRowStatement);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.receiver.transform.statement;

import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;

public class PipeTreeModelConvertedInsertTabletStatement
extends PipeConvertedInsertTabletStatement {

public PipeTreeModelConvertedInsertTabletStatement(
final InsertTabletStatement insertTabletStatement) {
super(insertTabletStatement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,19 @@
package org.apache.iotdb.db.pipe.receiver.visitor;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.io.FileUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* This visitor transforms the data type of the statement when the statement is executed and an
* exception occurs. The transformed statement (if any) is returned and will be executed again.
*/
public class PipeStatementDataTypeConvertExecutionVisitor
public abstract class PipeStatementDataTypeConvertExecutionVisitor
extends StatementVisitor<Optional<TSStatus>, TSStatus> {

private static final Logger LOGGER =
Expand All @@ -61,13 +43,13 @@ public interface StatementExecutor {
TSStatus execute(final Statement statement);
}

private final StatementExecutor statementExecutor;
protected final StatementExecutor statementExecutor;

public PipeStatementDataTypeConvertExecutionVisitor(final StatementExecutor statementExecutor) {
this.statementExecutor = statementExecutor;
}

private Optional<TSStatus> tryExecute(final Statement statement) {
protected Optional<TSStatus> tryExecute(final Statement statement) {
try {
return Optional.of(statementExecutor.execute(statement));
} catch (final Exception e) {
Expand Down Expand Up @@ -95,33 +77,8 @@ public Optional<TSStatus> visitLoadFile(
status,
loadTsFileStatement);

for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionEventScanParser container =
new TsFileInsertionEventScanParser(
file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned : container.toTabletWithIsAligneds()) {
final PipeConvertedInsertTabletStatement statement =
new PipeConvertedInsertTabletStatement(
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
.constructStatement());
TSStatus result = statementExecutor.execute(statement);

// Retry once if the write process is rejected
if (result.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
result = statementExecutor.execute(statement);
}

if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
return Optional.empty();
}
}
} catch (final Exception e) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
return Optional.empty();
}
if (!executeConvert(loadTsFileStatement).isPresent()) {
return Optional.empty();
}

if (loadTsFileStatement.isDeleteAfterLoad()) {
Expand All @@ -134,65 +91,6 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

@Override
public Optional<TSStatus> visitInsertRow(
final InsertRowStatement insertRowStatement, final TSStatus status) {
return tryExecute(new PipeConvertedInsertRowStatement(insertRowStatement));
}

@Override
public Optional<TSStatus> visitInsertRows(
final InsertRowsStatement insertRowsStatement, final TSStatus status) {
if (insertRowsStatement.getInsertRowStatementList() == null
|| insertRowsStatement.getInsertRowStatementList().isEmpty()) {
return Optional.empty();
}

final InsertRowsStatement convertedInsertRowsStatement = new InsertRowsStatement();
convertedInsertRowsStatement.setInsertRowStatementList(
insertRowsStatement.getInsertRowStatementList().stream()
.map(PipeConvertedInsertRowStatement::new)
.collect(Collectors.toList()));
return tryExecute(convertedInsertRowsStatement);
}

@Override
public Optional<TSStatus> visitInsertRowsOfOneDevice(
final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, final TSStatus status) {
if (insertRowsOfOneDeviceStatement.getInsertRowStatementList() == null
|| insertRowsOfOneDeviceStatement.getInsertRowStatementList().isEmpty()) {
return Optional.empty();
}

final InsertRowsOfOneDeviceStatement convertedInsertRowsOfOneDeviceStatement =
new InsertRowsOfOneDeviceStatement();
convertedInsertRowsOfOneDeviceStatement.setInsertRowStatementList(
insertRowsOfOneDeviceStatement.getInsertRowStatementList().stream()
.map(PipeConvertedInsertRowStatement::new)
.collect(Collectors.toList()));
return tryExecute(convertedInsertRowsOfOneDeviceStatement);
}

@Override
public Optional<TSStatus> visitInsertTablet(
final InsertTabletStatement insertTabletStatement, final TSStatus status) {
return tryExecute(new PipeConvertedInsertTabletStatement(insertTabletStatement));
}

@Override
public Optional<TSStatus> visitInsertMultiTablets(
final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus status) {
if (insertMultiTabletsStatement.getInsertTabletStatementList() == null
|| insertMultiTabletsStatement.getInsertTabletStatementList().isEmpty()) {
return Optional.empty();
}

final InsertMultiTabletsStatement convertedInsertMultiTabletsStatement =
new InsertMultiTabletsStatement();
convertedInsertMultiTabletsStatement.setInsertTabletStatementList(
insertMultiTabletsStatement.getInsertTabletStatementList().stream()
.map(PipeConvertedInsertTabletStatement::new)
.collect(Collectors.toList()));
return tryExecute(convertedInsertMultiTabletsStatement);
}
protected abstract Optional<TSStatus> executeConvert(
final LoadTsFileStatement loadTsFileStatement);
}
Loading