Skip to content

Commit

Permalink
Merge f812ca0 into 8b3fcb2
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Qiao committed Apr 6, 2020
2 parents 8b3fcb2 + f812ca0 commit 9c14a74
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 24 deletions.
8 changes: 4 additions & 4 deletions server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
Expand Up @@ -101,11 +101,11 @@ void createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
if (cur instanceof LeafMNode) {
throw new PathAlreadyExistException(cur.getFullPath());
}
MNode leaf = new LeafMNode(cur, nodeNames[nodeNames.length - 1], dataType, encoding,
compressor, props);
if (cur.hasChild(leaf.getName())) {
throw new MetadataException(String.format("The timeseries %s has already existed.", path));
String leafName = nodeNames[nodeNames.length - 1];
if (cur.hasChild(leafName)) {
throw new PathAlreadyExistException(path);
}
MNode leaf = new LeafMNode(cur, leafName, dataType, encoding, compressor, props);
cur.addChild(leaf);
}

Expand Down
Expand Up @@ -63,6 +63,7 @@
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
Expand Down Expand Up @@ -127,9 +128,12 @@
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlanExecutor implements IPlanExecutor {

private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
// for data query
protected IQueryRouter queryRouter;
// for system schema
Expand Down Expand Up @@ -709,10 +713,7 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException {
}
TSDataType dataType = TypeInferenceUtils.getPredictedDataType(strValues[i]);
Path path = new Path(deviceId, measurement);

mManager.createTimeseries(path.toString(), dataType, getDefaultEncoding(dataType),
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
internalCreateTimeseries(path.toString(), dataType);
}
LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
schemas[i] = measurementNode.getSchema();
Expand All @@ -724,6 +725,22 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException {
}
}

/**
* create timeseries with ignore PathAlreadyExistException
*/
private void internalCreateTimeseries(String path, TSDataType dataType) throws MetadataException {
try {
mManager.createTimeseries(path, dataType, getDefaultEncoding(dataType),
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
} catch (PathAlreadyExistException e) {
if (logger.isDebugEnabled()) {
logger.debug("Ignore PathAlreadyExistException when Concurrent inserting"
+ " a non-exist time series {}", path);
}
}
}

/**
* Get default encoding by dataType
*/
Expand Down Expand Up @@ -768,9 +785,7 @@ public TSStatus[] insertBatch(BatchInsertPlan batchInsertPlan) throws QueryProce
}
Path path = new Path(deviceId, measurementList[i]);
TSDataType dataType = dataTypes[i];
mManager.createTimeseries(path.getFullPath(), dataType, getDefaultEncoding(dataType),
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
internalCreateTimeseries(path.getFullPath(), dataType);
}
LeafMNode measurementNode = (LeafMNode) node.getChild(measurementList[i]);

Expand Down
31 changes: 18 additions & 13 deletions server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
Expand Up @@ -1074,22 +1074,27 @@ public TSExecuteBatchStatementResp testInsertRowInBatch(TSInsertInBatchReq req)

@Override
public TSStatus insert(TSInsertReq req) {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
try {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}

InsertPlan plan = new InsertPlan();
plan.setDeviceId(req.getDeviceId());
plan.setTime(req.getTimestamp());
plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
plan.setValues(req.getValues().toArray(new String[0]));
InsertPlan plan = new InsertPlan();
plan.setDeviceId(req.getDeviceId());
plan.setTime(req.getTimestamp());
plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
plan.setValues(req.getValues().toArray(new String[0]));

TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
return status;
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
return status;
}
return executePlan(plan);
} catch (Exception e) {
logger.error("meet error when insert", e);
}
return executePlan(plan);
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}

@Override
Expand Down

0 comments on commit 9c14a74

Please sign in to comment.