Skip to content

Commit

Permalink
#502 make build pass
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Jan 16, 2019
1 parent 425a801 commit b5f09f6
Show file tree
Hide file tree
Showing 30 changed files with 215 additions and 98 deletions.
Expand Up @@ -18,22 +18,78 @@
import com.alibaba.nacos.api.exception.NacosException;

/**
* Consistence service for all implementations to derive.
* <p>
* We announce this consistency service to decouple the specific consistency implementation with business logic.
* User should not be aware of what consistency protocol is being used.
* <p>
* In this way, we also provide space for user to extend the underlying consistency protocols, as long as they
* obey our consistency baseline.
*
* @author <a href="mailto:zpf.073@gmail.com">nkorange</a>
* @since 1.0.0
*/
public interface ConsistencyService {

/**
* Put a data related to a key to Nacos cluster
*
* @param key key of data
* @param value value of data
* @throws NacosException
*/
void put(Object key, Object value) throws NacosException;

/**
* Remove a data from Nacos cluster
*
* @param key key of data
* @throws NacosException
*/
void remove(Object key) throws NacosException;

/**
* Get a data from Nacos cluster
*
* @param key key of data
* @return data related to the key
* @throws NacosException
*/
Object get(Object key) throws NacosException;

/**
* Listen for changes of a data
*
* @param key key of data
* @param listener callback of data change
* @throws NacosException
*/
void listen(Object key, DataListener listener) throws NacosException;

/**
* Cancel listening of a data
*
* @param key key of data
* @param listener callback of data change
* @throws NacosException
*/
void unlisten(Object key, DataListener listener) throws NacosException;

/**
* Is the local server responsible for a data.
* <p>
* Any write operation to a data in a server not responsible for the data is refused.
*
* @param key key of data
* @return true if the local server is responsible for the data
*/
boolean isResponsible(Object key);

/**
* Get the responsible server for a data
*
* @param key key of data
* @return responsible server for the data
*/
String getResponsibleServer(Object key);
}
Expand Up @@ -16,6 +16,8 @@
package com.alibaba.nacos.naming.consistency;

/**
* Data listener public interface
*
* @author nacos
*/
public interface DataListener {
Expand Down
Expand Up @@ -16,19 +16,21 @@
package com.alibaba.nacos.naming.consistency;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.consistency.ap.ApConsistencyService;
import com.alibaba.nacos.naming.consistency.cp.CpConsistencyService;
import com.alibaba.nacos.naming.consistency.ephemeral.ApConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.CpConsistencyService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
* Publish execute delegate
* Publish execution delegate
*
* @author <a href="mailto:zpf.073@gmail.com">nkorange</a>
* @since 1.0.0
*/
@Component
public class PublishDelegate implements ConsistencyService {
@Component("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {

@Autowired
private CpConsistencyService cpConsistencyService;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ap;
package com.alibaba.nacos.naming.consistency.ephemeral;

import com.alibaba.nacos.naming.consistency.ConsistencyService;

Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ap.renew;
package com.alibaba.nacos.naming.consistency.ephemeral.partialrenew;

/**
* @author <a href="mailto:zpf.073@gmail.com">nkorange</a>
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.cp;
package com.alibaba.nacos.naming.consistency.persistent;

import com.alibaba.nacos.naming.consistency.ConsistencyService;

Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.cp.simpleraft;
package com.alibaba.nacos.naming.consistency.persistent.simpleraft;

import java.util.concurrent.atomic.AtomicLong;

Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.cp.simpleraft;
package com.alibaba.nacos.naming.consistency.persistent.simpleraft;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.cp.simpleraft;
package com.alibaba.nacos.naming.consistency.persistent.simpleraft;

import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.naming.misc.HttpClient;
Expand Down
@@ -1,8 +1,8 @@
package com.alibaba.nacos.naming.consistency.cp.simpleraft;
package com.alibaba.nacos.naming.consistency.persistent.simpleraft;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.consistency.cp.CpConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.CpConsistencyService;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -14,7 +14,7 @@
* @since 1.0.0
*/
@Component
public class RaftConsistencyService implements CpConsistencyService {
public class RaftConsistencyServiceImpl implements CpConsistencyService {

@Autowired
private RaftCore raftCore;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.cp.simpleraft;
package com.alibaba.nacos.naming.consistency.persistent.simpleraft;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
Expand All @@ -27,6 +27,7 @@
import com.ning.http.client.Response;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.javatuples.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -103,6 +104,9 @@ public Thread newThread(Runnable r) {
@Autowired
private RaftProxy raftProxy;

@Autowired
private RaftStore raftStore;

public volatile Notifier notifier = new Notifier();

@PostConstruct
Expand All @@ -116,7 +120,15 @@ public void init() throws Exception {

long start = System.currentTimeMillis();

RaftStore.load();
ConcurrentMap<String, Datum> datumMap = raftStore.loadDatums();
if (datumMap != null && !datumMap.isEmpty()) {
datums = datumMap;
for (Map.Entry<String, Datum> entry : datumMap.entrySet()) {
notifier.addTask(entry.getValue(), ApplyAction.CHANGE);
}
}

setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));

Loggers.RAFT.info("cache loaded, peer count: {}, datum count: {}, current term: {}",
peers.size(), datums.size(), peers.getTerm());
Expand All @@ -126,7 +138,6 @@ public void init() throws Exception {
break;
}
Thread.sleep(1000L);
System.out.println(notifier.tasks.size());
}

Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
Expand Down Expand Up @@ -311,7 +322,7 @@ public void onPublish(Datum datum, RaftPeer source) throws Exception {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
RaftStore.updateTerm(local.term.get());
raftStore.updateTerm(local.term.get());

notifier.addTask(datum, ApplyAction.CHANGE);

Expand Down Expand Up @@ -351,7 +362,7 @@ public void onDelete(Datum datum, RaftPeer source) throws Exception {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}

RaftStore.updateTerm(local.term.get());
raftStore.updateTerm(local.term.get());
}

}
Expand Down Expand Up @@ -708,7 +719,7 @@ public Integer onCompleted(Response response) throws Exception {
local.term.addAndGet(100);
}

RaftStore.updateTerm(local.term.get());
raftStore.updateTerm(local.term.get());
}

Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.cp.simpleraft;
package com.alibaba.nacos.naming.consistency.persistent.simpleraft;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.cp.simpleraft;
package com.alibaba.nacos.naming.consistency.persistent.simpleraft;

import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.misc.HttpClient;
Expand Down

0 comments on commit b5f09f6

Please sign in to comment.