Skip to content

Commit

Permalink
Refactor context to support pluggable contexts and use the Netty even…
Browse files Browse the repository at this point in the history
…t loop.
  • Loading branch information
kuujo committed Jul 9, 2015
1 parent 9285832 commit 8661efc
Show file tree
Hide file tree
Showing 31 changed files with 376 additions and 251 deletions.
Expand Up @@ -16,7 +16,7 @@
package net.kuujo.copycat.cluster; package net.kuujo.copycat.cluster;


import net.kuujo.alleycat.Alleycat; import net.kuujo.alleycat.Alleycat;
import net.kuujo.copycat.util.ExecutionContext; import net.kuujo.copycat.util.Context;
import net.kuujo.copycat.util.Managed; import net.kuujo.copycat.util.Managed;


import java.util.Random; import java.util.Random;
Expand All @@ -30,8 +30,8 @@ public abstract class ManagedMember implements Member, Managed<Member> {
protected final MemberInfo info; protected final MemberInfo info;
protected Type type; protected Type type;
protected Status status = Status.DEAD; protected Status status = Status.DEAD;
protected ExecutionContext context; protected Context context;
protected Alleycat alleycat; protected Alleycat serializer;


protected ManagedMember(MemberInfo info, Type type) { protected ManagedMember(MemberInfo info, Type type) {
this.info = info; this.info = info;
Expand All @@ -41,16 +41,16 @@ protected ManagedMember(MemberInfo info, Type type) {
/** /**
* Sets the member context. * Sets the member context.
*/ */
void setContext(ExecutionContext context) { void setContext(Context context) {
this.context = context; this.context = context;
this.alleycat = context.alleycat(); this.serializer = context.serializer();
} }


/** /**
* Returns the current execution context. * Returns the current execution context.
*/ */
protected ExecutionContext getContext() { protected Context getContext() {
ExecutionContext context = ExecutionContext.currentContext(); Context context = Context.currentContext();
return context != null ? context : this.context; return context != null ? context : this.context;
} }


Expand Down
Expand Up @@ -16,7 +16,7 @@
package net.kuujo.copycat.cluster; package net.kuujo.copycat.cluster;


import net.kuujo.alleycat.Alleycat; import net.kuujo.alleycat.Alleycat;
import net.kuujo.copycat.util.ExecutionContext; import net.kuujo.copycat.util.Context;
import net.kuujo.copycat.util.Managed; import net.kuujo.copycat.util.Managed;


import java.util.*; import java.util.*;
Expand All @@ -43,7 +43,7 @@ public abstract class ManagedMembers implements Members, Managed<Members> {


protected ManagedMembers(Collection<? extends ManagedMember> remoteMembers, Alleycat alleycat) { protected ManagedMembers(Collection<? extends ManagedMember> remoteMembers, Alleycat alleycat) {
remoteMembers.forEach(m -> { remoteMembers.forEach(m -> {
((ManagedMember)m).setContext(new ExecutionContext("copycat-cluster-" + m.id(), alleycat)); ((ManagedMember)m).setContext(Context.createContext("copycat-cluster-%d", alleycat));
this.members.put(m.id(), m); this.members.put(m.id(), m);
this.sortedMembers.add(m); this.sortedMembers.add(m);
}); });
Expand Down
118 changes: 118 additions & 0 deletions cluster/src/main/java/net/kuujo/copycat/util/Context.java
@@ -0,0 +1,118 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.util;

import net.kuujo.alleycat.Alleycat;

import java.util.ServiceLoader;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Execution context.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public abstract class Context implements Executor, AutoCloseable {
private static final ContextFactory factory = ServiceLoader.load(ContextFactory.class).iterator().next();

/**
* Creates a new context.
*
* @param name The context name.
* @param serializer The Alleycat serializer.
* @return A new context.
*/
public static Context createContext(String name, Alleycat serializer) {
return factory.createContext(name, serializer);
}

/**
* Returns the current context.
*
* @return The current context.
*/
public static Context currentContext() {
Thread thread = Thread.currentThread();
return thread instanceof CopycatThread ? ((CopycatThread) thread).getContext() : null;
}

private final String name;
private final Alleycat serializer;

protected Context(String name, CopycatThread thread, Alleycat serializer) {
if (thread == null)
throw new NullPointerException("thread cannot be null");
thread.setName(name);
thread.setContext(this);
this.name = name;
this.serializer = serializer;
}

/**
* Checks that the current thread is the correct context thread.
*/
public void checkThread() {
Thread thread = Thread.currentThread();
if (!(thread instanceof CopycatThread && ((CopycatThread) thread).getContext() == this)) {
throw new IllegalStateException("not running on the correct thread");
}
}

/**
* Returns the context name.
*
* @return The context name.
*/
public String name() {
return name;
}

/**
* Returns the context serializer.
*
* @return The context serializer.
*/
public Alleycat serializer() {
return serializer;
}

/**
* Schedules a runnable on the context.
*
* @param runnable The runnable to schedule.
* @param delay The delay at which to schedule the runnable.
* @param unit The time unit.
*/
public abstract ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit);

/**
* Schedules a runnable at a fixed rate on the context.
*
* @param runnable The runnable to schedule.
* @param delay The delay at which to schedule the runnable.
* @param unit The time unit.
*/
public abstract ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long delay, long rate, TimeUnit unit);

/**
* Closes the context.
*/
@Override
public abstract void close();

}
Expand Up @@ -15,26 +15,22 @@
*/ */
package net.kuujo.copycat.util; package net.kuujo.copycat.util;


import net.kuujo.alleycat.Alleycat;

/** /**
* Context thread checker. * Context factory.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class ThreadChecker { public interface ContextFactory {
private final ExecutionContext context;

public ThreadChecker(ExecutionContext context) {
this.context = context;
}


/** /**
* Checks that the current thread is the correct context thread. * Creates a new execution context.
*
* @param name The context name.
* @param serializer The Alleycat serializer.
* @return The execution context.
*/ */
public void checkThread() { Context createContext(String name, Alleycat serializer);
Thread thread = Thread.currentThread();
if (!(thread instanceof net.kuujo.copycat.util.CopycatThread && ((net.kuujo.copycat.util.CopycatThread) thread).getContext() == context)) {
throw new IllegalStateException("not running on the correct thread");
}
}


} }
Expand Up @@ -23,7 +23,7 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class CopycatThread extends Thread { public class CopycatThread extends Thread {
private WeakReference<ExecutionContext> context; private WeakReference<Context> context;


public CopycatThread(Runnable target, String name) { public CopycatThread(Runnable target, String name) {
super(target, name); super(target, name);
Expand All @@ -32,14 +32,14 @@ public CopycatThread(Runnable target, String name) {
/** /**
* Sets the thread context. * Sets the thread context.
*/ */
public void setContext(ExecutionContext context) { public void setContext(Context context) {
this.context = new WeakReference<>(context); this.context = new WeakReference<>(context);
} }


/** /**
* Returns the thread context. * Returns the thread context.
*/ */
public ExecutionContext getContext() { public Context getContext() {
return context.get(); return context.get();
} }


Expand Down
128 changes: 0 additions & 128 deletions cluster/src/main/java/net/kuujo/copycat/util/ExecutionContext.java

This file was deleted.

Expand Up @@ -20,7 +20,7 @@
import net.kuujo.alleycat.Alleycat; import net.kuujo.alleycat.Alleycat;
import net.kuujo.alleycat.ServiceLoaderResolver; import net.kuujo.alleycat.ServiceLoaderResolver;
import net.kuujo.copycat.ConfigurationException; import net.kuujo.copycat.ConfigurationException;
import net.kuujo.copycat.util.ExecutionContext; import net.kuujo.copycat.util.Context;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
Expand Down Expand Up @@ -55,7 +55,7 @@ public NettyCluster(EventLoopGroup eventLoopGroup, NettyLocalMember localMember,
protected ManagedRemoteMember createMember(MemberInfo info) { protected ManagedRemoteMember createMember(MemberInfo info) {
ManagedRemoteMember remoteMember = new NettyRemoteMember((NettyMemberInfo) info, Member.Type.PASSIVE) ManagedRemoteMember remoteMember = new NettyRemoteMember((NettyMemberInfo) info, Member.Type.PASSIVE)
.setEventLoopGroup(eventLoopGroup); .setEventLoopGroup(eventLoopGroup);
remoteMember.setContext(new ExecutionContext(String.format("copycat-cluster-%d", info.id()), alleycat)); remoteMember.setContext(Context.createContext(String.format("copycat-cluster-%d", info.id()), alleycat));
return remoteMember; return remoteMember;
} }


Expand Down

0 comments on commit 8661efc

Please sign in to comment.