Skip to content

Commit

Permalink
Merge pull request #7 from smalldave/akka-cluster-controller
Browse files Browse the repository at this point in the history
Implement Remote Testkit controller
  • Loading branch information
Aaronontheweb committed Sep 19, 2014
2 parents fdb7255 + c225b3b commit 7aecf3b
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterRemoteWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected override void OnReceive(object message)
/// responsibility for watchees on that node already handled
/// by base RemoteWatcher.
/// </summary>
public void TakeOverResponsibility(Address address)
private void TakeOverResponsibility(Address address)
{
foreach (var watching in Watching)
{
Expand Down
9 changes: 1 addition & 8 deletions src/core/Akka.Remote.TestKit/Akka.Remote.TestKit.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@
</Reference>
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="BarrierCoordinator.cs" />
Expand All @@ -62,6 +58,7 @@
<Compile Include="Extension.cs" />
<Compile Include="MsgDecoder.cs" />
<Compile Include="MsgEncoder.cs" />
<Compile Include="Player.cs" />
<Compile Include="Proto\ProtobufDecoder.cs" />
<Compile Include="Proto\ProtobufEncoder.cs" />
<Compile Include="RemoteConnection.cs" />
Expand Down Expand Up @@ -97,10 +94,6 @@
<Project>{ea4ff8fd-7c53-49c8-b9aa-02e458b3e6a7}</Project>
<Name>Akka.Remote</Name>
</ProjectReference>
<ProjectReference Include="..\Akka.TestKit\Akka.TestKit.csproj">
<Project>{0d3cbad0-bbdb-43e5-afc4-ed1d3ecdc224}</Project>
<Name>Akka.TestKit</Name>
</ProjectReference>
<ProjectReference Include="..\Akka\Akka.csproj">
<Project>{5deddf90-37f0-48d3-a0b0-a5cbd8a7e377}</Project>
<Name>Akka</Name>
Expand Down
23 changes: 19 additions & 4 deletions src/core/Akka.Remote.TestKit/Conductor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,21 @@ public ActorRef Controller
}
}

//TODO: INode should probably be IPEndPoint
public async Task<INode> StartController(int participants, RoleName name, INode controllerPort)
{
if(_controller != null) throw new Exception("TestConductorServer was already started");
_controller = _system.ActorOf(new Props(typeof (Controller), new object[] {participants, controllerPort}),
"controller");
//TODO: Need to review this async stuff
var node = await _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance).ConfigureAwait(false);
await StartClient(name, node).ConfigureAwait(false);
return node;
}

public void StartController()
public Task<INode> SockAddr()
{
//TODO: This will be different if using app domains and remoting. Need to have more of a look at the code to work out if that is the right direction
throw new NotImplementedException();
return _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance);
}

/// <summary>
Expand Down Expand Up @@ -259,11 +269,16 @@ public void OnException(Exception ex, IConnection erroredChannel)
}
}

class ServerFSM
class ServerFSM : UntypedActor
{
public ServerFSM()
{
throw new NotImplementedException();
}

protected override void OnReceive(object message)
{
throw new NotImplementedException();
}
}
}
185 changes: 183 additions & 2 deletions src/core/Akka.Remote.TestKit/Controller.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Event;
using Helios.Net;
using Helios.Topology;

namespace Akka.Remote.TestKit
{
Expand Down Expand Up @@ -74,7 +77,6 @@ public static GetNodes Instance
}
}

//TODO: Necessary?
public class GetSockAddr
{
private GetSockAddr() { }
Expand Down Expand Up @@ -169,11 +171,190 @@ public CreateServerFSM(IConnection channel)
public IConnection Channel { get; private set; }
}

int _initialParticipants;
readonly TestConductorSettings _settings = TestConductor.Get(Context.System).Settings;
readonly RemoteConnection _connection;
readonly ActorRef _barrier;
ImmutableDictionary<RoleName, NodeInfo> _nodes =
ImmutableDictionary.Create<RoleName, NodeInfo>();
// map keeping unanswered queries for node addresses (enqueued upon GetAddress, serviced upon NodeInfo)
ImmutableDictionary<RoleName, ImmutableHashSet<ActorRef>> _addrInterest =
ImmutableDictionary.Create<RoleName, ImmutableHashSet<ActorRef>>();
int _generation = 1;

