Adding Accumulo Availability Monitor:
Monitor.java: Contains the core scan code. Each iteration randomly picks a tablet, identifies its minimum and maximum row range, and scans 'distance' rows (configurable through opts).

MonitorOpts.java: Defines the parameters accepted by the Monitor class. Notable options: the scanner sleep time between iterations, the batch size the scanner uses internally, and either a fixed number of iterations or continuous mode.

monitor script: Follows the same pattern as the cingest, rwalk, and performance scripts.
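
The row-limited scan described for Monitor.java can be illustrated without an Accumulo cluster. The sketch below is a plain-Java analogue, not the code from this commit: `RowLimitSketch` and `countRows` are hypothetical names, and the sorted list of `(row, value)` entries stands in for scanner output. It counts consecutive distinct rows and stops once the configured number of rows has been seen.

```java
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

// Hypothetical stand-in for the probe's row counting; not part of the commit.
class RowLimitSketch {

  // Counts distinct consecutive row IDs in row-sorted entries,
  // stopping as soon as maxRows rows have been counted.
  static int countRows(Iterable<Entry<String,String>> sortedEntries, long maxRows) {
    int rows = 0;
    String currentRow = null;
    for (Entry<String,String> e : sortedEntries) {
      if (!e.getKey().equals(currentRow)) {
        // A new row starts here; stop if the limit is already reached.
        if (rows >= maxRows) {
          break;
        }
        currentRow = e.getKey();
        rows++;
      }
    }
    return rows;
  }

  public static void main(String[] args) {
    // Assumed sample data: three rows, the first with two entries.
    List<Entry<String,String>> data = List.of(Map.entry("row1", "a"), Map.entry("row1", "b"),
        Map.entry("row2", "c"), Map.entry("row3", "d"));
    System.out.println(countRows(data, 2)); // prints 2
  }
}
```

Counting rows rather than entries keeps the amount of work per iteration bounded by the 'distance' setting even when rows hold different numbers of columns.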

Caching Splits

Schema Agnostic Healthprobe

Made the healthprobe schema agnostic while retaining the ability to scan a configurable number of rows. Rows are scanned from the beginning of the tablet. Tablet information is loaded once and cached for later iterations.
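
The "load once, then reuse" behavior described above can be sketched in plain Java. This is an illustration under stated assumptions, not the commit's implementation: `CachedTabletPicker` is a hypothetical class, tablets are represented as strings, and the `Supplier` stands in for whatever lookup returns the table's tablets.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

// Hypothetical sketch of caching a tablet list and picking from it at random.
class CachedTabletPicker {
  private final Supplier<List<String>> loader; // assumed stand-in for a tablet lookup
  private final Random random;
  private List<String> cached; // null until the first pick

  CachedTabletPicker(Supplier<List<String>> loader, Random random) {
    this.loader = loader;
    this.random = random;
  }

  // Loads the tablet list on first use, then picks a random element on every call.
  String pick() {
    if (cached == null) {
      cached = List.copyOf(loader.get());
      if (cached.isEmpty()) {
        throw new IllegalStateException("no tablets to pick from");
      }
    }
    return cached.get(random.nextInt(cached.size()));
  }

  public static void main(String[] args) {
    int[] loaderCalls = {0};
    CachedTabletPicker picker = new CachedTabletPicker(() -> {
      loaderCalls[0]++;
      return List.of("tablet-a", "tablet-b", "tablet-c"); // assumed sample tablets
    }, new Random());
    for (int i = 0; i < 5; i++) {
      picker.pick();
    }
    System.out.println(loaderCalls[0]); // prints 1: the list was loaded only once
  }
}
```

One trade-off of this design is that a cached list can go stale if tablets split or merge while the probe runs, so a long-running probe may want to refresh it periodically.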

Signed-off-by: Marc Parisi <phrocker@apache.org>
Tushar D authored and phrocker committed Apr 27, 2020
1 parent d83ec4b commit c8ef7846b4426c5cb86632ade692b0ff8aa23977
Showing 6 changed files with 283 additions and 1 deletion.
@@ -43,6 +43,7 @@ COPY ./bin/build /opt/at/bin
COPY ./bin/cingest /opt/at/bin
COPY ./bin/rwalk /opt/at/bin
COPY ./bin/gcs /opt/at/bin
COPY ./bin/monitor /opt/at/bin
COPY ./src/main/docker/docker-entry /opt/at/bin

COPY ./target/accumulo-testing-shaded.jar /opt/at/
@@ -32,6 +32,7 @@ Tests are run using the following scripts in `bin/`:
* `performance` - Runs performance test
* `agitator` - Runs agitator
* `gcs` - Runs garbage collection simulation
* `monitor` - Runs availability monitor probe

Run the scripts without arguments to view usage.

@@ -43,6 +44,7 @@ run in Docker:

* `cingest` - All applications can be run except `verify` & `moru` which launch a MapReduce job.
* `rwalk` - All modules can be run.
* `monitor` - All modules can be run.

1. To create the `accumulo-testing` docker image, make sure the following files exist in your clone:

@@ -238,6 +240,11 @@ test and produce json result files.
There are some utilities for working with the json result files, run the `performance` script
with no options to see them.
## Availability Monitor
The Monitor class verifies the availability of the overall Accumulo cluster by continually
scanning random rows across various tablet servers and capturing timing
information about how long such scans take.
## Automated Cluster Testing
See the [readme.md](/test/automation/README.md).
@@ -0,0 +1,51 @@
#! /usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

bin_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source "${bin_dir}/build"

function print_usage() {
cat <<EOF
Usage: monitor <application> {-o test.<prop>=<value>}
Available applications:
readprobe Runs a probe that scans random keys and reports scan timings
EOF
}

if [ -z "$1" ]; then
echo "ERROR: <application> needs to be set"
print_usage
exit 1
fi

ci_package="org.apache.accumulo.testing.healthprobe"
case "$1" in
readprobe)
ci_main="${ci_package}.Monitor"
;;
*)
echo "Unknown application: $1"
print_usage
exit 1
esac

export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"
java $JAVA_OPTS -DINSTRUMENTATIONKEY="$INSTRUMENTATIONKEY" -Dlog4j.configuration="file:$TEST_LOG4J" "$ci_main" "${@:2}" "-c" "$ACCUMULO_CLIENT_PROPS"
@@ -28,6 +28,7 @@ Usage: accumulo-testing <script> (<argument>)
cingest Runs continuous ingest script
rwalk Runs random walk script
gcs Runs garbage collection simulation
monitor Runs a readprobe for monitoring read timings
EOF
}

@@ -43,7 +44,7 @@ if [ -z "$HADOOP_HOME" ]; then
fi

case "$1" in
cingest|rwalk|gcs)
cingest|rwalk|gcs|monitor)
"${at_home}"/bin/"$1" "${@:2}"
;;
-h|help)
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.testing.healthprobe;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.Locations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class Monitor {

private static final Logger log = LoggerFactory.getLogger(Monitor.class);
static long distance = 1L;
static List<TabletId> tablets;
static boolean cacheTablets = true;

public static void main(String[] args) throws Exception {
MonitorOpts opts = new MonitorOpts();
opts.parseArgs(Monitor.class.getName(), args);
distance = opts.distance;
Authorizations auth = new Authorizations();
// Compare string contents; != on Strings only compares object references
if (!opts.auth.isEmpty()) {
auth = new Authorizations(opts.auth);
}

try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build();
Scanner scanner = client.createScanner(opts.tableName, auth)) {
if (opts.isolate) {
scanner.enableIsolation();
}
int scannerSleepMs = opts.sleep_ms;
LoopControl scanning_condition = opts.continuous ? new ContinuousLoopControl()
: new IterativeLoopControl(opts.scan_iterations);

while (scanning_condition.keepScanning()) {

Random tablet_index_generator = new Random();
TabletId pickedTablet = pickTablet(client.tableOperations(), opts.tableName,
tablet_index_generator);
Range range = pickedTablet.toRange();
scanner.setRange(range);

if (opts.batch_size > 0) {
scanner.setBatchSize(opts.batch_size);
}
try {
long startTime = System.nanoTime();
long count = consume(scanner, opts.distance);
long stopTime = System.nanoTime();
MDC.put("StartTime", String.valueOf(startTime));
MDC.put("TabletId", String.valueOf(pickedTablet));
MDC.put("TableName", String.valueOf(opts.tableName));
MDC.put("TotalTime", String.valueOf((stopTime - startTime)));
MDC.put("StartRow", String.valueOf(range.getStartKey()));
MDC.put("EndRow", String.valueOf(range.getEndKey()));
MDC.put("TotalRecords", String.valueOf(count));

// Log the tablet that was scanned, not the Random instance used to pick it
log.info("SCN starttime={} tabletid={} tablename={} totaltime={} totalrecords={}",
startTime, pickedTablet, opts.tableName, (stopTime - startTime), count);
if (scannerSleepMs > 0) {
sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error(String.format(
"Exception while scanning range %s. Check the state of Accumulo for errors.", range),
e);
}
}
}
}

public static int consume(Iterable<Entry<Key,Value>> scanner, long numberOfRows) {
RowIterator rowIter = new RowIterator(scanner);
int count = 0;

while (rowIter.hasNext()) {
Iterator<Entry<Key,Value>> itr = rowIter.next();
count++;
if (count >= numberOfRows) {
break;
}
}
return count;
}

public static TabletId pickTablet(TableOperations tops, String table, Random r)
throws TableNotFoundException, AccumuloSecurityException, AccumuloException {

if (cacheTablets) {
Locations locations = tops.locate(table, Collections.singleton(new Range()));
tablets = new ArrayList<TabletId>(locations.groupByTablet().keySet());
cacheTablets = false;
}
int index = r.nextInt(tablets.size());
TabletId tabletId = tablets.get(index);
return tabletId;
}

/*
* These interfaces + implementations are used to determine how many times the scanner should look
* up a random tablet and scan it.
*/
static interface LoopControl {
public boolean keepScanning();
}

// Does a finite number of iterations
static class IterativeLoopControl implements LoopControl {
private final int max;
private int current;

public IterativeLoopControl(int max) {
this.max = max;
this.current = 0;
}

@Override
public boolean keepScanning() {
if (current < max) {
++current;
return true;
} else {
return false;
}
}
}

// Does an infinite number of iterations
static class ContinuousLoopControl implements LoopControl {
@Override
public boolean keepScanning() {
return true;
}
}
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.testing.healthprobe;

import org.apache.accumulo.testing.cli.ClientOpts;

import com.beust.jcommander.Parameter;

class MonitorOpts extends ClientOpts {
@Parameter(names = {"-t", "--table"}, description = "table to use")
String tableName = "";

@Parameter(names = "--isolate",
description = "true to turn on scan isolation, false to turn off. default is false.")
boolean isolate = false;

@Parameter(names = "--num-iterations", description = "number of scan iterations")
int scan_iterations = 1024;

@Parameter(names = "--continuous",
description = "continuously scan the table. note that this overrides --num-iterations")
boolean continuous = false;

@Parameter(names = "--scan-batch-size", description = "scanner batch size")
int batch_size = 5;

@Parameter(names = "--scanner-sleep-ms", description = "scanner sleep interval in ms")
int sleep_ms = 60000;

@Parameter(names = "--num-of-rows-per-iteration",
description = "Number of rows scanned together in one iteration of scanner consumption")
long distance = 10L;

@Parameter(names = "--auth", description = "Provide an auth that can access all/most rows")
String auth = "";
}
