From e4bd4175372a4a1f6cfa8e1290867d312dbfc085 Mon Sep 17 00:00:00 2001 From: bbpennel Date: Wed, 23 Mar 2016 11:50:01 -0400 Subject: [PATCH 1/5] Using a custom HttpMethodRetryHandler to retry interrupted connections to management api and ACL lookup api --- .../dl/fedora/FedoraAccessControlService.java | 4 ++ .../unc/lib/dl/fedora/ManagementClient.java | 3 ++ ...tionInterruptedHttpMethodRetryHandler.java | 43 +++++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 metadata/src/main/java/edu/unc/lib/dl/httpclient/ConnectionInterruptedHttpMethodRetryHandler.java diff --git a/fcrepo-clients/src/main/java/edu/unc/lib/dl/fedora/FedoraAccessControlService.java b/fcrepo-clients/src/main/java/edu/unc/lib/dl/fedora/FedoraAccessControlService.java index 4a4df61246..ad34c04a19 100644 --- a/fcrepo-clients/src/main/java/edu/unc/lib/dl/fedora/FedoraAccessControlService.java +++ b/fcrepo-clients/src/main/java/edu/unc/lib/dl/fedora/FedoraAccessControlService.java @@ -26,6 +26,7 @@ import org.apache.commons.httpclient.NameValuePair; import org.apache.commons.httpclient.UsernamePasswordCredentials; import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.params.HttpMethodParams; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,7 @@ import edu.unc.lib.dl.acl.util.ObjectAccessControlsBean; import edu.unc.lib.dl.acl.util.Permission; import edu.unc.lib.dl.fedora.PID; +import edu.unc.lib.dl.httpclient.ConnectionInterruptedHttpMethodRetryHandler; import edu.unc.lib.dl.httpclient.HttpClientUtil; /** @@ -62,6 +64,8 @@ public FedoraAccessControlService() { public void init() { UsernamePasswordCredentials creds = new UsernamePasswordCredentials(username, password); httpClient.getState().setCredentials(HttpClientUtil.getAuthenticationScope(aclEndpointUrl), creds); + httpClient.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, + new ConnectionInterruptedHttpMethodRetryHandler(10, 3000L)); } public void destroy() { diff --git a/fcrepo-clients/src/main/java/edu/unc/lib/dl/fedora/ManagementClient.java b/fcrepo-clients/src/main/java/edu/unc/lib/dl/fedora/ManagementClient.java index e4fee5762a..b69d69c8dc 100644 --- a/fcrepo-clients/src/main/java/edu/unc/lib/dl/fedora/ManagementClient.java +++ b/fcrepo-clients/src/main/java/edu/unc/lib/dl/fedora/ManagementClient.java @@ -98,6 +98,7 @@ import edu.unc.lib.dl.fedora.types.PurgeObjectResponse; import edu.unc.lib.dl.fedora.types.SetDatastreamVersionable; import edu.unc.lib.dl.fedora.types.SetDatastreamVersionableResponse; +import edu.unc.lib.dl.httpclient.ConnectionInterruptedHttpMethodRetryHandler; import edu.unc.lib.dl.httpclient.HttpClientUtil; import edu.unc.lib.dl.util.ContentModelHelper; import edu.unc.lib.dl.util.IllegalRepositoryStateException; @@ -479,6 +480,8 @@ private void initializeConnections() throws Exception { this.httpClient = HttpClientUtil.getAuthenticatedClient(this.getFedoraContextUrl(), this.getUsername(), this.getPassword(), this.httpManager); + httpClient.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, + new ConnectionInterruptedHttpMethodRetryHandler(10, 3000L)); } public void destroy() { diff --git a/metadata/src/main/java/edu/unc/lib/dl/httpclient/ConnectionInterruptedHttpMethodRetryHandler.java b/metadata/src/main/java/edu/unc/lib/dl/httpclient/ConnectionInterruptedHttpMethodRetryHandler.java new file mode 100644 index 0000000000..a430b40059 --- /dev/null +++ b/metadata/src/main/java/edu/unc/lib/dl/httpclient/ConnectionInterruptedHttpMethodRetryHandler.java @@ -0,0 +1,43 @@ +package edu.unc.lib.dl.httpclient; + +import java.io.IOException; +import java.net.SocketException; + +import org.apache.commons.httpclient.HttpMethod; +import org.apache.commons.httpclient.HttpMethodRetryHandler; +import org.apache.commons.httpclient.NoHttpResponseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionInterruptedHttpMethodRetryHandler implements HttpMethodRetryHandler { + private static final Logger log = LoggerFactory.getLogger(ConnectionInterruptedHttpMethodRetryHandler.class); + + private int retries = 5; + private long retryDelay = 0; + + public ConnectionInterruptedHttpMethodRetryHandler(int retries, long retryDelay) { + super(); + this.retries = retries; + this.retryDelay = retryDelay; + } + + @Override + public boolean retryMethod(final HttpMethod method, final IOException e, int executionCount) { + if (executionCount >= retries) { + return false; + } + if (e instanceof NoHttpResponseException || e instanceof SocketException) { + log.warn("Connection interrupted, retrying connection to {}", method.getPath()); + if (retryDelay > 0) { + try { + Thread.sleep(retryDelay); + } catch (InterruptedException e1) { + log.warn("Interrupted while waiting to retry connect"); + } + } + return true; + } + return false; + } + +} From 7d66070156a7975ce00e60bc7007cbbdb21db34b Mon Sep 17 00:00:00 2001 From: bbpennel Date: Wed, 23 Mar 2016 17:20:18 -0400 Subject: [PATCH 2/5] Using supplied filesize while verifying if checksum is okay, instead of pulling from the file system. Added a test to ensure that objects with a tag staging url that come back as a duplicate will not fail the ingest because of not being able to retrieve the path from the tag --- .../lib/deposit/fcrepo3/IngestDeposit.java | 30 ++++----- .../deposit/fcrepo3/IngestDepositTest.java | 64 +++++++++++++++++++ 2 files changed, 79 insertions(+), 15 deletions(-) diff --git a/deposit/src/main/java/edu/unc/lib/deposit/fcrepo3/IngestDeposit.java b/deposit/src/main/java/edu/unc/lib/deposit/fcrepo3/IngestDeposit.java index 1bd7c2eeb4..1e9e0a3616 100644 --- a/deposit/src/main/java/edu/unc/lib/deposit/fcrepo3/IngestDeposit.java +++ b/deposit/src/main/java/edu/unc/lib/deposit/fcrepo3/IngestDeposit.java @@ -9,8 +9,6 @@ import java.net.ConnectException; import java.net.URI; import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -387,6 +385,7 @@ private void ingestObject(String ingestPid, boolean confirmExisting) throws Depo // Record FOXML throughput metrics metricsClient.incrDepositFileThroughput(foxml.length()); + return; } catch (ServiceException e) { waitIfConnectionLostOrRethrow(e); @@ -398,7 +397,9 @@ private void ingestObject(String ingestPid, boolean confirmExisting) throws Depo + e.getLocalizedMessage()); } catch (ObjectExistsException e) { if (confirmExisting || isDuplicateOkay(pid)) { - ingestsAwaitingConfirmation.remove(ingestPid); + if (ingestsAwaitingConfirmation.remove(ingestPid)) { + addClicks(1); + } } else { throw new DepositException("Object " + pid.getPid() + " already exists in the repository.", e); } @@ -426,23 +427,22 @@ private boolean isDuplicateOkay(PID pid) { Property stagingLocation = dprop(model, DepositRelationship.stagingLocation); if (!objectResc.hasProperty(stagingLocation)) { - // No staging location, so nothing further to check + // No staging location, no file, no reason to check further return true; } - String fileLocation = objectResc.getProperty(stagingLocation).getString(); - fileLocation = new URI(fileLocation).getPath(); - - // Confirm that incoming file is the same size as the one in the repository - long incomingSize = Files.size( - Paths.get(this.getDepositDirectory().getAbsolutePath(), fileLocation)); - // Get information for copy in the repository Datastream ds = client.getDatastream(pid, DATA_FILE.getName()); - if (incomingSize != ds.getSize() && !(ds.getSize() == -1 && incomingSize == 0)) { - // File sizes didn't match, so this is not the correct file - return false; + // Confirm that incoming file is the same size as the one in the repository + Property filesizeProperty = dprop(model, DepositRelationship.size); + if (objectResc.hasProperty(filesizeProperty)) { + long incomingSize = Long.parseLong(objectResc.getProperty(filesizeProperty).getString()); + + if (incomingSize != ds.getSize() && !(ds.getSize() == -1 && incomingSize == 0)) { + // File sizes didn't match, so this is not the correct file + return false; + } } // If a checksum is available, make sure it matches the one in the repository @@ -453,7 +453,7 @@ private boolean isDuplicateOkay(PID pid) { } return true; - } catch (FedoraException | IOException | URISyntaxException e1) { + } catch (FedoraException e1) { log.debug("Failed to get datastream info while checking on duplicate for {}", pid, e1); } finally { closeModel(); diff --git a/deposit/src/test/java/edu/unc/lib/deposit/fcrepo3/IngestDepositTest.java b/deposit/src/test/java/edu/unc/lib/deposit/fcrepo3/IngestDepositTest.java index 8f8f91b56a..200e3822d7 100644 --- a/deposit/src/test/java/edu/unc/lib/deposit/fcrepo3/IngestDepositTest.java +++ b/deposit/src/test/java/edu/unc/lib/deposit/fcrepo3/IngestDepositTest.java @@ -15,7 +15,9 @@ */ package edu.unc.lib.deposit.fcrepo3; +import static edu.unc.lib.deposit.work.DepositGraphUtils.dprop; import static edu.unc.lib.dl.test.TestHelpers.setField; +import static edu.unc.lib.dl.util.ContentModelHelper.Datastream.DATA_FILE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -31,6 +33,7 @@ import static org.mockito.MockitoAnnotations.initMocks; import java.io.File; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -53,6 +56,7 @@ import com.hp.hpl.jena.query.Dataset; import com.hp.hpl.jena.rdf.model.Bag; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.Property; import com.hp.hpl.jena.rdf.model.Resource; import com.hp.hpl.jena.tdb.TDBFactory; @@ -64,6 +68,7 @@ import edu.unc.lib.dl.fedora.ListenerJob; import edu.unc.lib.dl.fedora.ManagementClient; import edu.unc.lib.dl.fedora.ManagementClient.Format; +import edu.unc.lib.dl.fedora.ObjectExistsException; import edu.unc.lib.dl.fedora.PID; import edu.unc.lib.dl.fedora.types.ObjectProfile; import edu.unc.lib.dl.reporting.ActivityMetricsClient; @@ -71,8 +76,11 @@ import edu.unc.lib.dl.util.DepositStatusFactory; import edu.unc.lib.dl.util.JobStatusFactory; import edu.unc.lib.dl.util.PremisEventLogger; +import edu.unc.lib.dl.util.ContentModelHelper.DepositRelationship; +import edu.unc.lib.dl.util.ContentModelHelper.Relationship; import edu.unc.lib.dl.util.RedisWorkerConstants.DepositField; import edu.unc.lib.dl.util.RedisWorkerConstants.DepositState; +import edu.unc.lib.dl.util.TripleStoreQueryService; /** * @author bbpennel @@ -112,6 +120,8 @@ public class IngestDepositTest { Collection ingestsAwaitingConfirmation; @Mock private ActivityMetricsClient metricsClient; + @Mock + private TripleStoreQueryService tsqs; private IngestDeposit job; @@ -156,6 +166,7 @@ private void createJob(String depositUuid, String depositDirectoryPath) throws E setField(job, "accessClient", accessClient); setField(job, "digitalObjectManager", digitalObjectManager); setField(job, "metricsClient", metricsClient); + setField(job, "tsqs", tsqs); depositStatus.put(DepositField.containerId.name(), "uuid:destination"); depositStatus.put(DepositField.excludeDepositRecord.name(), "false"); @@ -339,6 +350,59 @@ public void uncaughtException(Thread th, Throwable ex) { assertTrue("Job must have been unregistered", jmsListener.registeredJob); } + + /** + * Testing to see that getting an ObjectExistsException on an object with a tag uri for + * its staging location is recoverable. + * + * @throws Exception + */ + @Test + public void testTagDuplicate() throws Exception { + + PID filePid = new PID("info:fedora/uuid:2a5d0363-899b-402d-981b-392a553e17a1"); + + Model model = job.getWritableModel(); + Resource fileResc = model.getResource(filePid.getURI()); + Property stagingLocation = dprop(model, DepositRelationship.stagingLocation); + model.add(fileResc, stagingLocation, "tag:testuser@localhost,2016:/test.txt"); + job.closeModel(); + + when(tsqs.fetchBySubjectAndPredicate(any(PID.class), anyString())) + .thenReturn(Arrays.asList("info:fedora/uuid:bd5ff703-9c2e-466b-b4cc-15bbfd03c8ae")); + + when(client.ingestRaw(any(byte[].class), any(Format.class), anyString())).thenReturn(new PID("pid")) + .thenThrow(new ObjectExistsException("")).thenReturn(new PID("pid")); + + Thread.UncaughtExceptionHandler jobFailedHandler = new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread th, Throwable ex) { + fail("Uncaught exception, job should have completed."); + } + }; + + Thread jobThread = new Thread(job); + Thread finishThread = new Thread(jmsListener); + + jobThread.setUncaughtExceptionHandler(jobFailedHandler); + + jobThread.start(); + finishThread.start(); + + // Start processing with a timelimit to prevent infinite wait in case of failure + jobThread.join(5000L); + finishThread.join(5000L); + + // All ingests, including the timed out object, should have registered as a click + verify(jobStatusFactory, times(job.getIngestObjectCount() + 1)).incrCompletion(eq(job.getJobUUID()), eq(1)); + + // All objects should have been ingested despite the timeout + verify(client, times(job.getIngestObjectCount() + 1)) + .ingestRaw(any(byte[].class), any(Format.class), anyString()); + + // Determine that + verify(client).getDatastream(eq(filePid), eq(DATA_FILE.getName())); + } @Test public void testRunExcludeDepositRecord() throws Exception { From b1f325dc233455cd481ea0338499d8938878be51 Mon Sep 17 00:00:00 2001 From: bbpennel Date: Wed, 23 Mar 2016 17:22:13 -0400 Subject: [PATCH 3/5] Storing the size of simple object file uploads in the deposit model, for consistency and use further down the pipeline. Also causing the deposit to fail rather than continue if the path to the file can't be constructed --- .../java/edu/unc/lib/deposit/normalize/Simple2N3BagJob.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/deposit/src/main/java/edu/unc/lib/deposit/normalize/Simple2N3BagJob.java b/deposit/src/main/java/edu/unc/lib/deposit/normalize/Simple2N3BagJob.java index a71d6dad47..0ff6c0c0c6 100644 --- a/deposit/src/main/java/edu/unc/lib/deposit/normalize/Simple2N3BagJob.java +++ b/deposit/src/main/java/edu/unc/lib/deposit/normalize/Simple2N3BagJob.java @@ -18,6 +18,7 @@ import static edu.unc.lib.deposit.work.DepositGraphUtils.dprop; import static edu.unc.lib.deposit.work.DepositGraphUtils.fprop; import static edu.unc.lib.dl.util.ContentModelHelper.DepositRelationship.label; +import static edu.unc.lib.dl.util.ContentModelHelper.DepositRelationship.size; import static edu.unc.lib.dl.util.ContentModelHelper.DepositRelationship.stagingLocation; import static edu.unc.lib.dl.util.ContentModelHelper.FedoraProperty.hasModel; import static edu.unc.lib.dl.util.ContentModelHelper.Model.AGGREGATE_WORK; @@ -118,6 +119,7 @@ private void populateSimple(Model model, Resource primaryResource, String alabel if(alabel == null) alabel = contentFile.getName(); model.add(primaryResource, dprop(model, label), alabel); + model.add(primaryResource, dprop(model, size), Long.toString(contentFile.length())); if (mimetype != null) { model.add(primaryResource, dprop(model, DepositRelationship.mimetype), mimetype); } @@ -127,7 +129,7 @@ private void populateSimple(Model model, Resource primaryResource, String alabel model.add(primaryResource, dprop(model, stagingLocation), DepositConstants.DATA_DIR + "/" + UriUtils.encodePathSegment(contentFile.getName(), "UTF-8")); } catch (UnsupportedEncodingException e) { - log.error("fail to encode filepath {}", contentFile.getName(), e); + failJob(e, "Failed to add staging location for {} due to encoding issues", contentFile.getName()); } } From d11889b5bafddba011b27426a00a2aeea1b1d362 Mon Sep 17 00:00:00 2001 From: daines Date: Thu, 31 Mar 2016 16:12:01 -0400 Subject: [PATCH 4/5] Fix broken staging URI test Plus signs are now interpreted literally in tag: URIs. --- .../src/test/java/StagingExternalContentManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fcrepo-irods-storage/src/test/java/StagingExternalContentManagerTest.java b/fcrepo-irods-storage/src/test/java/StagingExternalContentManagerTest.java index 94ac9ce1c9..c2e72fe8ad 100644 --- a/fcrepo-irods-storage/src/test/java/StagingExternalContentManagerTest.java +++ b/fcrepo-irods-storage/src/test/java/StagingExternalContentManagerTest.java @@ -42,7 +42,7 @@ public void testTagLocally() throws ServerException { Stages stages = externalContentManager.getStages(); StagingArea shc = stages.getAllAreas().get(URI.create("tag:cdr.lib.unc.edu,2013:/storhouse_shc/")); assertTrue("The SHC test stage must be connected: "+shc.getStatus(), shc.isConnected()); - String testURL = "tag:joey@cdr.lib.unc.edu,2013:/storhouse_shc/my+project/_-1/test+file.txt"; + String testURL = "tag:joey@cdr.lib.unc.edu,2013:/storhouse_shc/my%20project/_-1/test%20file.txt"; ContentManagerParams params = new ContentManagerParams(testURL); MIMETypedStream mts = externalContentManager.getExternalContent(params); assertNotNull("Stream result must not be null", mts); From f5b1ca0cd5d2f27ca94c1e3a598b8e57141574a0 Mon Sep 17 00:00:00 2001 From: Ben Pennell Date: Thu, 31 Mar 2016 16:15:35 -0400 Subject: [PATCH 5/5] Finished incomplete comment --- .../java/edu/unc/lib/deposit/fcrepo3/IngestDepositTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deposit/src/test/java/edu/unc/lib/deposit/fcrepo3/IngestDepositTest.java b/deposit/src/test/java/edu/unc/lib/deposit/fcrepo3/IngestDepositTest.java index 200e3822d7..ad3806435f 100644 --- a/deposit/src/test/java/edu/unc/lib/deposit/fcrepo3/IngestDepositTest.java +++ b/deposit/src/test/java/edu/unc/lib/deposit/fcrepo3/IngestDepositTest.java @@ -400,7 +400,7 @@ public void uncaughtException(Thread th, Throwable ex) { verify(client, times(job.getIngestObjectCount() + 1)) .ingestRaw(any(byte[].class), any(Format.class), anyString()); - // Determine that + // Determine that the remote datastream's information was pulled for comparison purposes verify(client).getDatastream(eq(filePid), eq(DATA_FILE.getName())); }