Eventually Consistent scans / ScanServer feature #2665

Merged
dlmarion merged 67 commits into apache:main from dlmarion:2411_scan_server_feature
Aug 9, 2022

This commit builds on the changes added in prior commits
8be98d6, 50b9267, and 39bc7a0 to create a new server
component that implements TabletHostingServer and uses
the TabletScanClientService Thrift API to serve client
scan requests on Tablets outside the TabletServer. To
accomplish this the new server (ScanServer) constructs a
new type of tablet called a SnapshotTablet which is comprised
of the files in the metadata table and not the in-memory
mutations that the TabletServer might contain. The Accumulo
client has been modified to allow the user to set a flag
on scans to make them eventually consistent, meaning that
the user is ok with scanning data that may not be immediately
consistent with the version of the Tablet that is being hosted
by the TabletServer.

This feature is optional and experimental.

Closes #2411

Co-authored-by: Keith Turner <kturner@apache.org>

dlmarion commented Apr 29, 2022

Background

Accumulo TabletServers are responsible for:

  1. ingesting new data
  2. compacting (merging) new and old data into files
  3. reading data from files to support system and user scans
  4. performing maintenance on Tablets (assignments, merging, splitting, bulk importing, etc).

To support these activities, newly ingested data is hosted in memory (in-memory maps) until it's written to a file, and blocks of accessed files may be cached within the TabletServer for better performance. The TabletServer has configuration properties to control the amount of memory available to the heap, in-memory maps, and block caches, and the size of the various thread pools that perform these activities. For example:

tserver.assignment.concurrent.max
tserver.bulk.process.threads
tserver.cache.data.size
tserver.cache.index.size
tserver.cache.summary.size
tserver.compaction.major.concurrent.max
tserver.compaction.minor.concurrent.max
tserver.memory.maps.max
tserver.migrations.concurrent.max
tserver.recovery.concurrent.max
tserver.scan.executors.default.threads
tserver.scan.executors.meta.threads
tserver.scan.files.open.max
tserver.server.threads.minimum
tserver.sort.buffer.size
tserver.summary.partition.threads
tserver.summary.remote.threads
tserver.total.mutation.queue.max
tserver.workq.threads

When a TabletServer exhausts available memory, for whatever reason, an OutOfMemoryError will be raised and the TabletServer will be terminated. When this happens the clients running scans on that TabletServer are paused while the Tablets are re-hosted and then the scans continue on the new TabletServers once the re-hosting process is complete. If the cause of the TabletServer failure was due to scans on a particular Tablet, then this process will repeat until there are no TabletServers remaining or the pattern is identified by a user/admin and the scan process is terminated.

Objective

Provide Accumulo users with the ability to run scans without terminating the TabletServer.

Possible approaches

  1. Run the scan in a separate process
  2. Restrict memory usage on a per-scan basis
  3. Read directly from files in client-side scan code. This approach does not allow a small number of clients to scale out a large number of expensive queries to tablet and/or scan servers. It may also lead to an OOM killing a client process that is executing multiple concurrent scans, and it does not allow clients to leverage the cache of data and metadata on a scan server or tablet server.

This approach

Create a separate server process that is used to run user scans and give the user the option whether or not to use the new server process on a per-scan basis. Provide the user with the ability to control how many scans will be affected if this new process dies and how many of these new processes to use for a single scan.

Implementation

This PR includes:

  1. a new server process called the ScanServer.
  2. changes to the Accumulo client
  3. changes to the GarbageCollector
  4. Ancillary changes

Scan Server

The ScanServer is a TabletHostingServer that hosts SnapshotTablets and implements the TabletScanClientService Thrift API. When the ScanServer receives a request via the scan API, it creates a SnapshotTablet object from the Tablet metadata (which may be cached), and then uses the ThriftScanClientHandler to complete the scan operations. The user scan is run using the same code that the TabletServer uses; the ScanServer is just responsible for ensuring that the Tablet exists for the scan code. The Tablet hosted within the ScanServer may not contain the exact same data as the corresponding Tablet hosted by the TabletServer: the ScanServer does not have any of the Tablet data that may reside within the in-memory maps, and, because Tablet metadata can be cached within the ScanServer (see Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION), the Tablet may reference files that have since been compacted. The number of concurrent scans that the ScanServer will run is configurable (Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS and Property.SSERV_SCAN_EXECUTORS_PREFIX). The ScanServer has other configuration properties that can be set to allow it to have different settings than the TabletServer (Thrift, block caches, etc). It is also possible for a ScanServer to host multiple versions of a SnapshotTablet when scans are in progress, the cached TabletMetadata has expired, and a new scan request arrives.

Scan servers implement a busy timeout parameter on their scan RPCs. The busy timeout lets a client specify a time within which the scan must either start running or fail with a busy Thrift exception. On the client side this busy exception can be detected and a different scan server selected.
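
The busy timeout behavior could be modeled roughly as below. This is only a sketch under assumptions: the class names (BusyTimeoutSketch, ScanBusySketchException) are hypothetical and are not the PR's classes, and a Semaphore stands in for the scan executor's capacity.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the busy Thrift exception; not the real Accumulo class. */
class ScanBusySketchException extends RuntimeException {
  ScanBusySketchException(String message) {
    super(message);
  }
}

/** Sketch: a scan must obtain a scan slot within the busy timeout or fail fast. */
class BusyTimeoutSketch {
  private final Semaphore scanSlots;

  BusyTimeoutSketch(int maxConcurrentScans) {
    this.scanSlots = new Semaphore(maxConcurrentScans);
  }

  /** Starts the scan if a slot frees up within busyTimeoutMillis, else throws the busy exception. */
  String scan(String tablet, long busyTimeoutMillis) {
    boolean acquired;
    try {
      acquired = scanSlots.tryAcquire(busyTimeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a scan slot", e);
    }
    if (!acquired) {
      throw new ScanBusySketchException("no scan slot within " + busyTimeoutMillis + " ms");
    }
    try {
      return "scanned " + tablet; // placeholder for the real scan work
    } finally {
      scanSlots.release();
    }
  }

  /** Occupies a slot to simulate a long-running scan. */
  void holdSlot() {
    scanSlots.acquireUninterruptibly();
  }

  /** Frees a slot taken by holdSlot(). */
  void releaseSlot() {
    scanSlots.release();
  }
}
```

A client-side caller would catch the busy exception and retry against another server, which is the role the dispatcher plays in this PR.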

Client changes

A new method has been added to the client (ScannerBase.setConsistencyLevel) to configure the client to use IMMEDIATE (default) or EVENTUAL consistency for scans. IMMEDIATE means that the user wants to scan all data related to the Tablet at the time of the scan. To accomplish this the client will send the scan request to the TabletServer that is hosting the Tablet. This is the current behavior and is the default configuration, so no code change is required to have the same behavior. The other possible value, EVENTUAL, means that the user is willing to relax the data freshness guarantee that the TabletServer provides and instead potentially improve the chances of their scan completing when their scan is known to take a long time or require a lot of memory. When the consistency level is set to EVENTUAL the client uses a ScanServerDispatcher class to determine which ScanServers to use. The user can supply their own ScanServerDispatcher implementation (ClientProperty.SCAN_SERVER_DISPATCHER) if they don't want to use the DefaultScanServerDispatcher (see class javadoc for a description of the behavior). Scans will be sent to the TabletServer in the event that EVENTUAL consistency is selected for the client and no ScanServers are running.

Default scan server dispatcher

The default scan server dispatcher that executes on the client side has the following strategy for selecting a scan server.

  • It hashes a tablet's tableId, end row, and prev end row. This hash is used to consistently map the tablet to one of three random scan servers, so for a given tablet the same three scan servers are used by different clients.
  • The client sends a request to one of the three scan servers with a small busy timeout.
  • If a busy exception happens, the default scan server dispatcher will notice this and choose from a larger set of scan servers.
  • The default scan server dispatcher will expand rapidly to randomly selecting from all scan servers, after which point it will start exponentially increasing the busy timeout.
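
One way to picture the first bullet is the sketch below. It is an assumption about how a tablet could be mapped to a stable set of candidates, not the dispatcher's actual code: the class and method names are hypothetical, and the seeded shuffle is just one technique that gives every client the same answer for the same tablet.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Random;

/** Sketch: derive a stable, pseudo-random set of scan servers from a tablet's identity. */
class TabletToScanServersSketch {

  /** Returns up to count servers; the same tablet always yields the same servers. */
  static List<String> candidates(List<String> servers, String tableId, String endRow,
      String prevEndRow, int count) {
    // Sort a copy so that every client starts from the same server order.
    List<String> ordered = new ArrayList<>(servers);
    Collections.sort(ordered);
    // Seeding the shuffle with the tablet hash makes the "random" choice repeatable.
    long seed = Objects.hash(tableId, endRow, prevEndRow);
    Collections.shuffle(ordered, new Random(seed));
    return new ArrayList<>(ordered.subList(0, Math.min(count, ordered.size())));
  }
}
```

A caller would then pick one of the returned servers at random for the first attempt, for example candidates(servers, "2b", "row9", "row1", 3).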

For example, if there are 1000 scan servers and a lot of them are busy, the default scan dispatcher might do something like the following. This example shows how it will rapidly increase the set of servers chosen from and then start rapidly increasing the busy timeout. The reason to start increasing the busy timeout after observing a lot of busy exceptions is that they provide evidence that the entire cluster of scan servers may be busy. So eventually it's better to just go to a scan server and queue up rather than look for a non-busy scan server.

  1. Choose scan server S1 from 3 random scan servers with a busy timeout of 33ms.
  2. If a busy exception happens, choose scan server S2 from 21 random scan servers with a busy timeout of 33ms.
  3. If a busy exception happens, choose scan server S3 from 147 random scan servers with a busy timeout of 33ms.
  4. If a busy exception happens, choose scan server S4 from 1000 random scan servers with a busy timeout of 33ms.
  5. If a busy exception happens, choose scan server S5 from 1000 random scan servers with a busy timeout of 66ms.
  6. If a busy exception happens, choose scan server S6 from 1000 random scan servers with a busy timeout of 132ms.

This default behavior makes tablets sticky to scan servers which is good for cache utilization and reusing cached tablet metadata. In the case where those few scan servers are busy the client starts searching for other places to run.
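
The escalation in the numbered example can be reproduced with a small calculation. The constants below (growth factor of 7, doubling of the timeout) are inferred from the numbers in that example rather than taken from the dispatcher's source, and the class name is hypothetical.

```java
/** Sketch: server-set size and busy timeout as a function of prior busy attempts. */
class DispatcherEscalationSketch {
  static final int INITIAL_SERVERS = 3;
  static final int SERVER_GROWTH = 7; // inferred from 3 -> 21 -> 147 in the example
  static final long INITIAL_BUSY_TIMEOUT_MS = 33;

  /** Returns {serverSetSize, busyTimeoutMs} after the given number of busy exceptions. */
  static long[] plan(int busyAttempts, int totalServers) {
    long setSize = Math.min(INITIAL_SERVERS, totalServers);
    long timeout = INITIAL_BUSY_TIMEOUT_MS;
    for (int i = 0; i < busyAttempts; i++) {
      if (setSize < totalServers) {
        // First widen the pool of candidate servers.
        setSize = Math.min(setSize * SERVER_GROWTH, totalServers);
      } else {
        // Once all servers are candidates, wait longer instead.
        timeout *= 2;
      }
    }
    return new long[] {setSize, timeout};
  }

  public static void main(String[] args) {
    for (int busy = 0; busy <= 5; busy++) {
      long[] p = plan(busy, 1000);
      System.out.println("attempt " + (busy + 1) + ": " + p[0] + " servers, " + p[1] + " ms");
    }
  }
}
```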

Garbage Collector changes

The ScanServer inserts entries into a new section (~sserv) of the metadata table to place a reservation on the file so that the GarbageCollector process does not remove the files that are being used for the scan. Accordingly GCEnv.getReferences has been modified to include these file reservations in the list of active file references. The ScanServer has a background thread that removes the file reservations from the metadata table after some period of time after the file is no longer used (see Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME). The Manager has a new background thread that calls the ScanServerMetadataEntries.clean method on a periodic basis. Users can use the ScanServerMetadataEntries utility to remove file reservations that exist in the metadata table with no corresponding running ScanServer.

In order to avoid race conditions with the Accumulo GC, scan servers use the following algorithm when first reading a tablet's metadata.

  1. Read metadata for the tablet
  2. Write ~sserv entries for the tablet's files to the metadata table to prevent GC
  3. Read the metadata again and see if it changed. If it did change, delete the entries from step 2 and go back to step 1.

The above algorithm may be a bit expensive the first time a tablet is scanned on a scan server. However, subsequent scans of the same tablet will use cached tablet metadata for a configurable time and not repeat the above steps. In the future we may want to look into faster ways of preventing GC of files used by scan servers.
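
The read, reserve, re-read loop can be sketched as follows. This is an illustration only: an in-memory set stands in for the ~sserv section, readTabletFiles is a hypothetical stand-in for a metadata table read, and the sketch handles a single tablet (real reservations would be keyed per scan server so that undoing one does not affect another).

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

/** Sketch of the three-step reservation loop for one tablet. */
class ReserveFilesSketch {
  /** In-memory stand-in for the ~sserv entries in the metadata table. */
  final Set<String> reservations = new HashSet<>();
  int attempts = 0;

  /** Returns the file set that was successfully reserved. */
  Set<String> reserve(Supplier<Set<String>> readTabletFiles) {
    while (true) {
      attempts++;
      Set<String> files = new HashSet<>(readTabletFiles.get()); // step 1: read metadata
      reservations.addAll(files);                               // step 2: write reservations
      Set<String> reread = readTabletFiles.get();               // step 3: read again
      if (files.equals(reread)) {
        return files; // nothing changed, so the reservations cover the live files
      }
      reservations.removeAll(files); // metadata changed: undo step 2 and retry
    }
  }
}
```

If a compaction replaces the files between step 1 and step 3, the first pass fails the comparison and the loop reserves the new file set on the second pass.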

Ancillary changes

  1. Modifications to scripts (accumulo-cluster, accumulo-service and accumulo-env.sh) have been made to start/stop one or more ScanServers per host.
  2. The shell commands grep and scan have been modified to accept a consistency level (cl) argument
  3. The shell command listscans has been modified to include scans running on ScanServers
  4. ZooZap has been modified to remove ScanServer entries in ZooKeeper
  5. MiniAccumuloCluster has been modified to include the ability to start/stop ScanServers (used by the ITs)
  6. A new utility (ScanServerMetadataEntries) has been created to clean up any dangling scan server file references in the metadata table.

Shell Example

Below is an example of how this works using the scan command in the shell.

root@test> createtable test (1)
root@test test> insert a b c d (2)
root@test test> scan (3)
a b:c []	d
root@test test> scan -cl immediate (4)
a b:c []	d
root@test test> scan -cl eventual (5)
root@test test> flush (6)
2022-01-28T18:58:10,693 [shell.Shell] INFO : Flush of table test  initiated...
root@test test> scan (7)
a b:c []	d
root@test test> scan -cl eventual (8)
a b:c []	d

In this example, I create a table (1) and insert some data (2). When I run a scan (3,4) with the immediate consistency level, which happens to be the default, the client uses the normal code path and issues the scan command against the Tablet Server. Data is returned because the Tablet Server code path also returns data that is in the in-memory map. When I scan with the eventual consistency level (5) no data is returned because the Scan Server only uses the data in the Tablet's files. When I flush (6) the data to write a file in HDFS, the subsequent scans with immediate (7) and eventual (8) consistency level return the data.
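
The same sequence can be mimicked with a toy model. This is not Accumulo code: the class is hypothetical, two lists stand in for the in-memory map and the tablet's files, and the ScanServer's metadata cache is ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of immediate vs. eventual scans over one tablet. */
class SnapshotScanSketch {
  private final List<String> inMemoryMap = new ArrayList<>(); // unflushed writes
  private final List<String> files = new ArrayList<>();       // data flushed to files

  void insert(String entry) {
    inMemoryMap.add(entry);
  }

  /** Moves everything in the in-memory map into files, like the shell's flush command. */
  void flush() {
    files.addAll(inMemoryMap);
    inMemoryMap.clear();
  }

  /** IMMEDIATE: the tablet server reads files plus the in-memory map. */
  List<String> scanImmediate() {
    List<String> result = new ArrayList<>(files);
    result.addAll(inMemoryMap);
    return result;
  }

  /** EVENTUAL: the scan server reads only what has reached files. */
  List<String> scanEventual() {
    return new ArrayList<>(files);
  }

  public static void main(String[] args) {
    SnapshotScanSketch tablet = new SnapshotScanSketch();
    tablet.insert("a b:c [] d");
    System.out.println("immediate before flush: " + tablet.scanImmediate());
    System.out.println("eventual before flush:  " + tablet.scanEventual());
    tablet.flush();
    System.out.println("eventual after flush:   " + tablet.scanEventual());
  }
}
```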

public interface ScannerBase extends Iterable<Entry<Key,Value>>, AutoCloseable {

/**
* Consistency level for the scanner. The default level is IMMEDIATE, which means that this
Contributor

  • Consistency level for the scanner. The default level is IMMEDIATE, which means that this
  • scanner will see ALL keys and values that have been successfully written to a TabletServer.
  • EVENTUAL means that the scanner may not see the latest data that was written to a TabletServer,
  • but may instead see an older version of data. Data that has not yet been minor compacted will
  • not be seen.

Contributor Author

It's not just data that has not been minor compacted. The ScanServer contains a cache of metadata entries. If the metadata entries are cached for a Tablet, then when the ScanServer creates a SnapshotTablet, it will be from those files, not necessarily the up-to-date list of files in the metadata table. The tablet metadata cache expiration is set using Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION.


public BlockCacheConfiguration(AccumuloConfiguration conf) {
genProps = conf.getAllPropertiesWithPrefix(Property.TSERV_PREFIX);
public BlockCacheConfiguration(AccumuloConfiguration conf, Property serverPrefix,
Contributor

Just an idea, but the code could be a little less susceptible to coding error if we pass in a single flag for TSERVER vs SSERVER. The properties used for each could be encoded in this class. This is just a thought and does not require change unless you agree with my argument.

Conflicts:
	server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
Contributor

@ivakegg ivakegg left a comment

A few more comments. Still working through this.

Duration scanServerDispatcherDelay = null;
Map<String,ScanAttemptsImpl.ScanAttemptReporter> reporters = Map.of();

if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
Contributor

Is it possible to have the consistency level set to EVENTUAL but have no scan servers? I think I see that in that case the rebinToScanServers will delegate the scans to tservers. But in that case we don't get the advantage of the code in the else statement. I guess I am wondering whether the ScanServerData concept should be used no matter the consistency level and the rebinToScanServers will take care of delegating to scan servers vs tservers based on the consistency level.

Contributor Author

For the first question, if the consistency level is set to EVENTUAL and there are no ScanServers, then the code will send the scans to the TabletServers.

Contributor

But in that case we don't get the advantage of the code in the else statement.

That was an intentional decision to leave the else stmt just for the tserver/immediate path. Was thinking for the eventual path that all control is delegated to the ScanServerDispatcher plugin avoiding hardcoded decisions. The ScanServerDispatcher plugin does not currently support sending multiple threads to a single scan server (that is what the else code is doing for tservers). I had wanted to refactor the ScanServerDispatcher to support this use case, but did not get to it. I think this would be worthwhile and could look into that after this PR is merged.

Contributor

To make the ScanServerDispatcher support multiple threads would need to modify the interface ScanServerDispatcher.Actions to allow communicating intent about threads. That interface is how a dispatcher communicates what it wants done. Would probably also need to modify ScanServerDispatcher.DispatcherParameters to pass in information about the number of threads available.

Member

It's weird to ask for eventual consistency. Nobody wants eventual consistency. Eventual consistency is always tolerated, never desired.

Contributor

Are we serving stale data if tables only ever have data bulk loaded?

Contributor Author

The ScanServer serves data from a Tablet's files based on the Tablet metadata in the cache in the ScanServer. The duration for which the metadata is cached (and then refreshed) is based on the property SSERV_CACHED_TABLET_METADATA_EXPIRATION. When the Tablet metadata cache in the ScanServer is refreshed, then the current set of files (newly bulk loaded or compacted) will then be available. Does this answer your question @wjsl ?

if (getClass() != obj.getClass())
return false;
ScanServerRefTabletFile other = (ScanServerRefTabletFile) obj;
if (colf == null) {
Contributor

An EqualsBuilder may make this a little cleaner.

Member

Or just return Objects.equals(field1, other.field1) && Objects.equals(field2, other.field2) && ...
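
A minimal sketch of that suggestion, assuming a class with three nullable fields. The class FileRefSketch and its field names are hypothetical; only colf appears in the snippet under review.

```java
import java.util.Objects;

/** Hypothetical value class showing a null-safe equals built on Objects.equals. */
final class FileRefSketch {
  private final String path;
  private final String colf;
  private final String colq;

  FileRefSketch(String path, String colf, String colq) {
    this.path = path;
    this.colf = colf;
    this.colq = colq;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    FileRefSketch other = (FileRefSketch) obj;
    // Objects.equals handles null on either side, replacing the per-field null checks.
    return Objects.equals(path, other.path) && Objects.equals(colf, other.colf)
        && Objects.equals(colq, other.colq);
  }

  @Override
  public int hashCode() {
    return Objects.hash(path, colf, colq); // keep hashCode consistent with equals
  }
}
```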

Contributor

@ivakegg ivakegg left a comment

I have done one full pass. I may try to stand up a cluster with this later, but not today.

Member

@ctubbsii ctubbsii left a comment

This is a massive PR, so this is just my first pass review. I expect there will be more feedback, but this is what I can manage for today.


@Override
public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) {
public MultiScanResult continueMultiScan(TInfo tinfo, long scanID, long busyTimeout) {
Member

Could this addition of a busyTimeout parameter be done in a separate distinct smaller change first?

Contributor

It could, but it was only added for the scanserver. Not sure it would make sense on its own.

Member

I'm just thinking that adding such things as groundwork makes it easier to incrementally review and merge discrete elements of a larger set of changes.

Comment on lines +114 to +117
var tmp = Maps.transformValues(attempts, tabletAttemptList -> Collections2
.filter(tabletAttemptList, sai -> sai.getMutationCount() < snapMC));

return Maps.filterEntries(tmp, entry -> !entry.getValue().isEmpty());
Member

This appears to be using Guava-specific Functional APIs. It's better to use Java built-in Functional APIs/Streams instead.

Member

This comment is still applicable.

Contributor Author

Created #2856 to address this in the future.
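
For reference, a streams-based version of the snippet discussed in this thread might look like the sketch below. The Attempt class and the String tablet keys are hypothetical stand-ins for the real types. One behavioral difference to keep in mind: Guava's Maps.transformValues and Maps.filterEntries return live views, while this version builds an eager copy.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch: filter each tablet's attempts and drop tablets left with none, using streams. */
class AttemptSnapshotSketch {

  /** Hypothetical stand-in for a recorded scan attempt. */
  static final class Attempt {
    final String server;
    final long mutationCount;

    Attempt(String server, long mutationCount) {
      this.server = server;
      this.mutationCount = mutationCount;
    }
  }

  /** Keeps attempts recorded before snapMC and removes tablets with no remaining attempts. */
  static Map<String, List<Attempt>> snapshot(Map<String, List<Attempt>> attempts, long snapMC) {
    return attempts.entrySet().stream()
        .map(e -> Map.entry(e.getKey(),
            e.getValue().stream().filter(a -> a.mutationCount < snapMC)
                .collect(Collectors.toList())))
        .filter(e -> !e.getValue().isEmpty())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}
```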

Member

@ctubbsii ctubbsii left a comment

I have a lot of little comments for quality improvements. If you want to merge as-is, and then work on the comments as follow-up, I'd be fine with that.

Comment on lines +387 to +403
/**
* Get the configured consistency level
*
* @return consistency level
* @since 2.1.0
*/
public ConsistencyLevel getConsistencyLevel();

/**
* Set the desired consistency level for this scanner.
*
* @param level
* consistency level
* @since 2.1.0
*/
public void setConsistencyLevel(ConsistencyLevel level);

Member

I'm still not comfortable with this terminology in the API, but I don't have an alternative at this time, so I'll consider this settled for now.

Comment on lines +168 to +171
Class<? extends ScanServerSelector> impl =
Class.forName(clazz).asSubclass(ScanServerSelector.class);
ScanServerSelector scanServerSelector = impl.getDeclaredConstructor().newInstance();

Member

I thought we had a centralized utility class for loading classes from the configuration. This is probably redundant, but I haven't dug to find it.

Contributor Author

You might be thinking of ClassLoaderUtil.loadClass. I don't think it's used in the client code. Looking at it, I'm not sure it will work in the client code.

Comment on lines +172 to +180
Map<String,String> sserverProps = new HashMap<>();
ClientProperty
.getPrefix(info.getProperties(), ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey())
.forEach((k, v) -> {
sserverProps.put(
k.toString()
.substring(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey().length()),
v.toString());
});
Member

Creating a map, then using forEach to populate it is a bit more clunky than using a stream map collector to construct the map directly from the iteration.

Contributor Author

This returns Map<Object,Object>, not Map<String,String>, so it doesn't work. Do you have an alternative implementation?

      Map<String,String> sserverProps = ClientProperty
          .getPrefix(info.getProperties(), ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey())
          .entrySet()
          .stream()
          .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()),
              (prev, next) -> next, HashMap::new));

dlmarion commented Aug 8, 2022

@ctubbsii - I believe that I have addressed most, if not all, of your comments either by responding, creating a follow-on issue, or fixing the code.

@ctubbsii @keith-turner @milleruntime @EdColeman - If there is no further discussion, then I intend on merging this by COB today.

ctubbsii commented Aug 8, 2022

I haven't looked at the changes since my last PR, but feel free to merge... I can look at them later. I saw a bunch of new issues, but they are all over the place, and it's hard for me to track those all. After this is merged, I'll probably go line by line through my comments here to ensure that each of them have been done, or I'll do them as a follow on. If I do that, I'm not going to track down each of the separate issues that were created, though... I'm just going to do one PR to fix what I think needs fixing (if it hasn't already been done by the time I take a look).

@dlmarion dlmarion merged commit c0e94cc into apache:main Aug 9, 2022
@dlmarion dlmarion deleted the 2411_scan_server_feature branch August 11, 2022 11:16
@ctubbsii ctubbsii added this to the 2.1.0 milestone Jul 12, 2024

Development

Successfully merging this pull request may close these issues.

External Isolated Scan Server

7 participants