Skip to content

Commit

Permalink
Merge branch 'solandra' of git://github.com/tjake/Solandra into solandra
Browse files Browse the repository at this point in the history
Conflicts:
	src/lucandra/cluster/CassandraIndexManager.java
  • Loading branch information
ceocoder committed May 4, 2011
2 parents 4a65a7c + c27c46d commit 1ac0fca
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 113 deletions.
63 changes: 24 additions & 39 deletions src/lucandra/cluster/CassandraIndexManager.java
Expand Up @@ -67,7 +67,7 @@ public class CassandraIndexManager
"16384"));

private final int offsetSlots = maxDocsPerShard / reserveSlabSize;
private final int expirationTime = 120; // seconds
public final int expirationTime = 120; // seconds

private final ConcurrentMap<String, AllNodeRsvps> indexReserves = new MapMaker().makeMap();

Expand Down Expand Up @@ -213,7 +213,7 @@ private ShardInfo getShardInfo(String indexName, boolean force) throws IOExcepti

ReadCommand cmd = new SliceFromReadCommand(CassandraUtils.keySpace, key, new ColumnParent(
CassandraUtils.schemaInfoColumnFamily), ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 100);
ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);

List<Row> rows = CassandraUtils.robustRead(ConsistencyLevel.QUORUM, cmd);

Expand All @@ -236,39 +236,21 @@ private ShardInfo getShardInfo(String indexName, boolean force) throws IOExcepti
{
String shardStr = ByteBufferUtil.string(c.name());
Integer shardNum = Integer.valueOf(shardStr);

// goto each shard and get local offset
cmd = new SliceFromReadCommand(CassandraUtils.keySpace, CassandraUtils.hashKeyBytes((indexName
).getBytes("UTF-8"), CassandraUtils.delimeterBytes, "shards"
.getBytes("UTF-8")), new ColumnParent(CassandraUtils.schemaInfoColumnFamily),
ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 100);

List<Row> lrows = CassandraUtils.robustRead(ConsistencyLevel.QUORUM, cmd);

if (lrows != null && !lrows.isEmpty())

assert c instanceof SuperColumn;

NodeInfo nodes = new NodeInfo(shardNum);

for(IColumn subCol : c.getSubColumns())
{
assert rows.size() == 1;

Row lrow = lrows.get(0);

if (lrow.cf != null && !lrow.cf.isMarkedForDelete())
{
for (IColumn lc : lrow.cf.getSortedColumns())
{
NodeInfo nodes = new NodeInfo(shardNum);

for (IColumn s : lc.getSubColumns())
{
String token = ByteBufferUtil.string(s.name());
AtomicInteger offset = new AtomicInteger(Integer.valueOf(ByteBufferUtil.string(s
.value())));

nodes.nodes.put(token, offset);
}

shards.shards.put(shardNum, nodes);
}
}

String token = ByteBufferUtil.string(subCol.name());
AtomicInteger offset = new AtomicInteger(Integer.valueOf(ByteBufferUtil.string(subCol.value())));


nodes.nodes.put(token, offset);

shards.shards.put(shardNum, nodes);
}
}
}
Expand Down Expand Up @@ -814,11 +796,8 @@ private NodeInfo addNewShard(String indexName) throws IOException
{
logger.info("added new shard for " + indexName + " " + nodes.shard + " with offset " + randomSeq[0]);

RowMutation rm = updateNodeOffset(indexName, getToken(), nodes.shard, randomSeq[0]); // offset
// 0
RowMutation rm2 = updateNodeOffset(indexName + "~" + nodes.shard, getToken(), nodes.shard, randomSeq[0]); // offset
// 0

RowMutation rm = updateNodeOffset(indexName, getToken(), nodes.shard, randomSeq[0]); // offset 0

CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
}

Expand Down Expand Up @@ -857,6 +836,12 @@ private RowMutation updateNodeOffset(String indexName, String myToken, Integer s
if (logger.isDebugEnabled())
logger.debug("updated node offset for " + indexName + "(" + shard + ")(" + myToken + ") to " + offset);
}
else
{
throw new RuntimeException("inner shard offset update attempt: "+indexName);
}


