Navigation Menu

Skip to content

Commit

Permalink
Added a client and Fixed #143.
Browse files Browse the repository at this point in the history
  • Loading branch information
amccurry committed Mar 19, 2012
1 parent 15dab63 commit 31daed3
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 22 deletions.
@@ -0,0 +1,55 @@
package com.nearinfinity.blur.lucene.index;

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;

public class TimeBasedIndexDeletionPolicy implements IndexDeletionPolicy {

private static final Log LOG = LogFactory.getLog(TimeBasedIndexDeletionPolicy.class);

private long maxAge;

public TimeBasedIndexDeletionPolicy(long maxAge) {
this.maxAge = maxAge;
}

@Override
public void onInit(List<? extends IndexCommit> commits) throws IOException {
onCommit(commits);
}

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
IndexCommit current = commits.get(commits.size() - 1);
int length;
if (isTooOld(current.getTimestamp())) {
// the current index is old enough that following generation can be
// removed
length = commits.size() - 1;
} else {
// the current index is NOT old enough, so therefore the next generation
// (no matter the old can be removed)
length = commits.size() - 2;
}
for (int i = 0; i < length; i++) {
IndexCommit commit = commits.get(i);
if (isTooOld(commit.getTimestamp())) {
LOG.info("Removing old generation [" + commit.getGeneration() + "] for directory [" + commit.getDirectory() + "]");
commit.delete();
}
}
}

private boolean isTooOld(long timestamp) {
if (timestamp + maxAge < System.currentTimeMillis()) {
return true;
}
return false;
}

}
Expand Up @@ -551,6 +551,7 @@ private BlurIndex openShard(String table, String shard) throws IOException {
writer.setNrtCachingMaxMergeSizeMB(5.0);
writer.setWalPath(_walPath);
writer.setConfiguration(_configuration);
writer.setIndexDeletionPolicy(_indexDeletionPolicy);
writer.init();
index = writer;
}
Expand Down
Expand Up @@ -11,6 +11,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class BlurNRTIndex extends BlurIndex {
private Thread _refresher;
private TransactionRecorder _recorder;
private Configuration _configuration;
private Path _walPath;
private IndexDeletionPolicy _indexDeletionPolicy;

private SearcherWarmer _warmer = new SearcherWarmer() {
@Override
Expand All @@ -71,7 +74,6 @@ public void warm(IndexSearcher s) throws IOException {
}
}
};
private Path _walPath;

