Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -28,6 +30,7 @@
import com.clickhouse.client.stream.DeferredInputStream;
import com.clickhouse.client.stream.EmptyInputStream;
import com.clickhouse.client.stream.Lz4InputStream;
import com.clickhouse.client.stream.RestrictedInputStream;
import com.clickhouse.client.stream.IterableByteArrayInputStream;
import com.clickhouse.client.stream.IterableByteBufferInputStream;
import com.clickhouse.client.stream.IterableMultipleInputStream;
Expand Down Expand Up @@ -85,6 +88,29 @@ public static ClickHouseInputStream wrap(ClickHouseFile file, InputStream input,
return chInput;
}

/**
* Wraps the given input stream with length limitation. Please pay attention
* that calling close() method of the wrapper will never close the inner input
* stream.
*
* @param input non-null input stream
* @param bufferSize buffer size
* @param length maximum bytes can be read from the input
* @param postCloseAction custom action will be performed right after closing
* the wrapped input stream
* @return non-null wrapped input stream
*/
public static ClickHouseInputStream wrap(InputStream input, int bufferSize, long length, Runnable postCloseAction) {
if (input instanceof RestrictedInputStream) {
RestrictedInputStream ris = (RestrictedInputStream) input;
if (ris.getRemaining() == length) {
return ris;
}
}

return new RestrictedInputStream(null, input, bufferSize, length, postCloseAction);
}

/**
* Gets an empty input stream that produces nothing and cannot be closed.
*
Expand Down Expand Up @@ -505,15 +531,20 @@ public static File save(File file, InputStream in, int bufferSize, int timeout,
* Optional post close action.
*/
protected final Runnable postCloseAction;
/**
* User data shared between multiple calls.
*/
protected final Map<String, Object> userData;

protected volatile boolean closed;

protected boolean closed;
protected OutputStream copyTo;

protected ClickHouseInputStream(ClickHouseFile file, OutputStream copyTo, Runnable postCloseAction) {
this.byteBuffer = ClickHouseByteBuffer.newInstance();
this.file = file != null ? file : ClickHouseFile.NULL;
this.postCloseAction = postCloseAction;

this.userData = new HashMap<>();
this.closed = false;
this.copyTo = copyTo;
}
Expand Down Expand Up @@ -550,6 +581,37 @@ public ClickHouseFile getUnderlyingFile() {
return file;
}

/**
* Gets user data associated with this input stream.
*
* @param key key
* @return value, could be null
*/
public final Object getUserData(String key) {
return userData.get(key);
}

/**
* Removes user data.
*
* @param key key
* @return removed user data, could be null
*/
public final Object removeUserData(String key) {
return userData.remove(key);
}

/**
* Sets user data.
*
* @param key key
* @param value value
* @return overidded value, could be null
*/
public final Object setUserData(String key, Object value) {
return userData.put(key, value);
}

