Skip to content

Commit

Permalink
First version of complete Twitter Tweet mapping to lbsn-protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
Sieboldianus committed May 31, 2018
1 parent f2b1079 commit 500d751
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 57 deletions.
Binary file modified classes/__pycache__/helperFunctions.cpython-36.pyc
Binary file not shown.
43 changes: 38 additions & 5 deletions classes/helperFunctions.py
Expand Up @@ -6,18 +6,23 @@
import numpy as np
from lbsnstructure.Structure_pb2 import *
from lbsnstructure.external.timestamp_pb2 import Timestamp
import datetime

class helperFunctions():

def utc_to_local(utc_dt):
    """Return ``utc_dt`` as a timezone-aware datetime in the local zone.

    The input is first stamped as UTC (an existing ``tzinfo`` is replaced,
    not converted) and the result is then shifted to the system timezone.
    """
    stamped_as_utc = utc_dt.replace(tzinfo=timezone.utc)
    # tz=None makes astimezone() target the system's local timezone.
    return stamped_as_utc.astimezone(tz=None)

def cleanhtml(raw_html):
    """Return ``raw_html`` with every ``<...>`` tag stripped out.

    Tags are matched non-greedily, so the text between two tags is kept.
    """
    return re.sub('<.*?>', '', raw_html)

def extract_emojis(str):
    """Return the characters of ``str`` that are found in ``emoji.UNICODE_EMOJI``.

    Characters are tested one at a time and returned in order of
    appearance; repeated characters are kept.
    """
    # NOTE: the parameter name shadows the builtin ``str``; it is kept
    # unchanged because it is part of the function's signature.
    return [char for char in str if char in emoji.UNICODE_EMOJI]

def getRectangleBounds(points):
lats = []
lngs = []
Expand All @@ -29,6 +34,7 @@ def getRectangleBounds(points):
limXMin = np.min(lngs)
limXMax = np.max(lngs)
return limYMin,limYMax,limXMin,limXMax

def createNewLBSNRecord_with_id(record,id,origin):
# initializes new record with composite ID
c_Key = CompositeKey()
Expand All @@ -47,24 +53,51 @@ def createNewLBSNRecord_with_id(record,id,origin):
elif isinstance(record,lbsnUser):
record.user_pkey.CopyFrom(c_Key)
return record

def isPostReaction_Type(jsonString, return_type=False):
    """Tell whether a tweet is a reaction (reply, quote or share) to a post.

    Args:
        jsonString: Mapping holding the tweet JSON; only ``.get()`` is used.
        return_type: When true, a matching tweet is reported as an
            ``lbsnPostReaction`` with ``reaction_type`` set instead of ``True``.

    Returns:
        ``False`` when none of the reaction keys holds a truthy value.
        Otherwise ``True``, or the typed ``lbsnPostReaction`` when
        ``return_type`` is true.
    """
    # Checked in this order; the first key with a truthy value decides.
    # The enum member is looked up by name so that lbsnPostReaction is only
    # touched when a typed result is actually requested.
    reaction_keys = (
        ('in_reply_to_status_id_str', 'REPLY'),
        ('quoted_status_id_str', 'QUOTE'),
        ('retweeted_status', 'SHARE'),
    )
    for json_key, enum_name in reaction_keys:
        if not jsonString.get(json_key):
            continue
        if not return_type:
            return True
        reaction = lbsnPostReaction()
        reaction.reaction_type = getattr(lbsnPostReaction, enum_name)
        return reaction
    return False

def isPost_Type(jsonString):
    """Classify a tweet by the type of its first attached media item.

    Args:
        jsonString: Mapping holding the tweet JSON.

    Returns:
        An ``lbsnPost`` whose ``post_type`` is ``IMAGE`` for ``"photo"``,
        ``VIDEO`` for ``"video"`` / ``"animated_gif"`` and ``TEXT`` for any
        other media type; ``False`` when no media type can be determined.
    """
    # Media is nested below 'entities' / 'extended_entities', not at the top
    # level of the tweet. 'extended_entities' is tried first because, per the
    # Twitter data dictionary, it is the section that reports video and
    # animated_gif types:
    # https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/extended-entities-object.html
    media = None
    for section in ('extended_entities', 'entities'):
        media = (jsonString.get(section) or {}).get('media')
        if media:
            break
    if not media:
        return False

    # Only the first media item decides the post type.
    typeString = media[0].get('type')
    if not typeString:
        return False

    post = lbsnPost()
    if typeString == "photo":
        post.post_type = lbsnPost.IMAGE
    elif typeString in ("video", "animated_gif"):
        post.post_type = lbsnPost.VIDEO
    else:
        post.post_type = lbsnPost.TEXT
    return post

