From 9362e2e10e358cde9062dccf442f325d9a926652 Mon Sep 17 00:00:00 2001 From: GraciesPadre Date: Tue, 11 Oct 2016 21:50:06 -0600 Subject: [PATCH 1/7] Beginning to add failure events. --- .../com/spectralogic/ds3client/IntValue.java | 28 +++ .../spectralogic/ds3client/MockedHeaders.java | 50 ++++++ .../ds3client/MockedWebResponse.java | 56 ++++++ .../integration/PutJobManagement_Test.java | 167 +++++++++++++++++- .../ds3client/helpers/Ds3ClientHelpers.java | 6 + .../helpers/FailureEventListener.java | 22 +++ .../ds3client/helpers/ReadJobImpl.java | 10 ++ .../ds3client/helpers/WriteJobImpl.java | 152 +++++++++++----- .../helpers/events/FailureEvent.java | 92 ++++++++++ .../spectralogic/ds3client/utils/Guard.java | 6 + 10 files changed, 534 insertions(+), 55 deletions(-) create mode 100644 ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/IntValue.java create mode 100644 ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedHeaders.java create mode 100644 ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedWebResponse.java create mode 100644 ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/FailureEventListener.java create mode 100644 ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/IntValue.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/IntValue.java new file mode 100644 index 000000000..ab8c7ed27 --- /dev/null +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/IntValue.java @@ -0,0 +1,28 @@ +/* + * **************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + +package com.spectralogic.ds3client; + +public final class IntValue { + private int intValue = 0; + + public int increment() { + return ++intValue; + } + + public int getValue() { + return intValue; + } +} diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedHeaders.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedHeaders.java new file mode 100644 index 000000000..8e1296bf8 --- /dev/null +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedHeaders.java @@ -0,0 +1,50 @@ +/* + * **************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + +package com.spectralogic.ds3client; + +import com.google.common.collect.Lists; +import com.spectralogic.ds3client.networking.Headers; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class MockedHeaders implements Headers { + private final Map headerValues; + + public MockedHeaders(final Map headerValues) { + this.headerValues = normalizeHeaderValues(headerValues); + } + + private static Map normalizeHeaderValues(final Map headerValues) { + final HashMap headers = new HashMap<>(); + for (final Map.Entry entry : headerValues.entrySet()) { + headers.put(entry.getKey().toLowerCase(), entry.getValue()); + } + return headers; + } + + @Override + public List get(final String key) { + return Lists.newArrayList(this.headerValues.get(key.toLowerCase())); + } + + @Override + public Set keys() { + return null; + } +} diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedWebResponse.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedWebResponse.java new file mode 100644 index 000000000..bd7bf7975 --- /dev/null +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedWebResponse.java @@ -0,0 +1,56 @@ +/* + * **************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + +package com.spectralogic.ds3client; + +import com.spectralogic.ds3client.networking.Headers; +import com.spectralogic.ds3client.networking.WebResponse; + +import org.apache.commons.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +public class MockedWebResponse implements WebResponse { + private final InputStream responseStream; + private final int statusCode; + private final Headers headers; + + public MockedWebResponse(final String responseString, final int statusCode, final Map headers) { + this.responseStream = IOUtils.toInputStream(responseString); + this.statusCode = statusCode; + this.headers = new MockedHeaders(headers); + } + + @Override + public InputStream getResponseStream() throws IOException { + return this.responseStream; + } + + @Override + public int getStatusCode() { + return this.statusCode; + } + + @Override + public Headers getHeaders() { + return headers; + } + + @Override + public void close() throws IOException { + } +} diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java index 786af1f73..3a8ba3d90 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java @@ -18,14 +18,18 @@ import com.google.common.collect.Lists; import com.spectralogic.ds3client.Ds3Client; import com.spectralogic.ds3client.Ds3ClientImpl; +import com.spectralogic.ds3client.IntValue; +import com.spectralogic.ds3client.MockedWebResponse; import com.spectralogic.ds3client.commands.*; import com.spectralogic.ds3client.commands.spectrads3.*; import com.spectralogic.ds3client.commands.spectrads3.notifications.*; import com.spectralogic.ds3client.exceptions.Ds3NoMoreRetriesException; import com.spectralogic.ds3client.helpers.Ds3ClientHelpers; +import com.spectralogic.ds3client.helpers.FailureEventListener; import com.spectralogic.ds3client.helpers.FileObjectGetter; import com.spectralogic.ds3client.helpers.FileObjectPutter; import com.spectralogic.ds3client.helpers.ObjectCompletedListener; +import com.spectralogic.ds3client.helpers.events.FailureEvent; import com.spectralogic.ds3client.helpers.options.WriteJobOptions; import com.spectralogic.ds3client.integration.test.helpers.ABMTestHelper; import com.spectralogic.ds3client.integration.test.helpers.TempStorageIds; @@ -50,7 +54,9 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShim; @@ -808,8 +814,7 @@ public void putMultiPartUploadPart() throws IOException { } @Test - public void testWriteJobWithRetries() throws Exception - { + public void testWriteJobWithRetries() throws Exception { final int maxNumObjectTransferAttempts = 3; transferAndCheckFileContent(maxNumObjectTransferAttempts, new ObjectTransferExceptionHandler() { @@ -824,14 +829,14 @@ public boolean handleException(final Throwable t) { private void transferAndCheckFileContent(final int maxNumObjectTransferAttempts, final ObjectTransferExceptionHandler objectTransferExceptionHandler) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, IOException, URISyntaxException { - final Ds3ClientShim ds3ClientShim = new Ds3ClientShim((Ds3ClientImpl)client); + final Ds3ClientShim ds3ClientShim = new Ds3ClientShim((Ds3ClientImpl) client); final String tempPathPrefix = null; final Path tempDirectory = Files.createTempDirectory(Paths.get("."), tempPathPrefix); try { final String DIR_NAME = "largeFiles/"; - final String[] FILE_NAMES = new String[] { "lesmis-copies.txt" }; + final String[] FILE_NAMES = new String[]{"lesmis-copies.txt"}; final Path dirPath = ResourceUtils.loadFileResource(DIR_NAME); @@ -867,7 +872,7 @@ public void objectCompleted(final String name) { shouldContinueTest = objectTransferExceptionHandler.handleException(t); } - if ( ! shouldContinueTest) { + if (!shouldContinueTest) { return; } @@ -909,14 +914,13 @@ private interface ObjectTransferExceptionHandler { } @Test - public void testWriteJobWithRetriesThrowsDs3NoMoreRetriesException() throws Exception - { + public void testWriteJobWithRetriesThrowsDs3NoMoreRetriesException() throws Exception { final int maxNumObjectTransferAttempts = 1; transferAndCheckFileContent(maxNumObjectTransferAttempts, new ObjectTransferExceptionHandler() { @Override public boolean handleException(final Throwable t) { - if ( ! (t instanceof Ds3NoMoreRetriesException)) { + if (!(t instanceof Ds3NoMoreRetriesException)) { fail("Got exception of unexpected type: " + t.getMessage()); return true; } @@ -925,4 +929,151 @@ public boolean handleException(final Throwable t) { } }); } + + @Test(expected = Ds3NoMoreRetriesException.class) + public void testFiringOnFailureEventWithFailedChunkAllocation() + throws IOException, URISyntaxException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + final String tempPathPrefix = null; + final Path tempDirectory = Files.createTempDirectory(Paths.get("."), tempPathPrefix); + + try { + final IntValue numFailureEventsFired = new IntValue(); + + final int maxNumObjectTransferAttempts = 1; + final Ds3ClientHelpers.Job writeJob = createWriteJobWithObjectsReadyToTransfer(maxNumObjectTransferAttempts, + ClientFailureType.ChunkAllocation); + + final FailureEventListener failureEventListener = new FailureEventListener() { + @Override + public void onFailure(FailureEvent failureEvent) { + numFailureEventsFired.increment(); + } + }; + + writeJob.attachFailureEventListener(failureEventListener); + + writeJob.transfer(new FileObjectPutter(tempDirectory)); + + assertEquals(1, numFailureEventsFired.getValue()); + + writeJob.removeFailureEventListener(failureEventListener); + + writeJob.transfer(new FileObjectPutter(tempDirectory)); + + assertEquals(1, numFailureEventsFired.getValue()); + } finally { + FileUtils.deleteDirectory(tempDirectory.toFile()); + deleteAllContents(client, BUCKET_NAME); + } + } + + private enum ClientFailureType { + ChunkAllocation, + PutObject + } + + Ds3ClientHelpers.Job createWriteJobWithObjectsReadyToTransfer(final int maxNumObjectTransferAttempts, + final ClientFailureType clientFailureType) + throws IOException, URISyntaxException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + final String DIR_NAME = "largeFiles/"; + final String[] FILE_NAMES = new String[]{"lesmis-copies.txt"}; + + final List bookTitles = new ArrayList<>(); + final List objects = new ArrayList<>(); + for (final String book : FILE_NAMES) { + final Path objPath = ResourceUtils.loadFileResource(DIR_NAME + book); + final long bookSize = Files.size(objPath); + final Ds3Object obj = new Ds3Object(book, bookSize); + + bookTitles.add(book); + objects.add(obj); + } + + Ds3Client ds3Client; + if (clientFailureType == ClientFailureType.ChunkAllocation) { + ds3Client = new Ds3ClientShimWithFailedChunkAllocation((Ds3ClientImpl) client); + } else { + ds3Client = new Ds3ClientShimWithFailedPutObject((Ds3ClientImpl) client); + } + + final int maxNumBlockAllocationRetries = 3; + final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(ds3Client, + maxNumBlockAllocationRetries, + maxNumObjectTransferAttempts); + + final Ds3ClientHelpers.Job writeJob = ds3ClientHelpers.startWriteJob(BUCKET_NAME, objects); + + return writeJob; + } + + private static class Ds3ClientShimWithFailedChunkAllocation extends Ds3ClientShim { + public Ds3ClientShimWithFailedChunkAllocation(Ds3ClientImpl ds3ClientImpl) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException + { + super(ds3ClientImpl); + } + + @Override + public AllocateJobChunkSpectraS3Response allocateJobChunkSpectraS3(final AllocateJobChunkSpectraS3Request request) + throws IOException { + final Map headers = new HashMap<>(); + headers.put("content-NONE", "text/xml"); + headers.put("Retry-After", "1"); + + final AllocateJobChunkSpectraS3Response allocateJobChunkSpectraS3Response = + new AllocateJobChunkSpectraS3Response(new MockedWebResponse("A response", 307, headers)); + + return allocateJobChunkSpectraS3Response; + } + } + + private static class Ds3ClientShimWithFailedPutObject extends Ds3ClientShim { + public Ds3ClientShimWithFailedPutObject(Ds3ClientImpl ds3ClientImpl) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException + { + super(ds3ClientImpl); + } + + @Override + public PutObjectResponse putObject(final PutObjectRequest request) throws IOException { + throw new IOException("A terrible, horrible thing happened!"); + } + } + + @Test(expected = RuntimeException.class) + public void testFiringOnFailureEventWithFailedPutObject() + throws IOException, URISyntaxException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + final String tempPathPrefix = null; + final Path tempDirectory = Files.createTempDirectory(Paths.get("."), tempPathPrefix); + + try { + final IntValue numFailureEventsFired = new IntValue(); + + final int maxNumObjectTransferAttempts = 1; + final Ds3ClientHelpers.Job writeJob = createWriteJobWithObjectsReadyToTransfer(maxNumObjectTransferAttempts, + ClientFailureType.PutObject); + + final FailureEventListener failureEventListener = new FailureEventListener() { + @Override + public void onFailure(final FailureEvent failureEvent) { + numFailureEventsFired.increment(); + } + }; + + writeJob.attachFailureEventListener(failureEventListener); + + writeJob.transfer(new FileObjectPutter(tempDirectory)); + + assertEquals(1, numFailureEventsFired.getValue()); + + writeJob.removeFailureEventListener(failureEventListener); + + writeJob.transfer(new FileObjectPutter(tempDirectory)); + + assertEquals(1, numFailureEventsFired.getValue()); + } finally { + FileUtils.deleteDirectory(tempDirectory.toFile()); + deleteAllContents(client, BUCKET_NAME); + } + } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java index cee1d7dc7..354edfd61 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java @@ -91,6 +91,12 @@ public interface Job { void attachWaitingForChunksListener(final WaitingForChunksListener listener); void removeWaitingForChunksListener(final WaitingForChunksListener listener); + /** + * Attaches an event handler when an object transfer fails + */ + void attachFailureEventListener(final FailureEventListener listener); + void removeFailureEventListener(final FailureEventListener listener); + /** * Sets the maximum number of requests to execute at a time when fulfilling the job. */ diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/FailureEventListener.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/FailureEventListener.java new file mode 100644 index 000000000..e6295a91c --- /dev/null +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/FailureEventListener.java @@ -0,0 +1,22 @@ +/* + * **************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + +package com.spectralogic.ds3client.helpers; + +import com.spectralogic.ds3client.helpers.events.FailureEvent; + +public interface FailureEventListener { + void onFailure(final FailureEvent failureEvent); +} diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java index f9c5ad209..6baa2491e 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java @@ -141,6 +141,16 @@ public void removeWaitingForChunksListener(final WaitingForChunksListener listen this.waitingForChunksListeners.remove(listener); } + @Override + public void attachFailureEventListener(FailureEventListener listener) { + // TODO implement this + } + + @Override + public void removeFailureEventListener(FailureEventListener listener) { + // TODO implement this + } + @Override public Ds3ClientHelpers.Job withMetadata(final Ds3ClientHelpers.MetadataAccess access) { throw new IllegalStateException("withMetadata method is not used with Read Jobs"); diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java index 6ea750b5d..72017495d 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java @@ -26,6 +26,7 @@ import com.spectralogic.ds3client.helpers.ChunkTransferrer.ItemTransferrer; import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.ObjectChannelBuilder; import com.spectralogic.ds3client.helpers.events.EventRunner; +import com.spectralogic.ds3client.helpers.events.FailureEvent; import com.spectralogic.ds3client.models.*; import com.spectralogic.ds3client.models.Objects; import com.spectralogic.ds3client.models.common.Range; @@ -49,6 +50,7 @@ class WriteJobImpl extends JobImpl { private final ChecksumType.Type checksumType; private final Set checksumListeners; private final Set waitingForChunksListeners; + private final Set failureEventListeners; private final EventRunner eventRunner; private final int retryAfter; // Negative retryAfter value represent infinity retries private final int retryDelay; //Negative value means use default @@ -81,6 +83,7 @@ public WriteJobImpl( this.retryDelay = retryDelay; this.checksumListeners = Sets.newIdentityHashSet(); this.waitingForChunksListeners = Sets.newIdentityHashSet(); + this.failureEventListeners = Sets.newIdentityHashSet(); this.eventRunner = eventRunner; this.checksumType = type; @@ -144,6 +147,18 @@ public void removeWaitingForChunksListener(final WaitingForChunksListener listen this.waitingForChunksListeners.remove(listener); } + @Override + public void attachFailureEventListener(FailureEventListener listener) { + checkRunning(); + this.failureEventListeners.add(listener); + } + + @Override + public void removeFailureEventListener(FailureEventListener listener) { + checkRunning(); + this.failureEventListeners.remove(listener); + } + @Override public Ds3ClientHelpers.Job withMetadata(final Ds3ClientHelpers.MetadataAccess access) { checkRunning(); @@ -160,38 +175,69 @@ public Ds3ClientHelpers.Job withChecksum(final ChecksumFunction checksumFunction @Override public void transfer(final ObjectChannelBuilder channelBuilder) throws IOException { - running = true; - LOG.debug("Starting job transfer"); - if (this.masterObjectList == null || this.masterObjectList.getObjects() == null) { - LOG.info("There is nothing to transfer for job" - + (this.getJobId() == null ? "" : " " + this.getJobId().toString())); - return; - } + try { + running = true; + LOG.debug("Starting job transfer"); + if (this.masterObjectList == null || this.masterObjectList.getObjects() == null) { + LOG.info("There is nothing to transfer for job" + + (this.getJobId() == null ? "" : " " + this.getJobId().toString())); + return; + } - try (final JobState jobState = new JobState( - channelBuilder, - filteredChunks, - partTracker, - ImmutableMap.>of())) { - final ChunkTransferrer chunkTransferrer = new ChunkTransferrer( - new PutObjectTransferrerRetryDecorator(jobState), - this.client, - jobState.getPartTracker(), - this.maxParallelRequests - ); - for (final Objects chunk : filteredChunks) { - LOG.debug("Allocating chunk: {}", chunk.getChunkId().toString()); - chunkTransferrer.transferChunks( - this.masterObjectList.getNodes(), - Collections.singletonList(filterChunk(allocateChunk(chunk)))); + try (final JobState jobState = new JobState( + channelBuilder, + filteredChunks, + partTracker, + ImmutableMap.>of())) { + final ChunkTransferrer chunkTransferrer = new ChunkTransferrer( + new PutObjectTransferrerRetryDecorator(jobState), + this.client, + jobState.getPartTracker(), + this.maxParallelRequests + ); + for (final Objects chunk : filteredChunks) { + LOG.debug("Allocating chunk: {}", chunk.getChunkId().toString()); + chunkTransferrer.transferChunks( + this.masterObjectList.getNodes(), + Collections.singletonList(filterChunk(allocateChunk(chunk)))); + } + } catch (final IOException | RuntimeException e) { + throw e; + } catch (final Exception e) { + throw new RuntimeException(e); } - } catch (final IOException | RuntimeException e) { - throw e; - } catch (final Exception e) { - throw new RuntimeException(e); + } catch (final Throwable t) { + emitFailureEvent(new FailureEvent.Builder() + .doingWhat("putting object") + .withCausalException(t) + .usingSystemWithEndpoint(client.getConnectionDetails().getEndpoint()) + .withObjectNamed(getLabelForChunk(filteredChunks.get(0))) + .build()); + throw t; } } + private void emitFailureEvent(final FailureEvent failureEvent) { + for (final FailureEventListener failureEventListener : failureEventListeners) { + eventRunner.emitEvent(new Runnable() { + @Override + public void run() { + failureEventListener.onFailure(failureEvent); + } + }); + } + } + + private String getLabelForChunk(final Objects chunk) { + try { + return chunk.getObjects().get(0).getName(); + } catch (final Throwable t) { + LOG.error("Failed to get label for chunk.", t); + } + + return "unnamed object"; + } + private Objects allocateChunk(final Objects filtered) throws IOException { Objects chunk = null; while (chunk == null) { @@ -205,28 +251,40 @@ private Objects tryAllocateChunk(final Objects filtered) throws IOException { this.client.allocateJobChunkSpectraS3(new AllocateJobChunkSpectraS3Request(filtered.getChunkId().toString())); LOG.info("AllocatedJobChunkResponse status: {}", response.getStatus().toString()); - switch (response.getStatus()) { - case ALLOCATED: - retryAfterLeft = retryAfter; // Reset the number of retries to the initial value - return response.getObjectsResult(); - case RETRYLATER: - try { - if (retryAfterLeft == 0) { - throw new Ds3NoMoreRetriesException(this.retryAfter); - } - retryAfterLeft--; - - final int retryAfter = computeRetryAfter(response.getRetryAfterSeconds()); - emitWaitingForChunksEvents(retryAfter); - LOG.debug("Will retry allocate chunk call after {} seconds", retryAfter); - Thread.sleep(retryAfter * 1000); - return null; - } catch (final InterruptedException e) { - throw new RuntimeException(e); + try { + switch (response.getStatus()) { + case ALLOCATED: + retryAfterLeft = retryAfter; // Reset the number of retries to the initial value + return response.getObjectsResult(); + case RETRYLATER: + try { + if (retryAfterLeft == 0) { + throw new Ds3NoMoreRetriesException(this.retryAfter); + } + retryAfterLeft--; + + final int retryAfter = computeRetryAfter(response.getRetryAfterSeconds()); + emitWaitingForChunksEvents(retryAfter); + + LOG.debug("Will retry allocate chunk call after {} seconds", retryAfter); + Thread.sleep(retryAfter * 1000); + return null; + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + default: + assert false : "This line of code should be impossible to hit."; + return null; } - default: - assert false : "This line of code should be impossible to hit."; return null; + } catch (final Throwable t) { + emitFailureEvent(new FailureEvent.Builder() + .doingWhat("allocating chunk") + .usingSystemWithEndpoint(client.getConnectionDetails().getEndpoint()) + .withObjectNamed(getLabelForChunk(filtered)) + .withCausalException(t) + .build()); + throw t; } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java new file mode 100644 index 000000000..7d2e344e5 --- /dev/null +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java @@ -0,0 +1,92 @@ +/* + * **************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + +package com.spectralogic.ds3client.helpers.events; + +import com.google.common.base.Preconditions; +import com.spectralogic.ds3client.utils.Guard; + +public class FailureEvent { + private final String doingWhat; + private final String objectName; + private final String endpoint; + private final Throwable causalException; + + private FailureEvent(final String what, final String objectName, final String endpoint, final Throwable causalException) { + this.doingWhat = what; + this.objectName = objectName; + this.endpoint = endpoint; + this.causalException = causalException; + } + + public String doingWhat() { + return doingWhat; + } + + public String withObjectNamed() { + return objectName; + } + + public String usingSystemWithEndpoint() { + return endpoint; + } + + public Throwable getCausalException() { + return causalException; + } + + @Override + public String toString() { + return "Failure " + doingWhat + " with object named \"" + objectName + "\" using system with endpoint " + endpoint; + } + + public static class Builder { + private String doingWhat; + private String withObjectNamed; + private String usingSystemWithEndpoint; + private Throwable causalException; + + public Builder() { } + + public Builder doingWhat(final String what) { + this.doingWhat = what; + return this; + } + + public Builder withObjectNamed(final String objectName) { + this.withObjectNamed = objectName; + return this; + } + + public Builder usingSystemWithEndpoint(final String endpoint) { + this.usingSystemWithEndpoint = endpoint; + return this; + } + + public Builder withCausalException(final Throwable causalException) { + this.causalException = causalException; + return this; + } + + public FailureEvent build() { + Guard.throwOnNullOrEmptyString(doingWhat, "The failed activity may not be null or empty."); + Guard.throwOnNullOrEmptyString(withObjectNamed, "The name of the object involved in the activity may not be null or empty."); + Guard.throwOnNullOrEmptyString(usingSystemWithEndpoint, "The endpoint referenced in the activity may not be null or empty."); + Preconditions.checkNotNull(causalException, "The exception causing a failure may not be null."); + + return new FailureEvent(doingWhat, withObjectNamed, usingSystemWithEndpoint, causalException); + } + } +} diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/Guard.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/Guard.java index 98c9fcbf6..bb49934e7 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/Guard.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/Guard.java @@ -34,6 +34,12 @@ public static boolean isStringNullOrEmpty(final String str) { return str == null || str.isEmpty(); } + public static void throwOnNullOrEmptyString(final String str, final String message) { + if (isStringNullOrEmpty(str)) { + throw new IllegalArgumentException(message); + } + } + public static boolean isNotNullAndNotEmpty(final Collection collection) { return !isNullOrEmpty(collection); } From c542653115eae97f776408bc1b52a26ee9a3eb4b Mon Sep 17 00:00:00 2001 From: GraciesPadre Date: Wed, 12 Oct 2016 13:57:27 -0600 Subject: [PATCH 2/7] Adding failure events to reads --- .../integration/GetJobManagement_Test.java | 113 ++++++++++++++++++ .../integration/PutJobManagement_Test.java | 54 +-------- .../test/helpers/Ds3ClientShimFactory.java | 31 +++++ ...s3ClientShimWithFailedChunkAllocation.java | 64 ++++++++++ .../Ds3ClientShimWithFailedGetObject.java | 41 +++++++ .../Ds3ClientShimWithFailedPutObject.java | 21 ++++ .../ds3client/helpers/ReadJobImpl.java | 102 +++++++++++----- .../ds3client/helpers/WriteJobImpl.java | 4 +- 8 files changed, 349 insertions(+), 81 deletions(-) create mode 100644 ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimFactory.java create mode 100644 ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedChunkAllocation.java create mode 100644 ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedGetObject.java create mode 100644 ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedPutObject.java diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java index 4acee88f8..630cc5568 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java @@ -18,6 +18,7 @@ import com.google.common.collect.Lists; import com.spectralogic.ds3client.Ds3Client; import com.spectralogic.ds3client.Ds3ClientImpl; +import com.spectralogic.ds3client.IntValue; import com.spectralogic.ds3client.commands.DeleteObjectRequest; import com.spectralogic.ds3client.commands.GetObjectRequest; import com.spectralogic.ds3client.commands.GetObjectResponse; @@ -25,11 +26,15 @@ import com.spectralogic.ds3client.commands.spectrads3.GetJobSpectraS3Request; import com.spectralogic.ds3client.commands.spectrads3.GetJobSpectraS3Response; import com.spectralogic.ds3client.helpers.Ds3ClientHelpers; +import com.spectralogic.ds3client.helpers.FailureEventListener; import com.spectralogic.ds3client.helpers.FileObjectGetter; import com.spectralogic.ds3client.helpers.FileObjectPutter; +import com.spectralogic.ds3client.helpers.events.FailureEvent; import com.spectralogic.ds3client.helpers.options.ReadJobOptions; import com.spectralogic.ds3client.integration.test.helpers.ABMTestHelper; import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShim; +import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShimFactory; +import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShimWithFailedChunkAllocation; import com.spectralogic.ds3client.integration.test.helpers.TempStorageIds; import com.spectralogic.ds3client.integration.test.helpers.TempStorageUtil; import com.spectralogic.ds3client.models.ChecksumType; @@ -61,6 +66,7 @@ import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME; import static com.spectralogic.ds3client.integration.Util.deleteAllContents; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -309,4 +315,111 @@ public void testPartialRetriesWithInjectedFailures() throws NoSuchMethodExceptio deleteBigFileFromBlackPearlBucket(); } } + + @Test + public void testFiringFailureHandlerWhenGettingChunks() + throws URISyntaxException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, IOException + { + putBigFile(); + + final String tempPathPrefix = null; + final Path tempDirectory = Files.createTempDirectory(Paths.get("."), tempPathPrefix); + + try { + final IntValue numFailuresRecorded = new IntValue(); + + final FailureEventListener failureEventListener = new FailureEventListener() { + @Override + public void onFailure(final FailureEvent failureEvent) { + numFailuresRecorded.increment(); + } + }; + + final Ds3ClientHelpers.Job readJob = createReadJobWithObjectsReadyToTransfer(Ds3ClientShimFactory.ClientFailureType.ChunkAllocation); + + readJob.attachFailureEventListener(failureEventListener); + + try { + readJob.transfer(new FileObjectGetter(tempDirectory)); + } catch (final IOException e) { + assertEquals(1, numFailuresRecorded.getValue()); + + readJob.removeFailureEventListener(failureEventListener); + + try { + readJob.transfer(new FileObjectGetter(tempDirectory)); + } catch (final IOException ioe) { + assertEquals(1, numFailuresRecorded.getValue()); + } + } + } finally { + FileUtils.deleteDirectory(tempDirectory.toFile()); + deleteBigFileFromBlackPearlBucket(); + } + } + + private Ds3ClientHelpers.Job createReadJobWithObjectsReadyToTransfer(final Ds3ClientShimFactory.ClientFailureType clientFailureType) + throws IOException, URISyntaxException, NoSuchMethodException, IllegalAccessException, InvocationTargetException + { + final String DIR_NAME = "largeFiles/"; + final String FILE_NAME = "lesmis-copies.txt"; + + final Path objPath = ResourceUtils.loadFileResource(DIR_NAME + FILE_NAME); + final long bookSize = Files.size(objPath); + final Ds3Object obj = new Ds3Object(FILE_NAME, bookSize); + + final Ds3Client ds3Client = Ds3ClientShimFactory.makeWrappedDs3Client(clientFailureType, client); + + final int maxNumBlockAllocationRetries = 3; + final int maxNumObjectTransferAttempts = 3; + final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(ds3Client, + maxNumBlockAllocationRetries, + maxNumObjectTransferAttempts); + + final Ds3ClientHelpers.Job readJob = ds3ClientHelpers.startReadJob(BUCKET_NAME, Arrays.asList(obj)); + + return readJob; + } + + @Test + public void testFiringFailureHandlerWhenGettingObject() + throws URISyntaxException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, IOException + { + putBigFile(); + + final String tempPathPrefix = null; + final Path tempDirectory = Files.createTempDirectory(Paths.get("."), tempPathPrefix); + + try { + final IntValue numFailuresRecorded = new IntValue(); + + final FailureEventListener failureEventListener = new FailureEventListener() { + @Override + public void onFailure(final FailureEvent failureEvent) { + numFailuresRecorded.increment(); + } + }; + + final Ds3ClientHelpers.Job readJob = createReadJobWithObjectsReadyToTransfer(Ds3ClientShimFactory.ClientFailureType.GetObject); + + readJob.attachFailureEventListener(failureEventListener); + + try { + readJob.transfer(new FileObjectGetter(tempDirectory)); + } catch (final IOException e) { + assertEquals(1, numFailuresRecorded.getValue()); + + readJob.removeFailureEventListener(failureEventListener); + + try { + readJob.transfer(new FileObjectGetter(tempDirectory)); + } catch (final IOException ioe) { + assertEquals(1, numFailuresRecorded.getValue()); + } + } + } finally { + FileUtils.deleteDirectory(tempDirectory.toFile()); + deleteBigFileFromBlackPearlBucket(); + } + } } diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java index 3a8ba3d90..12e190842 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java @@ -19,7 +19,6 @@ import com.spectralogic.ds3client.Ds3Client; import com.spectralogic.ds3client.Ds3ClientImpl; import com.spectralogic.ds3client.IntValue; -import com.spectralogic.ds3client.MockedWebResponse; import com.spectralogic.ds3client.commands.*; import com.spectralogic.ds3client.commands.spectrads3.*; import com.spectralogic.ds3client.commands.spectrads3.notifications.*; @@ -32,6 +31,7 @@ import com.spectralogic.ds3client.helpers.events.FailureEvent; import com.spectralogic.ds3client.helpers.options.WriteJobOptions; import com.spectralogic.ds3client.integration.test.helpers.ABMTestHelper; +import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShimFactory; import com.spectralogic.ds3client.integration.test.helpers.TempStorageIds; import com.spectralogic.ds3client.integration.test.helpers.TempStorageUtil; import com.spectralogic.ds3client.models.*; @@ -39,6 +39,8 @@ import com.spectralogic.ds3client.networking.FailedRequestException; import com.spectralogic.ds3client.utils.ByteArraySeekableByteChannel; import com.spectralogic.ds3client.utils.ResourceUtils; +import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShimWithFailedChunkAllocation; +import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShimFactory.ClientFailureType; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.junit.*; @@ -54,9 +56,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShim; @@ -945,7 +945,7 @@ public void testFiringOnFailureEventWithFailedChunkAllocation() final FailureEventListener failureEventListener = new FailureEventListener() { @Override - public void onFailure(FailureEvent failureEvent) { + public void onFailure(final FailureEvent failureEvent) { numFailureEventsFired.increment(); } }; @@ -967,11 +967,6 @@ public void onFailure(FailureEvent failureEvent) { } } - private enum ClientFailureType { - ChunkAllocation, - PutObject - } - Ds3ClientHelpers.Job createWriteJobWithObjectsReadyToTransfer(final int maxNumObjectTransferAttempts, final ClientFailureType clientFailureType) throws IOException, URISyntaxException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { @@ -989,12 +984,7 @@ Ds3ClientHelpers.Job createWriteJobWithObjectsReadyToTransfer(final int maxNumOb objects.add(obj); } - Ds3Client ds3Client; - if (clientFailureType == ClientFailureType.ChunkAllocation) { - ds3Client = new Ds3ClientShimWithFailedChunkAllocation((Ds3ClientImpl) client); - } else { - ds3Client = new Ds3ClientShimWithFailedPutObject((Ds3ClientImpl) client); - } + final Ds3Client ds3Client = Ds3ClientShimFactory.makeWrappedDs3Client(clientFailureType, client); final int maxNumBlockAllocationRetries = 3; final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(ds3Client, @@ -1006,40 +996,6 @@ Ds3ClientHelpers.Job createWriteJobWithObjectsReadyToTransfer(final int maxNumOb return writeJob; } - private static class Ds3ClientShimWithFailedChunkAllocation extends Ds3ClientShim { - public Ds3ClientShimWithFailedChunkAllocation(Ds3ClientImpl ds3ClientImpl) - throws NoSuchMethodException, IllegalAccessException, InvocationTargetException - { - super(ds3ClientImpl); - } - - @Override - public AllocateJobChunkSpectraS3Response allocateJobChunkSpectraS3(final AllocateJobChunkSpectraS3Request request) - throws IOException { - final Map headers = new HashMap<>(); - headers.put("content-NONE", "text/xml"); - headers.put("Retry-After", "1"); - - final AllocateJobChunkSpectraS3Response allocateJobChunkSpectraS3Response = - new AllocateJobChunkSpectraS3Response(new MockedWebResponse("A response", 307, headers)); - - return allocateJobChunkSpectraS3Response; - } - } - - private static class Ds3ClientShimWithFailedPutObject extends Ds3ClientShim { - public Ds3ClientShimWithFailedPutObject(Ds3ClientImpl ds3ClientImpl) - throws NoSuchMethodException, IllegalAccessException, InvocationTargetException - { - super(ds3ClientImpl); - } - - @Override - public PutObjectResponse putObject(final PutObjectRequest request) throws IOException { - throw new IOException("A terrible, horrible thing happened!"); - } - } - @Test(expected = RuntimeException.class) public void testFiringOnFailureEventWithFailedPutObject() throws IOException, URISyntaxException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimFactory.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimFactory.java new file mode 100644 index 000000000..05b73da58 --- /dev/null +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimFactory.java @@ -0,0 +1,31 @@ +package com.spectralogic.ds3client.integration.test.helpers; + +import com.spectralogic.ds3client.Ds3Client; +import com.spectralogic.ds3client.Ds3ClientImpl; + +import java.lang.reflect.InvocationTargetException; + +public final class Ds3ClientShimFactory { + public enum ClientFailureType { + ChunkAllocation, + PutObject, + GetObject + } + + private Ds3ClientShimFactory() { } + + public static Ds3Client makeWrappedDs3Client(final ClientFailureType clientFailureType, final Ds3Client ds3Client) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException + { + switch (clientFailureType) { + case ChunkAllocation: + return new Ds3ClientShimWithFailedChunkAllocation((Ds3ClientImpl)ds3Client); + case PutObject: + return new Ds3ClientShimWithFailedPutObject((Ds3ClientImpl)ds3Client); + case GetObject: + return new Ds3ClientShimWithFailedGetObject((Ds3ClientImpl)ds3Client); + default: + throw new IllegalArgumentException("I don't know what kind of thingy to make."); + } + } +} diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedChunkAllocation.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedChunkAllocation.java new file mode 100644 index 000000000..9695ad73e --- /dev/null +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedChunkAllocation.java @@ -0,0 +1,64 @@ +/* + * **************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + +package com.spectralogic.ds3client.integration.test.helpers; + +import com.spectralogic.ds3client.Ds3ClientImpl; +import com.spectralogic.ds3client.MockedWebResponse; +import com.spectralogic.ds3client.commands.spectrads3.AllocateJobChunkSpectraS3Request; +import com.spectralogic.ds3client.commands.spectrads3.AllocateJobChunkSpectraS3Response; +import com.spectralogic.ds3client.commands.spectrads3.GetJobChunksReadyForClientProcessingSpectraS3Request; +import com.spectralogic.ds3client.commands.spectrads3.GetJobChunksReadyForClientProcessingSpectraS3Response; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; + +public class Ds3ClientShimWithFailedChunkAllocation extends Ds3ClientShim { + public Ds3ClientShimWithFailedChunkAllocation(final Ds3ClientImpl ds3ClientImpl) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException + { + super(ds3ClientImpl); + } + + @Override + public AllocateJobChunkSpectraS3Response allocateJobChunkSpectraS3(final AllocateJobChunkSpectraS3Request request) + throws IOException + { + final AllocateJobChunkSpectraS3Response allocateJobChunkSpectraS3Response = + new AllocateJobChunkSpectraS3Response(new MockedWebResponse("A response", 307, makeFailingResponseHeaders())); + + return allocateJobChunkSpectraS3Response; + } + + private Map makeFailingResponseHeaders() { + final Map headers = new HashMap<>(); + headers.put("content-NONE", "text/xml"); + headers.put("Retry-After", "1"); + + return headers; + } + + @Override + public GetJobChunksReadyForClientProcessingSpectraS3Response getJobChunksReadyForClientProcessingSpectraS3(final GetJobChunksReadyForClientProcessingSpectraS3Request request) + throws IOException + { + final GetJobChunksReadyForClientProcessingSpectraS3Response getJobChunksReadyForClientProcessingSpectraS3Response = + new GetJobChunksReadyForClientProcessingSpectraS3Response(new MockedWebResponse("A response", 307, makeFailingResponseHeaders())); + + return getJobChunksReadyForClientProcessingSpectraS3Response; + } +} diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedGetObject.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedGetObject.java new file mode 100644 index 000000000..514e0b494 --- /dev/null +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedGetObject.java @@ -0,0 +1,41 @@ +package com.spectralogic.ds3client.integration.test.helpers; + +import com.spectralogic.ds3client.Ds3Client; +import com.spectralogic.ds3client.Ds3ClientImpl; +import com.spectralogic.ds3client.commands.GetObjectRequest; +import com.spectralogic.ds3client.commands.GetObjectResponse; +import com.spectralogic.ds3client.models.JobNode; +import com.spectralogic.ds3client.networking.ConnectionDetails; +import com.spectralogic.ds3client.networking.NetworkClient; +import com.spectralogic.ds3client.networking.NetworkClientImpl; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; + +import static org.junit.Assert.fail; + +public class Ds3ClientShimWithFailedGetObject extends Ds3ClientShim { + public Ds3ClientShimWithFailedGetObject(final Ds3ClientImpl ds3ClientImpl) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException + { + super(ds3ClientImpl); + } + + @Override + public GetObjectResponse getObject(final GetObjectRequest request) throws IOException { + throw new IOException("A terrible, horrible thing happened!"); + } + + @Override + public Ds3Client newForNode(final JobNode node) { + final ConnectionDetails newConnectionDetails; + try { + final Ds3Client newClient = super.newForNode(node); + return new Ds3ClientShimWithFailedGetObject((Ds3ClientImpl)newClient); + } catch (final IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + fail("Failure trying to create Ds3Client used in verifying putObject retries: " + e.getMessage()); + } + + return null; + } +} diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedPutObject.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedPutObject.java new file mode 100644 index 000000000..c3dc15077 --- /dev/null +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedPutObject.java @@ -0,0 +1,21 @@ +package com.spectralogic.ds3client.integration.test.helpers; + +import com.spectralogic.ds3client.Ds3ClientImpl; +import com.spectralogic.ds3client.commands.PutObjectRequest; +import com.spectralogic.ds3client.commands.PutObjectResponse; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; + +public class Ds3ClientShimWithFailedPutObject extends Ds3ClientShim { + public Ds3ClientShimWithFailedPutObject(final Ds3ClientImpl ds3ClientImpl) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException + { + super(ds3ClientImpl); + } + + @Override + public PutObjectResponse putObject(final PutObjectRequest request) throws IOException { + throw new IOException("A terrible, horrible thing happened!"); + } +} diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java index 6baa2491e..f9a77f11a 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java @@ -25,9 +25,11 @@ import com.spectralogic.ds3client.helpers.ChunkTransferrer.ItemTransferrer; import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.ObjectChannelBuilder; import com.spectralogic.ds3client.helpers.events.EventRunner; +import com.spectralogic.ds3client.helpers.events.FailureEvent; import com.spectralogic.ds3client.helpers.util.PartialObjectHelpers; import com.spectralogic.ds3client.models.*; import com.spectralogic.ds3client.models.common.Range; +import com.spectralogic.ds3client.networking.FailedRequestException; import com.spectralogic.ds3client.networking.Metadata; import com.spectralogic.ds3client.utils.Guard; @@ -35,7 +37,11 @@ import java.util.List; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + class ReadJobImpl extends JobImpl { + private static final Logger LOG = LoggerFactory.getLogger(ReadJobImpl.class); private final JobPartTracker partTracker; private final List chunks; @@ -43,6 +49,7 @@ class ReadJobImpl extends JobImpl { private final Set metadataListeners; private final Set checksumListeners; private final Set waitingForChunksListeners; + private final Set failureEventListeners; private final int retryAfter; // Negative retryAfter value represent infinity retries private final int retryDelay; // Negative value represents default private final EventRunner eventRunner; @@ -67,6 +74,7 @@ public ReadJobImpl( this.metadataListeners = Sets.newIdentityHashSet(); this.checksumListeners = Sets.newIdentityHashSet(); this.waitingForChunksListeners = Sets.newIdentityHashSet(); + this.failureEventListeners = Sets.newIdentityHashSet(); this.eventRunner = eventRunner; this.retryAfter = this.retryAfterLeft = retryAfter; @@ -142,13 +150,13 @@ public void removeWaitingForChunksListener(final WaitingForChunksListener listen } @Override - public void attachFailureEventListener(FailureEventListener listener) { - // TODO implement this + public void attachFailureEventListener(final FailureEventListener listener) { + failureEventListeners.add(listener); } @Override - public void removeFailureEventListener(FailureEventListener listener) { - // TODO implement this + public void removeFailureEventListener(final FailureEventListener listener) { + failureEventListeners.remove(listener); } @Override @@ -163,17 +171,18 @@ public Ds3ClientHelpers.Job withChecksum(final ChecksumFunction checksumFunction @Override public void transfer(final ObjectChannelBuilder channelBuilder) - throws IOException { - running = true; + throws IOException + { + running = true; try (final JobState jobState = new JobState( channelBuilder, this.masterObjectList.getObjects(), partTracker, blobToRanges)) { final ChunkTransferrer chunkTransferrer = new ChunkTransferrer( - new GetObjectTransferrerRetryDecorator(jobState), - this.client, - jobState.getPartTracker(), - this.maxParallelRequests + new GetObjectTransferrerRetryDecorator(jobState), + this.client, + jobState.getPartTracker(), + this.maxParallelRequests ); while (jobState.hasObjects()) { transferNextChunks(chunkTransferrer); @@ -185,29 +194,62 @@ public void transfer(final ObjectChannelBuilder channelBuilder) } } - private void transferNextChunks(final ChunkTransferrer chunkTransferrer) - throws IOException, InterruptedException { - final GetJobChunksReadyForClientProcessingSpectraS3Response availableJobChunks = - this.client.getJobChunksReadyForClientProcessingSpectraS3(new GetJobChunksReadyForClientProcessingSpectraS3Request(this.masterObjectList.getJobId().toString())); - switch(availableJobChunks.getStatus()) { - case AVAILABLE: { - final MasterObjectList availableMol = availableJobChunks.getMasterObjectListResult(); - chunkTransferrer.transferChunks(availableMol.getNodes(), availableMol.getObjects()); - retryAfterLeft = retryAfter; // Reset the number of retries to the initial value - break; + private String getLabelForChunk(final Objects chunk) { + try { + return chunk.getObjects().get(0).getName(); + } catch (final Throwable t) { + LOG.error("Failed to get label for chunk.", t); } - case RETRYLATER: { - if (retryAfterLeft == 0) { - throw new Ds3NoMoreRetriesException(this.retryAfter); + + return "unnamed object"; + } + + private void transferNextChunks(final ChunkTransferrer chunkTransferrer) + throws IOException, InterruptedException + { + try { + final GetJobChunksReadyForClientProcessingSpectraS3Response availableJobChunks = + this.client.getJobChunksReadyForClientProcessingSpectraS3(new GetJobChunksReadyForClientProcessingSpectraS3Request(this.masterObjectList.getJobId().toString())); + switch (availableJobChunks.getStatus()) { + case AVAILABLE: { + final MasterObjectList availableMol = availableJobChunks.getMasterObjectListResult(); + chunkTransferrer.transferChunks(availableMol.getNodes(), availableMol.getObjects()); + retryAfterLeft = retryAfter; // Reset the number of retries to the initial value + break; + } + case RETRYLATER: { + if (retryAfterLeft == 0) { + throw new Ds3NoMoreRetriesException(this.retryAfter); + } + retryAfterLeft--; + final int secondsToDelay = computeDelay(availableJobChunks.getRetryAfterSeconds()); + emitWaitingForChunksEvents(secondsToDelay); + Thread.sleep(secondsToDelay * 1000); + break; + } + default: + assert false : "This line of code should be impossible to hit."; } - retryAfterLeft--; - final int secondsToDelay = computeDelay(availableJobChunks.getRetryAfterSeconds()); - emitWaitingForChunksEvents(secondsToDelay); - Thread.sleep(secondsToDelay * 1000); - break; + } catch (final Throwable t) { + emitFailureEvent(new FailureEvent.Builder() + .doingWhat("getting object") + .withObjectNamed(getLabelForChunk(masterObjectList.getObjects().get(0))) + .usingSystemWithEndpoint(client.getConnectionDetails().getEndpoint()) + .withCausalException(t) + .build()); + + throw t; } - default: - assert false : "This line of code should be impossible to hit."; + } + + private void emitFailureEvent(final FailureEvent failureEvent) { + for (final FailureEventListener failureEventListener : failureEventListeners) { + eventRunner.emitEvent(new Runnable() { + @Override + public void run() { + failureEventListener.onFailure(failureEvent); + } + }); } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java index 72017495d..da033b80e 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java @@ -148,13 +148,13 @@ public void removeWaitingForChunksListener(final WaitingForChunksListener listen } @Override - public void attachFailureEventListener(FailureEventListener listener) { + public void attachFailureEventListener(final FailureEventListener listener) { checkRunning(); this.failureEventListeners.add(listener); } @Override - public void removeFailureEventListener(FailureEventListener listener) { + public void removeFailureEventListener(final FailureEventListener listener) { checkRunning(); this.failureEventListeners.remove(listener); } From 070a91fc2a8ee81436706cd83f48b32ca9405838 Mon Sep 17 00:00:00 2001 From: GraciesPadre Date: Wed, 12 Oct 2016 14:53:52 -0600 Subject: [PATCH 3/7] Making failure event types an enum --- .../integration/PutJobManagement_Test.java | 40 +++++--- .../ds3client/helpers/JobImpl.java | 29 ++++++ .../ds3client/helpers/ReadJobImpl.java | 99 ++++++++----------- .../ds3client/helpers/WriteJobImpl.java | 70 +++++-------- .../helpers/events/FailureEvent.java | 23 ++++- 5 files changed, 138 insertions(+), 123 deletions(-) diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java index 12e190842..cc603b379 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java @@ -930,7 +930,7 @@ public boolean handleException(final Throwable t) { }); } - @Test(expected = Ds3NoMoreRetriesException.class) + @Test public void testFiringOnFailureEventWithFailedChunkAllocation() throws IOException, URISyntaxException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { final String tempPathPrefix = null; @@ -952,15 +952,19 @@ public void onFailure(final FailureEvent failureEvent) { writeJob.attachFailureEventListener(failureEventListener); - writeJob.transfer(new FileObjectPutter(tempDirectory)); - - assertEquals(1, numFailureEventsFired.getValue()); - - writeJob.removeFailureEventListener(failureEventListener); + try { + writeJob.transfer(new FileObjectPutter(tempDirectory)); + } catch (final Ds3NoMoreRetriesException e) { + assertEquals(1, numFailureEventsFired.getValue()); - writeJob.transfer(new FileObjectPutter(tempDirectory)); + writeJob.removeFailureEventListener(failureEventListener); - assertEquals(1, numFailureEventsFired.getValue()); + try { + writeJob.transfer(new FileObjectPutter(tempDirectory)); + } catch (final Ds3NoMoreRetriesException nmre) { + assertEquals(1, numFailureEventsFired.getValue()); + } + } } finally { FileUtils.deleteDirectory(tempDirectory.toFile()); deleteAllContents(client, BUCKET_NAME); @@ -996,7 +1000,7 @@ Ds3ClientHelpers.Job createWriteJobWithObjectsReadyToTransfer(final int maxNumOb return writeJob; } - @Test(expected = RuntimeException.class) + @Test public void testFiringOnFailureEventWithFailedPutObject() throws IOException, URISyntaxException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { final String tempPathPrefix = null; @@ -1018,15 +1022,19 @@ public void onFailure(final FailureEvent failureEvent) { writeJob.attachFailureEventListener(failureEventListener); - writeJob.transfer(new FileObjectPutter(tempDirectory)); - - assertEquals(1, numFailureEventsFired.getValue()); - - writeJob.removeFailureEventListener(failureEventListener); + try { + writeJob.transfer(new FileObjectPutter(tempDirectory)); + } catch (final RuntimeException e) { + assertEquals(1, numFailureEventsFired.getValue()); - writeJob.transfer(new FileObjectPutter(tempDirectory)); + writeJob.removeFailureEventListener(failureEventListener); - assertEquals(1, numFailureEventsFired.getValue()); + try { + writeJob.transfer(new FileObjectPutter(tempDirectory)); + } catch (final RuntimeException nmre) { + assertEquals(1, numFailureEventsFired.getValue()); + } + } } finally { FileUtils.deleteDirectory(tempDirectory.toFile()); deleteAllContents(client, BUCKET_NAME); diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java index 802def312..f3da97d62 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java @@ -17,13 +17,20 @@ import com.spectralogic.ds3client.Ds3Client; import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.Job; +import com.spectralogic.ds3client.helpers.events.FailureEvent; import com.spectralogic.ds3client.models.BulkObject; import com.spectralogic.ds3client.models.MasterObjectList; +import com.spectralogic.ds3client.models.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.UUID; abstract class JobImpl implements Job { + private static final Logger LOG = LoggerFactory.getLogger(JobImpl.class); + protected final Ds3Client client; protected final MasterObjectList masterObjectList; protected boolean running = false; @@ -81,4 +88,26 @@ protected void transferItem( } } } + + protected FailureEvent makeFailureEvent(final FailureEvent.FailureActivity failureActivity, + final Throwable causalException, + final Objects chunk) + { + return new FailureEvent.Builder() + .doingWhat(failureActivity) + .withCausalException(causalException) + .withObjectNamed(getLabelForChunk(chunk)) + .usingSystemWithEndpoint(client.getConnectionDetails().getEndpoint()) + .build(); + } + + protected String getLabelForChunk(final Objects chunk) { + try { + return chunk.getObjects().get(0).getName(); + } catch (final Throwable t) { + LOG.error("Failed to get label for chunk.", t); + } + + return "unnamed object"; + } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java index f9a77f11a..295a31c0b 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java @@ -151,11 +151,13 @@ public void removeWaitingForChunksListener(final WaitingForChunksListener listen @Override public void attachFailureEventListener(final FailureEventListener listener) { + checkRunning(); failureEventListeners.add(listener); } @Override public void removeFailureEventListener(final FailureEventListener listener) { + checkRunning(); failureEventListeners.remove(listener); } @@ -173,72 +175,57 @@ public Ds3ClientHelpers.Job withChecksum(final ChecksumFunction checksumFunction public void transfer(final ObjectChannelBuilder channelBuilder) throws IOException { - running = true; - try (final JobState jobState = new JobState( - channelBuilder, - this.masterObjectList.getObjects(), - partTracker, blobToRanges)) { - final ChunkTransferrer chunkTransferrer = new ChunkTransferrer( - new GetObjectTransferrerRetryDecorator(jobState), - this.client, - jobState.getPartTracker(), - this.maxParallelRequests - ); - while (jobState.hasObjects()) { - transferNextChunks(chunkTransferrer); - } - } catch (final RuntimeException | IOException e) { - throw e; - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - private String getLabelForChunk(final Objects chunk) { try { - return chunk.getObjects().get(0).getName(); + running = true; + try (final JobState jobState = new JobState( + channelBuilder, + this.masterObjectList.getObjects(), + partTracker, blobToRanges)) { + final ChunkTransferrer chunkTransferrer = new ChunkTransferrer( + new GetObjectTransferrerRetryDecorator(jobState), + this.client, + jobState.getPartTracker(), + this.maxParallelRequests + ); + while (jobState.hasObjects()) { + transferNextChunks(chunkTransferrer); + } + } catch (final RuntimeException | IOException e) { + throw e; + } catch (final Exception e) { + throw new RuntimeException(e); + } } catch (final Throwable t) { - LOG.error("Failed to get label for chunk.", t); + running = false; + emitFailureEvent(makeFailureEvent(FailureEvent.FailureActivity.GettingObject, t, masterObjectList.getObjects().get(0))); + throw t; } - - return "unnamed object"; } private void transferNextChunks(final ChunkTransferrer chunkTransferrer) throws IOException, InterruptedException { - try { - final GetJobChunksReadyForClientProcessingSpectraS3Response availableJobChunks = - this.client.getJobChunksReadyForClientProcessingSpectraS3(new GetJobChunksReadyForClientProcessingSpectraS3Request(this.masterObjectList.getJobId().toString())); - switch (availableJobChunks.getStatus()) { - case AVAILABLE: { - final MasterObjectList availableMol = availableJobChunks.getMasterObjectListResult(); - chunkTransferrer.transferChunks(availableMol.getNodes(), availableMol.getObjects()); - retryAfterLeft = retryAfter; // Reset the number of retries to the initial value - break; - } - case RETRYLATER: { - if (retryAfterLeft == 0) { - throw new Ds3NoMoreRetriesException(this.retryAfter); - } - retryAfterLeft--; - final int secondsToDelay = computeDelay(availableJobChunks.getRetryAfterSeconds()); - emitWaitingForChunksEvents(secondsToDelay); - Thread.sleep(secondsToDelay * 1000); - break; + final GetJobChunksReadyForClientProcessingSpectraS3Response availableJobChunks = + this.client.getJobChunksReadyForClientProcessingSpectraS3(new GetJobChunksReadyForClientProcessingSpectraS3Request(this.masterObjectList.getJobId().toString())); + switch (availableJobChunks.getStatus()) { + case AVAILABLE: { + final MasterObjectList availableMol = availableJobChunks.getMasterObjectListResult(); + chunkTransferrer.transferChunks(availableMol.getNodes(), availableMol.getObjects()); + retryAfterLeft = retryAfter; // Reset the number of retries to the initial value + break; + } + case RETRYLATER: { + if (retryAfterLeft == 0) { + throw new Ds3NoMoreRetriesException(this.retryAfter); } - default: - assert false : "This line of code should be impossible to hit."; + retryAfterLeft--; + final int secondsToDelay = computeDelay(availableJobChunks.getRetryAfterSeconds()); + emitWaitingForChunksEvents(secondsToDelay); + Thread.sleep(secondsToDelay * 1000); + break; } - } catch (final Throwable t) { - emitFailureEvent(new FailureEvent.Builder() - .doingWhat("getting object") - .withObjectNamed(getLabelForChunk(masterObjectList.getObjects().get(0))) - .usingSystemWithEndpoint(client.getConnectionDetails().getEndpoint()) - .withCausalException(t) - .build()); - - throw t; + default: + assert false : "This line of code should be impossible to hit."; } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java index da033b80e..9746b74df 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java @@ -207,12 +207,8 @@ public void transfer(final ObjectChannelBuilder channelBuilder) throw new RuntimeException(e); } } catch (final Throwable t) { - emitFailureEvent(new FailureEvent.Builder() - .doingWhat("putting object") - .withCausalException(t) - .usingSystemWithEndpoint(client.getConnectionDetails().getEndpoint()) - .withObjectNamed(getLabelForChunk(filteredChunks.get(0))) - .build()); + running = false; + emitFailureEvent(makeFailureEvent(FailureEvent.FailureActivity.PuttingObject, t, filteredChunks.get(0))); throw t; } } @@ -228,16 +224,6 @@ public void run() { } } - private String getLabelForChunk(final Objects chunk) { - try { - return chunk.getObjects().get(0).getName(); - } catch (final Throwable t) { - LOG.error("Failed to get label for chunk.", t); - } - - return "unnamed object"; - } - private Objects allocateChunk(final Objects filtered) throws IOException { Objects chunk = null; while (chunk == null) { @@ -252,39 +238,29 @@ private Objects tryAllocateChunk(final Objects filtered) throws IOException { LOG.info("AllocatedJobChunkResponse status: {}", response.getStatus().toString()); - try { - switch (response.getStatus()) { - case ALLOCATED: - retryAfterLeft = retryAfter; // Reset the number of retries to the initial value - return response.getObjectsResult(); - case RETRYLATER: - try { - if (retryAfterLeft == 0) { - throw new Ds3NoMoreRetriesException(this.retryAfter); - } - retryAfterLeft--; - - final int retryAfter = computeRetryAfter(response.getRetryAfterSeconds()); - emitWaitingForChunksEvents(retryAfter); - - LOG.debug("Will retry allocate chunk call after {} seconds", retryAfter); - Thread.sleep(retryAfter * 1000); - return null; - } catch (final InterruptedException e) { - throw new RuntimeException(e); + switch (response.getStatus()) { + case ALLOCATED: + retryAfterLeft = retryAfter; // Reset the number of retries to the initial value + return response.getObjectsResult(); + case RETRYLATER: + try { + if (retryAfterLeft == 0) { + throw new Ds3NoMoreRetriesException(this.retryAfter); } - default: - assert false : "This line of code should be impossible to hit."; + retryAfterLeft--; + + final int retryAfter = computeRetryAfter(response.getRetryAfterSeconds()); + emitWaitingForChunksEvents(retryAfter); + + LOG.debug("Will retry allocate chunk call after {} seconds", retryAfter); + Thread.sleep(retryAfter * 1000); return null; - } - } catch (final Throwable t) { - emitFailureEvent(new FailureEvent.Builder() - .doingWhat("allocating chunk") - .usingSystemWithEndpoint(client.getConnectionDetails().getEndpoint()) - .withObjectNamed(getLabelForChunk(filtered)) - .withCausalException(t) - .build()); - throw t; + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + default: + assert false : "This line of code should be impossible to hit."; + return null; } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java index 7d2e344e5..96028a655 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java @@ -52,15 +52,30 @@ public String toString() { return "Failure " + doingWhat + " with object named \"" + objectName + "\" using system with endpoint " + endpoint; } + public enum FailureActivity { + PuttingObject("putting object"), + GettingObject("getting object"); + + private final String activityText; + + FailureActivity(final String activityText) { + this.activityText = activityText; + } + + public String getActivityText() { + return activityText; + } + } + public static class Builder { - private String doingWhat; + private FailureActivity doingWhat; private String withObjectNamed; private String usingSystemWithEndpoint; private Throwable causalException; public Builder() { } - public Builder doingWhat(final String what) { + public Builder doingWhat(final FailureActivity what) { this.doingWhat = what; return this; } @@ -81,12 +96,12 @@ public Builder withCausalException(final Throwable causalException) { } public FailureEvent build() { - Guard.throwOnNullOrEmptyString(doingWhat, "The failed activity may not be null or empty."); + Preconditions.checkNotNull(doingWhat, "The failed activity may not be null or empty."); Guard.throwOnNullOrEmptyString(withObjectNamed, "The name of the object involved in the activity may not be null or empty."); Guard.throwOnNullOrEmptyString(usingSystemWithEndpoint, "The endpoint referenced in the activity may not be null or empty."); Preconditions.checkNotNull(causalException, "The exception causing a failure may not be null."); - return new FailureEvent(doingWhat, withObjectNamed, usingSystemWithEndpoint, causalException); + return new FailureEvent(doingWhat.getActivityText(), withObjectNamed, usingSystemWithEndpoint, causalException); } } } From a0724b6168468ca306c71fc6d7c221143e25a540 Mon Sep 17 00:00:00 2001 From: GraciesPadre Date: Wed, 12 Oct 2016 15:53:30 -0600 Subject: [PATCH 4/7] Refactoring to remove some duplication in read and write job impl --- .../test/helpers/Ds3ClientShimFactory.java | 15 +++ .../Ds3ClientShimWithFailedGetObject.java | 17 ++- .../Ds3ClientShimWithFailedPutObject.java | 15 +++ .../ds3client/helpers/JobImpl.java | 90 +++++++++++++++- .../ds3client/helpers/ReadJobImpl.java | 85 +-------------- .../ds3client/helpers/WriteJobImpl.java | 102 +++--------------- .../helpers/Ds3ClientHelpers_Test.java | 59 +++++++++- 7 files changed, 213 insertions(+), 170 deletions(-) diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimFactory.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimFactory.java index 05b73da58..782ff37bf 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimFactory.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimFactory.java @@ -1,3 +1,18 @@ +/* + * **************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + package com.spectralogic.ds3client.integration.test.helpers; import com.spectralogic.ds3client.Ds3Client; diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedGetObject.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedGetObject.java index 514e0b494..65932f159 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedGetObject.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedGetObject.java @@ -1,3 +1,18 @@ +/* + * **************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + package com.spectralogic.ds3client.integration.test.helpers; import com.spectralogic.ds3client.Ds3Client; @@ -6,8 +21,6 @@ import com.spectralogic.ds3client.commands.GetObjectResponse; import com.spectralogic.ds3client.models.JobNode; import com.spectralogic.ds3client.networking.ConnectionDetails; -import com.spectralogic.ds3client.networking.NetworkClient; -import com.spectralogic.ds3client.networking.NetworkClientImpl; import java.io.IOException; import java.lang.reflect.InvocationTargetException; diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedPutObject.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedPutObject.java index c3dc15077..2ace634da 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedPutObject.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/test/helpers/Ds3ClientShimWithFailedPutObject.java @@ -1,3 +1,18 @@ +/* + * **************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + package com.spectralogic.ds3client.integration.test.helpers; import com.spectralogic.ds3client.Ds3ClientImpl; diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java index f3da97d62..980a99379 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java @@ -15,10 +15,13 @@ package com.spectralogic.ds3client.helpers; +import com.google.common.collect.Sets; import com.spectralogic.ds3client.Ds3Client; import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.Job; +import com.spectralogic.ds3client.helpers.events.EventRunner; import com.spectralogic.ds3client.helpers.events.FailureEvent; import com.spectralogic.ds3client.models.BulkObject; +import com.spectralogic.ds3client.models.ChecksumType; import com.spectralogic.ds3client.models.MasterObjectList; import com.spectralogic.ds3client.models.Objects; @@ -26,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Set; import java.util.UUID; abstract class JobImpl implements Job { @@ -36,11 +40,22 @@ abstract class JobImpl implements Job { protected boolean running = false; protected int maxParallelRequests = 10; private final int objectTransferAttempts; + private final EventRunner eventRunner; + private final Set failureEventListeners; + private final Set waitingForChunksListeners; + private final Set checksumListeners; - public JobImpl(final Ds3Client client, final MasterObjectList masterObjectList, final int objectTransferAttempts) { + public JobImpl(final Ds3Client client, + final MasterObjectList masterObjectList, + final int objectTransferAttempts, + final EventRunner eventRunner) { this.client = client; this.masterObjectList = masterObjectList; this.objectTransferAttempts = objectTransferAttempts; + this.eventRunner = eventRunner; + this.failureEventListeners = Sets.newIdentityHashSet(); + this.waitingForChunksListeners = Sets.newIdentityHashSet(); + this.checksumListeners = Sets.newIdentityHashSet(); } @Override @@ -89,6 +104,57 @@ protected void transferItem( } } + protected EventRunner getEventRunner() { + return eventRunner; + } + + @Override + public void attachChecksumListener(final ChecksumListener listener) { + checkRunning(); + this.checksumListeners.add(listener); + } + + @Override + public void removeChecksumListener(final ChecksumListener listener) { + checkRunning(); + this.checksumListeners.remove(listener); + } + + @Override + public void attachWaitingForChunksListener(final WaitingForChunksListener listener) { + checkRunning(); + this.waitingForChunksListeners.add(listener); + } + + @Override + public void removeWaitingForChunksListener(final WaitingForChunksListener listener) { + checkRunning(); + this.waitingForChunksListeners.remove(listener); + } + + @Override + public void attachFailureEventListener(final FailureEventListener listener) { + checkRunning(); + this.failureEventListeners.add(listener); + } + + @Override + public void removeFailureEventListener(final FailureEventListener listener) { + checkRunning(); + this.failureEventListeners.remove(listener); + } + + protected void emitFailureEvent(final FailureEvent failureEvent) { + for (final FailureEventListener failureEventListener : failureEventListeners) { + eventRunner.emitEvent(new Runnable() { + @Override + public void run() { + failureEventListener.onFailure(failureEvent); + } + }); + } + } + protected FailureEvent makeFailureEvent(final FailureEvent.FailureActivity failureActivity, final Throwable causalException, final Objects chunk) @@ -110,4 +176,26 @@ protected String getLabelForChunk(final Objects chunk) { return "unnamed object"; } + + protected void emitWaitingForChunksEvents(final int retryAfter) { + for (final WaitingForChunksListener waitingForChunksListener : waitingForChunksListeners) { + eventRunner.emitEvent(new Runnable() { + @Override + public void run() { + waitingForChunksListener.waiting(retryAfter); + } + }); + } + } + + protected void emitChecksumEvents(final BulkObject bulkObject, final ChecksumType.Type type, final String checksum) { + for (final ChecksumListener listener : checksumListeners) { + getEventRunner().emitEvent(new Runnable() { + @Override + public void run() { + listener.value(bulkObject, type, checksum); + } + }); + } + } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java index 295a31c0b..de456fb60 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java @@ -29,7 +29,6 @@ import com.spectralogic.ds3client.helpers.util.PartialObjectHelpers; import com.spectralogic.ds3client.models.*; import com.spectralogic.ds3client.models.common.Range; -import com.spectralogic.ds3client.networking.FailedRequestException; import com.spectralogic.ds3client.networking.Metadata; import com.spectralogic.ds3client.utils.Guard; @@ -47,12 +46,9 @@ class ReadJobImpl extends JobImpl { private final List chunks; private final ImmutableMap> blobToRanges; private final Set metadataListeners; - private final Set checksumListeners; - private final Set waitingForChunksListeners; - private final Set failureEventListeners; + private final int retryAfter; // Negative retryAfter value represent infinity retries private final int retryDelay; // Negative value represents default - private final EventRunner eventRunner; private int retryAfterLeft; // The number of retries left @@ -65,17 +61,13 @@ public ReadJobImpl( final int retryDelay, final EventRunner eventRunner ) { - super(client, masterObjectList, objectTransferAttempts); + super(client, masterObjectList, objectTransferAttempts, eventRunner); this.chunks = this.masterObjectList.getObjects(); this.partTracker = JobPartTrackerFactory .buildPartTracker(getAllBlobApiBeans(chunks), eventRunner); this.blobToRanges = PartialObjectHelpers.mapRangesToBlob(masterObjectList.getObjects(), objectRanges); this.metadataListeners = Sets.newIdentityHashSet(); - this.checksumListeners = Sets.newIdentityHashSet(); - this.waitingForChunksListeners = Sets.newIdentityHashSet(); - this.failureEventListeners = Sets.newIdentityHashSet(); - this.eventRunner = eventRunner; this.retryAfter = this.retryAfterLeft = retryAfter; this.retryDelay = retryDelay; @@ -125,42 +117,6 @@ public void removeMetadataReceivedListener(final MetadataReceivedListener listen this.metadataListeners.remove(listener); } - @Override - public void attachChecksumListener(final ChecksumListener listener) { - checkRunning(); - this.checksumListeners.add(listener); - } - - @Override - public void removeChecksumListener(final ChecksumListener listener) { - checkRunning(); - this.checksumListeners.remove(listener); - } - - @Override - public void attachWaitingForChunksListener(final WaitingForChunksListener listener) { - checkRunning(); - this.waitingForChunksListeners.add(listener); - } - - @Override - public void removeWaitingForChunksListener(final WaitingForChunksListener listener) { - checkRunning(); - this.waitingForChunksListeners.remove(listener); - } - - @Override - public void attachFailureEventListener(final FailureEventListener listener) { - checkRunning(); - failureEventListeners.add(listener); - } - - @Override - public void removeFailureEventListener(final FailureEventListener listener) { - checkRunning(); - failureEventListeners.remove(listener); - } - @Override public Ds3ClientHelpers.Job withMetadata(final Ds3ClientHelpers.MetadataAccess access) { throw new IllegalStateException("withMetadata method is not used with Read Jobs"); @@ -229,17 +185,6 @@ private void transferNextChunks(final ChunkTransferrer chunkTransferrer) } } - private void emitFailureEvent(final FailureEvent failureEvent) { - for (final FailureEventListener failureEventListener : failureEventListeners) { - eventRunner.emitEvent(new Runnable() { - @Override - public void run() { - failureEventListener.onFailure(failureEvent); - } - }); - } - } - private int computeDelay(final int retryAfterSeconds) { if (retryDelay == -1) { return retryAfterSeconds; @@ -248,17 +193,6 @@ private int computeDelay(final int retryAfterSeconds) { } } - private void emitWaitingForChunksEvents(final int secondsToRetry) { - for (final WaitingForChunksListener waitingForChunksListener : waitingForChunksListeners) { - eventRunner.emitEvent(new Runnable() { - @Override - public void run() { - waitingForChunksListener.waiting(secondsToRetry); - } - }); - } - } - private final class GetObjectTransferrerRetryDecorator implements ItemTransferrer { private final GetObjectTransferrer getObjectTransferrer; @@ -300,25 +234,14 @@ public void transferItem(final Ds3Client client, final BulkObject ds3Object) final GetObjectResponse response = client.getObject(request); final Metadata metadata = response.getMetadata(); - sendChecksumEvents(ds3Object, response.getChecksumType(), response.getChecksum()); + emitChecksumEvents(ds3Object, response.getChecksumType(), response.getChecksum()); sendMetadataEvents(ds3Object.getName(), metadata); } } - private void sendChecksumEvents(final BulkObject ds3Object, final ChecksumType.Type type, final String checksum) { - for (final ChecksumListener listener : this.checksumListeners) { - eventRunner.emitEvent(new Runnable() { - @Override - public void run() { - listener.value(ds3Object, type, checksum); - } - }); - } - } - private void sendMetadataEvents(final String objName , final Metadata metadata) { for (final MetadataReceivedListener listener : this.metadataListeners) { - eventRunner.emitEvent(new Runnable() { + getEventRunner().emitEvent(new Runnable() { @Override public void run() { listener.metadataReceived(objName, metadata); diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java index 9746b74df..0bef0fd2f 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java @@ -17,7 +17,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.Sets; import com.spectralogic.ds3client.Ds3Client; import com.spectralogic.ds3client.commands.PutObjectRequest; import com.spectralogic.ds3client.commands.spectrads3.AllocateJobChunkSpectraS3Request; @@ -48,10 +47,7 @@ class WriteJobImpl extends JobImpl { private final JobPartTracker partTracker; private final List filteredChunks; private final ChecksumType.Type checksumType; - private final Set checksumListeners; - private final Set waitingForChunksListeners; - private final Set failureEventListeners; - private final EventRunner eventRunner; + private final int retryAfter; // Negative retryAfter value represent infinity retries private final int retryDelay; //Negative value means use default @@ -67,7 +63,7 @@ public WriteJobImpl( final int objectTransferAttempts, final int retryDelay, final EventRunner eventRunner) { - super(client, masterObjectList, objectTransferAttempts); + super(client, masterObjectList, objectTransferAttempts, eventRunner); if (this.masterObjectList == null || this.masterObjectList.getObjects() == null) { LOG.info("Job has no data to transfer"); this.filteredChunks = null; @@ -81,10 +77,6 @@ public WriteJobImpl( } this.retryAfter = this.retryAfterLeft = retryAfter; this.retryDelay = retryDelay; - this.checksumListeners = Sets.newIdentityHashSet(); - this.waitingForChunksListeners = Sets.newIdentityHashSet(); - this.failureEventListeners = Sets.newIdentityHashSet(); - this.eventRunner = eventRunner; this.checksumType = type; } @@ -123,42 +115,6 @@ public void removeMetadataReceivedListener(final MetadataReceivedListener listen throw new IllegalStateException("Metadata listeners are not used with Write jobs"); } - @Override - public void attachChecksumListener(final ChecksumListener listener) { - checkRunning(); - this.checksumListeners.add(listener); - } - - @Override - public void removeChecksumListener(final ChecksumListener listener) { - checkRunning(); - this.checksumListeners.remove(listener); - } - - @Override - public void attachWaitingForChunksListener(final WaitingForChunksListener listener) { - checkRunning(); - this.waitingForChunksListeners.add(listener); - } - - @Override - public void removeWaitingForChunksListener(final WaitingForChunksListener listener) { - checkRunning(); - this.waitingForChunksListeners.remove(listener); - } - - @Override - public void attachFailureEventListener(final FailureEventListener listener) { - checkRunning(); - this.failureEventListeners.add(listener); - } - - @Override - public void removeFailureEventListener(final FailureEventListener listener) { - checkRunning(); - this.failureEventListeners.remove(listener); - } - @Override public Ds3ClientHelpers.Job withMetadata(final Ds3ClientHelpers.MetadataAccess access) { checkRunning(); @@ -213,17 +169,6 @@ public void transfer(final ObjectChannelBuilder channelBuilder) } } - private void emitFailureEvent(final FailureEvent failureEvent) { - for (final FailureEventListener failureEventListener : failureEventListeners) { - eventRunner.emitEvent(new Runnable() { - @Override - public void run() { - failureEventListener.onFailure(failureEvent); - } - }); - } - } - private Objects allocateChunk(final Objects filtered) throws IOException { Objects chunk = null; while (chunk == null) { @@ -264,17 +209,6 @@ private Objects tryAllocateChunk(final Objects filtered) throws IOException { } } - private void emitWaitingForChunksEvents(final int retryAfter) { - for (final WaitingForChunksListener waitingForChunksListener : waitingForChunksListeners) { - eventRunner.emitEvent(new Runnable() { - @Override - public void run() { - waitingForChunksListener.waiting(retryAfter); - } - }); - } - } - private int computeRetryAfter(final int retryAfterSeconds) { if (retryDelay == -1) { return retryAfterSeconds; @@ -286,6 +220,7 @@ private int computeRetryAfter(final int retryAfterSeconds) { /** * Filters out chunks that have already been completed. We will get the same chunk name back from the server, but it * will not have any objects in it, so we remove that from the list of objects that are returned. + * * @param objectsList The list to be filtered * @return The filtered list */ @@ -395,6 +330,7 @@ private String calculateChecksum(final BulkObject ds3Object, final SeekableByteC } private static final int READ_BUFFER_SIZE = 10 * 1024 * 1024; + private String hashInputStream(final Hasher digest, final InputStream stream) throws IOException { final byte[] buffer = new byte[READ_BUFFER_SIZE]; int bytesRead; @@ -414,24 +350,20 @@ private String hashInputStream(final Hasher digest, final InputStream stream) th private Hasher getHasher(final ChecksumType.Type checksumType) { switch (checksumType) { - case MD5: return new MD5Hasher(); - case SHA_256: return new SHA256Hasher(); - case SHA_512: return new SHA512Hasher(); - case CRC_32: return new CRC32Hasher(); - case CRC_32C: return new CRC32CHasher(); - default: throw new RuntimeException("Unknown checksum type " + checksumType.toString()); + case MD5: + return new MD5Hasher(); + case SHA_256: + return new SHA256Hasher(); + case SHA_512: + return new SHA512Hasher(); + case CRC_32: + return new CRC32Hasher(); + case CRC_32C: + return new CRC32CHasher(); + default: + throw new RuntimeException("Unknown checksum type " + checksumType.toString()); } } } - - private void emitChecksumEvents(final BulkObject bulkObject, final ChecksumType.Type type, final String checksum) { - for (final ChecksumListener listener : checksumListeners) { - eventRunner.emitEvent(new Runnable() { - @Override - public void run() { - listener.value(bulkObject, type, checksum); - } - }); - } - } } + diff --git a/ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers_Test.java b/ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers_Test.java index fdddc0a4f..7ec6a20de 100644 --- a/ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers_Test.java +++ b/ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers_Test.java @@ -28,6 +28,7 @@ import com.spectralogic.ds3client.models.Error; import com.spectralogic.ds3client.models.Objects; import com.spectralogic.ds3client.models.bulk.Ds3Object; +import com.spectralogic.ds3client.models.common.Credentials; import com.spectralogic.ds3client.networking.ConnectionDetails; import com.spectralogic.ds3client.networking.FailedRequestException; import com.spectralogic.ds3client.utils.ByteArraySeekableByteChannel; @@ -37,6 +38,7 @@ import org.mockito.Mockito; import java.io.IOException; +import java.net.URI; import java.nio.channels.SeekableByteChannel; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -117,7 +119,10 @@ public void testReadObjectsWithFailedGet() throws IOException, ParseException { Mockito.when(ds3Client.getObject(getRequestHas(MYBUCKET, "foo", jobId, 6))).thenThrow(new StubException()); Mockito.when(ds3Client.getObject(getRequestHas(MYBUCKET, "baz", jobId, 6))).then(getObjectAnswer("ntents")); - + + final ConnectionDetails connectionDetails = makeMockConnectionDetails(); + Mockito.when(ds3Client.getConnectionDetails()).thenReturn(connectionDetails); + final Job job = Ds3ClientHelpers.wrap(ds3Client).startReadJob(MYBUCKET, Lists.newArrayList( new Ds3Object("foo"), new Ds3Object("bar"), @@ -132,6 +137,55 @@ public SeekableByteChannel buildChannel(final String key) throws IOException { } }); } + + private ConnectionDetails makeMockConnectionDetails() { + return new ConnectionDetails() { + @Override + public String getEndpoint() { + return "endpoint"; + } + + @Override + public Credentials getCredentials() { + return null; + } + + @Override + public boolean isHttps() { + return false; + } + + @Override + public URI getProxy() { + return null; + } + + @Override + public int getRetries() { + return 0; + } + + @Override + public int getBufferSize() { + return 0; + } + + @Override + public int getConnectionTimeout() { + return 0; + } + + @Override + public int getSocketTimeout() { + return 0; + } + + @Override + public boolean isCertificateVerification() { + return false; + } + }; + } @Test public void testWriteObjects() throws IOException, ParseException { @@ -633,6 +687,9 @@ public void testReadObjectsWithRetryAfter() throws IOException, ParseException { .getJobChunksReadyForClientProcessingSpectraS3(hasJobId(jobId))) .thenReturn(jobChunksResponse); + final ConnectionDetails connectionDetails = makeMockConnectionDetails(); + Mockito.when(ds3Client.getConnectionDetails()).thenReturn(connectionDetails); + final Job job = Ds3ClientHelpers.wrap(ds3Client, 1).startReadJob(MYBUCKET, Lists.newArrayList( new Ds3Object("foo") )); From 71662981b896556e85d3f404753db1d53e87347b Mon Sep 17 00:00:00 2001 From: GraciesPadre Date: Wed, 12 Oct 2016 20:09:32 -0600 Subject: [PATCH 5/7] Cleaning up tests --- .../integration/GetJobManagement_Test.java | 2 + .../integration/PutJobManagement_Test.java | 2 + .../ds3client/helpers/WriteJobImpl.java | 2 +- .../helpers/events/FailureEvent.java | 10 ++-- .../helpers/Ds3ClientHelpers_Test.java | 55 ++----------------- 5 files changed, 14 insertions(+), 57 deletions(-) diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java index 630cc5568..c3c454b26 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java @@ -332,6 +332,7 @@ public void testFiringFailureHandlerWhenGettingChunks() @Override public void onFailure(final FailureEvent failureEvent) { numFailuresRecorded.increment(); + assertEquals(FailureEvent.FailureActivity.GettingObject, failureEvent.doingWhat()); } }; @@ -397,6 +398,7 @@ public void testFiringFailureHandlerWhenGettingObject() @Override public void onFailure(final FailureEvent failureEvent) { numFailuresRecorded.increment(); + assertEquals(FailureEvent.FailureActivity.GettingObject, failureEvent.doingWhat()); } }; diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java index cc603b379..63f8dc601 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java @@ -947,6 +947,7 @@ public void testFiringOnFailureEventWithFailedChunkAllocation() @Override public void onFailure(final FailureEvent failureEvent) { numFailureEventsFired.increment(); + assertEquals(FailureEvent.FailureActivity.PuttingObject, failureEvent.doingWhat()); } }; @@ -1017,6 +1018,7 @@ public void testFiringOnFailureEventWithFailedPutObject() @Override public void onFailure(final FailureEvent failureEvent) { numFailureEventsFired.increment(); + assertEquals(FailureEvent.FailureActivity.PuttingObject, failureEvent.doingWhat()); } }; diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java index b2a23ed17..034de8be7 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java @@ -172,7 +172,7 @@ public void emitWaitingForChunksEvents(final int secondsToDelay) { } } catch (final Throwable t) { running = false; - emitFailureEvent(makeFailureEvent(FailureEvent.FailureActivity.GettingObject, t, masterObjectList.getObjects().get(0))); + emitFailureEvent(makeFailureEvent(FailureEvent.FailureActivity.PuttingObject, t, masterObjectList.getObjects().get(0))); throw t; } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java index 96028a655..8cb8d9bd6 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/events/FailureEvent.java @@ -19,19 +19,19 @@ import com.spectralogic.ds3client.utils.Guard; public class FailureEvent { - private final String doingWhat; + private final FailureActivity doingWhat; private final String objectName; private final String endpoint; private final Throwable causalException; - private FailureEvent(final String what, final String objectName, final String endpoint, final Throwable causalException) { + private FailureEvent(final FailureActivity what, final String objectName, final String endpoint, final Throwable causalException) { this.doingWhat = what; this.objectName = objectName; this.endpoint = endpoint; this.causalException = causalException; } - public String doingWhat() { + public FailureActivity doingWhat() { return doingWhat; } @@ -49,7 +49,7 @@ public Throwable getCausalException() { @Override public String toString() { - return "Failure " + doingWhat + " with object named \"" + objectName + "\" using system with endpoint " + endpoint; + return "Failure " + doingWhat().getActivityText() + " with object named \"" + withObjectNamed() + "\" using system with endpoint " + usingSystemWithEndpoint(); } public enum FailureActivity { @@ -101,7 +101,7 @@ public FailureEvent build() { Guard.throwOnNullOrEmptyString(usingSystemWithEndpoint, "The endpoint referenced in the activity may not be null or empty."); Preconditions.checkNotNull(causalException, "The exception causing a failure may not be null."); - return new FailureEvent(doingWhat.getActivityText(), withObjectNamed, usingSystemWithEndpoint, causalException); + return new FailureEvent(doingWhat, withObjectNamed, usingSystemWithEndpoint, causalException); } } } diff --git a/ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers_Test.java b/ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers_Test.java index 1d8414121..891543f9f 100644 --- a/ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers_Test.java +++ b/ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers_Test.java @@ -119,7 +119,8 @@ public void testReadObjectsWithFailedGet() throws IOException, ParseException { Mockito.when(ds3Client.getObject(getRequestHas(MYBUCKET, "foo", jobId, 6))).thenThrow(new StubException()); Mockito.when(ds3Client.getObject(getRequestHas(MYBUCKET, "baz", jobId, 6))).then(getObjectAnswer("ntents")); - final ConnectionDetails connectionDetails = makeMockConnectionDetails(); + final ConnectionDetails connectionDetails = Mockito.mock(ConnectionDetails.class); + Mockito.when(connectionDetails.getEndpoint()).thenReturn("endpoint"); Mockito.when(ds3Client.getConnectionDetails()).thenReturn(connectionDetails); final Job job = Ds3ClientHelpers.wrap(ds3Client).startReadJob(MYBUCKET, Lists.newArrayList( @@ -137,55 +138,6 @@ public SeekableByteChannel buildChannel(final String key) throws IOException { }); } - private ConnectionDetails makeMockConnectionDetails() { - return new ConnectionDetails() { - @Override - public String getEndpoint() { - return "endpoint"; - } - - @Override - public Credentials getCredentials() { - return null; - } - - @Override - public boolean isHttps() { - return false; - } - - @Override - public URI getProxy() { - return null; - } - - @Override - public int getRetries() { - return 0; - } - - @Override - public int getBufferSize() { - return 0; - } - - @Override - public int getConnectionTimeout() { - return 0; - } - - @Override - public int getSocketTimeout() { - return 0; - } - - @Override - public boolean isCertificateVerification() { - return false; - } - }; - } - @Test public void testWriteObjects() throws IOException, ParseException { final Ds3Client ds3Client = buildDs3ClientForBulk(); @@ -686,7 +638,8 @@ public void testReadObjectsWithRetryAfter() throws IOException, ParseException { .getJobChunksReadyForClientProcessingSpectraS3(hasJobId(jobId))) .thenReturn(jobChunksResponse); - final ConnectionDetails connectionDetails = makeMockConnectionDetails(); + final ConnectionDetails connectionDetails = Mockito.mock(ConnectionDetails.class); + Mockito.when(connectionDetails.getEndpoint()).thenReturn("endpoint"); Mockito.when(ds3Client.getConnectionDetails()).thenReturn(connectionDetails); final Job job = Ds3ClientHelpers.wrap(ds3Client, 1).startReadJob(MYBUCKET, Lists.newArrayList( From a06c5ec765b66083aa78d27b6cb107b0d00f7752 Mon Sep 17 00:00:00 2001 From: GraciesPadre Date: Thu, 13 Oct 2016 09:57:44 -0600 Subject: [PATCH 6/7] Cleaning up tests --- .../src/test/java/com/spectralogic/ds3client/MockedHeaders.java | 2 +- .../src/test/java/com/spectralogic/ds3client/MockedHeaders.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedHeaders.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedHeaders.java index 8e1296bf8..b19c35a3d 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedHeaders.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/MockedHeaders.java @@ -31,7 +31,7 @@ public MockedHeaders(final Map headerValues) { } private static Map normalizeHeaderValues(final Map headerValues) { - final HashMap headers = new HashMap<>(); + final Map headers = new HashMap<>(); for (final Map.Entry entry : headerValues.entrySet()) { headers.put(entry.getKey().toLowerCase(), entry.getValue()); } diff --git a/ds3-sdk/src/test/java/com/spectralogic/ds3client/MockedHeaders.java b/ds3-sdk/src/test/java/com/spectralogic/ds3client/MockedHeaders.java index 8e1296bf8..b19c35a3d 100644 --- a/ds3-sdk/src/test/java/com/spectralogic/ds3client/MockedHeaders.java +++ b/ds3-sdk/src/test/java/com/spectralogic/ds3client/MockedHeaders.java @@ -31,7 +31,7 @@ public MockedHeaders(final Map headerValues) { } private static Map normalizeHeaderValues(final Map headerValues) { - final HashMap headers = new HashMap<>(); + final Map headers = new HashMap<>(); for (final Map.Entry entry : headerValues.entrySet()) { headers.put(entry.getKey().toLowerCase(), entry.getValue()); } From 899ef4dbb923151760b3bc99c7fe909ec644d0b1 Mon Sep 17 00:00:00 2001 From: GraciesPadre Date: Thu, 13 Oct 2016 10:32:51 -0600 Subject: [PATCH 7/7] Removing a misguided attempt to take a job out of running state. --- .../integration/GetJobManagement_Test.java | 16 ---------------- .../integration/PutJobManagement_Test.java | 16 ---------------- .../ds3client/helpers/ReadJobImpl.java | 1 - .../ds3client/helpers/WriteJobImpl.java | 3 +-- 4 files changed, 1 insertion(+), 35 deletions(-) diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java index c3c454b26..b0cff760b 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java @@ -344,14 +344,6 @@ public void onFailure(final FailureEvent failureEvent) { readJob.transfer(new FileObjectGetter(tempDirectory)); } catch (final IOException e) { assertEquals(1, numFailuresRecorded.getValue()); - - readJob.removeFailureEventListener(failureEventListener); - - try { - readJob.transfer(new FileObjectGetter(tempDirectory)); - } catch (final IOException ioe) { - assertEquals(1, numFailuresRecorded.getValue()); - } } } finally { FileUtils.deleteDirectory(tempDirectory.toFile()); @@ -410,14 +402,6 @@ public void onFailure(final FailureEvent failureEvent) { readJob.transfer(new FileObjectGetter(tempDirectory)); } catch (final IOException e) { assertEquals(1, numFailuresRecorded.getValue()); - - readJob.removeFailureEventListener(failureEventListener); - - try { - readJob.transfer(new FileObjectGetter(tempDirectory)); - } catch (final IOException ioe) { - assertEquals(1, numFailuresRecorded.getValue()); - } } } finally { FileUtils.deleteDirectory(tempDirectory.toFile()); diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java index 63f8dc601..178d84980 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java @@ -957,14 +957,6 @@ public void onFailure(final FailureEvent failureEvent) { writeJob.transfer(new FileObjectPutter(tempDirectory)); } catch (final Ds3NoMoreRetriesException e) { assertEquals(1, numFailureEventsFired.getValue()); - - writeJob.removeFailureEventListener(failureEventListener); - - try { - writeJob.transfer(new FileObjectPutter(tempDirectory)); - } catch (final Ds3NoMoreRetriesException nmre) { - assertEquals(1, numFailureEventsFired.getValue()); - } } } finally { FileUtils.deleteDirectory(tempDirectory.toFile()); @@ -1028,14 +1020,6 @@ public void onFailure(final FailureEvent failureEvent) { writeJob.transfer(new FileObjectPutter(tempDirectory)); } catch (final RuntimeException e) { assertEquals(1, numFailureEventsFired.getValue()); - - writeJob.removeFailureEventListener(failureEventListener); - - try { - writeJob.transfer(new FileObjectPutter(tempDirectory)); - } catch (final RuntimeException nmre) { - assertEquals(1, numFailureEventsFired.getValue()); - } } } finally { FileUtils.deleteDirectory(tempDirectory.toFile()); diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java index 0776fa089..b86896ff6 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java @@ -161,7 +161,6 @@ public void emitWaitingForChunksEvents(final int secondsToDelay) { throw new RuntimeException(e); } } catch (final Throwable t) { - running = false; emitFailureEvent(makeFailureEvent(FailureEvent.FailureActivity.GettingObject, t, masterObjectList.getObjects().get(0))); throw t; } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java index 034de8be7..9fa1fde97 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java @@ -50,7 +50,6 @@ class WriteJobImpl extends JobImpl { private final int retryAfter; // Negative retryAfter value represent infinity retries private final int retryDelay; //Negative value means use default - private int retryAfterLeft; // The number of retries left private Ds3ClientHelpers.MetadataAccess metadataAccess = null; private ChecksumFunction checksumFunction = null; @@ -74,7 +73,7 @@ public WriteJobImpl( this.partTracker = JobPartTrackerFactory .buildPartTracker(ReadJobImpl.getAllBlobApiBeans(filteredChunks), eventRunner); } - this.retryAfter = this.retryAfterLeft = retryAfter; + this.retryAfter = retryAfter; this.retryDelay = retryDelay; this.checksumType = type;