From 74ff022efcddae7a70f1a833409da693c84f9f6c Mon Sep 17 00:00:00 2001 From: liuxunorg <33611720@qq.com> Date: Sat, 14 Jul 2018 11:25:18 +0800 Subject: [PATCH] [ZEPPELIN-3610] Cluster Raft module design What is this PR for? By using the Raft protocol, multiple Zeppelin-Server groups are built into a Zeppelin cluster, the cluster State Machine is maintained through the Raft protocol, and the services in the cluster are agreed upon. The Zeppelin-Server and Zeppelin-Interperter services and processes are stored in the Cluster MetaData. Metadata information; What type of PR is it? [Feature] Todos - - Added support for checking lambda syntax styles in the pom.xml file - add raft algorithm copycat jar - add Cluster State Machine - add state machine query command - add state machine delete command - add state machine put command What is the Jira issue? - https://issues.apache.org/jira/browse/ZEPPELIN-3610 How should this be tested? CI pass Screenshots (if appropriate) Questions: - Does the licenses files need update? No - Is there breaking changes for older versions? No - Does this needs documentation? No --- pom.xml | 7 + zeppelin-interpreter/pom.xml | 51 +++++ .../zeppelin/cluster/ClusterStateMachine.java | 88 +++++++++ .../zeppelin/cluster/DeleteCommand.java | 41 ++++ .../org/apache/zeppelin/cluster/GetQuery.java | 40 ++++ .../apache/zeppelin/cluster/PutCommand.java | 49 +++++ .../zeppelin/cluster/meta/ClusterMeta.java | 144 ++++++++++++++ .../cluster/meta/ClusterMetaEntity.java | 57 ++++++ .../cluster/meta/ClusterMetaOperation.java | 25 +++ .../cluster/meta/ClusterMetaType.java | 25 +++ .../zeppelin/cluster/ClusterMetaTest.java | 185 ++++++++++++++++++ 11 files changed, 712 insertions(+) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetQuery.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/PutCommand.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java create mode 100644 zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java diff --git a/pom.xml b/pom.xml index 76dc327e1e4..ecd860d9bb1 100644 --- a/pom.xml +++ b/pom.xml @@ -572,6 +572,13 @@ org.apache.maven.plugins maven-checkstyle-plugin ${plugin.checkstyle.version} + + + com.puppycrawl.tools + checkstyle + 6.11.1 + + diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 4ee10806922..750cdc40c01 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -44,6 +44,9 @@ 3.0.3 1.0 2.12.1 + 1.2.9-SNAPSHOT + 4.0.23.Final + 1.2.1 2.3 @@ -98,6 +101,54 @@ slf4j-log4j12 + + + io.atomix.copycat + copycat-server + ${copycat.version} + + + io.atomix.copycat + copycat-client + ${copycat.version} + + + io.netty + netty-all + ${netty-all.version} + + + io.atomix.catalyst + catalyst-netty + ${catalyst-netty.version} + + + io.netty + netty-transport + + + io.netty + netty-buffer + + + io.netty + netty-common + + + io.netty + netty-resolver + + + io.netty + netty-handler + + + io.netty + netty-codec + + + + org.apache.maven diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java new file mode 100644 index 00000000000..084398410d6 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster; + +import io.atomix.copycat.server.Commit; +import io.atomix.copycat.server.Snapshottable; +import io.atomix.copycat.server.StateMachine; +import io.atomix.copycat.server.StateMachineExecutor; +import io.atomix.copycat.server.storage.snapshot.SnapshotReader; +import io.atomix.copycat.server.storage.snapshot.SnapshotWriter; +import org.apache.zeppelin.cluster.meta.ClusterMeta; + +/** + * Cluster State Machine + */ +public class ClusterStateMachine extends StateMachine implements Snapshottable { + private ClusterMeta clusterMeta = new ClusterMeta(); + + @Override + protected void configure(StateMachineExecutor executor) { + executor.register(PutCommand.class, this::put); + executor.register(GetQuery.class, this::get); + executor.register(DeleteCommand.class, this::delete); + } + + public Object put(Commit commit) { + try { + clusterMeta.put( + commit.operation().type(), + commit.operation().key(), commit.operation().value()); + } finally { + commit.close(); + } + return null; + } + + public Object get(Commit commit) { + try { + return clusterMeta.get(commit.operation().type(), commit.operation().key()); + } finally { + commit.close(); + } + } + + /** + * Deletes the value. + */ + private Object delete(Commit commit) { + try { + if (clusterMeta.contains(commit.operation().type(), commit.operation().key())) { + clusterMeta.remove(commit.operation().type(), commit.operation().key()); + } + return null; + } finally { + commit.close(); + } + } + + // The following two methods come from interfaces that implement Snapshottable, + // This interface is implemented to enable the replication server to compress the + // local status log. + // And form a snapshot (snapshot), when copycat-server restarts, + // you can restore the status from the snapshot, + // If other servers are added, snapshots can be copied to other servers. + @Override + public void snapshot(SnapshotWriter writer) { + writer.writeObject(clusterMeta); + } + + @Override + public void install(SnapshotReader reader) { + clusterMeta = reader.readObject(); + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java new file mode 100644 index 00000000000..2597c648859 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java @@ -0,0 +1,41 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ +package org.apache.zeppelin.cluster; + +import io.atomix.copycat.Command; +import org.apache.zeppelin.cluster.meta.ClusterMetaType; + +/** + * Command to delete a variable in cluster state machine + * + */ +public class DeleteCommand implements Command { + private final ClusterMetaType type; + private final String key; + + public DeleteCommand(ClusterMetaType type, String key){ + this.type = type; + this.key = key; + } + + public ClusterMetaType type(){ + return type; + } + + public String key(){ + return key; + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetQuery.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetQuery.java new file mode 100644 index 00000000000..31c5545989c --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetQuery.java @@ -0,0 +1,40 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ +package org.apache.zeppelin.cluster; + +import io.atomix.copycat.Query; +import org.apache.zeppelin.cluster.meta.ClusterMetaType; + +/** + * Command to query a variable in cluster state machine + */ +public class GetQuery implements Query { + private final ClusterMetaType type; + private final String key; + + public GetQuery(ClusterMetaType type, String key){ + this.type = type; + this.key = key; + } + + public ClusterMetaType type(){ + return type; + } + + public String key(){ + return key; + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/PutCommand.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/PutCommand.java new file mode 100644 index 00000000000..f57e190c91b --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/PutCommand.java @@ -0,0 +1,49 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ +package org.apache.zeppelin.cluster; + +import io.atomix.copycat.Command; +import org.apache.zeppelin.cluster.meta.ClusterMetaType; + +import java.util.Map; + +/** + * Command to put a variable in cluster state machine + */ +public class PutCommand implements Command { + private final ClusterMetaType type; + private final String key; + private final Object value; + + public PutCommand(ClusterMetaType type, String key, Map value){ + this.type = type; + this.key = key; + this.value = value; + } + + public ClusterMetaType type(){ + return type; + } + + public String key(){ + return key; + } + + public Object value(){ + return value; + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java new file mode 100644 index 00000000000..d3979e52155 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.meta; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * Metadata stores metadata information in a KV key-value pair + */ +public class ClusterMeta implements Serializable { + private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class); + + public static String SERVER_HOST = "SERVER_HOST"; + public static String SERVER_PORT = "SERVER_PORT"; + public static String SERVER_TSERVER_HOST = "SERVER_TSERVER_HOST"; + public static String SERVER_TSERVER_PORT = "SERVER_TSERVER_PORT"; + public static String INTP_TSERVER_HOST = "INTP_TSERVER_HOST"; + public static String INTP_TSERVER_PORT = "INTP_TSERVER_PORT"; + public static String CPU_CAPACITY = "CPU_CAPACITY"; + public static String CPU_USED = "CPU_USED"; + public static String MEMORY_CAPACITY = "MEMORY_CAPACITY"; + public static String MEMORY_USED = "MEMORY_USED"; + public static String HEARTBEAT = "HEARTBEAT"; + + // cluster_name = host:port + // cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...} + private Map> mapServerMeta = new HashMap<>(); + + // InterpreterGroupId -> {cluster_name,intp_tserver_host,...} + private Map> mapInterpreterMeta = new HashMap<>(); + + public void put(ClusterMetaType type, String key, Object value) { + Map mapValue = (Map) value; + + switch (type) { + case ServerMeta: + // Because it may be partially updated metadata information + if (mapServerMeta.containsKey(key)) { + Map values = mapServerMeta.get(key); + values.putAll(mapValue); + } else { + mapServerMeta.put(key, mapValue); + } + break; + case IntpProcessMeta: + if (mapInterpreterMeta.containsKey(key)) { + Map values = mapInterpreterMeta.get(key); + values.putAll(mapValue); + } else { + mapInterpreterMeta.put(key, mapValue); + } + break; + } + } + + public Object get(ClusterMetaType type, String key) { + Map values = null; + + switch (type) { + case ServerMeta: + if (StringUtils.isEmpty(key)) { + return mapServerMeta; + } + if (mapServerMeta.containsKey(key)) { + values = mapServerMeta.get(key); + } else { + logger.warn("can not find key : {}", key); + } + break; + case IntpProcessMeta: + if (StringUtils.isEmpty(key)) { + return mapInterpreterMeta; + } + if (mapInterpreterMeta.containsKey(key)) { + values = mapInterpreterMeta.get(key); + } else { + logger.warn("can not find key : {}", key); + } + break; + } + + return values; + } + + public boolean contains(ClusterMetaType type, String key) { + boolean isExist = false; + + switch (type) { + case ServerMeta: + if (mapServerMeta.containsKey(key)) { + isExist = true; + } + break; + case IntpProcessMeta: + if (mapInterpreterMeta.containsKey(key)) { + isExist = true; + } + break; + } + + return isExist; + } + + public Map remove(ClusterMetaType type, String key) { + switch (type) { + case ServerMeta: + if (mapServerMeta.containsKey(key)) { + return mapServerMeta.remove(key); + } else { + logger.warn("can not find key : {}", key); + } + break; + case IntpProcessMeta: + if (mapInterpreterMeta.containsKey(key)) { + return mapInterpreterMeta.remove(key); + } else { + logger.warn("can not find key : {}", key); + } + break; + } + + return null; + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java new file mode 100644 index 00000000000..73ad785a616 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.meta; + +import java.util.HashMap; +import java.util.Map; + +/** + * Cluster operations, cluster types, encapsulation objects for keys and values + */ +public class ClusterMetaEntity { + private ClusterMetaOperation operation; + private ClusterMetaType type; + private String key; + private Map values = new HashMap<>(); + + public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type, + String key, Map values) { + this.operation = operation; + this.type = type; + this.key = key; + + if (null != values) { + this.values.putAll(values); + } + } + + public ClusterMetaOperation getOperation() { + return operation; + } + + public ClusterMetaType getMetaType() { + return type; + } + + public String getKey() { + return key; + } + + public Map getValues() { + return values; + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java new file mode 100644 index 00000000000..c9d71f9de66 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.meta; + +/** + * Type of cluster metadata operation + */ +public enum ClusterMetaOperation { + PUT, + DELETE +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java new file mode 100644 index 00000000000..c6229bd6347 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.meta; + +/** + * Type of cluster metadata + */ +public enum ClusterMetaType { + ServerMeta, + IntpProcessMeta +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java new file mode 100644 index 00000000000..1138bac1e62 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java @@ -0,0 +1,185 @@ +package org.apache.zeppelin.cluster; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import io.atomix.catalyst.transport.Address; +import io.atomix.catalyst.transport.netty.NettyTransport; +import io.atomix.copycat.client.ConnectionStrategies; +import io.atomix.copycat.client.CopycatClient; +import io.atomix.copycat.client.RecoveryStrategies; +import io.atomix.copycat.client.ServerSelectionStrategies; +import io.atomix.copycat.server.CopycatServer; +import io.atomix.copycat.server.storage.Storage; +import io.atomix.copycat.server.storage.StorageLevel; +import org.apache.zeppelin.cluster.meta.ClusterMeta; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class ClusterMetaTest { + static CopycatServer copycatServer = null; + static CopycatClient copycatClient = null; + + @BeforeClass + public static void initClusterEnv() throws IOException, InterruptedException { + System.out.println("initClusterEnv >>>"); + + // Set the cluster IP and port + String zServerHost = RemoteInterpreterUtils.findAvailableHostAddress(); + int zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + Address clusterMembers = new Address(zServerHost, zServerPort); + + // mock cluster manager server + copycatServer = CopycatServer.builder(clusterMembers) + .withStateMachine(ClusterStateMachine::new) + .withTransport(new NettyTransport()) + .withStorage(Storage.builder() + .withDirectory("copycat-logs") + .withStorageLevel(StorageLevel.MEMORY) + .withMaxSegmentSize(1024 * 1024 * 32) + .withMinorCompactionInterval(Duration.ofMinutes(1)) + .withMajorCompactionInterval(Duration.ofMinutes(15)) + .build()) + .build(); + + // Registering the putCommand and GetQuery command classes + copycatServer.serializer().register(PutCommand.class, 1); + copycatServer.serializer().register(GetQuery.class, 2); + copycatServer.serializer().register(DeleteCommand.class, 3); + + copycatServer.onStateChange(state -> { + if (state == CopycatServer.State.CANDIDATE) { + System.out.println("CopycatServer.State CANDIDATE!"); + } else if (state == CopycatServer.State.FOLLOWER) { + System.out.println("CopycatServer.State FOLLOWER!"); + } else if (state == CopycatServer.State.INACTIVE) { + System.out.println("CopycatServer.State INACTIVE!"); + } else if (state == CopycatServer.State.LEADER) { + System.out.println("CopycatServer.State LEADER!"); + } else if (state == CopycatServer.State.PASSIVE) { + System.out.println("CopycatServer.State PASSIVE!"); + } else { + System.out.println("unknown CopycatServer.State!"); + } + }); + + copycatServer.cluster().onJoin(member -> { + System.out.println(member.address() + " joined the cluster."); + }); + + copycatServer.cluster().onLeave(member -> { + System.out.println(member.address() + " left the cluster."); + }); + + copycatServer.bootstrap().join(); + + // initialization Raft client + copycatClient = CopycatClient.builder() + .withTransport(new NettyTransport()) + .withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF) + .withRecoveryStrategy(RecoveryStrategies.RECOVER) + .withServerSelectionStrategy(ServerSelectionStrategies.LEADER) + .withSessionTimeout(Duration.ofSeconds(15)) + .build(); + + copycatClient.onStateChange(state -> { + if (state == CopycatClient.State.CONNECTED) { + System.out.println("CopycatClient.State CONNECTED!"); + } else if (state == CopycatClient.State.CLOSED) { + System.out.println("CopycatClient.State CLOSED!"); + } else if (state == CopycatClient.State.SUSPENDED) { + System.out.println("CopycatClient.State SUSPENDED!"); + } else { + System.out.println("unknown CopycatClient.State " + state); + } + }); + + copycatClient.serializer().register(PutCommand.class, 1); + copycatClient.serializer().register(GetQuery.class, 2); + copycatClient.serializer().register(DeleteCommand.class, 3); + + copycatClient.connect(clusterMembers).join(); + + // Waiting for cluster startup + Thread.sleep(3000); + } + + @Test + public void putClusterMeta() throws InterruptedException, ExecutionException { + Map meta = new HashMap<>(); + meta.put(ClusterMeta.SERVER_HOST, "SERVER_HOST"); + meta.put(ClusterMeta.SERVER_PORT, "SERVER_PORT"); + meta.put(ClusterMeta.SERVER_TSERVER_HOST, "SERVER_TSERVER_HOST"); + meta.put(ClusterMeta.SERVER_TSERVER_PORT, "SERVER_TSERVER_PORT"); + meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST"); + meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT"); + meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY"); + meta.put(ClusterMeta.CPU_USED, "CPU_USED"); + meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY"); + meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED"); + meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED"); + + PutCommand putCommand = new PutCommand(IntpProcessMeta, "test", meta); + copycatClient.submit(putCommand); + + try { + Object verifyMeta = copycatClient.submit(new GetQuery(IntpProcessMeta, "test")) + .get(3, TimeUnit.SECONDS); + System.out.println("Cluster meta[" + IntpProcessMeta + "] : " + verifyMeta); + assertEquals(verifyMeta, meta); + } catch (TimeoutException e) { + e.printStackTrace(); + } + } + + @Test + public void deleteClusterMeta() throws InterruptedException, ExecutionException { + System.out.println("test deleteClusterMeta"); + putClusterMeta(); + + DeleteCommand deleteCommand = new DeleteCommand(IntpProcessMeta, "test"); + CompletableFuture futures = copycatClient.submit(deleteCommand); + CompletableFuture.allOf(futures) + .thenRun(() -> System.out.println("deleteClusterMeta completed!")); + + try { + Object verifyMeta = copycatClient.submit(new GetQuery(IntpProcessMeta, "test")) + .get(3, TimeUnit.SECONDS); + System.out.println("Cluster meta[" + IntpProcessMeta + "] : " + verifyMeta); + assertNull(verifyMeta); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + } +}