Skip to content

Commit

Permalink
Add retry logic on SocketTimeoutException in the FileReadingCollector
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed May 12, 2015
1 parent a373852 commit 1895dd3
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 26 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Changes for Crate
=================

- ``COPY FROM``: imports from S3 will now retry the import in case of a
read timeout.

- Select queries will now wait a bit longer before throwing an error in case
the shards of a table aren't fully active.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
Expand All @@ -51,6 +52,7 @@

public class FileReadingCollector implements CrateCollector {

public static final int MAX_SOCKET_TIMEOUT_RETRIES = 5;
private final Map<String, FileInputFactory> fileInputFactoryMap;
private final URI fileUri;
private final Predicate<URI> globPredicate;
Expand Down Expand Up @@ -167,29 +169,41 @@ public void doCollect(RamAccountingContext ramAccountingContext) throws IOExcept
uris = getUris(fileInput, uriPredicate);
try {
for (URI uri : uris) {
InputStream inputStream = fileInput.getStream(uri);
if (inputStream == null) {
readLines(fileInput, collectorContext, uri, 0, 0);
}
} finally {
downstream.finish();
}
}

private void readLines(FileInput fileInput, CollectorContext collectorContext, URI uri, long startLine, int retry) throws IOException {
InputStream inputStream = fileInput.getStream(uri);
if (inputStream == null) {
return;
}

String line;
long linesRead = 0L;
try (BufferedReader reader = createReader(inputStream)) {
while ((line = reader.readLine()) != null) {
linesRead++;
if (linesRead < startLine) {
continue;
}
BufferedReader reader;
reader = createReader(inputStream);

try {
while ((line = reader.readLine()) != null) {
if (line.length() == 0) { // skip empty lines
continue;
}
collectorContext.lineContext().rawSource(line.getBytes(StandardCharsets.UTF_8));
if (!downstream.setNextRow(row)) {
throw new CollectionAbortedException();
}
}
} finally {
reader.close();
if (line.length() == 0) { // skip empty lines
continue;
}
collectorContext.lineContext().rawSource(line.getBytes(StandardCharsets.UTF_8));
if (!downstream.setNextRow(row)) {
break;
}
}
} finally {
downstream.finish();
} catch (SocketTimeoutException e) {
if (retry > MAX_SOCKET_TIMEOUT_RETRIES) {
throw e;
} else {
readLines(fileInput, collectorContext, uri, linesRead + 1, retry + 1);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import io.crate.core.collections.Bucket;
Expand All @@ -39,13 +40,18 @@
import io.crate.operation.projectors.CollectingProjector;
import io.crate.operation.reference.file.FileLineReferenceResolver;
import io.crate.test.integration.CrateUnitTest;
import io.crate.testing.TestingHelpers;
import io.crate.types.DataTypes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.lucene.util.IOUtils;
import org.apache.tools.ant.taskdefs.TempFile;
import org.junit.*;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.*;
import java.net.SocketTimeoutException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -55,6 +61,7 @@

import static io.crate.testing.TestingHelpers.createReference;
import static io.crate.testing.TestingHelpers.isRow;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -166,6 +173,24 @@ public void testCollectWithEmptyLine() throws Throwable {
assertCorrectResult(projector.result().get());
}

/**
 * Verifies that a single {@link SocketTimeoutException} while reading from S3 triggers
 * a retry and that the line read before the timeout is not emitted a second time.
 */
@Test
public void testCollectWithOneSocketTimeout() throws Throwable {
    S3ObjectInputStream inputStream = mock(S3ObjectInputStream.class);

    // Use one matcher per parameter of read(byte[], int, int); nesting a matcher
    // inside `new byte[...]` only works by accident of matcher registration order.
    when(inputStream.read(any(byte[].class), anyInt(), anyInt()))
        .thenAnswer(new WriteBufferAnswer(new byte[] { 102, 111, 111, 10 })) // first line: foo
        .thenThrow(new SocketTimeoutException()) // exception causes retry
        .thenAnswer(new WriteBufferAnswer(new byte[] { 102, 111, 111, 10 })) // first line again, because of retry
        .thenAnswer(new WriteBufferAnswer(new byte[] { 98, 97, 114, 10 })) // second line: bar
        .thenReturn(-1);

    CollectingProjector projector = getObjects("s3://fakebucket/foo", null, inputStream);
    Bucket rows = projector.result().get();
    assertThat(rows.size(), is(2));
    assertThat(TestingHelpers.printedTable(rows), is("foo\nbar\n"));
}

@Test
public void unsupportedURITest() throws Throwable {
expectedException.expect(IllegalArgumentException.class);
Expand All @@ -184,6 +209,12 @@ private CollectingProjector getObjects(String fileUri) throws Throwable {
}

/**
 * Runs a collect against {@code fileUri} using a mocked S3 stream that reports
 * end-of-stream on the first read.
 *
 * @param fileUri     URI to collect from
 * @param compression compression setting passed through to the three-argument overload
 * @return the projector that received the collected rows
 */
private CollectingProjector getObjects(String fileUri, String compression) throws Throwable {
    S3ObjectInputStream inputStream = mock(S3ObjectInputStream.class);
    // One matcher per parameter of read(byte[], int, int).
    when(inputStream.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
    return getObjects(fileUri, compression, inputStream);
}

private CollectingProjector getObjects(String fileUri, String compression, final S3ObjectInputStream s3InputStream) throws Throwable {
CollectingProjector projector = new CollectingProjector();
FileCollectInputSymbolVisitor.Context context =
inputSymbolVisitor.process(createReference("_raw", DataTypes.STRING));
Expand All @@ -205,14 +236,12 @@ protected AmazonS3 initClient(String accessKey, String secretKey) throws IOExcep
S3ObjectSummary summary = mock(S3ObjectSummary.class);
S3Object s3Object = mock(S3Object.class);

S3ObjectInputStream inputStream = mock(S3ObjectInputStream.class);

when(client.listObjects(anyString(), anyString())).thenReturn(objectListing);
when(objectListing.getObjectSummaries()).thenReturn(Arrays.asList(summary));
when(summary.getKey()).thenReturn("foo");
when(client.getObject("fakebucket", "foo")).thenReturn(s3Object);
when(s3Object.getObjectContent()).thenReturn(inputStream);
when(inputStream.read(new byte[anyInt()], anyInt(), anyByte())).thenReturn(-1);
when(s3Object.getObjectContent()).thenReturn(s3InputStream);
when(client.listNextBatchOfObjects(any(ObjectListing.class))).thenReturn(objectListing);
when(objectListing.isTruncated()).thenReturn(false);
return client;
Expand Down Expand Up @@ -245,4 +274,20 @@ public static String resolveURI(File path, String file){
}
return uri;
}

/**
 * Mockito answer that emulates one successful {@code InputStream.read} call by copying
 * a fixed byte sequence into the caller's buffer and returning its length.
 *
 * <p>When stubbed on {@code read(byte[], int, int)} the bytes are written at the
 * requested offset; when stubbed on {@code read(byte[])} they are written at index 0.
 */
private static class WriteBufferAnswer implements Answer<Integer> {

    private final byte[] bytes;

    /**
     * @param bytes the bytes to hand out on each invocation
     */
    public WriteBufferAnswer(byte[] bytes) {
        this.bytes = bytes;
    }

    /**
     * Copies the configured bytes into the buffer passed as first argument of the
     * intercepted call.
     *
     * @return the number of bytes written, i.e. the length of the configured array
     * @throws IllegalArgumentException if the caller asked for fewer bytes than this
     *                                  answer would write
     */
    @Override
    public Integer answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        byte[] buffer = (byte[]) args[0];
        boolean hasRange = args.length > 2;
        int offset = hasRange ? ((Number) args[1]).intValue() : 0;
        int length = hasRange ? ((Number) args[2]).intValue() : buffer.length;
        if (length < bytes.length) {
            // Fail loudly instead of writing past the range the caller requested.
            throw new IllegalArgumentException(
                "read requested " + length + " bytes but answer provides " + bytes.length);
        }
        System.arraycopy(bytes, 0, buffer, offset, bytes.length);
        return bytes.length;
    }
}
}

0 comments on commit 1895dd3

Please sign in to comment.