Browse files

Merge remote branch 'upstream/master'

  • Loading branch information...
2 parents 60ecd30 + 9d3402a commit 7c5fc8d5f86a464ea01d85251dc7a312ec1620f4 @hofmeister committed May 16, 2011
View
6 README.md
@@ -24,6 +24,12 @@ Java 1.5 (or above) is required; the <strike>Sun</strike> Oracle version is reco
<h1>Build and run couchdb-lucene</h1>
+If you are on OS X, you might find it easiest to;
+
+<pre>
+brew install couchdb-lucene
+</pre>
+
<ol>
<li>Install Maven 2.
<li>checkout repository
View
10 pom.xml
@@ -38,6 +38,11 @@
<version>1.6</version>
</dependency>
<dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.5</version>
+ </dependency>
+ <dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
@@ -73,6 +78,11 @@
<version>${tika-version}</version>
</dependency>
<dependency>
+ <groupId>org.erlang.otp</groupId>
+ <artifactId>jinterface</artifactId>
+ <version>1.5.3.2</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.5.6</version>
View
55 src/main/java/com/github/rnewson/couchdb/lucene/DatabaseIndexer.java
@@ -52,7 +52,7 @@
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.SimpleFSLockFactory;
+import org.apache.lucene.store.SingleInstanceLockFactory;
import org.apache.lucene.util.Version;
import org.json.JSONArray;
import org.json.JSONException;
@@ -63,6 +63,7 @@
import com.github.rnewson.couchdb.lucene.couchdb.CouchDocument;
import com.github.rnewson.couchdb.lucene.couchdb.Database;
import com.github.rnewson.couchdb.lucene.couchdb.DesignDocument;
+import com.github.rnewson.couchdb.lucene.couchdb.UpdateSequence;
import com.github.rnewson.couchdb.lucene.couchdb.View;
import com.github.rnewson.couchdb.lucene.util.Analyzers;
import com.github.rnewson.couchdb.lucene.util.Constants;
@@ -81,7 +82,7 @@
private String etag;
private final Analyzer analyzer;
- private long pending_seq;
+ private UpdateSequence pending_seq;
private IndexReader reader;
private final IndexWriter writer;
private final Database database;
@@ -163,10 +164,10 @@ private void blockForLatest(final boolean staleOk) throws IOException, JSONExcep
if (staleOk) {
return;
}
- final long latest = database.getInfo().getUpdateSequence();
+ final UpdateSequence latest = database.getInfo().getUpdateSequence();
synchronized (this) {
long timeout = getSearchTimeout();
- while (pending_seq < latest) {
+ while (pending_seq.isEarlierThan(latest)) {
try {
final long start = System.currentTimeMillis();
wait(timeout);
@@ -181,8 +182,8 @@ private void blockForLatest(final boolean staleOk) throws IOException, JSONExcep
}
}
- private synchronized void setPendingSequence(final long newSequence) {
- pending_seq = newSequence;
+ private synchronized void setPendingSequence(final UpdateSequence seq) {
+ pending_seq = seq;
notifyAll();
}
@@ -225,7 +226,7 @@ private static long now() {
private final Database database;
- private long ddoc_seq;
+ private UpdateSequence ddoc_seq;
private long lastCommit;
@@ -239,7 +240,7 @@ private static long now() {
private final File root;
- private long since;
+ private UpdateSequence since;
private final Map<View, IndexState> states = Collections
.synchronizedMap(new HashMap<View, IndexState>());
@@ -321,7 +322,7 @@ public Void handleResponse(final HttpResponse response)
break loop;
}
- final long seq = json.getLong("seq");
+ final UpdateSequence seq = new UpdateSequence(json.getString("seq"));
final String id = json.getString("id");
CouchDocument doc;
if (!json.isNull("doc")) {
@@ -343,7 +344,7 @@ public Void handleResponse(final HttpResponse response)
}
if (id.startsWith("_design")) {
- if (seq > ddoc_seq) {
+ if (ddoc_seq.isEarlierThan(seq)) {
logger.info("Exiting due to design document change.");
break loop;
}
@@ -360,7 +361,7 @@ public Void handleResponse(final HttpResponse response)
final View view = entry.getKey();
final IndexState state = entry.getValue();
- if (seq > state.pending_seq) {
+ if (state.pending_seq.isEarlierThan(seq)) {
final Document[] docs;
try {
docs = state.converter.convert(doc, view
@@ -663,7 +664,9 @@ private void close() {
}
}
states.clear();
- Context.exit();
+ if (context != null) {
+ Context.exit();
+ }
latch.countDown();
}
@@ -676,9 +679,9 @@ private void commitAll() throws IOException {
final View view = entry.getKey();
final IndexState state = entry.getValue();
- if (state.pending_seq > getUpdateSequence(state.writer)) {
+ if (getUpdateSequence(state.writer).isEarlierThan(state.pending_seq)) {
final Map<String, String> userData = new HashMap<String, String>();
- userData.put("last_seq", Long.toString(state.pending_seq));
+ userData.put("last_seq", state.pending_seq.toString());
state.writer.commit(userData);
logger.info(view + " now at update_seq " + state.pending_seq);
}
@@ -712,22 +715,22 @@ private IndexState getState(final HttpServletRequest req,
return result;
}
- private long getUpdateSequence(final Directory dir) throws IOException {
+ private UpdateSequence getUpdateSequence(final Directory dir) throws IOException {
if (!IndexReader.indexExists(dir)) {
- return 0L;
+ return UpdateSequence.BOTTOM;
}
return getUpdateSequence(IndexReader.getCommitUserData(dir));
}
- private long getUpdateSequence(final IndexWriter writer) throws IOException {
+ private UpdateSequence getUpdateSequence(final IndexWriter writer) throws IOException {
return getUpdateSequence(writer.getDirectory());
}
- private long getUpdateSequence(final Map<String, String> userData) {
+ private UpdateSequence getUpdateSequence(final Map<String, String> userData) {
if (userData != null && userData.containsKey("last_seq")) {
- return Long.parseLong(userData.get("last_seq"));
+ return new UpdateSequence(userData.get("last_seq"));
}
- return 0L;
+ return UpdateSequence.BOTTOM;
}
private void init() throws IOException, JSONException {
@@ -738,7 +741,7 @@ private void init() throws IOException, JSONException {
context.setOptimizationLevel(9);
this.ddoc_seq = database.getInfo().getUpdateSequence();
- this.since = -1L;
+ this.since = null;
for (final DesignDocument ddoc : database.getAllDesignDocuments()) {
for (final Entry<String, View> entry : ddoc.getAllViews()
@@ -749,12 +752,14 @@ private void init() throws IOException, JSONException {
if (!states.containsKey(view)) {
final Directory dir = FSDirectory.open(viewDir(view, true),
- new SimpleFSLockFactory());
- final long seq = getUpdateSequence(dir);
- if (since == -1) {
+ new SingleInstanceLockFactory());
+ final UpdateSequence seq = getUpdateSequence(dir);
+ if (since == null) {
+ since = seq;
+ }
+ if (seq.isEarlierThan(since)) {
since = seq;
}
- since = Math.min(since, seq);
logger.debug(dir + " bumped since to " + since);
final DocumentConverter converter = new DocumentConverter(
View
11 src/main/java/com/github/rnewson/couchdb/lucene/Main.java
@@ -61,7 +61,6 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}
LOG.info("Index output goes to: " + dir.getCanonicalPath());
- cleanLocks(dir);
final Server server = new Server();
final SelectChannelConnector connector = new SelectChannelConnector();
@@ -89,14 +88,4 @@ public static void main(String[] args) throws Exception {
server.join();
}
- private static void cleanLocks(final File root) throws IOException {
- final Iterator it = FileUtils.iterateFiles(root,
- new String[] { "lock" }, true);
- while (it.hasNext()) {
- final File lock = (File) it.next();
- LOG.info("Releasing stale lock at " + lock);
- lock.delete();
- }
- }
-
}
View
7 src/main/java/com/github/rnewson/couchdb/lucene/Tika.java
@@ -21,9 +21,11 @@
import java.io.IOException;
import java.io.InputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.DublinCore;
import org.apache.tika.metadata.HttpHeaders;
@@ -50,13 +52,10 @@ public void parse(final InputStream in, final String contentType, final String f
try {
// Add body text.
- doc.add(text(fieldName, tika.parseToString(in, md), false));
+ doc.add(new Field(fieldName, tika.parse(in, md)));
} catch (final IOException e) {
log.warn("Failed to index an attachment.", e);
return;
- } catch (final TikaException e) {
- log.warn("Failed to parse an attachment.", e);
- return;
}
// Add DC attributes.
View
2 src/main/java/com/github/rnewson/couchdb/lucene/couchdb/Database.java
@@ -105,7 +105,7 @@ public DatabaseInfo getInfo() throws IOException, JSONException {
return httpClient.execute(get, handler);
}
- public HttpUriRequest getChangesRequest(final long since)
+ public HttpUriRequest getChangesRequest(final UpdateSequence since)
throws IOException {
return new HttpGet(
url
View
4 src/main/java/com/github/rnewson/couchdb/lucene/couchdb/DatabaseInfo.java
@@ -11,8 +11,8 @@ public DatabaseInfo(final JSONObject json) {
this.json = json;
}
- public long getUpdateSequence() throws JSONException {
- return json.getLong("update_seq");
+ public UpdateSequence getUpdateSequence() throws JSONException {
+ return new UpdateSequence(json.getString("update_seq"));
}
public String getName() throws JSONException {
View
84 src/main/java/com/github/rnewson/couchdb/lucene/couchdb/UpdateSequence.java
@@ -0,0 +1,84 @@
+package com.github.rnewson.couchdb.lucene.couchdb;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.codec.binary.Base64;
+
+import com.ericsson.otp.erlang.OtpErlangDecodeException;
+import com.ericsson.otp.erlang.OtpErlangList;
+import com.ericsson.otp.erlang.OtpErlangLong;
+import com.ericsson.otp.erlang.OtpErlangObject;
+import com.ericsson.otp.erlang.OtpErlangTuple;
+import com.ericsson.otp.erlang.OtpInputStream;
+
+public final class UpdateSequence {
+
+ public static final UpdateSequence BOTTOM = new UpdateSequence("0");
+
+ private long seq;
+ private Map<String, Long> vector;
+ private final String asString;
+
+ public UpdateSequence(final String seq) {
+ this.asString = seq;
+
+ if (seq.matches("[0-9]+")) {
+ this.seq = Long.parseLong(seq);
+ return;
+ }
+
+ if (seq.matches("[0-9]+-[0-9a-zA-Z_-]+")) {
+ final String packedSeqs = seq.split("-", 2)[1];
+ final byte[] bytes = new Base64(true).decode(packedSeqs);
+ final OtpInputStream stream = new OtpInputStream(bytes);
+ try {
+ final OtpErlangList list = (OtpErlangList) stream.read_any();
+ this.vector = new HashMap<String, Long>();
+ for (int i = 0, arity = list.arity(); i < arity; i++) {
+ final OtpErlangTuple tuple = (OtpErlangTuple) list
+ .elementAt(i);
+ final OtpErlangObject node = tuple.elementAt(0);
+ final OtpErlangObject range = tuple.elementAt(1);
+ final OtpErlangLong node_seq = (OtpErlangLong) tuple
+ .elementAt(2);
+ vector.put(node + "-" + range, node_seq.longValue());
+ }
+ } catch (final OtpErlangDecodeException e) {
+ throw new IllegalArgumentException(seq + " not valid.");
+ }
+ return;
+ }
+
+ throw new IllegalArgumentException(seq + " not recognized.");
+ }
+
+ public boolean isEarlierThan(final UpdateSequence other) {
+ if (this == BOTTOM) {
+ return true;
+ }
+
+ if (vector == null && other.vector == null) {
+ return this.seq < other.seq;
+ } else if (vector != null && other.vector != null) {
+ final Iterator<Entry<String, Long>> it = this.vector.entrySet()
+ .iterator();
+ while (it.hasNext()) {
+ final Entry<String, Long> entry = it.next();
+ final Long otherValue = other.vector.get(entry.getKey());
+ if (otherValue != null && otherValue >= entry.getValue()) {
+ return false;
+ }
+ }
+ return true;
+ }
+ throw new IllegalArgumentException(other + " is not compatible.");
+ }
+
+ public String toString() {
+ return asString;
+ }
+
+}
View
2 src/test/java/com/github/rnewson/couchdb/lucene/TikaTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
@@ -53,7 +52,6 @@ public void testXML() throws IOException {
public void testWord() throws IOException {
parse("example.doc", "application/msword", "bar");
assertThat(doc.getField("bar"), not(nullValue()));
- assertThat(doc.get("bar"), containsString("576 dsf45 d56 dsgh"));
}
private void parse(final String resource, final String type, final String field) throws IOException {

0 comments on commit 7c5fc8d

Please sign in to comment.