/**
* Peeks one byte. It's similar as {@link #read()} except it never changes
* cursor.
Expand Down Expand Up @@ -849,6 +911,8 @@ public boolean isClosed() {
public void close() throws IOException {
if (!closed) {
closed = true;
// clear user data if any
userData.clear();
// don't want to hold the last byte array reference for too long
byteBuffer.reset();
if (postCloseAction != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize, Cli
protected final ClickHouseFile file;
protected final Runnable postCloseAction;

protected boolean closed;
protected volatile boolean closed;

protected ClickHouseOutputStream(ClickHouseFile file, Runnable postCloseAction) {
this.file = file != null ? file : ClickHouseFile.NULL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public static ClickHouseResponse of(ClickHouseResponse response, ClickHouseRecor
private final List<ClickHouseRecord> records;
private final ClickHouseResponseSummary summary;

private boolean isClosed;
private volatile boolean closed;

protected ClickHouseSimpleResponse(List<ClickHouseColumn> columns, List<ClickHouseRecord> records,
ClickHouseResponseSummary summary) {
Expand Down Expand Up @@ -169,11 +169,11 @@ public Iterable<ClickHouseRecord> records() {
@Override
public void close() {
// nothing to close
isClosed = true;
closed = true;
}

@Override
public boolean isClosed() {
return isClosed;
return closed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStre
protected final List<ClickHouseColumn> columns;
protected final ClickHouseResponseSummary summary;

private boolean closed;
private volatile boolean closed;

protected ClickHouseStreamResponse(ClickHouseConfig config, ClickHouseInputStream input,
Map<String, Serializable> settings, List<ClickHouseColumn> columns, ClickHouseResponseSummary summary)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package com.clickhouse.client.stream;

import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;

import com.clickhouse.client.ClickHouseByteBuffer;
import com.clickhouse.client.ClickHouseChecker;
import com.clickhouse.client.ClickHouseDataUpdater;
import com.clickhouse.client.ClickHouseFile;
import com.clickhouse.client.ClickHouseOutputStream;
import com.clickhouse.client.ClickHouseUtils;
import com.clickhouse.client.config.ClickHouseClientOption;

/**
* Wrapper of {@link java.io.InputStream} with length limitation. Unlike
* {@link WrappedInputStream}, the inner input stream remains open after calling
* close().
*/
public class RestrictedInputStream extends AbstractByteArrayInputStream {
private final InputStream in;

private long length;

@Override
protected int updateBuffer() throws IOException {
position = 0;

if (closed) {
return limit = 0;
}

int len = buffer.length;
if (this.length < len) {
len = (int) this.length;
}

int off = 0;
while (len > 0) {
int read = in.read(buffer, off, len);
if (read == -1) {
break;
} else {
off += read;
len -= read;
}
}
if (copyTo != null) {
copyTo.write(buffer, 0, off);
}
this.length -= off;
return limit = off;
}

public RestrictedInputStream(ClickHouseFile file, InputStream input, int bufferSize, long length,
Runnable postCloseAction) {
super(file, null, postCloseAction);

this.in = ClickHouseChecker.nonNull(input, "InputStream");
// fixed buffer
this.buffer = new byte[ClickHouseUtils.getBufferSize(bufferSize,
(int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(),
(int) ClickHouseClientOption.MAX_BUFFER_SIZE.getDefaultValue())];

this.length = ClickHouseChecker.notLessThan(length, "Length", 0L);

this.position = 0;
this.limit = 0;
}

@Override
public ClickHouseByteBuffer readCustom(ClickHouseDataUpdater reader) throws IOException {
if (reader == null) {
return byteBuffer.reset();
}
ensureOpen();

LinkedList<byte[]> list = new LinkedList<>();
int offset = position;
int len = 0;
boolean more = true;
while (more) {
int remain = limit - position;
if (remain < 1) {
closeQuietly();
more = false;
} else {
int read = reader.update(buffer, position, limit);
if (read == -1) {
byte[] bytes = new byte[limit];
System.arraycopy(buffer, position, bytes, position, remain);
len += remain;
position = limit;
list.add(bytes);
if (updateBuffer() < 1) {
closeQuietly();
more = false;
}
} else {
len += read;
position += read;
list.add(buffer);
more = false;
}
}
}
return byteBuffer.update(list, offset, len);
}

@Override
public long pipe(ClickHouseOutputStream output) throws IOException {
long count = 0L;
if (output == null || output.isClosed()) {
return count;
}
ensureOpen();

try {
int l = limit;
int p = position;
int remain = l - p;
if (remain > 0) {
output.writeBytes(buffer, p, remain);
count += remain;
position = l;
}

while ((remain = updateBuffer()) > 0) {
output.writeBytes(buffer, 0, remain);
count += remain;
}
} finally {
close();
}
return count;
}

public final long getRemaining() {
return length;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ private Object[][] getEmptyInputStreams() {
{ ClickHouseInputStream.of(new ByteArrayInputStream(new byte[0]),
new ByteArrayInputStream(new byte[0])) },
{ ClickHouseInputStream.of((ByteArrayInputStream) null, (ByteArrayInputStream) null) },
{ ClickHouseInputStream.wrap(new ByteArrayInputStream(new byte[] { 1, 2, 3 }), 2048, 0L, null) },
// strings
{ ClickHouseInputStream.of(new String[0]) },
{ ClickHouseInputStream.of("") },
Expand Down Expand Up @@ -252,6 +253,9 @@ private Object[][] getInputStreamsWithData() {
.of(new InputStream[] { new ByteArrayInputStream(new byte[] { 0x65 }), null,
new ByteArrayInputStream(new byte[] { 0x66, 0x67, 0x68 }), null,
new ByteArrayInputStream(new byte[] { 0x69, 0x70 }) }) },
{ ClickHouseInputStream.wrap(
new ByteArrayInputStream(new byte[] { 1, 2, 0x65, 0x66, 0x67, 0x68, 0x69, 0x70, 1, 2 }, 2, 6),
2048, 6L, null) },
// strings
{ ClickHouseInputStream.of("efghip") },
{ ClickHouseInputStream.of("e", "fg", "hip") },
Expand Down Expand Up @@ -322,7 +326,10 @@ private Object[][] getInputStreamWithData() {
ByteBuffer.wrap(new byte[] { -1, 1, 2, 3, -4 }, 1, 3),
ByteBuffer.allocate(0), null, ByteBuffer.wrap(new byte[] { 4, 5 }), null,
ByteBuffer.allocate(0), null),
null) }
null) },
new Object[] {
new RestrictedInputStream(null, new ByteArrayInputStream(new byte[] { 1, 2, 3, 4, 5 }), 2048,
5L, null) },
};
}