public void init() throws IOException {
Path walTablePath = new Path(_walPath, _table);
Expand All @@ -80,6 +82,7 @@ public void init() throws IOException {
IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
conf.setSimilarity(_similarity);
conf.setIndexDeletionPolicy(_indexDeletionPolicy);
TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
mergePolicy.setUseCompoundFile(false);

Expand Down Expand Up @@ -277,4 +280,8 @@ public void setWalPath(Path walPath) {
public void setConfiguration(Configuration configuration) {
_configuration = configuration;
}

public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
_indexDeletionPolicy = indexDeletionPolicy;
}
}
Expand Up @@ -26,7 +26,7 @@
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_SHARD_FILTER_CACHE_CLASS;
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_SHARD_INDEX_DELETION_POLICY_CLASS;
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE;
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_CLASS;
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_SHARD_OPENER_THREAD_COUNT;
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
Expand All @@ -42,7 +42,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
Expand All @@ -53,6 +52,7 @@
import com.nearinfinity.blur.concurrent.ThreadWatcher;
import com.nearinfinity.blur.log.Log;
import com.nearinfinity.blur.log.LogFactory;
import com.nearinfinity.blur.lucene.index.TimeBasedIndexDeletionPolicy;
import com.nearinfinity.blur.manager.BlurFilterCache;
import com.nearinfinity.blur.manager.BlurQueryChecker;
import com.nearinfinity.blur.manager.DefaultBlurFilterCache;
Expand Down Expand Up @@ -142,7 +142,7 @@ public static void main(String[] args) throws TTransportException, IOException,

BlurFilterCache filterCache = getFilterCache(configuration);
BlurIndexWarmup indexWarmup = getIndexWarmup(configuration);
IndexDeletionPolicy indexDeletionPolicy = getIndexDeletionPolicy(configuration);
IndexDeletionPolicy indexDeletionPolicy = new TimeBasedIndexDeletionPolicy(configuration.getLong(BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE, 300000));

String walLogPath = configuration.get(BLUR_SHARD_WAL_PATH, "hdfs://localhost");

Expand Down Expand Up @@ -204,19 +204,6 @@ public void shutdown() {
server.start();
}

private static IndexDeletionPolicy getIndexDeletionPolicy(BlurConfiguration configuration) {
String _class = configuration.get(BLUR_SHARD_INDEX_DELETION_POLICY_CLASS);
if (_class != null) {
try {
Class<?> clazz = Class.forName(_class);
return (IndexDeletionPolicy) clazz.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return new KeepOnlyLastCommitDeletionPolicy();
}

private static BlurFilterCache getFilterCache(BlurConfiguration configuration) {
String _blurFilterCacheClass = configuration.get(BLUR_SHARD_FILTER_CACHE_CLASS);
if (_blurFilterCacheClass != null) {
Expand Down
Expand Up @@ -62,7 +62,7 @@ public class BlurConstants {
public static final String BLUR_MAX_CLAUSE_COUNT = "blur.max.clause.count";
public static final String BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS = "blur.shard.cache.max.querycache.elements";
public static final String BLUR_SHARD_OPENER_THREAD_COUNT = "blur.shard.opener.thread.count";
public static final String BLUR_SHARD_INDEX_DELETION_POLICY_CLASS = "blur.shard.index.deletion.policy.class";
public static final String BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE = "blur.shard.index.deletion.policy.maxage";
public static final String BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE = "blur.zookeeper.system.time.tolerance";

public static final String BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT = "blur.controller.server.thrift.thread.count";
Expand Down
@@ -0,0 +1,58 @@
package com.nearinfinity.blur.lucene.index;

import static org.junit.Assert.assertEquals;

import java.io.IOException;

import org.apache.lucene.analysis.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;
import org.junit.Test;

import com.nearinfinity.blur.index.IndexWriter;

public class TimeBasedIndexDeletionPolicyTest {

@Test
public void testTimeBasedIndexDeletionPolicy() throws IOException, InterruptedException {
TimeBasedIndexDeletionPolicy indexDeletionPolicy = new TimeBasedIndexDeletionPolicy(3000);
RAMDirectory directory = new RAMDirectory();
addAndCommit(directory, indexDeletionPolicy);
addAndCommit(directory, indexDeletionPolicy);
addAndCommit(directory, indexDeletionPolicy);
Thread.sleep(1000);
assertEquals(3, IndexReader.listCommits(directory).size());
Thread.sleep(4000);
addAndCommit(directory, indexDeletionPolicy);
assertEquals(2, IndexReader.listCommits(directory).size());
Thread.sleep(4000);
openClose(directory, indexDeletionPolicy);
assertEquals(1, IndexReader.listCommits(directory).size());
}

private void openClose(RAMDirectory directory, TimeBasedIndexDeletionPolicy indexDeletionPolicy) throws CorruptIndexException, LockObtainFailedException, IOException {
IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, new KeywordAnalyzer());
conf.setIndexDeletionPolicy(indexDeletionPolicy);
new IndexWriter(directory, conf).close();
}

private void addAndCommit(RAMDirectory directory, IndexDeletionPolicy indexDeletionPolicy) throws CorruptIndexException, LockObtainFailedException, IOException {
IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, new KeywordAnalyzer());
conf.setIndexDeletionPolicy(indexDeletionPolicy);
IndexWriter writer = new IndexWriter(directory, conf);
Document doc = new Document();
doc.add(new Field("id", "1", Store.YES, Index.ANALYZED_NO_NORMS));
writer.addDocument(doc);
writer.close();
}

}
@@ -0,0 +1,67 @@
package com.nearinfinity.blur.thrift;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

import org.apache.thrift.TException;

import com.nearinfinity.blur.thrift.commands.BlurCommand;
import com.nearinfinity.blur.thrift.generated.Blur.Client;
import com.nearinfinity.blur.thrift.generated.Blur.Iface;
import com.nearinfinity.blur.thrift.generated.BlurException;

public class BlurClient {

static class BlurClientInvocationHandler implements InvocationHandler {

private List<Connection> connections;

public BlurClientInvocationHandler(List<Connection> connections) {
this.connections = connections;
}

@Override
public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
return BlurClientManager.execute(connections, new BlurCommand<Object>() {
@Override
public Object call(Client client) throws BlurException, TException {
try {
return method.invoke(client, args);
} catch (IllegalArgumentException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwable targetException = e.getTargetException();
if (targetException instanceof BlurException) {
throw (BlurException) targetException;
}
if (targetException instanceof TException) {
throw (TException) targetException;
}
throw new RuntimeException(targetException);
}
}
});
}

}

public static Iface getClient(String connectionStr) {
List<Connection> connections = BlurClientManager.getConnections(connectionStr);
return getClient(connections);
}

public static Iface getClient(Connection connection) {
return getClient(Arrays.asList(connection));
}

public static Iface getClient(List<Connection> connections) {
return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, new BlurClientInvocationHandler(connections));
}

}
Expand Up @@ -250,10 +250,10 @@ public static void sleep(long backOffTime, long maxBackOffTime, int retry, int m
}

public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws BlurException, TException, IOException {
return execute(getCommands(connectionStr),command,maxRetries,backOffTime,maxBackOffTime);
return execute(getConnections(connectionStr),command,maxRetries,backOffTime,maxBackOffTime);
}

private static List<Connection> getCommands(String connectionStr) {
public static List<Connection> getConnections(String connectionStr) {
int start = 0;
int index = connectionStr.indexOf(',');
if (index >= 0) {
Expand All @@ -270,7 +270,7 @@ private static List<Connection> getCommands(String connectionStr) {
}

public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command) throws BlurException, TException, IOException {
return execute(getCommands(connectionStr),command);
return execute(getConnections(connectionStr),command);
}

private static void returnClient(Connection connection, AtomicReference<Blur.Client> client) {
Expand Down
2 changes: 1 addition & 1 deletion src/blur-util/src/main/resources/blur-default.properties
Expand Up @@ -7,7 +7,7 @@ blur.shard.cache.max.querycache.elements=128
blur.shard.cache.max.timetolive=60000
blur.shard.filter.cache.class=com.nearinfinity.blur.manager.DefaultBlurFilterCache
blur.shard.index.warmup.class=com.nearinfinity.blur.manager.indexserver.DefaultBlurIndexWarmup
blur.shard.index.deletion.policy.class=org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy
blur.shard.index.deletion.policy.maxage=300000
blur.shard.blockcache.direct.memory.allocation=true
blur.shard.blockcache.slab.count=1
blur.shard.buffercache.1024=8192
Expand Down

0 comments on commit 31daed3

Please sign in to comment.