Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

addressed issue #7 and bumped minor version #8

Merged
merged 2 commits into from

2 participants

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 39 additions and 80 deletions.
  1. +1 −1  Datasift/Config.cs
  2. +38 −79 Datasift/DatasiftStream/DatasiftStream.cs
View
2  Datasift/Config.cs
@@ -143,7 +143,7 @@ public string Authorization
}
public string Version
{
- get { return "1.0.1"; }
+ get { return "1.0.2"; }
set { }
}
public string UserAgent
View
117 Datasift/DatasiftStream/DatasiftStream.cs
@@ -18,7 +18,6 @@ public class DatasiftStream : Publisher
private List<DatasiftStreamClient> subcribers;
private Thread streamThread;
private State status = State.NOT_STARTED;
- private string reason = null;
public enum State { NOT_STARTED, RUNNING, STOPPED }
private JsonSerializer json;
/// <summary>
@@ -116,56 +115,35 @@ private void StartStreaming()
case HttpStatusCode.OK: break;
case HttpStatusCode.Unauthorized:
- this.reason = "Error 401 - Unauthorized. The credentials supplied were not valid.";
- PropagateStopped();
+ PropagateStopped("Error 401 - Unauthorized. The credentials supplied were not valid.");
return;
case HttpStatusCode.Forbidden:
- this.reason = "Error 403 - Forbidden. Your account has been denied access due to a violation.";
- PropagateStopped();
+ PropagateStopped("Error 403 - Forbidden. Your account has been denied access due to a violation.");
return;
case HttpStatusCode.NotFound:
- this.reason = "Error 404 - Not Found. The DatasiftStream you requested could not be found. Are you using a valid hash?.";
- PropagateStopped();
+ PropagateStopped("Error 404 - Not Found. The DatasiftStream you requested could not be found. Are you using a valid hash?.");
return;
case HttpStatusCode.ServiceUnavailable:
- //this is the only documented none 200 status where the client should try to reconnect
- if (config.AutoReconnect && connectCount <= config.MaxRetries)
- {
- //we can reconnect increasing delay between each reconnect linearly (up to max configured retries, default=5)
- Thread.Sleep(exponentialConnectTimeoutLength * 1000);
- exponentialConnectTimeoutLength *= 2;//double wait time
- connectCount++;
- Consume();
- return;
- }
- else
- {
- status = State.STOPPED;
- //if we end up here, we've run out of retries
- this.reason = "Error 503 - Service Unavailable. The node you were routed to is unavailable. Please try again";
- PropagateStopped();
- return;
- }
+ //stop if we've run out of retries
+ Retry("Error 503 - Service Unavailable. The node you were routed to is unavailable. Please try again");
+ return;
//if some unknown DatasiftStream response is detected then flag it!
default:
- this.reason = ((HttpWebResponse)e.Response).StatusCode.ToString();
- PropagateStopped();
+ PropagateStopped(((HttpWebResponse)e.Response).StatusCode.ToString());
return;
}
}
if (e.Status.ToString() == "NameResolutionFailure")
{
//if connection is not available then propagate error back up for user to handle
- this.reason = "Unable to resolve the Datasift domain name. A possible cause is the local connection to the internet \n" + e.Message;
- PropagateStopped();
+ PropagateStopped("Unable to resolve the Datasift domain name. A possible cause is the local connection to the internet \n" + e.Message);
return;
}
//if we get this far, possibly other local network issues - too many to handle and be more specific
- this.reason = "The connection to the DatasiftStream could not be established! \n" + e.Message;
- PropagateStopped();
+ PropagateStopped("The connection to the DatasiftStream could not be established! \n" + e.Message);
return;
}
// get data from response DatasiftStream
@@ -178,12 +156,7 @@ private void StartStreaming()
//first thing's first see if we've changed state to stop and break out of the loop
if (this.status == State.STOPPED)
{
- //if reason is null and state is stopped then stop must have been invoked by the client
- if (this.reason == null)
- {
- this.reason = "Client requested the DatasiftStream be stopped.";
- }
- PropagateStopped();
+ PropagateStopped("Stop request received.");
break;
}
try
@@ -191,38 +164,12 @@ private void StartStreaming()
// fill the buffer with data
count = resStream.Read(buf, 0, buf.Length);
}
- catch (WebException ex)
+ catch (Exception ex)
{
- status = State.STOPPED;
- if (ex.Status.ToString() == "Timeout")
- {
- //slight bit of duplication from header status handling
- if (config.AutoReconnect && connectCount <= config.MaxRetries)
- {
- //we can reconnect increasing delay between each reconnect linearly (up to max configured retries, default=5)
- Thread.Sleep(linearConnectTimeoutLength * 1000);
- linearConnectTimeoutLength++;//increase time out should we end up here again
- connectCount++;
- Consume();
- return;
- }
- else
- {
- status = State.STOPPED;
- //if we end up here, we've run out of retries
- this.reason = "The connection to the DatasiftStream timed out \n" + ex.Message;
- PropagateStopped();
- }
- }
- //we shouldn't end up here if we simply timed out.
- this.reason = "An unhandled exception was encountered while consuming the Datasift stream \n" + ex.Message;
- PropagateStopped();
- }
- catch (Exception e)
- {
- this.reason = "Unkown exception: \n" + e.Message;
- PropagateStopped();
+ //stop if we've run out of retries
+ Retry(ex.Message);
}
+
// must have data in the buffer
if (count != 0)
{
@@ -230,30 +177,43 @@ private void StartStreaming()
NotifyConsumers(Encoding.UTF8.GetString(buf, 0, count));
}
}
+ resStream.Close();
//if we've stopped reading for some reason may need to do some checks
//i.e if state hasn't been changed to stopped then DatasiftStream may have ended prematurely/unexpectedly
if (this.status != State.STOPPED)
{
- //mark as stopped
- this.status = State.STOPPED;
- if (this.reason == null)
- {
- this.reason = "DatasiftStream ended prematurely";
- }
- PropagateStopped();
+ Retry("DatasiftStream ended prematurely last read total =>"+count);
}
}
+ private void Retry(string msg)
+ {
+ //this is the only documented none 200 status where the client should try to reconnect
+ if (config.AutoReconnect && connectCount <= config.MaxRetries)
+ {
+ //we can reconnect increasing delay between each reconnect linearly (up to max configured retries, default=5)
+ Thread.Sleep(exponentialConnectTimeoutLength * 1000);
+ exponentialConnectTimeoutLength *= 2;//double wait time
+ connectCount++;
+ Consume();
+ }
+ else
+ {
+ status = State.STOPPED;
+ //if we end up here, we've run out of retries
+ PropagateStopped(msg);
+ }
+ }
/// <summary>
/// Let all subscribers know the DatasiftStream has stopped
/// </summary>
- private void PropagateStopped()
+ private void PropagateStopped(string reason)
{
foreach (DatasiftStreamClient c in this.subcribers)
{
//invoke consumer's onstop, passing the reason for stopping
- c.onStopped(this.reason);
+ c.onStopped(reason);
}
}
private StringBuilder interactionCache = null;
@@ -321,8 +281,7 @@ private void PushToSubscribers(string data)
Interaction interaction = new Interaction(data);
if (interaction.IsError())
{
- status = State.STOPPED;
- reason = interaction.StatusMessage();
+ Retry(interaction.StatusMessage());
return;
}
foreach (DatasiftStreamClient c in this.subcribers)
@@ -341,7 +300,7 @@ private void PushToSubscribers(string data)
public void Stop()
{
status = State.STOPPED;
- PropagateStopped();
+ PropagateStopped("Client requested stop.");
}
}
}
Something went wrong with that request. Please try again.