Skip to content

Commit

Permalink
Add UserLoginWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
veblush committed Jul 11, 2016
1 parent 0a610f6 commit d237a11
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 158 deletions.
15 changes: 10 additions & 5 deletions src/TalkServer.Tests/ClusterContextFixture.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Interfaced;
using Akka.Cluster.Utility;
using Akka.Interfaced;
using Domain;

namespace TalkServer
{
public class ClusterContextFixture : IDisposable
{
public ClusterNodeContext Context { get; private set; }

private List<IActorRef> _actors = new List<IActorRef>();

public ClusterContextFixture()
{
// force interface assembly to be loaded before creating ProtobufSerializer

var type = typeof(IUser);
if (type == null)
throw new InvalidProgramException("!");
}

public void Initialize(ActorSystem system)
{
DeadRequestProcessingActor.Install(system);

var context = new ClusterNodeContext { System = system };

context.ClusterActorDiscovery = system.ActorOf(Props.Create(
Expand All @@ -30,7 +35,7 @@ public void Initialize(ActorSystem system)

context.UserTableContainer = new DistributedActorTableContainerRef<string>(system.ActorOf(
Props.Create(() => new DistributedActorTableContainer<string>(
"User", context.ClusterActorDiscovery, null, null, InterfacedPoisonPill.Instance)),
"User", context.ClusterActorDiscovery, typeof(UserActorFactory), new object[] { context }, InterfacedPoisonPill.Instance)),
"UserTableContainer"));

context.RoomTable = new DistributedActorTableRef<string>(system.ActorOf(
Expand Down
15 changes: 8 additions & 7 deletions src/TalkServer/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,18 @@
name = ChatCluster
runner {
default = [
{ port = 3001, roles = [ "RoomTable", "UserTable", "BotTable", [ "Room", { port = 9011 } ], [ "User", { port = 9001 } ], "Bot" ] }
{ port = 3001, roles = [ "RoomTable", "UserTable", "BotTable", [ "Room", { port = 9021 } ], [ "User", { port = 9011 } ], [ "UserLogin", { port = 9001 } ], "Bot" ] }
]
cluster = [
{ port = 3001, roles = [ "RoomTable" ] }
{ port = 3002, roles = [ "UserTable" ] }
{ port = 3003, roles = [ "BotTable" ] }
{ port = 3011, roles = [ [ "Room", { port = 9011 } ] ] }
{ port = 3012, roles = [ [ "Room", { port = 9012 } ] ] }
{ port = 3021, roles = [ [ "User", { port = 9001 } ] ] }
{ port = 3022, roles = [ [ "User", { port = 9002 } ] ] }
{ port = 3031, roles = [ "Bot", [ "User", { } ] ] }
{ port = 3011, roles = [ [ "Room", { port = 9021 } ] ] }
{ port = 3012, roles = [ [ "Room", { port = 9022 } ] ] }
{ port = 3021, roles = [ [ "User", { port = 9011 } ] ] }
{ port = 3022, roles = [ [ "User", { port = 9012 } ] ] }
{ port = 3031, roles = [ [ "UserLogin", { port = 9001 } ] ] }
{ port = 3041, roles = [ "Bot", [ "User", { } ] ] }
]
}
}
Expand Down Expand Up @@ -80,7 +81,7 @@
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-8.0.0.0" newVersion="8.0.0.0" />
<bindingRedirect oldVersion="0.0.0.0-9.0.0.0" newVersion="9.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
Expand Down
100 changes: 44 additions & 56 deletions src/TalkServer/BotActor.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Net;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Interfaced;
Expand All @@ -9,10 +10,11 @@

namespace TalkServer
{
public class BotActor : InterfacedActor, IActorBoundChannelSync, IUserEventObserverAsync, IRoomObserverAsync, IBotService
public class BotActor : InterfacedActor, IUserEventObserverAsync, IRoomObserverAsync, IBotService
{
private readonly ILog _log;
private readonly ClusterNodeContext _clusterContext;
private ActorBoundChannelRef _channel;
private BotPattern.Context _patternContext;
private BotPattern _pattern;
private UserRef _user;
Expand All @@ -34,6 +36,8 @@ public BotActor(ClusterNodeContext clusterContext, string name, string roomName,
{
_log = LogManager.GetLogger($"Bot({name})");
_clusterContext = clusterContext;
_channel = Context.InterfacedActorOf(() => new ActorBoundDummyChannel()).Cast<ActorBoundChannelRef>();

Self.Tell(new StartMessage { UserId = name, RoomName = roomName, PatternType = patternType });
}

Expand All @@ -44,6 +48,8 @@ protected override void PostStop()
if (_user != null)
_user.CastToIActorRef().Tell(InterfacedPoisonPill.Instance);

_channel.WithNoReply().Close();

base.PostStop();
}

Expand All @@ -65,81 +71,63 @@ private async Task Handle(StartMessage m)

// login by itself

var userLogin = Context.ActorOf(Props.Create(() => new UserLoginActor(_clusterContext, Self.Cast<ActorBoundChannelRef>(), new IPEndPoint(IPAddress.None, 0))))
.Cast<UserLoginRef>().WithRequestWaiter(this);
await userLogin.Login(m.UserId, m.UserId, CreateObserver<IUserEventObserver>());
await LoginUser(m.UserId);
if (_user == null)
throw new InvalidOperationException();

// enter room

await _user.EnterRoom(m.RoomName, CreateObserver<IRoomObserver>());
var roomInfo = await _user.EnterRoom(m.RoomName, CreateObserver<IRoomObserver>());
_occupant = ((OccupantRef)roomInfo.Item1).WithRequestWaiter(this);

// start bot

await _pattern.OnStart();
}

[MessageHandler]
private Task Handle(TimerMessage m)
{
return (_pattern != null) ? _pattern.OnTimer() : Task.FromResult(true);
}

void IActorBoundChannelSync.SetTag(object tag)
{
}

InterfacedActorRef IActorBoundChannelSync.BindActor(InterfacedActorRef actor, ActorBindingFlags bindingFlags)
private async Task LoginUser(string userId)
{
var targetActor = ((AkkaActorTarget)actor.Target).Actor;

var boundActor = ((IActorBoundChannelSync)this).BindActor(targetActor, new TaggedType[] { actor.InterfaceType }, bindingFlags);
if (boundActor == null)
return null;

var actorRef = (InterfacedActorRef)Activator.CreateInstance(actor.GetType());
InterfacedActorRefModifier.SetTarget(actorRef, boundActor);
return actorRef;
}

BoundActorTarget IActorBoundChannelSync.BindActor(IActorRef actor, TaggedType[] types, ActorBindingFlags bindingFlags)
{
// this actor doesn't work as a normal channel.
// it just hooks binding event and save those actors to use later.

if (types[0].Type == typeof(IUser))
IActorRef user;
try
{
_user = actor.Cast<UserRef>().WithRequestWaiter(this);
return new BoundActorTarget(0);
var observer = CreateObserver<IUserEventObserver>();
user = Context.System.ActorOf(
Props.Create(() => new UserActor(_clusterContext, userId, observer)),
"user_" + userId);
}

if (types[0].Type == typeof(IOccupant))
catch (Exception e)
{
_occupant = actor.Cast<OccupantRef>().WithRequestWaiter(this);
return new BoundActorTarget(0);
_log.Error("Failed to create user.", e);
return;
}

_log.ErrorFormat("Unexpected bind type. (InterfaceType={0}, Actor={1})",
types[0].Type.FullName, actor);
return null;
}

bool IActorBoundChannelSync.UnbindActor(IActorRef actor)
{
return true;
}
var registered = false;
for (int i = 0; i < 10; i++)
{
var reply = await _clusterContext.UserTableContainer.Add(userId, user);
if (reply.Added)
{
registered = true;
break;
}
await Task.Delay(200);
}

bool IActorBoundChannelSync.BindType(IActorRef actor, TaggedType[] types)
{
return true;
}
if (registered == false)
{
_log.Error("Failed to register user.");
user.Tell(InterfacedPoisonPill.Instance);
return;
}

bool IActorBoundChannelSync.UnbindType(IActorRef actor, Type[] types)
{
return true;
await _channel.BindActor(user, new TaggedType[] { typeof(IUser) }, ActorBindingFlags.OpenThenNotification);
_user = user.Cast<UserRef>().WithRequestWaiter(this);
}

void IActorBoundChannelSync.Close()
[MessageHandler]
private Task Handle(TimerMessage m)
{
return (_pattern != null) ? _pattern.OnTimer() : Task.FromResult(true);
}

Task IUserEventObserverAsync.Invite(string invitorUserId, string roomName)
Expand Down
99 changes: 84 additions & 15 deletions src/TalkServer/ClusterRoleWorkers.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
Expand Down Expand Up @@ -50,23 +49,29 @@ public class UserWorker : ClusterRoleWorker
private IActorRef _userContainer;
private ChannelType _channelType;
private IPEndPoint _listenEndPoint;
private IPEndPoint _connectEndPoint;
private GatewayRef _gateway;

public UserWorker(ClusterNodeContext context, Config config)
{
_context = context;
_channelType = (ChannelType)Enum.Parse(typeof(ChannelType), config.GetString("type", "Tcp"));
_listenEndPoint = new IPEndPoint(IPAddress.Any, config.GetInt("port", 0));

var connectAddress = config.GetString("connect-address");
var connectPort = config.GetInt("connect-port", _listenEndPoint.Port);
_connectEndPoint = new IPEndPoint(connectAddress != null ? IPAddress.Parse(connectAddress) : IPAddress.Loopback, connectPort);
}

public override async Task Start()
{
// create UserTableContainer

_userContainer = _context.System.ActorOf(
Props.Create(() => new DistributedActorTableContainer<string>("User", _context.ClusterActorDiscovery, null, null, InterfacedPoisonPill.Instance)),
Props.Create(() => new DistributedActorTableContainer<string>(
"User", _context.ClusterActorDiscovery, typeof(UserActorFactory), new object[] { _context }, InterfacedPoisonPill.Instance)),
"UserTableContainer");
_context.UserTableContainer = new DistributedActorTableContainerRef<string>(_userContainer, TimeSpan.FromSeconds(10));
_context.UserTableContainer = new DistributedActorTableContainerRef<string>(_userContainer);

// create gateway for users to connect to

Expand All @@ -78,18 +83,12 @@ public override async Task Start()
var initiator = new GatewayInitiator
{
ListenEndPoint = _listenEndPoint,
ConnectEndPoint = _connectEndPoint,
TokenRequired = true,
GatewayLogger = LogManager.GetLogger(name),
CreateChannelLogger = (ep, _) => LogManager.GetLogger($"Channel({ep})"),
ConnectionSettings = new TcpConnectionSettings { PacketSerializer = serializer },
PacketSerializer = serializer,
CreateInitialActors = (context, connection) => new[]
{
Tuple.Create(
context.ActorOf(Props.Create(() =>
new UserLoginActor(_context, context.Self.Cast<ActorBoundChannelRef>(), GatewayInitiator.GetRemoteEndPoint(connection)))),
new TaggedType[] { typeof(IUserLogin) },
(ActorBindingFlags)0)
}
};

_gateway = (_channelType == ChannelType.Tcp)
Expand All @@ -105,8 +104,9 @@ public override async Task Stop()

if (_gateway != null)
{
await _gateway.Stop();
await _gateway.CastToIActorRef().GracefulStop(TimeSpan.FromSeconds(10), new Identify(0));
await _gateway.CastToIActorRef().GracefulStop(
TimeSpan.FromSeconds(10),
InterfacedMessageBuilder.Request<IGateway>(x => x.Stop()));
}

// stop user container
Expand All @@ -115,6 +115,74 @@ public override async Task Stop()
}
}

public class UserActorFactory : IActorFactory
{
private ClusterNodeContext _clusterContext;

public void Initialize(object[] args)
{
_clusterContext = (ClusterNodeContext)args[0];
}

public IActorRef CreateActor(IActorRefFactory actorRefFactory, object id, object[] args)
{
return actorRefFactory.ActorOf(Props.Create(() => new UserActor(_clusterContext, (string)id, (IUserEventObserver)args[0])));
}
}

[ClusterRole("UserLogin")]
public class UserLoginWorker : ClusterRoleWorker
{
private ClusterNodeContext _context;
private ChannelType _channelType;
private IPEndPoint _listenEndPoint;
private GatewayRef _gateway;

public UserLoginWorker(ClusterNodeContext context, Config config)
{
_context = context;
_channelType = (ChannelType)Enum.Parse(typeof(ChannelType), config.GetString("type", "Tcp"));
_listenEndPoint = new IPEndPoint(IPAddress.Any, config.GetInt("port", 0));
}

public override async Task Start()
{
var serializer = PacketSerializer.CreatePacketSerializer();

var name = "UserLoginGateway";
var initiator = new GatewayInitiator
{
ListenEndPoint = _listenEndPoint,
GatewayLogger = LogManager.GetLogger(name),
CreateChannelLogger = (ep, _) => LogManager.GetLogger($"Channel({ep}"),
ConnectionSettings = new TcpConnectionSettings { PacketSerializer = serializer },
PacketSerializer = serializer,
CreateInitialActors = (context, connection) => new[]
{
Tuple.Create(
context.ActorOf(Props.Create(() =>
new UserLoginActor(_context, context.Self.Cast<ActorBoundChannelRef>(), GatewayInitiator.GetRemoteEndPoint(connection)))),
new TaggedType[] { typeof(IUserLogin) },
ActorBindingFlags.CloseThenStop | ActorBindingFlags.StopThenCloseChannel)
}
};

_gateway = (_channelType == ChannelType.Tcp)
? _context.System.ActorOf(Props.Create(() => new TcpGateway(initiator)), name).Cast<GatewayRef>()
: _context.System.ActorOf(Props.Create(() => new UdpGateway(initiator)), name).Cast<GatewayRef>();
await _gateway.Start();
}

public override async Task Stop()
{
// stop gateway

await _gateway.CastToIActorRef().GracefulStop(
TimeSpan.FromSeconds(10),
InterfacedMessageBuilder.Request<IGateway>(x => x.Stop()));
}
}

[ClusterRole("RoomTable")]
public class RoomTableWorker : ClusterRoleWorker
{
Expand Down Expand Up @@ -199,8 +267,9 @@ public override async Task Stop()
{
// stop gateway

await _gateway.Stop();
await _gateway.CastToIActorRef().GracefulStop(TimeSpan.FromSeconds(10), new Identify(0));
await _gateway.CastToIActorRef().GracefulStop(
TimeSpan.FromSeconds(10),
InterfacedMessageBuilder.Request<IGateway>(x => x.Stop()));

// stop room container

Expand Down
Loading

0 comments on commit d237a11

Please sign in to comment.