Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-884] group createMultiTimeseriesPlan by partitionGroup #1854

Merged
merged 3 commits into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CountPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
Expand Down Expand Up @@ -121,6 +122,8 @@ public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan)
return splitAndRoutePlan((InsertRowPlan) plan);
} else if (plan instanceof AlterTimeSeriesPlan) {
return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
} else if (plan instanceof CreateMultiTimeSeriesPlan) {
return splitAndRoutePlan((CreateMultiTimeSeriesPlan) plan);
}
//the if clause can be removed after the program is stable
if (PartitionUtils.isLocalNonQueryPlan(plan)) {
Expand Down Expand Up @@ -292,4 +295,64 @@ private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(CountPlan plan)
}
return result;
}

@SuppressWarnings("SuspiciousSystemArraycopy")
private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(CreateMultiTimeSeriesPlan plan)
throws MetadataException {
Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
Map<PartitionGroup, PhysicalPlan> groupHoldPlan = new HashMap<>();

for (int i = 0; i < plan.getPaths().size(); i++) {
PartialPath path = plan.getPaths().get(i);
PartitionGroup partitionGroup =
partitionTable.partitionByPathTime(path, 0);
CreateMultiTimeSeriesPlan subPlan = null;
if (groupHoldPlan.get(partitionGroup) == null) {
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved
subPlan = new CreateMultiTimeSeriesPlan();
subPlan.setPaths(new ArrayList<>());
subPlan.setDataTypes(new ArrayList<>());
subPlan.setEncodings(new ArrayList<>());
subPlan.setCompressors(new ArrayList<>());
if (plan.getAlias() != null) {
subPlan.setAlias(new ArrayList<>());
}
if (plan.getProps() != null) {
subPlan.setProps(new ArrayList<>());
}
if (plan.getTags() != null) {
subPlan.setTags(new ArrayList<>());
}
if (plan.getAttributes() != null) {
subPlan.setAttributes(new ArrayList<>());
}
subPlan.setIndexes(new ArrayList<>());
groupHoldPlan.put(partitionGroup, subPlan);
} else {
subPlan = (CreateMultiTimeSeriesPlan) groupHoldPlan.get(partitionGroup);
}

subPlan.getPaths().add(path);
subPlan.getDataTypes().add(plan.getDataTypes().get(i));
subPlan.getEncodings().add(plan.getEncodings().get(i));
subPlan.getCompressors().add(plan.getCompressors().get(i));
if (plan.getAlias() != null) {
subPlan.getAlias().add(plan.getAlias().get(i));
}
if (plan.getProps() != null) {
subPlan.getProps().add(plan.getProps().get(i));
}
if (plan.getTags() != null) {
subPlan.getTags().add(plan.getTags().get(i));
}
if (plan.getAttributes() != null) {
subPlan.getAttributes().add(plan.getAttributes().get(i));
}
subPlan.getIndexes().add(plan.getIndexes().get(i));
}

for (Map.Entry<PartitionGroup, PhysicalPlan> entry : groupHoldPlan.entrySet()) {
result.put(entry.getValue(), entry.getKey());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.service.IoTDB;
Expand Down Expand Up @@ -1466,18 +1467,18 @@ private Map<PhysicalPlan, PartitionGroup> splitPlan(PhysicalPlan plan)
*/
private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
// the error codes from the groups that cannot execute the plan
TSStatus status;
TSStatus status = null;
if (planGroupMap.size() == 1) {
status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
} else {
if (plan instanceof InsertTabletPlan) {
// InsertTabletPlans contain many rows, each will correspond to a TSStatus as its
if (plan instanceof InsertTabletPlan || plan instanceof CreateMultiTimeSeriesPlan) {
// InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, each will correspond to a TSStatus as its
// execution result, as the plan is split and the sub-plans may have interleaving ranges,
// we must assure that each TSStatus is placed to the right position
// e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2
// belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the
// failure and success should be placed into proper positions in TSStatus.subStatus
status = forwardInsertTabletPlan(planGroupMap, (InsertTabletPlan) plan);
status = forwardMultiSubPlan(planGroupMap, plan);
} else {
status = forwardToMultipleGroup(planGroupMap);
}
Expand Down Expand Up @@ -1510,46 +1511,66 @@ private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, Phy
*
* @param planGroupMap sub-plan -> data group pairs
*/
private TSStatus forwardInsertTabletPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap,
InsertTabletPlan plan) {
private TSStatus forwardMultiSubPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap,
PhysicalPlan parentPlan) {
List<String> errorCodePartitionGroups = new ArrayList<>();
TSStatus tmpStatus;
TSStatus[] subStatus = null;
boolean noFailure = true;
boolean isBatchFailure = false;
EndPoint endPoint = null;
InsertTabletPlan subPlan;
int totalRowNum = 0;
// for we put the result to right position for CreateMultiTimeSeriesPlan
Map<Integer, Integer> indexToPos = new HashMap<>();
if (parentPlan instanceof InsertTabletPlan) {
totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
} else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size();

// index -> pos in the indexs array
for (int i = 0; i < totalRowNum; i++) {
indexToPos.put(((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().get(i), i);
}
}
// send sub-plans to each belonging data group and collect results
for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
tmpStatus = forwardToSingleGroup(entry);
subPlan = (InsertTabletPlan) entry.getKey();
logger.debug("{}: from {},{},{}", name, entry.getKey(), entry.getValue(), tmpStatus);
noFailure =
(tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
(tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
isBatchFailure = (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
|| isBatchFailure;
if (tmpStatus.isSetRedirectNode() && subPlan.getMaxTime() == plan.getMaxTime()) {
endPoint = tmpStatus.getRedirectNode();
|| isBatchFailure;
if (parentPlan instanceof InsertTabletPlan) {
if (tmpStatus.isSetRedirectNode() &&
((InsertTabletPlan) entry.getKey()).getMaxTime() == ((InsertTabletPlan) parentPlan).getMaxTime()) {
endPoint = tmpStatus.getRedirectNode();
}
}
if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
if (subStatus == null) {
subStatus = new TSStatus[plan.getRowCount()];
subStatus = new TSStatus[totalRowNum];
Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
}
// set the status from one group to the proper positions of the overall status
PartitionUtils.reordering((InsertTabletPlan) entry.getKey(), subStatus,
if (parentPlan instanceof InsertTabletPlan) {
PartitionUtils.reordering((InsertTabletPlan) entry.getKey(), subStatus,
tmpStatus.subStatus.toArray(new TSStatus[]{}));
} else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey();
for (int i = 0; i < subPlan.getIndexes().size(); i++) {
subStatus[indexToPos.get(subPlan.getIndexes().get(i))] = tmpStatus.subStatus.get(i);
}
}
}
if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// execution failed, record the error message
errorCodePartitionGroups.add(String.format("[%s@%s:%s:%s]",
tmpStatus.getCode(), entry.getValue().getHeader(),
tmpStatus.getMessage(), tmpStatus.subStatus));
tmpStatus.getCode(), entry.getValue().getHeader(),
tmpStatus.getMessage(), tmpStatus.subStatus));
}
}

return concludeFinalStatus(noFailure, endPoint, isBatchFailure, subStatus,
errorCodePartitionGroups);
return concludeFinalStatus(noFailure, endPoint, isBatchFailure, subStatus, errorCodePartitionGroups);
}

private TSStatus concludeFinalStatus(boolean noFailure, EndPoint endPoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
Expand Down Expand Up @@ -1329,9 +1329,9 @@ void commitLog(Log log) throws LogExecutionException {
private TSStatus handleLogExecutionException(
PhysicalPlanLog log, LogExecutionException e) {
Throwable cause = IOUtils.getRootCause(e);
if (cause instanceof BatchInsertionException) {
if (cause instanceof BatchProcessException) {
return RpcUtils
.getStatus(Arrays.asList(((BatchInsertionException) cause).getFailingStatus()));
.getStatus(Arrays.asList(((BatchProcessException) cause).getFailingStatus()));
}
TSStatus tsStatus = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
cause.getClass().getName() + ":" + cause.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.ShutdownException;
import org.apache.iotdb.db.exception.StorageEngineException;
Expand Down Expand Up @@ -379,7 +379,7 @@ public void insert(InsertRowPlan insertRowPlan) throws StorageEngineException {
* @return result of each row
*/
public void insertTablet(InsertTabletPlan insertTabletPlan)
throws StorageEngineException, BatchInsertionException {
throws StorageEngineException, BatchProcessException {
StorageGroupProcessor storageGroupProcessor;
try {
storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
Expand Down Expand Up @@ -641,10 +641,10 @@ public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
/**
* Insert a tablet (rows belonging to the same devices) into this storage group.
*
* @throws BatchInsertionException if some of the rows failed to be inserted
* @throws BatchProcessException if some of the rows failed to be inserted
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchProcessException {
writeLock();
try {
TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
Expand All @@ -669,7 +669,7 @@ public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertio
}
// loc pointing at first legal position
if (loc == insertTabletPlan.getRowCount()) {
throw new BatchInsertionException(results);
throw new BatchProcessException(results);
}
// before is first start point
int before = loc;
Expand Down Expand Up @@ -730,7 +730,7 @@ public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertio
tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);

if (!noFailure) {
throw new BatchInsertionException(results);
throw new BatchProcessException(results);
}
} finally {
writeUnlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.service.rpc.thrift.TSStatus;

public class BatchInsertionException extends QueryProcessException {
public class BatchProcessException extends QueryProcessException {

private TSStatus[] failingStatus;

public BatchInsertionException(TSStatus[] failingStatus) {
super("Batch insertion failed");
public BatchProcessException(TSStatus[] failingStatus) {
super("Batch process failed");
this.failingStatus = failingStatus;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.io.IOException;
import java.sql.SQLException;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
Expand Down Expand Up @@ -94,7 +94,7 @@ void update(PartialPath path, long startTime, long endTime, String value)
* execute batch insert plan
*
* @return result of each row
* @throws BatchInsertionException when some of the rows failed
* @throws BatchProcessException when some of the rows failed
*/
void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
Expand Down Expand Up @@ -130,6 +131,8 @@
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
Expand Down Expand Up @@ -1056,8 +1059,9 @@ private boolean createTimeSeries(CreateTimeSeriesPlan createTimeSeriesPlan)
return true;
}

private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan createMultiTimeSeriesPlan) {
Map<Integer, Exception> results = new HashMap<>(createMultiTimeSeriesPlan.getPaths().size());
private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan createMultiTimeSeriesPlan) throws QueryProcessException {
TSStatus[] results = null;
boolean hasFailed = false;
for (int i = 0; i < createMultiTimeSeriesPlan.getPaths().size(); i++) {
CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(createMultiTimeSeriesPlan.getPaths().get(i),
createMultiTimeSeriesPlan.getDataTypes().get(i), createMultiTimeSeriesPlan.getEncodings().get(i),
Expand All @@ -1070,11 +1074,19 @@ private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan createMultiTimeS
try {
createTimeSeries(plan);
} catch (QueryProcessException e) {
results.put(createMultiTimeSeriesPlan.getIndexes().get(i), e);
if (results == null) {
results = new TSStatus[createMultiTimeSeriesPlan.getIndexes().size()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
}
results[i] = RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
hasFailed = true;
logger.debug("meet error while processing create timeseries. ", e);
}
}
createMultiTimeSeriesPlan.setResults(results);

if (hasFailed) {
throw new BatchProcessException(results);
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
Expand Down Expand Up @@ -283,6 +284,9 @@ public static PhysicalPlan create(ByteBuffer buffer) throws IOException, Illegal
plan = new FlushPlan();
plan.deserialize(buffer);
break;
case CREATE_MULTI_TIMESERIES:
plan = new CreateMultiTimeSeriesPlan();
plan.deserialize(buffer);
default:
throw new IOException("unrecognized log type " + type);
}
Expand All @@ -294,7 +298,7 @@ public enum PhysicalPlanType {
INSERT, DELETE, BATCHINSERT, SET_STORAGE_GROUP, CREATE_TIMESERIES, TTL, GRANT_WATERMARK_EMBEDDING,
REVOKE_WATERMARK_EMBEDDING, CREATE_ROLE, DELETE_ROLE, CREATE_USER, REVOKE_USER_ROLE, REVOKE_ROLE_PRIVILEGE,
REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, MULTI_CREATE_TIMESERIES,
DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, CREATE_MULTI_TIMESERIES,
ALTER_TIMESERIES, FLUSH
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ public void setColumn(int index, Object column) {
columns[index] = column;
}

@Override
public long getMinTime() {
return minTime;
}
Expand Down
Loading