Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release query resource while exception happened in query producer thread #903

Merged
merged 3 commits into from Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,15 +18,16 @@
*/
package org.apache.iotdb.db.engine.cache;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* This class is used to cache <code>TsFileMetaData</code> of tsfile in IoTDB.
*/
Expand Down Expand Up @@ -103,10 +104,10 @@ public TsFileMetaData get(TsFileResource tsFileResource) throws IOException {
}
synchronized (internPath) {
synchronized (cache) {
if (cache.containsKey(path)) {
if (cache.containsKey(tsFileResource)) {
cacheHitNum.incrementAndGet();
printCacheLog(true);
return cache.get(path);
return cache.get(tsFileResource);
}
}
printCacheLog(false);
Expand Down
Expand Up @@ -18,28 +18,6 @@
*/
package org.apache.iotdb.db.engine.storagegroup;

import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
Expand All @@ -62,14 +40,10 @@
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.*;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
Expand Down Expand Up @@ -99,6 +73,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;


/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
Expand Down Expand Up @@ -1690,9 +1675,15 @@ private void removeFullyOverlapFiles(TsFileResource resource, Iterator<TsFileRes
if (resource.getHistoricalVersions().containsAll(seqFile.getHistoricalVersions())
&& !resource.getHistoricalVersions().equals(seqFile.getHistoricalVersions())
&& seqFile.getWriteQueryLock().writeLock().tryLock()) {
iterator.remove();
seqFile.remove();
seqFile.getWriteQueryLock().writeLock().unlock();
try {
iterator.remove();
seqFile.remove();
} catch (Exception e) {
logger.error("Something gets wrong while removing FullyOverlapFiles ", e);
throw e;
} finally {
seqFile.getWriteQueryLock().writeLock().unlock();
}
}
}
}
Expand Down
Expand Up @@ -95,14 +95,24 @@ public void run() {
Thread.currentThread().interrupt();
reader.setHasRemaining(false);
} catch (IOException e) {
LOGGER.error(String
.format("Something gets wrong while reading from the series reader %s: ", pathName), e);
reader.setHasRemaining(false);
putExceptionBatchData(e, String.format("Something gets wrong while reading from the series reader %s: ", pathName));
} catch (Exception e) {
LOGGER.error("Something gets wrong: ", e);
putExceptionBatchData(e, "Something gets wrong: ");
}
}

private void putExceptionBatchData(Exception e, String logMessage) {
try {
LOGGER.error(logMessage, e);
reader.setHasRemaining(false);
blockingQueue.put(new ExceptionBatchData(e));
} catch (InterruptedException ex) {
ex.printStackTrace();
LOGGER.error("Interrupted while putting ExceptionBatchData into the blocking queue: ", ex);
Thread.currentThread().interrupt();
}
}

}

private List<ManagedSeriesReader> seriesReaderList;
Expand Down Expand Up @@ -141,7 +151,7 @@ public void run() {
* @param readers readers in List(IPointReader) structure
*/
public RawQueryDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> dataTypes,
List<ManagedSeriesReader> readers) throws InterruptedException {
List<ManagedSeriesReader> readers) throws IOException, InterruptedException {
super(paths, dataTypes);
this.seriesReaderList = readers;
blockingQueueArray = new BlockingQueue[readers.size()];
Expand All @@ -153,7 +163,7 @@ public RawQueryDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> data
init();
}

