Skip to content

Commit

Permalink
[DSC-968] Adding pagination on bitstream cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
LucaGiamminonni committed Mar 13, 2023
1 parent b956bcd commit bdf8675
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 65 deletions.
Expand Up @@ -332,8 +332,8 @@ public void updateLastModified(Context context, Bitstream bitstream) {
}

// Thin delegation to the DAO: the paging arguments (limit, offset) are
// forwarded unchanged to bitstreamDAO.findDeletedBitstreams.
@Override
public List<Bitstream> findDeletedBitstreams(Context context) throws SQLException {
return bitstreamDAO.findDeletedBitstreams(context);
public List<Bitstream> findDeletedBitstreams(Context context, int limit, int offset) throws SQLException {
return bitstreamDAO.findDeletedBitstreams(context, limit, offset);
}

@Override
Expand Down
Expand Up @@ -29,7 +29,7 @@ public interface BitstreamDAO extends DSpaceObjectLegacySupportDAO<Bitstream> {

public Iterator<Bitstream> findAll(Context context, int limit, int offset) throws SQLException;

/**
 * Finds bitstreams that are flagged as deleted, one page at a time.
 *
 * @param context the context used to run the query
 * @param limit   maximum number of bitstreams to return
 *                (NOTE(review): the implementation previously passed -1 here
 *                when it fetched everything -- confirm -1 still means "no limit")
 * @param offset  position of the first result to return
 * @return the deleted bitstreams in the requested page
 * @throws SQLException if a database error occurs
 */
public List<Bitstream> findDeletedBitstreams(Context context) throws SQLException;
public List<Bitstream> findDeletedBitstreams(Context context, int limit, int offset) throws SQLException;

public List<Bitstream> findDuplicateInternalIdentifier(Context context, Bitstream bitstream) throws SQLException;

Expand Down
Expand Up @@ -41,13 +41,14 @@ protected BitstreamDAOImpl() {
}

/**
 * Queries the bitstreams whose {@code deleted} attribute is {@code true},
 * ordered by ID descending, and returns the requested page of results.
 *
 * @param context the context the criteria query is built and run against
 * @param limit   forwarded to {@code list(...)} as the result limit
 * @param offset  forwarded to {@code list(...)} as the result offset
 * @return the matching bitstreams
 * @throws SQLException if a database error occurs
 */
@Override
public List<Bitstream> findDeletedBitstreams(Context context) throws SQLException {
public List<Bitstream> findDeletedBitstreams(Context context, int limit, int offset) throws SQLException {
CriteriaBuilder criteriaBuilder = getCriteriaBuilder(context);
CriteriaQuery criteriaQuery = getCriteriaQuery(criteriaBuilder, Bitstream.class);
Root<Bitstream> bitstreamRoot = criteriaQuery.from(Bitstream.class);
criteriaQuery.select(bitstreamRoot);
// Fixed ordering (ID descending); presumably what keeps consecutive pages
// from overlapping -- confirm against callers that page through results.
criteriaQuery.orderBy(criteriaBuilder.desc(bitstreamRoot.get(Bitstream_.ID)));
// Restrict the result to rows flagged as deleted.
criteriaQuery.where(criteriaBuilder.equal(bitstreamRoot.get(Bitstream_.deleted), true));
return list(context, criteriaQuery, false, Bitstream.class, -1, -1);
return list(context, criteriaQuery, false, Bitstream.class, limit, offset);

}

Expand Down
Expand Up @@ -183,7 +183,7 @@ public InputStream retrieve(Context context, Bitstream bitstream)
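* @param limit   maximum number of bitstreams to return
* @param offset  position of the first bitstream to return
* @return a list of the bitstreams that have been "deleted", restricted to the requested page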
* @throws SQLException if database error
*/
public List<Bitstream> findDeletedBitstreams(Context context) throws SQLException;
public List<Bitstream> findDeletedBitstreams(Context context, int limit, int offset) throws SQLException;


/**
Expand Down
Expand Up @@ -17,6 +17,7 @@
import java.util.UUID;
import javax.annotation.Nullable;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -224,25 +225,62 @@ public InputStream retrieve(Context context, Bitstream bitstream)
@Override
public void cleanup(boolean deleteDbRecords, boolean verbose) throws SQLException, IOException, AuthorizeException {
Context context = new Context(Context.Mode.BATCH_EDIT);
int commitCounter = 0;

int offset = 0;
int limit = 100;

int cleanedBitstreamCount = 0;

int deletedBitstreamCount = bitstreamService.countDeletedBitstreams(context);
System.out.println("Found " + deletedBitstreamCount + " deleted bistream to cleanup");

try {
context.turnOffAuthorisationSystem();

List<Bitstream> storage = bitstreamService.findDeletedBitstreams(context);
for (Bitstream bitstream : storage) {
UUID bid = bitstream.getID();
Map wantedMetadata = new HashMap();
wantedMetadata.put("size_bytes", null);
wantedMetadata.put("modified", null);
Map receivedMetadata = this.getStore(bitstream.getStoreNumber()).about(bitstream, wantedMetadata);
while (cleanedBitstreamCount < deletedBitstreamCount) {

List<Bitstream> storage = bitstreamService.findDeletedBitstreams(context, limit, offset);

if (CollectionUtils.isEmpty(storage)) {
break;
}

for (Bitstream bitstream : storage) {
UUID bid = bitstream.getID();
Map wantedMetadata = new HashMap();
wantedMetadata.put("size_bytes", null);
wantedMetadata.put("modified", null);
Map receivedMetadata = this.getStore(bitstream.getStoreNumber()).about(bitstream, wantedMetadata);


// Make sure entries which do not exist are removed
if (MapUtils.isEmpty(receivedMetadata)) {
log.debug("bitstore.about is empty, so file is not present");
if (deleteDbRecords) {
log.debug("deleting record");
if (verbose) {
System.out.println(" - Deleting bitstream information (ID: " + bid + ")");
}
checksumHistoryService.deleteByBitstream(context, bitstream);
if (verbose) {
System.out.println(" - Deleting bitstream record from database (ID: " + bid + ")");
}
bitstreamService.expunge(context, bitstream);
}
context.uncacheEntity(bitstream);
continue;
}

// There is a small chance that this is a file which is
// still being stored -- get it next time.
if (isRecent(Long.valueOf(receivedMetadata.get("modified").toString()))) {
log.debug("file is recent");
context.uncacheEntity(bitstream);
continue;
}

// Make sure entries which do not exist are removed
if (MapUtils.isEmpty(receivedMetadata)) {
log.debug("bitstore.about is empty, so file is not present");
if (deleteDbRecords) {
log.debug("deleting record");
log.debug("deleting db record");
if (verbose) {
System.out.println(" - Deleting bitstream information (ID: " + bid + ")");
}
Expand All @@ -252,64 +290,42 @@ public void cleanup(boolean deleteDbRecords, boolean verbose) throws SQLExceptio
}
bitstreamService.expunge(context, bitstream);
}
context.uncacheEntity(bitstream);
continue;
}

// There is a small chance that this is a file which is
// still being stored -- get it next time.
if (isRecent(Long.valueOf(receivedMetadata.get("modified").toString()))) {
log.debug("file is recent");
context.uncacheEntity(bitstream);
continue;
}

if (deleteDbRecords) {
log.debug("deleting db record");
if (verbose) {
System.out.println(" - Deleting bitstream information (ID: " + bid + ")");
if (isRegisteredBitstream(bitstream.getInternalId())) {
context.uncacheEntity(bitstream);
continue; // do not delete registered bitstreams
}
checksumHistoryService.deleteByBitstream(context, bitstream);
if (verbose) {
System.out.println(" - Deleting bitstream record from database (ID: " + bid + ")");


// Since versioning allows for multiple bitstreams, check if the internal
// identifier isn't used on
// another place
if (bitstreamService.findDuplicateInternalIdentifier(context, bitstream).isEmpty()) {
this.getStore(bitstream.getStoreNumber()).remove(bitstream);

String message = ("Deleted bitstreamID " + bid + ", internalID " + bitstream.getInternalId());
if (log.isDebugEnabled()) {
log.debug(message);
}
if (verbose) {
System.out.println(message);
}
}
bitstreamService.expunge(context, bitstream);
}

if (isRegisteredBitstream(bitstream.getInternalId())) {
context.uncacheEntity(bitstream);
continue; // do not delete registered bitstreams
}

// Commit actual changes to DB after dispatch events
System.out.print("Performing incremental commit to the database...");
context.commit();
System.out.println(" Incremental commit done!");

// Since versioning allows for multiple bitstreams, check if the internal identifier isn't used on
// another place
if (bitstreamService.findDuplicateInternalIdentifier(context, bitstream).isEmpty()) {
this.getStore(bitstream.getStoreNumber()).remove(bitstream);

String message = ("Deleted bitstreamID " + bid + ", internalID " + bitstream.getInternalId());
if (log.isDebugEnabled()) {
log.debug(message);
}
if (verbose) {
System.out.println(message);
}
}
cleanedBitstreamCount = cleanedBitstreamCount + storage.size();

// Make sure to commit our outstanding work every 100
// iterations. Otherwise you risk losing the entire transaction
// if we hit an exception, which isn't useful at all for large
// amounts of bitstreams.
commitCounter++;
if (commitCounter % 100 == 0) {
context.dispatchEvents();
// Commit actual changes to DB after dispatch events
System.out.print("Performing incremental commit to the database...");
context.commit();
System.out.println(" Incremental commit done!");
if (!deleteDbRecords) {
offset = offset + limit;
}

context.uncacheEntity(bitstream);
}

System.out.print("Committing changes to the database...");
Expand Down

0 comments on commit bdf8675

Please sign in to comment.