Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate datasource retention rules: Reject rules that fully contain subsequent rules' interval #15015

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand Down Expand Up @@ -56,6 +57,12 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
return true;
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return Intervals.ETERNITY;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -75,4 +82,10 @@ public int hashCode()
{
return Objects.hash(getType());
}

@Override
public String toString()
{
return "ForeverBroadcastDistributionRule{}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator.rules;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand All @@ -46,4 +47,16 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
{
return true;
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return Intervals.ETERNITY;
}

@Override
public String toString()
{
return "ForeverDropRule{}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand Down Expand Up @@ -60,4 +61,18 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
return true;
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return Intervals.ETERNITY;
}

@Override
public String toString()
{
return "ForeverLoadRule{" +
"tieredReplicants=" + getTieredReplicants() +
", useDefaultTierForNull=" + useDefaultTierForNull() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public Interval getInterval()
return interval;
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return interval;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -83,4 +89,12 @@ public int hashCode()
{
return Objects.hash(getInterval());
}

@Override
public String toString()
{
return "IntervalBroadcastDistributionRule{" +
"interval=" + interval +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
return interval.contains(theInterval);
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return interval;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -84,4 +90,12 @@ public int hashCode()
{
return Objects.hash(interval);
}

@Override
public String toString()
{
return "IntervalDropRule{" +
"interval=" + interval +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand All @@ -34,8 +33,6 @@
*/
public class IntervalLoadRule extends LoadRule
{
private static final Logger log = new Logger(IntervalLoadRule.class);

private final Interval interval;

@JsonCreator
Expand Down Expand Up @@ -74,6 +71,12 @@ public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
return Rules.eligibleForLoad(interval, theInterval);
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return interval;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -95,4 +98,14 @@ public int hashCode()
{
return Objects.hash(super.hashCode(), interval);
}

@Override
public String toString()
{
return "IntervalLoadRule{" +
"interval=" + interval +
", tieredReplicants=" + getTieredReplicants() +
", useDefaultTierForNull=" + useDefaultTierForNull() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
return Rules.eligibleForLoad(period, interval, referenceTimestamp, includeFuture);
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return includeFuture ? new Interval(referenceTimestamp.minus(period), referenceTimestamp.plus(period))
abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved
: new Interval(referenceTimestamp.minus(period), referenceTimestamp);
}

@JsonProperty
public Period getPeriod()
{
Expand Down Expand Up @@ -96,4 +103,13 @@ public int hashCode()
{
return Objects.hash(getPeriod(), isIncludeFuture());
}

@Override
public String toString()
{
return "PeriodBroadcastDistributionRule{" +
"period=" + period +
", includeFuture=" + includeFuture +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand Down Expand Up @@ -63,4 +64,18 @@ public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
final DateTime periodAgo = referenceTimestamp.minus(period);
return theInterval.getEndMillis() <= periodAgo.getMillis();
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return new Interval(DateTimes.utc(Long.MIN_VALUE), referenceTimestamp.minus(period));
abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public String toString()
{
return "PeriodDropBeforeRule{" +
"period=" + period +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.joda.time.Period;

/**
*
*/
public class PeriodDropRule extends DropRule
{
Expand Down Expand Up @@ -80,4 +81,20 @@ public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
return currInterval.contains(theInterval);
}
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return includeFuture ? new Interval(referenceTimestamp.minus(period), referenceTimestamp.plus(period))
abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved
: new Interval(referenceTimestamp.minus(period), referenceTimestamp);
}

@Override
public String toString()
{
return "PeriodDropRule{" +
"period=" + period +
", includeFuture=" + includeFuture +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand All @@ -35,7 +34,6 @@
*/
public class PeriodLoadRule extends LoadRule
{
private static final Logger log = new Logger(PeriodLoadRule.class);
static final boolean DEFAULT_INCLUDE_FUTURE = true;

private final Period period;
Expand Down Expand Up @@ -85,6 +83,13 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
return Rules.eligibleForLoad(period, interval, referenceTimestamp, includeFuture);
}

@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return includeFuture ? new Interval(referenceTimestamp.minus(period), referenceTimestamp.plus(period))
: new Interval(referenceTimestamp.minus(period), referenceTimestamp);
}

@Override
public boolean equals(Object o)
{
Expand All @@ -106,4 +111,15 @@ public int hashCode()
{
return Objects.hash(super.hashCode(), period, includeFuture);
}

@Override
public String toString()
{
return "PeriodLoadRule{" +
"period=" + period +
", includeFuture=" + includeFuture +
", tieredReplicants=" + getTieredReplicants() +
", useDefaultTierForNull=" + useDefaultTierForNull() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,11 @@ public interface Rule
boolean appliesTo(Interval interval, DateTime referenceTimestamp);

void run(DataSegment segment, SegmentActionHandler segmentHandler);

/**
* Return an eligible interval from the reference timestamp. Implementations
* must return a valid interval based on the rule type.
* @param referenceTimestamp base timestamp
*/
abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved
Interval getEligibleInterval(DateTime referenceTimestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

package org.apache.druid.server.coordinator.rules;

import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

import java.util.List;

public class Rules
{
public static boolean eligibleForLoad(Interval src, Interval target)
abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -43,4 +48,47 @@ public static boolean eligibleForLoad(Period period, Interval interval, DateTime
private Rules()
{
}

/**
* Validate rules. This method throws an exception if a rule contain an interval that
* fully covers another subsequent rules' interval in the list. Rules that will be evaluated at some point
* are considered to be legitimate.
* @param rules Datasource rules.
*/
public static void validateRules(final List<Rule> rules)
{
if (rules == null) {
return;
}

final DateTime now = DateTimes.nowUtc();
for (int i = 0; i < rules.size(); i++) {
final Rule currRule = rules.get(i);
final Interval currInterval = currRule.getEligibleInterval(now);

for (int j = i + 1; j < rules.size(); j++) {
final Rule nextRule = rules.get(j);
final Interval nextInterval = nextRule.getEligibleInterval(now);
if (currInterval.contains(nextInterval)) {
// If the current rule has eternity, it covers everything following it.
// Or if the current rule still covers the next rule at the current interval boundaries, then the
// next rule will never fire at any time, so throw an exception.
if (Intervals.ETERNITY.equals(currInterval) ||
(currRule.getEligibleInterval(currInterval.getStart())
.contains(nextRule.getEligibleInterval(currInterval.getStart()))
&& currRule.getEligibleInterval(currInterval.getEnd())
.contains(nextRule.getEligibleInterval(currInterval.getEnd())))) {
throw InvalidInput.exception(
"Rule[%s] has an interval that fully contains the interval for rule[%s]."
+ " i.e., interval[%s] hides interval[%s]. Please fix the rules and retry.",
currRule,
nextRule,
currInterval,
nextInterval
);
}
}
}
}
}
}