Skip to content
This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

Added support for receipt of Events and Direct Messages in Twitter Streaming API #67

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ spring-social-core/src/test/java/exploration
**/.project
**/.settings
**/bin
/bin
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.springframework.social.twitter.api;

public class StreamEvent extends TwitterObject {
private final String event;
private final TwitterProfile source;
private final TwitterProfile target;

public StreamEvent(String event, TwitterProfile source,
TwitterProfile target) {
super();
this.event = event;
this.source = source;
this.target = target;
}

public String getEvent() {
return event;
}

public TwitterProfile getSource() {
return source;
}

public TwitterProfile getTarget() {
return target;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,14 @@ public interface StreamListener {
*/
void onWarning(StreamWarningEvent warningEvent);

/**
* Called when a event message is available on the stream
*/
void onEvent(StreamEvent event);

/**
* Called when a direct message is available on the stream
*/
void onDirectMessage(DirectMessage directMessage);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.springframework.social.twitter.api.impl;

import java.io.IOException;

import org.springframework.social.twitter.api.TwitterProfile;

import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

abstract class AbstractTwitterDeserializer<T> extends JsonDeserializer<T> {

protected ObjectMapper createMapper() {
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new TwitterModule());
return mapper;
}

protected TwitterProfile toProfile(final JsonNode node) throws IOException {
if (null == node || node.isNull() || node.isMissingNode()) {
return null;
}
final ObjectMapper mapper = this.createMapper();
return mapper.reader(TwitterProfile.class).readValue(node);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.social.twitter.api.DirectMessage;
import org.springframework.social.twitter.api.StreamDeleteEvent;
import org.springframework.social.twitter.api.StreamEvent;
import org.springframework.social.twitter.api.StreamListener;
import org.springframework.social.twitter.api.StreamWarningEvent;
import org.springframework.social.twitter.api.Tweet;
import org.springframework.social.twitter.api.TwitterProfile;

import com.fasterxml.jackson.databind.ObjectMapper;

class StreamDispatcher implements Runnable {

private static final Log logger = LogFactory.getLog(StreamDispatcher.class);

private final List<StreamListener> listeners;

private ObjectMapper objectMapper;
Expand All @@ -50,6 +56,9 @@ public StreamDispatcher(Queue<String> queue, List<StreamListener> listeners) {
objectMapper.addMixInAnnotations(Tweet.class, TweetMixin.class);
objectMapper.addMixInAnnotations(StreamDeleteEvent.class, StreamDeleteEventMixin.class);
objectMapper.addMixInAnnotations(StreamWarningEvent.class, StreamWarningEventMixin.class);
objectMapper.addMixInAnnotations(StreamEvent.class, StreamEventMixin.class);
objectMapper.addMixInAnnotations(DirectMessage.class, DirectMessageMixin.class);
objectMapper.addMixInAnnotations(TwitterProfile.class, TwitterProfileMixin.class);
active = new AtomicBoolean(true);
}

Expand All @@ -60,6 +69,8 @@ public void run() {

// TODO: handle scrub_geo, status_withheld, user_withheld, disconnect, friends, events,

logger.debug(line);

try {
if (line.contains("in_reply_to_status_id_str")) { // TODO: This is kinda hacky
handleTweet(line);
Expand All @@ -69,6 +80,11 @@ public void run() {
handleDelete(line);
} else if (line.startsWith("{\"warning")) {
handleWarning(line);
} else if (line.startsWith("{\"event")) {
handleEvent(line);
}
else if (line.startsWith("{\"direct_message")) {
handleDirectMessage(line);
}
} catch (IOException e) {
// TODO: Should only happen if Jackson doesn't know how to map the line
Expand Down Expand Up @@ -125,4 +141,27 @@ public void run() {
}
}

private void handleEvent(String line) throws IOException {
final StreamEvent event = objectMapper.readValue(line, StreamEvent.class);
for (final StreamListener listener : listeners) {
Future<?> result = pool.submit((new Runnable() {
public void run() {
listener.onEvent(event);
}
}));
}
}

private void handleDirectMessage(String line) throws IOException {
String messagePart = objectMapper.readTree(line).get("direct_message").toString();
final DirectMessage directMessage = objectMapper.readValue(messagePart, DirectMessage.class);
for (final StreamListener listener : listeners) {
Future<?> result = pool.submit((new Runnable() {
public void run() {
listener.onDirectMessage(directMessage);
}
}));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.social.twitter.api.impl;

import java.io.IOException;

import org.springframework.social.twitter.api.StreamEvent;
import org.springframework.social.twitter.api.TwitterProfile;
import org.springframework.social.twitter.api.impl.StreamEventMixin.StreamEventDeserializer;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonDeserialize(using=StreamEventDeserializer.class)
abstract class StreamEventMixin extends TwitterObjectMixin {

static final class StreamEventDeserializer extends AbstractTwitterDeserializer<StreamEvent> {
@Override
public StreamEvent deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
final JsonNode node = jp.readValueAs(JsonNode.class);
if (null == node || node.isMissingNode() || node.isNull()) {
return null;
}

TwitterProfile source = toProfile(node.get("source"));
TwitterProfile target = toProfile(node.get("target"));

return new StreamEvent(
node.get("event").asText(),
source,
target);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

Expand All @@ -42,7 +41,7 @@
* varies between the search API and the timeline API. This deserializer determine which structure is in play and creates a tweet from it.
* @author Craig Walls
*/
class TweetDeserializer extends JsonDeserializer<Tweet> {
class TweetDeserializer extends AbstractTwitterDeserializer<Tweet> {

@Override
public Tweet deserialize(final JsonParser jp, final DeserializationContext ctx) throws IOException {
Expand Down Expand Up @@ -101,12 +100,6 @@ public Tweet deserialize(JsonNode node) throws IOException, JsonProcessingExcept
tweet.setUser(user);
return tweet;
}

private ObjectMapper createMapper() {
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new TwitterModule());
return mapper;
}

private Date toDate(String dateString, DateFormat dateFormat) {
if (dateString == null) {
Expand Down Expand Up @@ -142,16 +135,6 @@ private void extractTickerSymbolEntitiesFromText(String text, Entities entities)
}
}


private TwitterProfile toProfile(final JsonNode node) throws IOException {
if (null == node || node.isNull() || node.isMissingNode()) {
return null;
}
final ObjectMapper mapper = this.createMapper();
return mapper.reader(TwitterProfile.class).readValue(node);
}


private static final String TIMELINE_DATE_FORMAT = "EEE MMM dd HH:mm:ss ZZZZZ yyyy";

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/
package org.springframework.social.twitter.api.impl;

import static org.junit.Assert.*;
import static org.springframework.http.HttpMethod.*;
import static org.springframework.http.MediaType.*;
import static org.springframework.test.web.client.match.MockRestRequestMatchers.*;
import static org.springframework.test.web.client.response.MockRestResponseCreators.*;
import static org.junit.Assert.assertEquals;
import static org.springframework.http.HttpMethod.POST;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.test.web.client.match.MockRestRequestMatchers.content;
import static org.springframework.test.web.client.match.MockRestRequestMatchers.method;
import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo;
import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -28,7 +30,9 @@
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.social.twitter.api.DirectMessage;
import org.springframework.social.twitter.api.StreamDeleteEvent;
import org.springframework.social.twitter.api.StreamEvent;
import org.springframework.social.twitter.api.StreamListener;
import org.springframework.social.twitter.api.StreamWarningEvent;
import org.springframework.social.twitter.api.Tweet;
Expand Down Expand Up @@ -57,6 +61,8 @@ protected void shutdown() {
assertEquals(4, listener.tweetsReceived.size());
assertEquals(2, listener.deletesReceived.size());
assertEquals(1, listener.limitsReceived.size());
assertEquals(2, listener.eventsReceived.size());
assertEquals(1, listener.directMessagesReceived.size());
}

@Test
Expand Down Expand Up @@ -88,6 +94,8 @@ private abstract static class MockStreamListener implements StreamListener {
List<StreamDeleteEvent> deletesReceived = new ArrayList<StreamDeleteEvent>();
List<Integer> limitsReceived = new ArrayList<Integer>();
List<StreamWarningEvent> warningsReceived = new ArrayList<StreamWarningEvent>();
List<StreamEvent> eventsReceived = new ArrayList<StreamEvent>();
List<DirectMessage> directMessagesReceived = new ArrayList<DirectMessage>();
private int stopAfter = Integer.MAX_VALUE;

public MockStreamListener(int stopAfter) {
Expand All @@ -113,6 +121,16 @@ public void onWarning(StreamWarningEvent warningEvent) {
warningsReceived.add(warningEvent);
messageReceived();
}

public void onEvent(StreamEvent event) {
eventsReceived.add(event);
messageReceived();
}

public void onDirectMessage(DirectMessage directMessage) {
directMessagesReceived.add(directMessage);
messageReceived();
}

private void messageReceived() {
stopAfter--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@
{"limit":{"track":369}}
{"warning":{"code":"FALLING_BEHIND","message":"Your connection is falling behind and messages are being queued for delivery to you. Your queue is now over 60% full. You will be disconnected when the queue is full.","percent_full": 70}}
{"in_reply_to_status_id_str":"73103062906056704","retweet_count":0,"text":"@mizzeecustoms not me. gonna be same as clevelands sneaker show last year. come to dallas for kixpo. that will be huge","coordinates":null,"in_reply_to_screen_name":"mizzeecustoms","in_reply_to_user_id_str":"24400628","created_at":"Tue May 24 19:56:49 +0000 2011","contributors":null,"retweeted":false,"truncated":false,"source":"web","id_str":"73115255005384704","entities":{"urls":[],"user_mentions":[{"indices":[0,14],"id_str":"24400628","name":"Miz","screen_name":"mizzeecustoms","id":24400628}],"hashtags":[]},"in_reply_to_user_id":24400628,"in_reply_to_status_id":73103062906056704,"place":null,"user":{"statuses_count":2564,"time_zone":null,"profile_use_background_image":true,"notifications":null,"created_at":"Wed Jul 07 01:51:16 +0000 2010","profile_background_color":"C0DEED","listed_count":6,"profile_background_image_url":"http:\/\/a0.twimg.com\/images\/themes\/theme1\/bg.png","followers_count":480,"description":"Father, shoe collector, business man\n","is_translator":false,"id_str":"163693947","friends_count":213,"profile_text_color":"333333","profile_sidebar_fill_color":"DDEEF6","default_profile_image":false,"lang":"en","profile_background_tile":false,"protected":false,"url":null,"profile_image_url":"http:\/\/a0.twimg.com\/profile_images\/1352421382\/justinamazing1_normal.jpg","show_all_inline_media":false,"geo_enabled":true,"name":"Justin Amazing","default_profile":true,"contributors_enabled":false,"verified":false,"favourites_count":1,"profile_link_color":"0084B4","screen_name":"justinamazing1","id":163693947,"follow_request_sent":null,"following":null,"utc_offset":null,"profile_sidebar_border_color":"C0DEED","location":"Pa"},"id":73115255005384704,"geo":null,"favorited":false}
{"warning":{"code":"FALLING_BEHIND","message":"Your connection is falling behind and messages are being queued for delivery to you. Your queue is now over 60% full. You will be disconnected when the queue is full.","percent_full": 80}}
{"warning":{"code":"FALLING_BEHIND","message":"Your connection is falling behind and messages are being queued for delivery to you. Your queue is now over 60% full. You will be disconnected when the queue is full.","percent_full": 80}}
{"event":"follow","source":{"id":12345},,"target":{"id":12346}}
{"event":"unfollow","source":{"id":12345},,"target":{"id":12346}}
{"direct_message":{ "id":12345,"text":"This is a test", "sender":{"id":12345},"recipient":{"id":12346}}}