[IOTDB-512] file reader close bug (#846)
* fix TsFileSequenceReader being closed bug
JackieTien97 committed Feb 27, 2020
1 parent 907176c commit 5337696
Showing 7 changed files with 79 additions and 59 deletions.
@@ -29,6 +29,7 @@

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -84,21 +85,31 @@ public Chunk get(ChunkMetaData chunkMetaData, TsFileSequenceReader reader) throw
lock.readLock().unlock();
}

lock.writeLock().lock();
if (lruCache.containsKey(chunkMetaData)) {
lock.readLock().lock();
lock.writeLock().unlock();
cacheHitNum.incrementAndGet();
printCacheLog(true);
Chunk chunk = lruCache.get(chunkMetaData);
lock.readLock().unlock();
Lock cacheLock = lock.writeLock();
try {
cacheLock.lock();
if (lruCache.containsKey(chunkMetaData)) {
try {
cacheLock = lock.readLock();
cacheLock.lock();
} finally {
lock.writeLock().unlock();
}
cacheHitNum.incrementAndGet();
printCacheLog(true);
Chunk chunk = lruCache.get(chunkMetaData);
return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(), reader.getEndianType());
}
printCacheLog(false);
Chunk chunk = reader.readMemChunk(chunkMetaData);
lruCache.put(chunkMetaData, chunk);
return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(), reader.getEndianType());
} catch (IOException e) {
logger.error("something wrong happened while reading {}", reader.getFileName());
throw e;
} finally {
cacheLock.unlock();
}
printCacheLog(false);
Chunk chunk = reader.readMemChunk(chunkMetaData);
lruCache.put(chunkMetaData, chunk);
lock.writeLock().unlock();
return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(), reader.getEndianType());

}
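The rewritten get() above wraps the whole lookup in a try/finally and tracks the currently held lock in a local variable, so the lock is released exactly once even when readMemChunk throws. Below is a minimal, self-contained sketch of that write-to-read lock downgrade pattern; the cache type, key, and loader are illustrative placeholders, not IoTDB's classes.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class DowngradingCache<K, V> {

  private final Map<K, V> cache = new HashMap<>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  public V get(K key, Function<K, V> loader) {
    // start with the write lock, because a miss will mutate the cache
    Lock held = lock.writeLock();
    try {
      held.lock();
      if (cache.containsKey(key)) {
        try {
          // downgrade: take the read lock before releasing the write lock,
          // so the entry cannot be evicted between the check and the read
          held = lock.readLock();
          held.lock();
        } finally {
          lock.writeLock().unlock();
        }
        return cache.get(key);
      }
      V value = loader.apply(key);
      cache.put(key, value);
      return value;
    } finally {
      // releases whichever lock is currently held, exactly once
      held.unlock();
    }
  }
}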

@@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -115,7 +116,9 @@ private void clearUnUsedFilesInFixTime() {

private void clearMap(Map<TsFileResource, TsFileSequenceReader> readerMap,
Map<TsFileResource, AtomicInteger> refMap) {
for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : readerMap.entrySet()) {
Iterator<Map.Entry<TsFileResource, TsFileSequenceReader>> iterator = readerMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
TsFileSequenceReader reader = entry.getValue();
AtomicInteger refAtom = refMap.get(entry.getKey());

@@ -125,8 +128,11 @@ private void clearMap(Map<TsFileResource, TsFileSequenceReader> readerMap,
} catch (IOException e) {
logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
}
readerMap.remove(entry.getKey());
iterator.remove();
refMap.remove(entry.getKey());
if (resourceLogger.isDebugEnabled()) {
resourceLogger.debug("{} TsFileReader is closed because of no reference.", entry.getValue().getFileName());
}
}
}
}
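The clearMap() change above replaces readerMap.remove(entry.getKey()) inside a for-each loop with an explicit Iterator and iterator.remove(), so entries are dropped through the same iterator that is walking the map. A small standalone sketch of that pattern, with placeholder map contents:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class IteratorRemovalDemo {

  public static void main(String[] args) {
    Map<String, Integer> refCounts = new HashMap<>();
    refCounts.put("a.tsfile", 0);
    refCounts.put("b.tsfile", 2);

    Iterator<Map.Entry<String, Integer>> iterator = refCounts.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<String, Integer> entry = iterator.next();
      if (entry.getValue() == 0) {
        // structural removal through the iterator keeps the iteration valid
        iterator.remove();
      }
    }
    System.out.println(refCounts); // prints {b.tsfile=2}
  }
}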
@@ -186,9 +192,9 @@ void increaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
synchronized (this) {
if (!isClosed && unclosedReferenceMap.containsKey(tsFile)) {
unclosedReferenceMap.get(tsFile).getAndDecrement();
unclosedReferenceMap.get(tsFile).decrementAndGet();
} else if (closedReferenceMap.containsKey(tsFile)){
closedReferenceMap.get(tsFile).getAndDecrement();
closedReferenceMap.get(tsFile).decrementAndGet();
}
}
tsFile.getWriteQueryLock().readLock().unlock();
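On the getAndDecrement/decrementAndGet swap just above: both calls decrement atomically and differ only in whether the old or the new value is returned, so with the return value discarded the behavior is unchanged. A two-line illustration:

import java.util.concurrent.atomic.AtomicInteger;

public class CounterDemo {
  public static void main(String[] args) {
    AtomicInteger refs = new AtomicInteger(2);
    int previous = refs.getAndDecrement(); // previous == 2, refs is now 1
    int current = refs.decrementAndGet();  // current == 0, refs is now 0
    System.out.println(previous + " " + current + " " + refs.get()); // 2 0 0
  }
}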
@@ -199,21 +205,25 @@ void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
* integration tests will not conflict with each other.
*/
public synchronized void closeAndRemoveAllOpenedReaders() throws IOException {
for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : closedFileReaderMap.entrySet()) {
Iterator<Map.Entry<TsFileResource, TsFileSequenceReader>> iterator = closedFileReaderMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
entry.getValue().close();
if (resourceLogger.isInfoEnabled()) {
resourceLogger.info("{} closedTsFileReader is closed.", entry.getValue().getFileName());
if (resourceLogger.isDebugEnabled()) {
resourceLogger.debug("{} closedTsFileReader is closed.", entry.getValue().getFileName());
}
closedReferenceMap.remove(entry.getKey());
closedFileReaderMap.remove(entry.getKey());
iterator.remove();
}
for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : unclosedFileReaderMap.entrySet()) {
iterator = unclosedFileReaderMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
entry.getValue().close();
if (resourceLogger.isInfoEnabled()) {
resourceLogger.info("{} unclosedTsFileReader is closed.", entry.getValue().getFileName());
if (resourceLogger.isDebugEnabled()) {
resourceLogger.debug("{} unclosedTsFileReader is closed.", entry.getValue().getFileName());
}
unclosedReferenceMap.remove(entry.getKey());
unclosedFileReaderMap.remove(entry.getKey());
iterator.remove();
}
}
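The per-reader close messages above move from INFO to DEBUG; the isDebugEnabled() guard must use the same level as the call, so the message costs nothing when DEBUG is disabled. A minimal SLF4J sketch (the logger name is illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloseLogging {

  private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");

  static void logClosed(String fileName) {
    // guard level matches log level, so nothing is built or emitted when DEBUG is off
    if (resourceLogger.isDebugEnabled()) {
      resourceLogger.debug("{} TsFileReader is closed because of no reference.", fileName);
    }
  }
}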

@@ -18,14 +18,11 @@
*/
package org.apache.iotdb.db.query.control;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* <p>
@@ -77,8 +74,10 @@ private void addUsedFilesForQuery(long queryId, List<TsFileResource> resources)
// this file may be deleted just before we lock it
if (tsFileResource.isDeleted()) {
Map<Long, Set<TsFileResource>> pathMap = !isClosed ? unsealedFilePathsMap : sealedFilePathsMap;
pathMap.get(queryId).remove(tsFileResource);
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
// This resource may be removed by other threads of this query.
if (pathMap.get(queryId).remove(tsFileResource)) {
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
}
iterator.remove();
}
}
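The addUsedFilesForQuery() change above decreases the reader reference only when pathMap.get(queryId).remove(tsFileResource) returns true, so if several threads of the same query see the deleted file, only the one that actually removed it releases the reference. A standalone sketch of that release-once pattern, with placeholder names:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseOnce {

  private final Set<String> usedFiles = ConcurrentHashMap.newKeySet();
  private final AtomicInteger readerReferences = new AtomicInteger();

  void use(String file) {
    if (usedFiles.add(file)) {
      readerReferences.incrementAndGet();
    }
  }

  void releaseIfDeleted(String file) {
    // Set.remove returns true for exactly one caller, so the reference
    // counter is decreased at most once per file even under races.
    if (usedFiles.remove(file)) {
      readerReferences.decrementAndGet();
    }
  }
}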
@@ -46,12 +46,14 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
private static class ReadTask implements Runnable {

private final ManagedSeriesReader reader;
private final String pathName;
private BlockingQueue<BatchData> blockingQueue;

public ReadTask(ManagedSeriesReader reader,
BlockingQueue<BatchData> blockingQueue) {
BlockingQueue<BatchData> blockingQueue, String pathName) {
this.reader = reader;
this.blockingQueue = blockingQueue;
this.pathName = pathName;
}

@Override
@@ -91,10 +93,13 @@ public void run() {
} catch (InterruptedException e) {
LOGGER.error("Interrupted while putting into the blocking queue: ", e);
Thread.currentThread().interrupt();
reader.setHasRemaining(false);
} catch (IOException e) {
LOGGER.error("Something gets wrong while reading from the series reader: ", e);
LOGGER.error(String.format("Something gets wrong while reading from the series reader %s: ", pathName), e);
reader.setHasRemaining(false);
} catch (Exception e) {
LOGGER.error("Something gets wrong: ", e);
reader.setHasRemaining(false);
}
}
}
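The ReadTask changes above make the producer set hasRemaining(false) on every failure path (interrupt, IOException, and a new catch-all), so the consumer waiting on the blocking queue is never left hanging, and the error log now carries the series path. A simplified sketch of that producer shape; the class and field names are illustrative, not IoTDB's API:

import java.util.concurrent.BlockingQueue;

public class SeriesProducer implements Runnable {

  private final BlockingQueue<int[]> queue;
  private final String pathName; // carried only to make error messages identifiable
  private volatile boolean hasRemaining = true;

  public SeriesProducer(BlockingQueue<int[]> queue, String pathName) {
    this.queue = queue;
    this.pathName = pathName;
  }

  @Override
  public void run() {
    try {
      queue.put(new int[]{1, 2, 3}); // stand-in for reading one batch of data
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      hasRemaining = false; // mark finished so the consumer stops waiting
    } catch (Exception e) {
      // catch-all: any unexpected failure also releases the consumer
      System.err.println("error while reading " + pathName + ": " + e);
      hasRemaining = false;
    }
  }

  public boolean hasRemaining() {
    return hasRemaining;
  }
}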
@@ -153,7 +158,7 @@ private void init() throws InterruptedException {
ManagedSeriesReader reader = seriesReaderList.get(i);
reader.setHasRemaining(true);
reader.setManagedByQueryManager(true);
TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i]));
TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
}
for (int i = 0; i < seriesReaderList.size(); i++) {
fillCache(i);
@@ -351,7 +356,7 @@ private void fillCache(int seriesIndex) throws InterruptedException {
// now we should submit it again
if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
reader.setManagedByQueryManager(true);
TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex]));
TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
}
}
}
@@ -390,11 +390,9 @@ private List<ChunkMetaData> loadSatisfiedChunkMetadatas(TsFileResource resource)
}

for (ChunkMetaData data : currentChunkMetaDataList) {
if (data.getChunkLoader() == null) {
TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
.get(resource, resource.isClosed());
data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
}
TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
.get(resource, resource.isClosed());
data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
}
List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk();
if (memChunks != null) {
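Restated as before/after, the hunk above drops the null guard, presumably so cached chunk metadata never keeps a DiskChunkLoader whose underlying TsFileSequenceReader has already been closed by reference counting (all identifiers below come from the diff itself):

// before: reuse whatever loader is already attached, which may wrap a closed reader
if (data.getChunkLoader() == null) {
  TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
      .get(resource, resource.isClosed());
  data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
}

// after: always bind a loader backed by a reader acquired for the current query
TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
    .get(resource, resource.isClosed());
data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));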
@@ -18,16 +18,6 @@
*/
package org.apache.iotdb.tsfile.read;

import static org.apache.iotdb.tsfile.write.writer.TsFileIOWriter.magicStringBytes;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -37,11 +27,7 @@
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.file.metadata.*;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -54,6 +40,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.iotdb.tsfile.write.writer.TsFileIOWriter.magicStringBytes;

public class TsFileSequenceReader implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReader.class);
@@ -28,7 +28,7 @@

public class DefaultTsFileInput implements TsFileInput {

FileChannel channel;
private FileChannel channel;

public DefaultTsFileInput(Path file) throws IOException {
channel = FileChannel.open(file, StandardOpenOption.READ);
