Commit

Local input bug fix
Sieboldianus committed Dec 3, 2018
1 parent 9cb8adf commit d5e6f8c
Showing 3 changed files with 23 additions and 18 deletions.
28 changes: 14 additions & 14 deletions lbsntransform/__main__.py
@@ -38,9 +38,9 @@ def main():
config.parseArgs()
sys.stdout.flush()
log = set_logger()
# load import mapper
# load import mapper depending on lbsn origin (e.g. 1 = Instagram, 2 = Flickr, 3 = Twitter)
importer = HelperFunctions.load_importer_mapping_module(config.Origin)
# establish output connection (will return none if no config)
# establish output connection
conn_output, cursor_output = LoadData.initialize_output_connection(config)
output = LBSNTransfer(dbCursor=cursor_output,
dbConnection=conn_output,
@@ -67,12 +67,12 @@ def main():

finished = False
# initialize field mapping structure
twitter_records = importer(config.disableReactionPostReferencing,
geocode_dict,
config.MapRelations)
import_mapper = importer(config.disableReactionPostReferencing,
geocode_dict,
config.MapRelations)

# Manually add entries that need submission prior to parsing data
# add_bundestag_group_example(twitter_records)
# add_bundestag_group_example(import_mapper)

how_long = TimeMonitor()
# loop input DB until transferlimit reached or no more rows are returned
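
For context on the two hunks above: `HelperFunctions.load_importer_mapping_module(config.Origin)` picks the field-mapping class by lbsn origin id (1 = Instagram, 2 = Flickr, 3 = Twitter), and that class is then instantiated with the parsing options; that instance is what this commit renames from `twitter_records` to the origin-neutral `import_mapper`. A minimal, self-contained sketch of such an origin-keyed dispatcher; the class names and lookup table below are illustrative placeholders, not the project's actual API:

```python
# Illustrative sketch only; the real HelperFunctions.load_importer_mapping_module
# may resolve its mapper classes differently.
class FieldMappingInstagram:   # lbsn origin 1 (placeholder)
    pass

class FieldMappingFlickr:      # lbsn origin 2 (placeholder)
    pass

class FieldMappingTwitter:     # lbsn origin 3 (placeholder)
    pass

_ORIGIN_MAPPERS = {1: FieldMappingInstagram,
                   2: FieldMappingFlickr,
                   3: FieldMappingTwitter}

def load_importer_mapping_module(origin_id: int):
    """Return the field-mapping class registered for a given lbsn origin id."""
    try:
        return _ORIGIN_MAPPERS[origin_id]
    except KeyError:
        raise ValueError(f"No field mapping registered for origin {origin_id}") from None
```

The returned class is then called like a factory, e.g. `import_mapper = importer(config.disableReactionPostReferencing, geocode_dict, config.MapRelations)`, exactly as shown in the second hunk.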
@@ -104,24 +104,24 @@ def main():
continue_number = records[-1][0] #last returned db_row_number
processed_count, finished = LoadData.loop_input_records(records,
max_records,
twitter_records,
import_mapper,
config.end_with_db_row_number,
config.LocalInput,
config.input_type)
processed_records += processed_count
processed_total += processed_count
print(f'{processed_total} input records processed (up to {continue_number}). '
f'Count per type: {twitter_records.lbsnRecords.getTypeCounts()}records.', end='\n')
f'Count per type: {import_mapper.lbsnRecords.getTypeCounts()}records.', end='\n')
# update console
# On the first loop or after 500.000 processed records, transfer results to DB
if not start_number or processed_records >= config.transferCount or finished:
sys.stdout.flush()
print(f'Storing {twitter_records.lbsnRecords.CountGlob} records ..')
output.storeLbsnRecordDicts(twitter_records)
print(f'Storing {import_mapper.lbsnRecords.CountGlob} records ..')
output.storeLbsnRecordDicts(import_mapper)
output.commitChanges()
processed_records = 0
## create a new empty dict of records
twitter_records = importer(config.disableReactionPostReferencing,
import_mapper = importer(config.disableReactionPostReferencing,
geocode_dict,
config.MapRelations)
# remember the first processed DBRow ID
@@ -133,9 +133,9 @@ def main():

# submit remaining
# ??
if twitter_records.lbsnRecords.CountGlob > 0:
print(f'Transferring remaining {twitter_records.lbsnRecords.CountGlob} to db..')
output.storeLbsnRecordDicts(twitter_records)
if import_mapper.lbsnRecords.CountGlob > 0:
print(f'Transferring remaining {import_mapper.lbsnRecords.CountGlob} to db..')
output.storeLbsnRecordDicts(import_mapper)
output.commitChanges()

# finalize all transactions (csv merge etc.)
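
Taken together, the __main__.py hunks implement a batch-and-flush loop: input rows are parsed into the mapper's record dicts, and once `config.transferCount` records have accumulated (or the input is exhausted) they are stored to the output DB, changes are committed, and the mapper is re-created empty. A rough sketch of that pattern; `fetch_batch`, `make_mapper`, `store` and `commit` are hypothetical stand-ins, not the project's functions:

```python
# Sketch of the batch-and-flush pattern from main(); all callables passed in
# are hypothetical stand-ins for the project's own functions.
def transfer_loop(fetch_batch, make_mapper, store, commit, transfer_count=500_000):
    import_mapper = make_mapper()          # empty record-dict structure
    processed_since_flush = 0
    while True:
        records = fetch_batch()
        if not records:
            break                          # no more input rows
        for record in records:
            import_mapper.parse(record)    # placeholder for the mapper's parse step
            processed_since_flush += 1
        if processed_since_flush >= transfer_count:
            store(import_mapper)           # transfer accumulated lbsn records to the DB
            commit()
            import_mapper = make_mapper()  # start over with an empty record dict
            processed_since_flush = 0
    if processed_since_flush:              # submit whatever remains
        store(import_mapper)
        commit()
```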
1 change: 1 addition & 0 deletions lbsntransform/classes/helper_functions.py
@@ -157,6 +157,7 @@ def returnEWKBFromGeoTEXT(text):
geom = geom.wkb_hex
return geom

@staticmethod
def decode_stacked(document, pos=0, decoder=JSONDecoder()):
NOT_WHITESPACE = re.compile(r'[^\s]')
while True:
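
The helper_functions.py hunk only adds the missing `@staticmethod` decorator and is cut off after `while True:`. For readers unfamiliar with the pattern, a generator with this signature is commonly completed as below (a sketch inferred from the visible lines, not necessarily the project's exact body): `JSONDecoder.raw_decode` parses one object and returns the position where the next one starts.

```python
import re
from json import JSONDecoder

NOT_WHITESPACE = re.compile(r'[^\s]')

def decode_stacked(document, pos=0, decoder=JSONDecoder()):
    """Yield each JSON object from a string of concatenated ('stacked') JSON documents."""
    while True:
        match = NOT_WHITESPACE.search(document, pos)
        if not match:
            return                                        # only whitespace left
        pos = match.start()
        obj, pos = decoder.raw_decode(document, pos)      # parse one object, advance position
        yield obj
```

For example, `list(decode_stacked('{"a": 1} {"b": 2}'))` returns `[{'a': 1}, {'b': 2}]`.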
12 changes: 8 additions & 4 deletions lbsntransform/classes/load_data.py
@@ -1,6 +1,10 @@
# -*- coding: utf-8 -*-
from .db_connection import DBConnection
#from .helperFunctions import lbsnRecordDicts as lbsnRecordDicts
from .helper_functions import HelperFunctions
from glob import glob
#import json
from json import loads as json_loads, decoder as json_decoder
#from json import decoder, JSONDecoder, JSONDecodeError

class LoadData():
def loop_input_records(json_records, transferlimit, twitter_records, end_with_db_row_number, is_local_input, input_type):
@@ -68,13 +72,13 @@ def fetch_json_data_from_file(loc_filelist, start_file_id=0, is_stacked_json=Fal
# {json1}{json2} etc.
if is_stacked_json:
try:
for obj in helperFunctions.decode_stacked(file.read()):
for obj in HelperFunctions.decode_stacked(file.read()):
records.append(obj)
except json.decoder.JSONDecodeError:
except json_decoder.JSONDecodeError:
pass
else:
# normal json nesting, e.g. {{record1},{record2}}
records = json.loads(file.read())
records = json_loads(file.read())
if records:
return records
return None
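
The load_data.py changes are the actual local-input bug fix: the call sites in the hunk above previously used `helperFunctions` (lowercase), `json.decoder.JSONDecodeError` and `json.loads`, names that, judging from the imports at the top of the module, were presumably no longer bound and would fail as soon as local file input was parsed. The fix switches them to the imported `HelperFunctions` class and the aliases from `from json import loads as json_loads, decoder as json_decoder`. A small stand-alone illustration of the equivalent spellings (the input strings are made up):

```python
from json import loads as json_loads, decoder as json_decoder

record = json_loads('{"post_guid": "abc", "origin": 3}')    # same as json.loads(...)
try:
    json_loads('{broken')
except json_decoder.JSONDecodeError as err:                  # same as json.decoder.JSONDecodeError
    print(f"skipping malformed input: {err}")
```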
