Permalink
Browse files

Streams Compression

patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3015

git-svn-id: https://svn.apache.org/repos/asf/cassandra/trunk@1164689 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 19b8a60 commit 8eb0a938938ca12448ee7944a802589d86b8b66f @xedin xedin committed Sep 2, 2011
View
@@ -58,6 +58,7 @@
* make the repair of a range repair all replica (CASSANDRA-2610)
* expose the ability to repair the first range (as returned by the
partitioner) of a node (CASSANDRA-2606)
+ * Streams Compression (CASSANDRA-3015)
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
View
Binary file not shown.
@@ -0,0 +1,11 @@
+Copyright 2009-2010 Ning, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not
+use this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations under
+the License.
@@ -22,6 +22,7 @@
import java.io.*;
+import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.cassandra.gms.Gossiper;
@@ -78,7 +79,7 @@ public void run()
else
{
// streaming connections are per-session and have a fixed version. we can't do anything with a new-version stream connection, so drop it.
- logger.error("Received untranslated stream from newer protcol version. Terminating connection!");
+ logger.error("Received untranslated stream from newer protocol version. Terminating connection!");
}
// We are done with this connection....
return;
@@ -18,9 +18,9 @@
package org.apache.cassandra.streaming;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -40,6 +40,8 @@
import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.WrappedRunnable;
+import com.ning.compress.lzf.LZFOutputStream;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +60,7 @@
// communication socket
private Socket socket;
// socket's output stream
- private DataOutputStream output;
+ private OutputStream output;
// system encryption options if any
private final EncryptionOptions encryptionOptions;
// allocate buffer to use for transfers only once
@@ -119,7 +121,7 @@ public void runMayThrow() throws IOException
private void stream() throws IOException
{
ByteBuffer HeaderBuffer = MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to));
- // write header
+ // write header (this should not be compressed for compatibility with other messages)
output.write(ByteBufferUtil.getArray(HeaderBuffer));
if (header.file == null)
@@ -129,6 +131,9 @@ private void stream() throws IOException
? CompressedRandomAccessReader.open(header.file.getFilename(), true)
: RandomAccessReader.open(new File(header.file.getFilename()), CHUNK_SIZE, true);
+ // setting up data compression stream
+ output = new LZFOutputStream(output);
+
try
{
// stream each of the required sections of the file
@@ -234,12 +239,12 @@ protected void bind() throws IOException
protected void connect() throws IOException
{
socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
- output = new DataOutputStream(socket.getOutputStream());
+ output = socket.getOutputStream();
}
protected void close() throws IOException
{
- socket.close();
+ output.close();
}
public String toString()
@@ -23,9 +23,6 @@
import java.net.Socket;
import java.util.Collections;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -41,6 +38,11 @@
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
+import com.ning.compress.lzf.LZFInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class IncomingStreamReader
{
private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
@@ -79,7 +81,7 @@ public void read() throws IOException
assert remoteFile.estimatedKeys > 0;
SSTableReader reader = null;
logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
- DataInputStream dis = new DataInputStream(socket.getInputStream());
+ DataInputStream dis = new DataInputStream(new LZFInputStream(socket.getInputStream()));
try
{
reader = streamIn(dis, localFile, remoteFile);

0 comments on commit 8eb0a93

Please sign in to comment.