OAK-10657: shrink in-DB documents after updates fail due to 16MB limit #1314

Draft
wants to merge 6 commits into trunk
@@ -82,7 +82,7 @@ public void addRemoveNodes() throws Exception {
}

@Test
@Ignore //OAK-10646
// @Ignore //OAK-10646
public void orderableAddManyChildrenWithSave() throws Exception {
int childCount = 1000;
StringBuilder prefix = new StringBuilder("");
@@ -98,7 +98,7 @@ public void orderableAddManyChildrenWithSave() throws Exception {
}

@Test
@Ignore //OAK-10646
// @Ignore //OAK-10646
public void moveOrderableWithManyChildren() throws Exception {
int childCount = 1000;
int moveCount = 1;
@@ -121,7 +121,7 @@ public void moveOrderableWithManyChildren() throws Exception {
}

@Test
@Ignore //OAK-10646
// @Ignore //OAK-10646
public void copyOrderableWithManyChildren() throws Exception {
int childCount = 1000;
int copyCount = 1;
@@ -51,6 +51,8 @@
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
@@ -101,6 +103,8 @@ public class MemoryDocumentStore implements DocumentStore {

private static final long SIZE_LIMIT = SystemPropertySupplier.create("memoryds.size.limit", -1).get();

private static final Logger LOG = LoggerFactory.getLogger(MemoryDocumentStore.class);

public MemoryDocumentStore() {
this(false);
}
@@ -341,7 +345,22 @@ private <T extends Document> T internalCreateOrUpdate(Collection<T> collection,
// update the document
UpdateUtils.applyChanges(doc, update);
maintainModCount(doc);
checkSize(doc);
try {
checkSize(doc);
} catch (DocumentStoreException ex) {
// slightly hacky approach to find "our" cluster id
if (update.hasChanges()) {
final int clusterid = Utils.extractClusterId(update);
UpdateOp shrink = Utils.getShrinkOp(doc, ":childOrder", r -> r.getClusterId() == clusterid);
// try cleanup and then retry once
long before = doc.getMemory();
UpdateUtils.applyChanges(doc, shrink);
long after = doc.getMemory();
LOG.info("Doc size was exceeded for {}: {} bytes. Applied shrink ops: {}. New size: {}. Doing one retry.",
doc.getId(), before, shrink, after);
}
checkSize(doc);
}
doc.seal();
map.put(update.getId(), doc);
return oldDoc;
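
A minimal sketch (an editor's illustration, not part of the patch) of how this shrink-and-retry path could be exercised against a MemoryDocumentStore; the path, the revision values and the assumption that memoryds.size.limit was set before the class is loaded are all hypothetical:

// Sketch only: drive the size limit and trigger the shrink-and-retry above.
// Assumes -Dmemoryds.size.limit=<small value> was set before MemoryDocumentStore
// was loaded, since SIZE_LIMIT is read once via SystemPropertySupplier.
DocumentStore store = new MemoryDocumentStore();
String id = Utils.getIdFromPath("/test");                  // hypothetical path
store.create(Collection.NODES, Collections.singletonList(new UpdateOp(id, true)));
for (int i = 0; i < 10000; i++) {
    Revision r = new Revision(i, 0, 1);                    // timestamp, counter, clusterId
    UpdateOp op = new UpdateOp(id, false);
    op.setMapEntry(":childOrder", r, "child-" + i);
    op.setMapEntry("_bc", r, "true");                      // mark the revision as a branch commit
    // once the configured limit is exceeded, the store drops all but the newest
    // :childOrder entry per cluster id and retries the size check once
    store.createOrUpdate(Collection.NODES, op);
}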
@@ -474,7 +493,10 @@ public long determineServerTimeDifferenceMillis() {
return 0;
}

private void checkSize(Document doc) {
/**
* aborts the operation if a size limit is configured and exceeded
*/
private void checkSize(Document doc) throws DocumentStoreException {
if (SIZE_LIMIT >= 0) {
int size = doc.getMemory();
if (size >= SIZE_LIMIT) {
@@ -26,6 +26,7 @@
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
@@ -46,6 +47,7 @@
import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo;
import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.Document;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
@@ -54,6 +56,8 @@
import org.apache.jackrabbit.oak.plugins.document.Revision;
import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.spi.toggle.Feature;
import org.apache.jackrabbit.oak.stats.Clock;
import org.jetbrains.annotations.NotNull;
@@ -274,6 +278,73 @@ private static String diagsForEntry(Map.Entry<String, PropertyStats> member) {
}
}

/**
* @return cluster id from the first revision found in op, {@code -1} otherwise
*/
public static int extractClusterId(UpdateOp op) {
for (Key key : op.getChanges().keySet()) {
if (key.getRevision() != null) {
return key.getRevision().getClusterId();
}
}
return -1;
}
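
For illustration (an editor's sketch, not part of the patch): an op whose first change is keyed by a revision yields that revision's cluster id; the document id and revision values below are made up.

UpdateOp op = new UpdateOp("1:/foo", false);                     // hypothetical id
op.setMapEntry(":childOrder", new Revision(1234L, 0, 3), "bar");
int clusterId = Utils.extractClusterId(op);                      // -> 3
// an op without any revision-keyed change yields -1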

/**
* Produce an {@link UpdateOp} suitable for shrinking branch revision entries for the given property in a {@link Document}.
*
* @param doc document to inspect for repeated branch commits
* @param propertName property to check for
Contributor:
Make it @param propertyName, please.

* @param revisionChecker filter for revisions (for instance, to check for cluster id)
* @return {@link UpdateOp} suitable for shrinking the document, or {@code null} if there is nothing to shrink
*/
public static @Nullable UpdateOp getShrinkOp(Document doc, String propertyName, Predicate<Revision> revisionChecker) {
Contributor:
If doc could be a NodeDocument instead of the generic Document some of those instanceof could be avoided...

Contributor Author:
I think we can do that once we use that from NodeDocumentStore, not DocumentStore...

Contributor Author:
@mbaedke - this is where we could check the feature flag for now...

Object t_bc = doc.get("_bc");
Object t_property = doc.get(propertyName);
Contributor:
I would use camelCase.

if (t_bc instanceof Map && t_property instanceof Map) {
@SuppressWarnings("unchecked")
Map<Revision, String> _bc = (Map<Revision, String>)t_bc;
@SuppressWarnings("unchecked")
Map<Revision, String> pMap = (Map<Revision, String>)t_property;
List<Revision> revs = new ArrayList<>();
for (Map.Entry<Revision, String> en : pMap.entrySet()) {
Revision r = en.getKey();
if (revisionChecker.apply(r)) {
String bcv = _bc.get(r);
if ("true".equals(bcv)) {
revs.add(r);
}
}
}
// sort by age
Collections.sort(revs, new Comparator<Revision>() {
Contributor:
wondering if there isn't such a Comparator in oak land already?

Contributor Author:
Not in Revision, as far as I can tell. I wanted a comparator that sorts by clusterId first; we may not need this if we always filter by cluster id though.

@Override
public int compare(Revision r1, Revision r2) {
if (r1.getClusterId() != r2.getClusterId()) {
return r1.getClusterId() - r2.getClusterId();
} else if (r1.getTimestamp() != r2.getTimestamp()) {
return r1.getTimestamp() > r2.getTimestamp() ? 1 : -1;
} else {
return r1.getCounter() - r2.getCounter();
}
}});
Comment on lines +320 to +330
Contributor:
Suggested change
Collections.sort(revs, new Comparator<Revision>() {
@Override
public int compare(Revision r1, Revision r2) {
if (r1.getClusterId() != r2.getClusterId()) {
return r1.getClusterId() - r2.getClusterId();
} else if (r1.getTimestamp() != r2.getTimestamp()) {
return r1.getTimestamp() > r2.getTimestamp() ? 1 : -1;
} else {
return r1.getCounter() - r2.getCounter();
}
}});
revs.sort((c1, c2) -> Comparator.comparing(Revision::getClusterId).thenComparing(Revision::getTimestamp).thenComparing(Revision::getCounter).compare(c1, c2));
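
As a side note (a sketch, not part of the review thread): the same ordering can be written without the wrapping lambda by using the primitive-specialised comparator factories.

revs.sort(Comparator.comparingInt(Revision::getClusterId)
        .thenComparingLong(Revision::getTimestamp)
        .thenComparingInt(Revision::getCounter));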


UpdateOp clean = new UpdateOp(doc.getId(), false);
Revision last = null;
for (Revision r : revs) {
if (last != null) {
if (last.getClusterId() == r.getClusterId()) {
clean.removeMapEntry(propertyName, last);
}
}
last = r;
}
Comment on lines +334 to +341
Contributor:
  • this seems to be a bit broader GC than expected. It deletes basically all older revisions of the same clusterId. That seems fine - but at that point I'm wondering why restrict it to only branch commits - i.e. why specifically only branch commits. Which would lead to the assumption that the idea was to only remove "overwritten branch commits"?
  • is the clusterId check here still necessary given the predicate check introduced?
  • this doesn't take the usual 24h GC max time nor active checkpoints into account.

Contributor:
PS:

  • it doesn't take late-writes into account (i.e. where the traversed state isn't equal to the head state)

Contributor Author (@reschke, Feb 21, 2024):
  • all older revisions that are branch commits (filtered earlier in the code); is there more that we can check? That would require _revisions, but those might be in a different document, right?
  • clusterid check - yes, unless we guarantee that the predicate will filter by id
  • ack - suggestions?

Contributor:
some ideas:

  • we could restrict it to "overwritten unmerged branch commits": those we know are garbage in any case
  • but that might still leave us with a too large doc/prop. we could then try to get the traversed state from "24h ago or the oldest checkpoint" (whichever is older) - then delete anything older than that
  • but that still might leave us with a too large doc/prop. then we might have to do an impromptu split and move anything younger than the previous into a split doc ...

Contributor:
those 3 cases could be .. test cases .. :)

Contributor:
regarding

get the traversed state from "24h ago or the oldest checkpoint" (whichever is older)

that might actually be a tricky thing to achieve - and I believe we might not have done that properly in the DetailedGC effort so far. I think we might need an actual checkpoint that corresponds to "24h ago"

Contributor:
I think we might need an actual checkpoint that corresponds to "24h ago"

... or maybe not a physical checkpoint, but a root revision that corresponds to reading 24h ago : which we might substitute with corresponding revisions (with timestamp 24h minus 1 millisecond) for each known clusterId ... or something like that ...

Contributor:
we might not have done that properly in the DetailedGC effort so far

... taking that back .. the difference between DetailedGC and this runtime GC case here is : in DetailedGC we're only looking at documents that have not been modified for 24+ hours. That means, reading their traversed state with headRevision of "now" is fine. But in this runtime GC case here, that is not fine (as we need to respect those 24+ hours worth of MVCC)
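
A minimal sketch (an editor's illustration, not part of the patch) of how the revision predicate passed to getShrinkOp could at least respect the 24-hour window discussed above; the clock source, the cutoff handling and the property name are assumptions:

// hypothetical: only shrink entries of our own cluster id that are older than 24 hours
long cutoff = clock.getTime() - TimeUnit.HOURS.toMillis(24);   // 'clock' as supplied by the store
UpdateOp shrink = Utils.getShrinkOp(doc, ":childOrder",
        r -> r.getClusterId() == clusterId && r.getTimestamp() < cutoff);
// note: this still ignores active checkpoints and late writes, as pointed out above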

return clean.hasChanges() ? clean : null;
} else {
return null;
}
}

/**
* List of property names that are system-defined by JCR and thus do not
* need to be redacted (to be expanded later)