def parseJSONDateStringToProtoBuf(jsonDateString):
    """Convert a Twitter JSON date string into a protobuf ``Timestamp``.

    The string must look like ``Thu May 31 12:00:00 +0000 2018``. The
    ``+0000`` offset is part of the format literal, so a string carrying
    any other offset does not match and ``strptime`` raises ``ValueError``.
    """
    twitter_date_format = '%a %b %d %H:%M:%S +0000 %Y'
    parsed = datetime.datetime.strptime(jsonDateString, twitter_date_format)
    # Timestamp.FromDatetime fills the message in place from the datetime.
    stamp = Timestamp()
    stamp.FromDatetime(parsed)
    return stamp
155 changes: 103 additions & 52 deletions transferData.py
Expand Up @@ -47,6 +47,7 @@ def main():
parser.add_argument('-tG', "--transferNotGeotagged", default=0)
args = parser.parse_args()

transferlimit = args.transferlimit
# We're dealing with Twitter, lets create the OriginID globally
# this OriginID is required for all CompositeKeys
origin = lbsnOrigin()
Expand All @@ -67,11 +68,15 @@ def main():
)
conn_input, cursor_input = inputConnection.connect()
records,returnedRecord_count = fetchJsonData_from_LBSN(cursor_input)
x=0
if returnedRecord_count == 0:
print("All fetched.")
else:
for record in records:
parseJsonRecord(record, origin)
x+=1
if x>100:
break
# print(records[0])
cursor_input.close()

Expand All @@ -87,10 +92,11 @@ def fetchJsonData_from_LBSN(cursor, startID = 0):

def parseJsonRecord(JsonRecord,origin):
log = logging.getLogger()

#print(JsonRecord[2])
jsonString = JsonRecord[2]
dbRowNumber = JsonRecord[0]
post_guid = jsonString.get('id_str')
postGeoaccuracy = None

if not post_guid:
print("No PostGuid")
Expand All @@ -104,91 +110,136 @@ def parseJsonRecord(JsonRecord,origin):
else:
l_lng = post_coordinates.get('coordinates')[0]
l_lat = post_coordinates.get('coordinates')[1]
postGeoaccuracy = lbsnPost.LATLNG

#Check if Place is mentioned
place = jsonString.get('place')
if place:
placeID = place.get('id')
bounding_box_points = place.get('bounding_box').get('coordinates')[0]
limYMin,limYMax,limXMin,limXMax = helperFunctions.getRectangleBounds(bounding_box_points)
bound_points_shapely = geometry.MultiPoint([(limXMin, limYMin), (limXMax, limYMax)])
lon_center = bound_points_shapely.centroid.coords[0][0] #True centroid (coords may be multipoint)
lat_center = bound_points_shapely.centroid.coords[0][1]
place_type = place.get('place_type')

# create the CompositeKey for place record
# from Origin and API GUID
#placeKey = CompositeKey()
#placeKey.origin.CopyFrom(origin)
#placeKey.id = place.get('id')
if place_type == "city" or place_type == "neighborhood":
if place_type == "country":
#country_guid
placeRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnCountry(),placeID,origin)
if not postGeoaccuracy:
postGeoaccuracy = lbsnPost.COUNTRY
log.debug("country/admin")
#sys.exit("COUNTRY DETECTED - should not exist") #debug
if place_type == "city" or place_type == "neighborhood" or place_type == "admin":
#city_guid
cityRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnCity(),place.get('id'),origin)
cityRecord.name = place.get('name')
cityRecord.url = place.get('url')
refCountryRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnCountry(),place.get('country_code'),origin)
cityRecord.country_pkey.CopyFrom(refCountryRecord)
cityRecord.geom_center = "POINT(%s %s)" % (lon_center,lat_center)
cityRecord.geom_area = Polygon(bounding_box_points).wkt # prints: 'POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))'
print(cityRecord)
placeRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnCity(),place.get('id'),origin)
if not postGeoaccuracy:
postGeoaccuracy = lbsnPost.CITY
log.debug("city/neighborhood")
if place_type == "country" or place_type == "admin":
#country_guid
log.debug("country/admin")
if place_type == "poi":
#place_guid
placeRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnPlace(),place.get('id'),origin)
if not postGeoaccuracy:
postGeoaccuracy = lbsnPost.PLACE
print("place/poi")

