Skip to content

Commit

Permalink
Added Composite Structure for storing individual records
Browse files Browse the repository at this point in the history
  • Loading branch information
Sieboldianus committed Jun 1, 2018
1 parent 2e15c83 commit c561d01
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 20 deletions.
Binary file modified classes/__pycache__/helperFunctions.cpython-36.pyc
Binary file not shown.
75 changes: 74 additions & 1 deletion classes/helperFunctions.py
Expand Up @@ -109,4 +109,77 @@ def getMentionedUsers(userMentions_jsonString,origin):
refUserRecord.user_fullname = userMention.get('name') # Needs to be saved
refUserRecord.user_name = userMention.get('screen_name')
mentionedUsersList.append(refUserRecord)
return mentionedUsersList
return mentionedUsersList

class lbsnRecordDicts():
    """Composite store holding one dictionary per LBSN record type.

    Each dictionary maps a record's primary-key id to the record itself.
    Records are expected to provide ``SerializeToString()`` (presumably
    protobuf messages -- confirm against the lbsn structure definitions).
    """

    def __init__(self, lbsnCountryDict=None, lbsnCityDict=None,
                 lbsnPlaceDict=None, lbsnUserDict=None, lbsnPostDict=None,
                 lbsnPostReactionDict=None):
        """Create the store, optionally seeded with existing dictionaries.

        Every omitted argument gets its own fresh, empty dict. A mutable
        default such as ``dict()`` would be evaluated once and then shared
        by all instances created without arguments.
        """
        self.lbsnCountryDict = {} if lbsnCountryDict is None else lbsnCountryDict
        self.lbsnCityDict = {} if lbsnCityDict is None else lbsnCityDict
        self.lbsnPlaceDict = {} if lbsnPlaceDict is None else lbsnPlaceDict
        self.lbsnUserDict = {} if lbsnUserDict is None else lbsnUserDict
        self.lbsnPostDict = {} if lbsnPostDict is None else lbsnPostDict
        self.lbsnPostReactionDict = ({} if lbsnPostReactionDict is None
                                     else lbsnPostReactionDict)

    def updateRecordDicts(self, newLbsnRecordDicts):
        """Merge the dictionaries of ``newLbsnRecordDicts`` into this store.

        Each dictionary is merged with its counterpart of the same type.
        When a key exists on both sides, the value from
        ``newLbsnRecordDicts`` overwrites the one stored here.
        """
        # Optimally one would compare the values and apply merge rules
        # (e.g. compare SerializeToString() output) instead of overwriting.
        self.lbsnCountryDict = {**self.lbsnCountryDict,
                                **newLbsnRecordDicts.lbsnCountryDict}
        self.lbsnCityDict = {**self.lbsnCityDict,
                             **newLbsnRecordDicts.lbsnCityDict}
        self.lbsnPlaceDict = {**self.lbsnPlaceDict,
                              **newLbsnRecordDicts.lbsnPlaceDict}
        self.lbsnUserDict = {**self.lbsnUserDict,
                             **newLbsnRecordDicts.lbsnUserDict}
        self.lbsnPostDict = {**self.lbsnPostDict,
                             **newLbsnRecordDicts.lbsnPostDict}
        self.lbsnPostReactionDict = {**self.lbsnPostReactionDict,
                                     **newLbsnRecordDicts.lbsnPostReactionDict}

    def Count(self):
        """Return the total number of entries over all instance dictionaries."""
        # Every instance attribute is one of the record dictionaries
        # (lbsnCountry, lbsnPost etc.), so summing their lengths is enough.
        return sum(len(recordDict) for recordDict in self.__dict__.values())

    def MergeExistingRecords(self, newrecord, pkeyID, dict):
        """Store ``newrecord`` under ``pkeyID``, keeping the longer record.

        If ``pkeyID`` is already present, the serialized forms of the old
        and the new record are compared by length; the new record replaces
        the old one unless the old one is strictly longer.

        Note: the parameter name ``dict`` shadows the builtin; it is kept
        unchanged so that existing keyword callers continue to work.
        """
        if pkeyID not in dict:
            dict[pkeyID] = newrecord
            return
        # Compare values, keep longer ones
        oldRecordString = dict[pkeyID].SerializeToString()
        newRecordString = newrecord.SerializeToString()
        if len(oldRecordString) <= len(newRecordString):
            dict[pkeyID] = newrecord
            print("overwritten")  # debug trace
        else:
            print("kept")  # debug trace

    def AddRecordsToDict(self, records):
        """Add a single record or every record of a list to the store."""
        if isinstance(records, list):
            for record in records:
                self.AddRecordToDict(record)
        else:
            self.AddRecordToDict(records)

    def AddRecordToDict(self, record):
        """Route ``record`` to the dictionary matching its type.

        Raises:
            TypeError: if ``record`` is none of the supported LBSN types.
        """
        print(type(record))  # debug trace
        if isinstance(record, lbsnPost):
            pkeyID = record.post_pkey.id
            targetDict = self.lbsnPostDict
        elif isinstance(record, lbsnCountry):
            pkeyID = record.country_pkey.id
            targetDict = self.lbsnCountryDict
        elif isinstance(record, lbsnCity):
            pkeyID = record.city_pkey.id
            targetDict = self.lbsnCityDict
        elif isinstance(record, lbsnPlace):
            pkeyID = record.place_pkey.id
            targetDict = self.lbsnPlaceDict
        elif isinstance(record, lbsnPostReaction):
            pkeyID = record.postreaction_pkey.id
            targetDict = self.lbsnPostReactionDict
        elif isinstance(record, lbsnUser):
            pkeyID = record.user_pkey.id
            targetDict = self.lbsnUserDict
        else:
            # Fail with a clear message instead of an UnboundLocalError below.
            raise TypeError(
                f'Unsupported record type: {type(record).__name__}')
        self.MergeExistingRecords(record, pkeyID, targetDict)
