Storing large objects in Cassandra has to be done carefully, since it can cause excessive heap pressure and hot spots. Astyanax provides utility classes that address these issues by splitting large objects across multiple keys and fetching the chunks in random order to reduce hot spots.
The column family is expected to use UTF8Type for both the key validator and the comparator, with BytesType as the default_validation_class. The following CLI command can be used:
CREATE COLUMN FAMILY storage WITH comparator = UTF8Type AND key_validation_class = UTF8Type AND default_validation_class = BytesType;
First, add the com.netflix.astyanax:astyanax-recipes dependency to your project.
Before calling any of the read/write APIs you must first create a provider. A basic Cassandra chunked provider ships with Astyanax. You can write your own if you'd like to customize it further.
ChunkedStorageProvider provider = new CassandraChunkedStorageProvider( keyspace, "data_column_family_name");
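To make the idea of a custom provider more concrete, here is a minimal in-memory sketch of what such a component is responsible for: storing each chunk under its own key and returning it on request. The `SimpleChunkStore` interface, the `InMemoryChunkStore` class, and the `objectName$chunkId` key scheme are all hypothetical stand-ins invented for this illustration. They are not Astyanax's `ChunkedStorageProvider` interface, whose method signatures you should take from the Astyanax source.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a chunk provider; NOT Astyanax's interface.
interface SimpleChunkStore {
    void writeChunk(String objectName, int chunkId, ByteBuffer data);
    ByteBuffer readChunk(String objectName, int chunkId);
}

// Keeps every chunk in memory under its own key, e.g. "report.pdf$3".
final class InMemoryChunkStore implements SimpleChunkStore {
    private final Map<String, byte[]> chunks = new ConcurrentHashMap<>();

    // Assumed key scheme: one key per chunk, so chunks of one object
    // do not all land on the same row.
    static String chunkKey(String objectName, int chunkId) {
        return objectName + "$" + chunkId;
    }

    @Override
    public void writeChunk(String objectName, int chunkId, ByteBuffer data) {
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy); // copy without moving the caller's position
        chunks.put(chunkKey(objectName, chunkId), copy);
    }

    @Override
    public ByteBuffer readChunk(String objectName, int chunkId) {
        byte[] stored = chunks.get(chunkKey(objectName, chunkId));
        if (stored == null) {
            throw new IllegalStateException("Missing chunk " + chunkKey(objectName, chunkId));
        }
        return ByteBuffer.wrap(stored).asReadOnlyBuffer();
    }
}
```

A real provider would also need to persist object metadata (size, chunk count, TTL) and handle deletion, which this sketch leaves out.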
The ObjectWriter breaks the input stream into chunks and pushes them to Cassandra from multiple threads.
ObjectMetadata meta = ChunkedStorage.newWriter(provider, objName, someInputStream)
    .withChunkSize(0x1000)    // Optional chunk size to override the default for this provider
    .withConcurrencyLevel(8)  // Optional. Upload chunks in 8 threads
    .withTtl(60)              // Optional TTL for the entire object
    .call();
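The chunking step can be pictured with a small standalone sketch. It only shows how a stream might be cut into fixed-size pieces, with a shorter final piece. The `ChunkSplitter` class is a hypothetical name made up for this example and is not part of Astyanax. The real ObjectWriter also uploads the chunks concurrently and records metadata, which is omitted here.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not an Astyanax class.
final class ChunkSplitter {
    // Reads the stream to the end and returns its content as chunks
    // of at most chunkSize bytes each.
    static List<byte[]> split(InputStream in, int chunkSize) throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        byte[] buffer = new byte[chunkSize];
        int filled = 0;
        int n;
        // read() may return fewer bytes than requested, so keep filling the buffer.
        while ((n = in.read(buffer, filled, chunkSize - filled)) != -1) {
            filled += n;
            if (filled == chunkSize) {
                chunks.add(buffer.clone());
                filled = 0;
            }
        }
        if (filled > 0) {
            chunks.add(Arrays.copyOf(buffer, filled)); // shorter final chunk
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        // Same chunk size as the writer example above (0x1000 = 4096 bytes).
        List<byte[]> chunks = split(new ByteArrayInputStream(new byte[10_000]), 0x1000);
        System.out.println(chunks.size() + " chunks, last has "
                + chunks.get(chunks.size() - 1).length + " bytes");
    }
}
```

Smaller chunks mean more, cheaper requests. Larger chunks reduce request count but put more pressure on the heap per request, which is the trade-off behind `withChunkSize`.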
The file is read directly into an OutputStream. The ObjectReader handles parallelizing and randomizing the requests in batches.
// For this example we create a byte array output stream, which requires us to first read
// the object size. You don't need to do this if you are reading into a FileOutputStream.
ObjectMetadata meta = ChunkedStorage.newInfoReader(provider, objName).call();
ByteArrayOutputStream os = new ByteArrayOutputStream(meta.getObjectSize().intValue());

// Read the file
meta = ChunkedStorage.newReader(provider, objName, os)
    .withBatchSize(11)        // Randomize fetching blocks within a batch.
    .withRetryPolicy(new ExponentialBackoffWithRetry(250,20))  // Retry policy for when a chunk isn't available.
                              // This helps implement retries in a cross-region
                              // setup where replication may be slow.
    .withConcurrencyLevel(2)  // Download chunks in 2 threads. Be careful here.
                              // Too many clients + too many threads = Cassandra not happy
    .call();
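The batching and randomizing described above can be sketched as follows. This is a simplified, single-threaded illustration, not the ObjectReader's actual implementation. The `BatchedChunkReader` class and its `fetchChunk` callback are hypothetical names introduced here. The point it shows is that chunk ids are shuffled within each batch before fetching, while the bytes are still written out in chunk order.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.IntFunction;

// Hypothetical helper for illustration only; not an Astyanax class.
final class BatchedChunkReader {
    // Fetches chunkCount chunks in batches of batchSize. The fetch order is
    // shuffled inside each batch, but output bytes stay in chunk order.
    static byte[] readAll(int chunkCount, int batchSize,
                          IntFunction<byte[]> fetchChunk, Random random) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int start = 0; start < chunkCount; start += batchSize) {
            int end = Math.min(start + batchSize, chunkCount);

            // Randomize the order in which this batch's chunks are requested.
            List<Integer> order = new ArrayList<>();
            for (int id = start; id < end; id++) {
                order.add(id);
            }
            Collections.shuffle(order, random);

            // Fetch in shuffled order, but remember each chunk's position.
            byte[][] fetched = new byte[end - start][];
            for (int id : order) {
                fetched[id - start] = fetchChunk.apply(id);
            }

            // Write the batch to the output in chunk order.
            for (byte[] chunk : fetched) {
                out.write(chunk, 0, chunk.length);
            }
        }
        return out.toByteArray();
    }
}
```

Because only one batch is held in memory at a time, the batch size bounds how much data is buffered before it is flushed to the output stream.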
Use the info reader to determine the object size, for example when sizing a ByteArrayOutputStream.
ObjectMetadata meta = ChunkedStorage.newInfoReader(provider, objName).call();
int objectSize = meta.getObjectSize().intValue();
Last edited by Dmitry Rzhevskiy,