Skip to content
Permalink
Browse files
add "nodes" metadata for backends (#821)
implemented: #816

Change-Id: Ica7be59f1af1f290342b010c0c018cc541a1122b
  • Loading branch information
zhoney committed Feb 4, 2020
1 parent b1b2a4c commit 1428c8bf111ffcccd3d2102597b68b3d161a6ee6
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 11 deletions.
@@ -22,12 +22,14 @@
import java.io.IOException;
import java.lang.management.MemoryUsage;
import java.util.Map;
import java.util.Set;

import org.apache.cassandra.tools.NodeProbe;

import com.baidu.hugegraph.backend.store.BackendMetrics;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
@@ -39,8 +41,10 @@ public class CassandraMetrics implements BackendMetrics {
private final String username;
private final String password;

public CassandraMetrics(Cluster cluster, HugeConfig conf) {
this.cluster = cluster;
public CassandraMetrics(CassandraSessionPool sessions, HugeConfig conf) {
E.checkArgumentNotNull(sessions,
"Cassandra sessions have not been initialized");
this.cluster = sessions.cluster();
this.port = conf.get(CassandraOptions.CASSANDRA_JMX_PORT);
this.username = conf.get(CassandraOptions.CASSANDRA_USERNAME);
this.password = conf.get(CassandraOptions.CASSANDRA_PASSWORD);
@@ -50,7 +54,9 @@ public CassandraMetrics(Cluster cluster, HugeConfig conf) {
@Override
public Map<String, Object> getMetrics() {
Map<String, Object> results = InsertionOrderUtil.newMap();
for (Host host : this.cluster.getMetadata().getAllHosts()) {
Set<Host> hosts = this.cluster.getMetadata().getAllHosts();
results.put(NODES, hosts.size());
for (Host host : hosts) {
String address = host.getAddress().getHostAddress();
results.put(address, this.getMetricsByHost(address));
}
@@ -99,7 +99,7 @@ public final synchronized boolean opened() {
return (this.cluster != null && !this.cluster.isClosed());
}

public final synchronized Cluster cluster() {
protected final synchronized Cluster cluster() {
E.checkState(this.cluster != null,
"Cassandra cluster has not been initialized");
return this.cluster;
@@ -89,7 +89,8 @@ public CassandraStore(final BackendStoreProvider provider,

private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
CassandraMetrics metrics = new CassandraMetrics(cluster(), this.conf);
CassandraMetrics metrics = new CassandraMetrics(this.sessions,
this.conf);
return metrics.getMetrics();
});
}
@@ -45,7 +45,7 @@ public static synchronized HugeGraph open(Configuration config) {
"Invalid graph name '%s', valid graph name is up to " +
"48 alpha-numeric characters and underscores " +
"and only letters are supported as first letter. " +
"Note: letter is case insensitive");
"Note: letter is case insensitive", name);
name = name.toLowerCase();
HugeGraph graph = graphs.get(name);
if (graph == null || graph.closed()) {
@@ -25,6 +25,8 @@ public interface BackendMetrics {

public String BACKEND = "backend";

public String NODES = "nodes";

// Memory related metrics
public String MEM_USED = "mem_used";
public String MEM_COMMITED = "mem_commited";
@@ -73,6 +73,16 @@ public InMemoryDBStore(final BackendStoreProvider provider,
this.database = database;
this.store = store;
this.tables = new HashMap<>();

this.registerMetaHandlers();
LOG.debug("Store loaded: {}", store);
}

private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
InMemoryMetrics metrics = new InMemoryMetrics();
return metrics.getMetrics();
});
}

protected void registerTableManager(HugeType type, InMemoryDBTable table) {
@@ -0,0 +1,33 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.backend.store.memory;

import java.util.Map;

import com.baidu.hugegraph.backend.store.BackendMetrics;
import com.google.common.collect.ImmutableMap;

public class InMemoryMetrics implements BackendMetrics {

    /**
     * Reports the metrics of the in-memory backend.
     *
     * The in-memory backend always lives inside a single JVM process,
     * so the only metric it can meaningfully expose is a node count of 1.
     */
    @Override
    public Map<String, Object> getMetrics() {
        Map<String, Object> metrics = ImmutableMap.of(NODES, 1);
        return metrics;
    }
}
@@ -0,0 +1,111 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.backend.store.hbase;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;

import com.baidu.hugegraph.backend.store.BackendMetrics;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;

public class HbaseMetrics implements BackendMetrics {

    private final Connection hbase;

    public HbaseMetrics(HbaseSessions hbase) {
        E.checkArgumentNotNull(hbase, "HBase connection is not opened");
        this.hbase = hbase.hbase();
    }

    /**
     * Collects cluster-level and per-region-server metrics from HBase.
     *
     * Any failure during collection is reported under the EXCEPTION key
     * instead of being thrown, so callers always receive a result map.
     */
    @Override
    public Map<String, Object> getMetrics() {
        Map<String, Object> results = InsertionOrderUtil.newMap();
        try (Admin admin = this.hbase.getAdmin()) {
            // Cluster-wide information
            ClusterMetrics cluster = admin.getClusterMetrics();
            results.put("cluster_id", cluster.getClusterId());
            results.put("average_load", cluster.getAverageLoad());
            results.put("hbase_version", cluster.getHBaseVersion());
            results.put("region_count", cluster.getRegionCount());
            // Per region-server information
            Collection<ServerName> servers = admin.getRegionServers();
            results.put(NODES, servers.size());
            Map<ServerName, ServerMetrics> liveServers =
                                           cluster.getLiveServerMetrics();
            Map<String, Object> regionServers = InsertionOrderUtil.newMap();
            for (Map.Entry<ServerName, ServerMetrics> entry :
                 liveServers.entrySet()) {
                ServerName name = entry.getKey();
                List<RegionMetrics> regions = admin.getRegionMetrics(name);
                regionServers.put(name.getAddress().toString(),
                                  formatMetrics(entry.getValue(), regions));
            }
            results.put("region_servers", regionServers);
        } catch (Throwable e) {
            // Best effort: report the failure rather than propagate it
            results.put(EXCEPTION, e.getMessage());
        }
        return results;
    }

    // Formats one region server: heap usage, request rates and its regions
    private static Map<String, Object> formatMetrics(
                                       ServerMetrics serverMetrics,
                                       List<RegionMetrics> regions) {
        Map<String, Object> metrics = InsertionOrderUtil.newMap();
        Size maxHeap = serverMetrics.getMaxHeapSize();
        Size usedHeap = serverMetrics.getUsedHeapSize();
        metrics.put("max_heap_size", maxHeap.get(Size.Unit.MEGABYTE));
        metrics.put("used_heap_size", usedHeap.get(Size.Unit.MEGABYTE));
        metrics.put("heap_size_unit", "MB");
        metrics.put("request_count", serverMetrics.getRequestCount());
        metrics.put("request_count_per_second",
                    serverMetrics.getRequestCountPerSecond());
        metrics.put("regions", formatMetrics(regions));
        return metrics;
    }

    // Indexes each region's metrics by its region name
    private static Map<String, Object> formatMetrics(
                                       List<RegionMetrics> regions) {
        Map<String, Object> metrics = InsertionOrderUtil.newMap();
        for (RegionMetrics region : regions) {
            metrics.put(region.getNameAsString(), formatMetrics(region));
        }
        return metrics;
    }

    // Formats a single region: memstore and store-file sizes (in MB)
    private static Map<String, Object> formatMetrics(RegionMetrics region) {
        Map<String, Object> metrics = InsertionOrderUtil.newMap();
        metrics.put("mem_store_size",
                    region.getMemStoreSize().get(Size.Unit.MEGABYTE));
        metrics.put("file_store_size",
                    region.getStoreFileSize().get(Size.Unit.MEGABYTE));
        metrics.put("store_size_unit", "MB");
        return metrics;
    }
}
@@ -85,6 +85,11 @@ public HbaseSessions(HugeConfig config, String namespace, String store) {
this.namespace = namespace;
}

protected Connection hbase() {
E.checkState(this.hbase != null, "HBase connection is not opened");
return this.hbase;
}

private Table table(String table) throws IOException {
E.checkState(this.hbase != null, "HBase connection is not opened");
TableName tableName = TableName.valueOf(this.namespace, table);
@@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.slf4j.Logger;

import com.baidu.hugegraph.backend.BackendException;
@@ -73,6 +74,16 @@ public HbaseStore(final BackendStoreProvider provider,
this.namespace = namespace;
this.store = store;
this.sessions = null;

this.registerMetaHandlers();
LOG.debug("Store loaded: {}", store);
}

private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
HbaseMetrics metrics = new HbaseMetrics(this.sessions);
return metrics.getMetrics();
});
}

protected void registerTableManager(HugeType type, HbaseTable table) {
@@ -0,0 +1,33 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.backend.store.mysql;

import java.util.Map;

import com.baidu.hugegraph.backend.store.BackendMetrics;
import com.google.common.collect.ImmutableMap;

public class MysqlMetrics implements BackendMetrics {

    /**
     * Reports the metrics of the MySQL backend.
     *
     * The store talks to a single MySQL server instance,
     * so only a fixed node count of 1 is exposed here.
     */
    @Override
    public Map<String, Object> getMetrics() {
        Map<String, Object> metrics = ImmutableMap.of(NODES, 1);
        return metrics;
    }
}
@@ -71,9 +71,17 @@ public MysqlStore(final BackendStoreProvider provider,
this.sessions = null;
this.tables = new ConcurrentHashMap<>();

this.registerMetaHandlers();
LOG.debug("Store loaded: {}", store);
}

private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
MysqlMetrics metrics = new MysqlMetrics();
return metrics.getMetrics();
});
}

protected void registerTableManager(HugeType type, MysqlTable table) {
this.tables.put(type, table);
}
@@ -49,6 +49,7 @@ public RocksDBMetrics(List<RocksDBSessions> dbs,
@Override
public Map<String, Object> getMetrics() {
Map<String, Object> metrics = InsertionOrderUtil.newMap();
metrics.put(NODES, 1);
// NOTE: the unit of rocksdb mem property is bytes
metrics.put(MEM_USED, this.getMemUsed() / Bytes.MB);
metrics.put(MEM_UNIT, "MB");
@@ -62,17 +62,18 @@ public void testMetricsBackend() {

Assert.assertTrue(value instanceof Map);
Map<?, ?> graph = (Map<?, ?>) value;
assertMapContains(graph, "backend");
assertMapContains(graph, "nodes");
String backend = (String) graph.get("backend");
String notSupport = "Not support metadata 'metrics'";
int nodes = (Integer) graph.get("nodes");
switch (backend) {
case "memory":
case "mysql":
case "hbase":
case "postgresql":
String except = (String) assertMapContains(graph, "exception");
Assert.assertTrue(except, except.contains(notSupport));
Assert.assertEquals(1, nodes);
break;
case "rocksdb":
Assert.assertEquals(1, nodes);
assertMapContains(graph, "mem_used");
assertMapContains(graph, "mem_unit");
assertMapContains(graph, "data_size");
@@ -82,7 +83,7 @@ public void testMetricsBackend() {
for (Map.Entry<?, ?> e : graph.entrySet()) {
String key = (String) e.getKey();
value = e.getValue();
if ("backend".equals(key)) {
if ("backend".equals(key) || "nodes".equals(key)) {
continue;
}
Assert.assertTrue(String.format(
@@ -97,6 +98,30 @@ public void testMetricsBackend() {
assertMapContains(host, "data_size");
}
break;
case "hbase":
assertMapContains(graph, "cluster_id");
assertMapContains(graph, "average_load");
assertMapContains(graph, "hbase_version");
assertMapContains(graph, "region_count");
assertMapContains(graph, "region_servers");
Map<?, ?> servers = (Map<?, ?>) graph.get("region_servers");
for (Map.Entry<?, ?> e : servers.entrySet()) {
String key = (String) e.getKey();
value = e.getValue();
Assert.assertTrue(String.format(
"Expect map value for key %s but got %s",
key, value),
value instanceof Map);
Map<?, ?> regionServer = (Map<?, ?>) value;
assertMapContains(regionServer, "max_heap_size");
assertMapContains(regionServer, "used_heap_size");
assertMapContains(regionServer, "heap_size_unit");
assertMapContains(regionServer, "request_count");
assertMapContains(regionServer,
"request_count_per_second");
assertMapContains(regionServer, "regions");
}
break;
default:
Assert.assertTrue("Unexpected backend " + backend, false);
break;

0 comments on commit 1428c8b

Please sign in to comment.