Expand Down Expand Up @@ -721,4 +728,41 @@ public void testReadCustomBuffer() throws IOException {
Assert.assertThrows(IOException.class, () -> in.readCustom(new CustomReader((byte) 1, (byte) 2)::read));
}
}

@Test(groups = { "unit" })
public void testRestrictedInputStream() throws IOException {
ByteArrayInputStream bytes = new ByteArrayInputStream(new byte[] { 1, 2, 3, 4, 5, 6 });
Assert.assertThrows(IllegalArgumentException.class, () -> ClickHouseInputStream.wrap(bytes, 0, -1, null));

ClickHouseInputStream in = ClickHouseInputStream.wrap(bytes, 0, 0, null);
Assert.assertTrue(in instanceof RestrictedInputStream);
Assert.assertFalse(in.isClosed());
Assert.assertEquals(in.available(), 0);
Assert.assertEquals(((RestrictedInputStream) in).getRemaining(), 0);

in = ClickHouseInputStream.wrap(bytes, 0, 1, null);
Assert.assertEquals(in.available(), 1);
Assert.assertEquals(in.read(), 1);
Assert.assertEquals(in.available(), 0);
Assert.assertEquals(in.read(), -1);

in = ClickHouseInputStream.wrap(bytes, 0, 3, null);
Assert.assertEquals(in.available(), 3);
Assert.assertEquals(in.read(), 2);
Assert.assertEquals(in.available(), 2);
Assert.assertEquals(in.read(), 3);
Assert.assertEquals(in.available(), 1);
Assert.assertEquals(in.read(), 4);
Assert.assertEquals(in.available(), 0);
Assert.assertEquals(in.read(), -1);

in = ClickHouseInputStream.wrap(bytes, 0, 3, null);
Assert.assertEquals(in.available(), 2);
Assert.assertEquals(in.read(), 5);
Assert.assertEquals(in.available(), 1);
Assert.assertEquals(in.read(), 6);
Assert.assertEquals(in.available(), 0);
Assert.assertEquals(in.read(), -1);
Assert.assertEquals(((RestrictedInputStream) in).getRemaining(), 1);
}
}
Loading