private void init() throws InterruptedException {
private void init() throws IOException, InterruptedException {
timeHeap = new TreeSet<>();
for (int i = 0; i < seriesReaderList.size(); i++) {
ManagedSeriesReader reader = seriesReaderList.get(i);
Expand All @@ -177,8 +187,7 @@ private void init() throws InterruptedException {
* for RPC in RawData query between client and server fill time buffer, value buffers and bitmap
* buffers
*/
public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
throws IOException, InterruptedException {
public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
int seriesNum = seriesReaderList.size();
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();

Expand Down Expand Up @@ -339,14 +348,22 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
return tsQueryDataSet;
}

private void fillCache(int seriesIndex) throws InterruptedException {
private void fillCache(int seriesIndex) throws IOException, InterruptedException {
BatchData batchData = blockingQueueArray[seriesIndex].take();
// no more batch data in this time series queue
if (batchData instanceof SignalBatchData) {
noMoreDataInQueueArray[seriesIndex] = true;
}
// there are more batch data in this time series queue
else {
} else if (batchData instanceof ExceptionBatchData) {
// exception happened in producer thread
ExceptionBatchData exceptionBatchData = (ExceptionBatchData) batchData;
LOGGER.error("exception happened in producer thread", exceptionBatchData.exception);
if (exceptionBatchData.exception instanceof IOException) {
throw (IOException)exceptionBatchData.exception;
} else if (exceptionBatchData.exception instanceof RuntimeException) {
throw (RuntimeException)exceptionBatchData.exception;
}

} else { // there are more batch data in this time series queue
cachedBatchDataArray[seriesIndex] = batchData;

synchronized (seriesReaderList.get(seriesIndex)) {
Expand Down Expand Up @@ -387,7 +404,7 @@ protected boolean hasNextWithoutConstraint() {
* for spark/hadoop/hive integration and test
*/
@Override
protected RowRecord nextWithoutConstraint() {
protected RowRecord nextWithoutConstraint() throws IOException {
int seriesNum = seriesReaderList.size();

long minTime = timeHeap.pollFirst();
Expand All @@ -414,6 +431,9 @@ protected RowRecord nextWithoutConstraint() {
} catch (InterruptedException e) {
LOGGER.error("Interrupted while taking from the blocking queue: ", e);
Thread.currentThread().interrupt();
} catch (IOException e) {
LOGGER.error("Got IOException", e);
throw e;
}
}

Expand Down
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.iotdb.db.query.executor;

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
Expand All @@ -41,6 +39,10 @@
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* IoTDB query executor.
*/
Expand All @@ -60,7 +62,7 @@ public RawDataQueryExecutor(RawDataQueryPlan queryPlan) {
* without filter or with global time filter.
*/
public QueryDataSet executeWithoutValueFilter(QueryContext context)
throws StorageEngineException {
throws StorageEngineException {

List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
try {
Expand All @@ -69,6 +71,8 @@ public QueryDataSet executeWithoutValueFilter(QueryContext context)
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageEngineException(e.getMessage());
} catch (IOException e) {
throw new StorageEngineException(e.getMessage());
}
}

Expand Down
Expand Up @@ -592,6 +592,7 @@ public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
private TSExecuteStatementResp internalExecuteQueryStatement(
long statementId, PhysicalPlan plan, int fetchSize, String username) {
long t1 = System.currentTimeMillis();
long queryId = -1;
try {
TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers

Expand All @@ -611,7 +612,7 @@ private TSExecuteStatementResp internalExecuteQueryStatement(
} // else default ignoreTimeStamp is false
resp.setOperationType(plan.getOperatorType().toString());
// generate the queryId for the operation
long queryId = generateQueryId(true);
queryId = generateQueryId(true);
// put it into the corresponding Set

statementId2QueryId.computeIfAbsent(statementId, k -> new HashSet<>()).add(queryId);
Expand All @@ -629,6 +630,13 @@ private TSExecuteStatementResp internalExecuteQueryStatement(
return resp;
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
if (queryId != -1) {
try {
releaseQueryResource(queryId);
} catch (StorageEngineException ex) {
logger.error("Error happened while releasing query resource: ", ex);
}
}
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
Expand Down Expand Up @@ -861,6 +869,11 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
}
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
try {
releaseQueryResource(req.queryId);
} catch (StorageEngineException ex) {
logger.error("Error happened while releasing query resource: ", ex);
}
return RpcUtils.getTSFetchResultsResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
Expand Down
@@ -0,0 +1,16 @@
package org.apache.iotdb.tsfile.read.common;


public class ExceptionBatchData extends BatchData {

public Exception exception;

public ExceptionBatchData(Exception exception) {
this.exception = exception;
}

@Override
public boolean hasCurrent() {
throw new UnsupportedOperationException("hasCurrent is not supported for ExceptionBatchData");
}
}