Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
bbpennel committed Apr 5, 2016
2 parents f15dcb0 + f3c6efc commit f3b4b9f
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 17 deletions.
Expand Up @@ -9,8 +9,6 @@
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -387,6 +385,7 @@ private void ingestObject(String ingestPid, boolean confirmExisting) throws Depo

// Record FOXML throughput metrics
metricsClient.incrDepositFileThroughput(foxml.length());

return;
} catch (ServiceException e) {
waitIfConnectionLostOrRethrow(e);
Expand All @@ -398,7 +397,9 @@ private void ingestObject(String ingestPid, boolean confirmExisting) throws Depo
+ e.getLocalizedMessage());
} catch (ObjectExistsException e) {
if (confirmExisting || isDuplicateOkay(pid)) {
ingestsAwaitingConfirmation.remove(ingestPid);
if (ingestsAwaitingConfirmation.remove(ingestPid)) {
addClicks(1);
}
} else {
throw new DepositException("Object " + pid.getPid() + " already exists in the repository.", e);
}
Expand Down Expand Up @@ -426,23 +427,22 @@ private boolean isDuplicateOkay(PID pid) {

Property stagingLocation = dprop(model, DepositRelationship.stagingLocation);
if (!objectResc.hasProperty(stagingLocation)) {
// No staging location, so nothing further to check
// No staging location, no file, no reason to check further
return true;
}

String fileLocation = objectResc.getProperty(stagingLocation).getString();
fileLocation = new URI(fileLocation).getPath();

// Confirm that incoming file is the same size as the one in the repository
long incomingSize = Files.size(
Paths.get(this.getDepositDirectory().getAbsolutePath(), fileLocation));

// Get information for copy in the repository
Datastream ds = client.getDatastream(pid, DATA_FILE.getName());

if (incomingSize != ds.getSize() && !(ds.getSize() == -1 && incomingSize == 0)) {
// File sizes didn't match, so this is not the correct file
return false;
// Confirm that incoming file is the same size as the one in the repository
Property filesizeProperty = dprop(model, DepositRelationship.size);
if (objectResc.hasProperty(filesizeProperty)) {
long incomingSize = Long.parseLong(objectResc.getProperty(filesizeProperty).getString());

if (incomingSize != ds.getSize() && !(ds.getSize() == -1 && incomingSize == 0)) {
// File sizes didn't match, so this is not the correct file
return false;
}
}

// If a checksum is available, make sure it matches the one in the repository
Expand All @@ -453,7 +453,7 @@ private boolean isDuplicateOkay(PID pid) {
}

return true;
} catch (FedoraException | IOException | URISyntaxException e1) {
} catch (FedoraException e1) {
log.debug("Failed to get datastream info while checking on duplicate for {}", pid, e1);
} finally {
closeModel();
Expand Down
Expand Up @@ -18,6 +18,7 @@
import static edu.unc.lib.deposit.work.DepositGraphUtils.dprop;
import static edu.unc.lib.deposit.work.DepositGraphUtils.fprop;
import static edu.unc.lib.dl.util.ContentModelHelper.DepositRelationship.label;
import static edu.unc.lib.dl.util.ContentModelHelper.DepositRelationship.size;
import static edu.unc.lib.dl.util.ContentModelHelper.DepositRelationship.stagingLocation;
import static edu.unc.lib.dl.util.ContentModelHelper.FedoraProperty.hasModel;
import static edu.unc.lib.dl.util.ContentModelHelper.Model.AGGREGATE_WORK;
Expand Down Expand Up @@ -118,6 +119,7 @@ private void populateSimple(Model model, Resource primaryResource, String alabel

if(alabel == null) alabel = contentFile.getName();
model.add(primaryResource, dprop(model, label), alabel);
model.add(primaryResource, dprop(model, size), Long.toString(contentFile.length()));
if (mimetype != null) {
model.add(primaryResource, dprop(model, DepositRelationship.mimetype), mimetype);
}
Expand All @@ -127,7 +129,7 @@ private void populateSimple(Model model, Resource primaryResource, String alabel
model.add(primaryResource, dprop(model, stagingLocation),
DepositConstants.DATA_DIR + "/" + UriUtils.encodePathSegment(contentFile.getName(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
log.error("fail to encode filepath {}", contentFile.getName(), e);
failJob(e, "Failed to add staging location for {} due to encoding issues", contentFile.getName());
}
}

Expand Down
Expand Up @@ -15,7 +15,9 @@
*/
package edu.unc.lib.deposit.fcrepo3;

import static edu.unc.lib.deposit.work.DepositGraphUtils.dprop;
import static edu.unc.lib.dl.test.TestHelpers.setField;
import static edu.unc.lib.dl.util.ContentModelHelper.Datastream.DATA_FILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -31,6 +33,7 @@
import static org.mockito.MockitoAnnotations.initMocks;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -53,6 +56,7 @@
import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.rdf.model.Bag;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.Property;
import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.tdb.TDBFactory;

Expand All @@ -64,15 +68,19 @@
import edu.unc.lib.dl.fedora.ListenerJob;
import edu.unc.lib.dl.fedora.ManagementClient;
import edu.unc.lib.dl.fedora.ManagementClient.Format;
import edu.unc.lib.dl.fedora.ObjectExistsException;
import edu.unc.lib.dl.fedora.PID;
import edu.unc.lib.dl.fedora.types.ObjectProfile;
import edu.unc.lib.dl.reporting.ActivityMetricsClient;
import edu.unc.lib.dl.services.DigitalObjectManager;
import edu.unc.lib.dl.util.DepositStatusFactory;
import edu.unc.lib.dl.util.JobStatusFactory;
import edu.unc.lib.dl.util.PremisEventLogger;
import edu.unc.lib.dl.util.ContentModelHelper.DepositRelationship;
import edu.unc.lib.dl.util.ContentModelHelper.Relationship;
import edu.unc.lib.dl.util.RedisWorkerConstants.DepositField;
import edu.unc.lib.dl.util.RedisWorkerConstants.DepositState;
import edu.unc.lib.dl.util.TripleStoreQueryService;

/**
* @author bbpennel
Expand Down Expand Up @@ -112,6 +120,8 @@ public class IngestDepositTest {
Collection<String> ingestsAwaitingConfirmation;
@Mock
private ActivityMetricsClient metricsClient;
@Mock
private TripleStoreQueryService tsqs;

private IngestDeposit job;

Expand Down Expand Up @@ -156,6 +166,7 @@ private void createJob(String depositUuid, String depositDirectoryPath) throws E
setField(job, "accessClient", accessClient);
setField(job, "digitalObjectManager", digitalObjectManager);
setField(job, "metricsClient", metricsClient);
setField(job, "tsqs", tsqs);

depositStatus.put(DepositField.containerId.name(), "uuid:destination");
depositStatus.put(DepositField.excludeDepositRecord.name(), "false");
Expand Down Expand Up @@ -339,6 +350,59 @@ public void uncaughtException(Thread th, Throwable ex) {
assertTrue("Job must have been unregistered", jmsListener.registeredJob);

}

/**
* Testing to see that getting an ObjectExistsException on an object with a tag uri for
* its staging location is recoverable.
*
* @throws Exception
*/
@Test
public void testTagDuplicate() throws Exception {

PID filePid = new PID("info:fedora/uuid:2a5d0363-899b-402d-981b-392a553e17a1");

Model model = job.getWritableModel();
Resource fileResc = model.getResource(filePid.getURI());
Property stagingLocation = dprop(model, DepositRelationship.stagingLocation);
model.add(fileResc, stagingLocation, "tag:testuser@localhost,2016:/test.txt");
job.closeModel();

when(tsqs.fetchBySubjectAndPredicate(any(PID.class), anyString()))
.thenReturn(Arrays.asList("info:fedora/uuid:bd5ff703-9c2e-466b-b4cc-15bbfd03c8ae"));

when(client.ingestRaw(any(byte[].class), any(Format.class), anyString())).thenReturn(new PID("pid"))
.thenThrow(new ObjectExistsException("")).thenReturn(new PID("pid"));

Thread.UncaughtExceptionHandler jobFailedHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread th, Throwable ex) {
fail("Uncaught exception, job should have completed.");
}
};

Thread jobThread = new Thread(job);
Thread finishThread = new Thread(jmsListener);

jobThread.setUncaughtExceptionHandler(jobFailedHandler);

jobThread.start();
finishThread.start();

// Start processing with a timelimit to prevent infinite wait in case of failure
jobThread.join(5000L);
finishThread.join(5000L);

// All ingests, including the timed out object, should have registered as a click
verify(jobStatusFactory, times(job.getIngestObjectCount() + 1)).incrCompletion(eq(job.getJobUUID()), eq(1));

// All objects should have been ingested despite the timeout
verify(client, times(job.getIngestObjectCount() + 1))
.ingestRaw(any(byte[].class), any(Format.class), anyString());

// Determine that the remote datastream's information was pulled for comparison purposes
verify(client).getDatastream(eq(filePid), eq(DATA_FILE.getName()));
}

@Test
public void testRunExcludeDepositRecord() throws Exception {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.httpclient.NameValuePair;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,6 +36,7 @@
import edu.unc.lib.dl.acl.util.ObjectAccessControlsBean;
import edu.unc.lib.dl.acl.util.Permission;
import edu.unc.lib.dl.fedora.PID;
import edu.unc.lib.dl.httpclient.ConnectionInterruptedHttpMethodRetryHandler;
import edu.unc.lib.dl.httpclient.HttpClientUtil;

/**
Expand Down Expand Up @@ -62,6 +64,8 @@ public FedoraAccessControlService() {
public void init() {
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(username, password);
httpClient.getState().setCredentials(HttpClientUtil.getAuthenticationScope(aclEndpointUrl), creds);
httpClient.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
new ConnectionInterruptedHttpMethodRetryHandler(10, 3000L));
}

public void destroy() {
Expand Down
Expand Up @@ -98,6 +98,7 @@
import edu.unc.lib.dl.fedora.types.PurgeObjectResponse;
import edu.unc.lib.dl.fedora.types.SetDatastreamVersionable;
import edu.unc.lib.dl.fedora.types.SetDatastreamVersionableResponse;
import edu.unc.lib.dl.httpclient.ConnectionInterruptedHttpMethodRetryHandler;
import edu.unc.lib.dl.httpclient.HttpClientUtil;
import edu.unc.lib.dl.util.ContentModelHelper;
import edu.unc.lib.dl.util.IllegalRepositoryStateException;
Expand Down Expand Up @@ -479,6 +480,8 @@ private void initializeConnections() throws Exception {

this.httpClient = HttpClientUtil.getAuthenticatedClient(this.getFedoraContextUrl(), this.getUsername(),
this.getPassword(), this.httpManager);
httpClient.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
new ConnectionInterruptedHttpMethodRetryHandler(10, 3000L));
}

public void destroy() {
Expand Down
Expand Up @@ -42,7 +42,7 @@ public void testTagLocally() throws ServerException {
Stages stages = externalContentManager.getStages();
StagingArea shc = stages.getAllAreas().get(URI.create("tag:cdr.lib.unc.edu,2013:/storhouse_shc/"));
assertTrue("The SHC test stage must be connected: "+shc.getStatus(), shc.isConnected());
String testURL = "tag:joey@cdr.lib.unc.edu,2013:/storhouse_shc/my+project/_-1/test+file.txt";
String testURL = "tag:joey@cdr.lib.unc.edu,2013:/storhouse_shc/my%20project/_-1/test%20file.txt";
ContentManagerParams params = new ContentManagerParams(testURL);
MIMETypedStream mts = externalContentManager.getExternalContent(params);
assertNotNull("Stream result must not be null", mts);
Expand Down
@@ -0,0 +1,43 @@
package edu.unc.lib.dl.httpclient;

import java.io.IOException;
import java.net.SocketException;

import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpMethodRetryHandler;
import org.apache.commons.httpclient.NoHttpResponseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionInterruptedHttpMethodRetryHandler implements HttpMethodRetryHandler {
private static final Logger log = LoggerFactory.getLogger(ConnectionInterruptedHttpMethodRetryHandler.class);

private int retries = 5;
private long retryDelay = 0;

public ConnectionInterruptedHttpMethodRetryHandler(int retries, long retryDelay) {
super();
this.retries = retries;
this.retryDelay = retryDelay;
}

@Override
public boolean retryMethod(final HttpMethod method, final IOException e, int executionCount) {
if (executionCount >= retries) {
return false;
}
if (e instanceof NoHttpResponseException || e instanceof SocketException) {
log.warn("Connection interrupted, retrying connection to {}", method.getPath());
if (retryDelay > 0) {
try {
Thread.sleep(retryDelay);
} catch (InterruptedException e1) {
log.warn("Interrupted while waiting to retry connect");
}
}
return true;
}
return false;
}

}

0 comments on commit f3b4b9f

Please sign in to comment.