Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-21406 "status 'replication'" should not show SINK if the cluste… #1761

Merged
merged 6 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
public class ReplicationLoadSink {
private final long ageOfLastAppliedOp;
private final long timestampsOfLastAppliedOp;
private final long timestampStarted;
private final long totalOpsProcessed;

// TODO: add the builder for this class
@InterfaceAudience.Private
public ReplicationLoadSink(long age, long timestamp) {
public ReplicationLoadSink(long age, long timestamp, long timestampStarted,
long totalOpsProcessed) {
this.ageOfLastAppliedOp = age;
this.timestampsOfLastAppliedOp = timestamp;
this.timestampStarted = timestampStarted;
this.totalOpsProcessed = totalOpsProcessed;
}

public long getAgeOfLastAppliedOp() {
Expand All @@ -34,4 +39,12 @@ public long getAgeOfLastAppliedOp() {
public long getTimestampsOfLastAppliedOp() {
return this.timestampsOfLastAppliedOp;
}

/**
 * Returns the {@code timestampStarted} value this instance was constructed with.
 * @return the stored start timestamp
 */
public long getTimestampStarted() {
  return this.timestampStarted;
}

/**
 * Returns the {@code totalOpsProcessed} value this instance was constructed with.
 * @return the stored count of processed operations
 */
public long getTotalOpsProcessed() {
  return this.totalOpsProcessed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2843,7 +2843,10 @@ public static void mergeFrom(Message.Builder builder, CodedInputStream codedInpu

public static ReplicationLoadSink toReplicationLoadSink(
ClusterStatusProtos.ReplicationLoadSink rls) {
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), rls.getTimeStampsOfLastAppliedOp());
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(),
rls.getTimeStampsOfLastAppliedOp(),
rls.getTimestampStarted(),
rls.getTotalOpsProcessed());
}

public static ReplicationLoadSource toReplicationLoadSource(
Expand Down Expand Up @@ -3438,6 +3441,8 @@ public static ClusterStatusProtos.ReplicationLoadSink toReplicationLoadSink(
return ClusterStatusProtos.ReplicationLoadSink.newBuilder()
.setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp())
.setTimeStampsOfLastAppliedOp(rls.getTimestampsOfLastAppliedOp())
.setTimestampStarted(rls.getTimestampStarted())
.setTotalOpsProcessed(rls.getTotalOpsProcessed())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ public interface MetricsReplicationSinkSource {
void incrAppliedOps(long batchsize);
long getLastAppliedOpAge();
void incrAppliedHFiles(long hfileSize);
/**
 * Returns the sink's applied-operations count.
 * NOTE(review): presumably the running total accumulated through
 * {@link #incrAppliedOps(long)} - confirm against the implementing class.
 * @return number of operations applied by the sink
 */
long getSinkAppliedOps();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ public long getLastAppliedOpAge() {
public void incrAppliedHFiles(long hfiles) {
hfilesCounter.incr(hfiles);
}

/**
 * Reports the current value of the applied-ops counter.
 * @return value held by {@code opsCounter}
 */
@Override
public long getSinkAppliedOps() {
  final long appliedOps = opsCounter.value();
  return appliedOps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ message ClientMetrics {
message ReplicationLoadSink {
  required uint64 ageOfLastAppliedOp = 1;
  required uint64 timeStampsOfLastAppliedOp = 2;
  // Fields 3 and 4 were added after this message was first released, so they
  // must be 'optional'. Declaring them 'required' makes every reader built from
  // this definition reject messages written by peers that still use the old
  // two-field form, which breaks rolling upgrades. Readers see 0 when absent.
  optional uint64 timestampStarted = 3;
  optional uint64 totalOpsProcessed = 4;
}

message ReplicationLoadSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class MetricsSink {

// Timestamp reported as "last applied op"; initialised to the creation time.
private long lastTimestampForAge = System.currentTimeMillis();
// Reuse the very same clock reading instead of sampling the clock a second time.
// Consumers detect "nothing applied yet" by testing the two values for equality,
// and two separate currentTimeMillis() calls may straddle a millisecond boundary.
private long startTimestamp = lastTimestampForAge;
private final MetricsReplicationSinkSource mss;

public MetricsSink() {
Expand Down Expand Up @@ -98,4 +99,21 @@ public long getAgeOfLastAppliedOp() {
public long getTimestampOfLastAppliedOp() {
return this.lastTimestampForAge;
}

/**
* Gets the time stamp from when the Sink was initialized.
* @return startTimestamp
*/
public long getStartTimestamp() {
return startTimestamp;
wchevreuil marked this conversation as resolved.
Show resolved Hide resolved
}

/**
 * Returns the applied-operations count reported by the underlying sink metrics source.
 * @return total applied ops, as given by {@code mss.getSinkAppliedOps()}
 */
public long getAppliedOps() {
  final long totalAppliedOps = mss.getSinkAppliedOps();
  return totalAppliedOps;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
ClusterStatusProtos.ReplicationLoadSink.newBuilder();
rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
this.replicationLoadSink = rLoadSinkBuild.build();

this.replicationLoadSourceEntries = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ public class TestReplicationStatus extends TestReplicationBase {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationStatus.class);

/**
 * Writes {@code NB_ROWS_IN_BATCH} rows to {@code htable1}, one {@code Put} per row.
 * Row {@code n} has key {@code "row" + n} and a single cell in family {@code famName},
 * qualifier {@code "q"}, holding {@code "val" + n}.
 */
private void insertRowsOnSource() throws IOException {
  final byte[] qualifier = Bytes.toBytes("q");
  int rowIndex = 0;
  while (rowIndex < NB_ROWS_IN_BATCH) {
    final byte[] rowKey = Bytes.toBytes("row" + rowIndex);
    final byte[] cellValue = Bytes.toBytes("val" + rowIndex);
    Put put = new Put(rowKey);
    put.addColumn(famName, qualifier, cellValue);
    htable1.put(put);
    rowIndex++;
  }
}

/**
* Test for HBASE-9531.
* <p/>
Expand All @@ -70,12 +79,7 @@ public void testReplicationStatus() throws Exception {
Admin hbaseAdmin = UTIL1.getAdmin();
// disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
hbaseAdmin.disableReplicationPeer(PEER_ID2);
final byte[] qualName = Bytes.toBytes("q");
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
htable1.put(p);
}
insertRowsOnSource();
LOG.info("AFTER PUTS");
// TODO: Change this wait to a barrier. I tried waiting on replication stats to
// change but sleeping in main thread seems to mess up background replication.
Expand Down Expand Up @@ -120,6 +124,35 @@ public void testReplicationStatus() throws Exception {
assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
}

@Test
public void testReplicationStatusSink() throws Exception {
try (Admin hbaseAdmin = UTIL2.getConnection().getAdmin()) {
ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName();
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
//First checks if status of timestamp of last applied op is same as RS start, since no edits
//were replicated yet
assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp());
//now insert some rows on source, so that it gets delivered to target
insertRowsOnSource();
long wait = Waiter.waitFor(UTIL2.getConfiguration(),
100000, new Waiter.Predicate<Exception>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to keep it 10s instead of 100s? :)

@Override
public boolean evaluate() throws Exception {
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
return loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimestampStarted();
}
});
//If wait is -1, we know predicate condition was never true
assertTrue(wait>=0);
}
}

/**
 * Fetches the live-server cluster metrics through {@code admin} and returns the
 * replication sink load reported for {@code server}.
 */
private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server)
    throws IOException {
  return admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
      .getLiveServerMetrics()
      .get(server)
      .getReplicationLoadSink();
}
/**
* Wait until Master shows metrics counts for ReplicationLoadSourceList that are
* greater than <code>greaterThan</code> for <code>serverName</code> before
Expand Down
14 changes: 9 additions & 5 deletions hbase-shell/src/main/ruby/hbase/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -835,12 +835,16 @@ def status(format, type)
r_source_string = ' SOURCE:'
r_load_sink = sl.getReplicationLoadSink
next if r_load_sink.nil?
# The Java getter on ReplicationLoadSink is getTimestampStarted (lower-case "s");
# JRuby does not resolve a differently-cased name, so it must be spelled exactly.
if r_load_sink.getTimestampsOfLastAppliedOp == r_load_sink.getTimestampStarted
  # Last-applied timestamp still equals the start timestamp: nothing has been
  # applied yet, so skip the age / last-applied details.
  r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted.to_s
  r_sink_string << ", Waiting for OPs... "
else
  r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted.to_s
  r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp.to_s
  r_sink_string << ", TimeStampsOfLastAppliedOp=" +
                   java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp).toString
end

r_sink_string << ' AgeOfLastAppliedOp=' +
r_load_sink.getAgeOfLastAppliedOp.to_s
r_sink_string << ', TimeStampsOfLastAppliedOp=' +
java.util.Date.new(r_load_sink
.getTimestampsOfLastAppliedOp).toString
r_load_source_map = sl.getReplicationLoadSourceMap
build_source_string(r_load_source_map, r_source_string)
puts(format(' %<host>s:', host: server_name.getHostname))
Expand Down