Skip to content
Permalink
Browse files
IGNITE-16768 Sql. Implement temporary table size stub for sql statist…
…ics needs. - Fixes #755.

Signed-off-by: zstan <stanilovsky@gmail.com>
  • Loading branch information
zstan committed Apr 4, 2022
1 parent 99b544f commit b993b8fd6c47dfd5677e9fb251ddcc10e60fb1dd
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 2 deletions.
@@ -27,6 +27,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
@@ -56,6 +57,7 @@
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.RewindabilityTrait;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.jetbrains.annotations.Nullable;

@@ -438,10 +440,32 @@ private ColocationGroup partitionedGroup() {
}

private class StatisticsImpl implements Statistic {
private static final int STATS_CLI_UPDATE_THRESHOLD = 200;

AtomicInteger statReqCnt = new AtomicInteger();

private volatile long localRowCnt;

/** {@inheritDoc} */
@Override
public Double getRowCount() {
return 10_000d;
if (statReqCnt.getAndIncrement() % STATS_CLI_UPDATE_THRESHOLD == 0) {
int parts = table.storage().configuration().partitions().value();

long size = 0L;

for (int p = 0; p < parts; ++p) {
@Nullable PartitionStorage part = table.storage().getPartition(p);

if (part != null) {
size += part.rowsCount();
}
}

localRowCnt = size;
}

return (double) localRowCnt;
}

/** {@inheritDoc} */
@@ -25,6 +25,7 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.lang.management.ManagementFactory;
@@ -36,6 +37,8 @@
import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
@@ -44,6 +47,7 @@
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
@@ -175,6 +179,11 @@ public void testStopQueryOnNodeStop() throws Exception {

when(tbl.tableId()).thenReturn(UUID.randomUUID());

when(tbl.storage()).thenReturn(mock(TableStorage.class));
when(tbl.storage().configuration()).thenReturn(mock(TableConfiguration.class));
when(tbl.storage().configuration().partitions()).thenReturn(mock(ConfigurationValue.class));
when(tbl.storage().configuration().partitions().value()).thenReturn(1);

qryProc.start();

testRevisionRegister.moveRevision.accept(0L);
@@ -148,4 +148,12 @@ public interface PartitionStorage extends AutoCloseable {
* @throws StorageException If failed to destroy the data or storage is already stopped.
*/
void destroy() throws StorageException;

/**
* Returns rows count belongs to current storage.
*
* @return Rows count.
* @throws StorageException If failed to obtain size.
*/
long rowsCount();
}
@@ -134,6 +134,8 @@ public void scanSimple() throws Exception {

storage.write(dataRow1);

assertEquals(1, storage.rowsCount());

list = toList(storage.scan(key -> true));

assertThat(list, hasSize(1));
@@ -269,6 +269,11 @@ public void destroy() {
map.clear();
}

@Override
public long rowsCount() {
return map.size();
}

/** {@inheritDoc} */
@Override
public boolean equals(Object o) {
@@ -409,6 +409,15 @@ public void destroy() throws StorageException {
}
}

@Override
public long rowsCount() {
try {
return tree.size();
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error occurred while fetching the size.", e);
}
}

/** {@inheritDoc} */
@Override
public void close() {
@@ -350,6 +350,33 @@ public void close() throws Exception {
};
}

// TODO IGNITE-16769 Implement correct PartitionStorage rows count calculation.
@Override
public long rowsCount() {
var upperBound = new Slice(partitionEndPrefix());

var options = new ReadOptions().setIterateUpperBound(upperBound);

RocksIterator it = data.newIterator(options);

it.seek(partitionStartPrefix());

long size = 0;

while (it.isValid()) {
++size;
it.next();
}

try {
IgniteUtils.closeAll(options, upperBound);
} catch (Exception e) {
throw new StorageException("Error occurred while fetching the size.", e);
}

return size;
}

/** {@inheritDoc} */
@Override
public CompletableFuture<Void> snapshot(Path snapshotPath) {
@@ -213,7 +213,7 @@ public interface InternalTable extends AutoCloseable {
/**
* Gets a count of partitions of the table.
*
* @return Count of partitons.
* @return Count of partitions.
*/
int partitions();

0 comments on commit b993b8f

Please sign in to comment.