Skip to content

Commit

Permalink
SOLR-11925: Time Routed Aliases: router.autoDeleteAge feature
Browse files Browse the repository at this point in the history
(cherry picked from commit 02b5172)
  • Loading branch information
dsmiley committed Feb 9, 2018
1 parent 8a370fa commit 5ce8323
Show file tree
Hide file tree
Showing 20 changed files with 338 additions and 162 deletions.
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
Expand Up @@ -138,6 +138,9 @@ New Features

* SOLR-11778: Add per-stage RequestHandler metrics. (ab)

* SOLR-11925: Time Routed Aliases can have their oldest collections automatically deleted via the "router.autoDeleteAge"
setting. (David Smiley)

Bug Fixes
----------------------

Expand Down
Expand Up @@ -84,7 +84,7 @@ private void callCreatePlainAlias(ZkNodeProps message, String aliasName, ZkState
final List<String> canonicalCollectionList = parseCollectionsParameter(message.get("collections"));
final String canonicalCollectionsString = StrUtils.join(canonicalCollectionList, ',');
validateAllCollectionsExistAndNoDups(canonicalCollectionList, zkStateReader);
zkStateReader.aliasesHolder
zkStateReader.aliasesManager
.applyModificationAndExportToZk(aliases -> aliases.cloneWithCollectionAlias(aliasName, canonicalCollectionsString));
}

Expand Down Expand Up @@ -121,12 +121,11 @@ private void callCreateRoutedAlias(ZkNodeProps message, String aliasName, ZkStat
String initialCollectionName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, startTime);

// Create the collection
NamedList createResults = new NamedList();
RoutedAliasCreateCollectionCmd.createCollectionAndWait(state, createResults, aliasName, aliasMetadata, initialCollectionName, ocmh);
RoutedAliasCreateCollectionCmd.createCollectionAndWait(state, aliasName, aliasMetadata, initialCollectionName, ocmh);
validateAllCollectionsExistAndNoDups(Collections.singletonList(initialCollectionName), zkStateReader);

// Create/update the alias
zkStateReader.aliasesHolder.applyModificationAndExportToZk(aliases -> aliases
zkStateReader.aliasesManager.applyModificationAndExportToZk(aliases -> aliases
.cloneWithCollectionAlias(aliasName, initialCollectionName)
.cloneWithCollectionAliasMetadata(aliasName, aliasMetadata));
}
Expand Down
Expand Up @@ -37,7 +37,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList results) thr
String aliasName = message.getStr(NAME);

ZkStateReader zkStateReader = ocmh.zkStateReader;
zkStateReader.aliasesHolder.applyModificationAndExportToZk(a -> a.cloneWithCollectionAlias(aliasName, null));
zkStateReader.aliasesManager.applyModificationAndExportToZk(a -> a.cloneWithCollectionAlias(aliasName, null));
}

}
Expand Up @@ -60,7 +60,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList results) thr
@SuppressWarnings("unchecked")
Map<String, String> metadata = (Map<String, String>) message.get(META_DATA);

