PHOENIX-3655 Global Phoenix Client Metrics for PQS
karanmehta93 committed Aug 2, 2018
1 parent da2796a commit 1c82e03
Showing 10 changed files with 537 additions and 16 deletions.

GlobalPhoenixMetricsTestSink.java (new file)
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.monitoring;

import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.phoenix.util.PhoenixRuntime;

import java.util.Map;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class GlobalPhoenixMetricsTestSink implements MetricsSink {

    public static final String PHOENIX_METRICS_RECORD_NAME = "PHOENIX";
    // PhoenixMetricsIT verifies the metrics from this sink in a separate thread.
    // GlobalPhoenixMetricsTestSink is invoked on the schedule defined in hadoop-metrics2.properties.
    // This lock prevents concurrent access to the metrics Iterable from those threads.
    static Object lock = new Object();
    static Iterable<AbstractMetric> metrics;

    @Override
    public void putMetrics(MetricsRecord metricsRecord) {
        if (metricsRecord.name().equals(PHOENIX_METRICS_RECORD_NAME)) {
            synchronized (GlobalPhoenixMetricsTestSink.lock) {
                GlobalPhoenixMetricsTestSink.metrics = metricsRecord.metrics();
            }
        }
    }

    @Override
    public void flush() {
    }

    @Override
    public void init(SubsetConfiguration subsetConfiguration) {
    }

}
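
The comments in this sink point to a hadoop-metrics2.properties file under test/resources that schedules how often putMetrics() is invoked. That file is among the ten changed files but is not shown in this excerpt; the following is only a plausible sketch of the wiring it needs, with the metrics prefix and sink instance name assumed rather than taken from the commit:

    # Hypothetical hadoop-metrics2.properties sketch; the "phoenix" prefix and the
    # sink name "global" are assumptions, not values taken from this commit.
    *.period=1
    phoenix.sink.global.class=org.apache.phoenix.monitoring.GlobalPhoenixMetricsTestSink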

PhoenixMetricsDisabledIT.java (new file)
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.monitoring;

import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.query.ConfigurationFactory;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.InstanceResolver;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.hamcrest.CoreMatchers;
import org.junit.BeforeClass;
import org.junit.Test;

import java.sql.DriverManager;
import java.util.Map;

import static org.apache.phoenix.monitoring.NoOpGlobalMetricImpl.NO_SAMPLES;
import static org.apache.phoenix.monitoring.NoOpGlobalMetricImpl.NO_VALUE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

public class PhoenixMetricsDisabledIT extends BaseUniqueNamesOwnClusterIT {

    @BeforeClass
    public static void doSetup() throws Exception {
        final Configuration conf = HBaseConfiguration.create();
        conf.set(QueryServices.GLOBAL_METRICS_ENABLED, String.valueOf(false));
        conf.set(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
        // Clear the cached singletons so we can inject our own.
        InstanceResolver.clearSingletons();
        // Make sure the ConnectionInfo doesn't try to pull a default Configuration
        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
            @Override
            public Configuration getConfiguration() {
                return conf;
            }
            @Override
            public Configuration getConfiguration(Configuration confToClone) {
                Configuration copy = new Configuration(conf);
                copy.addResource(confToClone);
                return copy;
            }
        });

        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));

        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
    }

    @Test
    public void testResetGlobalPhoenixMetrics() {
        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
            assertThat(m, CoreMatchers.<GlobalMetric>instanceOf(NoOpGlobalMetricImpl.class));
            assertEquals(NO_VALUE, m.getTotalSum());
            assertEquals(NO_SAMPLES, m.getNumberOfSamples());
        }
    }

}
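
For orientation, here is a minimal sketch of how client code reads these global metrics through PhoenixRuntime. It uses only the accessors already exercised by the tests (getGlobalPhoenixClientMetrics(), getMetricType(), getTotalSum(), getNumberOfSamples()); the class name is illustrative and not part of this commit. With GLOBAL_METRICS_ENABLED set to false, as in PhoenixMetricsDisabledIT above, the returned objects are NoOpGlobalMetricImpl instances reporting NO_VALUE and NO_SAMPLES.

    import org.apache.phoenix.monitoring.GlobalMetric;
    import org.apache.phoenix.util.PhoenixRuntime;

    // Illustrative helper, not part of this commit.
    public class GlobalMetricsDump {
        public static void main(String[] args) {
            for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
                // Each GlobalMetric exposes its type, accumulated value, and sample count.
                System.out.println(m.getMetricType().name()
                        + " totalSum=" + m.getTotalSum()
                        + " samples=" + m.getNumberOfSamples());
            }
        }
    }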

PhoenixMetricsIT.java
@@ -40,6 +40,7 @@
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@@ -50,6 +51,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -61,6 +63,9 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -73,6 +78,7 @@
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.hamcrest.CoreMatchers;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
@@ -83,8 +89,18 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * Tests that
 * 1. Phoenix Global metrics are exposed via
 *    a. PhoenixRuntime  b. Hadoop-Metrics2 defined sinks
 * 2. Phoenix Request level metrics are exposed via
 *    a. PhoenixRuntime
 */
public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {

    private static final Log LOG = LogFactory.getLog(PhoenixMetricsIT.class);
    private static final int MAX_RETRIES = 5;

    private static final List<MetricType> mutationMetricsToSkip =
            Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME);
    private static final List<MetricType> readMetricsToSkip =
@@ -97,22 +113,26 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
    @BeforeClass
    public static void doSetup() throws Exception {
        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
        // Phoenix global client metrics are enabled by default
        // Enable request metric collection at the driver level
        props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
        // Disable renewing leases as this will force spooling to happen.
        props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
        // Need the non-test driver for some tests that check the number of hconnections, etc.
        DriverManager.registerDriver(PhoenixDriver.INSTANCE);

    }

    @Test
    public void testResetGlobalPhoenixMetrics() {
    public void testResetGlobalPhoenixMetrics() throws Exception {
        resetGlobalMetrics();
        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
            assertThat(m, CoreMatchers.<GlobalMetric>instanceOf(GlobalMetricImpl.class));
            assertEquals(0, m.getTotalSum());
            assertEquals(0, m.getNumberOfSamples());
        }
        assertTrue(verifyMetricsFromSink());
    }

@Test
@@ -146,6 +166,8 @@ public void testGlobalPhoenixMetricsForQueries() throws Exception {
        assertTrue(GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS.getMetric().getTotalSum() > 0);
        assertTrue(GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS.getMetric().getTotalSum() > 0);
        assertTrue(GLOBAL_HBASE_COUNT_SCANNED_REGIONS.getMetric().getTotalSum() > 0);

        assertTrue(verifyMetricsFromSink());
    }

@Test
Expand All @@ -163,6 +185,8 @@ public void testGlobalPhoenixMetricsForMutations() throws Exception {
        assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
        assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
        assertEquals(0, GLOBAL_MUTATION_BATCH_FAILED_COUNT.getMetric().getTotalSum());

        assertTrue(verifyMetricsFromSink());
    }

@Test
@@ -196,6 +220,8 @@ public void testGlobalPhoenixMetricsForUpsertSelect() throws Exception {
        assertTrue(GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS.getMetric().getTotalSum() > 0);
        assertTrue(GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS.getMetric().getTotalSum() > 0);
        assertTrue(GLOBAL_HBASE_COUNT_SCANNED_REGIONS.getMetric().getTotalSum() > 0);

        assertTrue(verifyMetricsFromSink());
    }

    private static void resetGlobalMetrics() {
@@ -204,6 +230,53 @@ private static void resetGlobalMetrics() {
        }
    }

    // Phoenix client metrics are transported via a Hadoop-metrics2 sink.
    // The test sink is defined in GlobalPhoenixMetricsTestSink.
    // The Hadoop-metrics2 configuration comes from the hadoop-metrics2.properties file in test/resources.
    private boolean verifyMetricsFromSink() {
        Map<String, Long> expectedMetrics = new HashMap<>();
        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
            expectedMetrics.put(m.getMetricType().name(), m.getTotalSum());
        }

        for (int i = 0; i < MAX_RETRIES; i++) {
            LOG.info("Verifying Global Metrics from Hadoop Sink, Retry: " + (i + 1));
            if (verifyMetricsFromSinkOnce(expectedMetrics)) {
                LOG.info("Values from Hadoop Metrics Sink match actual values");
                return true;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    private boolean verifyMetricsFromSinkOnce(Map<String, Long> expectedMetrics) {
        synchronized (GlobalPhoenixMetricsTestSink.lock) {
            for (AbstractMetric metric : GlobalPhoenixMetricsTestSink.metrics) {
                if (expectedMetrics.containsKey(metric.name())) {
                    Long value = expectedMetrics.get(metric.name());
                    if (value != null) {
                        long expectedValue = value;
                        long actualValue = metric.value().longValue();
                        if (expectedValue != actualValue) {
                            LOG.warn("Metric from Hadoop Sink: " + metric.name() + " didn't match the expected value.");
                            return false;
                        }
                        expectedMetrics.remove(metric.name());
                    }
                }
            }
        }
        assertTrue("Metric expected but not present in Hadoop Metrics Sink (GlobalPhoenixMetricsTestSink)",
                expectedMetrics.size() == 0);
        return true;
    }

    private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate)
            throws Exception {
        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
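
As the class Javadoc notes, the global metrics also reach any Hadoop-Metrics2 sink, which is how a PQS deployment could pick them up. Below is a minimal sketch of a standalone sink that logs the Phoenix record instead of caching it for assertions; the class name and package are illustrative and not part of this commit, and it relies only on the MetricsSink, MetricsRecord, and AbstractMetric calls already used by GlobalPhoenixMetricsTestSink.

    package org.example.metrics; // illustrative package, not part of this commit

    import org.apache.commons.configuration.SubsetConfiguration;
    import org.apache.hadoop.metrics2.AbstractMetric;
    import org.apache.hadoop.metrics2.MetricsRecord;
    import org.apache.hadoop.metrics2.MetricsSink;

    public class LoggingPhoenixMetricsSink implements MetricsSink {

        @Override
        public void putMetrics(MetricsRecord record) {
            // Phoenix client metrics arrive under the "PHOENIX" record name.
            if ("PHOENIX".equals(record.name())) {
                for (AbstractMetric metric : record.metrics()) {
                    System.out.println(metric.name() + "=" + metric.value());
                }
            }
        }

        @Override
        public void flush() {
            // Nothing is buffered in this sketch.
        }

        @Override
        public void init(SubsetConfiguration conf) {
            // No configuration is needed for this sketch.
        }
    }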