Skip to content

Commit

Permalink
feat: lbsn raw to hll structure transformation
Browse files Browse the repository at this point in the history
First part of hll feature implementation

Add hllworker and update bases

refactor: imports and import formatting

hll_add_agg implementation

Merge hll_add_agg results back to records

refactor: use class inheritance for hll bases

refactor: move try..catch to contextmanager

refactor: reduce code duplication in hll.Base classes

refactor: formatting to code conventions

Code separation improvements

feat: allow zipping of multiple web sources

fix: updates

fix: NUL character exception in values

fixup: nul characters

fixup: NUL character

refactor: cleanup & code duplication reduction
  • Loading branch information
Sieboldianus committed Nov 19, 2019
1 parent c2f6537 commit db1c24e
Show file tree
Hide file tree
Showing 21 changed files with 2,402 additions and 724 deletions.
1 change: 1 addition & 0 deletions environment.yml
Expand Up @@ -11,6 +11,7 @@ dependencies:
- protobuf
- shapely
- requests
- nltk
- pip:
- lbsnstructure>=0.5.0
- ppygis3>=0.4
Expand Down
1 change: 1 addition & 0 deletions environment_dev.yml
Expand Up @@ -15,6 +15,7 @@ dependencies:
- shapely
- requests
- bitarray
- nltk
- pip:
- anybadge
- pylint-exit
Expand Down
52 changes: 23 additions & 29 deletions lbsntransform/__main__.py
Expand Up @@ -30,8 +30,9 @@