zkStateReader.aliasesHolder.applyModificationAndExportToZk(aliases1 -> {
zkStateReader.aliasesManager.applyModificationAndExportToZk(aliases1 -> {
for (Map.Entry<String, String> entry : metadata.entrySet()) {
String key = entry.getKey();
if ("".equals(key.trim())) {
Expand Down
Expand Up @@ -18,11 +18,18 @@
package org.apache.solr.cloud.api.collections;

import java.lang.invoke.MethodHandles;
import java.text.ParseException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.common.SolrException;
Expand All @@ -32,10 +39,13 @@
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.DateMathParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,25 +55,42 @@
import static org.apache.solr.common.params.CommonParams.NAME;

/**
* For "routed aliases", creates another collection and adds it to the alias. In some cases it will not
* add a new collection.
* If a collection is created, then collection creation info is returned.
* (Internal) For "time routed aliases", both deletes old collections and creates new collections
* associated with routed aliases.
*
* Note: this logic is within an Overseer because we want to leverage the mutual exclusion
* property afforded by the lock it obtains on the alias name.
*
* @since 7.3
* @lucene.internal
*/
// TODO rename class to MaintainRoutedAliasCmd
public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName";
public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName"; //TODO rename to createAfter

private final OverseerCollectionMessageHandler ocmh;

/**
 * Creates the command.
 *
 * @param ocmh the collection message handler this command keeps a reference to; other methods of this
 *             class read {@code ocmh.zkStateReader} and {@code ocmh.overseer} from it
 */
public RoutedAliasCreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
}

/**
 * Invokes this command from the client by queueing a message via
 * {@link CollectionsHandler#sendToOCPQueue} and returning its response.
 *
 * @param collHandler        the handler used to send the message
 * @param aliasName          the alias name, sent under {@link CollectionParams#NAME}
 * @param mostRecentCollName sent under {@link #IF_MOST_RECENT_COLL_NAME}
 * @return the response carried by the reply
 * @throws Exception the exception carried by the reply, if it has one; or anything thrown while sending
 */
public static NamedList remoteInvoke(CollectionsHandler collHandler, String aliasName, String mostRecentCollName)
    throws Exception {
  final Map<String, Object> props = new HashMap<>();
  props.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ROUTEDALIAS_CREATECOLL.toLower());
  props.put(CollectionParams.NAME, aliasName);
  props.put(IF_MOST_RECENT_COLL_NAME, mostRecentCollName);

  final SolrResponse reply = collHandler.sendToOCPQueue(new ZkNodeProps(props));
  final Exception failure = reply.getException();
  if (failure == null) {
    return reply.getResponse();
  }
  // The reply carries a failure: surface it to the caller instead of returning a response.
  throw failure;
}

@Override
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
//---- PARSE PRIMARY MESSAGE PARAMS
Expand All @@ -75,13 +102,12 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList resul
// TODO collection param (or intervalDateMath override?), useful for data capped collections

//---- PARSE ALIAS INFO FROM ZK
final ZkStateReader.AliasesManager aliasesHolder = ocmh.zkStateReader.aliasesHolder;
final Aliases aliases = aliasesHolder.getAliases();
final ZkStateReader.AliasesManager aliasesManager = ocmh.zkStateReader.aliasesManager;
final Aliases aliases = aliasesManager.getAliases();
final Map<String, String> aliasMetadata = aliases.getCollectionAliasMetadata(aliasName);
if (aliasMetadata == null) {
throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
}

final TimeRoutedAlias timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasMetadata);

final List<Map.Entry<Instant, String>> parsedCollections =
Expand Down Expand Up @@ -113,13 +139,21 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList resul
final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(mostRecentCollTimestamp);
final String createCollName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, nextCollTimestamp);

//---- CREATE THE COLLECTION
createCollectionAndWait(clusterState, results, aliasName, aliasMetadata, createCollName, ocmh);
//---- DELETE OLDEST COLLECTIONS AND REMOVE FROM ALIAS (if configured)
NamedList deleteResults = deleteOldestCollectionsAndUpdateAlias(timeRoutedAlias, aliasesManager, nextCollTimestamp);
if (deleteResults != null) {
results.add("delete", deleteResults);
}

//TODO delete some of the oldest collection(s) ?
//---- CREATE THE COLLECTION
NamedList createResults = createCollectionAndWait(clusterState, aliasName, aliasMetadata,
createCollName, ocmh);
if (createResults != null) {
results.add("create", createResults);
}

//---- UPDATE THE ALIAS
aliasesHolder.applyModificationAndExportToZk(curAliases -> {
//---- UPDATE THE ALIAS WITH NEW COLLECTION
aliasesManager.applyModificationAndExportToZk(curAliases -> {
final List<String> curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName);
if (curTargetCollections.contains(createCollName)) {
return curAliases;
Expand All @@ -134,12 +168,92 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList resul

}

/**
 * Deletes some of the oldest collection(s) based on {@link TimeRoutedAlias#getAutoDeleteAgeMath()}. If that
 * setting is not present then this does nothing.
 *
 * The alias is updated first (old collections are removed from its list), and only then are the removed
 * collections deleted, one delete request per collection.
 *
 * @param timeRoutedAlias the parsed alias; supplies the alias name, time zone and auto-delete date math
 * @param aliasesManager  used to apply the alias modification
 * @param now             the instant the auto-delete date math is evaluated relative to
 * @return {@code null} if auto-delete is not configured or nothing needed deleting; otherwise one entry per
 *         collection we tried to delete, keyed by collection name, holding that delete request's response values
 * @throws SolrException with {@code BAD_REQUEST} if the auto-delete date math cannot be parsed
 * @throws Exception     whatever the alias update or a delete request throws
 */
NamedList deleteOldestCollectionsAndUpdateAlias(TimeRoutedAlias timeRoutedAlias,
                                                ZkStateReader.AliasesManager aliasesManager,
                                                Instant now) throws Exception {
  final String autoDeleteAgeMathStr = timeRoutedAlias.getAutoDeleteAgeMath();
  if (autoDeleteAgeMathStr == null) {
    return null;
  }
  final Instant delBefore;
  try {
    delBefore = new DateMathParser(Date.from(now), timeRoutedAlias.getTimeZone()).parseMath(autoDeleteAgeMathStr).toInstant();
  } catch (ParseException e) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); // note: should not happen by this point
  }

  final String aliasName = timeRoutedAlias.getAliasName();

  // Populated as a side effect of the alias modification function below.
  final Collection<String> collectionsToDelete = new LinkedHashSet<>();

  // First update the alias (there may be no change to make!)
  aliasesManager.applyModificationAndExportToZk(curAliases -> {
    // This function may be applied more than once (NOTE(review): AliasesManager is expected to retry when its
    // ZK write loses a race -- confirm). Start from a clean slate so that only the attempt that actually wins
    // decides what gets deleted.
    collectionsToDelete.clear();

    // note: we could re-parse the TimeRoutedAlias object from curAliases but I don't think there's a point to it.

    final List<Map.Entry<Instant, String>> parsedCollections =
        timeRoutedAlias.parseCollections(curAliases, () -> newAliasMustExistException(aliasName));

    // Iterating from newest to oldest, find the first collection that has a time <= "delBefore". We keep this
    // collection (and all newer ones before it) but we delete older collections, which are the ones that follow.
    // This logic will always keep the first collection, which we can't delete.
    int numToKeep = 0;
    for (Map.Entry<Instant, String> parsedCollection : parsedCollections) {
      numToKeep++;
      final Instant colInstant = parsedCollection.getKey();
      if (!colInstant.isAfter(delBefore)) { // i.e. colInstant <= delBefore
        break;
      }
    }
    if (numToKeep == parsedCollections.size()) {
      log.debug("No old time routed collections to delete.");
      return curAliases;
    }

    final List<String> targetList = curAliases.getCollectionAliasListMap().get(aliasName);
    // remember to delete these... (oldest to newest)
    for (int i = targetList.size() - 1; i >= numToKeep; i--) {
      collectionsToDelete.add(targetList.get(i));
    }
    // new alias list has only "numToKeep" first items
    final List<String> collectionsToKeep = targetList.subList(0, numToKeep);
    final String collectionsToKeepStr = StrUtils.join(collectionsToKeep, ',');
    return curAliases.cloneWithCollectionAlias(aliasName, collectionsToKeepStr);
  });

  if (collectionsToDelete.isEmpty()) {
    return null;
  }

  log.info("Removing old time routed collections: {}", collectionsToDelete);
  // Should this be done asynchronously? If we got "ASYNC" then probably.
  //   It would shorten the time the Overseer holds a lock on the alias name
  //   (deleting the collections will be done later and not use that lock).
  // Don't bother about parallel; it's unusual to have more than 1.
  // Note we don't throw an exception here under most cases; instead the response will have information about
  //   how each delete request went, possibly including a failure message.
  final CollectionsHandler collHandler = ocmh.overseer.getCoreContainer().getCollectionsHandler();
  NamedList results = new NamedList();
  for (String collection : collectionsToDelete) {
    final SolrParams reqParams = CollectionAdminRequest.deleteCollection(collection).getParams();
    SolrQueryResponse rsp = new SolrQueryResponse();
    collHandler.handleRequestBody(new LocalSolrQueryRequest(null, reqParams), rsp);
    results.add(collection, rsp.getValues());
  }
  return results;
}

/**
* Creates a collection (for use in a routed alias), waiting for it to be ready before returning.
* If the collection already exists then this is not an error.
* IMPORTANT: Only call this from an {@link OverseerCollectionMessageHandler.Cmd}.
*/
static void createCollectionAndWait(ClusterState clusterState, NamedList results, String aliasName, Map<String, String> aliasMetadata, String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
static NamedList createCollectionAndWait(ClusterState clusterState, String aliasName, Map<String, String> aliasMetadata,
String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
// Map alias metadata starting with a prefix to a create-collection API request
final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
Expand All @@ -161,6 +275,7 @@ static void createCollectionAndWait(ClusterState clusterState, NamedList results
ocmh.overseer.getCoreContainer().getCollectionsHandler());
createMsgMap.put(Overseer.QUEUE_OPERATION, "create");

NamedList results = new NamedList();
try {
// Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd.
// note: there doesn't seem to be any point in locking on the collection name, so we don't. We currently should
Expand All @@ -173,7 +288,9 @@ static void createCollectionAndWait(ClusterState clusterState, NamedList results
}
}

CollectionsHandler.waitForActiveCollection(createCollName, null, ocmh.overseer.getCoreContainer(), new OverseerSolrResponse(results));
CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(),
new OverseerSolrResponse(results));
return results;
}

private SolrException newAliasMustExistException(String aliasName) {
Expand Down
Expand Up @@ -17,7 +17,6 @@

package org.apache.solr.cloud.api.collections;

import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -64,6 +63,7 @@ public class TimeRoutedAlias {
public static final String ROUTER_START = ROUTER_PREFIX + "start";
public static final String ROUTER_INTERVAL = ROUTER_PREFIX + "interval";
public static final String ROUTER_MAX_FUTURE = ROUTER_PREFIX + "max-future-ms";
public static final String ROUTER_AUTO_DELETE_AGE = ROUTER_PREFIX + "autoDeleteAge";
public static final String CREATE_COLLECTION_PREFIX = "create-collection.";
// plus TZ and NAME

Expand Down Expand Up @@ -122,8 +122,9 @@ public static String formatCollectionNameFromInstant(String aliasName, Instant t

private final String aliasName;
private final String routeField;
private final String intervalMath; // ex: +1DAY
private final long maxFutureMs;
private final String intervalDateMath; // ex: +1DAY
private final String autoDeleteAgeMath; // ex: /DAY-30DAYS *optional*
private final TimeZone timeZone;

public TimeRoutedAlias(String aliasName, Map<String, String> aliasMetadata) {
Expand All @@ -134,21 +135,37 @@ public TimeRoutedAlias(String aliasName, Map<String, String> aliasMetadata) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only 'time' routed aliases is supported right now.");
}
routeField = required.get(ROUTER_FIELD);
intervalDateMath = required.get(ROUTER_INTERVAL);
intervalMath = required.get(ROUTER_INTERVAL);

//optional:
maxFutureMs = params.getLong(ROUTER_MAX_FUTURE, TimeUnit.MINUTES.toMillis(10));
autoDeleteAgeMath = params.get(ROUTER_AUTO_DELETE_AGE); // no default
timeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));

// More validation:

// check that the interval is valid date math
// check that the date math is valid
final Date now = new Date();
try {
new DateMathParser(timeZone).parseMath(intervalDateMath);
} catch (ParseException e) {
final Date after = new DateMathParser(now, timeZone).parseMath(intervalMath);
if (!after.after(now)) {
throw new SolrException(BAD_REQUEST, "duration must add to produce a time in the future");
}
} catch (Exception e) {
throw new SolrException(BAD_REQUEST, "bad " + TimeRoutedAlias.ROUTER_INTERVAL + ", " + e, e);
}

if (autoDeleteAgeMath != null) {
try {
final Date before = new DateMathParser(now, timeZone).parseMath(autoDeleteAgeMath);
if (now.before(before)) {
throw new SolrException(BAD_REQUEST, "duration must round or subtract to produce a time in the past");
}
} catch (Exception e) {
throw new SolrException(BAD_REQUEST, "bad " + TimeRoutedAlias.ROUTER_AUTO_DELETE_AGE + ", " + e, e);
}
}

if (maxFutureMs < 0) {
throw new SolrException(BAD_REQUEST, ROUTER_MAX_FUTURE + " must be >= 0");
}
Expand All @@ -162,12 +179,16 @@ public String getRouteField() {
return routeField;
}

/**
 * Returns the date math string read from the required {@link #ROUTER_INTERVAL} alias metadata
 * (ex: {@code +1DAY}).
 */
public String getIntervalMath() {
return intervalMath;
}

/**
 * Returns the value of the optional {@link #ROUTER_MAX_FUTURE} alias metadata, in milliseconds.
 * The constructor defaults it to 10 minutes when absent and rejects negative values.
 */
public long getMaxFutureMs() {
return maxFutureMs;
}

public String getIntervalDateMath() {
return intervalDateMath;
public String getAutoDeleteAgeMath() {
return autoDeleteAgeMath;
}

public TimeZone getTimeZone() {
Expand All @@ -179,8 +200,9 @@ public String toString() {
return Objects.toStringHelper(this)
.add("aliasName", aliasName)
.add("routeField", routeField)
.add("intervalMath", intervalMath)
.add("maxFutureMs", maxFutureMs)
.add("intervalDateMath", intervalDateMath)
.add("autoDeleteAgeMath", autoDeleteAgeMath)
.add("timeZone", timeZone)
.toString();
}
Expand All @@ -204,7 +226,7 @@ public List<Map.Entry<Instant,String>> parseCollections(Aliases aliases, Supplie
/** Computes the timestamp of the next collection given the timestamp of the one before. */
public Instant computeNextCollTimestamp(Instant fromTimestamp) {
final Instant nextCollTimestamp =
DateMathParser.parseMath(Date.from(fromTimestamp), "NOW" + intervalDateMath, timeZone).toInstant();
DateMathParser.parseMath(Date.from(fromTimestamp), "NOW" + intervalMath, timeZone).toInstant();
assert nextCollTimestamp.isAfter(fromTimestamp);
return nextCollTimestamp;
}
Expand Down

0 comments on commit 5ce8323

Please sign in to comment.