Skip to content

Commit

Permalink
update shardsizeexpression every 10 seconds
Browse files Browse the repository at this point in the history
in order to avoid an indexinglistener
  • Loading branch information
msbt committed May 4, 2015
1 parent ca47b2e commit 5e89dcf
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 269 deletions.
2 changes: 1 addition & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ NOTE: Upgrading from 0.47 or earlier versions requires a full cluster restart

- Add plugin support

- make the ``sys.shards.size`` expression only access the filesystem when neccessary
- make the ``sys.shards.size`` expression refresh all 10 seconds maximum

- Increased performance of ``hostname`` and ``id`` expression for ``sys.nodes`` table

Expand Down
42 changes: 0 additions & 42 deletions blob/src/test/java/io/crate/integrationtests/BlobListenerTest.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,65 +22,36 @@
package io.crate.operation.reference.sys.shard;

import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.indexing.IndexingOperationListener;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.store.StoreStats;

import javax.annotation.Nullable;

public class ShardSizeExpression extends SysShardExpression<Long> {

public static final long CACHE_DURATION = 10_000; // 10s
public static final String NAME = "size";

private final IndexShard indexShard;

@Nullable
private StoreStats storeStats;
private StoreStats storeStats = null;
private long cached = 0;

@Inject
public ShardSizeExpression(IndexShard indexShard, ShardIndexingService shardIndexingService) {
public ShardSizeExpression(IndexShard indexShard) {
super(NAME);
this.indexShard = indexShard;
shardIndexingService.addListener(new IndexingOperationListener() {

private void nullStats() {
synchronized (ShardSizeExpression.this.indexShard) {
storeStats = null;
}
}

@Override
public void postIndex(Engine.Index index) {
nullStats();
}

@Override
public void postDelete(Engine.Delete delete) {
nullStats();
}

@Override
public void postCreate(Engine.Create create) {
nullStats();
}
}

@Override
public void postDeleteByQuery(Engine.DeleteByQuery deleteByQuery) {
nullStats();
}
});
private synchronized void ensureStats() {
long curTime = System.currentTimeMillis();
if (storeStats == null || curTime - cached > CACHE_DURATION) {
storeStats = indexShard.storeStats();
cached = curTime;
}
}

@Override
public Long value() {
synchronized (indexShard) {
if (storeStats == null) {
storeStats = indexShard.storeStats();
}
return storeStats.getSizeInBytes();
}
ensureStats();
return storeStats.getSizeInBytes();
}

}
30 changes: 0 additions & 30 deletions sql/src/test/java/io/crate/integrationtests/ShardStatsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.crate.blob.v2.BlobIndices;
import io.crate.test.integration.CrateIntegrationTest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
Expand Down Expand Up @@ -111,33 +110,4 @@ public void testTableNameBlobTable() throws Exception {
assertThat(response.rowCount(), is(1L));
assertThat((Long)response.rows()[0][0], is(0L));
}

@Test
public void testShardSizeUpdatesAfterInsertUpdateDelete() throws Exception {
execute("create table shard_size_t (value string) clustered into 1 shards with (number_of_replicas=0)");
ensureGreen(); // sic, wait for all shards to be ready
execute("select size from sys.shards where table_name='shard_size_t'");
Long initialShardSize = (Long)response.rows()[0][0];
execute("select size from sys.shards where table_name='shard_size_t'");
assertThat((Long) response.rows()[0][0], is(initialShardSize)); // not changed

execute("insert into shard_size_t values (?)", new Object[]{Strings.base64UUID()});
execute("refresh table shard_size_t");
execute("select size from sys.shards where table_name='shard_size_t'");
Long afterInsert = (Long) response.rows()[0][0];
assertThat(afterInsert, is(greaterThan(initialShardSize))); // bigger


execute("update shard_size_t set value=?", new Object[]{randomAsciiOfLength(10000)});
execute("refresh table shard_size_t");
execute("select size from sys.shards where table_name='shard_size_t'");
Long afterUpdate = (Long) response.rows()[0][0];
assertThat(afterUpdate, is(greaterThan(afterInsert))); // bigger

execute("delete from shard_size_t");
execute("refresh table shard_size_t");
execute("select size from sys.shards where table_name='shard_size_t'");
Long afterDelete = (Long) response.rows()[0][0];
assertThat(afterDelete, is(initialShardSize));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ public String answer(InvocationOnMock invocation) throws Throwable {
IndexShard indexShard = mock(IndexShard.class);
bind(IndexShard.class).toInstance(indexShard);

ShardIndexingService shardIndexingService = mock(ShardIndexingService.class);
bind(ShardIndexingService.class).toInstance(shardIndexingService);

StoreStats storeStats = mock(StoreStats.class);
when(indexShard.storeStats()).thenReturn(storeStats);
when(storeStats.getSizeInBytes()).thenReturn(123456L);
Expand Down

0 comments on commit 5e89dcf

Please sign in to comment.