Skip to content

Commit

Permalink
HDDS-3336. Metrics for Recon OzoneManager DB sync.
Browse files Browse the repository at this point in the history
  • Loading branch information
Aravindan Vijayan committed Apr 3, 2020
1 parent 55d78d6 commit 70ece9c
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.recon.metrics;

import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeFloat;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.OzoneConsts;

/**
* Class for tracking metrics related to Ozone manager sync operations.
*/
@InterfaceAudience.Private
@Metrics(about = "Recon OzoneManagerSync Metrics", context = OzoneConsts.OZONE)
public final class OzoneManagerSyncMetrics {

private static final String SOURCE_NAME =
OzoneManagerSyncMetrics.class.getSimpleName();

private OzoneManagerSyncMetrics() {
}

public static OzoneManagerSyncMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME,
"Recon Ozone Manager Sync Metrics",
new OzoneManagerSyncMetrics());
}

public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}

@Metric(about = "Number of OM snapshot requests made by Recon.")
private MutableCounterLong numSnapshotRequests;

@Metric(about = "Number of OM snapshot requests that failed.")
private MutableCounterLong numSnapshotRequestsFailed;

@Metric(about = "OM snapshot request latency")
private MutableRate snapshotRequestLatency;

@Metric(about = "Number of OM delta requests made by Recon.")
private MutableCounterLong numDeltaRequests;

@Metric(about = "Number of OM delta requests that failed.")
private MutableCounterLong numDeltaRequestsFailed;

@Metric(about = "OM delta request latency")
private MutableRate deltaRequestLatency;

@Metric(about = "Total number of updates got through OM delta request")
private MutableCounterLong numUpdatesInDeltaTotal;

@Metric(about = "Average number of updates got per OM delta request")
private MutableGaugeFloat averageNumUpdatesInDeltaRequest;

public void incrNumSnapshotRequests() {
this.numSnapshotRequests.incr();
}

public void incrNumSnapshotRequestsFailed() {
this.numSnapshotRequestsFailed.incr();
}

public void updateSnapshotRequestLatency(long time) {
this.snapshotRequestLatency.add(time);
}

public void incrNumDeltaRequests() {
this.numDeltaRequests.incr();
}

public void incrNumDeltaRequestsFailed() {
this.numSnapshotRequestsFailed.incr();
}

public void incrNumUpdatesInDeltaTotal(long n) {
this.numUpdatesInDeltaTotal.incr(n);
setAverageNumUpdatesInDeltaRequest(
(float) this.numUpdatesInDeltaTotal.value() /
(float) this.numDeltaRequests.value());
}

public void updateDeltaRequestLatency(long time) {
this.deltaRequestLatency.add(time);
}

public void setAverageNumUpdatesInDeltaRequest(float avg) {
averageNumUpdatesInDeltaRequest.set(avg);
}

public MutableCounterLong getNumSnapshotRequests() {
return numSnapshotRequests;
}

public MutableCounterLong getNumSnapshotRequestsFailed() {
return numSnapshotRequestsFailed;
}

public MutableRate getSnapshotRequestLatency() {
return snapshotRequestLatency;
}

public MutableCounterLong getNumDeltaRequests() {
return numDeltaRequests;
}

public MutableCounterLong getNumDeltaRequestsFailed() {
return numDeltaRequestsFailed;
}

public MutableRate getDeltaRequestLatency() {
return deltaRequestLatency;
}

public MutableCounterLong getNumUpdatesInDeltaTotal() {
return numUpdatesInDeltaTotal;
}

