Skip to content

Commit

Permalink
Merge 351cb50 into 6ba8aa8
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 committed Mar 12, 2020
2 parents 6ba8aa8 + 351cb50 commit b21f3ce
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 51 deletions.
Expand Up @@ -18,15 +18,16 @@
*/
package org.apache.iotdb.db.engine.cache;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* This class is used to cache <code>TsFileMetaData</code> of tsfile in IoTDB.
*/
Expand Down Expand Up @@ -103,10 +104,10 @@ public TsFileMetaData get(TsFileResource tsFileResource) throws IOException {
}
synchronized (internPath) {
synchronized (cache) {
if (cache.containsKey(path)) {
if (cache.containsKey(tsFileResource)) {
cacheHitNum.incrementAndGet();
printCacheLog(true);
return cache.get(path);
return cache.get(tsFileResource);
}
}
printCacheLog(false);
Expand Down
Expand Up @@ -18,28 +18,6 @@
*/
package org.apache.iotdb.db.engine.storagegroup;

import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
Expand All @@ -62,14 +40,10 @@
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.*;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
Expand Down Expand Up @@ -99,6 +73,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;


/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
Expand Down Expand Up @@ -1690,9 +1675,15 @@ private void removeFullyOverlapFiles(TsFileResource resource, Iterator<TsFileRes
if (resource.getHistoricalVersions().containsAll(seqFile.getHistoricalVersions())
&& !resource.getHistoricalVersions().equals(seqFile.getHistoricalVersions())
&& seqFile.getWriteQueryLock().writeLock().tryLock()) {
iterator.remove();
seqFile.remove();
seqFile.getWriteQueryLock().writeLock().unlock();
try {
iterator.remove();
seqFile.remove();
} catch (Exception e) {
logger.error("Something gets wrong while removing FullyOverlapFiles ", e);
throw e;
} finally {
seqFile.getWriteQueryLock().writeLock().unlock();
}
}
}
}
Expand Down
Expand Up @@ -95,14 +95,23 @@ public void run() {
Thread.currentThread().interrupt();
reader.setHasRemaining(false);
} catch (IOException e) {
LOGGER.error(String
.format("Something gets wrong while reading from the series reader %s: ", pathName), e);
reader.setHasRemaining(false);
putExceptionBatchData(e, String.format("Something gets wrong while reading from the series reader %s: ", pathName));
} catch (Exception e) {
LOGGER.error("Something gets wrong: ", e);
putExceptionBatchData(e, "Something gets wrong: ");
}
}

private void putExceptionBatchData(Exception e, String logMessage) {
try {
LOGGER.error(logMessage, e);
reader.setHasRemaining(false);
blockingQueue.put(new ExceptionBatchData(e));
} catch (InterruptedException ex) {
LOGGER.error("Interrupted while putting ExceptionBatchData into the blocking queue: ", ex);
Thread.currentThread().interrupt();
}
}

}

private List<ManagedSeriesReader> seriesReaderList;
Expand Down Expand Up @@ -141,7 +150,7 @@ public void run() {
* @param readers readers in List(IPointReader) structure
*/
public RawQueryDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> dataTypes,
List<ManagedSeriesReader> readers) throws InterruptedException {
List<ManagedSeriesReader> readers) throws IOException, InterruptedException {
super(paths, dataTypes);
this.seriesReaderList = readers;
blockingQueueArray = new BlockingQueue[readers.size()];
Expand All @@ -153,7 +162,7 @@ public RawQueryDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> data
init();
}

