Skip to content

Commit

Permalink
fix: connection stream abort handling
Browse files (browse the repository at this point in the history)
  • Loading branch information
Sieboldianus committed Nov 22, 2019
1 parent 0acc54d commit f8a5d1b
Showing 1 changed file with 34 additions and 37 deletions.
71 changes: 34 additions & 37 deletions lbsntransform/input/load_data.py
Expand Up @@ -12,7 +12,7 @@
import traceback
from contextlib import closing
from itertools import zip_longest
from typing import Dict, TextIO, List, Union, Any, Iterator
from typing import Dict, TextIO, List, Union, Any, Iterator, Optional, IO

import ntpath
import requests
Expand Down Expand Up @@ -41,21 +41,17 @@ def __init__(
ignore_non_geotagged=None, min_geoaccuracy=None, source_web=None,
web_zip=None, skip_until_record=None):
self.is_local_input = is_local_input
self.start_number = None

self.start_number = 1
self.continue_number = 0
if not self.is_local_input:
# Start Value, Modify to continue from last processing
self.continue_number = startwith_db_rownumber
self.start_number = startwith_db_rownumber
else:
if skip_until_file is None:
self.continue_number = 0
else:
if skip_until_file is not None:
self.continue_number = skip_until_file
if skip_until_record is not None:
self.start_number = skip_until_record
else:
self.start_number
self.source_web = source_web
if web_zip is None:
web_zip = True
Expand All @@ -69,10 +65,6 @@ def __init__(
self.cursor_input = cursor_input
elif self.is_local_input and self.source_web:
self.filelist = input_path
else:
self.filelist = None
# self.cursor_input = LoadData.initialize_connection(
# cfg)
self.finished = False
self.db_row_number = 0
self.endwith_db_rownumber = endwith_db_rownumber
Expand Down Expand Up @@ -119,11 +111,11 @@ def __enter__(self) -> Iterator[Union[
Combine multiple generators to single pipeline,
returned for being processed by with-statement
"""

record_pipeline = self.convert_records(self._process_input(
self._open_input_files()))
return record_pipeline
# return record_pipeline
if self.cursor_input or self.source_web:
return self.convert_records(self._process_input())
else:
return self.convert_records(
self._process_input(self._open_input_files()))

def __exit__(self, exception_type, exception_value, tb_value):
"""Contextmanager exit: nothing to do here if no exception is raised"""
Expand All @@ -143,12 +135,8 @@ def __exit__(self, exception_type, exception_value, tb_value):
logging.getLogger('__main__').warning(stats_str)
return False

def _open_input_files(self):
"""Loops input input filelist and
returns opened file handles
"""
if self.cursor_input or self.source_web:
return None
def _open_input_files(self) -> Iterator[IO[str]]:
"""Loops input filelist and returns opened file handles"""
# process localfiles
for file_name in self.filelist:
self.continue_number += 1
Expand All @@ -157,8 +145,9 @@ def _open_input_files(self):
f'Current file: {ntpath.basename(file_name)}')
yield open(file_name, 'r', encoding="utf-8", errors='replace')

def _process_input(self, file_handles: Iterator[TextIO]) -> Iterator[Dict[
str, Any]]:
def _process_input(
self,
file_handles: Iterator[IO[str]] = None) -> Iterator[Optional[List[str]]]:
"""File parse for CSV or JSON from open file handle
Output: produces a list of posts that can be parsed
Expand Down Expand Up @@ -195,12 +184,17 @@ def _process_input(self, file_handles: Iterator[TextIO]) -> Iterator[Dict[
quotechar='"', quoting=csv.QUOTE_NONE)
for zipped_record in zip_longest(reader1, reader2):
# catch any type-error None record
if zipped_record[0] is None:
yield zipped_record[1]
if zipped_record[1] is None:
yield zipped_record[0]
# if zipped_record[0] is None:
# yield zipped_record[1]
# if zipped_record[1] is None:
# yield zipped_record[0]
# combine two lists
yield zipped_record[0] + zipped_record[1]
try:
yield zipped_record[0] + zipped_record[1]
except TypeError:
sys.exit(f"Stream appears to have broken. "
f"Check connection and continue at "
f"{self.count_glob}")
return
else:
raise ValueError(
Expand Down Expand Up @@ -228,11 +222,13 @@ def _process_input(self, file_handles: Iterator[TextIO]) -> Iterator[Dict[
else:
# db query
while self.cursor:
record = self.fetch_json_data_from_lbsn(
records = self.fetch_json_data_from_lbsn(
self.cursor, self.continue_number)
yield record
for record in records:
yield record

def convert_records(self, records: Dict[str, Any]) -> Iterator[List[Union[
def convert_records(
self, records: Iterator[Optional[List[str]]]) -> Iterator[List[Union[
lbsn.CompositeKey, lbsn.Language, lbsn.RelationshipKey, lbsn.City,
lbsn.Country, lbsn.Origin, lbsn.Place, lbsn.Post,
lbsn.PostReaction, lbsn.Relationship, lbsn.User,
Expand Down Expand Up @@ -282,8 +278,10 @@ def skip_empty_or_other(single_record):
skip = True
return skip

def fetch_json_data_from_lbsn(self, cursor, start_id=0, get_max=None,
number_of_records_to_fetch=10000):
def fetch_json_data_from_lbsn(
self, cursor, start_id=0, get_max=None,
number_of_records_to_fetch=10000
) -> Optional[List[List[str]]]:
"""Fetches records from Postgres DB
Keyword arguments:
Expand Down Expand Up @@ -311,8 +309,7 @@ def fetch_json_data_from_lbsn(self, cursor, start_id=0, get_max=None,
if not self.start_number:
# first returned db_row_number
self.start_number = records[0][0]
for record in records:
return record
return records

def fetch_record_from_file(self, file_handle):
"""Fetches CSV or JSON data (including stacked json) from file"""
Expand Down

0 comments on commit f8a5d1b

Please sign in to comment.