
SOLR-13101 : Shared storage support in SolrCloud #864

Status: Open. Wants to merge 35 commits into base branch_8x.

Conversation

@megancarey (Contributor) commented Sep 9, 2019

Description

This PR is being opened to expose the code for integrating SolrCloud with a shared blobstore. This is a work in progress - we are hoping to open this up to community discussion and gather feedback.

I'm making this PR on behalf of Andy Vuong (@andyvuong), Ilan Ginzburg, Prabhdeep Singh Gill (@PrabhdeepsGill), Megha Siddavanahalli, Bilal Waheed, Chock Viswanathan, and others.

Solution

Solr + Blobstore

Overview

This repo introduces a new framework which allows SolrCloud to integrate with an external (typically cloud-based) blobstore. Instead of maintaining a copy of the index on each Solr host, replicating updates to peers, and using a transaction log to maintain consistent ordered updates, Solr hosts will push and pull cores to/from this external store.

TL;DR: For now, SolrCloud can be configured to use blobstore at a collection level. Collections backed by blobstore use a new SHARED replica type. When a Solr node makes an update request to a shared shard, it indexes locally and then pushes the change through to a shared blobstore. Zookeeper manages index versioning and provides a source of truth in the case of concurrent writes. Solr nodes in a cluster will no longer use peer-to-peer replication, and instead will pull updates directly from the shared blobstore.

Please note that this project is a work in progress, and is by no means production-ready. This code is being published early to get feedback, which we will incorporate in future work.

In order to modularize these changes and maintain existing functionality, most of the blobstore-related code is isolated to the solr/core/src/java/org/apache/solr/store/blob directory. However, there are some key integration touchpoints in HttpSolrCall#init, DistributedZkUpdateProcessor, and CoreContainer#load. These classes all have special handling for blobstore-based shards.

Pulling from Blobstore

Core pulls are, for the most part, asynchronous. When a replica is queried, it enqueues a pull from blobstore but doesn’t wait for the pull to complete before it executes the query, unless the replica is missing a copy of that core altogether. If your operation requires that local cores are in-sync with blobstore, use the method BlobStoreUtils#syncLocalCoreWithSharedStore.

A more in-depth walkthrough of the pull code:

  • BlobCoreSyncer: manages threads that sync between local and blob store, so that if a pull is in progress, we do not create duplicate work.
  • Calls into CorePullTracker: creates PullCoreInfo object containing data about the core to be pulled and adds to a deduplicated list.
  • This queue of pull objects is polled by the CorePullerFeeder, which uses threads from its dedicated thread pool to execute CorePullTasks.
  • CorePullTask: checks if a pull is already underway for this core; if not, executes a pull from blob store. Resolves differences between blob’s version of the core and local version, and stores the updated core
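The deduplication described above can be sketched with plain JDK concurrency primitives. This is an illustrative sketch only: the class and method names (PullQueue, enqueuePull, takeNext) are invented for this example and are not the PR's actual API.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Invented names; sketches the CorePullTracker/CorePullerFeeder dedup idea.
class PullQueue {
    // Cores with a pull already pending; prevents duplicate work.
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    // FIFO queue polled by puller threads (the CorePullerFeeder role).
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Enqueues a pull for coreName unless one is already pending; returns true if enqueued. */
    boolean enqueuePull(String coreName) {
        if (pending.add(coreName)) {
            queue.offer(coreName);
            return true;
        }
        return false; // duplicate request: a pull is already pending for this core
    }

    /** Blocks until a pull request is available; called by a puller thread. */
    String takeNext() throws InterruptedException {
        String coreName = queue.take();
        pending.remove(coreName);
        return coreName;
    }
}
```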

Pushing to Blobstore

This happens synchronously. On every local commit, we push to blobstore and only ack that the update was successful when it is committed both locally and in the shared store.

A more in-depth walkthrough of the push code:

  • DistributedZkUpdateProcessor: once a commit is complete for a SHARED replica (onFinish), we writeToShareStore.
  • This calls into CoreUpdateTracker, which creates a PushPullData object containing data about the collection, core, and most recently pulled version of the core on this replica.
  • CorePusher: resolves the differences between blob’s version of the core and local version, and pushes the updated version to blob store
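The synchronous contract above (ack only after both the local commit and the blob push succeed) can be sketched as follows. LocalIndex, BlobStore, and SyncPusher are hypothetical stand-ins, not types from this PR.

```java
// Hypothetical stand-ins illustrating the "commit locally, then push, then ack" contract.
interface LocalIndex { void commit() throws Exception; }
interface BlobStore { void pushCore(String coreName) throws Exception; }

class SyncPusher {
    private final LocalIndex local;
    private final BlobStore blob;

    SyncPusher(LocalIndex local, BlobStore blob) {
        this.local = local;
        this.blob = blob;
    }

    /** Acks (returns true) only when both the local commit and the blob push succeed. */
    boolean commitAndPush(String coreName) {
        try {
            local.commit();           // commit to the local index first
            blob.pushCore(coreName);  // then push synchronously to the shared store
            return true;              // only now is the update acked
        } catch (Exception e) {
            return false;             // any failure means the update is not acked
        }
    }
}
```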

Resolving Local and Blobstore

The SharedStoreResolutionUtil handles resolving diffs between the Solr node’s local copy of a core and the copy in blobstore. It does so by pulling the metadata for the core from blobstore (BlobCoreMetadata), comparing against the local metadata (ServerSideMetadata), and creating a list of segments to push or pull.
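As a rough illustration of the resolution step, here is a name-only diff of segment listings. The real SharedStoreResolutionUtil compares richer metadata (BlobCoreMetadata vs ServerSideMetadata, including checksums and sizes), so treat this as a simplified sketch with invented names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified sketch: diffs segment listings by file name only.
class ResolutionSketch {
    /** Files present locally but missing from the blob copy: push these. */
    static Set<String> filesToPush(Set<String> local, Set<String> blob) {
        Set<String> toPush = new HashSet<>(local);
        toPush.removeAll(blob);
        return toPush;
    }

    /** Files present in the blob copy but missing locally: pull these. */
    static Set<String> filesToPull(Set<String> local, Set<String> blob) {
        Set<String> toPull = new HashSet<>(blob);
        toPull.removeAll(local);
        return toPull;
    }
}
```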

Version Management

Only the leader node can push updates to blobstore. Because a new leader can be elected at any time, there is still a possibility for race conditions on writes to blobstore. In order to maintain a consistent global view of the latest version of a core, we keep version data in Zookeeper.

Zookeeper stores this version data as a random string called metadataSuffix. When a SolrCloud node makes an update request, it first pushes the files to blobstore and then makes a conditional update to the metadataSuffix variable. If Zookeeper rejects the conditional update, the update request fails, and the failure is propagated back to the client.

This communication with Zookeeper is coordinated in the SharedShardMetadataController. The SharedShardMetadataController belongs to the Overseer (i.e. the leader replica).
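The optimistic-concurrency scheme above can be modeled with a compare-and-set. In this sketch an AtomicReference stands in for the Zookeeper znode (real Zookeeper provides the equivalent via a conditional setData with an expected version); the class and method names are illustrative, not from the PR.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative model: an AtomicReference plays the role of the ZK znode
// holding the metadataSuffix.
class MetadataSuffixSketch {
    private final AtomicReference<String> metadataSuffix = new AtomicReference<>("initial");

    /** The suffix a writer should read before it starts pushing files. */
    String current() {
        return metadataSuffix.get();
    }

    /**
     * After pushing its files, the writer proposes a new random suffix conditioned
     * on the suffix it observed; the update fails if another writer got there first,
     * and that failure is propagated back to the client.
     */
    boolean tryUpdate(String observedSuffix) {
        return metadataSuffix.compareAndSet(observedSuffix, UUID.randomUUID().toString());
    }
}
```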

Try it yourself

If you want to try this out locally, you can start up SolrCloud with the given blobstore code. The code will default to using the local blobstore client, with /tmp/BlobStoreLocal as the blobstore directory (see LocalStorageClient). You can create a shared collection through the Solr admin UI by setting “shared store based” to true.

Note: if you want to try testing with the S3StorageClient, you need to store a valid S3 bucket name and credentials as environment variables (see S3StorageClient#AmazonS3Configs).

Tests

Find tests in the test directory (solr/core/src/test/org/apache/solr/store/blob/).

Checklist

Please review the following and check all that apply:

  • I have reviewed the guidelines for How to Contribute and my code conforms to the standards described there to the best of my ability.
  • I have created a Jira issue and added the issue ID to my pull request title.
  • I am authorized to contribute this code to the ASF and have removed any code I do not have a license to distribute.
  • I have developed this patch against the master branch.
  • I have run ant precommit and the appropriate test suite.
  • I have added tests for my changes.
  • I have added documentation for the Ref Guide (for Solr changes only).
Ilan Ginzburg and others added 30 commits May 17, 2019
SOLR-13101: Added configuration for Collections backed by shared storage and a new SHARE replica type
W-6020845
…ked out the .gitignore file from solr.7.1 branch
Made sync push the default and deleted async push code.
…s might be failing or no longer make sense!) (#315)
…cal filename in the blob name (and a random suffix) (#317)
Adding tracking for shared cores on commit (@W-6150168).
Updating DistributedZkUpdateProcessorTest to include cases for shared and non-shared replica types. Also added a method to the SolrCloudTestCase that allows accessing the core container of a given node so that we can actually access a specific core in tests.
#325)

Add safe writes from shared-based collections to blob with zk correctness mechanisms
Jenkinsfile with build and test steps commented out for the time being. To unblock blob work.
Async pull from blob on new requests working.
Wait for missing core to be pulled when queried, and return results. Also skipping recovery for SHARED replicas.
* Update s3 client info

* update S3 client

* Add comments

* Fix related files

* update s3 client code

* delete test data

* Address review comments and default to local filesystem when storage provider isnt defined

* fix PR comments
Yonik Seeley and others added 5 commits Sep 3, 2019
@megancarey megancarey marked this pull request as ready for review Sep 9, 2019
@eribeiro left a comment

Congratulations to Ilan Ginzburg, Yonik Seeley and team for one of the coolest talks I have seen at Activate 2019. Good to know that you are opening this WIP project to the community. :) I took the liberty of giving a first-time, first-glance review. Please let me know whether you find it worthwhile, so I know whether to do a further, in-depth look at the code. Again, congrats!

private void pushToSharedStore(SolrCore core) {
// Push the index to blob storage before we set our state to ACTIVE
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
if (cloudDesc.getReplicaType().equals(Replica.Type.SHARED)) {

@eribeiro commented Sep 13, 2019

Replica.Type.SHARED is an enum, so this line could be as below, right?

Suggested change
if (cloudDesc.getReplicaType().equals(Replica.Type.SHARED)) {
if (cloudDesc.getReplicaType() == Replica.Type.SHARED) {
PutObjectRequest putRequest = new PutObjectRequest(blobBucketName, blobPath, is, objectMetadata);

s3Client.putObject(putRequest);
is.close();

@eribeiro commented Sep 13, 2019

Suggestion: put this in a finally block.

/** @return the *first* value for the specified parameter or {@code defaultValue}, if not present. */
private String getParameterValue(Map<String, String[]> params, String parameterName, String defaultValue) {
String[] values = params.get(parameterName);
if (values != null && values.length == 1) {

@eribeiro commented Sep 13, 2019

Suggested change
if (values != null && values.length == 1) {
return (values != null && values.length == 1) ? values[0] : defaultValue;

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private CoreStorageClient storageClient;

@eribeiro commented Sep 13, 2019

Suggested change
private CoreStorageClient storageClient;
private volatile CoreStorageClient storageClient;

@murblanc commented Sep 17, 2019

Good catch.
I think if the first getClient() just does:
return getClient(BlobstoreProviderType.getConfiguredProvider());

Then we're ok. I believe the check of storageClient not null was made as an "optimization" here. We can also synchronize the first getClient(), but I don't like to mix volatile and monitors for access to the same object.

}

private synchronized CoreStorageClient getClient(BlobstoreProviderType blobStorageProviderType) {
if (storageClient != null) {

@eribeiro commented Sep 13, 2019

Lines 37-39 duplicate lines 29-31. Maybe remove the redundant lines in the getClient() method?

log.info("Core " + coreName + " expected in dir " + coreIndexDir.getAbsolutePath() + " exists=" + coreIndexDir.exists()
+ " and location.instanceDirectory.getAbsolutePath()=" + coreIndexDir.getAbsolutePath());

if (core != null) {

@eribeiro commented Sep 13, 2019

This snippet is alright, but I'd suggest rewriting it as below, just to make readability a tiny bit better:

    // Core doesn't exist.
    if (core == null) {
       return false;
    }
    core.close();
    return true;
DocCollection collection = controller.getZkStateReader().
getClusterState().getCollection(pci.getCollectionName());
Collection<Replica> replicas = collection.getReplicas();
Replica replica = null;

@eribeiro commented Sep 13, 2019

Lines 404-410 could be replaced by:

Optional<Replica> replica = replicas.stream().filter(r -> r.getCoreName().equals(pci.getCoreName())).findFirst();

if (replica.isEmpty()) {
   throw new Exception("Replica " + pci.getCoreName() + " for collection " +
          pci.getCollectionName() + " does not exist in ZK");
}

Then lines below need to replace calls like replica.getName() by replica.get().getName()

@@ -31,6 +31,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Locale;

@eribeiro commented Sep 13, 2019

is this being used?

* Initiates a SharedShardMetadataController if it doesn't exist and returns one
*/
public SharedShardMetadataController getSharedShardMetadataController() {
if (sharedShardMetadataController != null) {

@eribeiro commented Sep 13, 2019

If this method (and the ones below) can be called by multiple threads, then two or more threads could arrive at this line at the same time while sharedShardMetadataController is null, and each would create an object. Does that make sense? Would it be worth synchronizing the method?

*/
private final String blobName;

// TODO add some checksum here to verify blob files are not corrupt

@eribeiro commented Sep 13, 2019

import java.util.zip.CRC32;
(...)
private long checksum;

public static long calculateDigest(byte[] data) {
    CRC32 digest = new CRC32();
    digest.update(data);
    return digest.getValue();
}

public long getChecksum() {
    return checksum;
}

public void setChecksum(byte[] data) {
    checksum = calculateDigest(data);
}

public boolean isCorrupt(long digest) {
    return this.checksum != digest;
}

PS: another option is to use Adler32.

@@ -0,0 +1,95 @@
package org.apache.solr.store.blob.client;

import java.util.*;

@eribeiro commented Sep 13, 2019

We usually avoid wildcard imports in Apache projects, iirc.

try {
for (String blobPath : paths) {
final File blobFile = new File(getBlobAbsolutePath(blobPath));
blobFile.delete();

@eribeiro commented Sep 13, 2019

File.delete() returns false if the file cannot be deleted. Maybe use the Files utility class instead, which throws an exception if the file cannot be deleted?

@murblanc commented Sep 17, 2019

I believe it was in case of IOException. If the delete fails locally it's not really an issue, as file names are unique and the local file system is just to help dev/debug/tests, not meant to really be used.

@eribeiro commented Sep 19, 2019

I see. But I can see people deploying NAS or NTFS mounts as shared index locations. As part of the community, I can see myself resorting to this. :) Other possibilities are HDFS and NoSQL like Cassandra, maybe?

So, if the local file system is not meant to be used in production, there should be a clear way of stating/preventing this somehow. I feel people will probably (ab)use the local file system as blob storage once this PR is merged. /cc @andyvuong

}

try {
log.info("BlobstoreProviderType: Blob storage provider type configured is " + provider);

@eribeiro commented Sep 14, 2019

I am fond of using string interpolation instead of string concatenation. See if it's worth changing these log lines in the PR as below:

Suggested change
log.info("BlobstoreProviderType: Blob storage provider type configured is " + provider);
log.info("BlobstoreProviderType: Blob storage provider type configured is {}", provider);

@murblanc commented Sep 17, 2019

Have you timed the execution of both options? I believe your suggestion is slower than the string concatenation.
Also I find the code harder to read because the values are "far" from where they are used.

@eribeiro commented Sep 19, 2019

Good question! :) I'd have to set up a JMH test harness to see. 🤔 But interpolation is being used in other files of this PR, so maybe aim for uniformity?

* Computes a hash of a Solr Directory in order to make sure the directory doesn't change as we pull content into it (if we need to
* pull content into it)
*/
private String getSolrDirectoryHash(Directory coreDir) throws NoSuchAlgorithmException, IOException {

@eribeiro commented Sep 15, 2019

I would suggest using a Merkle tree, because it gives you a flexible and fine-grained view (if necessary) of the files that changed. It can also be serialized to disk and stored in the blob store if necessary. See the quick-and-dirty implementation I did here: https://gist.github.com/eribeiro/39ff8b73c43d453edd041bf1305425e0 (it is really quick and dirty and slow, even though much of the time is dominated by the stream().map() call). This snippet outputs the tree as a JSON snippet that can be copied and pasted here (https://www.sitepoint.com/demos/online-json-tree-viewer/) or here (https://vanya.jp.net/vtree/), for example.

@murblanc commented Sep 17, 2019

Thanks @eribeiro for having looked at this code and providing comments. We'll definitely take them into account and fix the issues you pointed out.

@andyvuong commented Sep 19, 2019

Thanks @eribeiro for the feedback. This is very much WIP and engagement from the community is very appreciated as we iterate on this work! Definitely feel free to do an in-depth look at the code :)

@yonik (Contributor) commented Sep 30, 2019

OK folks, I've created copy of this PR as a branch in the ASF repo for future work:
https://github.com/apache/lucene-solr/tree/jira/SOLR-13101
Feel free to either reuse that JIRA for small stuff or open new subtask JIRA issues as desired.