52 changes: 33 additions & 19 deletions transferData.py
Expand Up @@ -5,6 +5,7 @@
import logging
from classes.dbConnection import dbConnection
from classes.helperFunctions import helperFunctions
from classes.helperFunctions import lbsnRecordDicts as lbsnRecordDicts
#LBSN Structure Import from ProtoBuf
from lbsnstructure.Structure_pb2 import *
from lbsnstructure.external.timestamp_pb2 import Timestamp
Expand Down Expand Up @@ -83,19 +84,24 @@ def main():
finished = True
log.info(f'Processed all available {processedRecords} records. Done.')
else:
for record in records:
dbRowNumber = record[0]
#singleUJSONRecord = ujson.loads(record[2])
singleJSONRecordDict = record[2]
parseJsonRecord(singleJSONRecordDict, origin)
processedRecords += 1
if processedRecords >= transferlimit:
finished = True
log.info(f'Processed {processedRecords} records. Done.')
break
lbsnRecords, processedRecords = loopInputRecords(records,origin, processedRecords, transferlimit)
log.info(f'Processed {lbsnRecords.Count()} records. \n')
# print(records[0])
cursor_input.close()

cursor_input.close()

def loopInputRecords(records, origin, processedRecords, transferlimit):
    """Parse a batch of input rows into one lbsnRecordDicts collection.

    Args:
        records: Iterable of rows; element 2 of each row is handed to
            parseJsonRecord as the JSON dictionary.
        origin: Passed through unchanged to parseJsonRecord.
        processedRecords: Number of rows already handled before this batch.
        transferlimit: Processing stops as soon as the running count
            reaches this value.

    Returns:
        A tuple ``(lbsnRecords, processedRecords)`` with the collected
        records and the updated running count.
    """
    log = logging.getLogger()
    lbsnRecords = lbsnRecordDicts()
    for record in records:
        singleJSONRecordDict = record[2]
        parsedRecords = parseJsonRecord(singleJSONRecordDict, origin, lbsnRecords)
        # parseJsonRecord adds to the collection it is given and returns it;
        # merging a collection with itself is redundant, so only merge when
        # a different object comes back.
        if parsedRecords is not lbsnRecords:
            lbsnRecords.updateRecordDicts(parsedRecords)
        # Count the row so the limit check and the returned total are valid.
        processedRecords += 1
        if processedRecords >= transferlimit:
            log.info(f'Processed {processedRecords} records. Done.')
            break
    return lbsnRecords, processedRecords

