Skip to content

Commit

Permalink
[CONJ-366] faster connection using pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Apr 4, 2017
1 parent b8a02fe commit 9e9cfe5
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 122 deletions.
Expand Up @@ -77,15 +77,15 @@ public class DecompressPacketInputStream implements PacketInputStream {
private int cachePos;
private int cacheEnd;

private InputStream inputStream;
private BufferedInputStream inputStream;

private int packetSeq;
private int compressPacketSeq;
private int maxQuerySizeToLog;
private int lastPacketLength;

public DecompressPacketInputStream(InputStream in, int maxQuerySizeToLog) {
inputStream = new BufferedInputStream(in, 64 * 1024);
public DecompressPacketInputStream(BufferedInputStream in, int maxQuerySizeToLog) {
inputStream = in;
this.maxQuerySizeToLog = maxQuerySizeToLog;
}

Expand Down
Expand Up @@ -71,7 +71,7 @@ public class StandardPacketInputStream implements PacketInputStream {
private byte[] header = new byte[4];
private byte[] reusableArray = new byte[REUSABLE_BUFFER_LENGTH];

private InputStream inputStream;
private BufferedInputStream inputStream;

private int packetSeq;
private int maxQuerySizeToLog;
Expand All @@ -89,6 +89,16 @@ public Buffer getPacket(boolean reUsable) throws IOException {
return new Buffer(getPacketArray(reUsable), lastPacketLength);
}

/**
* Get current Buffered input stream for creating compress input stream,
* to avoid losing already read bytes in case of pipelining.
*
* @return buffer input stream.
*/
public BufferedInputStream getBufferedInputStream() {
return inputStream;
}

/**
* Get next packet.
* If packet is more than 16M, read as many packet needed to finish packet.
Expand Down

0 comments on commit 9e9cfe5

Please sign in to comment.