placeRecord.name = place.get('name')
placeRecord.url = place.get('url')
placeRecord.geom_center = "POINT(%s %s)" % (lon_center,lat_center)
placeRecord.geom_area = Polygon(bounding_box_points).wkt # prints: 'POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))'
if not place_type == "country":
refCountryRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnCountry(),place.get('country_code'),origin)
refCountryRecord.name = place.get('country') # Needs to be saved
placeRecord.country_pkey.CopyFrom(refCountryRecord) ##Assignment Error!
print(placeRecord)
# Get Post/Reaction Details of user
userRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnUser(),jsonString.get('user').get('id_str'),origin)
# get additional information about the user, if available
userRecord.user_fullname = jsonString.get('user').get('name')
userRecord.follows = jsonString.get('user').get('friends_count')
userRecord.followed = jsonString.get('user').get('followers_count')
userBio = jsonString.get('user').get('description')
if userBio:
userRecord.biography = userBio
userRecord.user_name = jsonString.get('user').get('screen_name')
userRecord.group_count = jsonString.get('user').get('listed_count')
userRecord.post_count = jsonString.get('user').get('statuses_count')
userRecord.url = f'https://twitter.com/intent/user?user_id={userRecord.user_pkey.id}'
userLanguage = helperFunctions.createNewLBSNRecord_with_id(Language(),jsonString.get('user').get('lang'),origin)
userRecord.user_language.CopyFrom(userLanguage)
userLocation = jsonString.get('user').get('location')
if userLocation:
userRecord.user_location = userLocation
userRecord.liked_count = jsonString.get('user').get('favourites_count')
userRecord.active_since.CopyFrom(helperFunctions.parseJSONDateStringToProtoBuf(jsonString.get('user').get('created_at')))
userRecord.profile_image_url = jsonString.get('user').get('profile_image_url')

# Assignment Step
# check first if post is reaction to other post
# reaction means: reduced structure compared to post
postReaction = helperFunctions.isPostReaction_Type(jsonString, True)
if postReaction:
reaction_guid=post_guid
postReactionRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnPostReaction(),reaction_guid,origin)
refUserRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnUser(),jsonString.get('user').get('id_str'),origin)
postReactionRecord.user_pkey.CopyFrom(refUserRecord.user_pkey)
postReactionRecord.user_pkey.CopyFrom(userRecord.user_pkey)
postReactionRecord.reaction_latlng = "POINT(%s %s)" % (l_lng,l_lat)
# Parse String -Timestamp Format found in Twitter json
created_at = datetime.datetime.strptime(jsonString.get('created_at'),'%a %b %d %H:%M:%S +0000 %Y')
timestampRecord = Timestamp()
# Convert to ProtoBuf Timestamp Recommendation
timestampRecord.FromDatetime(created_at)
postReactionRecord.reaction_date.CopyFrom(timestampRecord)
postReactionRecord.reaction_date.CopyFrom(helperFunctions.parseJSONDateStringToProtoBuf(jsonString.get('created_at')))
postReactionRecord.reaction_like_count = jsonString.get('favorite_count')
postReactionRecord.reaction_content = jsonString.get('text')
postReactionRecord.post_type = postReaction.post_type
if postReaction.post_type == lbsnPostReaction.REPLY:
postReactionRecord.reaction_type = postReaction.reaction_type
if postReaction.reaction_type == lbsnPostReaction.REPLY:
refPostRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnPost(),jsonString.get('in_reply_to_status_id_str'),origin)
postReactionRecord.post_pkey.CopyFrom(refPostRecord.post_pkey)
elif isinstance(postReaction.post_type,lbsnPostReaction.QUOTE):
refUserRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnUser(),jsonString.get('in_reply_to_user_id_str'),origin)
refUserRecord.user_name = jsonString.get('in_reply_to_screen_name') # Needs to be saved
postReactionRecord.referencedPost_pkey.CopyFrom(refPostRecord.post_pkey)
elif postReaction.reaction_type == lbsnPostReaction.QUOTE:
refPostRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnPost(),jsonString.get('quoted_status_id_str'),origin)
postReactionRecord.post_pkey.CopyFrom(refPostRecord)
elif isinstance(postReaction.post_type,lbsnPostReaction.SHARE):
postReactionRecord.referencedPost_pkey.CopyFrom(refPostRecord.post_pkey)
elif postReaction.reaction_type == lbsnPostReaction.SHARE:
refPostRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnPost(),jsonString.get('retweeted_status').get('id_str'),origin)
postReactionRecord.post_pkey.CopyFrom(refPostRecord)
postReactionRecord.referencedPost_pkey.CopyFrom(refPostRecord.post_pkey)
# ToDo: if a Reaction refers to another reaction (Information Spread)
# This information is currently not [available from Twitter](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object):
# "Note that retweets of retweets do not show representations of the intermediary retweet [...]"
# postReactionRecord.referencedPostreaction_pkey.CopyFrom(refPostReactionRecord)
print(postReactionRecord)