public MutableGaugeFloat getAverageNumUpdatesInDeltaRequest() {
return averageNumUpdatesInDeltaRequest;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This package contains Recon metrics related classes.
*/
package org.apache.hadoop.ozone.recon.metrics;
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort.Type;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.util.Time;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
Expand Down Expand Up @@ -104,6 +106,7 @@ public class OzoneManagerServiceProviderImpl
private ReconTaskController reconTaskController;
private ReconTaskStatusDao reconTaskStatusDao;
private ReconUtils reconUtils;
private OzoneManagerSyncMetrics metrics;

/**
* OM Snapshot related task names.
Expand Down Expand Up @@ -173,6 +176,7 @@ public OzoneManagerServiceProviderImpl(
this.reconTaskStatusDao = reconTaskController.getReconTaskStatusDao();
this.ozoneManagerClient = ozoneManagerClient;
this.configuration = configuration;
this.metrics = OzoneManagerSyncMetrics.create();
}

public void registerOMDBTasks() {
Expand Down Expand Up @@ -239,6 +243,7 @@ public void stop() throws Exception {
reconTaskController.stop();
omMetadataManager.stop();
scheduler.shutdownNow();
metrics.unRegister();
}

/**
Expand Down Expand Up @@ -308,7 +313,9 @@ DBCheckpoint getOzoneManagerDBSnapshot() {
boolean updateReconOmDBWithNewSnapshot() throws IOException {
// Obtain the current DB snapshot from OM and
// update the in house OM metadata managed DB instance.
long startTime = Time.monotonicNowNanos();
DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
metrics.updateSnapshotRequestLatency(Time.monotonicNowNanos() - startTime);
if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
LOG.info("Got new checkpoint from OM : " +
dbSnapshot.getCheckpointLocation());
Expand Down Expand Up @@ -339,13 +346,16 @@ void getAndApplyDeltaUpdatesFromOM(
throws IOException, RocksDBException {
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
.setSequenceNumber(fromSequenceNumber).build();
long startTime = Time.monotonicNowNanos();
DBUpdatesWrapper dbUpdates = ozoneManagerClient.getDBUpdates(
dbUpdatesRequest);
metrics.updateDeltaRequestLatency(Time.monotonicNowNanos() - startTime);
if (null != dbUpdates) {
RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore();
RocksDB rocksDB = rocksDBStore.getDb();
LOG.debug("Number of updates received from OM : {}",
dbUpdates.getData().size());
metrics.incrNumUpdatesInDeltaTotal(dbUpdates.getData().size());
for (byte[] data : dbUpdates.getData()) {
try (WriteBatch writeBatch = new WriteBatch(data)) {
writeBatch.iterate(omdbUpdatesHandler);
Expand Down Expand Up @@ -374,6 +384,7 @@ public void syncDataFromOM() {
if (currentSequenceNumber <= 0) {
fullSnapshot = true;
} else {
metrics.incrNumDeltaRequests();
try (OMDBUpdatesHandler omdbUpdatesHandler =
new OMDBUpdatesHandler(omMetadataManager)) {
LOG.info("Obtaining delta updates from Ozone Manager");
Expand All @@ -392,13 +403,15 @@ public void syncDataFromOM() {
} catch (InterruptedException intEx) {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
LOG.warn("Unable to get and apply delta updates from OM.", e);
fullSnapshot = true;
}
}

if (fullSnapshot) {
try {
metrics.incrNumSnapshotRequests();
LOG.info("Obtaining full snapshot from Ozone Manager");
// Update local Recon OM DB to new snapshot.
boolean success = updateReconOmDBWithNewSnapshot();
Expand All @@ -417,6 +430,7 @@ public void syncDataFromOM() {
} catch (InterruptedException intEx) {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumSnapshotRequestsFailed();
LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
}
}
Expand All @@ -429,5 +443,9 @@ public void syncDataFromOM() {
private long getCurrentOMDBSequenceNumber() {
return omMetadataManager.getLastSequenceNumberFromDB();
}

public OzoneManagerSyncMetrics getMetrics() {
return metrics;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
Expand Down Expand Up @@ -202,11 +203,19 @@ public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
getMockTaskController(), new ReconUtils(),
getMockOzoneManagerClient(dbUpdatesWrapper));

OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
// To make sire averageNumUpdatesInDeltaRequest does not result in division
// by zero.
metrics.incrNumDeltaRequests();

OMDBUpdatesHandler updatesHandler =
new OMDBUpdatesHandler(omMetadataManager);
ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
0L, updatesHandler);

assertEquals(4.0,
metrics.getAverageNumUpdatesInDeltaRequest().value(), 0.0);

// In this method, we have to assert the "GET" part and the "APPLY" path.

// Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
Expand Down Expand Up @@ -246,6 +255,9 @@ public void testSyncDataFromOMFullSnapshot() throws Exception {
new MockOzoneServiceProvider(configuration, omMetadataManager,
reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);

OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
assertEquals(0, metrics.getNumSnapshotRequests().value());

// Should trigger full snapshot request.
ozoneManagerServiceProvider.syncDataFromOM();

Expand All @@ -257,6 +269,8 @@ public void testSyncDataFromOMFullSnapshot() throws Exception {
.equals(OmSnapshotRequest.name()));
verify(reconTaskControllerMock, times(1))
.reInitializeTasks(omMetadataManager);
assertEquals(1, metrics.getNumSnapshotRequests().value());
assertEquals(0, metrics.getNumDeltaRequests().value());
}

@Test
Expand All @@ -281,6 +295,9 @@ public void testSyncDataFromOMDeltaUpdates() throws Exception {
new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);

OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
assertEquals(0, metrics.getNumDeltaRequests().value());

// Should trigger delta updates.
ozoneManagerServiceProvider.syncDataFromOM();

Expand All @@ -293,6 +310,9 @@ public void testSyncDataFromOMDeltaUpdates() throws Exception {
verify(reconTaskControllerMock, times(1))
.consumeOMEvents(any(OMUpdateEventBatch.class),
any(OMMetadataManager.class));
assertEquals(1, metrics.getNumDeltaRequests().value());
assertEquals(0, metrics.getNumSnapshotRequests().value());
assertEquals(1, metrics.getDeltaRequestLatency().lastStat().numSamples());
}

private ReconTaskController getMockTaskController() {
Expand Down

0 comments on commit 70ece9c

Please sign in to comment.