Skip to content
Browse files

Merge branch 'solandra' of git://github.com/tjake/Solandra into solandra

Conflicts:
	test/lucandra/cluster/IndexManagerTests.java
  • Loading branch information...
2 parents 97dd32c + ec35913 commit 6b791b4d1397d1ccc01fbe0a5d72cd2f68c1974a @ceocoder committed May 18, 2011
View
25 build.xml
@@ -170,6 +170,31 @@
<fail if="testfailed" message="Some test(s) failed." />
</target>
+ <target name="test-long" depends="compile.tests">
+ <echo message="running tests" />
+ <mkdir dir="${build}/output" />
+ <junit fork="on" failureproperty="testfailed">
+
+ <classpath>
+ <path refid="solandra.classpath"/>
+ <pathelement location="${basedir}/solandra-app/"/>
+ </classpath>
+
+ <formatter type="xml" usefile="true" />
+ <formatter type="brief" usefile="false" />
+
+ <batchtest todir="${build}/output">
+ <fileset dir="${build.test.classes}" includes="**/IndexManagerTests.class" />
+ </batchtest>
+
+ <jvmarg value="-Xmx1G" />
+ <jvmarg value="-Dlog4j.configuration=file:///${basedir}/resources/log4j.properties"/>
+ <jvmarg value="-Dlog4j.defaultInitOverride=true" />
+
+ </junit>
+ <fail if="testfailed" message="Some test(s) failed." />
+ </target>
+
<target name="solandra.jar" depends="compile,compile.tests">
<jar jarfile="solandra.jar" basedir="${build.classes}" />
<jar jarfile="solandra-tests.jar" basedir="${build.test.classes}" />
View
4 reuters-demo/schema.xml
@@ -221,8 +221,8 @@
<fields>
<field name="id" type="string" indexed="true" stored="true" required="true" />
- <field name="title" type="text" indexed="true" stored="true"/>
- <field name="text" type="text" indexed="true" stored="true"/>
+ <field name="title" type="text" indexed="true" stored="true" termPositions="true"/>
+ <field name="text" type="text" indexed="true" stored="true" termPositions="true"/>
<field name="date" type="date" indexed="true" stored="true"/>
<field name="dateline" type="text" indexed="true" stored="true"/>
<field name="places" type="string" indexed="true" stored="true" multiValued="true" omitNorms="true" termVectors="true" />
View
38 src/lucandra/IndexWriter.java
@@ -36,7 +36,9 @@
import com.google.common.collect.MapMaker;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.log4j.Logger;
@@ -355,23 +357,45 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
commit(indexName, false);
}
- public TopDocs deleteDocuments(String indexName, Query query, boolean autoCommit) throws CorruptIndexException,
+ public long deleteDocuments(String indexName, Query query, boolean autoCommit) throws CorruptIndexException,
IOException
{
IndexReader reader = new IndexReader(indexName);
IndexSearcher searcher = new IndexSearcher(reader);
- TopDocs results = searcher.search(query, 1000);
+
+ // Also delete the id lookup
+ ByteBuffer idKey = CassandraUtils.hashKeyBytes(indexName.getBytes("UTF-8"),
+ CassandraUtils.delimeterBytes, "ids".getBytes("UTF-8"));
+
+ RowMutation rm = new RowMutation(CassandraUtils.keySpace, idKey);
- for (int i = 0; i < results.totalHits; i++)
+
+ TopDocs results = null;
+ long total = 0;
+ do
{
- ScoreDoc doc = results.scoreDocs[i];
+ results = searcher.search(query, 1024);
- deleteLucandraDocument(indexName, doc.doc, autoCommit);
- }
+ for (int i = 0; i < results.totalHits; i++)
+ {
+ ScoreDoc doc = results.scoreDocs[i];
- return results;
+ deleteLucandraDocument(indexName, doc.doc, true);
+
+ //Scale the doc ID to the sharded id.
+ ByteBuffer buf = ByteBufferUtil.bytes(String.valueOf(doc.doc));
+ rm.delete(new QueryPath(CassandraUtils.schemaInfoColumnFamily, buf), System.currentTimeMillis());
+ }
+
+
+ CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
+
+ total += results.totalHits;
+ }while(results.totalHits > 0);
+
+ return total;
}
public void deleteDocuments(String indexName, Term term, boolean autoCommit) throws CorruptIndexException,
View
573 src/lucandra/cluster/CassandraIndexManager.java
@@ -26,16 +26,15 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lucandra.CassandraUtils;
import com.google.common.collect.MapMaker;
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
-
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.service.StorageService;
@@ -45,35 +44,36 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.log4j.Logger;
-
//Instead of grabbing all of them just grab a contiguous slab via offset
public class CassandraIndexManager
{
// To increase throughput we distribute docs across a number of shards at
// once
// The idea being different shards live on different boxes
- protected final int shardsAtOnce;
+ protected final int shardsAtOnce;
- private int[] randomSeq;
+ private int[] randomSeq;
- public static final int maxDocsPerShard = Integer.valueOf(CassandraUtils.properties
- .getProperty(
- "solandra.maximum.docs.per.shard",
- "131072"));
- public static final int reserveSlabSize = Integer.valueOf(CassandraUtils.properties
- .getProperty(
- "solandra.index.id.reserve.size",
- "16384"));
+ public static final int maxDocsPerShard = Integer
+ .valueOf(CassandraUtils.properties
+ .getProperty(
+ "solandra.maximum.docs.per.shard",
+ "131072"));
+ public static final int reserveSlabSize = Integer.valueOf(CassandraUtils.properties
+ .getProperty(
+ "solandra.index.id.reserve.size",
+ "16384"));
- private final int offsetSlots = maxDocsPerShard / reserveSlabSize;
- public final int expirationTime = 120; // seconds
+ private final int offsetSlots = (maxDocsPerShard / reserveSlabSize);
+ public final int expirationTime = 120; // seconds
- private final ConcurrentMap<String, AllNodeRsvps> indexReserves = new MapMaker().makeMap();
-
- private final ConcurrentMap<String, ShardInfo> indexShards = new MapMaker().makeMap();
+ private final ConcurrentMap<String, AllNodeRsvps> indexReserves = new MapMaker().makeMap();
- private static final Logger logger = Logger.getLogger(CassandraIndexManager.class);
+ private final ConcurrentMap<String, ShardInfo> indexShards = new MapMaker().makeMap();
+ private final ConcurrentMap<String, ShardInfo> indexUsed = new MapMaker().makeMap();
+
+ private static final Logger logger = Logger.getLogger(CassandraIndexManager.class);
private class ShardInfo
{
@@ -89,74 +89,174 @@ public ShardInfo(String indexName)
}
private class NodeInfo
- {
+ {
public Integer shard;
public Map<String, AtomicInteger> nodes = new HashMap<String, AtomicInteger>();
public NodeInfo(Integer shard)
{
this.shard = shard;
}
+
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + getOuterType().hashCode();
+ result = prime * result + ((nodes == null) ? 0 : nodes.hashCode());
+ result = prime * result + ((shard == null) ? 0 : shard.hashCode());
+ return result;
+ }
+
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ NodeInfo other = (NodeInfo) obj;
+ if (!getOuterType().equals(other.getOuterType()))
+ return false;
+ if (nodes == null)
+ {
+ if (other.nodes != null)
+ return false;
+ }
+ else if (!nodes.equals(other.nodes))
+ return false;
+ if (shard == null)
+ {
+ if (other.shard != null)
+ return false;
+ }
+ else if (!shard.equals(other.shard))
+ return false;
+ return true;
+ }
+
+ private CassandraIndexManager getOuterType()
+ {
+ return CassandraIndexManager.this;
+ }
}
private class AllNodeRsvps
{
- public final AtomicLong incrementor = new AtomicLong(0);
- public final List<RsvpInfo> rsvpList = new ArrayList<RsvpInfo>();
-
+ public final AtomicLong incrementor = new AtomicLong(0);
+ public final List<RsvpInfo> rsvpList = new ArrayList<RsvpInfo>();
+
public Long getNextId()
- {
- if(rsvpList.isEmpty())
+ {
+ if (rsvpList.isEmpty())
+ {
return null;
+ }
long start = incrementor.incrementAndGet();
- long len = rsvpList.size();
- long end = start + len;
-
- for(long i=start; i<end; i++)
+ long len = rsvpList.size();
+ long end = start + len;
+
+ for (long i = start; i < end; i++)
{
- int pos = (int)(i % len);
-
+ int pos = (int) (i % len);
+
RsvpInfo info = rsvpList.get(pos);
-
- if(info == null)
+
+ if (info == null)
continue;
-
- //clear expired ids
- if(info.ttl < System.currentTimeMillis())
+
+ // clear expired ids
+ if (info.ttl < System.currentTimeMillis())
{
rsvpList.set(pos, null);
+ continue;
}
+
+ //We can only increment our token
+ if(info.token.equals(getToken()))
+ {
+ int nextId = info.currentId.incrementAndGet();
- int nextId = info.currentId.incrementAndGet();
-
- if(nextId <= info.endId)
- {
- return (long)(maxDocsPerShard * info.node.shard) + nextId;
+
+ //logger.info(info.token+" "+info.shard+" "+info.currentId.get());
+
+ if (nextId <= info.endId)
+ {
+ return (long) (maxDocsPerShard * info.shard) + nextId;
+ }
+ else
+ {
+ rsvpList.set(pos, null);
+ }
}
- else
- {
- rsvpList.set(pos, null);
- }
}
-
+
return null;
}
}
-
+
private class RsvpInfo
{
- public NodeInfo node;
+ public String token;
+ public Integer shard;
public AtomicInteger currentId;
public final int endId;
public final long ttl = System.currentTimeMillis() + (expirationTime * 1000);
- public RsvpInfo(int startId, int endId, NodeInfo node)
+ public RsvpInfo(int startId, int endId, int shard, String token)
{
currentId = new AtomicInteger(startId);
this.endId = endId;
- this.node = node;
+ this.token = token;
+ this.shard = shard;
+ }
+
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + getOuterType().hashCode();
+ result = prime * result + ((shard == null) ? 0 : shard.hashCode());
+ result = prime * result + ((token == null) ? 0 : token.hashCode());
+ return result;
+ }
+
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ RsvpInfo other = (RsvpInfo) obj;
+ if (!getOuterType().equals(other.getOuterType()))
+ return false;
+ if (shard == null)
+ {
+ if (other.shard != null)
+ return false;
+ }
+ else if (!shard.equals(other.shard))
+ return false;
+ if (token == null)
+ {
+ if (other.token != null)
+ return false;
+ }
+ else if (!token.equals(other.token))
+ return false;
+ return true;
+ }
+
+ private CassandraIndexManager getOuterType()
+ {
+ return CassandraIndexManager.this;
}
+
+
}
public CassandraIndexManager(int shardsAtOnce)
@@ -190,7 +290,7 @@ public CassandraIndexManager(int shardsAtOnce)
randomSeq = shuffle(randomSeq, r);
}
- private ShardInfo getShardInfo(String indexName, boolean force) throws IOException
+ private ShardInfo getShardInfo(String indexName, boolean force) throws IOException
{
ShardInfo shards = indexShards.get(indexName);
@@ -218,7 +318,10 @@ private ShardInfo getShardInfo(String indexName, boolean force) throws IOExcepti
List<Row> rows = CassandraUtils.robustRead(ConsistencyLevel.QUORUM, cmd);
shards = new ShardInfo(indexName);
- if (rows != null || !rows.isEmpty())
+ AllNodeRsvps allNodeRsvps = new AllNodeRsvps();
+
+
+ if (rows != null && !rows.isEmpty())
{
assert rows.size() == 1;
@@ -236,43 +339,87 @@ private ShardInfo getShardInfo(String indexName, boolean force) throws IOExcepti
{
String shardStr = ByteBufferUtil.string(c.name());
Integer shardNum = Integer.valueOf(shardStr);
-
+
assert c instanceof SuperColumn;
-
+
NodeInfo nodes = new NodeInfo(shardNum);
-
- for(IColumn subCol : c.getSubColumns())
+
+ for (IColumn subCol : c.getSubColumns())
{
-
+
String token = ByteBufferUtil.string(subCol.name());
+
+
AtomicInteger offset = new AtomicInteger(Integer.valueOf(ByteBufferUtil.string(subCol.value())));
-
-
+ int startSeqOffset = getRandomSequenceOffset(offset.get());
+
+ //Leave a mark at each shard so we track the offsets hit.
nodes.nodes.put(token, offset);
-
- shards.shards.put(shardNum, nodes);
+ shards.shards.put(shardNum, nodes);
+
+
+ //Load this reserve if there is more to go.
+ if(offset.get() < (maxDocsPerShard - 1))
+ {
+ int seqOffset = getRandomSequenceOffset(offset.get()+1);
+
+ if(startSeqOffset == seqOffset)
+ {
+ logger.info("Found reserved shard"+shardStr+"("+token+"):"+(offset.get()+1)+" TO " + (randomSeq[seqOffset]+reserveSlabSize));
+ allNodeRsvps.rsvpList.add(new RsvpInfo(offset.get()+1, (randomSeq[seqOffset]+reserveSlabSize), nodes.shard, token));
+ }
+ }
}
}
}
}
+ else
+ {
+ logger.info("No shard info found for :" + indexName);
+ }
if (currentShards == null)
{
- if (indexShards.putIfAbsent(indexName, shards) == null)
+ currentShards = indexShards.putIfAbsent(indexName, shards);
+
+ if (currentShards == null)
+ {
+ indexReserves.put(indexName, allNodeRsvps);
+
return shards;
+ }
}
else if (indexShards.replace(indexName, currentShards, shards))
{
logger.info(indexName + " has " + shards.shards.size() + " shards");
- return shards;
+
+ currentShards = shards;
+ }
+ else
+ {
+ //Merge together active and new
+ for (Map.Entry<Integer, NodeInfo> entry : shards.shards.entrySet())
+ {
+ currentShards.shards.put(entry.getKey(), entry.getValue());
+ }
}
- return indexShards.get(indexName);
-
+ AllNodeRsvps currentNodeRsvps = indexReserves.get(indexName);
+
+ for (RsvpInfo rsvp : allNodeRsvps.rsvpList)
+ {
+ if(!currentNodeRsvps.rsvpList.contains(rsvp))
+ {
+ currentNodeRsvps.rsvpList.add(rsvp);
+ }
+ }
+
+ return currentShards;
}
+ //TODO
public void deleteId(String indexName, long id)
{
@@ -387,9 +534,9 @@ public long getNextId(String indexName, String key, RowMutation[] rowMutations)
if (id == null)
throw new IllegalStateException(myToken + ": Unable to reserve an id");
- int shard = getShardFromDocId(id);
+ int shard = getShardFromDocId(id);
int shardedId = getShardedDocId(id);
-
+
ByteBuffer idCol = ByteBufferUtil.bytes(String.valueOf(shardedId));
ByteBuffer keyCol = ByteBuffer.wrap(key.getBytes("UTF-8"));
@@ -405,7 +552,6 @@ public long getNextId(String indexName, String key, RowMutation[] rowMutations)
ByteBuffer keyKey = CassandraUtils.hashKeyBytes((indexName + "~" + key).getBytes("UTF-8"),
CassandraUtils.delimeterBytes, "keys".getBytes("UTF-8"));
-
ByteBuffer idVal = ByteBuffer.wrap(id.toString().getBytes("UTF-8"));
RowMutation rm2 = new RowMutation(CassandraUtils.keySpace, keyKey);
@@ -446,32 +592,39 @@ public void resetCounter(String indexName) throws IOException
for (NodeInfo nodes : shards.shards.values())
{
for (String token : nodes.nodes.keySet())
- rms.add(updateNodeOffset(indexName, token, nodes.shard, randomSeq[0]));
+ rms.add(updateNodeOffset(indexName, token, nodes.shard, -1));
}
CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rms.toArray(new RowMutation[] {}));
}
- private Long nextReservedId(String indexName, NodeInfo[] shards, String myToken) throws IOException
+ private synchronized Long nextReservedId(String indexName, NodeInfo[] shards, String myToken) throws IOException
{
if (logger.isDebugEnabled())
logger.debug("in reserveIds for index " + indexName);
AllNodeRsvps currentRsvpd = indexReserves.get(indexName);
if (currentRsvpd != null)
- {
+ {
Long nextId = currentRsvpd.getNextId();
-
- if(nextId != null)
+
+ if (nextId != null)
return nextId;
-
+
if (logger.isDebugEnabled())
- logger.debug("need more ids for " + myToken);
+ logger.debug("need more ids for " + myToken);
}
AllNodeRsvps allNewRsvps = new AllNodeRsvps();
+ ShardInfo usedShardInfo = indexUsed.get(indexName);
+ if(usedShardInfo == null)
+ {
+ usedShardInfo = new ShardInfo(indexName);
+ indexUsed.put(indexName, usedShardInfo);
+ }
+
// Pick a new shard
for (NodeInfo node : shards)
{
@@ -485,6 +638,13 @@ private Long nextReservedId(String indexName, NodeInfo[] shards, String myToken)
// goto next offset marker (unless its the first or last)
int randomSequenceOffset = getRandomSequenceOffset(startingOffset);
+ NodeInfo usedNodeInfo = usedShardInfo.shards.get(node.shard);
+ if(usedNodeInfo == null)
+ {
+ usedNodeInfo = new NodeInfo(node.shard);
+ usedShardInfo.shards.put(node.shard, usedNodeInfo);
+ }
+
if (startingOffset != randomSeq[0])
{
if (randomSequenceOffset != (offsetSlots - 1))
@@ -497,23 +657,37 @@ private Long nextReservedId(String indexName, NodeInfo[] shards, String myToken)
}
}
- // logger.info(myToken+ " startingOffset = "+startingOffset+
- // ", nextOffset = "+nextOffset);
+ if (logger.isTraceEnabled())
+ logger.trace(myToken + " startingOffset = " + startingOffset + ", nextOffset = " + nextOffset);
-
-
- synchronized (node)
+ while(true)
{
- // First, make sure another thread didn't already do this work
- AllNodeRsvps possiblyNewRsvpd = indexReserves.get(indexName);
- if (possiblyNewRsvpd != currentRsvpd || startingOffset != offset.get())
+
+ //Avoid re-checking used slabs
+ if(usedNodeInfo != null)
{
- return possiblyNewRsvpd == null ? null : possiblyNewRsvpd.getNextId();
- }
+ if(usedNodeInfo.nodes.get(""+nextOffset) != null)
+ {
+ CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, updateNodeOffset(indexName, myToken, node.shard, nextOffset));
+
+ // try next offset
+ int seqOffset = getRandomSequenceOffset(nextOffset);
+ if( seqOffset < (offsetSlots - 1))
+ {
+ nextOffset = randomSeq[seqOffset+1];
+ continue;
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
ByteBuffer key = CassandraUtils.hashKeyBytes((indexName + "~" + node.shard).getBytes("UTF-8"),
- CassandraUtils.delimeterBytes, "ids".getBytes("UTF-8"));
+ CassandraUtils.delimeterBytes, "rsvp".getBytes("UTF-8"));
// Write the reserves
RowMutation rm = new RowMutation(CassandraUtils.keySpace, key);
@@ -570,27 +744,22 @@ private Long nextReservedId(String indexName, NodeInfo[] shards, String myToken)
// See which ones we successfully reserved
for (IColumn c : supercol.getSubColumns())
{
-
+
// someone already took this id
if (!(c instanceof ExpiringColumn) && !(c instanceof DeletedColumn))
{
if (logger.isDebugEnabled())
- try
- {
- logger.debug(offset + " was taken by " + ByteBufferUtil.string(c.name()));
- }
- catch (CharacterCodingException e)
- {
-
- }
-
+ logger.debug(offset + " was taken by " + ByteBufferUtil.string(c.name()));
+
winningToken = null;
break;
}
// expired reservation
if (c.isMarkedForDelete())
+ {
continue;
+ }
if (c.timestamp() == minTtl && winningToken.compareTo(c.name()) <= 0)
{
@@ -601,7 +770,7 @@ private Long nextReservedId(String indexName, NodeInfo[] shards, String myToken)
{
minTtl = c.timestamp();
winningToken = c.name();
- }
+ }
}
String winningTokenStr;
@@ -615,73 +784,89 @@ private Long nextReservedId(String indexName, NodeInfo[] shards, String myToken)
}
// we won!
- if (winningToken != null && winningTokenStr.equals(myToken))
+ if (winningTokenStr.equals(myToken))
{
- int numReserved = nextOffset;
- for (int i = nextOffset; i == nextOffset || i % reserveSlabSize != 0; i++)
- {
- numReserved++;
- }
-
- allNewRsvps.rsvpList.add(new RsvpInfo(nextOffset, numReserved, node));
+ //Mark this as permanently taken
+ rm = new RowMutation(CassandraUtils.keySpace, key);
+
+ rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id, ByteBuffer.wrap(myToken
+ .getBytes("UTF-8"))), off, System.currentTimeMillis());
+
+ CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
- logger.info("Reserved " + numReserved + " ids for " + myToken + " shard " + node.shard
- + " from slot " + getRandomSequenceOffset(nextOffset));
+ //Add to active rsvp list
+ allNewRsvps.rsvpList.add(new RsvpInfo(nextOffset, (nextOffset + reserveSlabSize - 1) , node.shard, myToken));
+
+ //if (logger.isTraceEnabled())
+ logger.info("Reserved " + reserveSlabSize + " ids for " + myToken + " shard " + node.shard
+ + " from slot " + getRandomSequenceOffset(nextOffset)+" "+nextOffset+" TO "+(nextOffset + reserveSlabSize - 1));
+
+ break;
}
else
{
- // we lost, try try again...
-
- // secial case, otherwise we never move on
- if (nextOffset == randomSeq[0])
- nextOffset += 1;
-
- // mark this offset as taken and move on
- updateNodeOffset(indexName, myToken, node.shard, nextOffset);
- continue;
+ //Mark this offset as taken.
+ CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, updateNodeOffset(indexName, myToken, node.shard, nextOffset));
+ usedNodeInfo.nodes.put(""+nextOffset, new AtomicInteger(1));
+
+ // we lost, try try again...
+ int seqOffset = getRandomSequenceOffset(nextOffset);
+ if( seqOffset < (offsetSlots - 1))
+ {
+ nextOffset = randomSeq[seqOffset+1];
+ }
+ else
+ {
+ break;
+ }
}
-
-
- if (logger.isDebugEnabled())
- logger.debug("offset for shard " + node.shard + " " + nextOffset);
}
}
-
// check that offset is the same as when we started
if (currentRsvpd == null)
{
if (indexReserves.putIfAbsent(indexName, allNewRsvps) != null)
{
- logger.info("reserves changed, using those instead");
+ //if (logger.isTraceEnabled())
+ logger.info("reserves changed, using those instead");
+
allNewRsvps = indexReserves.get(indexName);
}
}
else
{
if (!indexReserves.replace(indexName, currentRsvpd, allNewRsvps))
{
- logger.info("already reserved by someone else, using those");
+ if (logger.isTraceEnabled())
+ logger.info("already reserved by someone else, using those");
+
return indexReserves.get(indexName).getNextId();
}
}
- if (logger.isDebugEnabled())
- logger.debug("Reserved " + allNewRsvps.rsvpList.size() + " ids for " + myToken);
+ if (logger.isTraceEnabled())
+ logger.trace("Reserved " + allNewRsvps.rsvpList.size() + " shards for " + myToken);
+
+
return allNewRsvps.getNextId();
}
private int getRandomSequenceOffset(int offset)
{
+
+ if(offset < 0)
+ return -1;
+
if (offset >= CassandraIndexManager.maxDocsPerShard)
- throw new IllegalArgumentException("offset can not be > " + CassandraIndexManager.maxDocsPerShard);
+ throw new IllegalArgumentException("offset can not be >= " + CassandraIndexManager.maxDocsPerShard);
for (int randomSeqOffset = 0; randomSeqOffset < randomSeq.length; randomSeqOffset++)
{
int randomSequenceStart = randomSeq[randomSeqOffset];
- if (randomSequenceStart <= offset && offset < randomSequenceStart + reserveSlabSize)
+ if (offset >= randomSequenceStart && offset < randomSequenceStart + reserveSlabSize)
return randomSeqOffset;
}
@@ -701,71 +886,57 @@ private int getRandomSequenceOffset(int offset)
assert shards != null;
- synchronized (shards)
- {
- String myToken = getToken();
-
- NodeInfo[] picked = new NodeInfo[shardsAtOnce];
-
- int maxShard = -1;
- int pickedShard = 0;
-
- for (Map.Entry<Integer, NodeInfo> shard : shards.shards.entrySet())
- {
- NodeInfo nodes = shard.getValue();
-
- AtomicInteger offset = nodes.nodes.get(myToken);
-
- // new shard for this node
- if (offset == null)
- {
- // this means shard was started by another node
- offset = new AtomicInteger(randomSeq[0]);
-
- logger.info("shard started by another node initializing with " + randomSeq[0]);
-
- RowMutation rm = updateNodeOffset(shards.indexName, myToken, nodes.shard, offset.get());
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
- }
+ String myToken = getToken();
- int randomSeqOffset = getRandomSequenceOffset(offset.get());
+ NodeInfo[] picked = new NodeInfo[shardsAtOnce];
- if (logger.isDebugEnabled())
- logger.info(myToken + ": shard = " + shard.getKey() + ", offset = " + offset.get()
- + ", offsetLookup = " + randomSeqOffset + ", offsetSlots = " + offsetSlots);
+ int pickedShard = 0;
+
+
+ for (Map.Entry<Integer, NodeInfo> shard : shards.shards.entrySet())
+ {
+ NodeInfo nodes = shard.getValue();
- // can we still use this shard (other nodes havent gobbled up
- // the ids)?
- // if(randomSeqOffset+1 == offsetSlots)
- // logger.info(myToken+": shard = "+shard.getKey()+", offset = "+offset.get()+", offsetLookup = "+randomSeqOffset+", offsetSlots = "+offsetSlots);
+
+ AtomicInteger offset = nodes.nodes.get(myToken);
- if (randomSeqOffset + 1 < offsetSlots)
- {
- picked[pickedShard] = nodes;
- pickedShard++;
- if (pickedShard >= shardsAtOnce)
- return picked;
- }
+ // skip shards we don't know about.
+ if (offset == null)
+ {
+ continue;
+ }
- if (shard.getKey() > maxShard)
- maxShard = shard.getKey();
+ int randomSeqOffset = getRandomSequenceOffset(offset.get());
- }
+ if (logger.isTraceEnabled())
+ logger.trace(myToken + ": shard = " + shard.getKey() + ", offset = " + offset.get()
+ + ", offsetLookup = " + randomSeqOffset + ", offsetSlots = " + offsetSlots);
- // new shards
- for (int i = pickedShard; i < shardsAtOnce; i++)
+
+ if (randomSeqOffset != (offsetSlots-1))
{
- picked[i] = addNewShard(shards.indexName);
+ picked[pickedShard] = nodes;
+ pickedShard++;
+ if (pickedShard >= shardsAtOnce)
+ return picked;
}
- return picked;
}
+
+ // new shards
+ for (int i = pickedShard; i < shardsAtOnce; i++)
+ {
+ picked[i] = addNewShard(shards.indexName);
+ }
+
+ return picked;
+
}
private NodeInfo addNewShard(String indexName) throws IOException
{
ShardInfo shards = getShardInfo(indexName, false);
-
+
// get max shard
Integer maxShard = -1;
@@ -775,15 +946,12 @@ private NodeInfo addNewShard(String indexName) throws IOException
Integer currentOffset = null;
for (Map.Entry<String, AtomicInteger> e1 : max.getValue().nodes.entrySet())
- {
- if (e1.getValue().get() > 0)
- {
- currentOffset = e1.getValue().get();
- break;
- }
+ {
+ currentOffset = e1.getValue().get();
+ break;
}
- if (currentOffset != null && currentOffset > 0)
+ if (currentOffset != null)
{
maxShard = max.getKey();
}
@@ -794,35 +962,37 @@ private NodeInfo addNewShard(String indexName) throws IOException
NodeInfo dupNodes = null;
if ((dupNodes = shards.shards.putIfAbsent(nodes.shard, nodes)) == null)
{
- logger.info("added new shard for " + indexName + " " + nodes.shard + " with offset " + randomSeq[0]);
+ logger.info("added new shard for " + indexName + "("+getToken()+") " + nodes.shard);
- RowMutation rm = updateNodeOffset(indexName, getToken(), nodes.shard, randomSeq[0]); // offset 0
-
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
+ RowMutation rm = updateNodeOffset(indexName, getToken(), nodes.shard, -1);
+
+ CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
}
-
+
+
return dupNodes == null ? nodes : dupNodes;
}
private RowMutation updateNodeOffset(String indexName, String myToken, Integer shard, Integer offset)
throws IOException
{
+
// Update last offset info for this shard
ByteBuffer shardKey = CassandraUtils.hashKeyBytes(indexName.getBytes("UTF-8"), CassandraUtils.delimeterBytes,
"shards".getBytes("UTF-8"));
RowMutation rm = new RowMutation(CassandraUtils.keySpace, shardKey);
- rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, ByteBuffer.wrap(String.valueOf(shard)
- .getBytes("UTF-8")), ByteBuffer.wrap(myToken.getBytes("UTF-8"))), ByteBuffer.wrap(String
- .valueOf(offset).getBytes("UTF-8")), System.currentTimeMillis());
+ rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, ByteBuffer.wrap(String.valueOf(shard).getBytes(
+ "UTF-8")), ByteBuffer.wrap(myToken.getBytes("UTF-8"))), ByteBuffer.wrap(String.valueOf(offset)
+ .getBytes("UTF-8")), System.currentTimeMillis());
// update locally
- ShardInfo si = getShardInfo(indexName, false);
+ ShardInfo si = getShardInfo(indexName, false);
AtomicInteger o = null;
- NodeInfo n = si.shards.get(shard);
-
- if(!indexName.contains("~"))
- {
+ NodeInfo n = si.shards.get(shard);
+
+ if (!indexName.contains("~"))
+ {
if (n != null)
o = n.nodes.get(myToken);
else
@@ -833,15 +1003,14 @@ private RowMutation updateNodeOffset(String indexName, String myToken, Integer s
else
o.set(offset);
- if (logger.isDebugEnabled())
- logger.debug("updated node offset for " + indexName + "(" + shard + ")(" + myToken + ") to " + offset);
+ // if (logger.isDebugEnabled())
+ //logger.info("updated node offset for " + indexName + "(" + shard + ")(" + myToken + ") to " + offset);
}
else
{
- throw new RuntimeException("inner shard offset update attempt: "+indexName);
+ throw new RuntimeException("inner shard offset update attempt: " + indexName);
}
-
-
+
return rm;
}
View
33 src/solandra/JettySolandraRunner.java
@@ -19,6 +19,7 @@
*/
package solandra;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
@@ -33,6 +34,8 @@
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.FilterHolder;
+import org.mortbay.xml.XmlConfiguration;
+import org.xml.sax.SAXException;
public class JettySolandraRunner
@@ -52,10 +55,36 @@ public JettySolandraRunner( String context, int port, String solrConfigFilename
dispatchFilter.setInitParameter("solrconfig-filename", solrConfigFilename);
}
- private void init( String context, int port )
+ private void init( String context, int port )
{
this.context = context;
- server = new Server( port );
+ server = new Server( port );
+
+ InputStream configStream = JettySolandraRunner.class.getResourceAsStream("jetty.xml");
+
+ if(configStream != null)
+ {
+ XmlConfiguration configuration;
+ try
+ {
+ configuration = new XmlConfiguration(configStream);
+ configuration.configure(server);
+ }
+ catch (SAXException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
server.setStopAtShutdown( true );
// Initialize the servlets
View
22 src/solandra/SolandraIndexWriter.java
@@ -449,26 +449,8 @@ public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException
for(String subIndex : localShards)
{
Query q = QueryParsing.parseQuery(cmd.query, schema);
- TopDocs results = writer.deleteDocuments(subIndex, q, false);
-
- // Also delete the id lookup
- ByteBuffer idKey = CassandraUtils.hashKeyBytes(subIndex.getBytes("UTF-8"),
- CassandraUtils.delimeterBytes, "ids".getBytes("UTF-8"));
-
- RowMutation rm = new RowMutation(CassandraUtils.keySpace, idKey);
-
- for (ScoreDoc doc : results.scoreDocs)
- {
- //Scale the doc ID to the sharded id.
- // int shard = Integer.valueOf(subIndex.substring(subIndex.lastIndexOf('~')+1));
- ByteBuffer id = ByteBufferUtil.bytes(String.valueOf(doc.doc));// + (CassandraIndexManager.maxDocsPerShard * shard))));
- rm.delete(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id), System.currentTimeMillis());
- }
-
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
-
-
-
+ long total = writer.deleteDocuments(subIndex, q, true);
+ logger.info("Deleted "+ total + " Documents");
}
madeIt = true;
View
55 test/lucandra/cluster/IndexManagerTests.java
@@ -30,14 +30,17 @@
import lucandra.CassandraUtils;
import lucandra.dht.RandomPartitioner;
-import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
+
public class IndexManagerTests
{
- private static final Logger logger = Logger.getLogger(IndexManagerTests.class);
static String indexName = String.valueOf(System.nanoTime());
+ static
+ {
+ // Logger.getLogger(CassandraIndexManager.class).setLevel(Level.TRACE);
+ }
private class TestCassandraIndexManager extends CassandraIndexManager
{
@@ -57,7 +60,7 @@ public String getToken()
@BeforeClass
public static void setUpBeforeClass()
- {
+ {
// start cassandra
try
{
@@ -69,7 +72,7 @@ public static void setUpBeforeClass()
}
}
- // @Test
+ //@Test
public void testCassandraIncrement3()
{
@@ -108,10 +111,10 @@ public void testCassandraIncrement3()
throw new RuntimeException(e);
}
- if (j % 100 == 0)
+ if (j % 500 == 0)
{
long endTime = System.currentTimeMillis();
- logger.info(Thread.currentThread().getName() + " id:" + id + ", 100 iterations in "
+ System.err.println(Thread.currentThread().getName() + " id:" + id + ", 500 iterations in "
+ (endTime - startTime) / 1000 + " sec");
startTime = endTime;
}
@@ -160,7 +163,7 @@ public void testCassandraIncrement3()
}
- // @Test
+ @Test
public void testCassandraIncrement() throws IOException
{
@@ -179,7 +182,7 @@ public void testCassandraIncrement() throws IOException
assertNotNull(id);
- // logger.info(CassandraIndexManager.getShardFromDocId(id));
+ // System.err.println(CassandraIndexManager.getShardFromDocId(id));
AtomicInteger counter = shardStats.get(CassandraIndexManager.getShardFromDocId(id));
if (counter == null)
{
@@ -193,13 +196,13 @@ public void testCassandraIncrement() throws IOException
if (i % 10000 == 0)
{
long endTime = System.currentTimeMillis();
- logger.info("added:" + id + ", 10k iterations in " + (endTime - startTime) / 1000 + " sec "
+ System.err.println("added:" + id + ", 10k iterations in " + (endTime - startTime) / 1000 + " sec "
+ shardStats);
startTime = endTime;
}
}
- assertEquals(7, CassandraIndexManager.getShardFromDocId(idx.getMaxId(indexName)));
+ assertEquals(3, CassandraIndexManager.getShardFromDocId(idx.getMaxId(indexName)));
// Update
for (int i = 0; i < CassandraIndexManager.maxDocsPerShard * 2; i++)
@@ -211,7 +214,7 @@ public void testCassandraIncrement() throws IOException
if (i % 10000 == 0)
{
long endTime = System.currentTimeMillis();
- logger.info("updated:" + id + ", 10k iterations in " + (endTime - startTime) / 1000 + " sec");
+ System.err.println("updated:" + id + ", 10k iterations in " + (endTime - startTime) / 1000 + " sec");
startTime = endTime;
}
@@ -226,16 +229,19 @@ public void testCassandraIncrement2() throws Exception
ExecutorService svc = Executors.newFixedThreadPool(16);
- final TestCassandraIndexManager idx = new TestCassandraIndexManager(4);
List<Callable<Set<Long>>> callables = new ArrayList<Callable<Set<Long>>>();
- for (int i = 0; i < 16; i++)
+ for (int i = 0; i < 3; i++)
{
+ final int iidx = i;
+
Callable<Set<Long>> r = new Callable<Set<Long>>() {
public Set<Long> call()
{
+ TestCassandraIndexManager idx = new TestCassandraIndexManager(4);
+
long startTime = System.currentTimeMillis();
Set<Long> all = new HashSet<Long>(CassandraIndexManager.maxDocsPerShard);
@@ -245,17 +251,22 @@ public void testCassandraIncrement2() throws Exception
Long id = null;
try
{
- id = idx.getNextId(indexName, "i" + i);
+ id = idx.getNextId(indexName, "i" + i+"_"+iidx);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- assertTrue(id + " already exists " + all.size(), all.add(id));
+ assertTrue(id + " already exists " + all.size() +" shard="+CassandraIndexManager.getShardFromDocId(id)+" id="+CassandraIndexManager.getShardedDocId(id), all.add(id));
- if (i % 10000 == 0)
+ if (i > 0 && i % 10000 == 0)
{
+ long endTime = System.currentTimeMillis();
+ System.err.println(Thread.currentThread().getName() + " id:" + id + ", 10k iterations in "
+ + (endTime - startTime) / 1000 + " sec");
+ startTime = endTime;
+
if (i < 20000)
try
{
@@ -265,12 +276,7 @@ public void testCassandraIncrement2() throws Exception
{
// TODO Auto-generated catch block
e.printStackTrace();
- }
-
- long endTime = System.currentTimeMillis();
- logger.info(Thread.currentThread().getName() + " id:" + id + ", 10k iterations in "
- + (endTime - startTime) / 1000 + " sec");
- startTime = endTime;
+ }
}
}
@@ -294,8 +300,9 @@ public void testCassandraIncrement2() throws Exception
{
if (!all.add(id))
{
- logger.error(id + " already exists " + all.size());
+ System.err.println(id + " already exists " + all.size()+" shard="+CassandraIndexManager.getShardFromDocId(id)+" id="+CassandraIndexManager.getShardedDocId(id));
hasError = true;
+
}
}
}
@@ -309,7 +316,7 @@ public void testCassandraIncrement2() throws Exception
}
- @Test
+ // @Test
public void testCustomRandomPartitioner()
{
String[] keys = new String[] { "0", "83316744970572273156255124564039073023",

0 comments on commit 6b791b4

Please sign in to comment.
Something went wrong with that request. Please try again.