Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ public static short getSegIndex(long globalIndex) {
return (short) (globalIndex & SchemaFileConfig.SEG_INDEX_MASK);
}

/** TODO: shall merge with {@linkplain PageManager#reEstimateSegSize} */
static short reEstimateSegSize(int oldSize) {
for (short size : SchemaFileConfig.SEG_SIZE_LST) {
if (oldSize < size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFileConfig.SEG_HEADER_SIZE;
import static org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFileConfig.SEG_MAX_SIZ;
import static org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFileConfig.SEG_MIN_SIZ;
import static org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFileConfig.SEG_OFF_DIG;
import static org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFileConfig.SEG_SIZE_LST;
import static org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFileConfig.SEG_SIZE_METRIC;

Expand Down Expand Up @@ -148,7 +149,6 @@ public void writeNewChildren(IMNode node) throws MetadataException, IOException
ISchemaPage curPage;
ByteBuffer childBuffer;
String alias;
int secIdxEntrance = -1; // first page of secondary index
// TODO: reserve order of insert in container may be better
for (Map.Entry<String, IMNode> entry :
ICachedMNodeContainer.getCachedMNodeContainer(node).getNewChildBuffer().entrySet().stream()
Expand Down Expand Up @@ -477,7 +477,7 @@ public ISchemaPage getPageInstance(int pageIdx) throws IOException, MetadataExce
/** Pre-allocates a segment of {@code size} bytes and returns its global (page, segment) index. */
@Deprecated
// TODO: improve to remove
private long preAllocateSegment(short size) throws IOException, MetadataException {
  // The page lookup must also budget for the per-segment offset digest bytes,
  // otherwise a page with exactly `size` free bytes could be chosen and overflow.
  ISegmentedPage hostPage = getMinApplSegmentedPageInMem((short) (size + SEG_OFF_DIG));
  return SchemaFile.getGlobalIndex(hostPage.getPageIndex(), hostPage.allocNewSegment(size));
}

Expand Down Expand Up @@ -621,6 +621,9 @@ private static short estimateSegmentSize(IMNode node) {
* SchemaPageOverflowException} occurs. It is designed to accelerate when there is lots of new
* children nodes, avoiding segments extend several times.
*
* <p>Notice that SegmentOverflowException inside a page with sufficient space will not reach
* here. Supposed to merge with SchemaFile#reEstimateSegSize.
*
* @param expSize expected size calculated from next new record
* @param batchSize size of children within one {@linkplain #writeNewChildren(IMNode)}
* @return estimated size
Expand All @@ -630,14 +633,21 @@ private static short reEstimateSegSize(int expSize, int batchSize) throws Metada
if (batchSize < SEG_SIZE_METRIC[0]) {
return reEstimateSegSize(expSize);
}
int base_tier = 0;
for (int i = 0; i < SEG_SIZE_LST.length; i++) {
if (SEG_SIZE_LST[i] >= expSize) {
base_tier = i;
break;
}
}
int tier = SEG_SIZE_LST.length - 1;
while (tier > 0) {
while (tier >= base_tier) {
if (batchSize > SEG_SIZE_METRIC[tier]) {
return SEG_SIZE_LST[tier];
}
tier--;
}
return SEG_SIZE_LST[0];
return SEG_SIZE_LST[base_tier];
}

private static short reEstimateSegSize(int expSize) throws MetadataException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class SchemaFileTest {

Expand Down Expand Up @@ -503,6 +505,45 @@ public void testUpdateOnFullPageSegment() throws MetadataException, IOException
sf.close();
}

@Test
public void testEstimateSegSize() throws Exception {
  // Verifies that segment-size estimation behaves correctly on edge cases.
  /**
   * related methods shall be merged further: {@linkplain SchemaFile#reEstimateSegSize}
   * ,{@linkplain PageManager#reEstimateSegSize}
   */
  IMNode sgNode = new StorageGroupMNode(null, "mma", 111111111L);
  IMNode d1 = fillChildren(sgNode, 300, "d", this::supplyEntity);
  ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
  try {
    sf.writeMNode(sgNode);

    fillChildren(d1, 46, "s", this::supplyMeasurement);
    sf.writeMNode(d1);

    moveAllToBuffer(d1);
    moveAllToBuffer(sgNode);

    // Edge case: a wrapped segment must extend to another page while its expected
    // size is driven by both the insertion batch size and the already-existing size.
    fillChildren(sgNode, 350, "sd", this::supplyEntity);
    sf.writeMNode(sgNode);
    fillChildren(d1, 20, "ss", this::supplyMeasurement);
    sf.writeMNode(d1);

    // d1 should end up with 46 + 20 measurement children after both writes.
    Iterator<IMNode> verifyChildren = sf.getChildren(d1);
    int cnt = 0;
    while (verifyChildren.hasNext()) {
      cnt++;
      verifyChildren.next();
    }
    Assert.assertEquals(66, cnt);
  } finally {
    sf.close();
  }
}

@Test
public void test200KAlias() throws Exception {
ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
Expand Down Expand Up @@ -877,6 +918,31 @@ private static long getSegAddr(ISchemaFile sf, long curAddr, String key) {
// endregion

// region IMNode Shortcut

// Factory for a measurement node; the alias is derived as "<name>_als".
private IMNode supplyMeasurement(IMNode par, String name) {
  return getMeasurementNode(par, name, name + "_als");
}

// Factory for a plain internal node child of par.
private IMNode supplyInternal(IMNode par, String name) {
  return new InternalMNode(par, name);
}

// Factory for an entity (device) node child of par.
private IMNode supplyEntity(IMNode par, String name) {
  return new EntityMNode(par, name);
}

/**
 * Adds {@code number} children named {@code prefix_0 .. prefix_(number-1)} to {@code par},
 * each produced by {@code nodeFactory}.
 *
 * @return the last child created, or null when {@code number <= 0}
 */
private IMNode fillChildren(
    IMNode par, int number, String prefix, BiFunction<IMNode, String, IMNode> nodeFactory) {
  IMNode newest = null;
  for (int seq = 0; seq < number; seq++) {
    IMNode child = nodeFactory.apply(par, prefix + "_" + seq);
    par.addChild(child);
    newest = child;
  }
  return newest;
}

// open for package
static void addMeasurementChild(IMNode par, String mid) {
par.addChild(getMeasurementNode(par, mid, mid + "alias"));
Expand Down Expand Up @@ -923,6 +989,23 @@ static void moveToUpdateBuffer(IMNode par, String childName) {
ICachedMNodeContainer.getCachedMNodeContainer(par).updateMNode(childName);
}

/** Moves every child of {@code par} into the container cache and marks it updated. */
static void moveAllToUpdate(IMNode par) {
  // Snapshot the names first so moving nodes cannot interfere with iteration.
  List<String> snapshot =
      par.getChildren().values().stream().map(IMNode::getName).collect(Collectors.toList());
  for (String childName : snapshot) {
    ICachedMNodeContainer.getCachedMNodeContainer(par).moveMNodeToCache(childName);
    ICachedMNodeContainer.getCachedMNodeContainer(par).updateMNode(childName);
  }
}

/** Moves every child of {@code par} into the container cache (no update flag). */
static void moveAllToBuffer(IMNode par) {
  // Snapshot the names first so moving nodes cannot interfere with iteration.
  List<String> snapshot =
      par.getChildren().values().stream().map(IMNode::getName).collect(Collectors.toList());
  for (String childName : snapshot) {
    ICachedMNodeContainer.getCachedMNodeContainer(par).moveMNodeToCache(childName);
  }
}

// Reads the segment address recorded in the node's cached-MNode container.
static long getSegAddrInContainer(IMNode par) {
  return ICachedMNodeContainer.getCachedMNodeContainer(par).getSegmentAddress();
}
Expand Down