def fetchJsonData_from_LBSN(cursor, startID = 0):
query_sql = '''
SELECT in_id,insert_time,data::json FROM public."input"
Expand All @@ -106,9 +112,10 @@ def fetchJsonData_from_LBSN(cursor, startID = 0):
records = cursor.fetchall()
return records, cursor.rowcount

def parseJsonRecord(jsonStringDict,origin):
def parseJsonRecord(jsonStringDict,origin,lbsnRecords):
log = logging.getLogger()
#log.debug(jsonStringDict)
# log.debug(jsonStringDict)
# Define Sets that will hold unique values of each lbsn type

post_guid = jsonStringDict.get('id_str')
log.debug(f'\n\n##################### {post_guid} #####################')
Expand Down Expand Up @@ -143,7 +150,7 @@ def parseJsonRecord(jsonStringDict,origin):
placeRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnCountry(),placeID,origin)
if not postGeoaccuracy:
postGeoaccuracy = lbsnPost.COUNTRY
log.debug(f'Placetype detected: country/')
log.debug(f'Placetype detected: country/')
#sys.exit("COUNTRY DETECTED - should not exist") #debug
if place_type == "city" or place_type == "neighborhood" or place_type == "admin":
#city_guid
Expand All @@ -168,9 +175,11 @@ def parseJsonRecord(jsonStringDict,origin):
placeRecord.geom_area = Polygon(bounding_box_points).wkt # prints: 'POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))'
if not place_type == "country":
refCountryRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnCountry(),place.get('country_code'),origin)
refCountryRecord.name = place.get('country') # Needs to be saved
refCountryRecord.name = place.get('country') # Needs to be saved
lbsnRecords.AddRecordsToDict(refCountryRecord)
placeRecord.country_pkey.CopyFrom(refCountryRecord) ##Assignment Error!
log.debug(f'Final Place Record: {placeRecord}')
lbsnRecords.AddRecordsToDict(placeRecord)
# Get Post/Reaction Details of user
userRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnUser(),jsonStringDict.get('user').get('id_str'),origin)
# get additional information about the user, if available
Expand Down Expand Up @@ -207,7 +216,7 @@ def parseJsonRecord(jsonStringDict,origin):
userMentionsJson = jsonStringDict.get('entities').get('user_mentions')
refUserRecords = helperFunctions.getMentionedUsers(userMentionsJson,origin)
log.debug(f'User mentions: {refUserRecords}')

lbsnRecords.AddRecordsToDict(refUserRecords)
# Assignment Step
# check first if post is reaction to other post
# reaction means: reduced structure compared to post
Expand Down Expand Up @@ -237,7 +246,7 @@ def parseJsonRecord(jsonStringDict,origin):
# "Note that retweets of retweets do not show representations of the intermediary retweet [...]"
# postReactionRecord.referencedPostreaction_pkey.CopyFrom(refPostReactionRecord)
log.debug(f'Reaction record: {postReactionRecord}')

lbsnRecords.AddRecordsToDict(postReactionRecord)
else:
# if record is a post
postRecord = helperFunctions.createNewLBSNRecord_with_id(lbsnPost(),post_guid,origin)
Expand Down Expand Up @@ -274,9 +283,14 @@ def parseJsonRecord(jsonStringDict,origin):
# Add mentioned userRecords
postRecord.user_mentions_pkey.extend([userRef.user_pkey for userRef in refUserRecords])
# because standard print statement will produce escaped text, we can use protobuf text_format to give us a human friendly version of the text
log.debug(f'Post record: {text_format.MessageToString(postRecord,as_utf8=True)}')
# log.debug(f'Post record: {text_format.MessageToString(postRecord,as_utf8=True)}')
log.debug(f'Post record: {postRecord}')
lbsnRecords.AddRecordsToDict(postRecord)
log.debug(f'The user who posted/reacted: {userRecord}')
lbsnRecords.AddRecordsToDict(userRecord)

return lbsnRecords

if __name__ == "__main__":
main()

Expand Down

0 comments on commit c561d01

Please sign in to comment.