public Controller(int initialParticipants, INode controllerPort)
{
_connection = RemoteConnection.Create(Role.Server, controllerPort, _settings.ServerSocketWorkerPoolSize,
new ConductorHandler(Self, Logging.GetLogger(Context.System, typeof (ConductorHandler))));
_barrier = Context.ActorOf(Props.Create<BarrierCoordinator>(), "barriers");
_initialParticipants = initialParticipants;
}

/// <summary>
/// Supervision of the BarrierCoordinator means to catch all his bad emotions
/// and sometimes console him (BarrierEmpty, BarrierTimeout), sometimes tell
/// him to hate the world (WrongBarrier, DuplicateNode, ClientLost). The latter shall help
/// terminate broken tests as quickly as possible (i.e. without awaiting
/// BarrierTimeouts in the players).
/// </summary>
/// <returns></returns>
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(e =>
{
var barrierTimeout = e as BarrierCoordinator.BarrierTimeout;
if (barrierTimeout != null) return FailBarrier(barrierTimeout.BarrierData);
var failedBarrier = e as BarrierCoordinator.FailedBarrier;
if (failedBarrier != null) return FailBarrier(failedBarrier.BarrierData);
var barrierEmpty = e as BarrierCoordinator.BarrierEmpty;
if(barrierEmpty != null) return Directive.Resume;
var wrongBarrier = e as BarrierCoordinator.WrongBarrier;
if (wrongBarrier != null)
{
wrongBarrier.Client.Tell(new ToClient<BarrierResult>(new BarrierResult(wrongBarrier.Barrier, false)));
return FailBarrier(wrongBarrier.BarrierData);
}
var clientLost = e as BarrierCoordinator.ClientLost;
if (clientLost != null) return FailBarrier(clientLost.BarrierData);
var duplicateNode = e as BarrierCoordinator.DuplicateNode;
if (duplicateNode != null) return FailBarrier(duplicateNode.BarrierData);
throw new InvalidOperationException(String.Format("Cannot process exception of type {0}", e.GetType()));
});
}

private Directive FailBarrier(BarrierCoordinator.Data data)
{
foreach(var c in data.Arrived) c.Tell(new ToClient<BarrierResult>(new BarrierResult(data.Barrier, false)));
return Directive.Restart;
}

//TODO: Logging receieve?
protected override void OnReceive(object message)
{
throw new NotImplementedException();
var createServerFSM = message as CreateServerFSM;
if (createServerFSM != null)
{
var channel = createServerFSM.Channel;
var host = channel.RemoteHost;
var name = host.ToEndPoint() + ":" + host.Port + "-server" + _generation++;
Sender.Tell(
Context.ActorOf(
new Props(typeof (ServerFSM), new object[] {Self, channel}).WithDeploy(Deploy.Local), name));
return;
}
var nodeInfo = message as NodeInfo;
if (nodeInfo != null)
{
_barrier.Forward(nodeInfo);
if (_nodes.ContainsKey(nodeInfo.Name))
{
if (_initialParticipants > 0)
{
foreach (var ni in _nodes.Values)
ni.FSM.Tell(new ToClient<BarrierResult>(new BarrierResult("initial startup", false)));
_initialParticipants = 0;
}
nodeInfo.FSM.Tell(new ToClient<BarrierResult>(new BarrierResult("initial startup", false)));
}
else
{
_nodes = _nodes.Add(nodeInfo.Name, nodeInfo);
if(_initialParticipants < 0) nodeInfo.FSM.Tell(new ToClient<Done>(Done.Instance));
else if (_nodes.Count == _initialParticipants)
{
foreach (var ni in _nodes.Values) ni.FSM.Tell(new ToClient<Done>(Done.Instance));
_initialParticipants = 0;
}
if (_addrInterest.ContainsKey(nodeInfo.Name))
{
foreach(var a in _addrInterest[nodeInfo.Name]) a.Tell(new ToClient<AddressReply>(new AddressReply(nodeInfo.Name, nodeInfo.Addr)));
_addrInterest = _addrInterest.Remove(nodeInfo.Name);
}
}
}
var clientDisconnected = message as ClientDisconnected;
if (clientDisconnected != null)
{
_nodes = _nodes.Remove(clientDisconnected.Name);
_barrier.Forward(clientDisconnected);
}
if (message is IServerOp)
{
if (message is EnterBarrier)
{
_barrier.Forward(message);
return;
}
if (message is FailBarrier)
{
_barrier.Forward(message);
return;
}
var getAddress = message as GetAddress;
if (getAddress != null)
{
var node = getAddress.Node;
if (_nodes.ContainsKey(node))
Sender.Tell(new ToClient<AddressReply>(new AddressReply(node, _nodes[node].Addr)));
else
{
ImmutableHashSet<ActorRef> existing;
_addrInterest = _addrInterest.SetItem(node,
(_addrInterest.TryGetValue(node, out existing)
? existing
: ImmutableHashSet.Create<ActorRef>()
).Add(Sender));
}
return;
}
if (message is Done) return; //FIXME what should happen?
}
if (message is ICommandOp)
{
var throttle = message as Throttle;
if (throttle != null)
{
var t = _nodes[throttle.Target];
_nodes[throttle.Node].FSM.Forward(new ToClient<ThrottleMsg>(new ThrottleMsg(t.Addr, throttle.Direction, throttle.RateMBit)));
return;
}
var disconnect = message as Disconnect;
if (disconnect != null)
{
var t = _nodes[disconnect.Target];
_nodes[disconnect.Node].FSM.Forward((new ToClient<DisconnectMsg>(new DisconnectMsg(t.Addr, disconnect.Abort))));
return;
}
var terminate = message as Terminate;
if (terminate != null)
{
_barrier.Tell(new BarrierCoordinator.RemoveClient(terminate.Node));
_nodes[terminate.Node].FSM.Forward(new ToClient<TerminateMsg>(new TerminateMsg(terminate.ShutdownOrExit)));
_nodes = _nodes.Remove(terminate.Node);
return;
}
var remove = message as Remove;
if (remove != null)
{
_barrier.Tell(new BarrierCoordinator.RemoveClient(remove.Node));
return;
}
}
if (message is GetNodes)
{
Sender.Tell(_nodes.Keys);
return;
}
if (message is GetSockAddr)
{
Sender.Tell(_connection.RemoteHost);
return;
}
}

