Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
ihaoqihao committed Apr 2, 2015
1 parent d469794 commit 350e325
Show file tree
Hide file tree
Showing 25 changed files with 202 additions and 1,004 deletions.
2 changes: 1 addition & 1 deletion FastSocket.Client/Messaging/CommandLineMessage.cs
Expand Up @@ -24,7 +24,7 @@ public class CommandLineMessage : Messaging.IMessage
/// <param name="parameters"></param>
public CommandLineMessage(int seqId, string cmdName, params string[] parameters)
{
if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName");
if (cmdName == null) throw new ArgumentNullException("cmdName");

this.SeqId = seqId;
this.CmdName = cmdName;
Expand Down
17 changes: 13 additions & 4 deletions FastSocket.Client/Protocol/CommandLineProtocol.cs
Expand Up @@ -15,8 +15,17 @@ public sealed class CommandLineProtocol : IProtocol<Messaging.CommandLineMessage
/// <summary>
/// return false
/// </summary>
public bool IsAsync { get { return false; } }

public bool IsAsync
{
get { return false; }
}
/// <summary>
/// return 1
/// </summary>
public int DefaultSyncSeqId
{
get { return 1; }
}
/// <summary>
/// parse
/// </summary>
Expand All @@ -41,12 +50,12 @@ public sealed class CommandLineProtocol : IProtocol<Messaging.CommandLineMessage
{
readlength = i + 2 - buffer.Offset;

if (readlength == 2) return null;
if (readlength == 2) return new Messaging.CommandLineMessage(1, string.Empty);

string command = Encoding.UTF8.GetString(buffer.Array, buffer.Offset, readlength - 2);
var arr = command.Split(SPLITER, StringSplitOptions.RemoveEmptyEntries);

if (arr.Length == 0) return null;
if (arr.Length == 0) return new Messaging.CommandLineMessage(1, string.Empty);
if (arr.Length == 1) return new Messaging.CommandLineMessage(1, arr[0]);
return new Messaging.CommandLineMessage(1, arr[0], arr.Skip(1).ToArray());
}
Expand Down
4 changes: 4 additions & 0 deletions FastSocket.Client/Protocol/IProtocol.cs
Expand Up @@ -14,6 +14,10 @@ public interface IProtocol<TMessage>
/// </summary>
bool IsAsync { get; }
/// <summary>
/// 当IsAsync=false时,表示默认的seqId
/// </summary>
int DefaultSyncSeqId { get; }
/// <summary>
/// parse
/// </summary>
/// <param name="connection"></param>
Expand Down
7 changes: 7 additions & 0 deletions FastSocket.Client/Protocol/ThriftProtocol.cs
Expand Up @@ -16,6 +16,13 @@ public bool IsAsync
get { return true; }
}
/// <summary>
/// throw NotImplementedException ex
/// </summary>
public int DefaultSyncSeqId
{
get { throw new NotImplementedException(); }
}
/// <summary>
/// find response
/// </summary>
/// <param name="connection"></param>
Expand Down
131 changes: 82 additions & 49 deletions FastSocket.Client/SocketClient.cs
Expand Up @@ -134,6 +134,14 @@ public void Send(Request<TMessage> request)
connection.BeginSend(request);
}
/// <summary>
/// try send next request
/// </summary>
public void TrySendNext()
{
Request<TMessage> request = null;
if (this._pendingQueue.TryDequeue(out request)) this.Send(request);
}
/// <summary>
/// 产生不重复的seqId
/// </summary>
/// <returns></returns>
Expand All @@ -150,18 +158,13 @@ public int NextRequestSeqId()
/// <param name="onException"></param>
/// <param name="onResult"></param>
/// <returns></returns>
public Request<TMessage> NewRequest(string name,
byte[] payload,
public Request<TMessage> NewRequest(string name, byte[] payload,
int millisecondsReceiveTimeout,
Action<Exception> onException,
Action<TMessage> onResult)
Action<Exception> onException, Action<TMessage> onResult)
{
return new Request<TMessage>(this.NextRequestSeqId(),
name,
payload,
millisecondsReceiveTimeout,
onException,
onResult);
var seqId = this._protocol.IsAsync ? this.NextRequestSeqId() : this._protocol.DefaultSyncSeqId;
return new Request<TMessage>(seqId, name, payload,
millisecondsReceiveTimeout, onException, onResult);
}
#endregion

