
Merge branch 'solandra' of git://github.com/tjake/Solandra into solandra

2 parents 3a589dd + 8b044cf · commit 68bfd91f92e8241b963eb36b65ddeb32263cc38c · @ceocoder committed Jun 14, 2011
@@ -26,6 +26,8 @@
import java.util.concurrent.ConcurrentMap;
import lucandra.cluster.CassandraIndexManager;
+import lucandra.serializers.thrift.DocumentMetadata;
+import lucandra.serializers.thrift.ThriftTerm;
import com.google.common.collect.MapMaker;
@@ -34,19 +36,19 @@
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.SimpleAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FieldSelector;
+import org.apache.lucene.analysis.tokenattributes.TypeAttribute;
+import org.apache.lucene.document.*;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.*;
+import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.TermFreqVector;
-import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.OpenBitSet;
import solandra.SolandraFieldSelector;
@@ -311,49 +313,30 @@ public Document document(int docNum, FieldSelector selector) throws CorruptIndex
continue;
}
- byte[] value;
- ByteBuffer v = ByteBuffer.wrap(CassandraUtils.decompress(ByteBufferUtil.getArray(col.value())));
- int vlimit = v.limit() - v.position();
-
- if (v.get(v.limit() - 1) != Byte.MAX_VALUE && v.get(v.limit() - 1) != Byte.MIN_VALUE)
+ DocumentMetadata dm = lucandra.IndexWriter.fromBytesUsingThrift(col.value());
+
+ for(ThriftTerm term : dm.getTerms())
{
- throw new CorruptIndexException("Lucandra field is not properly encoded: " + docNum + "("
- + fieldName + ")");
-
- }
- else if (v.get(v.limit() - 1) == Byte.MAX_VALUE)
- { // Binary
- value = new byte[vlimit - 1];
- ByteBufferUtil.arrayCopy(v, v.position(), value, 0, vlimit - 1);
-
- field = new Field(fieldName, value, Store.YES);
- cacheDoc.add(field);
- }
- else if (v.get(v.limit() - 1) == Byte.MIN_VALUE)
- { // String
- value = new byte[vlimit - 1];
- ByteBufferUtil.arrayCopy(v, v.position(), value, 0, vlimit - 1);
-
- // Check for multi-fields
- String fieldString = new String(value, "UTF-8");
-
- if (fieldString.indexOf(CassandraUtils.delimeter) >= 0)
+ Fieldable f = null;
+
+ if( term.isSetLongVal() )
+ {
+ f = new NumericField(term.getField()).setLongValue(term.getLongVal());
+ }
+ else if(term.isSetIs_binary())
{
- StringTokenizer tok = new StringTokenizer(fieldString, CassandraUtils.delimeter);
- while (tok.hasMoreTokens())
- {
- field = new Field(fieldName, tok.nextToken(), Store.YES, Index.ANALYZED);
- cacheDoc.add(field);
- }
+ if(term.is_binary)
+ f = new Field(term.getField(), term.getText());
+ else
+ f = new Field(term.getField(), new String(term.getText()), Store.YES, Index.ANALYZED);
}
else
- {
+ throw new RuntimeException("Malformed term");
+
+ cacheDoc.add(f);
- field = new Field(fieldName, fieldString, Store.YES, Index.ANALYZED);
- cacheDoc.add(field);
- }
- }
- }
+ }
+ }
}
// Mark the required doc
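
For reference, a minimal standalone sketch of the new stored-field decoding path in the hunk above. It assumes only the Thrift-generated accessors the diff itself uses (getTerms, isSetLongVal, getText, the public is_binary field) plus Lucene 3.x field classes; the class name StoredFieldDecoder and decoding as UTF-8 (matching the write path's getBytes("UTF-8")) are illustrative choices, not code from this commit.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.document.NumericField;

import lucandra.serializers.thrift.DocumentMetadata;
import lucandra.serializers.thrift.ThriftTerm;

public final class StoredFieldDecoder
{
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Convert one Thrift-serialized column value into Lucene fields. */
    public static List<Fieldable> toFields(ByteBuffer columnValue) throws IOException
    {
        DocumentMetadata dm = lucandra.IndexWriter.fromBytesUsingThrift(columnValue);
        List<Fieldable> fields = new ArrayList<Fieldable>();

        for (ThriftTerm term : dm.getTerms())
        {
            if (term.isSetLongVal())
            {
                // Numeric stored values travel in the dedicated longVal slot
                fields.add(new NumericField(term.getField()).setLongValue(term.getLongVal()));
            }
            else if (term.isSetIs_binary())
            {
                byte[] raw = term.getText();
                if (term.is_binary)
                    fields.add(new Field(term.getField(), raw));                 // stored binary field
                else
                    fields.add(new Field(term.getField(), new String(raw, UTF8), // stored, analyzed text
                                         Store.YES, Index.ANALYZED));
            }
            else
            {
                throw new IOException("Malformed term: " + term.getField());
            }
        }
        return fields;
    }
}

Numeric values now round-trip through longVal, so the reader no longer depends on the old trailing-byte MAX_VALUE/MIN_VALUE flag that marked a value as binary or string.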
@@ -19,9 +19,7 @@
*/
package lucandra;
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
@@ -44,22 +42,19 @@
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
-import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
-import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
-import org.apache.lucene.analysis.tokenattributes.TermAttribute;
+import org.apache.lucene.analysis.tokenattributes.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.document.NumericField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInvertState;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
-import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TTransport;
@@ -89,8 +84,8 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
ByteBuffer indexTermsKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, "terms"
.getBytes("UTF-8"));
- List<ThriftTerm> allIndexedTerms = new ArrayList<ThriftTerm>();
- Map<String, byte[]> fieldCache = new HashMap<String, byte[]>(1024);
+ DocumentMetadata allIndexedTerms = new DocumentMetadata();
+ Map<String, DocumentMetadata> fieldCache = new HashMap<String, DocumentMetadata>(1024);
// By default we don't handle indexSharding
// We round robin replace the index
@@ -99,21 +94,27 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
ByteBuffer docId = ByteBuffer.wrap(CassandraUtils.writeVInt(docNumber));
int position = 0;
- for (Fieldable field : (List<Fieldable>) doc.getFields())
+ for (Fieldable field : doc.getFields())
{
ThriftTerm firstTerm = null;
+
// Indexed field
if (field.isIndexed() && field.isTokenized())
{
-
TokenStream tokens = field.tokenStreamValue();
if (tokens == null)
{
- tokens = analyzer.tokenStream(field.name(), new StringReader(field.stringValue()));
+ Reader tokReader = field.readerValue();
+
+ if (tokReader == null)
+ tokReader = new StringReader(field.stringValue());
+
+ tokens = analyzer.reusableTokenStream(field.name(), tokReader);
}
+
// collect term information per field
Map<Term, Map<ByteBuffer, List<Number>>> allTermInformation = new HashMap<Term, Map<ByteBuffer, List<Number>>>();
@@ -138,11 +139,12 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
// positions
PositionIncrementAttribute posIncrAttribute = null;
if (field.isStorePositionWithTermVector())
- posIncrAttribute = (PositionIncrementAttribute) tokens
- .addAttribute(PositionIncrementAttribute.class);
-
- TermAttribute termAttribute = (TermAttribute) tokens.addAttribute(TermAttribute.class);
+ posIncrAttribute = (PositionIncrementAttribute) tokens.addAttribute(PositionIncrementAttribute.class);
+ //term as string
+ CharTermAttribute termAttribute = (CharTermAttribute) tokens.addAttribute(CharTermAttribute.class);
+
+
// store normalizations of field per term per document rather
// than per field.
// this adds more to write but less to read on other side
@@ -151,14 +153,14 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
while (tokens.incrementToken())
{
tokensInField++;
- Term term = new Term(field.name(), termAttribute.term());
-
- ThriftTerm tterm = new ThriftTerm(field.name(), termAttribute.term());
+ Term term = new Term(field.name(), termAttribute.toString());
+
+ ThriftTerm tterm = new ThriftTerm(term.field()).setText(ByteBuffer.wrap(term.text().getBytes("UTF-8"))).setIs_binary(false);
if(firstTerm == null)
firstTerm = tterm;
- allIndexedTerms.add(tterm);
+ allIndexedTerms.addToTerms(tterm);
// fetch all collected information for this term
Map<ByteBuffer, List<Number>> termInfo = allTermInformation.get(term);
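
A self-contained sketch of the CharTermAttribute-based tokenization loop the hunks above switch to. Analyzer.reusableTokenStream and CharTermAttribute are the Lucene 3.x APIs used in the diff; the class and method names (TermCollector, collectTerms) and the explicit reset()/end()/close() calls are assumptions following the general TokenStream contract, not code from this file.

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.index.Term;

public final class TermCollector
{
    /** Tokenize one field value with the given analyzer and return the resulting Terms. */
    public static List<Term> collectTerms(Analyzer analyzer, String fieldName, String value)
            throws IOException
    {
        Reader reader = new StringReader(value);
        TokenStream tokens = analyzer.reusableTokenStream(fieldName, reader);

        // CharTermAttribute replaces the deprecated TermAttribute; toString() yields the token text
        CharTermAttribute termAttr = tokens.addAttribute(CharTermAttribute.class);

        List<Term> terms = new ArrayList<Term>();
        tokens.reset();
        while (tokens.incrementToken())
        {
            terms.add(new Term(fieldName, termAttr.toString()));
        }
        tokens.end();
        tokens.close();
        return terms;
    }
}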
@@ -227,7 +229,7 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
invertState.setLength(tokensInField);
final float norm = similarity.computeNorm(field.name(), invertState);
- bnorm.add(Similarity.encodeNorm(norm));
+ bnorm.add(Similarity.getDefault().encodeNormValue(norm));
}
for (Map.Entry<Term, Map<ByteBuffer, List<Number>>> term : allTermInformation.entrySet())
@@ -259,12 +261,12 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
// Untokenized fields go in without a termPosition
if (field.isIndexed() && !field.isTokenized())
{
- ThriftTerm tterm = new ThriftTerm(field.name(), field.stringValue());
+ ThriftTerm tterm = new ThriftTerm(field.name()).setText(ByteBuffer.wrap(field.stringValue().getBytes("UTF-8"))).setIs_binary(false);
if(firstTerm == null)
firstTerm = tterm;
- allIndexedTerms.add(tterm);
+ allIndexedTerms.addToTerms(tterm);
ByteBuffer key = CassandraUtils.hashKeyBytes(indexName.getBytes("UTF-8"),
CassandraUtils.delimeterBytes, field.name().getBytes("UTF-8"), CassandraUtils.delimeterBytes,
@@ -284,60 +286,49 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
// Stores each field as a column under this doc key
if (field.isStored())
- {
-
- byte[] _value = field.isBinary() ? field.getBinaryValue() : field.stringValue().getBytes("UTF-8");
-
- // first byte flags if binary or not
- byte[] value = new byte[_value.length + 1];
- System.arraycopy(_value, 0, value, 0, _value.length);
-
- value[value.length - 1] = (byte) (field.isBinary() ? Byte.MAX_VALUE : Byte.MIN_VALUE);
-
- // logic to handle multiple fields w/ same name
- byte[] currentValue = fieldCache.get(field.name());
- if (currentValue == null)
+ {
+ ThriftTerm tt = new ThriftTerm(field.name());
+
+ if (field instanceof NumericField)
{
- fieldCache.put(field.name(), value);
+ Number n = ((NumericField) field).getNumericValue();
+ tt.setLongVal(n.longValue());
}
- else
+
+ byte[] value = field.isBinary() ? field.getBinaryValue() : field.stringValue().getBytes("UTF-8");
+ tt.setText(ByteBuffer.wrap(value)).setIs_binary(field.isBinary());
+
+
+ // logic to handle multiple fields w/ same name
+ DocumentMetadata currentValue = fieldCache.get(field.name());
+ if (currentValue == null)
{
-
- // append new data
- byte[] newValue = new byte[currentValue.length + CassandraUtils.delimeterBytes.length
- + value.length - 1];
- System.arraycopy(currentValue, 0, newValue, 0, currentValue.length - 1);
- System.arraycopy(CassandraUtils.delimeterBytes, 0, newValue, currentValue.length - 1,
- CassandraUtils.delimeterBytes.length);
- System.arraycopy(value, 0, newValue,
- currentValue.length + CassandraUtils.delimeterBytes.length - 1, value.length);
-
- fieldCache.put(field.name(), newValue);
+ currentValue = new DocumentMetadata();
+ fieldCache.put(field.name(), currentValue);
}
+
+ currentValue.addToTerms(tt);
}
//Store for field cache
if(firstTerm != null)
- {
-
+ {
ByteBuffer fieldCacheKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, firstTerm.field.getBytes());
- CassandraUtils.addMutations(workingMutations, CassandraUtils.fieldCacheColumnFamily, CassandraUtils.writeVInt(docNumber), fieldCacheKey, firstTerm.text.getBytes("UTF-8"));
+ CassandraUtils.addMutations(workingMutations, CassandraUtils.fieldCacheColumnFamily, CassandraUtils.writeVInt(docNumber), fieldCacheKey, firstTerm.text);
if(logger.isDebugEnabled())
logger.debug(indexName+" - firstTerm: "+ByteBufferUtil.string(fieldCacheKey));
-
- }
-
+ }
}
ByteBuffer key = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes,
Integer.toHexString(docNumber).getBytes("UTF-8"));
// Store each field as a column under this docId
- for (Map.Entry<String, byte[]> field : fieldCache.entrySet())
+ for (Map.Entry<String, DocumentMetadata> field : fieldCache.entrySet())
{
CassandraUtils.addMutations(workingMutations, CassandraUtils.docColumnFamily, field.getKey().getBytes(
- "UTF-8"), key, CassandraUtils.compress(field.getValue()));
+ "UTF-8"), key, toBytesUsingThrift(field.getValue()));
}
// Finally, Store meta-data so we can delete this document
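
Roughly how a stored Fieldable maps onto a ThriftTerm before being grouped per field name in fieldCache, mirroring the hunk above; the helper name storedTermFor is hypothetical, while the accessors come straight from the diff.

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

import org.apache.lucene.document.Fieldable;
import org.apache.lucene.document.NumericField;

import lucandra.serializers.thrift.ThriftTerm;

public final class StoredTerms
{
    /** Build the ThriftTerm that will be stored for this field. */
    public static ThriftTerm storedTermFor(Fieldable field) throws UnsupportedEncodingException
    {
        ThriftTerm tt = new ThriftTerm(field.name());

        if (field instanceof NumericField)
        {
            // Numeric fields keep their value in the dedicated longVal slot
            tt.setLongVal(((NumericField) field).getNumericValue().longValue());
        }

        byte[] value = field.isBinary() ? field.getBinaryValue() : field.stringValue().getBytes("UTF-8");
        return tt.setText(ByteBuffer.wrap(value)).setIs_binary(field.isBinary());
    }
}

Multiple stored values for the same field name become multiple ThriftTerm entries inside one DocumentMetadata, replacing the old scheme of delimiter-joined byte arrays with a trailing type flag.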
@@ -451,26 +442,26 @@ private void deleteLucandraDocument(String indexName, int docNumber, boolean aut
if (metaCol == null)
return;
- List<Term> terms = fromBytesUsingThrift(metaCol.value());
+ DocumentMetadata terms = fromBytesUsingThrift(metaCol.value());
Set<String> fields = new HashSet<String>();
- for (Term term : terms)
+ for (ThriftTerm term : terms.getTerms())
{
//remove from field cache
- if(!fields.contains(term.field()))
+ if(!fields.contains(term.getField()))
{
- ByteBuffer fieldCacheKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, term.field().getBytes());
+ ByteBuffer fieldCacheKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, term.getField().getBytes());
CassandraUtils.addMutations(workingMutations, CassandraUtils.fieldCacheColumnFamily, CassandraUtils.writeVInt(docNumber), fieldCacheKey, (ByteBuffer) null);
- fields.add(term.field());
+ fields.add(term.getField());
}
try
{
- key = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, term.field()
- .getBytes("UTF-8"), CassandraUtils.delimeterBytes, term.text().getBytes("UTF-8"));
+ key = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, term.getField()
+ .getBytes("UTF-8"), CassandraUtils.delimeterBytes, term.getText());
}
catch (UnsupportedEncodingException e)
{
@@ -599,9 +590,8 @@ private void appendMutations(String indexName, Map<ByteBuffer, RowMutation> muta
}
/** Write all terms to bytes using thrift serialization */
- public static ByteBuffer toBytesUsingThrift(List<ThriftTerm> allTerms) throws IOException
+ public static ByteBuffer toBytesUsingThrift(DocumentMetadata data) throws IOException
{
- DocumentMetadata data = new DocumentMetadata(allTerms);
try
{
@@ -614,7 +604,7 @@ public static ByteBuffer toBytesUsingThrift(List<ThriftTerm> allTerms) throws IO
}
/** Read the object from bytes string. */
- public static List<Term> fromBytesUsingThrift(ByteBuffer data) throws IOException
+ public static DocumentMetadata fromBytesUsingThrift(ByteBuffer data) throws IOException
{
DocumentMetadata docMeta = new DocumentMetadata();
@@ -632,11 +622,6 @@ public static ByteBuffer toBytesUsingThrift(List<ThriftTerm> allTerms) throws IO
throw new IOException(e);
}
- List<Term> terms = new ArrayList<Term>(docMeta.terms.size());
- for(ThriftTerm term : docMeta.terms)
- {
- terms.add(new Term(term.field, term.text));
- }
- return terms;
+ return docMeta;
}
}
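
The helpers above hand-roll a TMemoryInputTransport/TBinaryProtocol pair for reading; an equivalent round-trip can be sketched with the stock TSerializer/TDeserializer utilities. This is an assumption about usage of the standard Thrift Java library, not code from this commit (DocumentMetadataCodec is an illustrative name).

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

import lucandra.serializers.thrift.DocumentMetadata;

public final class DocumentMetadataCodec
{
    /** Serialize all stored terms of a document to a binary-protocol Thrift blob. */
    public static ByteBuffer encode(DocumentMetadata data) throws IOException
    {
        try
        {
            return ByteBuffer.wrap(new TSerializer(new TBinaryProtocol.Factory()).serialize(data));
        }
        catch (TException e)
        {
            throw new IOException(e);
        }
    }

    /** Deserialize a Thrift blob back into DocumentMetadata without consuming the source buffer. */
    public static DocumentMetadata decode(ByteBuffer raw) throws IOException
    {
        byte[] bytes = new byte[raw.remaining()];
        raw.duplicate().get(bytes);

        DocumentMetadata docMeta = new DocumentMetadata();
        try
        {
            new TDeserializer(new TBinaryProtocol.Factory()).deserialize(docMeta, bytes);
        }
        catch (TException e)
        {
            throw new IOException(e);
        }
        return docMeta;
    }
}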