Permalink
Browse files

Consolidate partition behavior in IPartitioner, so creating a new partitioner should be only a matter of implementing that interface. All the external switch statements on PartitionerType have been folded into that interface.

        SSTable is now the only part of the code that cares about the distinction between a 'raw' key and a 'decorated' key.  Variables in that class have been named clientKey or decoratedKey to show which is which.  Other classes don't need to care, because they deal either only with decorated keys (SequenceFile, FileStruct) or only with client keys (everyone else).

patch by jbellis; reviewed by Jun Rao for #58
  • Loading branch information...
1 parent 309cb50 commit 19926e832cfd0d7c9ac62b1a071df75072e43ecc Jonathan Ellis committed Apr 7, 2009
@@ -20,7 +20,6 @@
import java.io.File;
import java.io.IOException;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -46,7 +45,6 @@
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SequenceFile;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.PartitionerType;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FileUtils;
@@ -445,15 +443,14 @@ public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilt
*/
List<ColumnFamily> getColumnFamilies(String key, String columnFamilyColumn, IFilter filter) throws IOException
{
- List<ColumnFamily> columnFamilies1 = new ArrayList<ColumnFamily>();
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
/* Get the ColumnFamily from Memtable */
- getColumnFamilyFromCurrentMemtable(key, columnFamilyColumn, filter, columnFamilies1);
- if (columnFamilies1.size() == 0 || !filter.isDone())
+ getColumnFamilyFromCurrentMemtable(key, columnFamilyColumn, filter, columnFamilies);
+ if (columnFamilies.size() == 0 || !filter.isDone())
{
/* Check if MemtableManager has any historical information */
- MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies1);
+ MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies);
}
- List<ColumnFamily> columnFamilies = columnFamilies1;
if (columnFamilies.size() == 0 || !filter.isDone())
{
long start = System.currentTimeMillis();
@@ -1195,39 +1192,10 @@ boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint ta
+ totalBytesWritten + " Total keys read ..." + totalkeysRead);
return result;
}
-
- private void doWrite(SSTable ssTable, String key, DataOutputBuffer bufOut) throws IOException
- {
- PartitionerType pType = StorageService.getPartitionerType();
- switch ( pType )
- {
- case OPHF:
- ssTable.append(key, bufOut);
- break;
-
- default:
- String[] peices = key.split(":");
- key = peices[1];
- BigInteger hash = new BigInteger(peices[0]);
- ssTable.append(key, hash, bufOut);
- break;
- }
- }
-
- private void doFill(BloomFilter bf, String key)
+
+ private void doFill(BloomFilter bf, String decoratedKey)
{
- PartitionerType pType = StorageService.getPartitionerType();
- switch ( pType )
- {
- case OPHF:
- bf.fill(key);
- break;
-
- default:
- String[] peices = key.split(":");
- bf.fill(peices[1]);
- break;
- }
+ bf.fill(StorageService.getPartitioner().undecorateKey(decoratedKey));
}
/*
@@ -1348,11 +1316,10 @@ void doFileCompaction(List<String> files, int minBufferSize)
if ( ssTable == null )
{
- PartitionerType pType = StorageService.getPartitionerType();
- ssTable = new SSTable(compactionFileLocation, mergedFileName, pType);
+ ssTable = new SSTable(compactionFileLocation, mergedFileName);
}
- doWrite(ssTable, lastkey, bufOut);
-
+ ssTable.append(lastkey, bufOut);
+
/* Fill the bloom filter with the key */
doFill(compactedBloomFilter, lastkey);
totalkeysWritten++;
@@ -19,22 +19,19 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.math.BigInteger;
-import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.IFileReader;
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SequenceFile;
-import org.apache.cassandra.service.PartitionerType;
import org.apache.cassandra.service.StorageService;
public class FileStruct implements Comparable<FileStruct>
{
IFileReader reader_;
- String key_;
+ String key_; // decorated!
DataInputBuffer bufIn_;
DataOutputBuffer bufOut_;
@@ -54,22 +51,7 @@ public FileStruct(String file, int bufSize) throws IOException
public String getKey()
{
- String key = key_;
- if ( !key.equals(SSTable.blockIndexKey_) )
- {
- PartitionerType pType = StorageService.getPartitionerType();
- switch ( pType )
- {
- case OPHF:
- break;
-
- default:
- String[] peices = key.split(":");
- key = peices[1];
- break;
- }
- }
- return key;
+ return key_;
}
public DataOutputBuffer getBuffer()
@@ -116,23 +98,7 @@ public long advance() throws IOException
public int compareTo(FileStruct f)
{
- int value = 0;
- PartitionerType pType = StorageService.getPartitionerType();
- switch( pType )
- {
- case OPHF:
- value = key_.compareTo(f.key_);
- break;
-
- default:
- String lhs = key_.split(":")[0];
- BigInteger b = new BigInteger(lhs);
- String rhs = f.key_.split(":")[0];
- BigInteger b2 = new BigInteger(rhs);
- value = b.compareTo(b2);
- break;
- }
- return value;
+ return StorageService.getPartitioner().getDecoratedKeyComparator().compare(key_, f.key_);
}
public void close() throws IOException
@@ -26,6 +26,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -43,10 +45,10 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.service.PartitionerType;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.service.IPartitioner;
+import org.apache.cassandra.service.StorageService;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -393,57 +395,25 @@ void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
return;
}
- PartitionerType pType = StorageService.getPartitionerType();
String directory = DatabaseDescriptor.getDataFileLocation();
String filename = cfStore.getNextFileName();
- SSTable ssTable = new SSTable(directory, filename, pType);
- switch (pType)
- {
- case OPHF:
- flushForOrderPreservingPartitioner(ssTable, cfStore, cLogCtx);
- break;
+ SSTable ssTable = new SSTable(directory, filename);
- default:
- flushForRandomPartitioner(ssTable, cfStore, cLogCtx);
- break;
- }
- }
-
- private void flushForRandomPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
- {
- /* List of primary keys in sorted order */
- List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies_.keySet() );
- DataOutputBuffer buffer = new DataOutputBuffer();
- /* Use this BloomFilter to decide if a key exists in a SSTable */
- BloomFilter bf = new BloomFilter(pKeys.size(), 15);
- for ( PrimaryKey pKey : pKeys )
+ // sort keys in the order they would be in when decorated
+ final IPartitioner partitioner = StorageService.getPartitioner();
+ final Comparator<String> dc = partitioner.getDecoratedKeyComparator();
+ ArrayList<String> orderedKeys = new ArrayList<String>(columnFamilies_.keySet());
+ Collections.sort(orderedKeys, new Comparator<String>()
{
- buffer.reset();
- ColumnFamily columnFamily = columnFamilies_.get(pKey.key());
- if ( columnFamily != null )
+ public int compare(String o1, String o2)
{
- /* serialize the cf with column indexes */
- ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
- /* Now write the key and value to disk */
- ssTable.append(pKey.key(), pKey.hash(), buffer);
- bf.fill(pKey.key());
- columnFamily.clear();
+ return dc.compare(partitioner.decorateKey(o1), partitioner.decorateKey(o2));
}
- }
- ssTable.close(bf);
- cfStore.onMemtableFlush(cLogCtx);
- cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
- buffer.close();
- }
-
- private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
- {
- List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
- Collections.sort(keys);
+ });
DataOutputBuffer buffer = new DataOutputBuffer();
/* Use this BloomFilter to decide if a key exists in a SSTable */
- BloomFilter bf = new BloomFilter(keys.size(), 15);
- for ( String key : keys )
+ BloomFilter bf = new BloomFilter(columnFamilies_.size(), 15);
+ for (String key : orderedKeys)
{
buffer.reset();
ColumnFamily columnFamily = columnFamilies_.get(key);
@@ -452,7 +422,7 @@ private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilySto
/* serialize the cf with column indexes */
ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
/* Now write the key and value to disk */
- ssTable.append(key, buffer);
+ ssTable.append(partitioner.decorateKey(key), buffer);
bf.fill(key);
columnFamily.clear();
}
@@ -461,5 +431,8 @@ private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilySto
cfStore.onMemtableFlush(cLogCtx);
cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
buffer.close();
+
+ columnFamilies_.clear();
}
+
}
Oops, something went wrong.

0 comments on commit 19926e8

Please sign in to comment.