Expand Down Expand Up @@ -206,8 +209,7 @@ protected virtual void OnSent(SocketBase.IConnection connection, Request<TMessag
/// <param name="connection"></param>
/// <param name="request"></param>
/// <param name="message"></param>
protected virtual void OnReceived(SocketBase.IConnection connection,
Request<TMessage> request, TMessage message)
protected virtual void OnReceived(SocketBase.IConnection connection, Request<TMessage> request, TMessage message)
{
ThreadPool.QueueUserWorkItem(_ =>
{
Expand All @@ -217,9 +219,10 @@ protected virtual void OnSent(SocketBase.IConnection connection, Request<TMessag

if (!this._protocol.IsAsync)
{
Request<TMessage> next;
if (this._pendingQueue.TryDequeue(out next)) connection.BeginSend(next);
else this._connectionPool.Release(connection);
//release connection
this._connectionPool.Release(connection);
//try send next request
this.TrySendNext();
}
}
/// <summary>
Expand Down Expand Up @@ -315,7 +318,7 @@ protected override void OnSendCallback(SocketBase.IConnection connection, Socket
}

Request<TMessage> removed;
if (this._receivingQueue.TryRemove(request.SeqId, out removed))
if (this._receivingQueue.TryRemove(connection.ConnectionID, request.SeqId, out removed))
removed.SendConnection = null;

if (!request.AllowRetry)
Expand Down Expand Up @@ -360,7 +363,7 @@ protected override void OnSendCallback(SocketBase.IConnection connection, Socket
if (message != null)
{
Request<TMessage> request = null;
if (this._receivingQueue.TryRemove(message.SeqId, out request))
if (this._receivingQueue.TryRemove(connection.ConnectionID, message.SeqId, out request))
this.OnReceived(connection, request, message);
else this.OnReceivedUnknowMessage(connection, message);
}
Expand Down Expand Up @@ -416,8 +419,8 @@ public PendingSendQueue(SocketClient<TMessage> client)
this._client.OnPendingSendTimeout(request);
}
this._timer.Change(50, 0);
}, null, 50, 0);
this._timer.Change(500, 500);
}, null, 500, 500);
}
#endregion

Expand Down Expand Up @@ -450,10 +453,18 @@ public bool TryDequeue(out Request<TMessage> request)
private class ReceivingQueue
{
#region Private Members
/// <summary>
/// socket client
/// </summary>
private readonly SocketClient<TMessage> _client = null;

private readonly ConcurrentDictionary<int, Request<TMessage>> _dic =
new ConcurrentDictionary<int, Request<TMessage>>();
/// <summary>
/// key:connectionId:request.SeqId
/// </summary>
private readonly ConcurrentDictionary<string, Request<TMessage>> _dic =
new ConcurrentDictionary<string, Request<TMessage>>();
/// <summary>
/// timer for check receive timeout
/// </summary>
private readonly Timer _timer = null;
#endregion

Expand Down Expand Up @@ -482,8 +493,30 @@ public ReceivingQueue(SocketClient<TMessage> client)
this._client.OnReceiveTimeout(request);
}
this._timer.Change(500, 0);
}, null, 500, 0);
this._timer.Change(500, 500);
}, null, 500, 500);
}
#endregion

#region Private Methods
/// <summary>
/// to key
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
private string ToKey(Request<TMessage> request)
{
return this.ToKey(request.SendConnection.ConnectionID, request.SeqId);
}
/// <summary>
/// to key
/// </summary>
/// <param name="connectionId"></param>
/// <param name="seqId"></param>
/// <returns></returns>
private string ToKey(long connectionId, int seqId)
{
return string.Concat(connectionId.ToString(), "/", seqId.ToString());
}
#endregion

Expand All @@ -495,17 +528,18 @@ public ReceivingQueue(SocketClient<TMessage> client)
/// <returns></returns>
public bool TryAdd(Request<TMessage> request)
{
return this._dic.TryAdd(request.SeqId, request);
return this._dic.TryAdd(this.ToKey(request), request);
}
/// <summary>
/// try remove
/// </summary>
/// <param name="seqID"></param>
/// <param name="connectionId"></param>
/// <param name="seqId"></param>
/// <param name="request"></param>
/// <returns></returns>
public bool TryRemove(int seqID, out Request<TMessage> request)
public bool TryRemove(long connectionId, int seqId, out Request<TMessage> request)
{
return this._dic.TryRemove(seqID, out request);
return this._dic.TryRemove(this.ToKey(connectionId, seqId), out request);
}
#endregion
}
Expand Down Expand Up @@ -735,30 +769,29 @@ private void ConnectCallback(Node node, Task<Socket> task)
return;
}

node.InitFunc(new SendContext(connection))
.ContinueWith(c =>
node.InitFunc(new SendContext(connection, false)).ContinueWith(c =>
{
if (c.IsFaulted)
{
if (c.IsFaulted)
{
connection.BeginDisconnect(c.Exception.InnerException);
return;
}
connection.BeginDisconnect(c.Exception.InnerException);
return;
}
bool isExists;
lock (this)
{
if (isExists = this._dicNodes.ContainsKey(node.Id))
this._dicConnections[node.Id] = connection;
}
bool isExists;
lock (this)
{
if (isExists = this._dicNodes.ContainsKey(node.Id))
this._dicConnections[node.Id] = connection;
}
if (isExists)
{
//fire node already event.
if (this.Already != null)
this.Already(node.Name, connection);
}
else connection.BeginDisconnect();
});
if (isExists)
{
//fire node already event.
if (this.Already != null)
this.Already(node.Name, connection);
}
else connection.BeginDisconnect();
});
}
#endregion
}
Expand Down

0 comments on commit 350e325

Please sign in to comment.