
Commit 7827825
Merge branch 'dev' of gitlab.vgiscience.de:lbsn/lbsntransform into dev
Sieboldianus committed Nov 21, 2019
2 parents: a408e47 + 697dddb
Showing 10 changed files with 178 additions and 221 deletions.
4 changes: 0 additions & 4 deletions lbsntransform/__main__.py
@@ -16,15 +16,11 @@
__license__ = "GNU GPLv3"
# version: see version.py

-import io
-import logging
-import sys
-import traceback

from lbsntransform.tools.helper_functions import HelperFunctions as HF
from lbsntransform.output.shared_structure import TimeMonitor
from lbsntransform.input.load_data import LoadData
from lbsntransform.output.submit_data import LBSNTransfer
from lbsntransform.config.config import BaseConfig
from lbsntransform.lbsntransform_ import LBSNTransform

40 changes: 17 additions & 23 deletions lbsntransform/input/load_data.py
@@ -6,16 +6,13 @@

import codecs
import csv
-import json
-import os
import sys
import logging
import traceback
from contextlib import closing
from itertools import zip_longest
from glob import glob
from pathlib import Path
-from typing import Dict, TextIO, List, Union, Any, Generator, Iterator
+from typing import Dict, TextIO, List, Union, Any, Iterator

import ntpath
import requests
@@ -125,17 +122,16 @@ def __enter__(self) -> Iterator[Union[
return record_pipeline
# return record_pipeline

-    def __exit__(self, exception_type, exception_value, tb):
+    def __exit__(self, exception_type, exception_value, tb_value):
"""Contextmanager exit: nothing to do here if no exception is raised"""
-        if any(
-            v is not None for v in [
-                exception_type, exception_value, tb]):
+        if any(v is not None for v in [
+                exception_type, exception_value, tb_value]):
# only if any of these variables is not None
# catch exception and output additional information
logging.getLogger('__main__').warning(
f"\nError while reading records: "
f"{exception_type}\n{exception_value}\n"
f"{traceback.print_tb(tb)}\n")
f"{traceback.print_tb(tb_value)}\n")
logging.getLogger('__main__').warning(
f"Current source: \n {self.current_source}\n")
stats_str = HF.report_stats(
@@ -144,7 +140,7 @@ def __exit__(self, exception_type, exception_value, tb_value):
logging.getLogger('__main__').warning(stats_str)
return False

-    def _open_input_files(self, count: bool = None):
+    def _open_input_files(self):
"""Loops input input filelist and
returns opened file handles
"""
@@ -179,22 +175,22 @@ def _process_input(self, file_handles: Iterator[TextIO]) -> Iterator[Dict[
yield record
else:
# multiple web file query
-                if self.web_zip == True and len(self.filelist) == 2:
+                if self.web_zip and len(self.filelist) == 2:
# zip 2 web csv sources in parallel, e.g.
# zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
url1 = self.filelist[0]
url2 = self.filelist[1]
-                    with closing(requests.get(url1, stream=True)) as f1, \
-                            closing(requests.get(url2, stream=True)) as f2:
-                        r1 = csv.reader(
-                            codecs.iterdecode(f1.iter_lines(), 'utf-8'),
+                    with closing(requests.get(url1, stream=True)) as fhandle1, \
+                            closing(requests.get(url2, stream=True)) as fhandle2:
+                        reader1 = csv.reader(
+                            codecs.iterdecode(fhandle1.iter_lines(), 'utf-8'),
delimiter=self.csv_delim,
quotechar='"', quoting=csv.QUOTE_NONE)
-                        r2 = csv.reader(
-                            codecs.iterdecode(f2.iter_lines(), 'utf-8'),
+                        reader2 = csv.reader(
+                            codecs.iterdecode(fhandle2.iter_lines(), 'utf-8'),
delimiter=self.csv_delim,
quotechar='"', quoting=csv.QUOTE_NONE)
-                    for zipped_record in zip_longest(r1, r2):
+                    for zipped_record in zip_longest(reader1, reader2):
# two combine lists
yield zipped_record[0] + zipped_record[1]
return
@@ -256,7 +252,7 @@ def convert_records(self, records: Dict[str, Any]) -> Iterator[List[Union[
lbsn_records = self.import_mapper.parse_csv_record(
single_record)
else:
-                exit(f'Format {self.local_file_type} not supportet.')
+                sys.exit(f'Format {self.local_file_type} not supportet.')
# return record as pipe
if lbsn_records is None:
continue
@@ -313,12 +309,12 @@ def fetch_record_from_file(self, file_handle):
record_reader = self.fetch_csv_data_from_file(
file_handle)
else:
-            exit(f'Format {self.file_format} not supported.')
+            sys.exit(f'Format {self.file_format} not supported.')
# return record pipeline
for record in record_reader:
yield record

-    def fetch_json_data_from_file(self, file_handle, start_file_id=0):
+    def fetch_json_data_from_file(self, file_handle):
"""Read json entries from file.
Typical form is [{json1},{json2}], if is_stacked_json is True:
@@ -450,8 +446,6 @@ def load_geocodes(geo_config):
@staticmethod
def load_ignore_sources(list_source=None):
"""Loads list of source types to be ignored"""
-        if list_source is None:
-            return
ignore_source_list = set()
with open(list_source, newline='', encoding='utf8') as file_handle:
for ignore_source in file_handle:
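Two review notes on load_data.py: the exit() → sys.exit() swap is sound, since exit() is only installed by the site module for interactive sessions and is not guaranteed to exist in every runtime, while sys.exit() always raises SystemExit. In __exit__, note that traceback.print_tb() prints to stderr and returns None, so the f-string logs a literal "None"; and returning False means the exception still propagates after the warning is logged.

Beyond the pylint-style renames (f1/r1 → fhandle1/reader1), the interesting pattern in this file is streaming two remote CSV sources in parallel. A minimal standalone sketch of that pattern, using only requests and the stdlib (the zip_web_csv name and the fillvalue=[] guard are illustrative additions, not lbsntransform API; without a fillvalue, zip_longest pads with None and the row concatenation would fail once the shorter stream ends):

import codecs
import csv
from contextlib import closing
from itertools import zip_longest

import requests


def zip_web_csv(url1, url2, csv_delim=','):
    """Stream two remote CSV sources and yield their rows pairwise."""
    with closing(requests.get(url1, stream=True)) as fhandle1, \
            closing(requests.get(url2, stream=True)) as fhandle2:
        reader1 = csv.reader(
            codecs.iterdecode(fhandle1.iter_lines(), 'utf-8'),
            delimiter=csv_delim, quotechar='"', quoting=csv.QUOTE_NONE)
        reader2 = csv.reader(
            codecs.iterdecode(fhandle2.iter_lines(), 'utf-8'),
            delimiter=csv_delim, quotechar='"', quoting=csv.QUOTE_NONE)
        # pad the shorter stream with empty rows so row1 + row2 stays valid
        for row1, row2 in zip_longest(reader1, reader2, fillvalue=[]):
            yield row1 + row2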
26 changes: 10 additions & 16 deletions lbsntransform/input/mappings/field_mapping_fb.py
@@ -8,17 +8,10 @@

import logging
import re
-import sys

-import shapely.geometry as geometry
-# for debugging only:
-from google.protobuf import text_format
-from google.protobuf.timestamp_pb2 import Timestamp
from lbsnstructure import lbsnstructure_pb2 as lbsn
-from shapely.geometry.polygon import Polygon

from ...tools.helper_functions import HelperFunctions as HF
-from ...output.shared_structure import LBSNRecordDicts


class FieldMappingFBPlace():
@@ -34,7 +27,7 @@ def __init__(self,
mapFullRelations=False,
map_reactions=True,
ignore_non_geotagged=False,
-                 ignore_sources_set=set(),
+                 ignore_sources_set=None,
min_geoaccuracy=None):
origin = lbsn.Origin()
origin.origin_id = lbsn.Origin.FACEBOOK
@@ -76,6 +69,7 @@ def parse_json_record(self, json_string_dict, input_lbsn_type=None):
return self.lbsn_records

def extract_place(self, postplace_json):
"""Extract lbsn.Place from fb place json"""
place = postplace_json
place_id = place.get('id')

@@ -149,14 +143,14 @@ def extract_place(self, postplace_json):
return place_record

@staticmethod
-    def compile_address(fb_place_dict):
+    def compile_address(fb_place_dict) -> str:
+        """Concat single line address from fb place dict"""
single_line_address = fb_place_dict.get("single_line_address")
if single_line_address:
return single_line_address
-        else:
-            fb_city = fb_place_dict.get("city")
-            fb_country = fb_place_dict.get("country")
-            fb_street = fb_place_dict.get("street")
-            fb_address = ', '.join(
-                filter(None, [fb_street, fb_city, fb_country]))
-            return fb_address
+        fb_city = fb_place_dict.get("city")
+        fb_country = fb_place_dict.get("country")
+        fb_street = fb_place_dict.get("street")
+        fb_address = ', '.join(
+            filter(None, [fb_street, fb_city, fb_country]))
+        return fb_address
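Two notes on this mapper: the ignore_sources_set=set() → None change (here and in the Flickr mapper below) removes a mutable default argument, which Python evaluates once at function definition and shares across all calls. The compile_address rewrite drops the redundant else: branch in favor of an early return; behavior is unchanged. For illustration, a hypothetical place dict (not from the repo) shows how filter(None, ...) skips missing address parts instead of producing dangling separators:

from lbsntransform.input.mappings.field_mapping_fb import FieldMappingFBPlace

# hypothetical input: no "single_line_address" and no "city" key
fb_place = {"street": "Main St 1", "country": "Germany"}
print(FieldMappingFBPlace.compile_address(fb_place))
# -> Main St 1, Germany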
19 changes: 7 additions & 12 deletions lbsntransform/input/mappings/field_mapping_flickr.py
@@ -9,12 +9,9 @@
import logging
from decimal import Decimal

-# for debugging only:
-# from google.protobuf import text_format
from lbsnstructure import lbsnstructure_pb2 as lbsn

from ...tools.helper_functions import HelperFunctions as HF
-from ...output.shared_structure import LBSNRecordDicts


class FieldMappingFlickr():
@@ -30,7 +27,7 @@ def __init__(self,
map_full_relations=False,
map_reactions=True,
ignore_non_geotagged=False,
-                 ignore_sources_set=set(),
+                 ignore_sources_set=None,
min_geoaccuracy=None):
# We're dealing with Flickr in this class, lets create the OriginID
# globally
@@ -61,8 +58,7 @@ def parse_csv_record(self, record):
# skip
self.skipped_count += 1
return
-        else:
-            self.extract_flickr_post(record)
+        self.extract_flickr_post(record)
return self.lbsn_records

def extract_flickr_post(self, record):
@@ -109,10 +105,9 @@ def extract_flickr_post(self, record):
HF.parse_csv_datestring_to_protobuf(record[8]))
# valueCount = lambda x: 0 if x is None else x

-        def value_count(x): return int(x) if x.isdigit() else 0
-        post_record.post_views_count = value_count(record[10])
-        post_record.post_comment_count = value_count(record[18])
-        post_record.post_like_count = value_count(record[17])
+        post_record.post_views_count = HF.value_count(record[10])
+        post_record.post_comment_count = HF.value_count(record[18])
+        post_record.post_like_count = HF.value_count(record[17])
post_record.post_url = f'http://flickr.com/photo.gne?id={post_guid}'
post_record.post_body = FieldMappingFlickr.reverse_csv_comma_replace(
record[21])
@@ -129,7 +124,7 @@ def value_count(x): return int(x) if x.isdigit() else 0
post_record.post_type = lbsn.Post.VIDEO
else:
post_record.post_type = lbsn.Post.IMAGE
-        post_record.post_content_license = value_count(record[14])
+        post_record.post_content_license = HF.value_count(record[14])
self.lbsn_records.append(post_record)

@staticmethod
@@ -196,7 +191,7 @@ def flickr_extract_postlatlng(self, record):
try:
l_lng = Decimal(lng_entry)
l_lat = Decimal(lat_entry)
-        except:
+        except ValueError:
l_lat, l_lng = 0, 0

if (l_lat == 0 and l_lng == 0) \
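The local value_count closure moves to HelperFunctions, deduplicating it across mappers. Its implementation is not shown in this diff; judging from the removed local definition, it presumably amounts to (a sketch, not the actual HF code):

def value_count(x: str) -> int:
    """Return int(x) for digit-only strings, else 0 (view/like/comment columns)."""
    return int(x) if x.isdigit() else 0

One caveat on the except: → except ValueError: narrowing in flickr_extract_postlatlng: Decimal('not a number') raises decimal.InvalidOperation, which derives from ArithmeticError rather than ValueError, so a malformed coordinate string would now propagate instead of falling back to 0, 0.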
