From 2ac67b5f0de75f6b4ba4a707d3077fcfc800fce2 Mon Sep 17 00:00:00 2001 From: elbehery Date: Fri, 27 Feb 2015 17:34:27 +0100 Subject: [PATCH] [FLINK-1615] [java api] SimpleTweetInputFormat --- flink-contrib/pom.xml | 15 + .../io/SimpleTweetInputFormat.java | 92 +++ .../tweetinputformat/io/TweetHandler.java | 681 ++++++++++++++++++ .../tweetinputformat/model/User/Users.java | 479 ++++++++++++ .../model/places/Attributes.java | 114 +++ .../model/places/BoundingBox.java | 61 ++ .../tweetinputformat/model/places/Places.java | 131 ++++ .../model/tweet/Contributors.java | 79 ++ .../model/tweet/Coordinates.java | 57 ++ .../model/tweet/CurrentUserRetweet.java | 56 ++ .../tweetinputformat/model/tweet/Tweet.java | 346 +++++++++ .../model/tweet/entities/Entities.java | 91 +++ .../model/tweet/entities/HashTags.java | 58 ++ .../model/tweet/entities/Media.java | 144 ++++ .../model/tweet/entities/Size.java | 65 ++ .../model/tweet/entities/Symbol.java | 53 ++ .../model/tweet/entities/URL.java | 71 ++ .../model/tweet/entities/UserMention.java | 81 +++ .../main/resources/HashTagTweetSample.json | 4 + flink-contrib/src/main/resources/NOTICE | 15 + .../contrib/SimpleTweetInputFormatTest.java | 100 +++ pom.xml | 4 +- 22 files changed, 2796 insertions(+), 1 deletion(-) create mode 100644 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java create mode 100644 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Size.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Symbol.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/URL.java create mode 100755 flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/UserMention.java create mode 100644 flink-contrib/src/main/resources/HashTagTweetSample.json create mode 100644 flink-contrib/src/main/resources/NOTICE create mode 100644 flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml index f0bfa012119ed..1522516f616a9 100644 --- a/flink-contrib/pom.xml +++ b/flink-contrib/pom.xml @@ -36,6 +36,21 @@ under the License. jar + + org.apache.flink + flink-java + ${project.version} + + + com.googlecode.json-simple + json-simple + 1.1.1 + + + org.apache.flink + flink-clients + ${project.version} + diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java new file mode 100644 index 0000000000000..950b398741b55 --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.io; + +import org.apache.flink.api.common.io.DelimitedInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet; +import org.apache.flink.core.fs.FileInputSplit; +import org.codehaus.jackson.JsonParseException; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; + + +public class SimpleTweetInputFormat extends DelimitedInputFormat implements ResultTypeQueryable { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class); + + private transient JSONParser parser; + private transient TweetHandler handler; + + @Override + public void open(FileInputSplit split) throws IOException { + super.open(split); + this.handler = new TweetHandler(); + this.parser = new JSONParser(); + } + + @Override + public Tweet nextRecord(Tweet record) throws IOException { + Boolean result = false; + + do { + try { + record.reset(0); + record = super.nextRecord(record); + result = true; + + } catch (JsonParseException e) { + result = false; + + } + } while (!result); + + return record; + } + + @Override + public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException { + + InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes)); + jsonReader.skip(offset); + + try { + + handler.reuse = reuse; + parser.parse(jsonReader, handler, false); + } catch (ParseException e) { + + LOG.debug(" Tweet Parsing Exception : "+e.getMessage() ); + } + + return reuse; + } + + @Override + public TypeInformation getProducedType() { + return new GenericTypeInfo(Tweet.class); + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java new file mode 100644 index 0000000000000..ab792907e143e --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java @@ -0,0 +1,681 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.io; + +import org.apache.flink.contrib.tweetinputformat.model.tweet.Contributors; +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet; +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags; +import org.json.simple.parser.ContentHandler; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; + + +public class TweetHandler implements ContentHandler { + + private static final Logger logger = LoggerFactory.getLogger(TweetHandler.class); + + protected Tweet reuse; + + private int nesting = 0; + + private ObjectState objectState = ObjectState.TWEET; + + private EntryState entryState = EntryState.UNEXPECTED; + + private boolean sameHashTag = false; + + // to handle the coordinates special case of nesting primitive type + private int coordinatesCounter = 0; + + private double coordinatesTemp = 0.0d; + + + @Override + public void startJSON() throws ParseException, IOException { + sameHashTag = true; + + } + + @Override + public void endJSON() throws ParseException, IOException { + + + } + + @Override + public boolean startObject() throws ParseException, IOException { + + nesting++; + return true; + } + + @Override + public boolean endObject() throws ParseException, IOException { + + nesting--; + + if (this.nesting == 1) { + this.objectState = ObjectState.TWEET; + } + + // The handler in JSONParser checks for the "!contentHandler.endObject()", so we should + // return false if its not the end of the object. + return nesting > 0; + } + + @Override + public boolean startObjectEntry(String key) throws ParseException, IOException { + + if ((key.equals("contributors") || key.equals("user") || key.equals("geo") || key.equals("place") || key.equals("attributes") || key.equals("bounding_box"))) + objectState = ObjectState.valueOf(key.toUpperCase()); + else if (key.equals("hashtags") && nesting == 2) + objectState = ObjectState.valueOf(key.toUpperCase()); + else if (key.equals("coordinates") && (this.nesting == 1)) + objectState = ObjectState.valueOf(key.toUpperCase()); + else + try { + entryState = EntryState.valueOf(key.toUpperCase()); + } catch (IllegalArgumentException e) { + + logger.debug(e.getMessage()); + + } + + return true; + } + + @Override + public boolean endObjectEntry() throws ParseException, IOException { + + if (objectState == ObjectState.CONTRIBUTORS && nesting == 1) + objectState = ObjectState.TWEET; + + return true; + } + + @Override + public boolean startArray() throws ParseException, IOException { + + return true; + } + + @Override + public boolean endArray() throws ParseException, IOException { + + if (objectState == ObjectState.COORDINATES) { + coordinatesCounter = 0; + coordinatesTemp = 0.0d; + } + + + // Some tweets have HashTags twice, this condition to read only one of them + if (objectState == ObjectState.HASHTAGS && entryState == EntryState.INDICES && nesting == 2) { + sameHashTag = false; + } + return true; + } + + @Override + public boolean primitive(Object value) throws ParseException, IOException { + + try { + + if (objectState == ObjectState.TWEET) { + tweetObjectStatePrimitiveHandler(value); + } else if (objectState == ObjectState.USER) { + userObjectStatePrimitiveHandler(value); + } else if (objectState == ObjectState.GEO) { + + return true; + + } else if (objectState == ObjectState.COORDINATES) { + + coordinatesObjectStatePrimitiveHandler(value); + } else if (objectState == ObjectState.PLACE) { + + placeObjectStatePrimitiveHandler(value); + } else if (objectState == ObjectState.GEO) { + + return true; + + } else if (objectState == ObjectState.ATTRIBUTES) { + placeAttributesObjectStatePrimitiveHandler(value); + } else if (objectState == ObjectState.CONTRIBUTORS) { + contributorsObjectStatePrimitiveHandler(value); + } else if (objectState == ObjectState.HASHTAGS && entryState == EntryState.TEXT && sameHashTag) { + hashTagsObjectStatePrimitiveHandler(value); + } + } catch (Exception e) { + logger.debug("Error in primitive type: " + e.getMessage()); + } + + + return true; + } + + public void tweetObjectStatePrimitiveHandler(Object value) { + + switch (entryState) { + case CREATED_AT: + if (value != null) + reuse.setCreated_at((String) value); + break; + case TEXT: + if (value != null) + reuse.setText((String) value); + break; + case ID: + if (value != null) + reuse.setId((Long) value); + break; + case ID_STR: + if (value != null) + reuse.setId_str((String) value); + break; + case SOURCE: + if (value != null) + reuse.setSource((String) value); + break; + case TRUNCATED: + if (value != null) + reuse.setTruncated((Boolean)value); + break; + case IN_REPLY_TO_STATUS_ID: + if (value != null) + reuse.setIn_reply_to_status_id((Long) value); + break; + case IN_REPLY_TO_STATUS_ID_STR: + if (value != null) + reuse.setIn_reply_to_status_id_str((String) value); + break; + case IN_REPLY_TO_USER_ID: + if (value != null) + reuse.setIn_reply_to_user_id((Long) value); + break; + case IN_REPLY_TO_USER_ID_STR: + if (value != null) + reuse.setIn_reply_to_user_id_str((String) value); + break; + case IN_REPLY_TO_SCREEN_NAME: + if (value != null) + reuse.setIn_reply_to_screen_name((String) value); + break; + case RETWEET_COUNT: + if (value != null) + reuse.setRetweet_count((Long) value); + break; + case FAVORITE_COUNT: + if (value != null) + reuse.setFavorite_count((Long) value); + break; + case FAVORITED: + if (value != null) + reuse.setFavorited((Boolean) value); + break; + case RETWEETED: + if (value != null) + reuse.setRetweeted((Boolean) value); + break; + case POSSIBLY_SENSITIVE: + if (value != null) + reuse.setPossibly_sensitive((Boolean) value); + break; + case FILTER_LEVEL: + if (value != null) + reuse.setFilter_level((String) value); + break; + case LANG: + if (value != null) + reuse.setLang((String) value); + break; + } + } + + public void userObjectStatePrimitiveHandler(Object value) { + + switch (entryState) { + case ID: + if (value != null) { + // handle format exception caused by wrong values in the "id" field in the + // tweets. + if (value instanceof String) { + try { + reuse.getUser().setId(Long.parseLong((String) value)); + } catch (NumberFormatException e) { + reuse.getUser().setId(0L); + logger.debug("This Tweet_ID is not a numeric type : " + (String) value); + } + } else + reuse.getUser().setId((Long) value); + } + break; + case ID_STR: + if (value != null) + reuse.getUser().setId_str((String) value); + break; + case NAME: + if (value != null) + reuse.getUser().setName((String) value); + break; + case SCREEN_NAME: + if (value != null) + reuse.getUser().setScreen_name((String) value); + break; + case LOCATION: + if (value != null) + reuse.getUser().setLocation((String) value); + break; + case URL: + if (value != null) + reuse.getUser().setUrl((String) value); + break; + case DESCRIPTION: + if (value != null) + reuse.getUser().setDescription((String) value); + break; + case PROTECTED: + if (value != null) + reuse.getUser().setProtected_tweet((Boolean) value); + break; + case VERIFIED: + if (value != null) + reuse.getUser().setVerified((Boolean) value); + break; + case FOLLOWERS_COUNT: + if (value != null) + reuse.getUser().setFollowers_count((Long) value); + break; + case FRIENDS_COUNT: + if (value != null) + reuse.getUser().setFriends_count((Long) value); + break; + case LISTED_COUNT: + if (value != null) + reuse.getUser().setListed_count((Long) value); + break; + case FAVOURITES_COUNT: + if (value != null) + reuse.getUser().setFavourites_count((Long) value); + break; + case STATUSES_COUNT: + if (value != null) + reuse.getUser().setStatuses_count((Long) value); + break; + case CREATED_AT: + if (value != null) + reuse.getUser().setCreated_at((String) value); + break; + case UTC_OFFSET: + if (value != null) + reuse.getUser().setUtc_offset((Long) value); + break; + case TIME_ZONE: + if (value != null) + reuse.getUser().setTime_zone((String) value); + break; + case GEO_ENABLED: + if (value != null) + reuse.getUser().setGeo_enabled((Boolean) value); + break; + case LANG: + if (value != null) + reuse.getUser().setLang((String) value); + break; + case CONTRIBUTORS_ENABLED: + if (value != null) + reuse.getUser().setContributors_enabled((Boolean) value); + break; + case IS_TRANSLATOR: + if (value != null) + reuse.getUser().setIs_translator((Boolean) value); + break; + case PROFILE_BACKGROUND_COLOR: + if (value != null) + reuse.getUser().setProfile_background_color((String) value); + break; + case PROFILE_BACKGROUND_IMAGE_URL: + if (value != null) + reuse.getUser().setProfile_background_image_url((String) value); + break; + case PROFILE_BACKGROUND_IMAGE_URL_HTTPS: + if (value != null) + reuse.getUser().setProfile_background_image_url_https((String) value); + break; + case PROFILE_BACKGROUND_TILE: + if (value != null) + reuse.getUser().setProfile_background_tile((Boolean) value); + break; + case PROFILE_LINK_COLOR: + if (value != null) + reuse.getUser().setProfile_link_color((String) value); + break; + case PROFILE_SIDEBAR_BORDER_COLOR: + if (value != null) + reuse.getUser().setProfile_sidebar_border_color((String) value); + break; + case PROFILE_SIDEBAR_FILL_COLOR: + if (value != null) + reuse.getUser().setProfile_sidebar_fill_color((String) value); + break; + case PROFILE_TEXT_COLOR: + if (value != null) + reuse.getUser().setProfile_text_color((String) value); + break; + case PROFILE_USE_BACKGROUND_IMAGE: + if (value != null) + reuse.getUser().setProfile_use_background_image((Boolean) value); + break; + case PROFILE_IMAGE_URL: + if (value != null) + reuse.getUser().setProfile_image_url((String) value); + break; + case PROFILE_IMAGE_URL_HTTPS: + if (value != null) + reuse.getUser().setProfile_image_url_https((String) value); + break; + case PROFILE_BANNER_URL: + if (value != null) + reuse.getUser().setProfile_banner_url((String) value); + break; + case DEFAULT_PROFILE: + if (value != null) + reuse.getUser().setDefault_profile((Boolean) value); + break; + case DEFAULT_PROFILE_IMAGE: + if (value != null) + reuse.getUser().setDefault_profile_image((Boolean) value); + break; + case FOLLOWING: + if (value != null) + reuse.getUser().setFollowing((Boolean) value); + break; + case FOLLOW_REQUEST_SENT: + if (value != null) + reuse.getUser().setFollow_request_sent((Boolean) value); + break; + case NOTIFICATIONS: + if (value != null) + reuse.getUser().setNotifications((Boolean) value); + break; + } + } + + public void coordinatesObjectStatePrimitiveHandler(Object value) { + + switch (entryState) { + case COORDINATES: + if (value != null && this.coordinatesCounter == 0) { + coordinatesTemp = (Double) value; + this.coordinatesCounter++; + } else if (value != null && this.coordinatesCounter == 1) { + reuse.getCoordinates().setCoordinates(coordinatesTemp, (Double) value); + } else + reuse.getCoordinates().setCoordinates(0.0d, 0.0d); + break; + } + + } + + public void placeObjectStatePrimitiveHandler(Object value) { + + switch (entryState) { + case ID: + if (value != null) { + reuse.getPlace().setId((String) value); + } + break; + case URL: + if (value != null) + reuse.getPlace().setUrl((String) value); + break; + case PLACE_TYPE: + if (value != null) + reuse.getPlace().setPlace_type((String) value); + break; + case NAME: + if (value != null) + reuse.getPlace().setName((String) value); + break; + case FULL_NAME: + if (value != null) + reuse.getPlace().setFull_name((String) value); + break; + case COUNTRY_CODE: + if (value != null) + reuse.getPlace().setCountry_code((String) value); + break; + case COUNTRY: + if (value != null) + reuse.getPlace().setCountry((String) value); + break; + + // Skipped BoundingBox -- Not Required + + + } + } + + public void placeAttributesObjectStatePrimitiveHandler(Object value) { + + switch (entryState) { + case STREET_ADDRESS: + if (value != null) { + reuse.getPlace().getAttributes().setStreet_address((String) value); + } + break; + case LOCALITY: + if (value != null) { + reuse.getPlace().getAttributes().setLocality((String) value); + } + break; + case REGION: + if (value != null) { + reuse.getPlace().getAttributes().setRegion((String) value); + } + break; + case ISO3: + if (value != null) { + reuse.getPlace().getAttributes().setIso3((String) value); + } + break; + case POSTAL_CODE: + if (value != null) { + reuse.getPlace().getAttributes().setPostal_code((String) value); + } + break; + case PHONE: + if (value != null) { + reuse.getPlace().getAttributes().setPhone((String) value); + } + break; + case URL: + if (value != null) { + reuse.getPlace().getAttributes().setUrl((String) value); + } + break; + case APP_ID: + if (value != null) { + reuse.getPlace().getAttributes().setAppId((String) value); + } + break; + // Skipped BoundingBox -- Not Required + + } + + + } + + public void contributorsObjectStatePrimitiveHandler(Object value) { + + // to handle the case of the null as contributors is an array in the Twitter documentation + // && if it is not null we initialize the object and fill it with the data, + if (value == null) { + reuse.getContributors().add(new Contributors()); + } else { + + Contributors contributor = new Contributors(); + + switch (entryState) { + case ID: + if (value != null) { + contributor.setId((Long) value); + } + break; + case ID_STR: + if (value != null) { + contributor.setId_str((String) value); + } + break; + case TWEET_CONTRIBUTORS_SCREEN_NAME: + if (value != null) { + contributor.setScreenName((String) value); + } + break; + } + reuse.getContributors().add(contributor); + + } + + + } + + public void hashTagsObjectStatePrimitiveHandler(Object value) { + + HashTags hashTag = new HashTags(); + + if (value == null) { + return; + } else if (entryState == EntryState.TEXT && value != null) { + hashTag.setText((String) value, false); + reuse.getEntities().getHashtags().add(hashTag); + } + } + + private static enum ObjectState { + TWEET, + CONTRIBUTORS, + USER, + GEO, + COORDINATES, + PLACE, + ATTRIBUTES, + BOUNDING_BOX, + HASHTAGS; + + } + + private static enum EntryState { + TEXT, + CREATED_AT, + ID, + ID_STR, + SOURCE, + TRUNCATED, + IN_REPLY_TO_STATUS_ID, + IN_REPLY_TO_STATUS_ID_STR, + IN_REPLY_TO_USER_ID, + IN_REPLY_TO_USER_ID_STR, + IN_REPLY_TO_SCREEN_NAME, + RETWEET_COUNT, + FAVORITE_COUNT, + FAVORITED, + RETWEETED, + POSSIBLY_SENSITIVE, + FILTER_LEVEL, + TWEET_CONTRIBUTORS_SCREEN_NAME, + SCREEN_NAME, + LOCATION, + DESCRIPTION, + PROTECTED, + VERIFIED, + FOLLOWERS_COUNT, + FRIENDS_COUNT, + LISTED_COUNT, + FAVOURITES_COUNT, + STATUSES_COUNT, + UTC_OFFSET, + TIME_ZONE, + GEO_ENABLED, + LANG, + CONTRIBUTORS_ENABLED, + IS_TRANSLATOR, + PROFILE_BACKGROUND_COLOR, + PROFILE_BACKGROUND_IMAGE_URL, + PROFILE_BACKGROUND_IMAGE_URL_HTTPS, + PROFILE_BACKGROUND_TILE, + PROFILE_LINK_COLOR, + PROFILE_SIDEBAR_BORDER_COLOR, + PROFILE_SIDEBAR_FILL_COLOR, + PROFILE_TEXT_COLOR, + PROFILE_USE_BACKGROUND_IMAGE, + PROFILE_IMAGE_URL, + PROFILE_IMAGE_URL_HTTPS, + PROFILE_BANNER_URL, + DEFAULT_PROFILE, + DEFAULT_PROFILE_IMAGE, + FOLLOWING, + FOLLOW_REQUEST_SENT, + NOTIFICATIONS, + TYPE, + COORDINATES, + PLACE_TYPE, + NAME, + FULL_NAME, + COUNTRY_CODE, + COUNTRY, + BOUNDING_BOX, + ATTRIBUTES, + STREET_ADDRESS, + LOCALITY, + REGION, + ISO3, + POSTAL_CODE, + PHONE, + URL, + ENTITIES, + HASHTAGS, + TRENDS, + URLS, + USER_MENTIONS, + SYMBOLS, + MEDIA, + INDICES, + MEDIA_URL, + MEDIA_URL_HTTPS, + DISPLAY_URL, + EXPANDED_URL, + SIZES, + LARGE, + W, + H, + RESIZE, + SMALL, + THUMB, + MEDIUM, + RETWEETED_STATUS, + SOURCE_STATUS_ID, + SOURCE_STATUS_ID_STR, + SCOPES, + FOLLOWERS, + APP_ID, + UNEXPECTED; + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java new file mode 100755 index 0000000000000..90b245282717a --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.User; + +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.Entities; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * {@link org.apache.flink.contrib.tweetinputformat.model.User.Users} can be anyone or anything. They {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} + * , follow, create lists, have a home_timeline, can be mentioned, and can be looked up in bulk. + */ +public class Users { + + + private boolean contributors_enabled; + + private String created_at = ""; + + private boolean default_profile; + + private boolean default_profile_image; + + private String description = ""; + + private Entities entities; + + private long favourites_count; + + private boolean follow_request_sent; + + private boolean following; + + private long followers_count; + + private long friends_count; + + private boolean geo_enabled; + + private long id; + + private String id_str = ""; + + private boolean is_translator; + + private String lang = ""; + + private long listed_count; + + private String location = ""; + + private String name = ""; + + private boolean notifications; + + private String profile_background_color = ""; + + private String profile_background_image_url = ""; + + private String profile_background_image_url_https = ""; + + private boolean profile_background_tile; + + private String profile_banner_url = ""; + + private String profile_image_url = ""; + + private String profile_image_url_https = ""; + + private String profile_link_color = ""; + + private String profile_sidebar_border_color = ""; + + private String profile_sidebar_fill_color = ""; + + private String profile_text_color = ""; + + private boolean profile_use_background_image; + + private boolean protected_tweet; + + private String screen_name = ""; + + private long statuses_count; + + private String time_zone = ""; + + private String url = ""; + + private long utc_offset; + + private boolean verified; + + public Users() { + reset(); + } + + // to avoid FLINK KRYO serializer problem + public void reset() { + + contributors_enabled = false; + created_at = ""; + default_profile = false; + default_profile_image = false; + description = ""; + entities = new Entities(); + favourites_count = 0L; + follow_request_sent = false; + following = false; + followers_count = 0L; + friends_count = 0L; + geo_enabled = false; + id = 0L; + id_str = ""; + is_translator = false; + lang = ""; + listed_count = 0L; + location = ""; + name = ""; + notifications = false; + profile_background_color = ""; + profile_background_image_url = ""; + profile_background_image_url_https = ""; + profile_background_tile = false; + profile_banner_url = ""; + profile_image_url = ""; + profile_image_url_https = ""; + profile_link_color = ""; + profile_sidebar_border_color = ""; + profile_sidebar_fill_color = ""; + profile_text_color = ""; + profile_use_background_image = false; + protected_tweet = false; + screen_name = ""; + statuses_count = 0L; + time_zone = ""; + url = ""; + utc_offset = 0L; + verified = false; + + } + + private String getUTCTime() { + + Date date = new Date(); + SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy"); + return format.format(date); + + } + + public boolean isContributors_enabled() { + return contributors_enabled; + } + + public void setContributors_enabled(boolean contributors_enabled) { + this.contributors_enabled = contributors_enabled; + } + + public String getCreated_at() { + return created_at; + } + + public void setCreated_at(String created_at) { + this.created_at = created_at; + } + + public boolean isDefault_profile() { + return default_profile; + } + + public void setDefault_profile(boolean default_profile) { + this.default_profile = default_profile; + } + + public boolean isDefault_profile_image() { + return default_profile_image; + } + + public void setDefault_profile_image(boolean default_profile_image) { + this.default_profile_image = default_profile_image; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public Entities getEntities() { + return entities; + } + + public void setEntities(Entities entities) { + this.entities = entities; + } + + public long getFavourites_count() { + return favourites_count; + } + + public void setFavourites_count(long favourites_count) { + this.favourites_count = favourites_count; + } + + public boolean isFollow_request_sent() { + return follow_request_sent; + } + + public void setFollow_request_sent(boolean follow_request_sent) { + this.follow_request_sent = follow_request_sent; + } + + public boolean isFollowing() { + return following; + } + + public void setFollowing(boolean following) { + this.following = following; + } + + public long getFollowers_count() { + return followers_count; + } + + public void setFollowers_count(long followers_count) { + this.followers_count = followers_count; + } + + public long getFriends_count() { + return friends_count; + } + + public void setFriends_count(long friends_count) { + this.friends_count = friends_count; + } + + public boolean isGeo_enabled() { + return geo_enabled; + } + + public void setGeo_enabled(boolean geo_enabled) { + this.geo_enabled = geo_enabled; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getId_str() { + return Long.toString(id); + } + + public void setId_str(String id_str) { + this.id_str = id_str; + } + + public boolean isIs_translator() { + return is_translator; + } + + public void setIs_translator(boolean is_translator) { + this.is_translator = is_translator; + } + + public String getLang() { + return lang; + } + + public void setLang(String lang) { + this.lang = lang; + } + + public long getListed_count() { + return listed_count; + } + + public void setListed_count(long listed_count) { + this.listed_count = listed_count; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public boolean isNotifications() { + return notifications; + } + + public void setNotifications(boolean notifications) { + this.notifications = notifications; + } + + public String getProfile_background_color() { + return profile_background_color; + } + + public void setProfile_background_color(String profile_background_color) { + this.profile_background_color = profile_background_color; + } + + public String getProfile_background_image_url() { + return profile_background_image_url; + } + + public void setProfile_background_image_url(String profile_background_image_url) { + this.profile_background_image_url = profile_background_image_url; + } + + public String getProfile_background_image_url_https() { + return profile_background_image_url_https; + } + + public void setProfile_background_image_url_https(String profile_background_image_url_https) { + this.profile_background_image_url_https = profile_background_image_url_https; + } + + public boolean isProfile_background_tile() { + return profile_background_tile; + } + + public void setProfile_background_tile(boolean profile_background_tile) { + this.profile_background_tile = profile_background_tile; + } + + public String getProfile_banner_url() { + return profile_banner_url; + } + + public void setProfile_banner_url(String profile_banner_url) { + this.profile_banner_url = profile_banner_url; + } + + public String getProfile_image_url() { + return profile_image_url; + } + + public void setProfile_image_url(String profile_image_url) { + this.profile_image_url = profile_image_url; + } + + public String getProfile_image_url_https() { + return profile_image_url_https; + } + + public void setProfile_image_url_https(String profile_image_url_https) { + this.profile_image_url_https = profile_image_url_https; + } + + public String getProfile_link_color() { + return profile_link_color; + } + + public void setProfile_link_color(String profile_link_color) { + this.profile_link_color = profile_link_color; + } + + public String getProfile_sidebar_border_color() { + return profile_sidebar_border_color; + } + + public void setProfile_sidebar_border_color(String profile_sidebar_border_color) { + this.profile_sidebar_border_color = profile_sidebar_border_color; + } + + public String getProfile_sidebar_fill_color() { + return profile_sidebar_fill_color; + } + + public void setProfile_sidebar_fill_color(String profile_sidebar_fill_color) { + this.profile_sidebar_fill_color = profile_sidebar_fill_color; + } + + public String getProfile_text_color() { + return profile_text_color; + } + + public void setProfile_text_color(String profile_text_color) { + this.profile_text_color = profile_text_color; + } + + public boolean isProfile_use_background_image() { + return profile_use_background_image; + } + + public void setProfile_use_background_image(boolean profile_use_background_image) { + this.profile_use_background_image = profile_use_background_image; + } + + public boolean isProtected_tweet() { + return protected_tweet; + } + + public void setProtected_tweet(boolean protected_tweet) { + this.protected_tweet = protected_tweet; + } + + public String getScreen_name() { + return screen_name; + } + + public void setScreen_name(String screen_name) { + this.screen_name = screen_name; + } + + public long getStatuses_count() { + return statuses_count; + } + + public void setStatuses_count(long statuses_count) { + this.statuses_count = statuses_count; + } + + public String getTime_zone() { + return time_zone; + } + + public void setTime_zone(String time_zone) { + this.time_zone = time_zone; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public long getUtc_offset() { + return utc_offset; + } + + public void setUtc_offset(long utc_offset) { + this.utc_offset = utc_offset; + } + + public boolean isVerified() { + return verified; + } + + public void setVerified(boolean verified) { + this.verified = verified; + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java new file mode 100755 index 0000000000000..fd722bdfce07e --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.places; + +public class Attributes { + + private String street_address = ""; + + private String locality = ""; + + private String region = ""; + + private String iso3 = ""; + + private String postal_code = ""; + + private String phone = ""; + + private String twitter = "twitter"; + + private String url = ""; + + // in the API it is app:id !! + private String appId = ""; + + public Attributes() { + + } + + + public String getStreet_address() { + return street_address; + } + + public void setStreet_address(String street_address) { + this.street_address = street_address; + } + + public String getLocality() { + return locality; + } + + public void setLocality(String locality) { + this.locality = locality; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public String getIso3() { + return iso3; + } + + public void setIso3(String iso3) { + this.iso3 = iso3; + } + + public String getPostal_code() { + return postal_code; + } + + public void setPostal_code(String postal_code) { + this.postal_code = postal_code; + } + + public String getPhone() { + return phone; + } + + public void setPhone(String phone) { + this.phone = phone; + } + + public String getTwitter() { + return twitter; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java new file mode 100755 index 0000000000000..d89421845c811 --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.places; + +import java.util.ArrayList; +import java.util.List; + +/** + * A series of longitude and latitude points, defining a box which will contain the Place entity + * this bounding box is related to. Each point is an array in the form of [longitude, latitude]. + * Points are grouped into an array per bounding box. Bounding box arrays are wrapped in one + * additional array to be compatible with the polygon notation. + */ +public class BoundingBox { + + private List> coordinates = new ArrayList>(); + + private String type = "Polygon"; + + public BoundingBox() { + + } + + public BoundingBox(List points) { + + this.coordinates.add(points); + + } + + public List> getCoordinates() { + return coordinates; + } + + public void setCoordinates(List> coordinates) { + this.coordinates = coordinates; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java new file mode 100755 index 0000000000000..1a671bbbdcce0 --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.places; + +/** + * {@link org.apache.flink.contrib.tweetinputformat.model.places.Places} are specific, named locations with + * corresponding geo {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Coordinates}. They can be attached + * to {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} by specifying a place_id when tweeting.
+ * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} associated with places are not necessarily + * issued from that location but could also potentially be about that location.
+ * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} can be searched for. Tweets can also be found + * by place_id. + */ +public class Places { + + + private Attributes attributes; + + private BoundingBox bounding_box; + + private String country = ""; + + private String country_code = ""; + + private String full_name = ""; + + private String id = ""; + + private String name = ""; + + private String place_type = ""; + + private String url = ""; + + + public Places() { + attributes = new Attributes(); + bounding_box = new BoundingBox(); + + } + + public Attributes getAttributes() { + return attributes; + } + + public void setAttributes(Attributes attributes) { + this.attributes = attributes; + } + + public BoundingBox getBounding_box() { + return bounding_box; + } + + public void setBounding_box(BoundingBox bounding_box) { + this.bounding_box = bounding_box; + } + + public String getCountry() { + return country; + } + + public void setCountry(String country) { + this.country = country; + } + + public String getCountry_code() { + return country_code; + } + + public void setCountry_code(String country_code) { + this.country_code = country_code; + } + + public String getFull_name() { + return full_name; + } + + public void setFull_name(String full_name) { + this.full_name = full_name; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getPlace_type() { + return place_type; + } + + public void setPlace_type(String place_type) { + this.place_type = place_type; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java new file mode 100755 index 0000000000000..fefc5b19d1dd0 --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet; + +/** + * Nullable. An collection of brief user objects (usually only one) indicating users who contributed + * to the authorship of the {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} on behalf of the + * official tweet author. + */ +public class Contributors { + + + private Long id = 0L; + + private String id_str = ""; + + private String screenName = ""; + + public Contributors() { + reset(); + } + + public Contributors(long id, String id_str, String screenName) { + + this.id = id; + this.id_str = id_str; + this.screenName = screenName; + } + + public void reset() { + + id = 0L; + id_str = ""; + screenName = ""; + + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getId_str() { + return id_str; + } + + public void setId_str(String id_str) { + this.id_str = id_str; + } + + public String getScreenName() { + return screenName; + } + + public void setScreenName(String screenName) { + this.screenName = screenName; + } + + +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java new file mode 100755 index 0000000000000..759f9aaf8ecfd --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet; + +/** + * Nullable. Represents the geographic location of this + * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} as reported by the user or client + * application. The inner coordinates array is formatted as geoJSON longitude first, then latitude) + */ +public class Coordinates { + + private String type = "point"; + + private double[] coordinates = new double[2]; + + public Coordinates() { + + } + + public double[] getCoordinates() { + return coordinates; + } + + public void setCoordinates(double[] coordinates) { + this.coordinates = coordinates; + } + + public void setCoordinates(double longitude, double latitude) { + this.coordinates[0] = longitude; + this.coordinates[1] = latitude; + } + + public String getType() { + return type; + } + + @Override + public String toString() { + return "longitude = " + this.coordinates[0] + " latitude = " + this.coordinates[1]; + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java new file mode 100755 index 0000000000000..7c3a4c1e77e2d --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet; + +/** + * Details the {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} ID of the user’s own retweet (if + * existent) of this {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}. + */ +public class CurrentUserRetweet { + + private long id; + + private String id_str = ""; + + public CurrentUserRetweet() { + reset(); + } + + public void reset() { + id = 0L; + id_str = ""; + + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getId_str() { + return id_str; + } + + public void setId_str() { + this.id_str = Long.toString(id); + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java new file mode 100755 index 0000000000000..5b52b8067d50b --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet; + +import org.apache.flink.contrib.tweetinputformat.model.User.Users; +import org.apache.flink.contrib.tweetinputformat.model.places.Places; +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.Entities; + +import java.util.ArrayList; +import java.util.List; + +public class Tweet { + + private List contributors; + + private Coordinates coordinates; + + private String created_at = ""; + + private Entities entities; + + private long favorite_count; + + private boolean favorited; + + private String filter_level = ""; + + private long id; + + private String id_str = ""; + + private String in_reply_to_screen_name = ""; + + private long in_reply_to_status_id; + + private String in_reply_to_status_id_str = ""; + + private long in_reply_to_user_id; + + private String in_reply_to_user_id_str = ""; + + private String lang = ""; + + // Places + private Places place; + + private boolean possibly_sensitive; + + private long retweet_count; + + private boolean retweeted; + + private CurrentUserRetweet currentUserRetweet; + + private String source = ""; + + private String text = ""; + + private boolean truncated; + + private Users user; + + // to Hanlde retweeted_status + private Tweet retweeted_status; + + private int tweetLevel; + + public Tweet() { + tweetLevel = 0; + reset(tweetLevel); + } + + public Tweet(int level) { + tweetLevel = level; + reset(tweetLevel); + } + + + // to avoid FLINK KRYO serializer problem + public void reset(int level) { + + contributors = new ArrayList(); + coordinates = new Coordinates(); + created_at = ""; + entities = new Entities(); + favorite_count = 0L; + favorited = false; + filter_level = ""; + id = 0L; + id_str = ""; + in_reply_to_screen_name = ""; + in_reply_to_status_id = 0L; + in_reply_to_status_id_str = ""; + in_reply_to_user_id = 0L; + in_reply_to_user_id_str = ""; + lang = ""; + place = new Places(); + possibly_sensitive = false; + retweet_count = 0L; + + // to Hanlde retweeted_status + if (level == 0) + retweeted_status = new Tweet(++level); + + + currentUserRetweet = new CurrentUserRetweet(); + retweeted = false; + source = ""; + text = ""; + truncated = false; + user = new Users(); + + } + + public List getContributors() { + return contributors; + } + + public void setContributors(List contributors) { + this.contributors = contributors; + } + + public Coordinates getCoordinates() { + return coordinates; + } + + public void setCoordinates(Coordinates coordinates) { + this.coordinates = coordinates; + } + + public String getCreated_at() { + return created_at; + } + + public void setCreated_at(String created_at) { + this.created_at = created_at; + } + + public Entities getEntities() { + return entities; + } + + public void setEntities(Entities entities) { + this.entities = entities; + } + + public long getFavorite_count() { + return favorite_count; + } + + public void setFavorite_count(long favorite_count) { + this.favorite_count = favorite_count; + } + + public boolean isFavorited() { + return favorited; + } + + public void setFavorited(boolean favorited) { + this.favorited = favorited; + } + + public String getFilter_level() { + return filter_level; + } + + public void setFilter_level(String filter_level) { + this.filter_level = filter_level; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getId_str() { + return id_str; + } + + public void setId_str(String id_str) { + this.id_str = id_str; + } + + public String getIn_reply_to_screen_name() { + return in_reply_to_screen_name; + } + + public void setIn_reply_to_screen_name(String in_reply_to_screen_name) { + this.in_reply_to_screen_name = in_reply_to_screen_name; + } + + + public long getIn_reply_to_status_id() { + return in_reply_to_status_id; + } + + public void setIn_reply_to_status_id(long in_reply_to_status_id) { + this.in_reply_to_status_id = in_reply_to_status_id; + } + + public String getIn_reply_to_status_id_str() { + return in_reply_to_status_id_str; + } + + public void setIn_reply_to_status_id_str(String in_reply_to_status_id_str) { + this.in_reply_to_status_id_str = in_reply_to_status_id_str; + } + + public long getIn_reply_to_user_id() { + return in_reply_to_user_id; + } + + public void setIn_reply_to_user_id(long in_reply_to_user_id) { + this.in_reply_to_user_id = in_reply_to_user_id; + } + + public String getIn_reply_to_user_id_str() { + return in_reply_to_user_id_str; + } + + public void setIn_reply_to_user_id_str(String in_reply_to_user_id_str) { + this.in_reply_to_user_id_str = in_reply_to_user_id_str; + } + + public String getLang() { + return lang; + } + + public void setLang(String lang) { + this.lang = lang; + } + + public Places getPlace() { + return place; + } + + public void setPlace(Places place) { + this.place = place; + } + + public boolean getPossibly_sensitive() { + return possibly_sensitive; + } + + public void setPossibly_sensitive(boolean possibly_sensitive) { + this.possibly_sensitive = possibly_sensitive; + } + + public long getRetweet_count() { + return retweet_count; + } + + public void setRetweet_count(long retweet_count) { + this.retweet_count = retweet_count; + } + + public boolean isRetweeted() { + return retweeted; + } + + public void setRetweeted(boolean retweeted) { + this.retweeted = retweeted; + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + public boolean isTruncated() { + return truncated; + } + + public void setTruncated(boolean truncated) { + this.truncated = truncated; + } + + public Users getUser() { + return user; + } + + public void setUser(Users user) { + this.user = user; + } + + public CurrentUserRetweet getCurrentUserRetweet() { + return currentUserRetweet; + } + + public void setCurrentUserRetweet(CurrentUserRetweet currentUserRetweet) { + this.currentUserRetweet = currentUserRetweet; + } + + + public boolean isPossibly_sensitive() { + return possibly_sensitive; + } + + public Tweet getRetweeted_status() { + return retweeted_status; + } + + public void setRetweeted_status(Tweet retweeted_status) { + this.retweeted_status = retweeted_status; + } + + public int getTweetLevel() { + return tweetLevel; + } + + public void setTweetLevel(int tweetLevel) { + this.tweetLevel = tweetLevel; + } + + +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java new file mode 100755 index 0000000000000..f8c0fc3bacc0c --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet.entities; + +import java.util.ArrayList; +import java.util.List; + +/** + * Entities which have been parsed out of the text of the + * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}. + */ +public class Entities { + + private List hashtags; + + private List media; + + private List urls; + + private List user_mentions; + + private List symbols; + + public Entities() { + + hashtags = new ArrayList(); + media = new ArrayList(); + urls = new ArrayList(); + user_mentions = new ArrayList(); + symbols = new ArrayList(); + + } + + public List getHashtags() { + return hashtags; + } + + public void setHashtags(List hashtags) { + this.hashtags = hashtags; + } + + public List getMedia() { + return media; + } + + public void setMedia(List media) { + this.media = media; + } + + public List getUrls() { + return urls; + } + + public void setUrls(List urls) { + this.urls = urls; + } + + public List getUser_mentions() { + return user_mentions; + } + + public void setUser_mentions(List user_mentions) { + this.user_mentions = user_mentions; + } + + + public List getSymbols() { + return symbols; + } + + public void setSymbols(List symbols) { + this.symbols = symbols; + } + +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java new file mode 100755 index 0000000000000..66a01419cf08e --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet.entities; + +/** + * Represents hashtags which have been parsed out of the + * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} text. + */ + +public class HashTags { + + private long[] indices = new long[2]; + + private String text = ""; + + + public long[] getIndices() { + return indices; + } + + public void setIndices(long[] indices) { + this.indices = indices; + } + + public void setIndices(long start, long end) { + this.indices[0] = start; + this.indices[1] = end; + + } + + public String getText() { + return text; + } + + public void setText(String text, boolean hashExist) { + if (hashExist) + this.text = text.substring((int) indices[0] + 1); + else + this.text = text; + } + +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java new file mode 100755 index 0000000000000..1f9d7b854ad92 --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet.entities; + +import java.util.HashMap; +import java.util.Map; + +/** + * Represents media elements uploaded with the {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}. + */ +public class Media { + + + private String display_url = ""; + + private String expanded_url = ""; + + private long id; + + private String id_str = ""; + + private long[] indices; + + private String media_url = ""; + + private String media_url_https = ""; + + private Map sizes; + + private String type = ""; + + private String url = ""; + + public Media() { + + this.display_url = ""; + this.expanded_url = ""; + this.id = 0L; + this.id_str = ""; + this.setIndices(new long[] {0L, 0L}); + this.media_url = ""; + this.media_url_https = ""; + this.sizes = new HashMap(); + this.type = ""; + this.url = ""; + + } + + public String getDisplay_url() { + return display_url; + } + + public void setDisplay_url(String display_url) { + this.display_url = display_url; + } + + public String getExpanded_url() { + return expanded_url; + } + + public void setExpanded_url(String expanded_url) { + this.expanded_url = expanded_url; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getId_str() { + return id_str; + } + + public void setId_str(String id_str) { + this.id_str = id_str; + } + + public long[] getIndices() { + return indices; + } + + public void setIndices(long[] indices) { + this.indices = indices; + } + + public String getMedia_url() { + return media_url; + } + + public void setMedia_url(String media_url) { + this.media_url = media_url; + } + + public String getMedia_url_https() { + return media_url_https; + } + + public void setMedia_url_https(String media_url_https) { + this.media_url_https = media_url_https; + } + + public Map getSizes() { + return sizes; + } + + public void setSizes(Map sizes) { + this.sizes = sizes; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Size.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Size.java new file mode 100755 index 0000000000000..5797cf4d79ecb --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Size.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet.entities; + +/** + * An object showing available sizes for the media file. + */ +public class Size { + + private long w; + + private long h; + + private String resize = ""; + + + public Size(long width, long height, String resize) { + + this.w = width; + this.h = height; + this.resize = resize; + + } + + + public long getWidth() { + return w; + } + + public void setWidth(long width) { + this.w = width; + } + + public long getHeight() { + return h; + } + + public void setHeight(long height) { + this.h = height; + } + + public String getResize() { + return resize; + } + + public void setResize(String resize) { + this.resize = resize; + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Symbol.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Symbol.java new file mode 100755 index 0000000000000..ba2489b1ec3ba --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Symbol.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet.entities; + +/** + * An array of financial symbols starting with the dollar sign extracted from the + * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} text. + */ + +public class Symbol { + + private String text = ""; + + private long[] indices; + + public Symbol() { + this.text = ""; + this.setIndices(new long[] {0L, 0L}); + + } + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + public long[] getIndices() { + return indices; + } + + public void setIndices(long[] indices) { + this.indices = indices; + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/URL.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/URL.java new file mode 100755 index 0000000000000..3b2d4c595daf3 --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/URL.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet.entities; + +/** + * Represents URLs included in the text of a Tweet or within textual fields of a + * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} object. + */ +public class URL { + + private String url = ""; + + private String display_url = ""; + + private String expanded_url = ""; + + private long[] indices; + + public URL() { + this.setIndices(new long[] {0L, 0L}); + } + + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getDisplay_url() { + return display_url; + } + + public void setDisplay_url(String display_url) { + this.display_url = display_url; + } + + public String getExpanded_url() { + return expanded_url; + } + + public void setExpanded_url(String expanded_url) { + this.expanded_url = expanded_url; + } + + public long[] getIndices() { + return indices; + } + + public void setIndices(long[] indices) { + this.indices = indices; + } +} diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/UserMention.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/UserMention.java new file mode 100755 index 0000000000000..77503d092bffc --- /dev/null +++ b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/UserMention.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.tweetinputformat.model.tweet.entities; + +/** + * Represents other Twitter users mentioned in the text of the + * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}. + */ +public class UserMention { + + private long id; + + private String id_str = ""; + + private String screen_name = ""; + + private String name = ""; + + private long[] indices; + + public UserMention() { + + this.setIndices(new long[] {0L, 0L}); + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getId_str() { + return id_str; + } + + public void setId_str() { + this.id_str = Long.toString(id); + } + + public String getScreen_name() { + return screen_name; + } + + public void setScreen_name(String screen_name) { + this.screen_name = screen_name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long[] getIndices() { + return indices; + } + + public void setIndices(long[] indices) { + this.indices = indices; + } +} diff --git a/flink-contrib/src/main/resources/HashTagTweetSample.json b/flink-contrib/src/main/resources/HashTagTweetSample.json new file mode 100644 index 0000000000000..ba3f072290d85 --- /dev/null +++ b/flink-contrib/src/main/resources/HashTagTweetSample.json @@ -0,0 +1,4 @@ +{"created_at":"Wed Aug 20 21:58:34 +0000 2014","id":502213125421412352,"id_str":"502213125421412352","text":"Cooler than a wine glass. Witcho fine ass \ud83d\ude0c http:\/\/t.co\/uSJwfwRYNk","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":452474201,"id_str":"452474201","name":"Shawnna \u2652\ufe0f","screen_name":"ThaaOddOne","location":"Onnat Ass","url":"http:\/\/contradictiveoxymoron.tumblr.com","description":"Booty.","protected":false,"verified":false,"followers_count":986,"friends_count":590,"listed_count":2,"favourites_count":1748,"statuses_count":35562,"created_at":"Sun Jan 01 22:06:19 +0000 2012","utc_offset":-18000,"time_zone":"Central Time (US & Canada)","geo_enabled":true,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"022330","profile_background_image_url":"http:\/\/pbs.twimg.com\/profile_background_images\/378800000047672066\/d6cfdb19d8b56a6c388491b00a9a1fcb.jpeg","profile_background_image_url_https":"https:\/\/pbs.twimg.com\/profile_background_images\/378800000047672066\/d6cfdb19d8b56a6c388491b00a9a1fcb.jpeg","profile_background_tile":true,"profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http:\/\/pbs.twimg.com\/profile_images\/501811487984611328\/WfUh6Mb7_normal.jpeg","profile_image_url_https":"https:\/\/pbs.twimg.com\/profile_images\/501811487984611328\/WfUh6Mb7_normal.jpeg","profile_banner_url":"https:\/\/pbs.twimg.com\/profile_banners\/452474201\/1408484011","default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":4,"favorite_count":1,"entities":{"hashtags":[{"text":"example","indices":[0,16]},{"text":"tweet","indices":[118,125]}],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[{"id":502213094119325697,"id_str":"502213094119325697","indices":[44,66],"media_url":"http:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","media_url_https":"https:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","url":"http:\/\/t.co\/uSJwfwRYNk","display_url":"pic.twitter.com\/uSJwfwRYNk","expanded_url":"http:\/\/twitter.com\/ThaaOddOne\/status\/502213125421412352\/photo\/1","type":"photo","sizes":{"medium":{"w":600,"h":800,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":340,"h":453,"resize":"fit"},"large":{"w":768,"h":1024,"resize":"fit"}}}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en"},"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[],"trends":[],"urls":[],"user_mentions":[{"screen_name":"ThaaOddOne","name":"Shawnna \u2652\ufe0f","id":452474201,"id_str":"452474201","indices":[3,14]}],"symbols":[],"media":[{"id":502213094119325697,"id_str":"502213094119325697","indices":[60,82],"media_url":"http:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","media_url_https":"https:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","url":"http:\/\/t.co\/uSJwfwRYNk","display_url":"pic.twitter.com\/uSJwfwRYNk","expanded_url":"http:\/\/twitter.com\/ThaaOddOne\/status\/502213125421412352\/photo\/1","type":"photo","sizes":{"medium":{"w":600,"h":800,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":340,"h":453,"resize":"fit"},"large":{"w":768,"h":1024,"resize":"fit"}},"source_status_id":502213125421412352,"source_status_id_str":"502213125421412352"}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"medium","lang":"en"} +{"created_at":"Wed Aug 20 21:58:34 +0000 2014","id":502213125421412353,"id_str":"502213125421412353","text":"Cooler than a wine glass. Witcho fine ass \ud83d\ude0c http:\/\/t.co\/uSJwfwRYNk","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":452474201,"id_str":"452474201","name":"Shawnna \u2652\ufe0f","screen_name":"ThaaOddOne","location":"Onnat Ass","url":"http:\/\/contradictiveoxymoron.tumblr.com","description":"Booty.","protected":false,"verified":false,"followers_count":986,"friends_count":590,"listed_count":2,"favourites_count":1748,"statuses_count":35562,"created_at":"Sun Jan 01 22:06:19 +0000 2012","utc_offset":-18000,"time_zone":"Central Time (US & Canada)","geo_enabled":true,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"022330","profile_background_image_url":"http:\/\/pbs.twimg.com\/profile_background_images\/378800000047672066\/d6cfdb19d8b56a6c388491b00a9a1fcb.jpeg","profile_background_image_url_https":"https:\/\/pbs.twimg.com\/profile_background_images\/378800000047672066\/d6cfdb19d8b56a6c388491b00a9a1fcb.jpeg","profile_background_tile":true,"profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http:\/\/pbs.twimg.com\/profile_images\/501811487984611328\/WfUh6Mb7_normal.jpeg","profile_image_url_https":"https:\/\/pbs.twimg.com\/profile_images\/501811487984611328\/WfUh6Mb7_normal.jpeg","profile_banner_url":"https:\/\/pbs.twimg.com\/profile_banners\/452474201\/1408484011","default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":4,"favorite_count":1,"entities":{"hashtags":[{"text":"example","indices":[0,16]},{"text":"tweet","indices":[118,125]}],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[{"id":502213094119325697,"id_str":"502213094119325697","indices":[44,66],"media_url":"http:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","media_url_https":"https:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","url":"http:\/\/t.co\/uSJwfwRYNk","display_url":"pic.twitter.com\/uSJwfwRYNk","expanded_url":"http:\/\/twitter.com\/ThaaOddOne\/status\/502213125421412352\/photo\/1","type":"photo","sizes":{"medium":{"w":600,"h":800,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":340,"h":453,"resize":"fit"},"large":{"w":768,"h":1024,"resize":"fit"}}}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en"},"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[],"trends":[],"urls":[],"user_mentions":[{"screen_name":"ThaaOddOne","name":"Shawnna \u2652\ufe0f","id":452474201,"id_str":"452474201","indices":[3,14]}],"symbols":[],"media":[{"id":502213094119325697,"id_str":"502213094119325697","indices":[60,82],"media_url":"http:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","media_url_https":"https:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","url":"http:\/\/t.co\/uSJwfwRYNk","display_url":"pic.twitter.com\/uSJwfwRYNk","expanded_url":"http:\/\/twitter.com\/ThaaOddOne\/status\/502213125421412352\/photo\/1","type":"photo","sizes":{"medium":{"w":600,"h":800,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":340,"h":453,"resize":"fit"},"large":{"w":768,"h":1024,"resize":"fit"}},"source_status_id":502213125421412352,"source_status_id_str":"502213125421412352"}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"medium","lang":"en"} +{"created_at":"Wed Aug 20 21:58:34 +0000 2014","id":502213125421412354,"id_str":"502213125421412354","text":"Cooler than a wine glass. Witcho fine ass \ud83d\ude0c http:\/\/t.co\/uSJwfwRYNk","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":452474201,"id_str":"452474201","name":"Shawnna \u2652\ufe0f","screen_name":"ThaaOddOne","location":"Onnat Ass","url":"http:\/\/contradictiveoxymoron.tumblr.com","description":"Booty.","protected":false,"verified":false,"followers_count":986,"friends_count":590,"listed_count":2,"favourites_count":1748,"statuses_count":35562,"created_at":"Sun Jan 01 22:06:19 +0000 2012","utc_offset":-18000,"time_zone":"Central Time (US & Canada)","geo_enabled":true,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"022330","profile_background_image_url":"http:\/\/pbs.twimg.com\/profile_background_images\/378800000047672066\/d6cfdb19d8b56a6c388491b00a9a1fcb.jpeg","profile_background_image_url_https":"https:\/\/pbs.twimg.com\/profile_background_images\/378800000047672066\/d6cfdb19d8b56a6c388491b00a9a1fcb.jpeg","profile_background_tile":true,"profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http:\/\/pbs.twimg.com\/profile_images\/501811487984611328\/WfUh6Mb7_normal.jpeg","profile_image_url_https":"https:\/\/pbs.twimg.com\/profile_images\/501811487984611328\/WfUh6Mb7_normal.jpeg","profile_banner_url":"https:\/\/pbs.twimg.com\/profile_banners\/452474201\/1408484011","default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":4,"favorite_count":1,"entities":{"hashtags":[{"text":"last","indices":[0,16]},{"text":"example","indices":[118,125]},{"text":"that","indices":[118,125]}],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[{"id":502213094119325697,"id_str":"502213094119325697","indices":[44,66],"media_url":"http:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","media_url_https":"https:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","url":"http:\/\/t.co\/uSJwfwRYNk","display_url":"pic.twitter.com\/uSJwfwRYNk","expanded_url":"http:\/\/twitter.com\/ThaaOddOne\/status\/502213125421412352\/photo\/1","type":"photo","sizes":{"medium":{"w":600,"h":800,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":340,"h":453,"resize":"fit"},"large":{"w":768,"h":1024,"resize":"fit"}}}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en"},"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[],"trends":[],"urls":[],"user_mentions":[{"screen_name":"ThaaOddOne","name":"Shawnna \u2652\ufe0f","id":452474201,"id_str":"452474201","indices":[3,14]}],"symbols":[],"media":[{"id":502213094119325697,"id_str":"502213094119325697","indices":[60,82],"media_url":"http:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","media_url_https":"https:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","url":"http:\/\/t.co\/uSJwfwRYNk","display_url":"pic.twitter.com\/uSJwfwRYNk","expanded_url":"http:\/\/twitter.com\/ThaaOddOne\/status\/502213125421412352\/photo\/1","type":"photo","sizes":{"medium":{"w":600,"h":800,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":340,"h":453,"resize":"fit"},"large":{"w":768,"h":1024,"resize":"fit"}},"source_status_id":502213125421412352,"source_status_id_str":"502213125421412352"}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"medium","lang":"en"} +{"created_at":"Wed Aug 20 21:58:34 +0000 2014","id":502213125421412355,"id_str":"502213125421412355","text":"Cooler than a wine glass. Witcho fine ass \ud83d\ude0c http:\/\/t.co\/uSJwfwRYNk","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":452474201,"id_str":"452474201","name":"Shawnna \u2652\ufe0f","screen_name":"ThaaOddOne","location":"Onnat Ass","url":"http:\/\/contradictiveoxymoron.tumblr.com","description":"Booty.","protected":false,"verified":false,"followers_count":986,"friends_count":590,"listed_count":2,"favourites_count":1748,"statuses_count":35562,"created_at":"Sun Jan 01 22:06:19 +0000 2012","utc_offset":-18000,"time_zone":"Central Time (US & Canada)","geo_enabled":true,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"022330","profile_background_image_url":"http:\/\/pbs.twimg.com\/profile_background_images\/378800000047672066\/d6cfdb19d8b56a6c388491b00a9a1fcb.jpeg","profile_background_image_url_https":"https:\/\/pbs.twimg.com\/profile_background_images\/378800000047672066\/d6cfdb19d8b56a6c388491b00a9a1fcb.jpeg","profile_background_tile":true,"profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http:\/\/pbs.twimg.com\/profile_images\/501811487984611328\/WfUh6Mb7_normal.jpeg","profile_image_url_https":"https:\/\/pbs.twimg.com\/profile_images\/501811487984611328\/WfUh6Mb7_normal.jpeg","profile_banner_url":"https:\/\/pbs.twimg.com\/profile_banners\/452474201\/1408484011","default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":4,"favorite_count":1,"entities":{"hashtags":[{"text":"d12","indices":[0,16]},{"text":"how_to","indices":[118,125]}],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[{"id":502213094119325697,"id_str":"502213094119325697","indices":[44,66],"media_url":"http:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","media_url_https":"https:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","url":"http:\/\/t.co\/uSJwfwRYNk","display_url":"pic.twitter.com\/uSJwfwRYNk","expanded_url":"http:\/\/twitter.com\/ThaaOddOne\/status\/502213125421412352\/photo\/1","type":"photo","sizes":{"medium":{"w":600,"h":800,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":340,"h":453,"resize":"fit"},"large":{"w":768,"h":1024,"resize":"fit"}}}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en"},"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[],"trends":[],"urls":[],"user_mentions":[{"screen_name":"ThaaOddOne","name":"Shawnna \u2652\ufe0f","id":452474201,"id_str":"452474201","indices":[3,14]}],"symbols":[],"media":[{"id":502213094119325697,"id_str":"502213094119325697","indices":[60,82],"media_url":"http:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","media_url_https":"https:\/\/pbs.twimg.com\/media\/Bvg4JfSIQAEihSw.jpg","url":"http:\/\/t.co\/uSJwfwRYNk","display_url":"pic.twitter.com\/uSJwfwRYNk","expanded_url":"http:\/\/twitter.com\/ThaaOddOne\/status\/502213125421412352\/photo\/1","type":"photo","sizes":{"medium":{"w":600,"h":800,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":340,"h":453,"resize":"fit"},"large":{"w":768,"h":1024,"resize":"fit"}},"source_status_id":502213125421412352,"source_status_id_str":"502213125421412352"}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"medium","lang":"en"} diff --git a/flink-contrib/src/main/resources/NOTICE b/flink-contrib/src/main/resources/NOTICE new file mode 100644 index 0000000000000..7be7a0eb06e81 --- /dev/null +++ b/flink-contrib/src/main/resources/NOTICE @@ -0,0 +1,15 @@ +Apache Flink +Copyright 2014-2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +======================================================================= + +Apache Flink contains subcomponents with separate copyright notices and +license terms. Your use of the source code for the these subcomponents +is subject to the terms and conditions of their respective licenses. + +See the LICENSE file for a list of subcomponents and dependencies and +their respective licenses. + diff --git a/flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java b/flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java new file mode 100644 index 0000000000000..97048e1f1671f --- /dev/null +++ b/flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat; +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet; +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + + +public class SimpleTweetInputFormatTest { + + private Tweet tweet; + + private SimpleTweetInputFormat simpleTweetInputFormat; + + private FileInputSplit fileInputSplit; + + protected Configuration config; + + protected File tempFile; + + + @Before + public void testSetUp() { + + + simpleTweetInputFormat = new SimpleTweetInputFormat(); + + File jsonFile = new File("../flink-contrib/src/main/resources/HashTagTweetSample.json"); + + fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"}); + } + + @Test + public void testTweetInput() throws Exception { + + + simpleTweetInputFormat.open(fileInputSplit); + List result; + + int i = 0; + while (i<4) { + i++; + tweet = new Tweet(); + tweet = simpleTweetInputFormat.nextRecord(tweet); + + if(tweet != null){ + result = new ArrayList(); + for(Iterator iterator = tweet.getEntities().getHashtags().iterator(); iterator.hasNext();){ + result.add(iterator.next().getText()); + } + + if(tweet.getId_str().equals("502213125421412352")) { + Assert.assertArrayEquals(new String[]{"example", "tweet"}, result.toArray()); + } + else if(tweet.getId_str().equals("502213125421412353")) { + Assert.assertArrayEquals(new String[]{"example", "tweet"}, result.toArray()); + } + else if(tweet.getId_str().equals("502213125421412354")) { + Assert.assertArrayEquals(new String[]{"last","example", "that"}, result.toArray()); + } + else if(tweet.getId_str().equals("502213125421412355")) { + Assert.assertArrayEquals(new String[]{"d12", "how_to"}, result.toArray()); + } + } + } + + tweet = new Tweet(); + tweet = simpleTweetInputFormat.nextRecord(tweet); + Assert.assertNull(tweet); + Assert.assertTrue(simpleTweetInputFormat.reachedEnd()); + + } +} diff --git a/pom.xml b/pom.xml index 3c6538e2e8754..f6e3ee11b87c7 100644 --- a/pom.xml +++ b/pom.xml @@ -604,6 +604,8 @@ under the License. flink-staging/flink-avro/src/test/resources/testdata.avro flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java out/test/flink-avro/avro/user.avsc + + flink-contrib/src/main/resources/HashTagTweetSample.json **/flink-bin/conf/slaves @@ -863,4 +865,4 @@ under the License. - + \ No newline at end of file