Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,6 @@ private void recover() throws DataRegionException {
true,
resource.getTsFile().getName());
resource.upgradeModFile(upgradeModFileThreadPool);
if (resource.anyModFileExists()) {
FileMetrics.getInstance().increaseModFileNum(1);
FileMetrics.getInstance().increaseModFileSize(resource.getTotalModSizeInByte());
}
}
}
while (!value.isEmpty()) {
Expand Down Expand Up @@ -531,10 +527,6 @@ private void recover() throws DataRegionException {
resource.getTsFile().getName());
}
resource.upgradeModFile(upgradeModFileThreadPool);
if (resource.anyModFileExists()) {
FileMetrics.getInstance().increaseModFileNum(1);
FileMetrics.getInstance().increaseModFileSize(resource.getTotalModSizeInByte());
}
}
while (!value.isEmpty()) {
TsFileResource tsFileResource = value.get(value.size() - 1);
Expand Down Expand Up @@ -1723,9 +1715,10 @@ public void syncDeleteDataFiles() throws TsFileProcessorException {
tsFileResourceList.forEach(
x -> {
FileMetrics.getInstance().deleteTsFile(x.isSeq(), Collections.singletonList(x));
if (x.getExclusiveModFile().exists()) {
FileMetrics.getInstance().decreaseModFileNum(1);
FileMetrics.getInstance().decreaseModFileSize(x.getExclusiveModFile().getSize());
try {
x.removeModFile();
} catch (IOException e) {
logger.warn("Cannot remove mod file {}", x, e);
}
});
deleteAllSGFolders(TierManager.getInstance().getAllFilesFolders());
Expand Down Expand Up @@ -1941,10 +1934,6 @@ public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorExcepti
resource.getTsFileSize(),
resource.isSeq(),
resource.getTsFile().getName());
if (resource.getExclusiveModFile().exists()) {
FileMetrics.getInstance().increaseModFileNum(1);
FileMetrics.getInstance().increaseModFileSize(resource.getExclusiveModFile().getSize());
}
}
WritingMetrics.getInstance().recordActiveTimePartitionCount(-1);
} finally {
Expand Down Expand Up @@ -2560,7 +2549,6 @@ private void deleteDataDirectlyInFile(List<TsFileResource> tsfileResourceList, M

for (ModificationFile involvedModificationFile : involvedModificationFiles) {
// delete data in sealed file
long originSize = involvedModificationFile.getSize();
involvedModificationFile.write(modEntry);
// The file size may be smaller than the original file, so the increment here may be
// negative
Expand All @@ -2569,8 +2557,6 @@ private void deleteDataDirectlyInFile(List<TsFileResource> tsfileResourceList, M
"[Deletion] Deletion with path {} written into mods file:{}.",
modEntry,
involvedModificationFile);
FileMetrics.getInstance()
.increaseModFileSize(involvedModificationFile.getSize() - originSize);
}

// can be deleted by files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,6 @@ public static void addFilesToFileMetrics(TsFileResource resource) {
resource.getTsFile().length(),
resource.isSeq(),
resource.getTsFile().getName());
if (resource.exclusiveModFileExists()) {
FileMetrics.getInstance().increaseModFileNum(1);
FileMetrics.getInstance().increaseModFileSize(resource.getTotalModSizeInByte());
}
}

private static void updateOneTargetMods(TsFileResource targetFile, Set<ModEntry> modifications)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,25 @@ public boolean affects(String measurementName) {
}

