Permalink
Browse files

fixed previus networking issue.

  • Loading branch information...
1 parent 47f8e9c commit d240461697aa1886988228fdfc374f79691d7806 @CharlesMartel committed Apr 13, 2012
@@ -10,7 +10,7 @@
<ThreadBasedAnalysisSelected>true</ThreadBasedAnalysisSelected>
<UniqueReport>Timestamp</UniqueReport>
<SamplingMethod>Cycles</SamplingMethod>
- <CycleCount>10000000</CycleCount>
+ <CycleCount>100000000</CycleCount>
<PageFaultCount>10</PageFaultCount>
<SysCallCount>10</SysCallCount>
<SamplingCounter Name="InstructionsRetired" ReloadValue="00000000000f4240" DisplayName="Instructions Retired" />
@@ -5,6 +5,5 @@ namespace BasicModule {
[Serializable]
class FunctionInput : PrestoParameter {
public int value = 0;
- public string randomString = "hfkdjahkfljdhjfhjksdhfjkhdajksfh klhfjkdhsakjfhjhewqhuiohfu hfudhajfhiwhnjfdah kfhjdsah lhfujaifhjhedkfahj fhjdashfjkhejaklh jfhdakjs hfruejqafhjdaksl hfuewhfjadk fhjdashj fhewuqfh jd al hufewhqa jfhjdfhjaksdhfauwhjwaefjndjna ahfuefjkhadjf haufeuahfjkhdajsk fhuwawfnjenfahdsuf hwfknakll fhfueh ajfhuahflhfiluhe wajfnje n faa nu ehfuwn bifenuwafnlenjf naunf uea wnif ejanfjdna ufea uwfeinanfjenfnufeifjnasjfnfnuenfjnfjdnaslfluenanjfndjkanlfneunjakwnjfndaskn fksd";
}
}
@@ -5,6 +5,5 @@ namespace BasicModule {
[Serializable]
class FunctionOutput : PrestoResult {
public int value;
- public string randomString = "hfkdjahkfljdhjfhjksdhfjkhdajksfh klhfjkdhsakjfhjhewqhuiohfu hfudhajfhiwhnjfdah kfhjdsah lhfujaifhjhedkfahj fhjdashfjkhejaklh jfhdakjs hfruejqafhjdaksl hfuewhfjadk fhjdashj fhewuqfh jd al hufewhqa jfhjdfhjaksdhfauwhjwaefjndjna ahfuefjkhadjf haufeuahfjkhdajsk fhuwawfnjenfahdsuf hwfknakll fhfueh ajfhuahflhfiluhe wajfnje n faa nu ehfuwn bifenuwafnlenjf naunf uea wnif ejanfjdna ufea uwfeinanfjenfnufeifjnasjfnfnuenfjnfjdnaslfluenanjfndjkanlfneunjakwnjfndaskn fksd";
}
}
View
@@ -1,5 +1,5 @@
using System;
-using System.Collections.Generic;
+using System.Collections.Concurrent;
using System.Threading;
using Presto.Common;
using Presto.Managers;
@@ -54,7 +54,7 @@ public static class Cluster {
internal static IClusterProxy ClusterProxy;
//we keep a list of all jobs currently out for processing
- private static Dictionary<string, Action<PrestoResult>> outboundJobs = new Dictionary<string, Action<PrestoResult>>();
+ private static ConcurrentDictionary<string, Action<PrestoResult>> outboundJobs = new ConcurrentDictionary<string, Action<PrestoResult>>();
private static ManualResetEvent jobCompletionEvent = new ManualResetEvent(true);
/// <summary>
@@ -97,7 +97,7 @@ public static class Cluster {
DateTime now = DateTime.Now;
string contextID = Generator.RandomAlphaNumeric(Config.UIDLength);
//add the job to the scheduled jobs
- outboundJobs.Add(contextID, callback);
+ outboundJobs[contextID] = callback;
//execute
SerializationEngine serializer = new SerializationEngine ();
byte[] stream = serializer.Serialize(parameter);
@@ -109,8 +109,9 @@ public static class Cluster {
/// </summary>
/// <param id="result">The execution result object.</param>
internal static void ReturnExecution(string contextID, PrestoResult result) {
- outboundJobs[contextID].Invoke(result);
- outboundJobs.Remove(contextID);
+ Action<PrestoResult> removable;
+ outboundJobs.TryRemove(contextID, out removable);
+ removable.Invoke(result);
if (outboundJobs.Count < 1) {
jobCompletionEvent.Set();
}
@@ -26,8 +26,6 @@ public class ServerState {
private List<byte> data = new List<byte>();
//a boolean to tell if the message is fully recieved
private bool messageFullyRecieved = false;
- //processing queue for the writes
- private SynchronizedProcessingQueue<byte[]> sendQueue;
/// <summary>
/// Create a new server state object to manage a currently running connection
@@ -36,7 +34,6 @@ public class ServerState {
internal ServerState(Socket socket) {
//set the working socket
this.socket = socket;
- sendQueue = new SynchronizedProcessingQueue<byte[]>(write);
}
/// <summary>
@@ -112,7 +109,7 @@ public class ServerState {
}
//send the data
- sendQueue.Add(output.ToArray());
+ write(output.ToArray());
//close the socket
CloseSocket();
@@ -140,7 +137,7 @@ public class ServerState {
}
//send the data
- sendQueue.Add(output.ToArray());
+ write(output.ToArray());
}
/// <summary>
@@ -157,7 +154,7 @@ public class ServerState {
data = holder.ToArray();
//send the data
- socket.Send(data);
+ socket.BeginSend(data, 0, data.Length, 0, null, null);
return true;
} catch (Exception e) {
//there was a problem sending the data, return false so it will be readded to the queue
@@ -171,7 +168,6 @@ public class ServerState {
/// Closes the ServerState's associated socket
/// </summary>
public void CloseSocket() {
- sendQueue.Wait();
socket.Close();
}
@@ -16,7 +16,6 @@ public class TCPClient {
private IPEndPoint serverEndpoint;
private TcpClient tcpClient;
- private SynchronizedProcessingQueue<byte[]> sendQueue;
// A hash table holding all dispatch references and pointer to their delegates
private Dictionary<MessageType, Action<ClientState>> dispatchList = new Dictionary<MessageType, Action<ClientState>>();
@@ -30,7 +29,6 @@ public class TCPClient {
//Get the endpoint from DNS and set it as the serverEnpoint
IPAddress[] addresses = Dns.GetHostAddresses(host);
serverEndpoint = new IPEndPoint(addresses[0], port);
- sendQueue = new SynchronizedProcessingQueue<byte[]>(write);
}
/// <summary>
@@ -40,7 +38,6 @@ public class TCPClient {
public TCPClient(IPEndPoint host) {
//set the internal serverEnpoint to the provided one.
serverEndpoint = host;
- sendQueue = new SynchronizedProcessingQueue<byte[]>(write);
}
/// <summary>
@@ -128,7 +125,7 @@ public class TCPClient {
}
//Write the output
- sendQueue.Add(output.ToArray());
+ write(output.ToArray());
}
/// <summary>
@@ -149,8 +146,7 @@ public class TCPClient {
}
//Write the output
- sendQueue.Add(output.ToArray());
- sendQueue.Wait();
+ write(output.ToArray());
}
/// <summary>
@@ -169,7 +165,7 @@ public class TCPClient {
//get the tcpClient network stream
NetworkStream nStream = tcpClient.GetStream();
//Start the synchronous Write
- nStream.Write(data, 0, data.Length);
+ nStream.BeginWrite(data, 0, data.Length, null, null);
return true;
} catch (Exception e) {
// the connection was closed return false and the synchronizer will take care of it
@@ -97,6 +97,9 @@ public class TCPServer {
/// </summary>
/// <param name="ar"></param>
private void read(IAsyncResult ar) {
+
+ //I absolutely hate the implementation of this. I hate ti so much. Need to rewrite fully.
+
try {
//Get the Server State Object
ServerState state = (ServerState)ar.AsyncState;

0 comments on commit d240461

Please sign in to comment.