protected override void PostStop()
{
RemoteConnection.Shutdown(_connection);
}
}
}
6 changes: 3 additions & 3 deletions src/core/Akka.Remote.TestKit/DataTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public override int GetHashCode()
}
}

sealed class Throttle
sealed class Throttle : ICommandOp
{
readonly RoleName _node;
readonly RoleName _target;
Expand Down Expand Up @@ -418,7 +418,7 @@ public override int GetHashCode()
}
}

sealed class ThrottleMsg
sealed class ThrottleMsg : IConfirmedClientOp, INetworkOp
{
readonly Address _target;
readonly ThrottleTransportAdapter.Direction _direction;
Expand Down Expand Up @@ -796,7 +796,7 @@ public static Done Instance
}
}

sealed class Remove
sealed class Remove : ICommandOp
{
readonly RoleName _node;

Expand Down
3 changes: 3 additions & 0 deletions src/core/Akka.Remote.TestKit/Extension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ public static TestConductor Get(ActorSystem system)
/// </summary>
public Address Address { get { return _address; } }

readonly ExtendedActorSystem _system;

public TestConductor(ExtendedActorSystem system)
{
_settings = new TestConductorSettings(system.Settings.Config.GetConfig("akka.testconductor"));
_transport = system.Provider.AsInstanceOf<RemoteActorRefProvider>().Transport;
_address = _transport.DefaultAddress;
_system = system;
}
}

Expand Down
20 changes: 20 additions & 0 deletions src/core/Akka.Remote.TestKit/Player.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Threading.Tasks;
using Helios.Topology;

namespace Akka.Remote.TestKit
{
/// <summary>
/// The Player is the client component of the
/// test conductor extension. It registers with
/// the conductor's controller
/// in order to participate in barriers and enable network failure injection
/// </summary>
partial class TestConductor //Player trait in JVM version
{
public Task<Done> StartClient(RoleName name, INode controllerAddr)
{
throw new NotImplementedException();
}
}
}
7 changes: 7 additions & 0 deletions src/core/Akka.Remote.TestKit/RemoteConnection.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Net;
using Akka.Actor;
using Akka.Remote.TestKit.Proto;
using Akka.Remote.Transport.Helios;
using Helios.Buffers;
Expand Down Expand Up @@ -113,6 +114,12 @@ public static RemoteConnection Create(Role role, INode socketAddress, int poolSi
}
}

public static void Shutdown(RemoteConnection connection)
{
//TODO: Correct?
connection.Close();
}

#endregion
}
}

0 comments on commit 7aecf3b

Please sign in to comment.