else:
hashtags = []
# if record is a post
postRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnPost(),post_guid,origin)
#hashtags = []
hashtags_json = jsonString.get('entities').get('hashtags')
for hashtag in hashtags_json: #iterate over the list
hashtags.append(hashtag["text"])
record = lbsnPost(post_guid = post_guid,
post_body=jsonString.get('text'),
input_source=helperFunctions.cleanhtml(jsonString.get('source')),
post_publish_date=jsonString.get('created_at'),
user_guid=jsonString.get('user').get('id_str'),
post_latlng_WKT="POINT(%s %s)" % (l_lng,l_lat),
post_quote_count=jsonString.get('quote_count'),
post_comment_count=jsonString.get('reply_count'),
post_share_count=jsonString.get('retweet_count'),
post_like_count=jsonString.get('favorite_count'),
hashtags=hashtags,
post_type=jsonString.get('entities').get('media'),
post_language=jsonString.get('lang'),
emoji=helperFunctions.extract_emojis(jsonString.get('text'))
)
postRecord.hashtags.append(hashtag["text"])
postRecord.post_body = jsonString.get('text')
postRecord.input_source = helperFunctions.cleanhtml(jsonString.get('source'))
postRecord.post_publish_date.CopyFrom(helperFunctions.parseJSONDateStringToProtoBuf(jsonString.get('created_at')))
postRecord.post_geoaccuracy = postGeoaccuracy
postRecord.user_pkey.CopyFrom(userRecord.user_pkey)
postRecord.post_latlng = "POINT(%s %s)" % (l_lng,l_lat)
valueCount = lambda x: 0 if x is None else x
postRecord.post_quote_count = valueCount(jsonString.get('quote_count'))
postRecord.post_reply_count = valueCount(jsonString.get('reply_count'))
postRecord.post_share_count = valueCount(jsonString.get('retweet_count'))
postRecord.post_like_count = valueCount(jsonString.get('favorite_count'))
postRecord.post_url = f'https://twitter.com/statuses/{post_guid}'
postType = helperFunctions.isPost_Type(jsonString)
if postType:
postRecord.post_type = postType.post_type
else:
postRecord.post_type = lbsnPost.OTHER
print(jsonString.get('entities').get('media'))
postLanguage = helperFunctions.createNewLBSNRecord_with_id(Language(),jsonString.get('lang'),origin)
postRecord.post_language.CopyFrom(postLanguage)
postRecord.emoji.extend(helperFunctions.extract_emojis(jsonString.get('text')))
userMentions_json = jsonString.get('entities').get('user_mentions')
for userMention in userMentions_json: #iterate over the list
refUserRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnUser(),userMention.get('id_str'),origin)
refUserRecord.user_name = userMention.get('name') # Needs to be saved
postRecord.user_mentions_pkey.append(refUserRecord.user_pkey)
print(postRecord)
print(userRecord)

print('\n')
record.attr_list(True)

if __name__ == "__main__":
main()
Expand Down

0 comments on commit 500d751

Please sign in to comment.