From b2faba1010c9517d1f651e6c69df1b3651c4b13f Mon Sep 17 00:00:00 2001
From: liuxunorg <33611720@qq.com>
Date: Sat, 14 Jul 2018 11:25:18 +0800
Subject: [PATCH 1/4] [ZEPPELIN-3610] Cluster Raft module design
By using the Raft protocol, multiple Zeppelin-Server groups are built into a Zeppelin cluster, the cluster State Machine is maintained through the Raft protocol, and the services in the cluster are agreed upon. The Zeppelin-Server and Zeppelin-Interperter services and processes are stored in the Cluster MetaData. Metadata information;
[Feature]
* [x] added support for checking lambda syntax styles in the pom.xml file
* [x] add raft algorithm copycat jar
* [x] add Cluster State Machine
* [x] add state machine query command
* [x] add state machine delete command
* [x] add state machine put command
* https://issues.apache.org/jira/browse/ZEPPELIN-3610
CI pass
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
---
pom.xml | 7 +
zeppelin-interpreter/pom.xml | 65 ++++++
.../zeppelin/cluster/ClusterStateMachine.java | 88 +++++++++
.../zeppelin/cluster/DeleteCommand.java | 41 ++++
.../org/apache/zeppelin/cluster/GetQuery.java | 40 ++++
.../apache/zeppelin/cluster/PutCommand.java | 49 +++++
.../zeppelin/cluster/meta/ClusterMeta.java | 144 ++++++++++++++
.../cluster/meta/ClusterMetaEntity.java | 57 ++++++
.../cluster/meta/ClusterMetaOperation.java | 25 +++
.../cluster/meta/ClusterMetaType.java | 25 +++
.../zeppelin/cluster/ClusterMetaTest.java | 185 ++++++++++++++++++
11 files changed, 726 insertions(+)
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetQuery.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/PutCommand.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
create mode 100644 zeppelin-zengine/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java
diff --git a/pom.xml b/pom.xml
index 76dc327e1e4..ecd860d9bb1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -572,6 +572,13 @@
org.apache.maven.pluginsmaven-checkstyle-plugin${plugin.checkstyle.version}
+
+
+ com.puppycrawl.tools
+ checkstyle
+ 6.11.1
+
+
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index 4ee10806922..3b1c270cfed 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -44,6 +44,8 @@
3.0.31.02.12.1
+ 1.2.9-SNAPSHOT
+ 1.2.12.3
@@ -98,6 +100,69 @@
slf4j-log4j12
+
+
+ io.atomix.copycat
+ copycat-server
+ ${copycat.version}
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ io.atomix.copycat
+ copycat-client
+ ${copycat.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ io.atomix.catalyst
+ catalyst-netty
+ ${catalyst-netty.version}
+
+
+ io.netty
+ netty-transport
+
+
+ io.netty
+ netty-buffer
+
+
+ io.netty
+ netty-common
+
+
+ io.netty
+ netty-resolver
+
+
+ io.netty
+ netty-handler
+
+
+ io.netty
+ netty-codec
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
org.apache.maven
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
new file mode 100644
index 00000000000..084398410d6
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachine;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+
+/**
+ * Cluster State Machine
+ */
+public class ClusterStateMachine extends StateMachine implements Snapshottable {
+ private ClusterMeta clusterMeta = new ClusterMeta();
+
+ @Override
+ protected void configure(StateMachineExecutor executor) {
+ executor.register(PutCommand.class, this::put);
+ executor.register(GetQuery.class, this::get);
+ executor.register(DeleteCommand.class, this::delete);
+ }
+
+ public Object put(Commit commit) {
+ try {
+ clusterMeta.put(
+ commit.operation().type(),
+ commit.operation().key(), commit.operation().value());
+ } finally {
+ commit.close();
+ }
+ return null;
+ }
+
+ public Object get(Commit commit) {
+ try {
+ return clusterMeta.get(commit.operation().type(), commit.operation().key());
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Deletes the value.
+ */
+ private Object delete(Commit commit) {
+ try {
+ if (clusterMeta.contains(commit.operation().type(), commit.operation().key())) {
+ clusterMeta.remove(commit.operation().type(), commit.operation().key());
+ }
+ return null;
+ } finally {
+ commit.close();
+ }
+ }
+
+ // The following two methods come from interfaces that implement Snapshottable,
+ // This interface is implemented to enable the replication server to compress the
+ // local status log.
+ // And form a snapshot (snapshot), when copycat-server restarts,
+ // you can restore the status from the snapshot,
+ // If other servers are added, snapshots can be copied to other servers.
+ @Override
+ public void snapshot(SnapshotWriter writer) {
+ writer.writeObject(clusterMeta);
+ }
+
+ @Override
+ public void install(SnapshotReader reader) {
+ clusterMeta = reader.readObject();
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java
new file mode 100644
index 00000000000..2597c648859
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.copycat.Command;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+
+/**
+ * Command to delete a variable in cluster state machine
+ *
+ */
+public class DeleteCommand implements Command
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
index 084398410d6..1f08d903284 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
@@ -33,7 +33,7 @@ public class ClusterStateMachine extends StateMachine implements Snapshottable {
@Override
protected void configure(StateMachineExecutor executor) {
executor.register(PutCommand.class, this::put);
- executor.register(GetQuery.class, this::get);
+ executor.register(GetCommand.class, this::get);
executor.register(DeleteCommand.class, this::delete);
}
@@ -48,7 +48,7 @@ public Object put(Commit commit) {
return null;
}
- public Object get(Commit commit) {
+ public Object get(Commit commit) {
try {
return clusterMeta.get(commit.operation().type(), commit.operation().key());
} finally {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetQuery.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetCommand.java
similarity index 90%
rename from zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetQuery.java
rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetCommand.java
index 31c5545989c..91de287a1e1 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetQuery.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetCommand.java
@@ -21,11 +21,11 @@
/**
* Command to query a variable in cluster state machine
*/
-public class GetQuery implements Query {
+public class GetCommand implements Query {
private final ClusterMetaType type;
private final String key;
- public GetQuery(ClusterMetaType type, String key){
+ public GetCommand(ClusterMetaType type, String key){
this.type = type;
this.key = key;
}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java
index 8816d21f9f4..3aceb43a4f5 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java
@@ -73,9 +73,9 @@ public static void initClusterEnv() throws IOException, InterruptedException {
.build())
.build();
- // Registering the putCommand and GetQuery command classes
+ // Registering the putCommand and GetCommand command classes
copycatServer.serializer().register(PutCommand.class, 1);
- copycatServer.serializer().register(GetQuery.class, 2);
+ copycatServer.serializer().register(GetCommand.class, 2);
copycatServer.serializer().register(DeleteCommand.class, 3);
copycatServer.onStateChange(state -> {
@@ -126,7 +126,7 @@ public static void initClusterEnv() throws IOException, InterruptedException {
});
copycatClient.serializer().register(PutCommand.class, 1);
- copycatClient.serializer().register(GetQuery.class, 2);
+ copycatClient.serializer().register(GetCommand.class, 2);
copycatClient.serializer().register(DeleteCommand.class, 3);
copycatClient.connect(clusterMembers).join();
@@ -154,7 +154,7 @@ public void putClusterMeta() throws InterruptedException, ExecutionException {
copycatClient.submit(putCommand);
try {
- Object verifyMeta = copycatClient.submit(new GetQuery(IntpProcessMeta, "test"))
+ Object verifyMeta = copycatClient.submit(new GetCommand(IntpProcessMeta, "test"))
.get(3, TimeUnit.SECONDS);
logger.info("Cluster meta[" + IntpProcessMeta + "] : " + verifyMeta);
assertEquals(verifyMeta, meta);
@@ -174,7 +174,7 @@ public void deleteClusterMeta() throws InterruptedException, ExecutionException
.thenRun(() -> logger.info("deleteClusterMeta completed!"));
try {
- Object verifyMeta = copycatClient.submit(new GetQuery(IntpProcessMeta, "test"))
+ Object verifyMeta = copycatClient.submit(new GetCommand(IntpProcessMeta, "test"))
.get(3, TimeUnit.SECONDS);
logger.info("Cluster meta[" + IntpProcessMeta + "] : " + verifyMeta);
assertNull(verifyMeta);
From b0d93cb345e8474e9bfde1528b377e70c6b6c9fd Mon Sep 17 00:00:00 2001
From: liuxunorg <33611720@qq.com>
Date: Sat, 14 Jul 2018 11:25:18 +0800
Subject: [PATCH 4/4] [ZEPPELIN-3610] Cluster Raft module design
By using the Raft protocol, multiple Zeppelin-Server groups are built into a Zeppelin cluster, the cluster State Machine is maintained through the Raft protocol, and the services in the cluster are agreed upon. The Zeppelin-Server and Zeppelin-Interperter services and processes are stored in the Cluster MetaData. Metadata information;
[Feature]
* [x] added support for checking lambda syntax styles in the pom.xml file
* [x] add raft algorithm copycat jar
* [x] add Cluster State Machine
* [x] add state machine query command
* [x] add state machine delete command
* [x] add state machine put command
* https://issues.apache.org/jira/browse/ZEPPELIN-3610
CI pass
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes
---
zeppelin-interpreter/pom.xml | 65 ++++++
.../zeppelin/cluster/ClusterStateMachine.java | 88 ++++++++
.../zeppelin/cluster/DeleteCommand.java | 41 ++++
.../apache/zeppelin/cluster/GetCommand.java | 40 ++++
.../apache/zeppelin/cluster/PutCommand.java | 49 +++++
.../zeppelin/cluster/meta/ClusterMeta.java | 144 +++++++++++++
.../cluster/meta/ClusterMetaEntity.java | 57 ++++++
.../cluster/meta/ClusterMetaOperation.java | 25 +++
.../cluster/meta/ClusterMetaType.java | 25 +++
.../zeppelin/cluster/ClusterMetaTest.java | 189 ++++++++++++++++++
10 files changed, 723 insertions(+)
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetCommand.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/PutCommand.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
create mode 100644 zeppelin-zengine/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index 4ee10806922..3b1c270cfed 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -44,6 +44,8 @@
3.0.31.02.12.1
+ 1.2.9-SNAPSHOT
+ 1.2.12.3
@@ -98,6 +100,69 @@
slf4j-log4j12
+
+
+ io.atomix.copycat
+ copycat-server
+ ${copycat.version}
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ io.atomix.copycat
+ copycat-client
+ ${copycat.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ io.atomix.catalyst
+ catalyst-netty
+ ${catalyst-netty.version}
+
+
+ io.netty
+ netty-transport
+
+
+ io.netty
+ netty-buffer
+
+
+ io.netty
+ netty-common
+
+
+ io.netty
+ netty-resolver
+
+
+ io.netty
+ netty-handler
+
+
+ io.netty
+ netty-codec
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
org.apache.maven
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
new file mode 100644
index 00000000000..1f08d903284
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachine;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+
+/**
+ * Cluster State Machine
+ */
+public class ClusterStateMachine extends StateMachine implements Snapshottable {
+ private ClusterMeta clusterMeta = new ClusterMeta();
+
+ @Override
+ protected void configure(StateMachineExecutor executor) {
+ executor.register(PutCommand.class, this::put);
+ executor.register(GetCommand.class, this::get);
+ executor.register(DeleteCommand.class, this::delete);
+ }
+
+ public Object put(Commit commit) {
+ try {
+ clusterMeta.put(
+ commit.operation().type(),
+ commit.operation().key(), commit.operation().value());
+ } finally {
+ commit.close();
+ }
+ return null;
+ }
+
+ public Object get(Commit commit) {
+ try {
+ return clusterMeta.get(commit.operation().type(), commit.operation().key());
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Deletes the value.
+ */
+ private Object delete(Commit commit) {
+ try {
+ if (clusterMeta.contains(commit.operation().type(), commit.operation().key())) {
+ clusterMeta.remove(commit.operation().type(), commit.operation().key());
+ }
+ return null;
+ } finally {
+ commit.close();
+ }
+ }
+
+ // The following two methods come from interfaces that implement Snapshottable,
+ // This interface is implemented to enable the replication server to compress the
+ // local status log.
+ // And form a snapshot (snapshot), when copycat-server restarts,
+ // you can restore the status from the snapshot,
+ // If other servers are added, snapshots can be copied to other servers.
+ @Override
+ public void snapshot(SnapshotWriter writer) {
+ writer.writeObject(clusterMeta);
+ }
+
+ @Override
+ public void install(SnapshotReader reader) {
+ clusterMeta = reader.readObject();
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java
new file mode 100644
index 00000000000..2597c648859
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/DeleteCommand.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.copycat.Command;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+
+/**
+ * Command to delete a variable in cluster state machine
+ *
+ */
+public class DeleteCommand implements Command {
+ private final ClusterMetaType type;
+ private final String key;
+
+ public DeleteCommand(ClusterMetaType type, String key){
+ this.type = type;
+ this.key = key;
+ }
+
+ public ClusterMetaType type(){
+ return type;
+ }
+
+ public String key(){
+ return key;
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetCommand.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetCommand.java
new file mode 100644
index 00000000000..91de287a1e1
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/GetCommand.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.copycat.Query;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+
+/**
+ * Command to query a variable in cluster state machine
+ */
+public class GetCommand implements Query {
+ private final ClusterMetaType type;
+ private final String key;
+
+ public GetCommand(ClusterMetaType type, String key){
+ this.type = type;
+ this.key = key;
+ }
+
+ public ClusterMetaType type(){
+ return type;
+ }
+
+ public String key(){
+ return key;
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/PutCommand.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/PutCommand.java
new file mode 100644
index 00000000000..f57e190c91b
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/PutCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.copycat.Command;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+
+import java.util.Map;
+
+/**
+ * Command to put a variable in cluster state machine
+ */
+public class PutCommand implements Command {
+ private final ClusterMetaType type;
+ private final String key;
+ private final Object value;
+
+ public PutCommand(ClusterMetaType type, String key, Map value){
+ this.type = type;
+ this.key = key;
+ this.value = value;
+ }
+
+ public ClusterMetaType type(){
+ return type;
+ }
+
+ public String key(){
+ return key;
+ }
+
+ public Object value(){
+ return value;
+ }
+
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
new file mode 100644
index 00000000000..d3979e52155
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Metadata stores metadata information in a KV key-value pair
+ */
+public class ClusterMeta implements Serializable {
+ private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class);
+
+ public static String SERVER_HOST = "SERVER_HOST";
+ public static String SERVER_PORT = "SERVER_PORT";
+ public static String SERVER_TSERVER_HOST = "SERVER_TSERVER_HOST";
+ public static String SERVER_TSERVER_PORT = "SERVER_TSERVER_PORT";
+ public static String INTP_TSERVER_HOST = "INTP_TSERVER_HOST";
+ public static String INTP_TSERVER_PORT = "INTP_TSERVER_PORT";
+ public static String CPU_CAPACITY = "CPU_CAPACITY";
+ public static String CPU_USED = "CPU_USED";
+ public static String MEMORY_CAPACITY = "MEMORY_CAPACITY";
+ public static String MEMORY_USED = "MEMORY_USED";
+ public static String HEARTBEAT = "HEARTBEAT";
+
+ // cluster_name = host:port
+ // cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
+ private Map> mapServerMeta = new HashMap<>();
+
+ // InterpreterGroupId -> {cluster_name,intp_tserver_host,...}
+ private Map> mapInterpreterMeta = new HashMap<>();
+
+ public void put(ClusterMetaType type, String key, Object value) {
+ Map mapValue = (Map) value;
+
+ switch (type) {
+ case ServerMeta:
+ // Because it may be partially updated metadata information
+ if (mapServerMeta.containsKey(key)) {
+ Map values = mapServerMeta.get(key);
+ values.putAll(mapValue);
+ } else {
+ mapServerMeta.put(key, mapValue);
+ }
+ break;
+ case IntpProcessMeta:
+ if (mapInterpreterMeta.containsKey(key)) {
+ Map values = mapInterpreterMeta.get(key);
+ values.putAll(mapValue);
+ } else {
+ mapInterpreterMeta.put(key, mapValue);
+ }
+ break;
+ }
+ }
+
+ public Object get(ClusterMetaType type, String key) {
+ Map values = null;
+
+ switch (type) {
+ case ServerMeta:
+ if (StringUtils.isEmpty(key)) {
+ return mapServerMeta;
+ }
+ if (mapServerMeta.containsKey(key)) {
+ values = mapServerMeta.get(key);
+ } else {
+ logger.warn("can not find key : {}", key);
+ }
+ break;
+ case IntpProcessMeta:
+ if (StringUtils.isEmpty(key)) {
+ return mapInterpreterMeta;
+ }
+ if (mapInterpreterMeta.containsKey(key)) {
+ values = mapInterpreterMeta.get(key);
+ } else {
+ logger.warn("can not find key : {}", key);
+ }
+ break;
+ }
+
+ return values;
+ }
+
+ public boolean contains(ClusterMetaType type, String key) {
+ boolean isExist = false;
+
+ switch (type) {
+ case ServerMeta:
+ if (mapServerMeta.containsKey(key)) {
+ isExist = true;
+ }
+ break;
+ case IntpProcessMeta:
+ if (mapInterpreterMeta.containsKey(key)) {
+ isExist = true;
+ }
+ break;
+ }
+
+ return isExist;
+ }
+
+ public Map remove(ClusterMetaType type, String key) {
+ switch (type) {
+ case ServerMeta:
+ if (mapServerMeta.containsKey(key)) {
+ return mapServerMeta.remove(key);
+ } else {
+ logger.warn("can not find key : {}", key);
+ }
+ break;
+ case IntpProcessMeta:
+ if (mapInterpreterMeta.containsKey(key)) {
+ return mapInterpreterMeta.remove(key);
+ } else {
+ logger.warn("can not find key : {}", key);
+ }
+ break;
+ }
+
+ return null;
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
new file mode 100644
index 00000000000..73ad785a616
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Cluster operations, cluster types, encapsulation objects for keys and values
+ */
+public class ClusterMetaEntity {
+ private ClusterMetaOperation operation;
+ private ClusterMetaType type;
+ private String key;
+ private Map values = new HashMap<>();
+
+ public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type,
+ String key, Map values) {
+ this.operation = operation;
+ this.type = type;
+ this.key = key;
+
+ if (null != values) {
+ this.values.putAll(values);
+ }
+ }
+
+ public ClusterMetaOperation getOperation() {
+ return operation;
+ }
+
+ public ClusterMetaType getMetaType() {
+ return type;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public Map getValues() {
+ return values;
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
new file mode 100644
index 00000000000..c9d71f9de66
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+/**
+ * Type of cluster metadata operation
+ */
+public enum ClusterMetaOperation {
+ PUT,
+ DELETE
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
new file mode 100644
index 00000000000..c6229bd6347
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+/**
+ * Type of cluster metadata
+ */
+public enum ClusterMetaType {
+ ServerMeta,
+ IntpProcessMeta
+}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java
new file mode 100644
index 00000000000..3aceb43a4f5
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/cluster/ClusterMetaTest.java
@@ -0,0 +1,189 @@
+package org.apache.zeppelin.cluster;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.netty.NettyTransport;
+import io.atomix.copycat.client.ConnectionStrategies;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.client.RecoveryStrategies;
+import io.atomix.copycat.client.ServerSelectionStrategies;
+import io.atomix.copycat.server.CopycatServer;
+import io.atomix.copycat.server.storage.Storage;
+import io.atomix.copycat.server.storage.StorageLevel;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ClusterMetaTest {
+ private static Logger logger = LoggerFactory.getLogger(ClusterMetaTest.class);
+
+ static CopycatServer copycatServer = null;
+ static CopycatClient copycatClient = null;
+
+ @BeforeClass
+ public static void initClusterEnv() throws IOException, InterruptedException {
+ logger.info("initClusterEnv >>>");
+
+ // Set the cluster IP and port
+ String zServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
+ int zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ Address clusterMembers = new Address(zServerHost, zServerPort);
+
+ // mock cluster manager server
+ copycatServer = CopycatServer.builder(clusterMembers)
+ .withStateMachine(ClusterStateMachine::new)
+ .withTransport(new NettyTransport())
+ .withStorage(Storage.builder()
+ .withDirectory("copycat-logs")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .withMaxSegmentSize(1024 * 1024 * 32)
+ .withMinorCompactionInterval(Duration.ofMinutes(1))
+ .withMajorCompactionInterval(Duration.ofMinutes(15))
+ .build())
+ .build();
+
+ // Registering the putCommand and GetCommand command classes
+ copycatServer.serializer().register(PutCommand.class, 1);
+ copycatServer.serializer().register(GetCommand.class, 2);
+ copycatServer.serializer().register(DeleteCommand.class, 3);
+
+ copycatServer.onStateChange(state -> {
+ if (state == CopycatServer.State.CANDIDATE) {
+ logger.info("CopycatServer.State CANDIDATE!");
+ } else if (state == CopycatServer.State.FOLLOWER) {
+ logger.info("CopycatServer.State FOLLOWER!");
+ } else if (state == CopycatServer.State.INACTIVE) {
+ logger.info("CopycatServer.State INACTIVE!");
+ } else if (state == CopycatServer.State.LEADER) {
+ logger.info("CopycatServer.State LEADER!");
+ } else if (state == CopycatServer.State.PASSIVE) {
+ logger.info("CopycatServer.State PASSIVE!");
+ } else {
+ logger.info("unknown CopycatServer.State!");
+ }
+ });
+
+ copycatServer.cluster().onJoin(member -> {
+ logger.info(member.address() + " joined the cluster.");
+ });
+
+ copycatServer.cluster().onLeave(member -> {
+ logger.info(member.address() + " left the cluster.");
+ });
+
+ copycatServer.bootstrap().join();
+
+ // initialization Raft client
+ copycatClient = CopycatClient.builder()
+ .withTransport(new NettyTransport())
+ .withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
+ .withRecoveryStrategy(RecoveryStrategies.RECOVER)
+ .withServerSelectionStrategy(ServerSelectionStrategies.LEADER)
+ .withSessionTimeout(Duration.ofSeconds(15))
+ .build();
+
+ copycatClient.onStateChange(state -> {
+ if (state == CopycatClient.State.CONNECTED) {
+ logger.info("CopycatClient.State CONNECTED!");
+ } else if (state == CopycatClient.State.CLOSED) {
+ logger.info("CopycatClient.State CLOSED!");
+ } else if (state == CopycatClient.State.SUSPENDED) {
+ logger.info("CopycatClient.State SUSPENDED!");
+ } else {
+ logger.info("unknown CopycatClient.State " + state);
+ }
+ });
+
+ copycatClient.serializer().register(PutCommand.class, 1);
+ copycatClient.serializer().register(GetCommand.class, 2);
+ copycatClient.serializer().register(DeleteCommand.class, 3);
+
+ copycatClient.connect(clusterMembers).join();
+
+ // Waiting for cluster startup
+ Thread.sleep(3000);
+ }
+
+ @Test
+ public void putClusterMeta() throws InterruptedException, ExecutionException {
+ Map meta = new HashMap<>();
+ meta.put(ClusterMeta.SERVER_HOST, "SERVER_HOST");
+ meta.put(ClusterMeta.SERVER_PORT, "SERVER_PORT");
+ meta.put(ClusterMeta.SERVER_TSERVER_HOST, "SERVER_TSERVER_HOST");
+ meta.put(ClusterMeta.SERVER_TSERVER_PORT, "SERVER_TSERVER_PORT");
+ meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
+ meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT");
+ meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY");
+ meta.put(ClusterMeta.CPU_USED, "CPU_USED");
+ meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY");
+ meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
+ meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
+
+ PutCommand putCommand = new PutCommand(IntpProcessMeta, "test", meta);
+ copycatClient.submit(putCommand);
+
+ try {
+ Object verifyMeta = copycatClient.submit(new GetCommand(IntpProcessMeta, "test"))
+ .get(3, TimeUnit.SECONDS);
+ logger.info("Cluster meta[" + IntpProcessMeta + "] : " + verifyMeta);
+ assertEquals(verifyMeta, meta);
+ } catch (TimeoutException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void deleteClusterMeta() throws InterruptedException, ExecutionException {
+ logger.info("test deleteClusterMeta");
+ putClusterMeta();
+
+ DeleteCommand deleteCommand = new DeleteCommand(IntpProcessMeta, "test");
+ CompletableFuture futures = copycatClient.submit(deleteCommand);
+ CompletableFuture.allOf(futures)
+ .thenRun(() -> logger.info("deleteClusterMeta completed!"));
+
+ try {
+ Object verifyMeta = copycatClient.submit(new GetCommand(IntpProcessMeta, "test"))
+ .get(3, TimeUnit.SECONDS);
+ logger.info("Cluster meta[" + IntpProcessMeta + "] : " + verifyMeta);
+ assertNull(verifyMeta);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ } catch (TimeoutException e) {
+ e.printStackTrace();
+ }
+ }
+}