Refactored main, config; tested & bugfixes
Sieboldianus committed Jan 3, 2019
1 parent e27786a commit da20640
Showing 5 changed files with 263 additions and 144 deletions.
38 changes: 19 additions & 19 deletions lbsntransform/__main__.py
@@ -43,13 +43,13 @@ def main():
# load import mapper depending on lbsn origin (e.g. 1 = Instagram,
# 2 = Flickr,
# 3 = Twitter)
- importer = HF.load_importer_mapping_module(config.Origin)
+ importer = HF.load_importer_mapping_module(config.origin)
# establish output connection
conn_output, cursor_output = LoadData.initialize_output_connection(config)
output = LBSNTransfer(db_cursor=cursor_output,
db_connection=conn_output,
- store_csv=config.CSVOutput,
- SUPPRESS_LINEBREAKS=config.CSVsuppressLinebreaks)
+ store_csv=config.csv_output,
+ SUPPRESS_LINEBREAKS=config.csv_suppress_linebreaks)
# load from local json/csv or from PostgresDB
if config.is_local_input:
loc_filelist = LoadData.read_local_files(config)
@@ -64,12 +64,12 @@ def main():
skipped_low_geoaccuracy = 0
skipped_low_geoaccuracy_total = 0
# Start Value, Modify to continue from last processing
- continue_number = config.startWithdb_row_number
+ continue_number = config.startwith_db_rownumber

# Optional Geocoding
geocode_dict = False
- if config.geocodeLocations:
- geocode_dict = LoadData.load_geocodes(config.geocodeLocations)
+ if config.geocode_locations:
+ geocode_dict = LoadData.load_geocodes(config.geocode_locations)
# Optional ignore input sources
ignore_sources_set = None
if config.ignore_input_source_list:
@@ -78,10 +78,10 @@ def main():

finished = False
# initialize field mapping structure
- import_mapper = importer(config.disableReactionPostReferencing,
+ import_mapper = importer(config.disable_reactionpost_ref,
geocode_dict,
- config.MapRelations,
- config.transferReactions,
+ config.map_relations,
+ config.transfer_reactions,
config.ignore_non_geotagged,
ignore_sources_set,
config.min_geoaccuracy)
@@ -131,25 +131,25 @@ def main():
print(f'{processed_total} input records processed (up to '
f'{continue_number}). '
f'Skipped {skipped_low_geoaccuracy} due to low geoaccuracy. '
- f'Count per type: {import_mapper.lbsn_records.getTypeCounts()}'
+ f'Count per type: {import_mapper.lbsn_records.get_type_counts()}'
f'records.', end='\n')

# update console
# On the first loop or after 500.000 processed records,
# transfer results to DB
- if not start_number or processed_records >= config.transferCount or \
+ if not start_number or processed_records >= config.transfer_count or \
finished:
sys.stdout.flush()
- print(f'Storing {import_mapper.lbsn_records.CountGlob} records.. '
+ print(f'Storing {import_mapper.lbsn_records.count_glob} records.. '
f'{HF.null_notice(import_mapper.null_island)})')
- output.storeLbsnRecordDicts(import_mapper)
+ output.store_lbsn_record_dicts(import_mapper)
output.commit_changes()
processed_records = 0
# create a new empty dict of records
- import_mapper = importer(config.disableReactionPostReferencing,
+ import_mapper = importer(config.disable_reactionpost_ref,
geocode_dict,
- config.MapRelations,
- config.transferReactions,
+ config.map_relations,
+ config.transfer_reactions,
config.ignore_non_geotagged,
ignore_sources_set,
config.min_geoaccuracy)
@@ -164,9 +164,9 @@ def main():
# ??
if import_mapper.lbsn_records.count_glob > 0:
print(f'Transferring remaining '
- f'{import_mapper.lbsn_records.CountGlob} to db.. '
+ f'{import_mapper.lbsn_records.count_glob} to db.. '
f'{HF.null_notice(import_mapper.null_island)})')
- output.storeLbsnRecordDicts(import_mapper)
+ output.store_lbsn_record_dicts(import_mapper)
output.commit_changes()

# finalize all transactions (csv merge etc.)
@@ -175,7 +175,7 @@ def main():
# Close connections to DBs
if not config.is_local_input:
cursor_input.close()
- if config.dbUser_Output:
+ if config.dbuser_output:
cursor_output.close()
log.info(f'\n\nProcessed {processed_total} input records '
f'(Input {start_number} to {continue_number}).'
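The hunks above touch the main loop's flush logic: parsed records accumulate in the mapper and are written to the output once `config.transfer_count` records have been processed (per the comment, after the first loop or every 500.000 records), with any remainder transferred after the loop ends. A minimal, self-contained sketch of that accumulate-and-flush pattern; the names used here (`run`, `flush`, `TRANSFER_COUNT`) are placeholders, not lbsntransform's actual API:

```python
# Illustrative accumulate-and-flush loop; names are placeholders only.
TRANSFER_COUNT = 500_000  # mirrors the "after 500.000 processed records" comment

def run(records, flush):
    """Process records, flushing the accumulated batch every TRANSFER_COUNT items."""
    batch = []
    processed = 0
    for record in records:
        batch.append(record)
        processed += 1
        if len(batch) >= TRANSFER_COUNT:
            flush(batch)   # corresponds to store_lbsn_record_dicts() + commit_changes()
            batch = []     # start over with an empty container, like re-creating import_mapper
    if batch:              # transfer whatever remains after the loop finishes
        flush(batch)
    return processed
```

For example, `run(range(1_200_000), flush=lambda b: print(len(b), "stored"))` would trigger two full flushes and one final partial flush.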
5 changes: 3 additions & 2 deletions lbsntransform/classes/field_mapping_twitter.py
@@ -244,7 +244,7 @@ def extract_user(self, json_string_dict):
user_record.group_count = listed_count
user_record.post_count = json_string_dict.get('statuses_count')
user_record.url = f'https://twitter.com/intent/user?user_id=' \
- f'{userRecord.pkey.id}'
+ f'{user_record.pkey.id}'
ref_user_language = Language()
ref_user_language.language_short = json_string_dict.get('lang')
user_record.user_language.CopyFrom(ref_user_language)
@@ -375,7 +375,8 @@ def extract_post(self, json_string_dict, user_pkey=None):
if post_source:
post_record.input_source = HF.cleanhtml(
json_string_dict.get('source'))
- if post_record.input_source in self.ignore_sources_set:
+ if self.ignore_sources_set and \
+ post_record.input_source in self.ignore_sources_set:
# skip entry if in ignore list
return None
post_record.post_publish_date.CopyFrom(
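The second hunk guards the membership test because `ignore_sources_set` stays `None` when no ignore list is configured, and a bare `x in None` raises a `TypeError`. A small standalone illustration of the fixed pattern; the variable values are made up for the example:

```python
# Why the added guard matters: a membership test against None raises TypeError,
# so ignore_sources_set must be checked for truthiness first.
ignore_sources_set = None            # no ignore list configured
input_source = "Twitter for iPhone"

# "input_source in ignore_sources_set" alone would raise:
# TypeError: argument of type 'NoneType' is not iterable
skip = bool(ignore_sources_set and input_source in ignore_sources_set)
print(skip)  # False: nothing is skipped when no ignore list exists

ignore_sources_set = {"Twitter for iPhone"}
skip = bool(ignore_sources_set and input_source in ignore_sources_set)
print(skip)  # True: posts from ignored sources are dropped
```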
24 changes: 12 additions & 12 deletions lbsntransform/classes/load_data.py
@@ -42,8 +42,8 @@ def loop_input_records(records, transferlimit, import_mapper, config):

if (transferlimit and processed_records >= transferlimit) or \
(not config.is_local_input and
- config.end_with_db_row_number and
- db_row_number >= config.end_with_db_row_number):
+ config.endwith_db_rownumber and
+ db_row_number >= config.endwith_db_rownumber):
finished = True
break
return processed_records, finished
@@ -149,11 +149,11 @@ def fetch_csv_data_from_file(loc_filelist, start_file_id=0):

def initialize_output_connection(config):
"""Establishes connection to output DB (Postgres), if set in config"""
- if config.dbUser_Output:
- output_connection = DBConnection(config.dbServeradressOutput,
- config.dbNameOutput,
- config.dbUser_Output,
- config.dbPassword_Output)
+ if config.dbuser_output:
+ output_connection = DBConnection(config.dbserveradress_output,
+ config.dbname_output,
+ config.dbuser_output,
+ config.dbpassword_output)
conn_output, cursor_output = output_connection.connect()
else:
conn_output = None
@@ -165,10 +165,10 @@ def initialize_input_connection(config):
Returns cursor
"""
- input_connection = DBConnection(config.dbServeradressInput,
- config.dbNameInput,
- config.dbUser_Input,
- config.dbPassword_Input,
+ input_connection = DBConnection(config.dbserveradress_input,
+ config.db_name_input,
+ config.dbuser_Input,
+ config.dbpassword_input,
True # ReadOnly Mode
)
conn_input, cursor_input = input_connection.connect()
@@ -179,7 +179,7 @@ def read_local_files(config):
returns list of file-paths
"""
path = f'{config.InputPath}'
- if config.recursiveLoad:
+ if config.recursive_load:
excludefolderlist = ["01_DataSetHistory",
"02_UserData", "03_ClippedData", "04_MapVis"]
excludestartswithfile = ["log", "settings", "GridCoordinates"]
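The input connection above is opened with a trailing `True  # ReadOnly Mode` argument. How lbsntransform's `DBConnection` wrapper implements that flag is not part of this diff; with plain psycopg2, a read-only session could be approximated as follows, reusing the renamed config attributes from the hunk above (a sketch, not the project's implementation):

```python
# Sketch of a read-only Postgres input connection with plain psycopg2;
# lbsntransform wraps this in its own DBConnection class, so details may differ.
import psycopg2

def open_readonly_input(config):
    conn = psycopg2.connect(
        host=config.dbserveradress_input,
        dbname=config.db_name_input,
        user=config.dbuser_Input,
        password=config.dbpassword_input,
    )
    conn.set_session(readonly=True)  # counterpart of the trailing "ReadOnly Mode" flag
    return conn, conn.cursor()
```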
52 changes: 26 additions & 26 deletions lbsntransform/classes/submit_data.py
@@ -61,12 +61,12 @@ def __init__(self, db_cursor=None,
lbsnPostReaction.DESCRIPTOR.name: list(),
lbsnRelationship.DESCRIPTOR.name: list()}

- self.countRound = 0
+ self.count_round = 0
# Records are batched and submitted in
# one insert with x number of records
- self.batchDBVolume = 100
+ self.batch_db_volume = 100
self.store_csv = store_csv
- self.headersWritten = set()
+ self.headers_written = set()
self.db_mapping = ProtoLBSM_db_Mapping()
# self.CSVsuppressLinebreaks = CSVsuppressLinebreaks

@@ -83,23 +83,23 @@ def store_changes(self):
self.csv_output.clean_csv_batches(self.batched_records)
self.count_entries_store = 0

- def storeLbsnRecordDicts(self, fieldMappingTwitter):
+ def store_lbsn_record_dicts(self, field_mapping):
# order is important here, as PostGres will reject any
# records where Foreign Keys are violated
# therefore, records are processed starting from lowest
# granularity. Order is stored in allDicts()
- self.countRound += 1
+ self.count_round += 1
# self.headersWritten.clear()
- recordDicts = fieldMappingTwitter.lbsn_records
+ record_dicts = field_mapping.lbsn_records
x = 0
self.count_affected = 0
- for recordsDict in recordDicts.all_dicts:
- type_name = recordsDict[1]
- for record_pkey, record in recordsDict[0].items():
+ for records_dict in record_dicts.all_dicts:
+ type_name = records_dict[1]
+ for record_pkey, record in records_dict[0].items():
x += 1
- print(f'Storing {x} of {recordDicts.CountGlob} '
+ print(f'Storing {x} of {record_dicts.count_glob} '
f'output records ({type_name})..', end='\r')
- self.prepareLbsnRecord(record, type_name)
+ self.prepare_lbsn_record(record, type_name)
self.count_glob += 1 # self.dbCursor.rowcount
self.count_entries_commit += 1 # self.dbCursor.rowcount
self.count_entries_store += 1
@@ -111,23 +111,23 @@ def storeLbsnRecordDicts(self, fieldMappingTwitter):
self.store_volume):
self.store_changes()
# submit remaining rest
- self.submitAllBatches()
+ self.submit_all_batches()
# self.count_affected += x # monitoring
- print(f'\nRound {self.countRound:03d}: '
+ print(f'\nRound {self.count_round:03d}: '
f'Updated/Inserted {self.count_glob} records.')

- def prepareLbsnRecord(self, record, record_type):
+ def prepare_lbsn_record(self, record, record_type):
# clean duplicates in repeated Fields and Sort List
- self.sortCleanProtoRepeatedField(record)
+ self.sort_clean_proto_repeated_field(record)
# store cleaned ProtoBuf records
self.batched_records[record_type].append(record)
- for listType, batchList in self.batched_records.items():
+ for list_type, batch_list in self.batched_records.items():
# if any dict contains more values than self.batchDBVolume,
# submit/store all
- if len(batchList) >= self.batchDBVolume:
- self.submitAllBatches()
+ if len(batch_list) >= self.batch_db_volume:
+ self.submit_all_batches()

- def submitAllBatches(self):
+ def submit_all_batches(self):
for record_type, batch_list in self.batched_records.items():
if batch_list:
# if self.storeCSV and not record_type in self.headersWritten:
@@ -150,7 +150,7 @@ def submit_lbsn_records(self, record_type):
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
self.batched_records[record_type],
- self.countRound, record_type)
+ self.count_round, record_type)
if self.db_cursor:
values_str = ','.join(
[self.prepare_sqlescaped_values(record) for
@@ -450,7 +450,7 @@ def submitLbsnRelationships(self):
if selectFriends:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
- selectFriends, self.countRound, '_user_friends_user')
+ selectFriends, self.count_round, '_user_friends_user')
if self.db_cursor:
args_isFriend = ','.join(selectFriends)
insert_sql = \
@@ -469,7 +469,7 @@ def submitLbsnRelationships(self):
if selectConnected:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
- selectConnected, self.countRound, '_user_connectsto_user')
+ selectConnected, self.count_round, '_user_connectsto_user')
if self.db_cursor:
args_isConnected = ','.join(selectConnected)
insert_sql = \
@@ -489,7 +489,7 @@ def submitLbsnRelationships(self):
if selectUserGroupMember:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
- selectUserGroupMember, self.countRound,
+ selectUserGroupMember, self.count_round,
'_user_memberof_group')
if self.db_cursor:
args_isInGroup = ','.join(selectUserGroupMember)
@@ -509,7 +509,7 @@ def submitLbsnRelationships(self):
if selectUserGroupMember:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
- selectUserGroupMember, self.countRound,
+ selectUserGroupMember, self.count_round,
'_user_follows_group')
if self.db_cursor:
args_isInGroup = ','.join(selectUserGroupMember)
@@ -529,7 +529,7 @@ def submitLbsnRelationships(self):
if selectUserMentions:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
- selectUserMentions, self.countRound,
+ selectUserMentions, self.count_round,
'_user_mentions_user')
if self.db_cursor:
args_isInGroup = ','.join(selectUserMentions)
@@ -606,7 +606,7 @@ def prepare_sqlescaped_values(self, *args):
preparedSQLRecord = preparedSQLRecord.decode()
return preparedSQLRecord

- def sortCleanProtoRepeatedField(self, record):
+ def sort_clean_proto_repeated_field(self, record):
"""Remove duplicate values in repeated field, sort alphabetically
ProtocolBuffers has no unique list field type. This function will
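The renamed `sort_clean_proto_repeated_field` exists because, as its docstring notes, Protocol Buffers has no unique-list (set) field type, so repeated fields must be de-duplicated and sorted by hand. A minimal sketch of that idea for a repeated scalar field; the function and variable names are illustrative, not taken from the lbsn structure definitions, and the same in-place clear-and-extend works on a plain Python list:

```python
# Sketch: de-duplicate and sort a repeated scalar field in place.
def sort_clean_repeated_scalar(repeated_field):
    """Replace the field's contents with a sorted, duplicate-free copy."""
    cleaned = sorted(set(repeated_field))
    del repeated_field[:]           # clear in place, keeping the container object
    repeated_field.extend(cleaned)

tags = ["city", "park", "city", "beach"]
sort_clean_repeated_scalar(tags)
print(tags)  # ['beach', 'city', 'park']
```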
