From 6c002eca19152deed6410c5877a4c2633c4e83f8 Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Mon, 4 May 2015 21:22:14 +0200 Subject: [PATCH] fixup! update shardsizeexpression every 10 seconds --- .../java/io/crate/blob/BlobContainer.java | 17 +---- .../main/java/io/crate/blob/DigestBlob.java | 2 - .../main/java/io/crate/blob/v2/BlobShard.java | 22 +----- .../test/java/io/crate/DigestBlobTests.java | 21 +----- .../integrationtests/BlobIntegrationTest.java | 49 ------------- .../main/java/io/crate/core/CachedRef.java | 63 ++++++++++++++++ .../java/io/crate/core/CachedRefTest.java | 71 +++++++++++++++++++ docs/sql/system.txt | 3 + .../sys/shard/ShardSizeExpression.java | 23 +++--- .../shard/blob/BlobShardSizeExpression.java | 38 +++------- .../collect/files/BlobDocCollectorTest.java | 4 +- 11 files changed, 162 insertions(+), 151 deletions(-) create mode 100644 core/src/main/java/io/crate/core/CachedRef.java create mode 100644 core/src/test/java/io/crate/core/CachedRefTest.java diff --git a/blob/src/main/java/io/crate/blob/BlobContainer.java b/blob/src/main/java/io/crate/blob/BlobContainer.java index 6f53a420a7b8..b93eb607b843 100644 --- a/blob/src/main/java/io/crate/blob/BlobContainer.java +++ b/blob/src/main/java/io/crate/blob/BlobContainer.java @@ -35,7 +35,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; public class BlobContainer { @@ -58,13 +57,10 @@ public class BlobContainer { private final File tmpDirectory; private final File varDirectory; - private final CopyOnWriteArrayList listeners; - - public BlobContainer(File baseDirectory, CopyOnWriteArrayList listeners) { + public BlobContainer(File baseDirectory) { this.baseDirectory = baseDirectory; this.tmpDirectory = new File(baseDirectory, "tmp"); this.varDirectory = new File(baseDirectory, "var"); - this.listeners = listeners; FileSystemUtils.mkdirs(this.varDirectory); FileSystemUtils.mkdirs(this.tmpDirectory); @@ -168,17 +164,6 @@ public File getFile(String digest) { return new File(getVarDirectory(), digest.substring(0, 2) + File.separator + digest); } - /** - * called when a blob got committed, finally stored - * @param digest the digest of the blob - */ - protected void onCommit(String digest) { - for (BlobListener listener : listeners) { - listener.onCommit(digest); - } - } - - public DigestBlob createBlob(String digest, UUID transferId) { // TODO: check if exists already return new DigestBlob(this, digest, transferId); diff --git a/blob/src/main/java/io/crate/blob/DigestBlob.java b/blob/src/main/java/io/crate/blob/DigestBlob.java index b7eba997bd38..bf6da4eca376 100644 --- a/blob/src/main/java/io/crate/blob/DigestBlob.java +++ b/blob/src/main/java/io/crate/blob/DigestBlob.java @@ -170,8 +170,6 @@ public File commit() throws DigestMismatchException { } File newFile = container.getFile(digest); file.renameTo(newFile); - // call blob listeners - container.onCommit(digest); return newFile; } diff --git a/blob/src/main/java/io/crate/blob/v2/BlobShard.java b/blob/src/main/java/io/crate/blob/v2/BlobShard.java index 23ae178e4ddf..fa96a91db3e4 100644 --- a/blob/src/main/java/io/crate/blob/v2/BlobShard.java +++ b/blob/src/main/java/io/crate/blob/v2/BlobShard.java @@ -23,7 +23,6 @@ import io.crate.blob.BlobContainer; import io.crate.blob.BlobEnvironment; -import io.crate.blob.BlobListener; import io.crate.blob.stats.BlobStats; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; @@ -34,15 +33,12 @@ import org.elasticsearch.index.shard.service.IndexShard; import java.io.File; -import java.util.concurrent.CopyOnWriteArrayList; public class BlobShard extends AbstractIndexShardComponent { private final BlobContainer blobContainer; private final IndexShard indexShard; - private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); - @Inject protected BlobShard(ShardId shardId, @IndexSettings Settings indexSettings, BlobEnvironment blobEnvironment, @@ -51,7 +47,7 @@ protected BlobShard(ShardId shardId, @IndexSettings Settings indexSettings, this.indexShard = indexShard; File blobDir = blobDir(blobEnvironment); logger.info("creating BlobContainer at {}", blobDir); - this.blobContainer = new BlobContainer(blobDir, listeners); + this.blobContainer = new BlobContainer(blobDir); } public byte[][] currentDigests(byte prefix) { @@ -59,21 +55,7 @@ public byte[][] currentDigests(byte prefix) { } public boolean delete(String digest) { - boolean deleted = blobContainer.getFile(digest).delete(); - - // call delete listeners - for (BlobListener blobListener : listeners) { - blobListener.onDelete(digest); - } - return deleted; - } - - public void addListener(BlobListener blobListener) { - listeners.add(blobListener); - } - - public void removeListener(BlobListener blobListener) { - listeners.remove(blobListener); + return blobContainer.getFile(digest).delete(); } public BlobContainer blobContainer() { diff --git a/blob/src/test/java/io/crate/DigestBlobTests.java b/blob/src/test/java/io/crate/DigestBlobTests.java index 7f58dc4ae54c..ad2658768a0b 100644 --- a/blob/src/test/java/io/crate/DigestBlobTests.java +++ b/blob/src/test/java/io/crate/DigestBlobTests.java @@ -22,7 +22,6 @@ package io.crate; import io.crate.blob.BlobContainer; -import io.crate.blob.BlobListener; import io.crate.blob.DigestBlob; import io.crate.test.integration.CrateUnitTest; import org.elasticsearch.common.bytes.BytesArray; @@ -39,11 +38,6 @@ import java.nio.file.Path; import java.nio.file.attribute.BasicFileAttributes; import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; public class DigestBlobTests extends CrateUnitTest { @@ -92,11 +86,7 @@ public void testDigestBlobResumeHeadAndAddContent() throws IOException { UUID transferId = UUID.randomUUID(); int currentPos = 2; - CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); - BlobListener listener = mock(BlobListener.class); - listeners.add(listener); - - BlobContainer container = new BlobContainer(tmpDir.toFile(), listeners); + BlobContainer container = new BlobContainer(tmpDir.toFile()); File filePath = new File(container.getTmpDirectory(), String.format("%s.%s", digest, transferId.toString())); if (filePath.exists()) { filePath.delete(); @@ -125,7 +115,6 @@ public void testDigestBlobResumeHeadAndAddContent() throws IOException { assertEquals("ABCDEFGHIJKLMNO", new BytesArray(buffer).toUtf8().trim()); File file = digestBlob.commit(); - verify(listener, times(1)).onCommit(digest); // check if final file's content is correct buffer = new byte[15]; @@ -143,11 +132,7 @@ public void testDigestBlobResumeHeadAndAddContent() throws IOException { @Test public void testResumeDigestBlobAddHeadAfterContent() throws IOException { UUID transferId = UUID.randomUUID(); - CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); - BlobListener listener = mock(BlobListener.class); - listeners.add(listener); - - BlobContainer container = new BlobContainer(tmpDir.toFile(), listeners); + BlobContainer container = new BlobContainer(tmpDir.toFile()); DigestBlob digestBlob = DigestBlob.resumeTransfer( container, "417de3231e23dcd6d224ff60918024bc6c59aa58", transferId, 2); @@ -170,8 +155,6 @@ public void testResumeDigestBlobAddHeadAfterContent() throws IOException { File file = digestBlob.commit(); - verify(listener, times(1)).onCommit("417de3231e23dcd6d224ff60918024bc6c59aa58"); - // check if final file's content is correct buffer = new byte[15]; stream = new FileInputStream(file); diff --git a/blob/src/test/java/io/crate/integrationtests/BlobIntegrationTest.java b/blob/src/test/java/io/crate/integrationtests/BlobIntegrationTest.java index 8598971b070e..7822137898d2 100644 --- a/blob/src/test/java/io/crate/integrationtests/BlobIntegrationTest.java +++ b/blob/src/test/java/io/crate/integrationtests/BlobIntegrationTest.java @@ -1,9 +1,6 @@ package io.crate.integrationtests; -import io.crate.blob.BlobListener; -import io.crate.blob.v2.BlobIndices; -import io.crate.blob.v2.BlobShard; import io.crate.rest.CrateRestFilter; import io.crate.test.integration.CrateIntegrationTest; import org.apache.commons.lang3.StringUtils; @@ -14,7 +11,6 @@ import org.apache.http.entity.StringEntity; import org.apache.http.message.BasicHeader; import org.apache.http.util.EntityUtils; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.junit.Test; @@ -22,12 +18,6 @@ import java.io.IOException; import java.util.Locale; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.core.Is.is; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - @CrateIntegrationTest.ClusterScope(scope = CrateIntegrationTest.Scope.SUITE, numNodes = 2) public class BlobIntegrationTest extends BlobHttpIntegrationTest { @@ -244,43 +234,4 @@ public void testIndexOnNonBlobTable() throws IOException { assertEquals("{\"_index\":\"test_no_blobs\",\"_type\":\"default\",\"_id\":\"1\",\"_version\":1,\"created\":true}", EntityUtils.toString(res.getEntity())); } - - @Test - public void testBlobListenersExecuted() throws Exception { - final String INDEX = "listener_t"; - - BlobIndices blobIndices = cluster().getInstance(BlobIndices.class); - - Settings indexSettings = ImmutableSettings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .build(); - blobIndices.createBlobTable(INDEX, indexSettings).get(); - - BlobShard shard = null; - for (BlobIndices indices : cluster().getInstances(BlobIndices.class)) { - shard = indices.blobShard(BlobIndices.fullIndexName(INDEX), 0); - if (shard != null) { - break; - } - } - assertThat(shard, is(notNullValue())); - BlobListener listener = mock(BlobListener.class); - shard.addListener(listener); - - String digest = "32d10c7b8cf96570ca04ce37f2a19d84240d3a89"; - put(blobUri(INDEX, digest), "abcdefghijklmnopqrstuvwxyz").close(); - verify(listener, times(1)).onCommit(digest); - - String digest2 = "c520e6109835c876fd98636efec43dd61634b7d3"; - put(blobUri(INDEX, digest2), StringUtils.repeat("a", 1500)).close(); - verify(listener, times(1)).onCommit(digest2); - - delete(blobUri(INDEX, digest)).close(); - verify(listener, times(1)).onDelete(digest); - - delete(blobUri(INDEX, digest2)).close(); - verify(listener, times(1)).onDelete(digest2); - - } } diff --git a/core/src/main/java/io/crate/core/CachedRef.java b/core/src/main/java/io/crate/core/CachedRef.java new file mode 100644 index 000000000000..b76961b2ddf5 --- /dev/null +++ b/core/src/main/java/io/crate/core/CachedRef.java @@ -0,0 +1,63 @@ +/* + * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.core; + +import java.util.concurrent.TimeUnit; + +/** + * A cache for a single reference that is cached for cacheTime milliseconds + * and after that refreshed by calling {@link #refresh()} on new access to {@link #get()}. + * @param + */ +public abstract class CachedRef { + + private final long cacheTime; + private long cachedAt = 0L; + private volatile T value = null; + + public CachedRef(long cacheTime, TimeUnit timeUnit) { + this.cacheTime = timeUnit.toMillis(cacheTime); + } + + /** + * guaranteed to be not null if {@link #refresh()} does not return null. + * @return the cached value or a refreshed one. + */ + public T get() { + ensureValue(); + return value; + } + + private synchronized void ensureValue() { + long curTime = System.currentTimeMillis(); + if (value == null || curTime - cachedAt > cacheTime) { + value = refresh(); + cachedAt = curTime; + } + } + + /** + * create a new value + * @return the new value + */ + public abstract T refresh(); +} diff --git a/core/src/test/java/io/crate/core/CachedRefTest.java b/core/src/test/java/io/crate/core/CachedRefTest.java new file mode 100644 index 000000000000..3eb8e0f859bd --- /dev/null +++ b/core/src/test/java/io/crate/core/CachedRefTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.core; + +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import io.crate.test.integration.CrateUnitTest; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class CachedRefTest extends CrateUnitTest { + + @Test + public void testZeroCacheAlwaysRefresh() throws Exception { + CachedRef longCache = new CachedRef(0, TimeUnit.MILLISECONDS) { + private AtomicLong internalLong = new AtomicLong(0L); + @Override + public Long refresh() { + return internalLong.incrementAndGet(); + } + }; + long previous = -1L; + for (int i = 0; i < 1000; i++) { + long cur = longCache.get(); + assertThat(cur, is(greaterThan(previous))); + previous = cur; + Thread.sleep(1); + } + } + + @Repeat(iterations=10) + @Test + public void testRefresh() throws Exception { + CachedRef longCache = new CachedRef(10, TimeUnit.MILLISECONDS) { + private AtomicLong internalLong = new AtomicLong(0L); + @Override + public Long refresh() { + return internalLong.incrementAndGet(); + } + }; + + long first = longCache.get(); + assertThat(first, is(1L)); + assertThat(longCache.get(), is(first)); + Thread.sleep(12); + assertThat(longCache.get(), is(2L)); + } +} diff --git a/docs/sql/system.txt b/docs/sql/system.txt index ec92c6870ec5..0be0c5dee03e 100644 --- a/docs/sql/system.txt +++ b/docs/sql/system.txt @@ -473,6 +473,9 @@ The table schema is as follows: | | getting relocated to at the time | | +------------------+----------------------------------+-------------+ | size | Current size in bytes. | Long | +| | This value is cached for max. | | +| | 10 seconds to reduce file system | | +| | access. | | +------------------+----------------------------------+-------------+ | state | The current state of the shard. | String | | | Possible states are: | | diff --git a/sql/src/main/java/io/crate/operation/reference/sys/shard/ShardSizeExpression.java b/sql/src/main/java/io/crate/operation/reference/sys/shard/ShardSizeExpression.java index 736aa252493c..ba0b1e006212 100644 --- a/sql/src/main/java/io/crate/operation/reference/sys/shard/ShardSizeExpression.java +++ b/sql/src/main/java/io/crate/operation/reference/sys/shard/ShardSizeExpression.java @@ -21,18 +21,24 @@ package io.crate.operation.reference.sys.shard; +import io.crate.core.CachedRef; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.store.StoreStats; +import java.util.concurrent.TimeUnit; + public class ShardSizeExpression extends SysShardExpression { - public static final long CACHE_DURATION = 10_000; // 10s public static final String NAME = "size"; private final IndexShard indexShard; - private StoreStats storeStats = null; - private long cached = 0; + private final CachedRef storeStatsCache = new CachedRef(10, TimeUnit.SECONDS) { + @Override + public StoreStats refresh() { + return indexShard.storeStats(); + } + }; @Inject public ShardSizeExpression(IndexShard indexShard) { @@ -40,18 +46,9 @@ public ShardSizeExpression(IndexShard indexShard) { this.indexShard = indexShard; } - private synchronized void ensureStats() { - long curTime = System.currentTimeMillis(); - if (storeStats == null || curTime - cached > CACHE_DURATION) { - storeStats = indexShard.storeStats(); - cached = curTime; - } - } - @Override public Long value() { - ensureStats(); - return storeStats.getSizeInBytes(); + return storeStatsCache.get().getSizeInBytes(); } } diff --git a/sql/src/main/java/io/crate/operation/reference/sys/shard/blob/BlobShardSizeExpression.java b/sql/src/main/java/io/crate/operation/reference/sys/shard/blob/BlobShardSizeExpression.java index db3aeb47fa6e..69c5368e837b 100644 --- a/sql/src/main/java/io/crate/operation/reference/sys/shard/blob/BlobShardSizeExpression.java +++ b/sql/src/main/java/io/crate/operation/reference/sys/shard/blob/BlobShardSizeExpression.java @@ -21,55 +21,35 @@ package io.crate.operation.reference.sys.shard.blob; -import io.crate.blob.BlobListener; import io.crate.blob.stats.BlobStats; import io.crate.blob.v2.BlobShard; +import io.crate.core.CachedRef; import io.crate.metadata.shard.blob.BlobShardReferenceImplementation; import io.crate.operation.reference.sys.shard.SysShardExpression; import org.elasticsearch.common.inject.Inject; -import javax.annotation.Nullable; +import java.util.concurrent.TimeUnit; public class BlobShardSizeExpression extends SysShardExpression implements BlobShardReferenceImplementation { public static final String NAME = "size"; private final BlobShard blobShard; - - @Nullable - private BlobStats blobStats; + private final CachedRef blobStatsCache = new CachedRef(10, TimeUnit.SECONDS) { + @Override + public BlobStats refresh() { + return blobShard.blobStats(); + } + }; @Inject public BlobShardSizeExpression(final BlobShard blobShard) { super(NAME); this.blobShard = blobShard; - this.blobShard.addListener(new BlobListener() { - - private void nullBlobStats() { - synchronized (blobShard) { - blobStats = null; - } - } - - @Override - public void onCommit(String digest) { - nullBlobStats(); - } - - @Override - public void onDelete(String digest) { - nullBlobStats(); - } - }); } @Override public Long value() { - synchronized (blobShard) { - if (blobStats == null) { - blobStats = blobShard.blobStats(); - } - return blobStats.totalUsage(); - } + return blobStatsCache.get().totalUsage(); } } diff --git a/sql/src/test/java/io/crate/operation/collect/files/BlobDocCollectorTest.java b/sql/src/test/java/io/crate/operation/collect/files/BlobDocCollectorTest.java index bc20334d778a..544bb8b3a94a 100644 --- a/sql/src/test/java/io/crate/operation/collect/files/BlobDocCollectorTest.java +++ b/sql/src/test/java/io/crate/operation/collect/files/BlobDocCollectorTest.java @@ -22,7 +22,6 @@ package io.crate.operation.collect.files; import io.crate.blob.BlobContainer; -import io.crate.blob.BlobListener; import io.crate.blob.v2.BlobShard; import io.crate.core.collections.Bucket; import io.crate.operation.Input; @@ -43,7 +42,6 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; import static io.crate.testing.TestingHelpers.isRow; import static org.hamcrest.Matchers.contains; @@ -68,7 +66,7 @@ public void cleanUp() throws Exception { @Test public void testBlobFound() throws Exception { - BlobContainer container = new BlobContainer(tmpDir.toFile(), new CopyOnWriteArrayList()); + BlobContainer container = new BlobContainer(tmpDir.toFile()); String digest = "417de3231e23dcd6d224ff60918024bc6c59aa58"; File blob = new File(container.getVarDirectory().getAbsolutePath() + "/01/" + digest);