
Commit

Merge branch '1.7' into 1.8
joshelser committed Oct 12, 2017
2 parents: 8a8e088 + 27f1b1f · commit 0af1dba
Showing 3 changed files with 217 additions and 14 deletions.
@@ -40,6 +40,7 @@
import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
@@ -49,6 +50,7 @@
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -184,6 +186,34 @@ public Map<ReplicationTarget,Long> getPendingReplications() {
return counts;
}

public Set<Path> getPendingReplicationPaths() {
final Set<Path> paths = new HashSet<>();

// Read over the queued work
BatchScanner bs;
try {
bs = context.getConnector().createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
} catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
log.debug("No replication table exists", e);
return paths;
}

bs.setRanges(Collections.singleton(new Range()));
StatusSection.limit(bs);
try {
Text buffer = new Text();
for (Entry<Key,Value> entry : bs) {
Key k = entry.getKey();
k.getRow(buffer);
paths.add(new Path(buffer.toString()));
}
} finally {
bs.close();
}

return paths;
}
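
For orientation (and not part of the diff itself): the row of each Status-section entry is the absolute path of the file still queued for replication, which is why the method can turn rows directly into Paths. A minimal, hypothetical caller of the new helper might look like the sketch below; the class and method names are illustrative only and do not appear in this commit.

import java.util.Set;

import org.apache.accumulo.server.replication.ReplicationUtil;
import org.apache.hadoop.fs.Path;

class PendingReplicationReport {
  // Logs every file that still has a Status entry in the replication table.
  static void report(ReplicationUtil replicationUtil) {
    Set<Path> pending = replicationUtil.getPendingReplicationPaths();
    System.out.println(pending.size() + " file(s) pending replication");
    for (Path path : pending) {
      System.out.println("  " + path);
    }
  }
}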

/**
* Fetches the absolute path of the file to be replicated.
*
@@ -16,6 +16,9 @@
*/
package org.apache.accumulo.master.metrics;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@@ -26,36 +29,59 @@
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.metrics.Metrics;
import org.apache.accumulo.server.replication.ReplicationUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Replication metrics for the Accumulo Master, published through the Hadoop Metrics2 system.
 */
public class Metrics2ReplicationMetrics implements Metrics, MetricsSource {
public static final String NAME = MASTER_NAME + ",sub=Replication", DESCRIPTION = "Data-Center Replication Metrics", CONTEXT = "master",
RECORD = "MasterReplication";
public static final String PENDING_FILES = "filesPendingReplication", NUM_PEERS = "numPeers", MAX_REPLICATION_THREADS = "maxReplicationThreads";
public static final String PENDING_FILES = "filesPendingReplication", NUM_PEERS = "numPeers", MAX_REPLICATION_THREADS = "maxReplicationThreads",
REPLICATION_QUEUE_TIME_QUANTILES = "replicationQueue10m", REPLICATION_QUEUE_TIME = "replicationQueue";

private final static Logger log = LoggerFactory.getLogger(Metrics2ReplicationMetrics.class);

private final Master master;
private final MetricsSystem system;
private final MetricsRegistry registry;
private final ReplicationUtil replicationUtil;
private final MutableQuantiles replicationQueueTimeQuantiles;
private final MutableStat replicationQueueTimeStat;
private final Map<Path,Long> pathModTimes;

Metrics2ReplicationMetrics(Master master, MetricsSystem system) {
this.master = master;
this.system = system;

pathModTimes = new HashMap<>();

registry = new MetricsRegistry(Interns.info(NAME, DESCRIPTION));
replicationUtil = new ReplicationUtil(master);
replicationQueueTimeQuantiles = registry.newQuantiles(REPLICATION_QUEUE_TIME_QUANTILES, "Replication queue time quantiles in milliseconds", "ops",
"latency", 600);
replicationQueueTimeStat = registry.newStat(REPLICATION_QUEUE_TIME, "Replication queue time statistics in milliseconds", "ops", "latency", true);
}

protected void snapshot() {
registry.add(PENDING_FILES, getNumFilesPendingReplication());
// Only add these metrics if the replication table is online and there are peers
if (TableState.ONLINE == Tables.getTableState(master.getInstance(), ReplicationTable.ID) && !replicationUtil.getPeers().isEmpty()) {
registry.add(PENDING_FILES, getNumFilesPendingReplication());
addReplicationQueueTimeMetrics();
} else {
registry.add(PENDING_FILES, 0);
}

registry.add(NUM_PEERS, getNumConfiguredPeers());
registry.add(MAX_REPLICATION_THREADS, getMaxReplicationThreads());
}
@@ -67,6 +93,8 @@ public void getMetrics(MetricsCollector collector, boolean all) {
snapshot();

registry.snapshot(builder, all);
replicationQueueTimeQuantiles.snapshot(builder, all);
replicationQueueTimeStat.snapshot(builder, all);
}

@Override
@@ -85,18 +113,6 @@ public boolean isEnabled() {
}

protected int getNumFilesPendingReplication() {
if (TableState.ONLINE != Tables.getTableState(master.getInstance(), ReplicationTable.ID)) {
return 0;
}

// Get all of the configured replication peers
Map<String,String> peers = replicationUtil.getPeers();

// A quick lookup to see if we have any replication peers configured
if (peers.isEmpty()) {
return 0;
}

// The total set of configured targets
Set<ReplicationTarget> allConfiguredTargets = replicationUtil.getReplicationTargets();

@@ -124,4 +140,52 @@ protected int getNumConfiguredPeers() {
protected int getMaxReplicationThreads() {
return replicationUtil.getMaxReplicationThreads(master.getMasterMonitorInfo());
}

protected void addReplicationQueueTimeMetrics() {
Set<Path> paths = replicationUtil.getPendingReplicationPaths();

// We'll take a snap of the current time and use this as a diff between any deleted
// file's modification time and now. The reported latency will be off by at most a
// number of seconds equal to the metric polling period
long currentTime = getCurrentTime();

// Iterate through all the pending paths and update the mod time if we don't know it yet
for (Path path : paths) {
if (!pathModTimes.containsKey(path)) {
try {
pathModTimes.put(path, master.getFileSystem().getFileStatus(path).getModificationTime());
} catch (IOException e) {
// Ignore all IOExceptions
// Either the system is unavailable or the file was deleted
// since the initial scan and this check
log.trace("Failed to get file status for {}, file system is unavailable or it does not exist", path);
}
}
}

// Remove all currently pending files
Set<Path> deletedPaths = new HashSet<>(pathModTimes.keySet());
deletedPaths.removeAll(paths);

// Exit early if we have no replicated files to report on
if (deletedPaths.isEmpty()) {
return;
}

replicationQueueTimeStat.resetMinMax();

for (Path path : deletedPaths) {
// Remove this path and add the latency
Long modTime = pathModTimes.remove(path);
if (modTime != null) {
long diff = Math.max(0, currentTime - modTime);
replicationQueueTimeQuantiles.add(diff);
replicationQueueTimeStat.add(diff);
}
}
}

protected long getCurrentTime() {
return System.currentTimeMillis();
}
}
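
The bookkeeping in addReplicationQueueTimeMetrics reduces to a "seen before, now gone" comparison between polls: remember a path's modification time the first time it shows up as pending, and when a later poll no longer lists that path, report the elapsed time since that modification time as its queue latency. A self-contained sketch of just that idea, using plain collections and illustrative names (none of which appear in the commit):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class QueueTimeTracker {
  // Modification time recorded the first time a path was observed as pending.
  private final Map<String,Long> modTimes = new HashMap<>();

  // Called once per metrics poll. Returns the queue latency for every path that
  // was pending on an earlier poll but is no longer pending now.
  List<Long> poll(Set<String> pendingPaths, Map<String,Long> fileModTimes, long now) {
    for (String path : pendingPaths) {
      // Record the mod time on first sight; a path with no known mod time is
      // skipped and retried on the next poll, mirroring the IOException case above.
      Long modTime = fileModTimes.get(path);
      if (modTime != null) {
        modTimes.putIfAbsent(path, modTime);
      }
    }

    // Anything tracked that is no longer pending has finished replicating.
    Set<String> finished = new HashSet<>(modTimes.keySet());
    finished.removeAll(pendingPaths);

    List<Long> latencies = new ArrayList<>();
    for (String path : finished) {
      Long modTime = modTimes.remove(path);
      if (modTime != null) {
        // Guard against a clock that reads earlier than the recorded mod time.
        latencies.add(Math.max(0, now - modTime));
      }
    }
    return latencies;
  }
}

As the comment in the real method notes, the reported latency can overshoot by up to one metric polling period, because a file is only noticed as "gone" on the poll after it finishes replicating.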
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.master.metrics;

import java.lang.reflect.Field;

import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.ReplicationUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.easymock.EasyMock;
import org.junit.Test;

import com.google.common.collect.ImmutableSet;

public class Metrics2ReplicationMetricsTest {
private long currentTime = 1000L;

/**
* Extend the class to override the current time for testing
*/
public class TestMetrics2ReplicationMetrics extends Metrics2ReplicationMetrics {
TestMetrics2ReplicationMetrics(Master master, MetricsSystem system) {
super(master, system);
}

@Override
public long getCurrentTime() {
return currentTime;
}
}

@Test
public void testAddReplicationQueueTimeMetrics() throws Exception {
Master master = EasyMock.createMock(Master.class);
MetricsSystem system = EasyMock.createMock(MetricsSystem.class);
VolumeManager fileSystem = EasyMock.createMock(VolumeManager.class);
ReplicationUtil util = EasyMock.createMock(ReplicationUtil.class);
MutableStat stat = EasyMock.createMock(MutableStat.class);
MutableQuantiles quantiles = EasyMock.createMock(MutableQuantiles.class);

Path path1 = new Path("hdfs://localhost:9000/accumulo/wal/file1");
Path path2 = new Path("hdfs://localhost:9000/accumulo/wal/file2");

// First call will initialize the map of paths to modification time
EasyMock.expect(util.getPendingReplicationPaths()).andReturn(ImmutableSet.of(path1, path2));
EasyMock.expect(master.getFileSystem()).andReturn(fileSystem);
EasyMock.expect(fileSystem.getFileStatus(path1)).andReturn(createStatus(100));
EasyMock.expect(master.getFileSystem()).andReturn(fileSystem);
EasyMock.expect(fileSystem.getFileStatus(path2)).andReturn(createStatus(200));

// Second call will recognize the missing path1 and add the latency stat
EasyMock.expect(util.getPendingReplicationPaths()).andReturn(ImmutableSet.of(path2));

// Expect a call to reset the min/max
stat.resetMinMax();
EasyMock.expectLastCall();

// Expect the calls of adding the stats
quantiles.add(currentTime - 100);
EasyMock.expectLastCall();

stat.add(currentTime - 100);
EasyMock.expectLastCall();

EasyMock.replay(master, system, fileSystem, util, stat, quantiles);

Metrics2ReplicationMetrics metrics = new TestMetrics2ReplicationMetrics(master, system);

// Inject our mock objects
replaceField(metrics, "replicationUtil", util);
replaceField(metrics, "replicationQueueTimeQuantiles", quantiles);
replaceField(metrics, "replicationQueueTimeStat", stat);

// Two calls to this will initialize the map and then add metrics
metrics.addReplicationQueueTimeMetrics();
metrics.addReplicationQueueTimeMetrics();

EasyMock.verify(master, system, fileSystem, util, stat, quantiles);
}

private void replaceField(Object instance, String fieldName, Object target) throws NoSuchFieldException, IllegalAccessException {
Field field = instance.getClass().getSuperclass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(instance, target);
}

private FileStatus createStatus(long modtime) {
return new FileStatus(0, false, 0, 0, modtime, 0, null, null, null, null);
}
}
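
Worked through with the fixed clock in the test above: getCurrentTime() is overridden to return 1000, path1's mocked FileStatus carries a modification time of 100, and path2's carries 200. The first call only populates the mod-time map. On the second call path1 has dropped out of the pending set, so the latency recorded against both the quantiles and the stat is 1000 - 100 = 900 milliseconds; path2 is still pending, so nothing is recorded for it and its entry stays in the map.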
