Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
  • 2 commits
  • 24 files changed
  • 0 commit comments
  • 2 contributors
Commits on Mar 08, 2012
Stuart Dallas Stream warning/error handling + WS disconnection changes.
Bumped version to 1.3.0.
0d31125
@quipo quipo Merge pull request #16 from 3ft9/current
Stream warning/error handling + WS disconnection changes.
5feafad
View
15 README.md
@@ -31,6 +31,21 @@ more details.
Changelog
---------
+* v.1.3.0 Stream warning/error handling + WS disconnection changes (2012-03-08)
+
+ The IStreamConsumerEvents and IMultiStreamConsumerEvents interfaces now
+ contain callbacks for onWarning and onError events. These MUST be implemented
+ in your event handler.
+
+ The WebSocket consumer has been modified to request that the server
+ unsubscribe from all streams and disconnect rather than simply closing
+ the connection. This change is largely transparent except that you'll get
+ an error notification acknowledging the request and indicating that the
+ server is about to disconnect. The client will then wait up to 30 seconds
+ for the server to actually disconnect before closing the socket itself. This
+ should never happen as the server will usually disconnect immediately after
+ sending that message.
+
* v.1.2.1 Clarification (2012-02-27)
Modified the football example to display the IDs of interactions so you can
View
2 build/build.xml
@@ -4,7 +4,7 @@
<property name="deploy.dir" value="../deploy"/>
<property name="classes.dir" value="../deploy/classes"/>
- <property name="version" value="1.2.1"/>
+ <property name="version" value="1.3.0"/>
<path id="class.path">
<fileset dir="../lib">
View
2 pom.xml
@@ -7,7 +7,7 @@
<groupId>com.mediasift</groupId>
<artifactId>datasift-java</artifactId>
<packaging>jar</packaging>
- <version>1.2.1</version>
+ <version>1.3.0</version>
<name>DataSift Java Client library</name>
<description>A client library for interacting with the DataSift API on the JVM.</description>
View
1 src/org/datasift/Config.java
@@ -9,6 +9,7 @@
final public class Config {
public static String username = "<your_username>";
public static String api_key = "<your_api_key>";
+
public static String definition = "interaction.content contains \"datasift\"";
public static String definition_hash = "947b690ec9dca525fb8724645e088d79";
public static double definition_dpu = 0.1;
View
18 src/org/datasift/IMultiStreamConsumerEvents.java
@@ -30,6 +30,24 @@ public void onDeleted(StreamConsumer consumer, String hash, Interaction interact
throws EInvalidData;
/**
+ * Called for each deletion notification consumed.
+ * @param consumer
+ * @param interaction
+ * @throws EInvalidData
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData;
+
+ /**
+ * Called for each deletion notification consumed.
+ * @param consumer
+ * @param interaction
+ * @throws EInvalidData
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData;
+
+ /**
* Called when the consumer stops for some reason.
* @param consumer
* @param reason
View
18 src/org/datasift/IStreamConsumerEvents.java
@@ -28,6 +28,24 @@ public void onDeleted(StreamConsumer consumer, Interaction interaction)
throws EInvalidData;
/**
+ * Called for each deletion notification consumed.
+ * @param consumer
+ * @param interaction
+ * @throws EInvalidData
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData;
+
+ /**
+ * Called for each deletion notification consumed.
+ * @param consumer
+ * @param interaction
+ * @throws EInvalidData
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData;
+
+ /**
* Called when the consumer stops for some reason.
* @param consumer
* @param reason
View
44 src/org/datasift/StreamConsumer.java
@@ -244,6 +244,18 @@ public int getState() {
}
/**
+ * Set the current state to stopped.
+ * @throws EInvalidData
+ */
+ public void setStopped() throws EInvalidData {
+ if (_state == STATE_STOPPING) {
+ _state = STATE_STOPPED;
+ } else {
+ throw new EInvalidData("The state must be STOPPING before it can be set to STOPPED.");
+ }
+ }
+
+ /**
* This is called for each interaction received from the stream and should
* be implemented in extending classes if they don't use an
* IStreamConsumerEvents object.
@@ -327,11 +339,43 @@ public void onMultiDeleted(String hash, Interaction interaction)
public void onStopped(String reason) throws EInvalidData {
if (_eventHandler != null) {
_eventHandler.onStopped(this, reason);
+ } else if (_multiEventHandler != null) {
+ _multiEventHandler.onStopped(this, reason);
} else {
throw new EInvalidData(
"You must provide an onStopped method or an eventHandler object");
}
}
+
+ /**
+ * This is called when a warning is received in the data stream.
+ *
+ * @param message The warning message.
+ * @throws EInvalidData
+ */
+ public void onWarning(String message) throws EInvalidData {
+ if (_eventHandler != null) {
+ _eventHandler.onWarning(this, message);
+ } else if (_multiEventHandler != null) {
+ _multiEventHandler.onWarning(this, message);
+ }
+ // If we don't have a handler for this event, swallow it!
+ }
+
+ /**
+ * This is called when an error is received in the data stream.
+ *
+ * @param message The error message.
+ * @throws EInvalidData
+ */
+ public void onError(String message) throws EInvalidData {
+ if (_eventHandler != null) {
+ _eventHandler.onError(this, message);
+ } else if (_multiEventHandler != null) {
+ _multiEventHandler.onError(this, message);
+ }
+ // If we don't have a handler for this event, swallow it!
+ }
/**
* Start consuming with auto_reconnect enabled.
View
2 src/org/datasift/User.java
@@ -22,7 +22,7 @@
*
* @access public
*/
- public final static String _user_agent = "DataSiftJava/1.2.0";
+ public final static String _user_agent = "DataSiftJava/1.3.0";
/**
* The base URL for API calls. No http://, and with the trailing slash.
View
26 src/org/datasift/examples/ConsumeStream.java
@@ -97,12 +97,34 @@ public void onDeleted(StreamConsumer c, Interaction i)
* Called when the consumer has stopped.
*
* @param DataSift_StreamConsumer
- * $consumer The consumer object.
+ * consumer The consumer object.
* @param string
- * $reason The reason the consumer stopped.
+ * reason The reason the consumer stopped.
*/
public void onStopped(StreamConsumer consumer, String reason) {
System.out.print("Stopped: ");
System.out.println(reason);
}
+
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
}
View
154 src/org/datasift/examples/ConsumeStreamWS.java
@@ -0,0 +1,154 @@
+package org.datasift.examples;
+
+import java.net.MalformedURLException;
+
+import org.datasift.Config;
+import org.datasift.EAPIError;
+import org.datasift.EAccessDenied;
+import org.datasift.ECompileFailed;
+import org.datasift.EInvalidData;
+import org.datasift.IMultiStreamConsumerEvents;
+import org.datasift.Interaction;
+import org.datasift.StreamConsumer;
+import org.datasift.User;
+
+public class ConsumeStreamWS implements IMultiStreamConsumerEvents {
+
+ private User _user = null;
+
+ /**
+ * @param args
+ * @throws IOException
+ * @throws MalformedURLException
+ */
+ public static void main(String[] args) {
+ new ConsumeStreamWS().run(args);
+ }
+
+ protected void run(String[] hashes) {
+ try {
+ // Authenticate
+ System.out.println("Creating user...");
+ _user = new User(Config.username, Config.api_key);
+
+ // Get the hash list from the parameters
+ System.out.println("Building hash list...");
+
+ // Create the consumer
+ System.out.println("Getting the consumer...");
+ StreamConsumer consumer = StreamConsumer.factory(_user, StreamConsumer.TYPE_WS, this);
+
+ // And start consuming
+ System.out.println("Consuming...");
+ System.out.println("--");
+ consumer.consume();
+
+ // Send subscriptions
+ for (String hash : hashes) {
+ boolean tryagain = true;
+ while (tryagain) {
+ try {
+ consumer.subscribe(hash);
+ System.out.println("Subscribing to \"" + hash + "\"...");
+ tryagain = false;
+ Thread.sleep(100);
+ } catch (EAPIError e) {
+ if (!e.getMessage().contains("not connected")) {
+ throw e;
+ }
+ }
+ }
+ }
+ } catch (EInvalidData e) {
+ System.out.print("InvalidData: ");
+ System.out.println(e.getMessage());
+ } catch (ECompileFailed e) {
+ System.out.print("CompileFailed: ");
+ System.out.println(e.getMessage());
+ } catch (EAccessDenied e) {
+ System.out.print("AccessDenied: ");
+ System.out.println(e.getMessage());
+ } catch (EAPIError e) {
+ System.out.print("APIError: ");
+ System.out.println(e.getMessage());
+ } catch (InterruptedException e) {
+ System.out.print("InterruptedException: ");
+ System.out.println(e.getMessage());
+ }
+ }
+
+ /**
+ * Handle incoming data.
+ *
+ * @param StreamConsumer
+ * consumer The consumer object.
+ * @param String
+ * hash The hash of the stream that matched this interaction.
+ * @param Interaction
+ * interaction The interaction data.
+ * @throws EInvalidData
+ */
+ public void onInteraction(StreamConsumer c, String hash, Interaction i)
+ throws EInvalidData {
+ try {
+ System.out.print(i.getStringVal("interaction.author.name"));
+ System.out.print(": ");
+ System.out.println(i.getStringVal("interaction.content"));
+ } catch (EInvalidData e) {
+ // The interaction did not contain either a type or content.
+ System.out.println("Exception: " + e.getMessage());
+ System.out.print("Interaction: ");
+ System.out.println(i);
+ }
+ System.out.println("--");
+ }
+
+ /**
+ * Handle delete notifications.
+ *
+ * @param StreamConsumer
+ * consumer The consumer object.
+ * @param JSONObject
+ * interaction The interaction data.
+ * @throws EInvalidData
+ */
+ public void onDeleted(StreamConsumer c, String hash, Interaction i)
+ throws EInvalidData {
+ // Ignored for this example
+ }
+
+ /**
+ * Called when the consumer has stopped.
+ *
+ * @param DataSift_StreamConsumer
+ * $consumer The consumer object.
+ * @param string
+ * $reason The reason the consumer stopped.
+ */
+ public void onStopped(StreamConsumer consumer, String reason) {
+ System.out.print("Stopped: ");
+ System.out.println(reason);
+ }
+
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
+}
View
22 src/org/datasift/examples/Deletes.java
@@ -76,6 +76,28 @@ public void onDeleted(StreamConsumer c, Interaction i)
}
/**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
+
+ /**
* Called when the consumer has stopped.
*
* @param DataSift_StreamConsumer
View
22 src/org/datasift/examples/DeletesWS.java
@@ -112,6 +112,28 @@ public void onStopped(StreamConsumer consumer, String reason) {
System.out.println(reason);
}
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
+
private void incLineLen() {
_line_len++;
if (_line_len > 80) {
View
22 src/org/datasift/examples/Football.java
@@ -122,4 +122,26 @@ public void onStopped(StreamConsumer consumer, String reason) {
System.out.print("Stopped: ");
System.out.println(reason);
}
+
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
}
View
22 src/org/datasift/examples/JDBCFootball.java
@@ -125,4 +125,26 @@ public void onStopped(StreamConsumer consumer, String reason) {
System.out.print("Stopped: ");
System.out.println(reason);
}
+
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
}
View
21 src/org/datasift/examples/MultiStream.java
@@ -167,4 +167,25 @@ public void onStopped(StreamConsumer consumer, String reason) {
System.out.println(reason);
}
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
}
View
23 src/org/datasift/examples/MultiStreamWS.java
@@ -74,7 +74,7 @@ protected void run() {
while (tryagain) {
try {
consumer.subscribe(hash);
- System.out.println("Subscribing to \"" + _hashes.get(hash) + "\"...");
+ System.out.println("Subscribing to \"" + _hashes.get(hash) + "\", " + hash + "...");
tryagain = false;
Thread.sleep(100);
} catch (EAPIError e) {
@@ -187,4 +187,25 @@ public void onStopped(StreamConsumer consumer, String reason) {
System.out.println(reason);
}
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
}
View
22 src/org/datasift/examples/StreamUsage.java
@@ -181,4 +181,26 @@ public void onStopped(StreamConsumer consumer, String reason) {
System.out.println(e.getMessage());
}
}
+
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
}
View
22 src/org/datasift/examples/TwitterTrack.java
@@ -113,4 +113,26 @@ public void onStopped(StreamConsumer consumer, String reason) {
System.out.print("Stopped: ");
System.out.println(reason);
}
+
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
}
View
22 src/org/datasift/examples/docs/LiveStream.java
@@ -83,4 +83,26 @@ public void onStopped(StreamConsumer consumer, String reason) {
System.out.print("Stopped: ");
System.out.println(reason);
}
+
+ /**
+ * Called when a warning is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The warning message.
+ */
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Warning: " + message);
+ }
+
+ /**
+ * Called when an error is received in the data stream.
+ *
+ * @param DataSift_StreamConsumer consumer The consumer object.
+ * @param string message The error message.
+ */
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ System.out.println("Error: " + message);
+ }
}
View
24 src/org/datasift/streamconsumer/HttpMultiThread.java
@@ -44,10 +44,22 @@ public synchronized void processLine(String line) {
JSONdn data = new JSONdn(line);
Interaction i = new Interaction(data.getJSONObject("data").toString());
- if (i.has("deleted")) {
- _consumer.onMultiDeleted(data.getStringVal("hash"), i);
+
+ if (i.has("status")) {
+ String status = i.getStringVal("status");
+ if (status == "error") {
+ _consumer.onError(i.getStringVal("message"));
+ } else if (status == "warning") {
+ _consumer.onWarning(i.getStringVal("message"));
+ } else {
+ // Should be a tick, ignore it
+ }
} else {
- _consumer.onMultiInteraction(data.getStringVal("hash"), i);
+ if (i.has("deleted")) {
+ _consumer.onMultiDeleted(data.getStringVal("hash"), i);
+ } else {
+ _consumer.onMultiInteraction(data.getStringVal("hash"), i);
+ }
}
} catch (JSONException e) {
// Ignore
@@ -115,9 +127,9 @@ public void run() {
while (getConsumerState() == StreamConsumer.STATE_RUNNING) {
if (_kill_requested) return;
String line = reader.readLine();
- // If the line length is bigger than a tick or an
- // empty line, process it
- if (line.length() > 100) {
+ // If the line is longer than a length indicator,
+ // process it
+ if (line.length() > 10) {
processLine(line);
}
}
View
23 src/org/datasift/streamconsumer/HttpThread.java
@@ -40,10 +40,21 @@ public synchronized int getConsumerState() {
public synchronized void processLine(String line) {
try {
Interaction i = new Interaction(line);
- if (i.has("deleted")) {
- _consumer.onDeleted(i);
+ if (i.has("status")) {
+ String status = i.getStringVal("status");
+ if (status == "error") {
+ _consumer.onError(i.getStringVal("message"));
+ } else if (status == "warning") {
+ _consumer.onWarning(i.getStringVal("message"));
+ } else {
+ // Should be a tick, ignore it
+ }
} else {
- _consumer.onInteraction(i);
+ if (i.has("deleted")) {
+ _consumer.onDeleted(i);
+ } else {
+ _consumer.onInteraction(i);
+ }
}
} catch (JSONException e) {
// Ignore
@@ -107,9 +118,9 @@ public void run() {
// Break out the loop and auto reconnect if enabled
break;
- } else if (line.length() > 100) {
- // If the line length is bigger than a tick or an
- // empty line, process it
+ } else if (line.length() > 10) {
+ // If the line is longer than a length
+ // indicator, process it
processLine(line);
}
}
View
6 src/org/datasift/streamconsumer/WS.java
@@ -62,6 +62,12 @@ public void unsubscribe(String hash) throws EAPIError {
_thread.unsubscribe(hash);
}
+ @Override
+ public void stop() throws EInvalidData {
+ _thread.sendStop();
+ _state = StreamConsumer.STATE_STOPPING;
+ }
+
public boolean isRunning() {
if (_thread == null) {
return false;
View
97 src/org/datasift/streamconsumer/WSThread.java
@@ -43,15 +43,33 @@ public synchronized int getConsumerState() {
return _consumer.getState();
}
+ public synchronized void setStopped() throws EInvalidData {
+ _consumer.setStopped();
+ }
+
public synchronized void processLine(String line) {
try {
// Extract the hash
JSONdn data = new JSONdn(line);
- Interaction i = new Interaction(data.getJSONObject("data").toString());
- if (i.has("deleted")) {
- _consumer.onMultiDeleted(data.getStringVal("hash"), i);
+ if (data.has("status")) {
+ String status = data.getStringVal("status");
+ if (status.equals("error") || status.equals("failure")) {
+ _consumer.onError(data.getStringVal("message"));
+ } else if (status.equals("warning")) {
+ _consumer.onWarning(data.getStringVal("message"));
+ } else {
+ // Dunno what that is, make it an error
+ _consumer.onError("Unhandled content received: " + line);
+ }
+ } else if (data.has("data")) {
+ Interaction i = new Interaction(data.getJSONObject("data").toString());
+ if (i.has("deleted")) {
+ _consumer.onMultiDeleted(data.getStringVal("hash"), i);
+ } else {
+ _consumer.onMultiInteraction(data.getStringVal("hash"), i);
+ }
} else {
- _consumer.onMultiInteraction(data.getStringVal("hash"), i);
+ _consumer.onError("Unhandled content received: " + line);
}
} catch (JSONException e) {
// Ignore
@@ -71,6 +89,13 @@ public synchronized void stopConsumer() {
}
}
+ public synchronized void stopped() {
+ try {
+ _consumer.setStopped();
+ } catch (EInvalidData e) {
+ }
+ }
+
public synchronized void onRestarted() {
_consumer.onRestarted();
}
@@ -85,19 +110,34 @@ public synchronized void onStopped(String reason) {
public synchronized void subscribe(String hash) throws EAPIError {
try {
- _ws.send("{\"action\":\"subscribe\",\"hash\":\"" + hash + "\"}");
+ if (getConsumerState() == StreamConsumer.STATE_RUNNING) {
+ _ws.send("{\"action\":\"subscribe\",\"hash\":\"" + hash + "\"}");
+ }
} catch (WebSocketException e) {
throw new EAPIError(e.getMessage());
}
}
public synchronized void unsubscribe(String hash) throws EAPIError {
try {
- _ws.send("{\"action\":\"unsubscribe\",\"hash\":\"" + hash + "\"}");
+ if (getConsumerState() == StreamConsumer.STATE_RUNNING) {
+ _ws.send("{\"action\":\"unsubscribe\",\"hash\":\"" + hash + "\"}");
+ }
} catch (WebSocketException e) {
throw new EAPIError(e.getMessage());
}
}
+
+ public synchronized void sendStop() throws EInvalidData {
+ try {
+ if (getConsumerState() == StreamConsumer.STATE_RUNNING) {
+ _ws.send("{\"action\":\"stop\"}");
+ }
+ } catch (WebSocketException e) {
+ // Swallow this and just mark us as stopped
+ stopped();
+ }
+ }
public void run() {
if (getConsumerState() == StreamConsumer.STATE_RESTARTING) {
@@ -131,32 +171,63 @@ public void onMessage(WebSocketMessage message)
{
// Message received
String line = message.getText();
- if (line.length() > 100) {
+ if (line.length() > 10) {
processLine(line);
}
}
public void onClose()
{
// Socket closed
- if (getConsumerState() == StreamConsumer.STATE_RUNNING) {
+ switch (getConsumerState()) {
+ case StreamConsumer.STATE_RUNNING:
if (_auto_reconnect) {
restartConsumer();
} else {
- stopConsumer();
+ stopped();
}
+ break;
+ case StreamConsumer.STATE_STOPPING:
+ stopped();
+ break;
}
}
});
// Establish WebSocket Connection
_ws.connect();
+ // Wait for the state to change
while (getConsumerState() == StreamConsumer.STATE_RUNNING) {
- Thread.sleep(5000);
+ Thread.sleep(500);
}
- reason = "Socket disconnected";
+ // If the state is not stopping or stopped, we got disconnected!
+ if (getConsumerState() != StreamConsumer.STATE_STOPPING && getConsumerState() != StreamConsumer.STATE_STOPPED) {
+ // Send the stop message
+ stopConsumer();
+ reason = "Socket disconnected";
+ } else {
+ // The stop was requested
+ reason = "Stop requested";
+ }
+
+ // Wait a maximum of 30 seconds while the stop process happens
+ int stopCounter = 60;
+ while (stopCounter > 0 && getConsumerState() == StreamConsumer.STATE_STOPPING) {
+ Thread.sleep(500);
+ stopCounter--;
+ }
+ if (stopCounter == 0) {
+ // Timed out waiting for the stop ack, tell the user
+ synchronized (this) {
+ try {
+ _consumer.onWarning("Timed out waiting for the server to respond to the stop request, disconnecting!");
+ } catch (EInvalidData e) {
+ // Ignored
+ }
+ }
+ }
} catch (WebSocketException e) {
_auto_reconnect = false;
reason = e.getMessage();
@@ -165,10 +236,10 @@ public void onClose()
} finally {
try {
_ws.close();
- } catch (WebSocketException e) {
+ } catch (Exception e) {
// Deliberately ignored - usually thrown due to the
// connection not being open, which we really don't
- // care about knowing!
+ // care about knowing at this point!
}
}
}
View
8 src/org/datasift/tests/TestDefinition.java
@@ -360,4 +360,12 @@ public void onDeleted(StreamConsumer c, Interaction i)
public void onStopped(StreamConsumer consumer, String reason) {
}
+ public void onWarning(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ }
+
+ public void onError(StreamConsumer consumer, String message)
+ throws EInvalidData {
+ }
+
}

No commit comments for this range

Something went wrong with that request. Please try again.