private void init() throws InterruptedException {
private void init() throws IOException, InterruptedException {
timeHeap = new TreeSet<>();
for (int i = 0; i < seriesReaderList.size(); i++) {
ManagedSeriesReader reader = seriesReaderList.get(i);
Expand All @@ -177,8 +186,7 @@ private void init() throws InterruptedException {
* for RPC in RawData query between client and server fill time buffer, value buffers and bitmap
* buffers
*/
public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
throws IOException, InterruptedException {
public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
int seriesNum = seriesReaderList.size();
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();

Expand Down Expand Up @@ -339,14 +347,22 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
return tsQueryDataSet;
}

private void fillCache(int seriesIndex) throws InterruptedException {
private void fillCache(int seriesIndex) throws IOException, InterruptedException {
BatchData batchData = blockingQueueArray[seriesIndex].take();
// no more batch data in this time series queue
if (batchData instanceof SignalBatchData) {
noMoreDataInQueueArray[seriesIndex] = true;
}
// there are more batch data in this time series queue
else {
} else if (batchData instanceof ExceptionBatchData) {
// exception happened in producer thread
ExceptionBatchData exceptionBatchData = (ExceptionBatchData) batchData;
LOGGER.error("exception happened in producer thread", exceptionBatchData.getException());
if (exceptionBatchData.getException() instanceof IOException) {
throw (IOException)exceptionBatchData.getException();
} else if (exceptionBatchData.getException() instanceof RuntimeException) {
throw (RuntimeException)exceptionBatchData.getException();
}

} else { // there are more batch data in this time series queue
cachedBatchDataArray[seriesIndex] = batchData;

synchronized (seriesReaderList.get(seriesIndex)) {
Expand Down Expand Up @@ -387,7 +403,7 @@ protected boolean hasNextWithoutConstraint() {
* for spark/hadoop/hive integration and test
*/
@Override
protected RowRecord nextWithoutConstraint() {
protected RowRecord nextWithoutConstraint() throws IOException {
int seriesNum = seriesReaderList.size();

long minTime = timeHeap.pollFirst();
Expand All @@ -414,6 +430,9 @@ protected RowRecord nextWithoutConstraint() {
} catch (InterruptedException e) {
LOGGER.error("Interrupted while taking from the blocking queue: ", e);
Thread.currentThread().interrupt();
} catch (IOException e) {
LOGGER.error("Got IOException", e);
throw e;
}
}

Expand Down
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.iotdb.db.query.executor;

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
Expand All @@ -41,6 +39,10 @@
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* IoTDB query executor.
*/
Expand All @@ -60,7 +62,7 @@ public RawDataQueryExecutor(RawDataQueryPlan queryPlan) {
* without filter or with global time filter.
*/
public QueryDataSet executeWithoutValueFilter(QueryContext context)
throws StorageEngineException {
throws StorageEngineException {

List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
try {
Expand All @@ -69,6 +71,8 @@ public QueryDataSet executeWithoutValueFilter(QueryContext context)
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageEngineException(e.getMessage());
} catch (IOException e) {
throw new StorageEngineException(e.getMessage());
}
}

Expand Down
Expand Up @@ -592,6 +592,7 @@ public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
private TSExecuteStatementResp internalExecuteQueryStatement(
long statementId, PhysicalPlan plan, int fetchSize, String username) {
long t1 = System.currentTimeMillis();
long queryId = -1;
try {
TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers

Expand All @@ -611,7 +612,7 @@ private TSExecuteStatementResp internalExecuteQueryStatement(
} // else default ignoreTimeStamp is false
resp.setOperationType(plan.getOperatorType().toString());
// generate the queryId for the operation
long queryId = generateQueryId(true);
queryId = generateQueryId(true);
// put it into the corresponding Set

statementId2QueryId.computeIfAbsent(statementId, k -> new HashSet<>()).add(queryId);
Expand All @@ -629,6 +630,13 @@ private TSExecuteStatementResp internalExecuteQueryStatement(
return resp;
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
if (queryId != -1) {
try {
releaseQueryResource(queryId);
} catch (StorageEngineException ex) {
logger.error("Error happened while releasing query resource: ", ex);
}
}
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
Expand Down Expand Up @@ -861,6 +869,11 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
}
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
try {
releaseQueryResource(req.queryId);
} catch (StorageEngineException ex) {
logger.error("Error happened while releasing query resource: ", ex);
}
return RpcUtils.getTSFetchResultsResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
Expand Down
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.read.common;

public class ExceptionBatchData extends BatchData {

private Exception exception;

public ExceptionBatchData(Exception exception) {
this.exception = exception;
}

@Override
public boolean hasCurrent() {
throw new UnsupportedOperationException("hasCurrent is not supported for ExceptionBatchData");
}

public Exception getException() {
return exception;
}
}

0 comments on commit b21f3ce

Please sign in to comment.