Skip to content

Commit

Permalink
fixup! update shardsizeexpression every 10 seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
msbt committed May 4, 2015
1 parent 5e89dcf commit 6c002ec
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 151 deletions.
17 changes: 1 addition & 16 deletions blob/src/main/java/io/crate/blob/BlobContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

public class BlobContainer {

Expand All @@ -58,13 +57,10 @@ public class BlobContainer {
private final File tmpDirectory;
private final File varDirectory;

private final CopyOnWriteArrayList<BlobListener> listeners;

public BlobContainer(File baseDirectory, CopyOnWriteArrayList<BlobListener> listeners) {
public BlobContainer(File baseDirectory) {
this.baseDirectory = baseDirectory;
this.tmpDirectory = new File(baseDirectory, "tmp");
this.varDirectory = new File(baseDirectory, "var");
this.listeners = listeners;

FileSystemUtils.mkdirs(this.varDirectory);
FileSystemUtils.mkdirs(this.tmpDirectory);
Expand Down Expand Up @@ -168,17 +164,6 @@ public File getFile(String digest) {
return new File(getVarDirectory(), digest.substring(0, 2) + File.separator + digest);
}

/**
* called when a blob got committed, finally stored
* @param digest the digest of the blob
*/
protected void onCommit(String digest) {
for (BlobListener listener : listeners) {
listener.onCommit(digest);
}
}


public DigestBlob createBlob(String digest, UUID transferId) {
// TODO: check if exists already
return new DigestBlob(this, digest, transferId);
Expand Down
2 changes: 0 additions & 2 deletions blob/src/main/java/io/crate/blob/DigestBlob.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ public File commit() throws DigestMismatchException {
}
File newFile = container.getFile(digest);
file.renameTo(newFile);
// call blob listeners
container.onCommit(digest);
return newFile;
}

Expand Down
22 changes: 2 additions & 20 deletions blob/src/main/java/io/crate/blob/v2/BlobShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import io.crate.blob.BlobContainer;
import io.crate.blob.BlobEnvironment;
import io.crate.blob.BlobListener;
import io.crate.blob.stats.BlobStats;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -34,15 +33,12 @@
import org.elasticsearch.index.shard.service.IndexShard;

import java.io.File;
import java.util.concurrent.CopyOnWriteArrayList;

public class BlobShard extends AbstractIndexShardComponent {

private final BlobContainer blobContainer;
private final IndexShard indexShard;

private final CopyOnWriteArrayList<BlobListener> listeners = new CopyOnWriteArrayList<>();

@Inject
protected BlobShard(ShardId shardId, @IndexSettings Settings indexSettings,
BlobEnvironment blobEnvironment,
Expand All @@ -51,29 +47,15 @@ protected BlobShard(ShardId shardId, @IndexSettings Settings indexSettings,
this.indexShard = indexShard;
File blobDir = blobDir(blobEnvironment);
logger.info("creating BlobContainer at {}", blobDir);
this.blobContainer = new BlobContainer(blobDir, listeners);
this.blobContainer = new BlobContainer(blobDir);
}

public byte[][] currentDigests(byte prefix) {
return blobContainer.cleanAndReturnDigests(prefix);
}

public boolean delete(String digest) {
boolean deleted = blobContainer.getFile(digest).delete();

// call delete listeners
for (BlobListener blobListener : listeners) {
blobListener.onDelete(digest);
}
return deleted;
}

public void addListener(BlobListener blobListener) {
listeners.add(blobListener);
}

public void removeListener(BlobListener blobListener) {
listeners.remove(blobListener);
return blobContainer.getFile(digest).delete();
}

public BlobContainer blobContainer() {
Expand Down
21 changes: 2 additions & 19 deletions blob/src/test/java/io/crate/DigestBlobTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
package io.crate;

import io.crate.blob.BlobContainer;
import io.crate.blob.BlobListener;
import io.crate.blob.DigestBlob;
import io.crate.test.integration.CrateUnitTest;
import org.elasticsearch.common.bytes.BytesArray;
Expand All @@ -39,11 +38,6 @@
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class DigestBlobTests extends CrateUnitTest {

Expand Down Expand Up @@ -92,11 +86,7 @@ public void testDigestBlobResumeHeadAndAddContent() throws IOException {
UUID transferId = UUID.randomUUID();
int currentPos = 2;

CopyOnWriteArrayList<BlobListener> listeners = new CopyOnWriteArrayList<>();
BlobListener listener = mock(BlobListener.class);
listeners.add(listener);

BlobContainer container = new BlobContainer(tmpDir.toFile(), listeners);
BlobContainer container = new BlobContainer(tmpDir.toFile());
File filePath = new File(container.getTmpDirectory(), String.format("%s.%s", digest, transferId.toString()));
if (filePath.exists()) {
filePath.delete();
Expand Down Expand Up @@ -125,7 +115,6 @@ public void testDigestBlobResumeHeadAndAddContent() throws IOException {
assertEquals("ABCDEFGHIJKLMNO", new BytesArray(buffer).toUtf8().trim());

File file = digestBlob.commit();
verify(listener, times(1)).onCommit(digest);

// check if final file's content is correct
buffer = new byte[15];
Expand All @@ -143,11 +132,7 @@ public void testDigestBlobResumeHeadAndAddContent() throws IOException {
@Test
public void testResumeDigestBlobAddHeadAfterContent() throws IOException {
UUID transferId = UUID.randomUUID();
CopyOnWriteArrayList<BlobListener> listeners = new CopyOnWriteArrayList<>();
BlobListener listener = mock(BlobListener.class);
listeners.add(listener);

BlobContainer container = new BlobContainer(tmpDir.toFile(), listeners);
BlobContainer container = new BlobContainer(tmpDir.toFile());

DigestBlob digestBlob = DigestBlob.resumeTransfer(
container, "417de3231e23dcd6d224ff60918024bc6c59aa58", transferId, 2);
Expand All @@ -170,8 +155,6 @@ public void testResumeDigestBlobAddHeadAfterContent() throws IOException {

File file = digestBlob.commit();

verify(listener, times(1)).onCommit("417de3231e23dcd6d224ff60918024bc6c59aa58");

// check if final file's content is correct
buffer = new byte[15];
stream = new FileInputStream(file);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.crate.integrationtests;


import io.crate.blob.BlobListener;
import io.crate.blob.v2.BlobIndices;
import io.crate.blob.v2.BlobShard;
import io.crate.rest.CrateRestFilter;
import io.crate.test.integration.CrateIntegrationTest;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -14,20 +11,13 @@
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.junit.Test;

import java.io.IOException;
import java.util.Locale;

import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@CrateIntegrationTest.ClusterScope(scope = CrateIntegrationTest.Scope.SUITE, numNodes = 2)
public class BlobIntegrationTest extends BlobHttpIntegrationTest {

Expand Down Expand Up @@ -244,43 +234,4 @@ public void testIndexOnNonBlobTable() throws IOException {
assertEquals("{\"_index\":\"test_no_blobs\",\"_type\":\"default\",\"_id\":\"1\",\"_version\":1,\"created\":true}",
EntityUtils.toString(res.getEntity()));
}

@Test
public void testBlobListenersExecuted() throws Exception {
final String INDEX = "listener_t";

BlobIndices blobIndices = cluster().getInstance(BlobIndices.class);

Settings indexSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
blobIndices.createBlobTable(INDEX, indexSettings).get();

BlobShard shard = null;
for (BlobIndices indices : cluster().getInstances(BlobIndices.class)) {
shard = indices.blobShard(BlobIndices.fullIndexName(INDEX), 0);
if (shard != null) {
break;
}
}
assertThat(shard, is(notNullValue()));
BlobListener listener = mock(BlobListener.class);
shard.addListener(listener);

String digest = "32d10c7b8cf96570ca04ce37f2a19d84240d3a89";
put(blobUri(INDEX, digest), "abcdefghijklmnopqrstuvwxyz").close();
verify(listener, times(1)).onCommit(digest);

String digest2 = "c520e6109835c876fd98636efec43dd61634b7d3";
put(blobUri(INDEX, digest2), StringUtils.repeat("a", 1500)).close();
verify(listener, times(1)).onCommit(digest2);

delete(blobUri(INDEX, digest)).close();
verify(listener, times(1)).onDelete(digest);

delete(blobUri(INDEX, digest2)).close();
verify(listener, times(1)).onDelete(digest2);

}
}
63 changes: 63 additions & 0 deletions core/src/main/java/io/crate/core/CachedRef.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.core;

import java.util.concurrent.TimeUnit;

/**
* A cache for a single reference that is cached for <code>cacheTime</code> milliseconds
* and after that refreshed by calling {@link #refresh()} on new access to {@link #get()}.
* @param <T>
*/
public abstract class CachedRef<T> {

private final long cacheTime;
private long cachedAt = 0L;
private volatile T value = null;

public CachedRef(long cacheTime, TimeUnit timeUnit) {
this.cacheTime = timeUnit.toMillis(cacheTime);
}

/**
* guaranteed to be not null if {@link #refresh()} does not return null.
* @return the cached value or a refreshed one.
*/
public T get() {
ensureValue();
return value;
}

private synchronized void ensureValue() {
long curTime = System.currentTimeMillis();
if (value == null || curTime - cachedAt > cacheTime) {
value = refresh();
cachedAt = curTime;
}
}

/**
* create a new value
* @return the new value
*/
public abstract T refresh();
}
71 changes: 71 additions & 0 deletions core/src/test/java/io/crate/core/CachedRefTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.core;

import com.carrotsearch.randomizedtesting.annotations.Repeat;
import io.crate.test.integration.CrateUnitTest;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class CachedRefTest extends CrateUnitTest {

@Test
public void testZeroCacheAlwaysRefresh() throws Exception {
CachedRef<Long> longCache = new CachedRef<Long>(0, TimeUnit.MILLISECONDS) {
private AtomicLong internalLong = new AtomicLong(0L);
@Override
public Long refresh() {
return internalLong.incrementAndGet();
}
};
long previous = -1L;
for (int i = 0; i < 1000; i++) {
long cur = longCache.get();
assertThat(cur, is(greaterThan(previous)));
previous = cur;
Thread.sleep(1);
}
}

@Repeat(iterations=10)
@Test
public void testRefresh() throws Exception {
CachedRef<Long> longCache = new CachedRef<Long>(10, TimeUnit.MILLISECONDS) {
private AtomicLong internalLong = new AtomicLong(0L);
@Override
public Long refresh() {
return internalLong.incrementAndGet();
}
};

long first = longCache.get();
assertThat(first, is(1L));
assertThat(longCache.get(), is(first));
Thread.sleep(12);
assertThat(longCache.get(), is(2L));
}
}
3 changes: 3 additions & 0 deletions docs/sql/system.txt
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,9 @@ The table schema is as follows:
| | getting relocated to at the time | |
+------------------+----------------------------------+-------------+
| size | Current size in bytes. | Long |
| | This value is cached for max. | |
| | 10 seconds to reduce file system | |
| | access. | |
+------------------+----------------------------------+-------------+
| state | The current state of the shard. | String |
| | Possible states are: | |
Expand Down

0 comments on commit 6c002ec

Please sign in to comment.