return rm;
}

Expand Down
5 changes: 1 addition & 4 deletions src/org/apache/lucene/search/LucandraFieldCache.java
Expand Up @@ -161,10 +161,7 @@ static final class StopFillCacheException extends RuntimeException
}

final static IndexReader.ReaderFinishedListener purgeReader = new IndexReader.ReaderFinishedListener() {
// @Override
// -- not
// until
// Java 1.6

public void finished(IndexReader reader)
{
FieldCache.DEFAULT.purge(reader);
Expand Down
130 changes: 60 additions & 70 deletions test/lucandra/cluster/IndexManagerTests.java
Expand Up @@ -19,9 +19,7 @@
*/
package lucandra.cluster;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -37,10 +35,7 @@

public class IndexManagerTests
{
static String indexName = String.valueOf(System.nanoTime());



static String indexName = String.valueOf(System.nanoTime());

private class TestCassandraIndexManager extends CassandraIndexManager
{
Expand All @@ -59,7 +54,7 @@ public String getToken()
}

@BeforeClass
public static void setUpBeforeClass()
public static void setUpBeforeClass()
{
// start cassandra
try
Expand All @@ -71,9 +66,8 @@ public static void setUpBeforeClass()
throw new RuntimeException(e);
}
}


@Test

// @Test
public void testCassandraIncrement3()
{

Expand All @@ -82,36 +76,40 @@ public void testCassandraIncrement3()
ExecutorService svc = Executors.newFixedThreadPool(16);

final TestCassandraIndexManager idx = new TestCassandraIndexManager(1);

List<Callable<Set<Long>>> callables = new ArrayList<Callable<Set<Long>>>();
for (int i = 0; i < 16; i++)
for (int i = 0; i < 1000; i++)
{

final int iname = i;

Callable<Set<Long>> r = new Callable<Set<Long>>() {

public Set<Long> call()
{

long startTime = System.currentTimeMillis();

Set<Long> all = new HashSet<Long>(CassandraIndexManager.maxDocsPerShard);
Set<Long> all = new HashSet<Long>(1000);

for (int i = 0; i < 10000; i++)
for (int j = 0; j < 1000; j++)
{
Long id = null;
try
{
id = idx.getNextId("i"+i, "i" + i);
id = idx.getNextId("i" + iname, "i" + j);

assertTrue(id + " already exists " + all.size(), all.add(id));
}
catch (IOException e)
{
throw new RuntimeException(e);
}


if (i % 10000 == 0)
if (j % 100 == 0)
{
long endTime = System.currentTimeMillis();
System.err.println(Thread.currentThread().getName() + " id:" + id + ", 10k iterations in "
System.err.println(Thread.currentThread().getName() + " id:" + id + ", 100 iterations in "
+ (endTime - startTime) / 1000 + " sec");
startTime = endTime;
}
Expand All @@ -129,12 +127,10 @@ public Set<Long> call()
{
List<Future<Set<Long>>> results = svc.invokeAll(callables);


for (Future<Set<Long>> result : results)
{
Set<Long> thread = result.get();


}
}
catch (InterruptedException e1)
Expand All @@ -161,9 +157,8 @@ public Set<Long> call()
}

}


@Test
// @Test
public void testCassandraIncrement() throws IOException
{

Expand All @@ -174,42 +169,43 @@ public void testCassandraIncrement() throws IOException
long startTime = System.currentTimeMillis();

Map<Integer, AtomicInteger> shardStats = new HashMap<Integer, AtomicInteger>();

// Add
for (int i = 0; i < CassandraIndexManager.maxDocsPerShard; i++)
for (int i = 0; i < CassandraIndexManager.maxDocsPerShard * 2; i++)
{
Long id = idx.getNextId(indexName, "i" + i);

assertNotNull(id);
//System.err.println(CassandraIndexManager.getShardFromDocId(id));

// System.err.println(CassandraIndexManager.getShardFromDocId(id));
AtomicInteger counter = shardStats.get(CassandraIndexManager.getShardFromDocId(id));
if(counter == null)
if (counter == null)
{
counter = new AtomicInteger(0);
shardStats.put(CassandraIndexManager.getShardFromDocId(id), counter);
}
counter.incrementAndGet();

assertTrue(id + " already exists " + all.size(), all.add(id));

if (i % 10000 == 0)
{
long endTime = System.currentTimeMillis();
System.err.println("added:" + id + ", 10k iterations in " + (endTime - startTime) / 1000 + " sec "+shardStats);
System.err.println("added:" + id + ", 10k iterations in " + (endTime - startTime) / 1000 + " sec "
+ shardStats);
startTime = endTime;
}
}

assertEquals(3, CassandraIndexManager.getShardFromDocId(idx.getMaxId(indexName)));
assertEquals(7, CassandraIndexManager.getShardFromDocId(idx.getMaxId(indexName)));

// Update
for (int i = 0; i < CassandraIndexManager.maxDocsPerShard; i++)
for (int i = 0; i < CassandraIndexManager.maxDocsPerShard * 2; i++)
{
Long id = idx.getId(indexName, "i" + i);

assertNotNull("i"+i, id);
assertNotNull("i" + i, id);

if (i % 10000 == 0)
{
long endTime = System.currentTimeMillis();
Expand All @@ -221,15 +217,15 @@ public void testCassandraIncrement() throws IOException
}

@Test
public void testCassandraIncrement2()
public void testCassandraIncrement2() throws Exception
{

indexName = String.valueOf(System.nanoTime());

ExecutorService svc = Executors.newFixedThreadPool(16);

final TestCassandraIndexManager idx = new TestCassandraIndexManager(4);

List<Callable<Set<Long>>> callables = new ArrayList<Callable<Set<Long>>>();
for (int i = 0; i < 16; i++)
{
Expand All @@ -253,11 +249,22 @@ public Set<Long> call()
{
throw new RuntimeException(e);
}

assertTrue(id + " already exists " + all.size(), all.add(id));

if (i % 10000 == 0)
{
if (i < 20000)
try
{
Thread.sleep(120 * 1000);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}

long endTime = System.currentTimeMillis();
System.err.println(Thread.currentThread().getName() + " id:" + id + ", 10k iterations in "
+ (endTime - startTime) / 1000 + " sec");
Expand All @@ -273,47 +280,30 @@ public Set<Long> call()
callables.add(r);
}

try
{
List<Future<Set<Long>>> results = svc.invokeAll(callables);
List<Future<Set<Long>>> results = svc.invokeAll(callables);

Set<Long> all = new HashSet<Long>(CassandraIndexManager.maxDocsPerShard);
Set<Long> all = new HashSet<Long>(CassandraIndexManager.maxDocsPerShard);
boolean hasError = false;
for (Future<Set<Long>> result : results)
{
Set<Long> thread = result.get();

for (Future<Set<Long>> result : results)
for (Long id : thread)
{
Set<Long> thread = result.get();

for (Long id : thread)
if (!all.add(id))
{
if (!all.add(id))
{
System.err.println(id + " already exists " + all.size());
}
System.err.println(id + " already exists " + all.size());
hasError = true;
}
}
}
catch (InterruptedException e1)
{
// TODO Auto-generated catch block
e1.printStackTrace();
}
catch (ExecutionException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}

if(hasError)
fail("Found duplicate entries");

svc.shutdown();

try
{
svc.awaitTermination(10, TimeUnit.MINUTES);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
svc.awaitTermination(10, TimeUnit.MINUTES);

}

Expand All @@ -338,5 +328,5 @@ public void testCustomRandomPartitioner()
assertEquals(rp.getToken(hashBuf).token.abs().toString(), key);
}
}

}

0 comments on commit 1ac0fca

Please sign in to comment.