@Override
public void serialize(OutputStream stream) throws IOException {
ReadWriteIOUtils.writeVar(tableName, stream);
idPredicate.serialize(stream);
ReadWriteForEncodingUtils.writeVarInt(measurementNames.size(), stream);
public long serialize(OutputStream stream) throws IOException {
long size = ReadWriteIOUtils.writeVar(tableName, stream);
size += idPredicate.serialize(stream);
size += ReadWriteForEncodingUtils.writeVarInt(measurementNames.size(), stream);
for (String measurementName : measurementNames) {
ReadWriteIOUtils.writeVar(measurementName, stream);
size += ReadWriteIOUtils.writeVar(measurementName, stream);
}
return size;
}

@Override
public void serialize(ByteBuffer buffer) {
ReadWriteIOUtils.writeVar(tableName, buffer);
idPredicate.serialize(buffer);
ReadWriteForEncodingUtils.writeVarInt(measurementNames.size(), buffer);
public long serialize(ByteBuffer buffer) {
long size = ReadWriteIOUtils.writeVar(tableName, buffer);
size += idPredicate.serialize(buffer);
size += ReadWriteForEncodingUtils.writeVarInt(measurementNames.size(), buffer);
for (String measurementName : measurementNames) {
ReadWriteIOUtils.writeVar(measurementName, buffer);
size += ReadWriteIOUtils.writeVar(measurementName, buffer);
}
return size;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ public enum IDPredicateType {
SEGMENT_EXACT_MATCH,
AND;

public void serialize(OutputStream stream) throws IOException {
public long serialize(OutputStream stream) throws IOException {
stream.write((byte) ordinal());
return 1;
}

public void serialize(ByteBuffer buffer) {
public long serialize(ByteBuffer buffer) {
buffer.put((byte) ordinal());
return 1;
}

public static IDPredicateType deserialize(InputStream stream) throws IOException {
Expand All @@ -76,13 +78,13 @@ protected IDPredicate(IDPredicateType type) {
public abstract boolean matches(IDeviceID deviceID);

@Override
public void serialize(OutputStream stream) throws IOException {
type.serialize(stream);
public long serialize(OutputStream stream) throws IOException {
return type.serialize(stream);
}

@Override
public void serialize(ByteBuffer buffer) {
type.serialize(buffer);
public long serialize(ByteBuffer buffer) {
return type.serialize(buffer);
}

public static IDPredicate createFrom(ByteBuffer buffer) {
Expand Down Expand Up @@ -177,15 +179,17 @@ public int serializedSize() {
}

@Override
public void serialize(OutputStream stream) throws IOException {
super.serialize(stream);
deviceID.serialize(stream);
public long serialize(OutputStream stream) throws IOException {
long size = super.serialize(stream);
size += deviceID.serialize(stream);
return size;
}

@Override
public void serialize(ByteBuffer buffer) {
super.serialize(buffer);
deviceID.serialize(buffer);
public long serialize(ByteBuffer buffer) {
long size = super.serialize(buffer);
size += deviceID.serialize(buffer);
return size;
}

@Override
Expand Down Expand Up @@ -251,17 +255,19 @@ public int serializedSize() {
}

@Override
public void serialize(OutputStream stream) throws IOException {
super.serialize(stream);
ReadWriteIOUtils.writeVar(pattern, stream);
ReadWriteForEncodingUtils.writeVarInt(segmentIndex, stream);
public long serialize(OutputStream stream) throws IOException {
long size = super.serialize(stream);
size += ReadWriteIOUtils.writeVar(pattern, stream);
size += ReadWriteForEncodingUtils.writeVarInt(segmentIndex, stream);
return size;
}

@Override
public void serialize(ByteBuffer buffer) {
super.serialize(buffer);
ReadWriteIOUtils.writeVar(pattern, buffer);
ReadWriteForEncodingUtils.writeVarInt(segmentIndex, buffer);
public long serialize(ByteBuffer buffer) {
long size = super.serialize(buffer);
size += ReadWriteIOUtils.writeVar(pattern, buffer);
size += ReadWriteForEncodingUtils.writeVarInt(segmentIndex, buffer);
return size;
}

@Override
Expand Down Expand Up @@ -334,21 +340,23 @@ public int serializedSize() {
}

@Override
public void serialize(OutputStream stream) throws IOException {
super.serialize(stream);
ReadWriteForEncodingUtils.writeVarInt(predicates.size(), stream);
public long serialize(OutputStream stream) throws IOException {
long size = super.serialize(stream);
size += ReadWriteForEncodingUtils.writeVarInt(predicates.size(), stream);
for (IDPredicate predicate : predicates) {
predicate.serialize(stream);
size += predicate.serialize(stream);
}
return size;
}

@Override
public void serialize(ByteBuffer buffer) {
super.serialize(buffer);
ReadWriteForEncodingUtils.writeVarInt(predicates.size(), buffer);
public long serialize(ByteBuffer buffer) {
long size = super.serialize(buffer);
size += ReadWriteForEncodingUtils.writeVarInt(predicates.size(), buffer);
for (IDPredicate predicate : predicates) {
predicate.serialize(buffer);
size += predicate.serialize(buffer);
}
return size;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,21 @@ public int serializedSize() {
}

@Override
public void serialize(OutputStream stream) throws IOException {
public long serialize(OutputStream stream) throws IOException {
stream.write(modType.getTypeNum());
ReadWriteIOUtils.write(timeRange.getMin(), stream);
ReadWriteIOUtils.write(timeRange.getMax(), stream);
long size = 1;
size += ReadWriteIOUtils.write(timeRange.getMin(), stream);
size += ReadWriteIOUtils.write(timeRange.getMax(), stream);
return size;
}

@Override
public void serialize(ByteBuffer buffer) {
public long serialize(ByteBuffer buffer) {
buffer.put(modType.getTypeNum());
ReadWriteIOUtils.write(timeRange.getMin(), buffer);
ReadWriteIOUtils.write(timeRange.getMax(), buffer);
long size = 1;
size += ReadWriteIOUtils.write(timeRange.getMin(), buffer);
size += ReadWriteIOUtils.write(timeRange.getMax(), buffer);
return size;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;

import org.slf4j.Logger;
Expand Down Expand Up @@ -73,46 +74,56 @@ public ModificationFile(String filePath) {
public ModificationFile(File file) {
this.file = file;
fileExists = file.length() > 0;
if (fileExists) {
FileMetrics.getInstance().increaseModFileNum(1);
FileMetrics.getInstance().increaseModFileSize(file.length());
}
}

@SuppressWarnings("java:S2093") // cannot use try-with-resource, should not close here
public void write(ModEntry entry) throws IOException {
lock.writeLock().lock();
long size = 0;
try {
if (fileOutputStream == null) {
fileOutputStream =
new BufferedOutputStream(Files.newOutputStream(file.toPath(), CREATE, APPEND));
channel = FileChannel.open(file.toPath(), CREATE, APPEND);
}
entry.serialize(fileOutputStream);
size += entry.serialize(fileOutputStream);
fileOutputStream.flush();
} finally {
lock.writeLock().unlock();
}
if (!fileExists) {
fileExists = true;
FileMetrics.getInstance().increaseModFileNum(1);
}
FileMetrics.getInstance().increaseModFileSize(size);
}

@SuppressWarnings("java:S2093") // cannot use try-with-resource, should not close here
public void write(Collection<? extends ModEntry> entries) throws IOException {
lock.writeLock().lock();
long size = 0;
try {
if (fileOutputStream == null) {
fileOutputStream =
new BufferedOutputStream(Files.newOutputStream(file.toPath(), CREATE, APPEND));
channel = FileChannel.open(file.toPath(), CREATE, APPEND);
}
for (ModEntry entry : entries) {
entry.serialize(fileOutputStream);
size += entry.serialize(fileOutputStream);
}
fileOutputStream.flush();
} finally {
lock.writeLock().unlock();
}
if (!fileExists) {
FileMetrics.getInstance().increaseModFileNum(1);
fileExists = true;
}
FileMetrics.getInstance().increaseModFileSize(size);
}

public Iterator<ModEntry> getModIterator(long offset) throws IOException {
Expand Down Expand Up @@ -151,7 +162,7 @@ public File getFile() {
return file;
}

public long getFileLength() throws IOException {
public long getFileLength() {
lock.readLock().lock();
try {
return file.length();
Expand All @@ -172,10 +183,6 @@ public static long[] parseFileName(String name) {
return new long[] {levelNum, modNum};
}

public long getSize() {
return file.length();
}

public class ModIterator implements Iterator<ModEntry>, AutoCloseable {
private InputStream inputStream;
private ModEntry nextEntry;
Expand Down Expand Up @@ -251,6 +258,8 @@ public boolean exists() {
public void remove() throws IOException {
close();
FileUtils.deleteFileOrDirectory(file);
FileMetrics.getInstance().decreaseModFileNum(1);
FileMetrics.getInstance().decreaseModFileSize(getFileLength());
fileExists = false;
}

Expand Down Expand Up @@ -281,8 +290,8 @@ public String toString() {
return "ModificationFile{" + "file=" + file + '}';
}

public void compact() {
long originFileSize = getSize();
public void compact() throws IOException {
long originFileSize = getFileLength();
if (originFileSize > COMPACT_THRESHOLD && !hasCompacted) {
try {
Map<PartialPath, List<ModEntry>> pathModificationMap =
Expand All @@ -307,11 +316,11 @@ public void compact() {
Files.move(new File(newModsFileName).toPath(), file.toPath());
LOGGER.info("{} settle successful", file);

if (getSize() > COMPACT_THRESHOLD) {
if (getFileLength() > COMPACT_THRESHOLD) {
LOGGER.warn(
"After the mod file is settled, the file size is still greater than 1M,the size of the file before settle is {},after settled the file size is {}",
originFileSize,
getSize());
getFileLength());
}
} catch (IOException e) {
LOGGER.error("remove origin file or rename new mods file error.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.db.storageengine.dataregion.modification;

import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;

import java.io.File;
Expand Down Expand Up @@ -136,7 +135,6 @@ private synchronized ModificationFile allocateNew(TsFileResource tsFileResource)
references.add(tsFileResource);
modFileReferences.put(newModFile, references);

FileMetrics.getInstance().increaseModFileNum(1);
return newModFile;
}

Expand Down
Loading