Skip to content

Commit

Permalink
HBASE-21406 "status 'replication'" should not show SINK if the cluste… (
Browse files Browse the repository at this point in the history
#1761)

Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com>
Signed-off by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>

(Cherry picked from commit e5345b3)
  • Loading branch information
wchevreuil committed Jun 3, 2020
1 parent b3c6af9 commit 11d093b
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
public class ReplicationLoadSink {
private final long ageOfLastAppliedOp;
private final long timestampsOfLastAppliedOp;
private final long timestampStarted;
private final long totalOpsProcessed;

// TODO: add the builder for this class
@InterfaceAudience.Private
public ReplicationLoadSink(long age, long timestamp) {
public ReplicationLoadSink(long age, long timestamp, long timestampStarted,
long totalOpsProcessed) {
this.ageOfLastAppliedOp = age;
this.timestampsOfLastAppliedOp = timestamp;
this.timestampStarted = timestampStarted;
this.totalOpsProcessed = totalOpsProcessed;
}

public long getAgeOfLastAppliedOp() {
Expand All @@ -43,4 +48,12 @@ public long getTimeStampsOfLastAppliedOp() {
public long getTimestampsOfLastAppliedOp() {
return this.timestampsOfLastAppliedOp;
}

public long getTimestampStarted() {
return timestampStarted;
}

public long getTotalOpsProcessed() {
return totalOpsProcessed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2801,7 +2801,10 @@ public static void mergeFrom(Message.Builder builder, CodedInputStream codedInpu

public static ReplicationLoadSink toReplicationLoadSink(
ClusterStatusProtos.ReplicationLoadSink rls) {
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), rls.getTimeStampsOfLastAppliedOp());
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(),
rls.getTimeStampsOfLastAppliedOp(),
rls.getTimestampStarted(),
rls.getTotalOpsProcessed());
}

public static ReplicationLoadSource toReplicationLoadSource(
Expand Down Expand Up @@ -3394,6 +3397,8 @@ public static ClusterStatusProtos.ReplicationLoadSink toReplicationLoadSink(
return ClusterStatusProtos.ReplicationLoadSink.newBuilder()
.setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp())
.setTimeStampsOfLastAppliedOp(rls.getTimestampsOfLastAppliedOp())
.setTimestampStarted(rls.getTimestampStarted())
.setTotalOpsProcessed(rls.getTotalOpsProcessed())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ public interface MetricsReplicationSinkSource {
void incrAppliedOps(long batchsize);
long getLastAppliedOpAge();
void incrAppliedHFiles(long hfileSize);
long getSinkAppliedOps();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ public long getLastAppliedOpAge() {
public void incrAppliedHFiles(long hfiles) {
hfilesCounter.incr(hfiles);
}

@Override public long getSinkAppliedOps() {
return opsCounter.value();
}
}
2 changes: 2 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ message ClientMetrics {
message ReplicationLoadSink {
required uint64 ageOfLastAppliedOp = 1;
required uint64 timeStampsOfLastAppliedOp = 2;
required uint64 timestampStarted = 3;
required uint64 totalOpsProcessed = 4;
}

message ReplicationLoadSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class MetricsSink {

private long lastTimestampForAge = System.currentTimeMillis();
private long startTimestamp = System.currentTimeMillis();
private final MetricsReplicationSinkSource mss;

public MetricsSink() {
Expand Down Expand Up @@ -110,4 +111,21 @@ public long getTimeStampOfLastAppliedOp() {
public long getTimestampOfLastAppliedOp() {
return this.lastTimestampForAge;
}

/**
* Gets the time stamp from when the Sink was initialized.
* @return startTimestamp
*/
public long getStartTimestamp() {
return this.startTimestamp;
}

/**
* Gets the total number of OPs delivered to this sink.
* @return totalAplliedOps
*/
public long getAppliedOps() {
return this.mss.getSinkAppliedOps();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
ClusterStatusProtos.ReplicationLoadSink.newBuilder();
rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
this.replicationLoadSink = rLoadSinkBuild.build();

this.replicationLoadSourceEntries = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ public class TestReplicationStatus extends TestReplicationBase {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationStatus.class);

private void insertRowsOnSource() throws IOException {
final byte[] qualName = Bytes.toBytes("q");
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
htable1.put(p);
}
}

/**
* Test for HBASE-9531.
* <p/>
Expand All @@ -70,12 +79,7 @@ public void testReplicationStatus() throws Exception {
Admin hbaseAdmin = UTIL1.getAdmin();
// disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
hbaseAdmin.disableReplicationPeer(PEER_ID2);
final byte[] qualName = Bytes.toBytes("q");
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
htable1.put(p);
}
insertRowsOnSource();
LOG.info("AFTER PUTS");
// TODO: Change this wait to a barrier. I tried waiting on replication stats to
// change but sleeping in main thread seems to mess up background replication.
Expand Down Expand Up @@ -120,6 +124,35 @@ public void testReplicationStatus() throws Exception {
assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
}

@Test
public void testReplicationStatusSink() throws Exception {
try (Admin hbaseAdmin = UTIL2.getConnection().getAdmin()) {
ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName();
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
//First checks if status of timestamp of last applied op is same as RS start, since no edits
//were replicated yet
assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp());
//now insert some rows on source, so that it gets delivered to target
insertRowsOnSource();
long wait = Waiter.waitFor(UTIL2.getConfiguration(),
10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
return loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimestampStarted();
}
});
//If wait is -1, we know predicate condition was never true
assertTrue(wait>=0);
}
}

private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server)
throws IOException {
ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
return sm.getReplicationLoadSink();
}
/**
* Wait until Master shows metrics counts for ReplicationLoadSourceList that are
* greater than <code>greaterThan</code> for <code>serverName</code> before
Expand Down
20 changes: 13 additions & 7 deletions hbase-shell/src/main/ruby/hbase/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -812,12 +812,18 @@ def status(format, type)
r_source_string = ' SOURCE:'
r_load_sink = sl.getReplicationLoadSink
next if r_load_sink.nil?
if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted()
# If we have applied no operations since we've started replication,
# assume that we're not acting as a sink and don't print the normal information
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ", Waiting for OPs... "
else
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
r_sink_string << ", TimeStampsOfLastAppliedOp=" +
(java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString()
end

r_sink_string << ' AgeOfLastAppliedOp=' +
r_load_sink.getAgeOfLastAppliedOp.to_s
r_sink_string << ', TimeStampsOfLastAppliedOp=' +
java.util.Date.new(r_load_sink
.getTimeStampsOfLastAppliedOp).toString
r_load_source_map = sl.getReplicationLoadSourceMap
build_source_string(r_load_source_map, r_source_string)
puts(format(' %<host>s:', host: server_status.getHostname))
Expand Down Expand Up @@ -888,15 +894,15 @@ def build_running_source_stats(source_load, r_source_string)
end

def build_shipped_stats(source_load, r_source_string)
r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
r_source_string << if source_load.getTimestampOfLastShippedOp.zero?
"\n " \
'No Ops shipped since last restart'
else
"\n AgeOfLastShippedOp=" +
source_load.getAgeOfLastShippedOp.to_s +
', TimeStampOfLastShippedOp=' +
java.util.Date.new(source_load
.getTimeStampOfLastShippedOp).toString
.getTimestampOfLastShippedOp).toString
end
end

Expand Down

0 comments on commit 11d093b

Please sign in to comment.