Commit
Style improvements
Sieboldianus committed Dec 19, 2018
1 parent 10acf9c commit 59b9be6
Showing 5 changed files with 63 additions and 44 deletions.
17 changes: 9 additions & 8 deletions lbsntransform/__main__.py
@@ -18,7 +18,6 @@

import logging
import io
import os

def main():
""" Main function to process data from postgres db or local file input
@@ -91,9 +90,9 @@ def main():
if continue_number > len(loc_filelist) - 1:
break
records = LoadData.fetch_data_from_file(loc_filelist,
continue_number,
config.is_stacked_json,
config.local_file_type)
# skip empty files
if not records:
continue_number += 1
@@ -116,12 +115,13 @@ def main():
processed_records += processed_count
processed_total += processed_count
print(f'{processed_total} input records processed (up to {continue_number}). '
f'Count per type: {import_mapper.lbsnRecords.getTypeCounts()}records.', end='\n')
f'Count per type: {import_mapper.lbsn_records.getTypeCounts()}records.', end='\n')
# update console
# On the first loop, after 500,000 processed records, or when finished, transfer results to DB
if not start_number or processed_records >= config.transferCount or finished:
sys.stdout.flush()
print(f'Storing {import_mapper.lbsnRecords.CountGlob} records .. {HF.null_notice(import_mapper.null_island)})')
print(f'Storing {import_mapper.lbsn_records.CountGlob} records.. '
f'{HF.null_notice(import_mapper.null_island)})')
output.storeLbsnRecordDicts(import_mapper)
output.commitChanges()
processed_records = 0
@@ -141,8 +141,9 @@ def main():

# submit remaining
# ??
if import_mapper.lbsnRecords.CountGlob > 0:
print(f'Transferring remaining {import_mapper.lbsnRecords.CountGlob} to db.. {HF.null_notice(import_mapper.null_island)})')
if import_mapper.lbsn_records.CountGlob > 0:
print(f'Transferring remaining {import_mapper.lbsn_records.CountGlob} to db.. '
f'{HF.null_notice(import_mapper.null_island)})')
output.storeLbsnRecordDicts(import_mapper)
output.commitChanges()

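The __main__.py hunks above implement a simple accumulate-and-flush loop: mapped records collect in import_mapper.lbsn_records and are written out on the first pass, whenever processed_records reaches config.transferCount, and once more after the loop for any remainder. The sketch below only illustrates that pattern; transfer_in_batches, store_records and commit_changes are illustrative names, not the project's actual API.

def transfer_in_batches(records, output, transfer_count=500_000):
    """Accumulate mapped records and flush them to the DB in batches.

    Illustrative sketch: `output` stands in for the project's transfer
    object (storeLbsnRecordDicts()/commitChanges() in the real code).
    """
    processed_records = 0
    processed_total = 0
    for record in records:
        # ... map `record` to the lbsn structure here ...
        processed_records += 1
        processed_total += 1
        if processed_records >= transfer_count:
            output.store_records()
            output.commit_changes()
            processed_records = 0   # reset the per-batch counter
    if processed_records > 0:       # submit remaining records
        output.store_records()
        output.commit_changes()
    return processed_total
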
2 changes: 1 addition & 1 deletion lbsntransform/classes/db_connection.py
@@ -53,7 +53,7 @@ def connect(self):
conn = psycopg2.connect(conn_string)
except Exception as err:
print(err)
sys.exit
sys.exit()
# conn.cursor() will return a cursor object; you can use this cursor to perform queries
cursor = conn.cursor()
dnow = datetime.datetime.now()
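The single change to db_connection.py is worth a note: bare `sys.exit` merely references the exit function and never raises SystemExit, so a failed connection attempt would previously have continued into `conn.cursor()` with `conn` unbound. A minimal sketch of the corrected pattern (the connection string is a placeholder):

import sys
import psycopg2

def connect(conn_string="dbname=lbsn user=postgres host=localhost"):
    """Open a psycopg2 connection or exit cleanly on failure."""
    try:
        conn = psycopg2.connect(conn_string)
    except Exception as err:
        print(err)
        sys.exit()          # the old bare `sys.exit` was a no-op expression
    return conn, conn.cursor()
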
49 changes: 32 additions & 17 deletions lbsntransform/classes/field_mapping_flickr.py
@@ -7,17 +7,25 @@
# for debugging only:
from google.protobuf import text_format


class FieldMappingFlickr():
def __init__(self, disableReactionPostReferencing=False, geocodes=False, mapFullRelations=False):
""" Provides mapping function from Flickr endpoints to
protobuf lbsnstructure
"""
def __init__(self,
disableReactionPostReferencing=False,
geocodes=False,
mapFullRelations=False,
map_reactions=True,
ignore_non_geotagged=False,
ignore_sources_set=set()):
# We're dealing with Flickr in this class, let's create the OriginID
# globally
# this OriginID is required for all CompositeKeys
origin = lbsnOrigin()
origin.origin_id = lbsnOrigin.FLICKR
self.origin = origin
self.null_island = 0
self.lbsnRecords = LBSNRecordDicts() #this is where all the data will be stored
self.lbsn_records = LBSNRecordDicts() #this is where all the data will be stored
self.log = logging.getLogger('__main__') #get the main logger object
#self.disableReactionPostReferencing = disableReactionPostReferencing
#self.mapFullRelations = mapFullRelations
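As the __init__ comments above note, the origin id (lbsnOrigin.FLICKR here, lbsnOrigin.TWITTER in the Twitter mapper) qualifies every CompositeKey, so guids from different networks cannot collide. A hypothetical, much simplified illustration of that idea; the real code builds protobuf records via HF.createNewLBSNRecord_with_id():

# Placeholder origin ids; the real values are defined by lbsnOrigin.
FLICKR_ORIGIN = "flickr"
TWITTER_ORIGIN = "twitter"

def make_composite_key(origin_id, record_guid):
    """Return an origin-qualified key for a record guid (illustrative only)."""
    if not record_guid:
        raise ValueError("cannot key a record without a guid")
    return (origin_id, str(record_guid))

# The same numeric guid yields two distinct keys:
assert make_composite_key(FLICKR_ORIGIN, 42) != make_composite_key(TWITTER_ORIGIN, 42)
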
@@ -45,20 +53,25 @@ def extract_flickr_post(self, record):
To Do:
- parameterize column numbers and structure
- provide external config-file for specific CSV structures
- currently not included in lbsn mapping are MachineTags, GeoContext (indoors, outdoors), WoeId
- currently not included in lbsn mapping are MachineTags,
GeoContext (indoors, outdoors), WoeId
and some extra attributes only present for Flickr
"""
post_guid = record[5]
if not HF.check_notice_empty_post_guid(post_guid):
return None
postRecord = HF.createNewLBSNRecord_with_id(lbsnPost(),post_guid,self.origin)
postRecord = HF.createNewLBSNRecord_with_id(lbsnPost(),
post_guid,
self.origin)
postGeoaccuracy = None
userRecord = HF.createNewLBSNRecord_with_id(lbsnUser(),record[7],self.origin)
userRecord = HF.createNewLBSNRecord_with_id(lbsnUser(),
record[7],
self.origin)
userRecord.user_name = record[6]
userRecord.url = f'http://www.flickr.com/photos/{userRecord.pkey.id}/'
if userRecord:
postRecord.user_pkey.CopyFrom(userRecord.pkey)
self.lbsnRecords.AddRecordsToDict(userRecord)
self.lbsn_records.AddRecordsToDict(userRecord)
postRecord.post_latlng = self.flickr_extract_postlatlng(record)
geoaccuracy = FieldMappingFlickr.flickr_map_geoaccuracy(record[13])
if geoaccuracy:
@@ -67,8 +80,10 @@ def extract_flickr_post(self, record):
# we need some information from postRecord to create placeRecord
# (e.g. user language, geoaccuracy, post_latlng)
# some of the information from place will also modify postRecord
placeRecord = HF.createNewLBSNRecord_with_id(lbsnPlace(),record[19],self.origin)
self.lbsnRecords.AddRecordsToDict(placeRecord)
placeRecord = HF.createNewLBSNRecord_with_id(lbsnPlace(),
record[19],
self.origin)
self.lbsn_records.AddRecordsToDict(placeRecord)
postRecord.place_pkey.CopyFrom(placeRecord.pkey)
postRecord.post_publish_date.CopyFrom(HF.parse_csv_datestring_to_protobuf(record[9]))
postRecord.post_create_date.CopyFrom(HF.parse_csv_datestring_to_protobuf(record[8]))
Expand All @@ -80,7 +95,7 @@ def extract_flickr_post(self, record):
postRecord.post_url = f'http://flickr.com/photo.gne?id={post_guid}'
postRecord.post_body = FieldMappingFlickr.reverse_csv_comma_replace(record[21])
postRecord.post_title = FieldMappingFlickr.reverse_csv_comma_replace(record[3])
postRecord.post_thumbnail_url =record[4]
postRecord.post_thumbnail_url = record[4]
record_tags_list = list(filter(None, record[11].split(";")))
if record_tags_list:
for tag in record_tags_list:
@@ -92,7 +107,7 @@ def extract_flickr_post(self, record):
else:
postRecord.post_type = lbsnPost.IMAGE
postRecord.post_content_license = valueCount(record[14])
self.lbsnRecords.AddRecordsToDict(postRecord)
self.lbsn_records.AddRecordsToDict(postRecord)

@staticmethod
def reverse_csv_comma_replace(csv_string):
@@ -105,7 +120,7 @@ def reverse_csv_comma_replace(csv_string):
def clean_tags_from_flickr(tag):
"""Clean special vars not allowed in tags.
"""
characters_to_replace = ('{','}')
characters_to_replace = ('{', '}')
for char_check in characters_to_replace:
tag = tag.replace(char_check,'')
return tag
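extract_flickr_post splits the semicolon-delimited tag column with filter(None, ...) and clean_tags_from_flickr strips the braces that are not allowed in lbsn tags. Taken together, the logic amounts to something like the following standalone sketch (extract_tags is an illustrative helper, not part of the module):

def extract_tags(tag_field):
    """Split a ';'-delimited Flickr tag string and drop disallowed characters."""
    cleaned = []
    for tag in filter(None, tag_field.split(";")):   # filter(None, ...) skips empty items
        for char in ("{", "}"):
            tag = tag.replace(char, "")
        cleaned.append(tag)
    return cleaned

# extract_tags("nature;{sunset};;berlin") -> ['nature', 'sunset', 'berlin']
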
@@ -150,29 +165,29 @@ def flickr_extract_postlatlng(self, record):
lat_entry = record[1]
lng_entry = record[2]
if lat_entry == "" and lng_entry == "":
l_lat,l_lng = 0,0
l_lat, l_lng = 0, 0
else:
try:
l_lng = Decimal(lng_entry)
l_lat = Decimal(lat_entry)
except:
l_lat,l_lng = 0,0
l_lat, l_lng = 0, 0

if (l_lat == 0 and l_lng == 0) or l_lat > 90 or l_lat < -90 or l_lng > 180 or l_lng < -180:
l_lat,l_lng = 0,0
l_lat, l_lng = 0, 0
self.send_to_null_island(lat_entry, lng_entry, record[5])
return FieldMappingFlickr.lat_lng_to_wkt(l_lat, l_lng)

@staticmethod
def lat_lng_to_wkt(lat, lng):
"""Convert lat lng to WKT (Well-Known-Text)
"""
point_latlng_string = "POINT(%s %s)" % (lng,lat)
point_latlng_string = "POINT(%s %s)" % (lng, lat)
return point_latlng_string

def send_to_null_island(self, lat_entry, lng_entry, record_guid):
"""Logs entries with problematic lat/lng's,
increases Null Island Counter by 1.
"""
self.log.debug(f'NULL island: Guid {record_guid} - Coordinates: {lat_entry}, {lng_entry}')
self.null_island += 1
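flickr_extract_postlatlng above falls back to Null Island (0, 0) whenever coordinates are empty, unparsable, or out of range, counts the occurrence, and finally converts to WKT with lng/lat ordering. A compact, class-free sketch of the same validation (error handling slightly broadened for the example):

from decimal import Decimal, InvalidOperation

def latlng_to_wkt(lat_entry, lng_entry):
    """Return a WKT POINT string, routing invalid coordinates to Null Island."""
    try:
        lat, lng = Decimal(lat_entry), Decimal(lng_entry)
    except (InvalidOperation, TypeError):
        lat, lng = Decimal(0), Decimal(0)
    # reject impossible coordinates as well as exact (0, 0)
    if (lat == 0 and lng == 0) or not (-90 <= lat <= 90 and -180 <= lng <= 180):
        lat, lng = Decimal(0), Decimal(0)
    return f"POINT({lng} {lat})"     # WKT expects x (lng) before y (lat)

# latlng_to_wkt("52.5", "13.4") -> 'POINT(13.4 52.5)'
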
37 changes: 20 additions & 17 deletions lbsntransform/classes/field_mapping_twitter.py
@@ -12,6 +12,9 @@
from google.protobuf import text_format

class FieldMappingTwitter():
""" Provides mapping function from Twitter endpoints to
protobuf lbsnstructure
"""
def __init__(self,
disableReactionPostReferencing=False,
geocodes=False,
@@ -24,7 +27,7 @@ def __init__(self,
origin = lbsnOrigin()
origin.origin_id = lbsnOrigin.TWITTER
self.origin = origin
self.lbsnRecords = LBSNRecordDicts() #this is where all the data will be stored
self.lbsn_records = LBSNRecordDicts() #this is where all the data will be stored
self.null_island = 0
self.log = logging.getLogger('__main__')#logging.getLogger()
self.disableReactionPostReferencing = disableReactionPostReferencing
@@ -39,21 +42,21 @@ def parseJsonRecord(self, jsonStringDict, input_lbsn_type = None):
if input_lbsn_type and input_lbsn_type in ('friendslist', 'followerslist'):
for user, relatedUserList in jsonStringDict.items():
userRecord = HelperFunctions.createNewLBSNRecord_with_id(lbsnUser(),str(user),self.origin)
self.lbsnRecords.AddRecordsToDict(userRecord)
self.lbsn_records.AddRecordsToDict(userRecord)
for relatedUser in relatedUserList:
relatedRecord = HelperFunctions.createNewLBSNRecord_with_id(lbsnUser(),str(relatedUser),self.origin)
self.lbsnRecords.AddRecordsToDict(relatedRecord)
self.lbsn_records.AddRecordsToDict(relatedRecord)
# note the switch of order here; direction is important for 'isConnected', and the different lists each give us a different view on this relationship
if input_lbsn_type == 'friendslist':
relationshipRecord = HelperFunctions.createNewLBSNRelationship_with_id(lbsnRelationship(), userRecord.pkey.id, relatedRecord.pkey.id, self.origin)
elif input_lbsn_type == 'followerslist':
relationshipRecord = HelperFunctions.createNewLBSNRelationship_with_id(lbsnRelationship(), relatedRecord.pkey.id, userRecord.pkey.id, self.origin)
relationshipRecord.relationship_type = lbsnRelationship.isCONNECTED
self.lbsnRecords.AddRelationshipToDict(relationshipRecord)
self.lbsn_records.AddRelationshipToDict(relationshipRecord)
elif (input_lbsn_type and input_lbsn_type == 'profile') or 'screen_name' in jsonStringDict:
# user
userRecord = self.extractUser(jsonStringDict)
self.lbsnRecords.AddRecordsToDict(userRecord)
self.lbsn_records.AddRecordsToDict(userRecord)
#sys.exit(f'Post record: {text_format.MessageToString(userRecord,as_utf8=True)}')
if not userRecord.is_private:
# if user profile is private, we cannot access posts
@@ -125,7 +128,7 @@ def parseJsonPost(self, jsonStringDict, userPkey = None):
refPostRecord = HelperFunctions.createNewLBSNRecord_with_id(lbsnPost(),jsonStringDict.get('in_reply_to_status_id_str'),self.origin)
refUserRecord = HelperFunctions.createNewLBSNRecord_with_id(lbsnUser(),jsonStringDict.get('in_reply_to_user_id_str'),self.origin)
refUserRecord.user_name = jsonStringDict.get('in_reply_to_screen_name') # Needs to be saved
self.lbsnRecords.AddRecordsToDict(refUserRecord)
self.lbsn_records.AddRecordsToDict(refUserRecord)
refPostRecord.user_pkey.CopyFrom(refUserRecord.pkey)

# add referenced post pkey to reaction
@@ -136,12 +139,12 @@ def parseJsonPost(self, jsonStringDict, userPkey = None):
# "Note that retweets of retweets do not show representations of the intermediary retweet [...]"
# would be added to. postReactionRecord.referencedPostReaction_pkey
if refPostRecord:
self.lbsnRecords.AddRecordsToDict(refPostRecord)
self.lbsn_records.AddRecordsToDict(refPostRecord)
# add postReactionRecord to Dict
self.lbsnRecords.AddRecordsToDict(postReactionRecord)
self.lbsn_records.AddRecordsToDict(postReactionRecord)
else:
# add postReactionRecord to Dict
self.lbsnRecords.AddRecordsToDict(postRecord)
self.lbsn_records.AddRecordsToDict(postRecord)

def extractUser(self, jsonStringDict):
user = jsonStringDict
@@ -188,7 +191,7 @@ def extractUser(self, jsonStringDict):
#if self.mapFullRelations:
# relationshipRecord = HelperFunctions.createNewLBSNRelationship_with_id(lbsnRelationship(),userRecord.pkey.id,deutscherBundestagGroup.pkey.id, self.origin)
# relationshipRecord.relationship_type = lbsnRelationship.inGROUP
# self.lbsnRecords.AddRelationshipToDict(relationshipRecord)
# self.lbsn_records.AddRelationshipToDict(relationshipRecord)
#userRecord.user_groups_follows = []
return userRecord

@@ -207,10 +210,10 @@ def extractPost(self,jsonStringDict, userPkey = None):
# userPkey is already available for posts that are statuses
userRecord = HelperFunctions.createNewLBSNRecord_with_id(lbsnUser(),userPkey.id,self.origin)
if userRecord:
self.lbsnRecords.AddRecordsToDict(userRecord)
self.lbsn_records.AddRecordsToDict(userRecord)
else:
self.log.warning(f'Record {self.lbsnRecords.CountGlob}: No User record found for post: {post_guid} (post saved without userid)..')
print(f'Record {self.lbsnRecords.CountGlob}', end='\r')
self.log.warning(f'Record {self.lbsn_records.CountGlob}: No User record found for post: {post_guid} (post saved without userid)..')
print(f'Record {self.lbsn_records.CountGlob}', end='\r')
#self.log.warning(f'{originalString}')
#input("Press Enter to continue... (post will be saved without userid)")

@@ -235,7 +238,7 @@ def extractPost(self,jsonStringDict, userPkey = None):
if not postRecord.post_geoaccuracy:
postRecord.post_geoaccuracy = postGeoaccuracy
#postRecord.post_geoaccuracy = twitterPostAttributes.geoaccuracy
self.lbsnRecords.AddRecordsToDict(placeRecord)
self.lbsn_records.AddRecordsToDict(placeRecord)
if postCountry:
postRecord.country_pkey.CopyFrom(postCountry.pkey)
if isinstance(placeRecord, lbsnCity):
@@ -303,13 +306,13 @@ def extractPost(self,jsonStringDict, userPkey = None):
userMentionsJson = entitiesJson.get('user_mentions')
if userMentionsJson:
refUserRecords = HelperFunctions.getMentionedUsers(userMentionsJson,self.origin)
self.lbsnRecords.AddRecordsToDict(refUserRecords)
self.lbsn_records.AddRecordsToDict(refUserRecords)
postRecord.user_mentions_pkey.extend([userRef.pkey for userRef in refUserRecords])
if self.mapFullRelations:
for mentionedUserRecord in refUserRecords:
relationshipRecord = HelperFunctions.createNewLBSNRelationship_with_id(lbsnRelationship(),userRecord.pkey.id,mentionedUserRecord.pkey.id, self.origin)
relationshipRecord.relationship_type = lbsnRelationship.MENTIONS_USER
self.lbsnRecords.AddRelationshipToDict(relationshipRecord)
self.lbsn_records.AddRelationshipToDict(relationshipRecord)
mediaJson = entitiesJson.get('media')
if mediaJson:
postRecord.post_type = HelperFunctions.assignMediaPostType(mediaJson)
@@ -395,7 +398,7 @@ def extractPlace(self, postPlace_json, postGeoaccuracy, userLanguage = None):
else:
alt_name = place.get('country')
refCountryRecord.name_alternatives.append(alt_name)
self.lbsnRecords.AddRecordsToDict(refCountryRecord)
self.lbsn_records.AddRecordsToDict(refCountryRecord)
if postGeoaccuracy == lbsnPost.CITY and refCountryRecord:
# country_pkey only on lbsnCity(), lbsnPlace() has city_pkey, but this is not available for Twitter
placeRecord.country_pkey.CopyFrom(refCountryRecord.pkey)
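parseJsonRecord above derives the direction of the isCONNECTED relationship from the list type: for a friends list the key arguments are (user, related user), for a followers list they are swapped. A simplified stand-in that returns plain (origin, target) id pairs instead of lbsnRelationship records:

def connection_pairs(user_id, related_ids, input_lbsn_type):
    """Mirror the argument order used for createNewLBSNRelationship_with_id()."""
    pairs = []
    for related_id in related_ids:
        if input_lbsn_type == "friendslist":
            pairs.append((user_id, related_id))      # user -> friend
        elif input_lbsn_type == "followerslist":
            pairs.append((related_id, user_id))      # follower -> user
    return pairs

# connection_pairs("u1", ["u2", "u3"], "followerslist") -> [('u2', 'u1'), ('u3', 'u1')]
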
2 changes: 1 addition & 1 deletion lbsntransform/classes/submit_data.py
@@ -73,7 +73,7 @@ def storeLbsnRecordDicts(self, fieldMappingTwitter):
# therefore, records are processed starting from lowest granularity. Order is stored in allDicts()
self.countRound += 1
#self.headersWritten.clear()
recordDicts = fieldMappingTwitter.lbsnRecords
recordDicts = fieldMappingTwitter.lbsn_records
x = 0
self.count_affected = 0
for recordsDict in recordDicts.allDicts:
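The comment in storeLbsnRecordDicts explains that lbsn_records.allDicts is ordered from lowest to highest granularity, so records that others reference (countries, places, users) are submitted before the posts that point to them. A rough sketch of the idea; the concrete record types and their order here are assumptions for illustration:

class RecordDictsSketch:
    """Illustrative stand-in for LBSNRecordDicts."""
    def __init__(self):
        self.countries = {}
        self.places = {}
        self.users = {}
        self.posts = {}
        # lowest granularity first, so referenced keys exist before posts
        self.all_dicts = [self.countries, self.places, self.users, self.posts]

def store_record_dicts(record_dicts, submit):
    """Submit every record, walking the dicts in their defined order."""
    count_affected = 0
    for type_dict in record_dicts.all_dicts:
        for record in type_dict.values():
            submit(record)
            count_affected += 1
    return count_affected
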
