Skip to content
Permalink
Browse files
Support to detect connection before using if needed (#562)
Fix #556

Change-Id: I622e285f812098d601061f2515d70629f1e07d88
  • Loading branch information
Linary authored and zhoney committed Jun 13, 2019
1 parent 6f6fac9 commit 6a4aaae453b616bfb775e4127a78955b8550a37a
Showing 15 changed files with 130 additions and 63 deletions.
@@ -62,7 +62,7 @@ public class LoadDetectFilter implements ContainerRequestFilter {

@Override
public void filter(ContainerRequestContext context) {
if (isWhiteAPI(context)) {
if (LoadDetectFilter.isWhiteAPI(context)) {
return;
}

@@ -93,7 +93,7 @@ public void filter(ContainerRequestContext context) {
}
}

private static boolean isWhiteAPI(ContainerRequestContext context) {
public static boolean isWhiteAPI(ContainerRequestContext context) {
List<PathSegment> segments = context.getUriInfo().getPathSegments();
E.checkArgument(segments.size() > 0, "Invalid request uri '%s'",
context.getUriInfo().getPath());
@@ -19,8 +19,6 @@

package com.baidu.hugegraph.api.filter;

import java.io.IOException;

import javax.inject.Singleton;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
@@ -39,8 +37,11 @@ public class LoadReleaseFilter implements ContainerResponseFilter {

@Override
public void filter(ContainerRequestContext requestContext,
ContainerResponseContext responseContext)
throws IOException {
ContainerResponseContext responseContext) {
if (LoadDetectFilter.isWhiteAPI(requestContext)) {
return;
}

WorkLoad load = this.loadProvider.get();
load.decrementAndGet();
}
@@ -52,7 +52,7 @@ public static synchronized ServerOptions instance() {
new ConfigOption<>(
"restserver.max_worker_threads",
"The maxmium worker threads of rest server.",
positiveInt(),
rangeInt(2, Integer.MAX_VALUE),
2 * Runtime.getRuntime().availableProcessors()
);

@@ -46,19 +46,21 @@ public class CassandraSessionPool extends BackendSessionPool {
private Cluster cluster;
private String keyspace;

public CassandraSessionPool(String keyspace, String store) {
super(keyspace + "/" + store);
public CassandraSessionPool(HugeConfig config, String keyspace,
String store) {
super(config, keyspace + "/" + store);
this.cluster = null;
this.keyspace = keyspace;
}

@Override
public synchronized void open(HugeConfig config) {
public synchronized void open() {
if (this.opened()) {
throw new BackendException("Please close the old SessionPool " +
"before opening a new one");
}

HugeConfig config = this.config();
// Contact options
String hosts = config.get(CassandraOptions.CASSANDRA_HOST);
int port = config.get(CassandraOptions.CASSANDRA_PORT);
@@ -61,11 +61,10 @@
private final String keyspace;

private final BackendStoreProvider provider;

private final CassandraSessionPool sessions;
// TODO: move to parent class
private final Map<HugeType, CassandraTable> tables;

private CassandraSessionPool sessions;
private HugeConfig conf;

public CassandraStore(final BackendStoreProvider provider,
@@ -77,10 +76,9 @@ public CassandraStore(final BackendStoreProvider provider,

this.keyspace = keyspace;
this.store = store;

this.sessions = new CassandraSessionPool(keyspace, store);
this.tables = new ConcurrentHashMap<>();

this.sessions = null;
this.conf = null;

this.registerMetaHandlers();
@@ -114,11 +112,15 @@ public BackendStoreProvider provider() {
}

@Override
public void open(HugeConfig config) {
public synchronized void open(HugeConfig config) {
LOG.debug("Store open: {}", this.store);

E.checkNotNull(config, "config");

if (this.sessions == null) {
this.sessions = new CassandraSessionPool(config, this.keyspace,
this.store);
}

if (this.sessions.opened()) {
// TODO: maybe we should throw an exception here instead of ignore
LOG.debug("Store {} has been opened before", this.store);
@@ -128,7 +130,7 @@ public void open(HugeConfig config) {
this.conf = config;

// Init cluster
this.sessions.open(config);
this.sessions.open();

// Init a session for current thread
try {
@@ -28,10 +28,26 @@ public abstract class BackendSession {

private int refs;
private TxState txState;
private final long created;
private long updated;

public BackendSession() {
this.refs = 1;
this.txState = TxState.CLEAN;
this.created = System.currentTimeMillis();
this.updated = this.created;
}

public long created() {
return this.created;
}

public long updated() {
return this.updated;
}

public void update() {
this.updated = System.currentTimeMillis();
}

public abstract void close();
@@ -44,6 +60,10 @@ public BackendSession() {

public abstract boolean hasChanges();

protected void reconnectIfNeeded() {
// pass
}

protected int attach() {
return ++this.refs;
}
@@ -19,28 +19,36 @@

package com.baidu.hugegraph.backend.store;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.util.Log;

public abstract class BackendSessionPool {

private static final Logger LOG = Log.logger(BackendSessionPool.class);

private final HugeConfig config;
private final String name;
private final ThreadLocal<BackendSession> threadLocalSession;
private final AtomicInteger sessionCount;

public BackendSessionPool(String name) {
public BackendSessionPool(HugeConfig config, String name) {
this.config = config;
this.name = name;
this.threadLocalSession = new ThreadLocal<>();
this.sessionCount = new AtomicInteger(0);
}

public HugeConfig config() {
return this.config;
}

public final BackendSession getOrNewSession() {
BackendSession session = this.threadLocalSession.get();
if (session == null) {
@@ -50,6 +58,8 @@ public final BackendSession getOrNewSession() {
this.sessionCount.incrementAndGet();
LOG.debug("Now(after connect({})) session count is: {}",
this, this.sessionCount.get());
} else {
this.detectSession(session);
}
return session;
}
@@ -58,12 +68,23 @@ public BackendSession useSession() {
BackendSession session = this.threadLocalSession.get();
if (session != null) {
session.attach();
this.detectSession(session);
} else {
session = this.getOrNewSession();
}
return session;
}

private void detectSession(BackendSession session) {
// Reconnect if the session idle time exceed specified value
long interval = this.config.get(CoreOptions.CONNECTION_DETECT_INTERVAL);
long now = System.currentTimeMillis();
if (now - session.updated() > TimeUnit.SECONDS.toMillis(interval)) {
session.reconnectIfNeeded();
}
session.update();
}

public Pair<Integer, Integer> closeSession() {
BackendSession session = this.threadLocalSession.get();
if (session == null) {
@@ -112,7 +133,7 @@ public String toString() {
this.getClass().getSimpleName(), this.hashCode());
}

public abstract void open(HugeConfig config) throws Exception;
public abstract void open() throws Exception;

protected abstract boolean opened();

@@ -19,12 +19,12 @@

package com.baidu.hugegraph.config;

import com.baidu.hugegraph.backend.query.Query;

import static com.baidu.hugegraph.backend.tx.GraphTransaction.COMMIT_BATCH;
import static com.baidu.hugegraph.config.OptionChecker.disallowEmpty;
import static com.baidu.hugegraph.config.OptionChecker.rangeInt;

import com.baidu.hugegraph.backend.query.Query;

public class CoreOptions extends OptionHolder {

private CoreOptions() {
@@ -115,6 +115,17 @@ public static synchronized CoreOptions instance() {
10L
);

public static final ConfigOption<Long> CONNECTION_DETECT_INTERVAL =
new ConfigOption<>(
"store.connection_detect_interval",
"The interval in seconds for detecting connections, " +
"if the idle time of a connection exceeds this value, " +
"detect it and reconnect if needed before using, " +
"value 0 means detecting every time.",
rangeInt(0L, Long.MAX_VALUE),
600L
);

public static final ConfigOption<String> VERTEX_DEFAULT_LABEL =
new ConfigOption<>(
"vertex.default_label",
@@ -79,8 +79,8 @@ public class HbaseSessions extends BackendSessionPool {
private final String namespace;
private Connection hbase;

public HbaseSessions(String namespace, String store) {
super(namespace + "/" + store);
public HbaseSessions(HugeConfig config, String namespace, String store) {
super(config, namespace + "/" + store);
this.namespace = namespace;
}

@@ -91,20 +91,21 @@ private Table table(String table) throws IOException {
}

@Override
public synchronized void open(HugeConfig conf) throws IOException {
String hosts = conf.get(HbaseOptions.HBASE_HOSTS);
int port = conf.get(HbaseOptions.HBASE_PORT);
String znodeParent = conf.get(HbaseOptions.HBASE_ZNODE_PARENT);

Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, hosts);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, String.valueOf(port));
config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
public synchronized void open() throws IOException {
HugeConfig config = this.config();
String hosts = config.get(HbaseOptions.HBASE_HOSTS);
int port = config.get(HbaseOptions.HBASE_PORT);
String znodeParent = config.get(HbaseOptions.HBASE_ZNODE_PARENT);

Configuration hConfig = HBaseConfiguration.create();
hConfig.set(HConstants.ZOOKEEPER_QUORUM, hosts);
hConfig.set(HConstants.ZOOKEEPER_CLIENT_PORT, String.valueOf(port));
hConfig.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
// Set hbase.hconnection.threads.max 64 to avoid OOM(default value: 256)
config.setInt("hbase.hconnection.threads.max",
conf.get(HbaseOptions.HBASE_THREADS_MAX));
hConfig.setInt("hbase.hconnection.threads.max",
config.get(HbaseOptions.HBASE_THREADS_MAX));

this.hbase = ConnectionFactory.createConnection(config);
this.hbase = ConnectionFactory.createConnection(hConfig);
}

@Override
@@ -58,7 +58,7 @@ public abstract class HbaseStore extends AbstractBackendStore<Session> {
private final BackendStoreProvider provider;
private final Map<HugeType, HbaseTable> tables;

private final HbaseSessions sessions;
private HbaseSessions sessions;

public HbaseStore(final BackendStoreProvider provider,
final String namespace, final String store) {
@@ -67,7 +67,7 @@ public HbaseStore(final BackendStoreProvider provider,
this.provider = provider;
this.namespace = namespace;
this.store = store;
this.sessions = new HbaseSessions(namespace, store);
this.sessions = null;
}

protected void registerTableManager(HugeType type, HbaseTable table) {
@@ -120,17 +120,21 @@ public BackendFeatures features() {
}

@Override
public void open(HugeConfig config) {
public synchronized void open(HugeConfig config) {
E.checkNotNull(config, "config");

if (this.sessions == null) {
this.sessions = new HbaseSessions(config, this.namespace, this.store);
}

if (this.sessions.opened()) {
LOG.debug("Store {} has been opened before", this.store);
this.sessions.useSession();
return;
}

try {
this.sessions.open(config);
this.sessions.open();
} catch (IOException e) {
if (!e.getMessage().contains("Column family not found")) {
LOG.error("Failed to open HBase '{}'", this.store, e);
@@ -50,7 +50,7 @@ public class MysqlSessions extends BackendSessionPool {
private volatile boolean opened;

public MysqlSessions(HugeConfig config, String database, String store) {
super(database + "/" + store);
super(config, database + "/" + store);
this.config = config;
this.database = database;
this.opened = false;
@@ -73,7 +73,7 @@ public String escapedDatabase() {
* @throws SQLException if a database access error occurs
*/
@Override
public synchronized void open(HugeConfig config) throws Exception {
public synchronized void open() throws Exception {
try (Connection conn = this.open(false)) {
this.opened = true;
}
@@ -356,6 +356,15 @@ public boolean hasChanges() {
return this.count > 0;
}

@Override
protected void reconnectIfNeeded() {
try {
this.execute("SELECT 1;");
} catch (SQLException ignored) {
// pass
}
}

public ResultSet select(String sql) throws SQLException {
assert this.conn.getAutoCommit();
return this.conn.createStatement().executeQuery(sql);

0 comments on commit 6a4aaae

Please sign in to comment.