diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java index fd93b329bfa96..d5301e7d24270 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java @@ -18,8 +18,6 @@ */ package org.apache.iotdb.db.engine.cache; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -27,6 +25,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + /** * This class is used to cache TsFileMetaData of tsfile in IoTDB. */ @@ -103,10 +104,10 @@ public TsFileMetaData get(TsFileResource tsFileResource) throws IOException { } synchronized (internPath) { synchronized (cache) { - if (cache.containsKey(path)) { + if (cache.containsKey(tsFileResource)) { cacheHitNum.incrementAndGet(); printCacheLog(true); - return cache.get(path); + return cache.get(tsFileResource); } } printCacheLog(false); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 16bfb4a295a6f..72e05b192dada 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -18,28 +18,6 @@ */ package org.apache.iotdb.db.engine.storagegroup; -import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; -import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -62,14 +40,10 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.SimpleFileVersionController; import org.apache.iotdb.db.engine.version.VersionController; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.exception.MergeException; -import org.apache.iotdb.db.exception.TsFileProcessorException; +import org.apache.iotdb.db.exception.*; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.exception.StorageGroupProcessorException; -import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.mnode.LeafMNode; import org.apache.iotdb.db.metadata.mnode.MNode; @@ -99,6 +73,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; +import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + /** * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one @@ -1690,9 +1675,15 @@ private void removeFullyOverlapFiles(TsFileResource resource, Iterator seriesReaderList; @@ -141,7 +150,7 @@ public void run() { * @param readers readers in List(IPointReader) structure */ public RawQueryDataSetWithoutValueFilter(List paths, List dataTypes, - List readers) throws InterruptedException { + List readers) throws IOException, InterruptedException { super(paths, dataTypes); this.seriesReaderList = readers; blockingQueueArray = new BlockingQueue[readers.size()]; @@ -153,7 +162,7 @@ public RawQueryDataSetWithoutValueFilter(List paths, List data init(); } - private void init() throws InterruptedException { + private void init() throws IOException, InterruptedException { timeHeap = new TreeSet<>(); for (int i = 0; i < seriesReaderList.size(); i++) { ManagedSeriesReader reader = seriesReaderList.get(i); @@ -177,8 +186,7 @@ private void init() throws InterruptedException { * for RPC in RawData query between client and server fill time buffer, value buffers and bitmap * buffers */ - public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) - throws IOException, InterruptedException { + public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException { int seriesNum = seriesReaderList.size(); TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); @@ -339,14 +347,22 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) return tsQueryDataSet; } - private void fillCache(int seriesIndex) throws InterruptedException { + private void fillCache(int seriesIndex) throws IOException, InterruptedException { BatchData batchData = blockingQueueArray[seriesIndex].take(); // no more batch data in this time series queue if (batchData instanceof SignalBatchData) { noMoreDataInQueueArray[seriesIndex] = true; - } - // there are more batch data in this time series queue - else { + } else if (batchData instanceof ExceptionBatchData) { + // exception happened in producer thread + ExceptionBatchData exceptionBatchData = (ExceptionBatchData) batchData; + LOGGER.error("exception happened in producer thread", exceptionBatchData.getException()); + if (exceptionBatchData.getException() instanceof IOException) { + throw (IOException)exceptionBatchData.getException(); + } else if (exceptionBatchData.getException() instanceof RuntimeException) { + throw (RuntimeException)exceptionBatchData.getException(); + } + + } else { // there are more batch data in this time series queue cachedBatchDataArray[seriesIndex] = batchData; synchronized (seriesReaderList.get(seriesIndex)) { @@ -387,7 +403,7 @@ protected boolean hasNextWithoutConstraint() { * for spark/hadoop/hive integration and test */ @Override - protected RowRecord nextWithoutConstraint() { + protected RowRecord nextWithoutConstraint() throws IOException { int seriesNum = seriesReaderList.size(); long minTime = timeHeap.pollFirst(); @@ -414,6 +430,9 @@ protected RowRecord nextWithoutConstraint() { } catch (InterruptedException e) { LOGGER.error("Interrupted while taking from the blocking queue: ", e); Thread.currentThread().interrupt(); + } catch (IOException e) { + LOGGER.error("Got IOException", e); + throw e; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index 4f9ea11ba49d9..4eb2cbddcad67 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -18,8 +18,6 @@ */ package org.apache.iotdb.db.query.executor; -import java.util.ArrayList; -import java.util.List; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; @@ -41,6 +39,10 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * IoTDB query executor. */ @@ -60,7 +62,7 @@ public RawDataQueryExecutor(RawDataQueryPlan queryPlan) { * without filter or with global time filter. */ public QueryDataSet executeWithoutValueFilter(QueryContext context) - throws StorageEngineException { + throws StorageEngineException { List readersOfSelectedSeries = initManagedSeriesReader(context); try { @@ -69,6 +71,8 @@ public QueryDataSet executeWithoutValueFilter(QueryContext context) } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new StorageEngineException(e.getMessage()); + } catch (IOException e) { + throw new StorageEngineException(e.getMessage()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 3ad965c25717b..130dd6d806e70 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -592,6 +592,7 @@ public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) { private TSExecuteStatementResp internalExecuteQueryStatement( long statementId, PhysicalPlan plan, int fetchSize, String username) { long t1 = System.currentTimeMillis(); + long queryId = -1; try { TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers @@ -611,7 +612,7 @@ private TSExecuteStatementResp internalExecuteQueryStatement( } // else default ignoreTimeStamp is false resp.setOperationType(plan.getOperatorType().toString()); // generate the queryId for the operation - long queryId = generateQueryId(true); + queryId = generateQueryId(true); // put it into the corresponding Set statementId2QueryId.computeIfAbsent(statementId, k -> new HashSet<>()).add(queryId); @@ -629,6 +630,13 @@ private TSExecuteStatementResp internalExecuteQueryStatement( return resp; } catch (Exception e) { logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + if (queryId != -1) { + try { + releaseQueryResource(queryId); + } catch (StorageEngineException ex) { + logger.error("Error happened while releasing query resource: ", ex); + } + } return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); } finally { Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1); @@ -861,6 +869,11 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { } } catch (Exception e) { logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + try { + releaseQueryResource(req.queryId); + } catch (StorageEngineException ex) { + logger.error("Error happened while releasing query resource: ", ex); + } return RpcUtils.getTSFetchResultsResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java new file mode 100644 index 0000000000000..d71d39b03abf5 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.read.common; + +public class ExceptionBatchData extends BatchData { + + private Exception exception; + + public ExceptionBatchData(Exception exception) { + this.exception = exception; + } + + @Override + public boolean hasCurrent() { + throw new UnsupportedOperationException("hasCurrent is not supported for ExceptionBatchData"); + } + + public Exception getException() { + return exception; + } +}