From 2d94cbd143ac40d7a0fb5136062775ce3d555d55 Mon Sep 17 00:00:00 2001 From: Hrishikesh Gadre Date: Thu, 24 Mar 2016 15:44:38 -0700 Subject: [PATCH] Solr snapshots Change-Id: I26a3aad80dff5b4f2f035f6371fb82ec3b77e4fe --- .../OverseerCollectionMessageHandler.java | 236 +++++++----------- .../solr/cloud/ShardRequestProcessor.java | 220 ++++++++++++++++ .../solr/core/backup/BackupManager.java | 153 ++++++++++++ .../solr/core/backup/CopyFilesStrategy.java | 70 ++++++ .../solr/core/backup/IndexBackupStrategy.java | 33 +++ .../backup/repository/BackupRepository.java | 65 +++++ .../repository/BackupRepositoryFactory.java | 33 +++ .../repository/LocalFileSystemRepository.java | 77 ++++++ 8 files changed, 746 insertions(+), 141 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/cloud/ShardRequestProcessor.java create mode 100644 solr/core/src/java/org/apache/solr/core/backup/BackupManager.java create mode 100644 solr/core/src/java/org/apache/solr/core/backup/CopyFilesStrategy.java create mode 100644 solr/core/src/java/org/apache/solr/core/backup/IndexBackupStrategy.java create mode 100644 solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java create mode 100644 solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepositoryFactory.java create mode 100644 solr/core/src/java/org/apache/solr/core/backup/repository/LocalFileSystemRepository.java diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java index 2b8f0361d654..d47bb223b972 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java @@ -20,6 +20,7 @@ import java.io.Reader; import java.io.Writer; import java.lang.invoke.MethodHandles; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -83,6 +84,11 @@ import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.Utils; +import org.apache.solr.core.backup.BackupManager; +import org.apache.solr.core.backup.IndexBackupStrategy; +import org.apache.solr.core.backup.repository.BackupRepository; +import org.apache.solr.core.backup.repository.BackupRepositoryFactory; +import org.apache.solr.core.backup.CopyFilesStrategy; import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandlerFactory; import org.apache.solr.handler.component.ShardRequest; @@ -135,7 +141,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler { public static final String NUM_SLICES = "numShards"; - + static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true; public static final String CREATE_NODE_SET_SHUFFLE = "createNodeSet.shuffle"; public static final String CREATE_NODE_SET_EMPTY = "EMPTY"; @@ -583,7 +589,7 @@ private void deleteReplica(ClusterState clusterState, ZkNodeProps message, Named String collectionName = message.getStr(COLLECTION_PROP); String shard = message.getStr(SHARD_ID_PROP); String replicaName = message.getStr(REPLICA_PROP); - + DocCollection coll = clusterState.getCollection(collectionName); Slice slice = coll.getSlice(shard); if (slice == null) { @@ -598,7 +604,7 @@ private void deleteReplica(ClusterState clusterState, ZkNodeProps message, Named throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : " + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ',')); } - + // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true // on the command. if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) { @@ -633,7 +639,7 @@ private void deleteReplica(ClusterState clusterState, ZkNodeProps message, Named // try and ensure core info is removed from cluster state deleteCoreNode(collectionName, replicaName, replica, core); if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return; - + throw new SolrException(ErrorCode.SERVER_ERROR, "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName); } @@ -696,10 +702,10 @@ private void deleteCollection(ZkNodeProps message, NamedList results) throws Kee if (asyncId != null) { requestMap = new HashMap<>(); } - + Set okayExceptions = new HashSet<>(1); okayExceptions.add(NonExistentCoreException.class.getName()); - + collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions); ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection); @@ -773,7 +779,7 @@ private void migrateStateFormat(ZkNodeProps message, NamedList results) private void createAlias(Aliases aliases, ZkNodeProps message) { String aliasName = message.getStr(NAME); String collections = message.getStr("collections"); - + Map> newAliasesMap = new HashMap<>(); Map newCollectionAliasesMap = new HashMap<>(); Map prevColAliases = aliases.getCollectionAliasMap(); @@ -789,7 +795,7 @@ private void createAlias(Aliases aliases, ZkNodeProps message) { } try { zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, jsonBytes, true); - + checkForAlias(aliasName, collections); // some fudge for other nodes Thread.sleep(100); @@ -876,14 +882,14 @@ private boolean createShard(ClusterState clusterState, ZkNodeProps message, Name if (collectionName == null || sliceName == null) throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters"); int numSlices = 1; - + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); DocCollection collection = clusterState.getCollection(collectionName); int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1)); String createNodeSetStr = message.getStr(CREATE_NODE_SET); List sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, repFactor, createNodeSetStr, overseer.getZkController().getCoreContainer()); - + Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message)); // wait for a while until we see the shard TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS); @@ -895,7 +901,7 @@ private boolean createShard(ClusterState clusterState, ZkNodeProps message, Name } if (!created) throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME)); - + String configName = message.getStr(COLL_CONF); String async = message.getStr(ASYNC); @@ -909,7 +915,7 @@ private boolean createShard(ClusterState clusterState, ZkNodeProps message, Name String shardName = collectionName + "_" + sliceName + "_replica" + j; log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName + " on " + nodeName); - + // Need to create new params for each request ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); @@ -922,11 +928,11 @@ private boolean createShard(ClusterState clusterState, ZkNodeProps message, Name sendShardRequest(nodeName, params, shardHandler, async, requestMap); } - + processResponses(results, shardHandler, true, "Failed to create shard", async, requestMap, Collections.emptySet()); - + log.info("Finished create command on all shards for collection: " + collectionName); - + return true; } @@ -934,16 +940,16 @@ private boolean createShard(ClusterState clusterState, ZkNodeProps message, Name private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) { String collectionName = message.getStr("collection"); String slice = message.getStr(ZkStateReader.SHARD_ID_PROP); - + log.info("Split shard invoked"); String splitKey = message.getStr("split.key"); ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); - + DocCollection collection = clusterState.getCollection(collectionName); DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT; - + Slice parentSlice; - + if (slice == null) { if (router instanceof CompositeIdRouter) { Collection searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection); @@ -965,13 +971,13 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named } else { parentSlice = collection.getSlice(slice); } - + if (parentSlice == null) { // no chance of the collection being null because ClusterState#getCollection(String) would have thrown // an exception already throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice); } - + // find the leader for the shard Replica parentShardLeader = null; try { @@ -979,12 +985,12 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - + DocRouter.Range range = parentSlice.getRange(); if (range == null) { range = new PlainIdRouter().fullRange(); } - + List subRanges = null; String rangesStr = message.getStr(CoreAdminParams.RANGES); if (rangesStr != null) { @@ -1043,7 +1049,7 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named // todo: fixed to two partitions? subRanges = router.partitionRange(2, range); } - + try { List subSlices = new ArrayList<>(subRanges.size()); List subShardNames = new ArrayList<>(subRanges.size()); @@ -1053,7 +1059,7 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named subSlices.add(subSlice); String subShardName = collectionName + "_" + subSlice + "_replica1"; subShardNames.add(subShardName); - + Slice oSlice = collection.getSlice(subSlice); if (oSlice != null) { final Slice.State state = oSlice.getState(); @@ -1079,17 +1085,17 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named } } } - + final String asyncId = message.getStr(ASYNC); Map requestMap = new HashMap<>(); - + for (int i = 0; i < subRanges.size(); i++) { String subSlice = subSlices.get(i); String subShardName = subShardNames.get(i); DocRouter.Range subRange = subRanges.get(i); - + log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName); - + Map propMap = new HashMap<>(); propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower()); propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice); @@ -1099,13 +1105,13 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName()); DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient()); inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap))); - + // wait until we are able to see the new shard in cluster state waitForNewShard(collectionName, subSlice); - + // refresh cluster state clusterState = zkStateReader.getClusterState(); - + log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName + " on " + nodeName); propMap = new HashMap<>(); @@ -1128,7 +1134,7 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named } processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap); - + for (String subShardName : subShardNames) { // wait for parent leader to acknowledge the sub-shard core log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName); @@ -1140,20 +1146,20 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named cmd.setState(Replica.State.ACTIVE); cmd.setCheckLive(true); cmd.setOnlyIfLeader(true); - + ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams()); sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap); } processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up", asyncId, requestMap); - + log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice + " on: " + parentShardLeader); - + log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection " + collectionName + " on " + parentShardLeader); - + ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString()); params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core")); @@ -1162,57 +1168,57 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named params.add(CoreAdminParams.TARGET_CORE, subShardName); } params.set(CoreAdminParams.RANGES, rangesStr); - + sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap); processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId, requestMap); - + log.info("Index on shard: " + nodeName + " split into two successfully"); - + // apply buffered updates on sub-shards for (int i = 0; i < subShardNames.size(); i++) { String subShardName = subShardNames.get(i); - + log.info("Applying buffered updates on : " + subShardName); - + params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString()); params.set(CoreAdminParams.NAME, subShardName); - + sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap); } processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" + " to apply buffered updates", asyncId, requestMap); - + log.info("Successfully applied buffered updates on : " + subShardNames); - + // Replica creation for the new Slices - + // look at the replication factor and see if it matches reality // if it does not, find best nodes to create more cores - + // TODO: Have replication factor decided in some other way instead of numShards for the parent - + int repFactor = parentSlice.getReplicas().size(); - + // we need to look at every node and see how many cores it serves // add our new cores to existing nodes serving the least number of cores // but (for now) require that each core goes on a distinct node. - + // TODO: add smarter options that look at the current number of cores per // node? // for now we just go random Set nodes = clusterState.getLiveNodes(); List nodeList = new ArrayList<>(nodes.size()); nodeList.addAll(nodes); - + // TODO: Have maxShardsPerNode param for this operation? - + // Remove the node that hosts the parent shard for replica creation. nodeList.remove(nodeName); - + // TODO: change this to handle sharding a slice into > 2 sub-shards. @@ -1298,11 +1304,11 @@ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, Named } processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap); - + log.info("Successfully created all replica shards for all sub-slices " + subSlices); - + commit(results, slice, parentShardLeader); - + return true; } catch (SolrException e) { throw e; @@ -1396,10 +1402,10 @@ private void waitForNewShard(String collectionName, String sliceName) throws Kee private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) { String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP); - + log.info("Delete shard invoked"); Slice slice = clusterState.getSlice(collectionName, sliceId); - + if (slice == null) { if (clusterState.hasCollection(collectionName)) { throw new SolrException(ErrorCode.BAD_REQUEST, @@ -1423,7 +1429,7 @@ private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedLi if (asyncId != null) { requestMap = new HashMap<>(slice.getReplicas().size(), 1.0f); } - + try { ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString()); @@ -1438,7 +1444,7 @@ private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedLi ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP, collectionName, ZkStateReader.SHARD_ID_PROP, sliceId); Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m)); - + // wait for a while until we don't see the shard TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS); boolean removed = false; @@ -1455,9 +1461,9 @@ private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedLi throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + collectionName + " shard: " + sliceId); } - + log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId); - + } catch (SolrException e) { throw e; } catch (Exception e) { @@ -1877,7 +1883,7 @@ private void createCollection(ClusterState clusterState, ZkNodeProps message, Na + nodeList.size() + "). It's unusual to run two replica of the same slice on the same Solr-instance."); } - + int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size(); int requestedShardsToCreate = numSlices * repFactor; if (maxShardsAllowedToCreate < requestedShardsToCreate) { @@ -2065,14 +2071,14 @@ private Map waitToSeeReplicasInState(String collectionName, Col } } } - + if (result.size() == coreNames.size()) { return result; } if (timeout.hasTimedOut()) { throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state."); } - + Thread.sleep(100); } } @@ -2086,9 +2092,9 @@ private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedLis if (StringUtils.isBlank(coreName)) { coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME); } - + final String asyncId = message.getStr(ASYNC); - + DocCollection coll = clusterState.getCollection(collection); if (coll == null) { throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist"); @@ -2125,7 +2131,7 @@ private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedLis } } ModifiableSolrParams params = new ModifiableSolrParams(); - + if (!Overseer.isLegacy(zkStateReader.getClusterProps())) { if (!skipCreateReplicaInClusterState) { ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP, @@ -2137,12 +2143,12 @@ private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedLis params.set(CoreAdminParams.CORE_NODE_NAME, waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName()); } - + String configName = zkStateReader.readConfigName(collection); String routeKey = message.getStr(ShardParams._ROUTE_); String dataDir = message.getStr(CoreAdminParams.DATA_DIR); String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR); - + params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); params.set(CoreAdminParams.NAME, coreName); params.set(COLL_CONF, configName); @@ -2166,7 +2172,7 @@ private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedLis params.set(CoreAdminParams.INSTANCE_DIR, instanceDir); } addPropertyParams(message, params); - + // For tracking async calls. Map requestMap = new HashMap<>(); sendShardRequest(node, params, shardHandler, asyncId, requestMap); @@ -2176,75 +2182,23 @@ private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedLis waitForCoreNodeName(collection, node, coreName); } - private void processBackupAction(ZkNodeProps message, NamedList results) throws IOException, KeeperException, InterruptedException { - String collectionName = message.getStr(COLLECTION_PROP); - String backupName = message.getStr(NAME); + private void processBackupAction(ZkNodeProps message, NamedList results) + throws IOException, KeeperException, InterruptedException { + String collectionName = message.getStr(COLLECTION_PROP); + String backupName = message.getStr(NAME); String location = message.getStr("location"); ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); String asyncId = message.getStr(ASYNC); - Map requestMap = new HashMap<>(); - Instant startTime = Instant.now(); - // note: we assume a shared files system to backup a collection, since a collection is distributed - Path backupPath = Paths.get(location).resolve(backupName).toAbsolutePath(); + // TODO - Support configuring "cluster wide" repositories so that they can be referred by name. + // TODO - Extract the "base" path from the repository configuration. + URI backupPath = Paths.get(location).toUri(); + BackupRepository repository = BackupRepositoryFactory.create(backupPath); + IndexBackupStrategy strategy = new CopyFilesStrategy(new ShardRequestProcessor(shardHandler, adminPath, zkStateReader, asyncId)); + BackupManager mgr = new BackupManager(repository, strategy, zkStateReader, collectionName); - //Validating if the directory already exists. - if (Files.exists(backupPath)) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Backup directory already exists: " + backupPath); - } - Files.createDirectory(backupPath); // create now - - log.info("Starting backup of collection={} with snapshotName={} at location={}", collectionName, backupName, - backupPath); - - for (Slice slice : zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) { - Replica replica = slice.getLeader(); - - String coreName = replica.getStr(CORE_NAME_PROP); - - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminAction.BACKUPCORE.toString()); - params.set(NAME, slice.getName()); - params.set("location", backupPath.toString()); // note: index dir will be here then the "snapshot." + slice name - params.set(CORE_NAME_PROP, coreName); - - sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap); - log.debug("Sent backup request to core={} for snapshotName={}", coreName, backupName); - } - log.debug("Sent backup requests to all shard leaders for snapshotName={}", backupName); - - processResponses(results, shardHandler, true, "Could not backup all replicas", asyncId, requestMap); - - log.info("Starting to backup ZK data for snapshotName={}", backupName); - - //Download the configs - String configName = zkStateReader.readConfigName(collectionName); - Path zkBackup = backupPath.resolve("zk_backup"); - zkStateReader.getConfigManager().downloadConfigDir(configName, zkBackup.resolve("configs").resolve(configName)); - - //Save the collection's state. Can be part of the monolithic clusterstate.json or a individual state.json - //Since we don't want to distinguish we extract the state and back it up as a separate json - DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName); - Files.write(zkBackup.resolve("collection_state.json"), - Utils.toJSON(Collections.singletonMap(collectionName, collection))); - - Path propertiesPath = backupPath.resolve("backup.properties"); - Properties properties = new Properties(); - - properties.put("snapshotName", backupName); - properties.put("collection", collectionName); - properties.put("collection.configName", configName); - properties.put("startTime", startTime.toString()); - //TODO: Add MD5 of the configset. If during restore the same name configset exists then we can compare checksums to see if they are the same. - //if they are not the same then we can throw an error or have an 'overwriteConfig' flag - //TODO save numDocs for the shardLeader. We can use it to sanity check the restore. - - try (Writer os = Files.newBufferedWriter(propertiesPath, StandardCharsets.UTF_8)) { - properties.store(os, "Snapshot properties file"); - } - - log.info("Completed backing up ZK data for snapshotName={}", backupName); + // Execute the backup operation. + mgr.createBackup(backupName); } private void processRestoreAction(ZkNodeProps message, NamedList results) throws IOException, KeeperException, InterruptedException { @@ -2451,7 +2405,7 @@ private void processResponses(NamedList results, ShardHandler shardHandler, bool private String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException { String configName = message.getStr(COLL_CONF); - + if (configName == null) { // if there is only one conf, use that List configNames = null; @@ -2470,7 +2424,7 @@ private String getConfigName(String coll, ZkNodeProps message) throws KeeperExce } return configName; } - + private boolean validateConfig(String configName) throws KeeperException, InterruptedException { return zkStateReader.getZkClient().exists(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName, true); } @@ -2480,7 +2434,7 @@ private boolean validateConfig(String configName) throws KeeperException, Interr * That check should be done before the config node is created. */ private void createConfNode(String configName, String coll, boolean isLegacyCloud) throws KeeperException, InterruptedException { - + if (configName != null) { String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll; log.info("creating collections conf node {} ", collDir); @@ -2499,7 +2453,7 @@ private void createConfNode(String configName, String coll, boolean isLegacyClou } } - + private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params, NamedList results, Replica.State stateMatcher, String asyncId, Map requestMap) { collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet()); @@ -2514,7 +2468,7 @@ private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params, ClusterState clusterState = zkStateReader.getClusterState(); DocCollection coll = clusterState.getCollection(collectionName); - + for (Slice slice : coll.getSlices()) { sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap); } @@ -2539,7 +2493,7 @@ private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Re } } } - + private void processResponse(NamedList results, ShardResponse srsp, Set okayExceptions) { Throwable e = srsp.getException(); String nodeName = srsp.getNodeName(); diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardRequestProcessor.java b/solr/core/src/java/org/apache/solr/cloud/ShardRequestProcessor.java new file mode 100644 index 000000000000..5fdca27a6327 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/ShardRequestProcessor.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.handler.component.ShardHandler; +import org.apache.solr.handler.component.ShardRequest; +import org.apache.solr.handler.component.ShardResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class provides functionality required to invoke operations for Solr collection replicas + * and process the results of those operations. + * TODO - Needs to cleanup this class as well as OverseerCollectionMessageHandler + */ +public class ShardRequestProcessor { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final ShardHandler shardHandler; + private final String adminPath; + private final ZkStateReader zkStateReader; + private final String asyncId; + private final Map requestMap = new HashMap<>(); + + public ShardRequestProcessor(ShardHandler shardHandler, String adminPath, ZkStateReader zkStateReader, String asyncId) { + this.shardHandler = shardHandler; + this.adminPath = adminPath; + this.zkStateReader = zkStateReader; + this.asyncId = asyncId; + } + + public ZkStateReader getZkStateReader() { + return zkStateReader; + } + + public void sendShardRequest(String nodeName, ModifiableSolrParams params) { + if (asyncId != null) { + String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime()); + params.set(ASYNC, coreAdminAsyncId); + requestMap.put(nodeName, coreAdminAsyncId); + } + + ShardRequest sreq = new ShardRequest(); + params.set("qt", adminPath); + sreq.purpose = 1; + String replica = zkStateReader.getBaseUrlForNodeName(nodeName); + sreq.shards = new String[]{replica}; + sreq.actualShards = sreq.shards; + sreq.nodeName = nodeName; + sreq.params = params; + + shardHandler.submit(sreq, replica, sreq.params); + } + + public NamedList processResponses(boolean abortOnError, String msgOnError, Set okayExceptions) { + NamedList results = new NamedList<>(); + // Processes all shard responses + ShardResponse srsp; + do { + srsp = shardHandler.takeCompletedOrError(); + if (srsp != null) { + processResponse(results, srsp, okayExceptions); + Throwable exception = srsp.getException(); + if (abortOnError && exception != null) { + // drain pending requests + while (srsp != null) { + srsp = shardHandler.takeCompletedOrError(); + } + throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception); + } + } + } while (srsp != null); + + // If request is async wait for the core admin to complete before returning + if (asyncId != null) { + waitForAsyncCallsToComplete(requestMap, results); + requestMap.clear(); + } + + return results; + } + + private void processResponse(NamedList results, ShardResponse srsp, Set okayExceptions) { + Throwable e = srsp.getException(); + String nodeName = srsp.getNodeName(); + SolrResponse solrResponse = srsp.getSolrResponse(); + String shard = srsp.getShard(); + + processResponse(results, e, nodeName, solrResponse, shard, okayExceptions); + } + + @SuppressWarnings("unchecked") + private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set okayExceptions) { + String rootThrowable = null; + if (e instanceof RemoteSolrException) { + rootThrowable = ((RemoteSolrException) e).getRootThrowable(); + } + + if (e != null && (rootThrowable == null || !okayExceptions.contains(rootThrowable))) { + log.error("Error from shard: " + shard, e); + + SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure"); + if (failure == null) { + failure = new SimpleOrderedMap(); + results.add("failure", failure); + } + + failure.add(nodeName, e.getClass().getName() + ":" + e.getMessage()); + + } else { + + SimpleOrderedMap success = (SimpleOrderedMap) results.get("success"); + if (success == null) { + success = new SimpleOrderedMap(); + results.add("success", success); + } + + success.add(nodeName, solrResponse.getResponse()); + } + } + + @SuppressWarnings("unchecked") + private void waitForAsyncCallsToComplete(Map requestMap, NamedList results) { + for (String k:requestMap.keySet()) { + log.debug("I am Waiting for :{}/{}", k, requestMap.get(k)); + results.add(requestMap.get(k), waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k))); + } + } + + private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString()); + params.set(CoreAdminParams.REQUESTID, requestId); + int counter = 0; + ShardRequest sreq; + do { + sreq = new ShardRequest(); + params.set("qt", adminPath); + sreq.purpose = 1; + String replica = zkStateReader.getBaseUrlForNodeName(nodeName); + sreq.shards = new String[] {replica}; + sreq.actualShards = sreq.shards; + sreq.params = params; + + shardHandler.submit(sreq, replica, sreq.params); + + ShardResponse srsp; + do { + srsp = shardHandler.takeCompletedOrError(); + if (srsp != null) { + NamedList results = new NamedList(); + processResponse(results, srsp, Collections.emptySet()); + String r = (String) srsp.getSolrResponse().getResponse().get("STATUS"); + if (r.equals("running")) { + log.debug("The task is still RUNNING, continuing to wait."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + continue; + + } else if (r.equals("completed")) { + log.debug("The task is COMPLETED, returning"); + return srsp.getSolrResponse().getResponse(); + } else if (r.equals("failed")) { + // TODO: Improve this. Get more information. + log.debug("The task is FAILED, returning"); + return srsp.getSolrResponse().getResponse(); + } else if (r.equals("notfound")) { + log.debug("The task is notfound, retry"); + if (counter++ < 5) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + break; + } + throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request for requestId: " + requestId + "" + srsp.getSolrResponse().getResponse().get("STATUS") + + "retried " + counter + "times"); + } else { + throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + srsp.getSolrResponse().getResponse().get("STATUS")); + } + } + } while (srsp != null); + } while(true); + } +} diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java new file mode 100644 index 000000000000..96a47fb8cfd8 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.core.backup; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.URI; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Properties; + +import org.apache.lucene.util.Version; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.backup.repository.BackupRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * This class implements functionality to create a backup with extension points provided + * to integrate with different types of file-systems. + */ +public class BackupManager { + protected static Logger log = LoggerFactory.getLogger(BackupManager.class); + public static final String COLLECTION_PROPS_FILE = "collection_state.json"; + public static final String BACKUP_PROPS_FILE = "backup.properties"; + public static final String ZK_STATE_DIR = "zk_backup"; + public static final String CONFIG_STATE_DIR = "configs"; + + // Backup properties + public static final String COLLECTION_NAME_PROP = "collection"; + public static final String BACKUP_ID_PROP = "backupId"; + public static final String COLLECTION_CONFIG_NAME_PROP = "collection.configName"; + public static final String INDEX_VERSION_PROP = "index.version"; + // version of the backup implementation. Defined to enable backwards compatibility + public static final String VERSION_PROP = "version"; + + protected final ZkStateReader zkStateReader; + protected final String collectionName; + protected final BackupRepository repository; + protected final IndexBackupStrategy strategy; + + public BackupManager(BackupRepository repository, IndexBackupStrategy strategy, ZkStateReader zkStateReader, + String collectionName) { + this.repository = Preconditions.checkNotNull(repository); + this.strategy = Preconditions.checkNotNull(strategy); + this.zkStateReader = Preconditions.checkNotNull(zkStateReader); + this.collectionName = Preconditions.checkNotNull(collectionName); + } + + /** + * @return The version of this backup implementation. + */ + public final String getVersion() { + return "1.0"; + } + + /** + * This method implements the backup functionality. + * + * @param backupId The unique name for the backup to be created + * @throws IOException in case of errors + */ + public final void createBackup(String backupId) throws IOException { + Preconditions.checkNotNull(backupId); + + // Fetch the collection state (and validate its existence). + DocCollection collectionState = zkStateReader.getClusterState().getCollection(collectionName); + + // Backup location + URI backupPath = repository.createURI(backupId); + + if(repository.exists(backupPath)) { + throw new SolrException(ErrorCode.SERVER_ERROR, "The backup directory already exists " + backupPath); + } + + // Create a directory to store backup details. + repository.createDirectory(backupPath); + + try { + // Backup the index data. + log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupId, + backupPath); + strategy.createBackup(backupPath, collectionName, backupId); + + log.info("Starting to backup ZK data for backupName={}", backupId); + + // Save the configset for the collection + String configName = zkStateReader.readConfigName(collectionName); + URI configPath = repository.createURI(backupId, ZK_STATE_DIR, CONFIG_STATE_DIR, configName); + downloadConfigDir(configName, configPath); + + Properties props = new Properties(); + props.setProperty(COLLECTION_NAME_PROP, collectionName); + props.setProperty(BACKUP_ID_PROP, backupId); + props.put(COLLECTION_CONFIG_NAME_PROP, configName); + props.put(INDEX_VERSION_PROP, Version.LATEST.toString()); + props.put(VERSION_PROP, getVersion()); + + try ( Writer propsWriter = new OutputStreamWriter(repository.createOutput(repository.createURI(backupId, BACKUP_PROPS_FILE))); + OutputStream collectionStateOs = repository.createOutput(repository.createURI(backupId, ZK_STATE_DIR, COLLECTION_PROPS_FILE))) { + collectionStateOs.write(Utils.toJSON(Collections.singletonMap(collectionName, collectionState))); + props.store(propsWriter, null); + } + + log.info("Completed backing up ZK data for backupName={}", backupId); + } catch (IOException ex) { + log.error("Unable to create a snapshot...", ex); + try { + log.info("Cleaning up the snapshot " + backupId + " for collection " + collectionName); + deleteBackup(backupId); + } catch (IOException e) { + log.error("Failed to cleanup a snapshot...", e); + } + } + } + + /** + * @param backupId The name for the backup to be deleted + * @throws IOException in case of errors + */ + public void deleteBackup(String backupId) throws IOException { + // Backup location + URI backupPath = repository.createURI(backupId); + repository.deleteDirectory(backupPath); + } + + private void downloadConfigDir(String configName, URI dest) throws IOException { + // TODO - Enable copying ZK data to other file-systems too. + zkStateReader.getConfigManager().downloadConfigDir(configName, Paths.get(dest)); + } +} diff --git a/solr/core/src/java/org/apache/solr/core/backup/CopyFilesStrategy.java b/solr/core/src/java/org/apache/solr/core/backup/CopyFilesStrategy.java new file mode 100644 index 000000000000..3f33bff771c8 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/core/backup/CopyFilesStrategy.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.core.backup; + +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; +import static org.apache.solr.common.params.CommonParams.NAME; + +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.Collections; + +import org.apache.solr.cloud.ShardRequestProcessor; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A concrete implementation of {@linkplain IndexBackupStrategy} which copies files associated + * with the index commit to the specified location. + */ +public class CopyFilesStrategy implements IndexBackupStrategy { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final ShardRequestProcessor processor; + + public CopyFilesStrategy(ShardRequestProcessor processor) { + this.processor = processor; + } + + @Override + public void createBackup(URI basePath, String collectionName, String backupName) { + ZkStateReader zkStateReader = processor.getZkStateReader(); + + for (Slice slice : zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) { + Replica replica = slice.getLeader(); + String coreName = replica.getStr(CORE_NAME_PROP); + + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.BACKUPCORE.toString()); + params.set(NAME, slice.getName()); + // TODO - Fix the core level BACKUP operation to accept a URI so that it can also work with other file-systems. + params.set("location", basePath.getPath()); // note: index dir will be here then the "snapshot." + slice name + params.set(CORE_NAME_PROP, coreName); + + processor.sendShardRequest(replica.getNodeName(), params); + log.debug("Sent backup request to core={} for backupname={}", coreName, backupName); + } + + processor.processResponses( true, "Could not backup all replicas", Collections.emptySet()); + } +} diff --git a/solr/core/src/java/org/apache/solr/core/backup/IndexBackupStrategy.java b/solr/core/src/java/org/apache/solr/core/backup/IndexBackupStrategy.java new file mode 100644 index 000000000000..e5d43d4e0b2c --- /dev/null +++ b/solr/core/src/java/org/apache/solr/core/backup/IndexBackupStrategy.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.core.backup; + +import java.net.URI; + +/** + * This interface defines the strategy used to backup the Solr collection index data. + */ +public interface IndexBackupStrategy { + /** + * Backups the index data for specified Solr collection at the given location. + * + * @param basePath The base location where the backup needs to be stored. + * @param collectionName The name of the collection which needs to be backed up. + * @param backupName The unique name of the backup. + */ + public void createBackup(URI basePath, String collectionName, String backupName); +} diff --git a/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java new file mode 100644 index 000000000000..0d1522d4f96f --- /dev/null +++ b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.core.backup.repository; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; + +public interface BackupRepository { + /** + * This method creates a URI using the specified base location (during object creation) and the path + * components (as method arguments). + * @param pathComponents The directory (or file-name) to be included in the URI. + * @return A URI containing absolute path + */ + URI createURI(String... pathComponents); + + /** + * This method returns a {@linkplain OutputStream} instance for the specified path + * + * @param path The path for which {@linkplain OutputStream} needs to be created + * @return {@linkplain OutputStream} instance for the specified path + * @throws IOException in case of errors + */ + OutputStream createOutput(URI path) throws IOException; + + /** + * This method checks if the specified path exists in this repository. + * + * @param path The path whose existence needs to be checked. + * @return if the specified path exists in this repository. + * @throws IOException in case of errors + */ + boolean exists(URI path) throws IOException; + + /** + * This method creates a directory at the specified path. + * + * @param path The path where the directory needs to be created. + * @throws IOException in case of errors + */ + void createDirectory(URI path) throws IOException; + + /** + * This method deletes a directory at the specified path. + * + * @param path The path referring to the directory to be deleted. + * @throws IOException in case of errors + */ + void deleteDirectory(URI path) throws IOException; +} diff --git a/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepositoryFactory.java b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepositoryFactory.java new file mode 100644 index 000000000000..0002767049bb --- /dev/null +++ b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepositoryFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.core.backup.repository; + +import java.net.URI; +import com.google.common.base.Preconditions; + +public class BackupRepositoryFactory { + public static BackupRepository create(URI baseLocation) { + Preconditions.checkNotNull(baseLocation); + + String scheme = baseLocation.getScheme(); + if("file".equalsIgnoreCase(scheme)) { + return new LocalFileSystemRepository(baseLocation); + } + throw new UnsupportedOperationException("Backup operation is not supported for specified URI: " + baseLocation); + } + +} diff --git a/solr/core/src/java/org/apache/solr/core/backup/repository/LocalFileSystemRepository.java b/solr/core/src/java/org/apache/solr/core/backup/repository/LocalFileSystemRepository.java new file mode 100644 index 000000000000..a17354816ef8 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/core/backup/repository/LocalFileSystemRepository.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.core.backup.repository; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; + +public class LocalFileSystemRepository implements BackupRepository { + private final URI baseLocation; + + public LocalFileSystemRepository(URI baseLocation) { + this.baseLocation = baseLocation; + } + + @Override + public URI createURI(String... pathComponents) { + Path result = Paths.get(baseLocation); + for(String path : pathComponents) { + result = result.resolve(path); + } + return result.toUri(); + } + + @Override + public void createDirectory(URI path) throws IOException { + Files.createDirectory(Paths.get(path)); + } + + @Override + public void deleteDirectory(URI path) throws IOException { + Files.walkFileTree(Paths.get(path), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) + throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) + throws IOException { + Files.delete(dir); + return FileVisitResult.CONTINUE; + } + }); + } + + @Override + public boolean exists(URI path) throws IOException { + return Files.exists(Paths.get(path)); + } + + @Override + public OutputStream createOutput(URI path) throws IOException { + return Files.newOutputStream(Paths.get(path)); + } +}