Skip to content

Commit

Permalink
Twitter River: Support filter stream, closes elastic#416.
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Oct 8, 2010
1 parent ee2fabb commit 8b03b91
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 5 deletions.
1 change: 1 addition & 0 deletions .idea/dictionaries/kimchy.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/modules/plugin-river-twitter.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions plugins/river/twitter/build.gradle
Expand Up @@ -32,8 +32,8 @@ configurations {
dependencies {
compile project(':elasticsearch')

compile('org.twitter4j:twitter4j-core:2.1.4') { transitive = false }
distLib('org.twitter4j:twitter4j-core:2.1.4') { transitive = false }
compile('org.twitter4j:twitter4j-core:2.1.6') { transitive = false }
distLib('org.twitter4j:twitter4j-core:2.1.6') { transitive = false }


testCompile project(':test-testng')
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -38,6 +39,7 @@
import twitter4j.*;

import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -56,6 +58,10 @@ public class TwitterRiver extends AbstractRiverComponent implements River {

private final int dropThreshold;

private FilterQuery filterQuery;

private String streamType;


private final TwitterStream stream;

Expand All @@ -73,14 +79,92 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
Map<String, Object> twitterSettings = (Map<String, Object>) settings.settings().get("twitter");
user = XContentMapValues.nodeStringValue(twitterSettings.get("user"), null);
password = XContentMapValues.nodeStringValue(twitterSettings.get("password"), null);
streamType = XContentMapValues.nodeStringValue(twitterSettings.get("type"), "sample");
Map<String, Object> filterSettings = (Map<String, Object>) twitterSettings.get("filter");
if (filterSettings != null) {
filterQuery = new FilterQuery();
filterQuery.count(XContentMapValues.nodeIntegerValue(filterSettings.get("count"), 0));
Object tracks = filterSettings.get("tracks");
if (tracks != null) {
if (tracks instanceof List) {
List<String> lTracks = (List<String>) tracks;
filterQuery.track(lTracks.toArray(new String[lTracks.size()]));
} else {
filterQuery.track(Strings.commaDelimitedListToStringArray(tracks.toString()));
}
}
Object follow = filterSettings.get("follow");
if (follow != null) {
if (follow instanceof List) {
List lFollow = (List) follow;
int[] followIds = new int[lFollow.size()];
for (int i = 0; i < lFollow.size(); i++) {
Object o = lFollow.get(i);
if (o instanceof Number) {
followIds[i] = ((Number) o).intValue();
} else {
followIds[i] = Integer.parseInt(o.toString());
}
}
filterQuery.follow(followIds);
} else {
String[] ids = Strings.commaDelimitedListToStringArray(follow.toString());
int[] followIds = new int[ids.length];
for (int i = 0; i < ids.length; i++) {
followIds[i] = Integer.parseInt(ids[i]);
}
}
}
Object locations = filterSettings.get("locations");
if (locations != null) {
if (locations instanceof List) {
List lLocations = (List) locations;
double[][] dLocations = new double[lLocations.size()][];
for (int i = 0; i < lLocations.size(); i++) {
Object loc = lLocations.get(i);
double lat;
double lon;
if (loc instanceof List) {
List lLoc = (List) loc;
if (lLoc.get(0) instanceof Number) {
lat = ((Number) lLoc.get(0)).doubleValue();
} else {
lat = Double.parseDouble(lLoc.get(0).toString());
}
if (lLoc.get(1) instanceof Number) {
lon = ((Number) lLoc.get(1)).doubleValue();
} else {
lon = Double.parseDouble(lLoc.get(1).toString());
}
} else {
String[] sLoc = Strings.commaDelimitedListToStringArray(loc.toString());
lat = Double.parseDouble(sLoc[0]);
lon = Double.parseDouble(sLoc[1]);
}
dLocations[i] = new double[]{lat, lon};
}
filterQuery.locations(dLocations);
} else {
String[] sLocations = Strings.commaDelimitedListToStringArray(locations.toString());
double[][] dLocations = new double[sLocations.length / 2][];
int dCounter = 0;
for (int i = 0; i < sLocations.length; i++) {
double lat = Double.parseDouble(sLocations[i]);
double lon = Double.parseDouble(sLocations[++i]);
dLocations[dCounter++] = new double[]{lat, lon};
}
filterQuery.locations(dLocations);
}
}
}
}

logger.info("creating twitter stream river for [{}]", user);

if (user == null || password == null) {
stream = null;
indexName = null;
typeName = null;
typeName = "status";
bulkSize = 100;
dropThreshold = 10;
logger.warn("no user / password specified, disabling river...");
Expand All @@ -104,6 +188,9 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
}

@Override public void start() {
if (stream == null) {
return;
}
logger.info("starting twitter stream");
try {
String mapping = XContentFactory.jsonBuilder().startObject().startObject(typeName)
Expand All @@ -122,7 +209,17 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
}
}
currentRequest = client.prepareBulk();
stream.sample();
if (streamType.equals("filter") || filterQuery != null) {
try {
stream.filter(filterQuery);
} catch (TwitterException e) {
logger.warn("failed to create filter stream based on query, disabling river....");
}
} else if (streamType.equals("firehose")) {
stream.firehose(0);
} else {
stream.sample();
}
}

@Override public void close() {
Expand Down

0 comments on commit 8b03b91

Please sign in to comment.