
Introduction

Jonathon Hill edited this page Apr 8, 2016 · 2 revisions


This library is implemented as a single abstract class (Phirehose) that you extend in order to use the Twitter Streaming API.

This design lets you use the library with a minimal amount of code, whilst keeping things in a PHP/OO style (i.e. no callbacks).

The implementation uses blocking I/O with timeouts, so that it can hand off statuses for enqueuing and periodically update filter predicates whilst using as little CPU time as possible.
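As a rough illustration of the blocking-with-timeout idea (not Phirehose's actual internals), the sketch below uses PHP's stream_select() to wait for data for at most a given number of seconds. The helper name readWithTimeout() is hypothetical. While waiting, the process sleeps in the kernel and uses no CPU; a timeout hands control back to the caller, which is where periodic work such as a predicate check could run.

```php
<?php
// Hypothetical helper, not part of Phirehose.
// Waits at most $timeoutSec seconds for data on $stream.
// Returns the data read, '' on timeout, or null once the stream has ended.
function readWithTimeout($stream, int $timeoutSec, int $maxBytes = 8192): ?string
{
    $read = [$stream];
    $write = null;
    $except = null;
    // stream_select() blocks until the stream is readable or the timeout expires
    $ready = stream_select($read, $write, $except, $timeoutSec);
    if ($ready === false) {
        throw new RuntimeException('stream_select() failed');
    }
    if ($ready === 0) {
        return ''; // timed out: the caller can do periodic work now
    }
    $data = fread($stream, $maxBytes);
    if ($data === '' || $data === false) {
        // Readable but nothing read usually means the other end closed
        return feof($stream) ? null : '';
    }
    return $data;
}
```

A collection loop built on it might look like `while (($chunk = readWithTimeout($socket, 5)) !== null) { ... }`, handing off data whenever $chunk is non-empty and running periodic tasks on every wake-up.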

In its simplest form, you need to extend the class and implement an enqueueStatus() method:

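require_once('Phirehose.php');

// Minimal collector: Phirehose calls enqueueStatus() once per status received
class MyStream extends Phirehose
{
  public function enqueueStatus($status)
  {
    // $status is the raw, unparsed status in the requested format (JSON recommended)
    print $status;
  }
}

// Connect with your credentials and start the collection loop
$stream = new MyStream('username', 'password');
$stream->consume();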

See the examples/sample.php file for a working example of this.

Concepts

You are strongly encouraged to familiarize yourself with the Twitter Streaming API documentation (http://dev.twitter.com/pages/streaming_api), as this library aims to be a direct implementation of the recommendations it describes.

Most users will only need the sample and filter methods, as access to any further functionality must be activated by Twitter.

Please note that this library is only intended to work in a CLI environment (not embedded in a web script run by your web server). You will likely need some form of multi-processing, either pcntl_fork() or entirely separate processes, along with some knowledge of how multi-processing works on your (and your hosting provider's) operating system. See below for more details.
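One simple safeguard, sketched below as an assumption rather than something Phirehose does for you, is to make your collector script refuse to start anywhere other than the CLI. PHP_SAPI is a built-in constant that equals 'cli' when a script is run as `php collect.php`; requireCli() is a hypothetical helper name.

```php
<?php
// Hypothetical helper: refuse to run a long-lived collector outside the CLI SAPI.
// PHP_SAPI is 'cli' for `php script.php`; under a web server it is something else.
function requireCli(string $sapi = PHP_SAPI): void
{
    if ($sapi !== 'cli') {
        throw new RuntimeException(
            "This collector must be run from the command line, not under '$sapi'."
        );
    }
}
```

Calling `requireCli();` at the top of the collector script makes an accidental web-server invocation fail immediately instead of tying up a request.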

Collecting and Processing

An important concept to understand is the separation of collection and processing functionality. To quote the Twitter documentation:

To prevent latency problems and plan for scale, design your client with decoupled collection, processing and persistence components. The collection component should efficiently handle connecting to the Streaming API and retrieving responses, as well as reconnecting in the event of network failure, and hand-off statuses via an asynchronous queueing mechanism to application specific processing and persistence components. This component should be isolated from any subsequent downstream processing backlog or maintenance, otherwise queuing will occur in the Streaming API. Eventually your client will be disconnected, resulting in data loss.

For example, collect "raw" statuses (that is, not parsed or marshaled into your language's native object format) in one process, and pass each status into a queueing system, rotated flatfile, or database. In a second process, consume statuses from your queue or store of choice, parse them, extract the fields relevant to your application, etc. Consumers of high-volume streams should consider performing JSON and XML markup parsing in a parallel manner as the status volume is approaching the single processor throughput limit of some software stacks. End-to-end stress test your stack.

This recommendation is why the only required method is enqueueStatus(). How you queue your raw statuses is up to you, but it is important that you only queue them and do not attempt full processing inline, as this could cause your client to lag and be disconnected or (eventually) banned.
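To make the split concrete, here is a minimal sketch of what the separate processing process might do with each raw status it pulls from your queue. extractStatusFields() is a hypothetical name, and the field names (id_str, text, user.screen_name) are assumed from Twitter's JSON tweet format. The stream can also deliver non-tweet messages such as deletion notices, which this sketch simply skips.

```php
<?php
// Hypothetical processing step, run in a separate process from the collector.
// Decodes one raw JSON status and keeps only the fields the application needs.
function extractStatusFields(string $raw): ?array
{
    $status = json_decode($raw, true);
    if (!is_array($status) || !isset($status['id_str'], $status['text'])) {
        return null; // malformed, or not a tweet (e.g. a delete or limit notice)
    }
    return [
        'id'     => $status['id_str'],
        'text'   => $status['text'],
        'author' => $status['user']['screen_name'] ?? null,
    ];
}
```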

Phirehose keeps track of how long your client spends in the enqueueStatus() method and reports it every 60 seconds via log(). Keep this time as low as possible. To reiterate, Phirehose is the collection component, which handles:

"connecting to the Streaming API and retrieving responses, as well as reconnecting in the event of network failure, and hand-off statuses via an asynchronous queueing mechanism"

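Phirehose already measures this for enqueueStatus() itself. If you want comparable numbers for your own downstream stages, the idea can be sketched as below. HandlerTimer is a hypothetical class, not Phirehose's implementation, and the clock is injectable only so the behaviour can be checked deterministically.

```php
<?php
// Hypothetical timer: accumulates time spent in a handler and
// produces a summary once per reporting interval (default 60 s).
final class HandlerTimer
{
    private $interval;
    private $clock;
    private $windowStart;
    private $spent = 0.0;
    private $calls = 0;

    public function __construct(float $interval = 60.0, ?callable $clock = null)
    {
        $this->interval = $interval;
        $this->clock = $clock ?? function (): float { return microtime(true); };
        $this->windowStart = ($this->clock)();
    }

    // Runs $handler($status) and adds its duration to the running total.
    // Returns a report string when the interval has elapsed, otherwise null.
    public function time(callable $handler, string $status): ?string
    {
        $start = ($this->clock)();
        $handler($status);
        $end = ($this->clock)();
        $this->spent += $end - $start;
        $this->calls++;

        if ($end - $this->windowStart < $this->interval) {
            return null;
        }
        $report = sprintf(
            '%d statuses, %.3f s total, %.3f ms avg in handler',
            $this->calls, $this->spent, 1000 * $this->spent / $this->calls
        );
        // Start a new reporting window
        $this->windowStart = $end;
        $this->spent = 0.0;
        $this->calls = 0;
        return $report;
    }
}
```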
Methods

Implementation Required

enqueueStatus($status)

This method is called once for every status update (i.e. tweet) received from the stream. The status is raw, unprocessed data in the format you requested (JSON recommended). Your implementation should save statuses as fast as possible. A (not very good) implementation might be:

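protected static $tweetCounter = 0; // must be declared for self::$tweetCounter to work

public function enqueueStatus($status)
{
  // Build a unique-ish file name from the current time and a running counter
  self::$tweetCounter++;
  $filePath = '/tmp/tweets/' . time() . '_' . self::$tweetCounter . '.tweet';
  // The /tmp/tweets directory must already exist
  file_put_contents($filePath, $status);
}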
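One weakness of a plain file_put_contents() is that a consumer may pick up a file while it is still being written. A common fix, sketched here with a hypothetical writeStatusFile() helper, is to write under a temporary name and then rename() the file into place; on POSIX systems a rename within one filesystem is atomic, so readers see either no file or the complete file. Including the process ID in the name keeps two collectors writing to the same directory from colliding.

```php
<?php
// Hypothetical helper, not part of Phirehose.
// Writes one raw status to its own *.tweet file in $dir, atomically.
function writeStatusFile(string $dir, string $status): string
{
    static $counter = 0;
    $counter++;
    $name = sprintf('%d_%d_%d', time(), getmypid(), $counter);
    $tmpPath = $dir . '/.' . $name . '.tmp';     // hidden, non-.tweet name while writing
    $finalPath = $dir . '/' . $name . '.tweet';

    if (file_put_contents($tmpPath, $status) === false) {
        throw new RuntimeException("Could not write $tmpPath");
    }
    // rename() within one filesystem replaces the name in a single step
    if (!rename($tmpPath, $finalPath)) {
        throw new RuntimeException("Could not rename $tmpPath to $finalPath");
    }
    return $finalPath;
}
```

Inside enqueueStatus() you would then call `writeStatusFile('/tmp/tweets', $status);` (the directory must already exist).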

As of Phirehose version 0.2.2, the tarball includes an example of a simple "ghetto queue" (see ghetto-queue-collect.php and ghetto-queue-consume.php) that shows how statuses can be collected onto the filesystem by one process and then picked up for processing by a separate process (consume).
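The consume side of such a file-based queue could look roughly like the sketch below. It is not the code from ghetto-queue-consume.php; consumeStatusFiles() and the $process callback are hypothetical names. It only looks at finished *.tweet files, handles them in name order (names beginning with a Unix timestamp sort roughly oldest-first), and deletes each file only after $process returns without throwing.

```php
<?php
// Hypothetical consumer for a directory of *.tweet files.
// Returns the number of files processed in this pass.
function consumeStatusFiles(string $dir, callable $process, int $limit = 100): int
{
    $files = glob($dir . '/*.tweet') ?: [];
    sort($files); // timestamp-prefixed names: roughly oldest-first
    $handled = 0;
    foreach (array_slice($files, 0, $limit) as $file) {
        $raw = file_get_contents($file);
        if ($raw === false) {
            continue; // unreadable or already gone; try again on the next pass
        }
        $process($raw);  // if this throws, the file is left in place
        unlink($file);
        $handled++;
    }
    return $handled;
}
```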

Implementation Recommended

checkFilterPredicates()

This method is only required (or useful) if you're using the filter method. It is called roughly every 5 seconds and is responsible for checking whether your application's filter predicates have changed. See: [https://dev.twitter.com/docs/streaming-apis/parameters#track]

Phirehose does most of the hard work of ensuring that predicate updates are applied as soon as possible (but not too often). All you have to do is call setTrack() and setFollow() as appropriate, and Phirehose will handle the reconnection. For example, it may look something like this (partial example):

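  public function checkFilterPredicates()
  {
    // Fetch the current keyword list from wherever your application stores it
    $trackIds = AppMemcache::get('TwitterTrackIds');
    // Only call setTrack() when the list has changed; Phirehose handles the reconnect
    if ($trackIds != $this->getTrack()) {
      $this->setTrack($trackIds);
    }
  }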

(Note that AppMemcache::get() is not a native method and is just being used for illustrative purposes). See the examples/filter-reconfigure.php file for a working example of this.
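Comparing arrays with != treats the same keywords in a different order as a change, which can trigger an avoidable reconnect. A sketch of normalising both lists first is below. normaliseTrack() and trackChanged() are hypothetical helpers, and lower-casing is an assumption you should drop if keyword case matters to your application.

```php
<?php
// Hypothetical helpers: trim, lower-case, de-duplicate and sort a keyword list
// so that order, case and blank entries do not count as a predicate change.
function normaliseTrack(array $keywords): array
{
    $clean = [];
    foreach ($keywords as $keyword) {
        $keyword = strtolower(trim((string) $keyword));
        if ($keyword !== '' && !in_array($keyword, $clean, true)) {
            $clean[] = $keyword;
        }
    }
    sort($clean, SORT_STRING);
    return $clean;
}

function trackChanged(array $current, array $wanted): bool
{
    return normaliseTrack($current) !== normaliseTrack($wanted);
}
```

In checkFilterPredicates() this might become `if (trackChanged($this->getTrack(), $trackIds)) { $this->setTrack(normaliseTrack($trackIds)); }`, assuming getTrack() returns an array.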

log($message)

You should implement this method to handle logging in a way that's suitable for your application. The default method simply passes the message to PHP's error_log() function.
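As one possible alternative to error_log(), the sketch below appends timestamped lines to a file. appendLog() and the log path are hypothetical; FILE_APPEND | LOCK_EX asks file_put_contents() to append under an exclusive lock so lines from several processes do not interleave.

```php
<?php
// Hypothetical log writer that a log() override could delegate to.
// Each line: [UTC timestamp] [pid N] message
function appendLog(string $path, string $message, ?int $time = null): void
{
    $line = sprintf(
        "[%s] [pid %d] %s\n",
        gmdate('Y-m-d\TH:i:s\Z', $time ?? time()),
        getmypid(),
        rtrim($message)
    );
    if (file_put_contents($path, $line, FILE_APPEND | LOCK_EX) === false) {
        error_log($line); // fall back to PHP's default error log
    }
}
```

Your override would then be something like `protected function log($message) { appendLog('/var/log/mystream.log', $message); }`; check the signature of log() in your copy of Phirehose (some versions may take an extra level argument) and match it.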

User Streams & OAuth

Some users have developed patches/extensions to Phirehose to allow the use of OAuth and UserStreams.