def main():
""" Main function to process data from postgres db or local file input
to postgres db or local file output
""" Main function for cli-mode to process data
from postgres db or local file input
to postgres db or local file output
"""

# Load Config, will be overwritten if args are given
Expand All @@ -52,11 +53,17 @@ def main():
dbname_output=config.dbname_output,
dbpassword_output=config.dbpassword_output,
dbserverport_output=config.dbserverport_output,
dbformat_output=config.dbformat_output,
dbuser_input=config.dbuser_input,
dbserveraddress_input=config.dbserveraddress_input,
dbname_input=config.dbname_input,
dbpassword_input=config.dbpassword_input,
dbserverport_input=config.dbserverport_input)
dbserverport_input=config.dbserverport_input,
dbuser_hllworker=config.dbuser_hllworker,
dbserveraddress_hllworker=config.dbserveraddress_hllworker,
dbname_hllworker=config.dbname_hllworker,
dbpassword_hllworker=config.dbpassword_hllworker,
dbserverport_hllworker=config.dbserverport_hllworker)

# initialize converter class
# depending on lbsn origin
Expand Down Expand Up @@ -99,32 +106,19 @@ def main():
# read and process unfiltered input records from csv
# start settings
with input_data as records:
try:
for record in records:
lbsntransform.add_processed_records(record)
# report progress
if lbsntransform.processed_total % 1000 == 0:
stats_str = HF.report_stats(
input_data.count_glob,
input_data.continue_number,
lbsntransform.lbsn_records)
print(stats_str, end='\r')
sys.stdout.flush()
if (config.transferlimit and
lbsntransform.processed_total >= config.transferlimit):
break
except Exception as e:
# catch any exception and output additional information
lbsntransform.log.warning(
f"\nError while reading records: "
f"{e}\n{e.args}\n{traceback.format_exc()}\n")
lbsntransform.log.warning(
f"Current source: \n {input_data.current_source}\n")
stats_str = HF.report_stats(
input_data.count_glob,
input_data.continue_number,
lbsntransform.lbsn_records)
lbsntransform.log.warning(stats_str)
for record in records:
lbsntransform.add_processed_records(record)
# report progress
if lbsntransform.processed_total % 1000 == 0:
stats_str = HF.report_stats(
input_data.count_glob,
input_data.continue_number,
lbsntransform.lbsn_records)
print(stats_str, end='\r')
sys.stdout.flush()
if (config.transferlimit and
lbsntransform.processed_total >= config.transferlimit):
break

# finalize output (close db connection, submit remaining)
lbsntransform.log.info(
Expand Down
26 changes: 10 additions & 16 deletions lbsntransform/classes/field_mapping_fb.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-

"""
Module for mapping public Facebook Place Graph to common LBSN Structure.
Module for mapping public Facebook Place Graph to common LBSN Structure.
"""

# pylint: disable=no-member
Expand All @@ -14,21 +14,15 @@
# for debugging only:
from google.protobuf import text_format
from google.protobuf.timestamp_pb2 import Timestamp
from lbsnstructure.lbsnstructure_pb2 import (CompositeKey, Language,
RelationshipKey, City,
Country, Origin,
Place, Post,
PostReaction,
Relationship, User,
UserGroup)
from lbsnstructure import lbsnstructure_pb2 as lbsn
from shapely.geometry.polygon import Polygon

from .helper_functions import HelperFunctions as HF
from .helper_functions import LBSNRecordDicts


class FieldMappingFBPlace():
""" Provides mapping function from Facebook Place Graph endpoints to
""" Provides mapping function from Facebook Place Graph endpoints to
protobuf lbsnstructure
"""
ORIGIN_NAME = "Facebook"
Expand All @@ -42,8 +36,8 @@ def __init__(self,
ignore_non_geotagged=False,
ignore_sources_set=set(),
min_geoaccuracy=None):
origin = Origin()
origin.origin_id = Origin.FACEBOOK
origin = lbsn.Origin()
origin.origin_id = lbsn.Origin.FACEBOOK
self.origin = origin
# this is where all the data will be stored
self.lbsn_records = []
Expand Down Expand Up @@ -102,15 +96,15 @@ def extract_place(self, postplace_json):
first_cat_name = first_cat.get("name")
if first_cat_name == "country":
place_record = HF.new_lbsn_record_with_id(
Country(), place_id, self.origin)
lbsn.Country(), place_id, self.origin)
elif first_cat_name in ("city", "neighborhood", "admin"):
place_record = HF.new_lbsn_record_with_id(
City(), place_id, self.origin)
lbsn.City(), place_id, self.origin)
else:
# all other cat types
place_record = HF.new_lbsn_record_with_id(
Place(), place_id, self.origin)
if isinstance(place_record, Place):
lbsn.Place(), place_id, self.origin)
if isinstance(place_record, lbsn.Place):
# place specific
if place_cat_list:
for cat in place_cat_list:
Expand Down Expand Up @@ -144,7 +138,7 @@ def extract_place(self, postplace_json):
place_phone = place.get('place_phone')
if place_phone:
place_record.place_phone = place_phone
# same for Country, City and Place
# same for lbsn.Country, lbsn.City and lbsn.Place
place_name = place.get('name').replace('\n\r', '')
# remove multiple whitespace
place_name = re.sub(' +', ' ', place_name)
Expand Down
44 changes: 19 additions & 25 deletions lbsntransform/classes/field_mapping_flickr.py
Expand Up @@ -10,14 +10,8 @@
from decimal import Decimal

# for debugging only:
from google.protobuf import text_format
from lbsnstructure.lbsnstructure_pb2 import (CompositeKey, Language,
RelationshipKey, City,
Country, Origin,
Place, Post,
PostReaction,
Relationship, User,
UserGroup)
# from google.protobuf import text_format
from lbsnstructure import lbsnstructure_pb2 as lbsn

