Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Improved error handling.

Bumped version to 1.3.0.
  • Loading branch information...
commit 04ac739a64f307d25fdf2d2bd3480970e03d773a 1 parent c558670
@3ft9 3ft9 authored
View
8 README.md
@@ -50,6 +50,14 @@ more details.
Changelog
---------
+* v.1.3.0 Improved error handling (2012-03-08)
+
+ Added onError and onWarning events - see examples/consumer-stream.php for an
+ example.
+
+ Stopped the HTTP consumer from attempting to reconnect when it receives a
+ 4xx response from the server.
+
* v.1.2.0 Support for multiple streams (2012-02-29)
The User object now has a getMultiConsumer method which allows you to
View
24 examples/consume-stream.php
@@ -29,7 +29,7 @@
// Create the consumer
echo "Getting the consumer...\n";
-$consumer = $user->getMultiConsumer(DataSift_StreamConsumer::TYPE_HTTP, $_SERVER['argv'], 'display', 'stopped', 'processDeleteReq');
+$consumer = $user->getMultiConsumer(DataSift_StreamConsumer::TYPE_HTTP, $_SERVER['argv'], 'display', 'stopped', 'processDeleteReq', 'handleError', 'handleWarning');
// And start consuming
echo "Consuming...\n--\n";
@@ -73,3 +73,25 @@ function stopped($consumer, $reason)
{
echo "\nStopped: $reason\n\n";
}
+
+/**
+ * Called when an error message is received.
+ *
+ * @param DataSift_StreamConsumer $consumer The consumer object.
+ * @param string $message The message.
+ */
+function handleError($consumer, $message)
+{
+ echo "ERROR: $message\n--\n";
+}
+
+/**
+ * Called when a warning message is received.
+ *
+ * @param DataSift_StreamConsumer $consumer The consumer object.
+ * @param string $message The message.
+ */
+function handleWarning($consumer, $message)
+{
+ echo "WARNING: $message\n--\n";
+}
View
48 lib/DataSift/StreamConsumer.php
@@ -79,6 +79,16 @@
protected $_onDeleted = false;
/**
+ * @var mixed A function name or array(class/object, method)
+ */
+ protected $_onError = false;
+
+ /**
+ * @var mixed A function name or array(class/object, method)
+ */
+ protected $_onWarning = false;
+
+ /**
* Factory function. Creates a StreamConsumer-derived object for the given
* type.
*
@@ -91,14 +101,14 @@
* @return DataSift_StreamConsumer The consumer object
* @throws DataSift_Exception_InvalidData
*/
- public static function factory($user, $type, $definition, $onInteraction = false, $onStopped = false, $onDeleted = false)
+ public static function factory($user, $type, $definition, $onInteraction = false, $onStopped = false, $onDeleted = false, $onError = false, $onWarning = false)
{
$classname = 'DataSift_StreamConsumer_'.$type;
if (!class_exists($classname)) {
throw new DataSift_Exception_InvalidData('Consumer type "'.$type.'" is unknown');
}
- return new $classname($user, $definition, $onInteraction, $onStopped, $onDeleted);
+ return new $classname($user, $definition, $onInteraction, $onStopped, $onDeleted, $onError, $onWarning);
}
/**
@@ -114,7 +124,7 @@ public static function factory($user, $type, $definition, $onInteraction = false
* @throws DataSiftExceotion_CompileFailed
* @throws DataSift_Exception_APIError
*/
- protected function __construct($user, $definition, $onInteraction = false, $onStopped = false, $onDeleted = false)
+ protected function __construct($user, $definition, $onInteraction = false, $onStopped = false, $onDeleted = false, $onError = false, $onWarning = false)
{
if (!($user instanceof DataSift_User)) {
throw new DataSift_Exception_InvalidData('Please supply a valid DataSift_User object when creating a DataSift_StreamConsumer object.');
@@ -148,6 +158,8 @@ protected function __construct($user, $definition, $onInteraction = false, $onSt
$this->_onInteraction = $onInteraction;
$this->_onStopped = $onStopped;
$this->_onDeleted = $onDeleted;
+ $this->_onError = $onError;
+ $this->_onWarning = $onWarning;
// Ask for the definition hash - this will compile the definition if
// necessary
@@ -189,6 +201,36 @@ protected function onDeleted($interaction, $hash = false)
}
/**
+ * This is called when an error notification is received on a stream
+ * connection.
+ *
+ * @param string $message The error message
+ *
+ * @return void
+ */
+ protected function onError($message)
+ {
+ if ($this->_onError !== false) {
+ call_user_func($this->_onError, $this, $message);
+ }
+ }
+
+ /**
+ * This is called when a warning notification is received on a scream
+ * connection.
+ *
+ * @param string $message The warning message
+ *
+ * @return void
+ */
+ protected function onWarning($message)
+ {
+ if ($this->_onWarning !== false) {
+ call_user_func($this->_onWarning, $this, $message);
+ }
+ }
+
+ /**
* This is called when the consumer is stopped.
*
* @param string $reason Reason to stop the stream
View
56 lib/DataSift/StreamConsumer/HTTP.php
@@ -65,9 +65,9 @@ class DataSift_StreamConsumer_HTTP extends DataSift_StreamConsumer
* @throws DataSift_Exception_APIError
* @see DataSift_StreamConsumer::__construct
*/
- public function __construct($user, $definition, $onInteraction = false, $onStopped = false, $onDeleted = false)
+ public function __construct($user, $definition, $onInteraction = false, $onStopped = false, $onDeleted = false, $onError = false, $onWarning = false)
{
- parent::__construct($user, $definition, $onInteraction, $onStopped, $onDeleted);
+ parent::__construct($user, $definition, $onInteraction, $onStopped, $onDeleted, $onError, $onWarning);
}
/**
@@ -137,17 +137,32 @@ protected function onStart()
// If the interaction is valid, pass it to the event handler
if ($interaction) {
- // Extract the hash and the data if present
- $hash = false;
- if (isset($interaction['hash'])) {
- $hash = $interaction['hash'];
- $interaction = $interaction['data'];
- }
- // Ignore ticks and handle delete requests
- if (!empty($interaction['deleted'])) {
- $this->onDeleted($interaction, $hash);
- } else if (!empty($interaction['interaction'])) {
- $this->onInteraction($interaction, $hash);
+ if (isset($interaction['status'])) {
+ switch ($interaction['status']) {
+ case 'error':
+ case 'failure':
+ $this->onError($interaction['message']);
+ break;
+ case 'warning':
+ $this->onWarning($interaction['message']);
+ break;
+ default:
+ // Ticks
+ break;
+ }
+ } else {
+ // Extract the hash and the data if present
+ $hash = false;
+ if (isset($interaction['hash'])) {
+ $hash = $interaction['hash'];
+ $interaction = $interaction['data'];
+ }
+ // Ignore ticks and handle delete requests
+ if (!empty($interaction['deleted'])) {
+ $this->onDeleted($interaction, $hash);
+ } else if (!empty($interaction['interaction'])) {
+ $this->onInteraction($interaction, $hash);
+ }
}
}
}
@@ -263,9 +278,18 @@ private function connect()
if ($code == '200') {
// Success!
$this->_state = parent::STATE_RUNNING;
- } elseif ($code == '404') {
- // The hash doesn't exist
- throw new DataSift_Exception_StreamError('Hash not found!');
+ } elseif ($code >= '400' && $code < 500 && $code != 420) {
+ // Connection refused, find out why
+ $line = '';
+ while (strlen($line) < 10) {
+ $line = trim(fgets($this->_conn, $this->_max_line_length));
+ }
+ $data = json_decode($line, true);
+ if (isset($data['message'])) {
+ throw new DataSift_Exception_StreamError($data['message']);
+ } else {
+ throw new DataSift_Exception_StreamError('Connection refused: '.$code.' '.$message);
+ }
} else {
// Connection failed, back off a bit and try again
// Timings from http://dev.datasift.com/docs/streaming-api
View
2  lib/DataSift/User.php
@@ -28,7 +28,7 @@
*/
class DataSift_User
{
- const USER_AGENT = 'DataSiftPHP/1.2.0';
+ const USER_AGENT = 'DataSiftPHP/1.3.0';
const API_BASE_URL = 'api.datasift.com/';
const STREAM_BASE_URL = 'stream.datasift.com/';
Please sign in to comment.
Something went wrong with that request. Please try again.