from .helper_functions import HelperFunctions as HF
from .helper_functions import LBSNRecordDicts
Expand All @@ -41,8 +35,8 @@ def __init__(self,
# We're dealing with Flickr in this class, lets create the OriginID
# globally
# this OriginID is required for all CompositeKeys
origin = Origin()
origin.origin_id = Origin.FLICKR
origin = lbsn.Origin()
origin.origin_id = lbsn.Origin.FLICKR
self.origin = origin
self.null_island = 0
# this is where all the data will be stored
Expand Down Expand Up @@ -85,10 +79,10 @@ def extract_flickr_post(self, record):
post_guid = record[5]
if not HF.check_notice_empty_post_guid(post_guid):
return None
post_record = HF.new_lbsn_record_with_id(Post(),
post_record = HF.new_lbsn_record_with_id(lbsn.Post(),
post_guid,
self.origin)
user_record = HF.new_lbsn_record_with_id(User(),
user_record = HF.new_lbsn_record_with_id(lbsn.User(),
record[7],
self.origin)
user_record.user_name = record[6]
Expand All @@ -104,7 +98,7 @@ def extract_flickr_post(self, record):
# we need some information from postRecord to create placeRecord
# (e.g. user language, geoaccuracy, post_latlng)
# some of the information from place will also modify postRecord
place_record = HF.new_lbsn_record_with_id(Place(),
place_record = HF.new_lbsn_record_with_id(lbsn.Place(),
record[19],
self.origin)
self.lbsn_records.append(place_record)
Expand Down Expand Up @@ -132,9 +126,9 @@ def value_count(x): return int(x) if x.isdigit() else 0
post_record.hashtags.append(tag)
record_media_type = record[16]
if record_media_type and record_media_type == "video":
post_record.post_type = Post.VIDEO
post_record.post_type = lbsn.Post.VIDEO
else:
post_record.post_type = Post.IMAGE
post_record.post_type = lbsn.Post.IMAGE
post_record.post_content_license = value_count(record[14])
self.lbsn_records.append(post_record)

Expand All @@ -159,7 +153,7 @@ def flickr_map_geoaccuracy(flickr_geo_accuracy_level):
"""Flickr Geoaccuracy Levels (16) are mapped to four LBSNstructure levels:
LBSN PostGeoaccuracy: UNKNOWN = 0; LATLNG = 1; PLACE = 2; CITY = 3;
COUNTRY = 4
Fickr: World level is 1, Country is ~3, Region ~6, City ~11,
Flickr: World level is 1, Country is ~3, Region ~6, City ~11,
Street ~16.
Flickr Current range is 1-16. Defaults to 16 if not specified.
Expand All @@ -173,20 +167,20 @@ def flickr_map_geoaccuracy(flickr_geo_accuracy_level):
if stripped_level.isdigit():
stripped_level = int(stripped_level)
if stripped_level >= 15:
lbsn_geoaccuracy = Post.LATLNG
lbsn_geoaccuracy = lbsn.Post.LATLNG
elif stripped_level >= 12:
lbsn_geoaccuracy = Post.PLACE
lbsn_geoaccuracy = lbsn.Post.PLACE
elif stripped_level >= 8:
lbsn_geoaccuracy = Post.CITY
lbsn_geoaccuracy = lbsn.Post.CITY
else:
lbsn_geoaccuracy = Post.COUNTRY
lbsn_geoaccuracy = lbsn.Post.COUNTRY
else:
if flickr_geo_accuracy_level == "Street":
lbsn_geoaccuracy = Post.LATLNG
elif flickr_geo_accuracy_level in ("City", "Region"):
lbsn_geoaccuracy = Post.CITY
elif flickr_geo_accuracy_level in ("Country", "World"):
lbsn_geoaccuracy = Post.COUNTRY
lbsn_geoaccuracy = lbsn.Post.LATLNG
elif flickr_geo_accuracy_level in ("lbsn.City", "Region"):
lbsn_geoaccuracy = lbsn.Post.CITY
elif flickr_geo_accuracy_level in ("lbsn.Country", "World"):
lbsn_geoaccuracy = lbsn.Post.COUNTRY
return lbsn_geoaccuracy

def flickr_extract_postlatlng(self, record):
Expand Down

0 comments on commit db1c24e

Please sign in to comment.