From c9d129c7a661bd0665012dae4f9dbeb9d07baa11 Mon Sep 17 00:00:00 2001 From: AD Date: Tue, 5 Feb 2019 08:52:57 +0100 Subject: [PATCH] refactor: load data and prepare data into separate classes --- tagmaps/__main__.py | 50 +-- tagmaps/classes/load_data.py | 654 +++++--------------------------- tagmaps/classes/prepare_data.py | 545 ++++++++++++++++++++++++++ 3 files changed, 667 insertions(+), 582 deletions(-) diff --git a/tagmaps/__main__.py b/tagmaps/__main__.py index f6fcb7e..2016918 100644 --- a/tagmaps/__main__.py +++ b/tagmaps/__main__.py @@ -22,6 +22,7 @@ from tagmaps.classes.compile_output import Compile from tagmaps.classes.interface import UserInterface from tagmaps.classes.load_data import LoadData +from tagmaps.classes.prepare_data import PrepareData from tagmaps.classes.shared_structure import EMOJI, LOCATIONS, TAGS from tagmaps.classes.utils import Utils @@ -35,7 +36,9 @@ def main(): # initialize logger and config cfg, log = Utils.init_main() - lbsn_data = LoadData(cfg) + # lbsn_data = LoadData(cfg) + lbsn_data = PrepareData(cfg) + input_data = LoadData(cfg) print('\n') log.info( @@ -43,30 +46,33 @@ def main(): "STEP 1 of 6: Data Cleanup " "##########") - lbsn_data.parse_input_records() + with input_data as records: + for record in records: + lbsn_data.add_record(record) + + log.info( + f'\nTotal user count (UC): ' + f'{len(lbsn_data.locations_per_userid_dict)}') + log.info( + f'Total user post locations (UPL): ' + f'{len(lbsn_data.distinct_userlocations_set)}') + log.info( + f'Total post count (PC): ' + f'{input_data.stats.count_glob:02d}') + log.info( + f'Total tag count (PTC): ' + f'{input_data.stats.count_tags_global}') + log.info( + f'Total emoji count (PEC): ' + f'{input_data.stats.count_emojis_global}') + log.info( + input_data.bounds.get_bound_report()) + # get current time now = time.time() # get cleaned data for use in clustering cleaned_post_dict = lbsn_data.get_cleaned_post_dict() cleaned_post_list = list(cleaned_post_dict.values()) - # status report - log.info( - f'\nTotal user count (UC): ' - f'{len(lbsn_data.locations_per_userid_dict)}') - log.info( - f'Total post count (PC): ' - f'{lbsn_data.stats.count_glob:02d}') - log.info( - f'Total tag count (PTC): ' - f'{lbsn_data.stats.count_tags_global}') - log.info( - f'Total emoji count (PEC): ' - f'{lbsn_data.stats.count_emojis_global}') - log.info( - f'Total user post locations (UPL): ' - f'{len(lbsn_data.distinct_userlocations_set)}') - log.info( - lbsn_data.bounds.get_bound_report()) # get prepared data for statistics and clustering prepared_data = lbsn_data.get_prepared_data() @@ -78,7 +84,7 @@ def main(): "##########") location_name_count = len( - prepared_data.locid_locname_dict) + lbsn_data.locid_locname_dict) if location_name_count > 0: log.info( f"Number of locations with names: " @@ -139,7 +145,7 @@ def main(): if not cfg.auto_mode: user_intf = UserInterface( clusterer_list, - prepared_data.locid_locname_dict) + lbsn_data.locid_locname_dict) user_intf.start() if cfg.auto_mode or user_intf.abort is False: diff --git a/tagmaps/classes/load_data.py b/tagmaps/classes/load_data.py index 864df6e..2e045d5 100644 --- a/tagmaps/classes/load_data.py +++ b/tagmaps/classes/load_data.py @@ -1,12 +1,11 @@ # -*- coding: utf-8 -*- -"""Module for loading data and calculating basic statistics. 
+"""Module for loading data Returns: - PreparedData: Data cleaned for Tag Maps clustering - -Todo: - * separate module into a) load data and b) prepare data + cleanedPost: a subset of the original available + post attributes + that is needed for Tag Maps clustering """ @@ -36,63 +35,54 @@ class LoadData(): - """Main Class for ingesting data and building summary statistics. + """Main Class for ingesting data - - will process CSV data into dict/set structures - - will filter data, cleaned output can be stored - - will generate statistics + - will apply basic filters (based on stoplists etc.) + - Returns CleanedPost """ def __init__(self, cfg): """Initializes Load Data structure""" self.filelist = self._read_local_files(cfg) self.guid_hash = set() # global list of guids - self.cleaned_photo_list = [] self.append_to_already_exist = False # unused? self.shape_exclude_locid_hash = set() self.shape_included_locid_hash = set() - self.total_tag_counter = collections.Counter() - self.total_emoji_counter = collections.Counter() - self.total_location_counter = collections.Counter() - self.bounds = AnalysisBounds() - self.stats = DataStats() - self.prepared_data = PreparedData() self.cfg = cfg self.log = logging.getLogger("tagmaps") - # Hashsets: - self.locations_per_userid_dict = defaultdict(set) - self.userlocation_taglist_dict = defaultdict(set) - self.userlocation_emojilist_dict = defaultdict(set) - self.locid_locname_dict: Dict[str, str] = dict() # nopep8 - if cfg.topic_modeling: - self.user_topiclist_dict = defaultdict(set) - self.user_post_ids_dict = defaultdict(set) - self.userpost_first_thumb_dict = defaultdict(str) - self.userlocation_wordlist_dict = defaultdict(set) - self.userlocations_firstpost_dict = defaultdict(set) - # UserDict_TagCounters = defaultdict(set) - self.userdict_tagcounters_global = defaultdict(set) - self.userdict_emojicounters_global = defaultdict(set) - self.userdict_locationcounters_global = defaultdict(set) - # UserIDsPerLocation_dict = defaultdict(set) - # PhotoLocDict = defaultdict(set) + self.bounds = AnalysisBounds() self.distinct_locations_set = set() - self.distinct_userlocations_set = set() + # basic statistics collection + self.stats = DataStats() + + def __enter__(self): + """Main pipeline for reading posts from file + + Combine multiple generators to single pipeline + that is returned for being processed by + with-statement. 
+ """ + post_pipeline = self._parse_postlist( + self._process_inputfile( + self._parse_input_files())) + return post_pipeline - def parse_input_records(self): - """Loops input csv records and adds to records_dict + def __exit__(self, type, value, traceback): + """Contextmanager exit: nothing to do here""" + return False - Returns statistic-counts, modifies (adds results to) class structures + def _parse_input_files(self): + """Loops input input filelist and + returns opened file handles """ # get user input for max tags to process self._get_tmax() for file_name in self.filelist: - with open(file_name, 'r', newline='', encoding='utf8') as f: - self.stats.partcount += 1 - self._process_inputfile(f) + self.stats.partcount += 1 + return open(file_name, 'r', newline='', encoding='utf8') def _process_inputfile(self, file_handle): - """File parse for CSV or JSON + """File parse for CSV or JSON from open file handle Output: produces a list of post that can be parsed """ @@ -104,11 +94,16 @@ def _process_inputfile(self, file_handle): quoting=self.cfg.source_map.quoting) # next(post_list, None) # skip headerline elif self.cfg.source_map.file_extension == "json": - post_reader = post_reader + json.loads(file_handle.read()) - self._parse_postlist(post_reader) + post_reader = post_reader + json.loads( + file_handle.read()) + return post_reader + + def _parse_postlist(self, post_reader: TextIO, + reporting: bool = True): + """Process posts according to specifications - def _parse_postlist(self, post_reader: TextIO): - """Process posts according to specifications""" + Returns generator for single record + """ # row_num = 0 msg = None for post in post_reader: @@ -116,494 +111,31 @@ def _parse_postlist(self, post_reader: TextIO): lbsn_post = self._parse_post(post) if lbsn_post is None: continue - self._merge_posts(lbsn_post) - # status report - msg = ( - f'Cleaned output to {len(self.distinct_locations_set):02d} ' - f'distinct locations from ' - f'{self.stats.count_glob:02d} posts ' - f'(File {self.stats.partcount} of {len(self.filelist)}) - ' - f'Skipped posts: {self.stats.skipped_count} - skipped tags: ' - f'{self.stats.count_tags_skipped} of ' - f'{self.stats.count_tags_global}') - # if (row_num % 10 == 0): - # modulo: print only once every 10 iterations - print(msg, end='\r') + else: + self.stats.count_glob += 1 + if reporting: + msg = self._report_progress() + # if (row_num % 10 == 0): + # modulo: print only once every 10 iterations + print(msg, end='\r') + yield lbsn_post # log last message to file, clean stdout if msg: print(" " * len(msg), end='\r') sys.stdout.flush() self.log.info(msg) - def _merge_posts(self, lbsn_post: PostStructure): - """Method will union all tags of a single user for each location - - - further information is derived from the first - post for each user-location - - the result is a cleaned output containing - reduced information that is necessary for tag maps - """ - # create userid_loc_id - post_locid_userid = f'{lbsn_post.loc_id}::{lbsn_post.user_guid}' - self.distinct_locations_set.add(lbsn_post.loc_id) - # print(f'Added: {photo_locID} to distinct_locations_set ' - # f'(len: {len(self.distinct_locations_set)})') - self.distinct_userlocations_set.add(post_locid_userid) - # print(f'Added: {post_locid_userid} to distinct_userlocations_set ' - # f'(len: {len(distinct_userlocations_set)})') - if (lbsn_post.loc_name and - lbsn_post.loc_id not in self.locid_locname_dict): - # add locname to dict - self.locid_locname_dict[ - lbsn_post.loc_id] = lbsn_post.loc_name - if 
lbsn_post.user_guid not in \ - self.locations_per_userid_dict or \ - lbsn_post.loc_id not in \ - self.locations_per_userid_dict[ - lbsn_post.user_guid]: - # Bit wise or and assignment in one step. - # -> assign locID to UserDict list - # if not already contained - self.locations_per_userid_dict[ - lbsn_post.user_guid] |= { - lbsn_post.loc_id} - self.stats.count_loc += 1 - self.userlocations_firstpost_dict[ - post_locid_userid] = lbsn_post - # union tags/emoji per userid/unique location - if self.cfg.cluster_tags: - self.userlocation_taglist_dict[ - post_locid_userid] |= lbsn_post.hashtags - if self.cfg.cluster_emoji: - self.userlocation_emojilist_dict[ - post_locid_userid] |= lbsn_post.emoji - # get cleaned wordlist for topic modeling - cleaned_wordlist = self._get_cleaned_wordlist( - lbsn_post.post_body) - # union words per userid/unique location - self.userlocation_wordlist_dict[ - post_locid_userid] |= set( - cleaned_wordlist) - self.stats.count_glob += 1 - self._update_toplists(lbsn_post) - - def _update_toplists(self, lbsn_post): - """Calculate toplists for emoji and tags - - - adds tag/emojicount of this media to overall - tag/emojicount for this user, - - initialize counter for user if not already done - """ - if self.cfg.cluster_tags and lbsn_post.hashtags: - self.userdict_tagcounters_global[ - lbsn_post.user_guid].update( - lbsn_post.hashtags) - self.total_tag_counter.update(lbsn_post.hashtags) - if self.cfg.cluster_emoji and lbsn_post.emoji: - self.userdict_emojicounters_global[ - lbsn_post.user_guid].update( - lbsn_post.emoji) - self.total_emoji_counter.update( - lbsn_post.emoji) - if lbsn_post.loc_id: - # update single item hack - # there're more elegant ways to do this - self.userdict_locationcounters_global[ - lbsn_post.user_guid].update( - (lbsn_post.loc_id,)) - self.total_location_counter.update( - (lbsn_post.loc_id,)) - - def get_cleaned_post_dict( - self) -> Dict[str, CleanedPost]: - """Output wrapper - - - calls loop user locations method - - optionally initializes output to file - """ - if self.cfg.write_cleaned_data: - with open(f'{self.cfg.output_folder}/Output_cleaned.csv', 'w', - encoding='utf8') as csvfile: - # get headerline from class structure - headerline = ','.join(CleanedPost._fields) - csvfile.write(f'{headerline}\n') - # values will be written with CSV writer module - datawriter = csv.writer( - csvfile, delimiter=',', lineterminator='\n', - quotechar='"', quoting=csv.QUOTE_NONNUMERIC) - cleaned_post_dict = self._loop_loc_per_userid(datawriter) - else: - cleaned_post_dict = self._loop_loc_per_userid(None) - if self.cfg.topic_modeling: - self._write_topic_models() - return cleaned_post_dict - - def get_prepared_data(self) -> 'PreparedData': - """After data is loaded, this collects data and stats - - - prepare data for tag maps clustering - - store to self.data_prepared - """ - self._prepare_main_stats() - return self.prepared_data - - def _prepare_main_stats(self): - """Calculate overall tag and emoji statistics - - - write results (optionally) to file - """ - # top lists and unique - tag_stats = self._get_top_list( - self.userdict_tagcounters_global, "tags") - top_tags_list = tag_stats[0] - total_unique_tags = tag_stats[1] - tagscount_without_longtail = tag_stats[2] - - emoji_stats = self._get_top_list( - self.userdict_emojicounters_global, "emoji") - top_emoji_list = emoji_stats[0] - total_unique_emoji = emoji_stats[1] - emojicount_without_longtail = emoji_stats[2] - - location_stats = self._get_top_list( - self.userdict_locationcounters_global, 
"locations") - top_location_list = location_stats[0] - total_unique_locations = location_stats[1] - - # update tmax and emax from optionally long tail removal - if tagscount_without_longtail: - self.prepared_data.tmax = tagscount_without_longtail - else: - self.prepared_data.tmax = self.cfg.tmax - if emojicount_without_longtail: - self.prepared_data.emax = emojicount_without_longtail - else: - self.prepared_data.emax = self.cfg.tmax - - # collect stats in prepared_data - - # if self.cfg.cluster_locations: - total_location_count = LoadData._get_total_count( - top_location_list, self.total_location_counter) - self.prepared_data.top_locations_list = top_location_list - self.prepared_data.total_unique_locations = total_unique_locations - self.prepared_data.total_location_count = total_location_count - self.prepared_data.locid_locname_dict = self.locid_locname_dict - - if self.cfg.cluster_tags: - # top counts - total_tag_count = LoadData._get_total_count( - top_tags_list, self.total_tag_counter) - self.prepared_data.top_tags_list = top_tags_list - self.prepared_data.total_unique_tags = total_unique_tags - self.prepared_data.total_tag_count = total_tag_count - - if self.cfg.cluster_emoji: - total_emoji_count = LoadData._get_total_count( - top_emoji_list, self.total_emoji_counter) - self.prepared_data.top_emoji_list = top_emoji_list - self.prepared_data.total_unique_emoji = total_unique_emoji - self.prepared_data.total_emoji_count = total_emoji_count - - def _write_toplist(self, top_list, list_name): - """Write toplists to file - - e.g.: - tag, usercount - toptag1, 1231 - toptag2, 560 - ... - """ - if len(top_list) == 0: - return - top_list_store = ''.join( - "%s,%i" % v + '\n' for v in top_list) - # overwrite, if exists: - with open(f'{self.cfg.output_folder}/' - f'Output_top{list_name}.txt', 'w', - encoding='utf8') as out_file: - out_file.write(f'{list_name}, usercount\n') - out_file.write(top_list_store) - - def _get_top_list(self, userdict_tagemoji_counters, - listname: str = "tags"): - """Get Top Tags on a per user basis, i.e. 
- - - the global number of distinct users who used each distinct tag - - this ignores duplicate use of - - calculation is based on dict userdict_tagcounters_global, - with counters of tags for each user - Returns: - - list of top tags up to tmax [1000] - - count of total unique tags - """ - overall_usercount_perte = collections.Counter() - for tagemoji_hash in userdict_tagemoji_counters.values(): - # taghash contains unique values (= strings) for each user, - # thus summing up these taghashes counts each user - # only once per tag (or emoji) - overall_usercount_perte.update(tagemoji_hash) - total_unique = len(overall_usercount_perte) - # get all items for "locations" - # but clip list for tags and emoji - if listname in ("tags", "emoji"): - max_items = self.cfg.tmax - else: - max_items = None - top_list = overall_usercount_perte.most_common(max_items) - if self.cfg.remove_long_tail is True: - total_without_longtail = self._remove_long_tail(top_list, listname) - max_to_write = min(1000, self.cfg.tmax) - self._write_toplist(top_list[:max_to_write], listname) - return top_list, total_unique, total_without_longtail - - @staticmethod - def _get_total_count(top_list, top_counter): - """Calculate Total Tags for selected - - Arguments: - top_list (Long Tail Stat) - top_counter (Reference to counter object) - """ - total_count = 0 - for tagemoji in top_list: - count = top_counter.get(tagemoji[0]) - if count: - total_count += count - return total_count - - def _remove_long_tail(self, - top_list: List[Tuple[str, int]], - listname: str - ) -> int: - """Removes all items from list that are used by less - - than x number of users, - where x is given as input arg limit_bottom_user_count - Note: since list is a mutable object, method - will modify top_tags_list - """ - if listname == 'locations': - # keep all locations - return len(top_list) - elif listname == 'emoji': - # emoji use a smaller area than tags on the map - # therefore we can keep more emoji - # (e.g..: use 2 instead of 5) - bottomuser_count = math.trunc( - self.cfg.limit_bottom_user_count/2) - else: - bottomuser_count = self.cfg.limit_bottom_user_count - indexMin = next((i for i, (t1, t2) in enumerate( - top_list) if t2 < bottomuser_count - ), None) - if not indexMin: - return - len_before = len(top_list) - # delete based on slicing - del top_list[indexMin:] - len_after = len(top_list) - if len_before == len_after: - # if no change, return - return len_after - self.log.info( - f'Long tail removal: Filtered {len_before - len_after} ' - f'{listname} that were used by less than ' - f'{bottomuser_count} users.') - return len_after - - def _loop_loc_per_userid(self, datawriter=None): - """Will produce final cleaned list - of items to be processed by clustering. 
- - - optionally writes entries to file, if handler exists - """ - cleaned_post_dict = defaultdict(CleanedPost) - for user_key, locationhash in \ - self.locations_per_userid_dict.items(): - for location in locationhash: - locid_userid = f'{location}::{user_key}' - post_latlng = location.split(':') - - first_post = self.userlocations_firstpost_dict.get( - locid_userid, None) - if first_post is None: - return - # create tuple with cleaned photo data - cleaned_post_location = self._get_cleaned_location( - first_post, locid_userid, post_latlng, user_key) - - if datawriter is not None: - LoadData._write_location_tocsv( - datawriter, cleaned_post_location) - if self.cfg.topic_modeling: - self._update_topic_models( - cleaned_post_location, user_key) - cleaned_post_dict[cleaned_post_location.guid] = \ - cleaned_post_location - return cleaned_post_dict - - def _write_topic_models(self): - """Initialize two lists for topic modeling output - - - hashed (anonymized) output (*add salt) - - original output - """ - headerline = "topics,post_ids,user_ids\n" - with open(f'{self.cfg.output_folder}/' - f'"Output_usertopics_anonymized.csv', 'w', - encoding='utf8') as csvfile_anon, \ - open(f'{self.cfg.output_folder}/' - f'"Output_usertopics.csv', 'w', - encoding='utf8') as csvfile: - dw_list = list() - for cfile in (csvfile, csvfile_anon): - cfile.write(headerline) - dw = csv.writer(cfile, delimiter=',', - lineterminator='\n', quotechar='"', - quoting=csv.QUOTE_NONNUMERIC) - dw_list.append(dw) - self._write_topic_rows(dw_list) - - def _write_topic_rows(self, dw_list): - """Write Topic models to file""" - dw = dw_list[0] - dw_anon = dw_list[1] - - def _join_encode(keys): - joined_keys = ",".join(keys) - joined_encoded_keys = ",".join( - [Utils.encode_string(post_id) for post_id in keys]) - return joined_keys, joined_encoded_keys - for user_key, topics in self.user_topiclist_dict.items(): - joined_topics = " ".join(topics) - post_keys = self.user_post_ids_dict.get(user_key, None) - joined_keys, joined_encoded_keys = _join_encode(post_keys) - dw_anon.writerow([joined_topics, - "{" + joined_encoded_keys + "}", - Utils.encode_string(user_key)]) - dw.writerow([joined_topics, - "{" + joined_keys + "}", - str(user_key)]) - - def _update_topic_models(self, - cleaned_post_location, - user_key): - """If Topic Modeling enabled, update - required dictionaries with merged words from - title, tags and post_body - """ - if not len( - cleaned_post_location.hashtags) == 0: - self.user_topiclist_dict[user_key] |= \ - cleaned_post_location.hashtags - # also use descriptions for Topic Modeling - self. user_topiclist_dict[user_key] |= \ - cleaned_post_location.post_body - # Bit wise or and assignment in one step. - # -> assign PhotoGuid to UserDict list - # if not already contained - self.user_post_ids_dict[user_key] |= { - cleaned_post_location.guid} - # UserPhotoFirstThumb_dict[user_key] = photo[5] - - def _get_cleaned_location(self, first_post, locid_userid, - post_latlng, user_key): - """Merge cleaned post from all posts of a certain user - at a specific location - - - some information is not needed, those post attributes - are simply skipped (e.g. location name) - - some information must not be merged, this can be directly copied - from the first post at a location/user (e.g. origin_id - will always be - the same for a particular user, post_create_date, post_publish_date) - - some information (e.g. 
hashtags) need merge with removing dupliates: - use prepared dictionaries - - some important information is type-checked (longitude, latitude) - - Keyword arguments: - first_post -- first post of a user_guid at a location - locid_userid -- user_guid and loc_id in merged format - (f'{location}::{user_key}') - post_latlng -- tuple with lat/lng coordinates - user_key -- user_guid - - Note: - ("",) means: substitute empty tuple as default - """ - - merged_wordlist = LoadData._get_merged( - self.userlocation_wordlist_dict, locid_userid) - merged_emojilist = LoadData._get_merged( - self.userlocation_emojilist_dict, locid_userid) - merged_taglist = LoadData._get_merged( - self.userlocation_taglist_dict, locid_userid) - cleaned_post = CleanedPost( - origin_id=first_post.origin_id, - lat=float(post_latlng[0]), - lng=float(post_latlng[1]), - guid=first_post.guid, - user_guid=user_key, - post_body=merged_wordlist, - post_create_date=first_post.post_create_date, - post_publish_date=first_post.post_publish_date, - post_views_count=first_post.post_views_count, - post_like_count=first_post.post_like_count, - emoji=merged_emojilist, - hashtags=merged_taglist, - loc_id=first_post.loc_id - ) - return cleaned_post - - @staticmethod - def _get_merged(ref_dict: Dict, locid_userid: str) -> Set[str]: - """Gets set of words for userlocid from ref dictionary - - Note: since using defaultdict, - keys not found will return empty set() - """ - value = ref_dict[locid_userid] - return value - - @staticmethod - def _write_location_tocsv(datawriter: TextIO, - cleaned_post_location: CleanedPost) -> None: - """Writes a single record of cleaned posts to CSV list - - - write intermediate cleaned post data to file for later use - Arguments - datawriter - open file file_handle to - output file - cleaned_post_location - cleaned post of type CleanedPost - (namedtuple) - """ - ploc_list = LoadData._cleaned_ploc_tolist( - cleaned_post_location) - datawriter.writerow(ploc_list) - - @staticmethod - def _cleaned_ploc_tolist(cleaned_post_location: CleanedPost - ) -> List[str]: - """Converts a cleaned post structure to list for CSV write""" - attr_list = list() - for attr in cleaned_post_location: - if isinstance(attr, set): - attr_list.append(";".join(attr)) - else: - attr_list.append(attr) - return attr_list - - def _get_cleaned_wordlist(self, post_body_string): - cleaned_post_body = Utils._remove_special_chars(post_body_string) - cleaned_wordlist = LoadData._get_wordlist(cleaned_post_body) - return cleaned_wordlist - - @staticmethod - def _get_wordlist(cleaned_post_body): - """split by space-characterm, filter by length""" - wordlist = [word for word in cleaned_post_body.lower().split( - ' ') if len(word) > 2] - return wordlist + def _report_progress(self): + """Status report""" + msg = ( + f'Cleaned input to {len(self.distinct_locations_set):02d} ' + f'distinct locations from ' + f'{self.stats.count_glob:02d} posts ' + f'(File {self.stats.partcount} of {len(self.filelist)}) - ' + f'Skipped posts: {self.stats.skipped_count} - skipped tags: ' + f'{self.stats.count_tags_skipped} of ' + f'{self.stats.count_tags_global}') + return msg def _parse_post(self, post: Dict[str, str]) -> PostStructure: """Process single post and attach to common structure""" @@ -641,6 +173,8 @@ def _parse_post(self, post: Dict[str, str]) -> PostStructure: lbsn_post.longitude = lng lbsn_post.loc_id = str(lat) + ':' + \ str(lng) # create loc_id from lat/lng + # counting of distinct loc ids + self.distinct_locations_set.add(lbsn_post.loc_id) lbsn_post.loc_name = 
post.get(self.cfg.source_map.place_name_col) # exclude posts outside boundary if self.cfg.shapefile_intersect and \ @@ -669,6 +203,34 @@ def _parse_post(self, post: Dict[str, str]) -> PostStructure: # return parsed post object return lbsn_post + @staticmethod + def _read_local_files(config): + """Read Local Files according to config parameters + + - returns list of file-paths + """ + input_path = config.input_folder + filelist = list(input_path.glob( + f'*.{config.source_map.file_extension}')) + input_count = len(filelist) + if input_count == 0: + raise ValueError( + f'No input files *.' + f'{config.source_map.file_extension} ' + f'found.') + else: + return filelist + + @staticmethod + def _get_count_frompost(count_string: str) -> int: + if count_string and not count_string == "": + try: + photo_likes_int = int(count_string) + return photo_likes_int + except TypeError: + pass + return 0 + def _get_emoji(self, post_body): emoji_filtered = set(Utils._extract_emoji(post_body)) if not len(emoji_filtered) == 0: @@ -694,16 +256,6 @@ def _get_tags(self, tags_string: str) -> Set[str]: self.stats.count_tags_skipped += count_skipped return tags - @staticmethod - def _get_count_frompost(count_string: str) -> int: - if count_string and not count_string == "": - try: - photo_likes_int = int(count_string) - return photo_likes_int - except TypeError: - pass - return 0 - def _correct_placelatlng(self, place_guid_string, lat, lng): """If place corrections available, update lat/lng coordinates Needs test: not place_guid_string @@ -771,36 +323,18 @@ def _get_tmax(self): else: self.cfg.tmax = int(inputtext) - @staticmethod - def _read_local_files(config): - """Read Local Files according to config parameters - - - returns list of file-paths - """ - input_path = config.input_folder - filelist = list(input_path.glob( - f'*.{config.source_map.file_extension}')) - input_count = len(filelist) - if input_count == 0: - raise ValueError( - f'No input files *.' - f'{config.source_map.file_extension} ' - f'found.') - else: - return filelist - class DataStats(): - """Class storing analysis stats""" + """Class storing basic data stats""" def __init__(self): """Initialize stats.""" + self.count_glob = 0 + # self.count_loc = 0 + self.partcount = 0 + self.skipped_count = 0 self.count_non_geotagged = 0 self.count_outside_shape = 0 self.count_tags_global = 0 self.count_emojis_global = 0 self.count_tags_skipped = 0 - self.skipped_count = 0 - self.count_glob = 0 - self.partcount = 0 - self.count_loc = 0 diff --git a/tagmaps/classes/prepare_data.py b/tagmaps/classes/prepare_data.py index e69de29..cb75fca 100644 --- a/tagmaps/classes/prepare_data.py +++ b/tagmaps/classes/prepare_data.py @@ -0,0 +1,545 @@ +# -*- coding: utf-8 -*- + +"""Module for preparing base data and calculating +overall statistics. 
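+
+Records are added one at a time via PrepareData.add_record();
+minimal sketch (assuming `records` as yielded by LoadData):
+
+    prepared_data = PrepareData(cfg)
+    for record in records:
+        prepared_data.add_record(record)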
+ +Returns: + PreparedData: Cleaned list of posts and statistics + prepared for Tag Maps clustering + +""" + + +import sys +import os +import ntpath +import csv +import logging +from pathlib import Path +from glob import glob +from _csv import QUOTE_MINIMAL +from decimal import Decimal +import json +import math +import collections +from typing import List, Set, Dict, Tuple, Optional, TextIO +from collections import Counter +from collections import defaultdict +from collections import namedtuple + +from shapely.geometry import Polygon +from shapely.geometry import shape +from shapely.geometry import Point +from tagmaps.classes.utils import Utils +from tagmaps.classes.shared_structure import ( + PostStructure, CleanedPost, AnalysisBounds, PreparedData) + + +class PrepareData(): + """Main Class for building summary statistics. + + - will process individual cleaned post data into dict/set structures + - will filter data, cleaned output can be stored + - will generate statistics + """ + + def __init__(self, cfg): + """Initializes Prepare Data structure""" + self.cfg = cfg + self.count_glob = 0 + self.bounds = AnalysisBounds() + self.log = logging.getLogger("tagmaps") + self.total_tag_counter = collections.Counter() + self.total_emoji_counter = collections.Counter() + self.total_location_counter = collections.Counter() + self.prepared_data = PreparedData() + # Hashsets: + self.locations_per_userid_dict = defaultdict(set) + self.userlocation_taglist_dict = defaultdict(set) + self.userlocation_emojilist_dict = defaultdict(set) + self.locid_locname_dict: Dict[str, str] = dict() # nopep8 + if cfg.topic_modeling: + self.user_topiclist_dict = defaultdict(set) + self.user_post_ids_dict = defaultdict(set) + self.userpost_first_thumb_dict = defaultdict(str) + self.userlocation_wordlist_dict = defaultdict(set) + self.userlocations_firstpost_dict = defaultdict(set) + # UserDict_TagCounters = defaultdict(set) + self.userdict_tagcounters_global = defaultdict(set) + self.userdict_emojicounters_global = defaultdict(set) + self.userdict_locationcounters_global = defaultdict(set) + # UserIDsPerLocation_dict = defaultdict(set) + # PhotoLocDict = defaultdict(set) + self.distinct_locations_set = set() + self.distinct_userlocations_set = set() + + def add_record(self, lbsn_post: PostStructure): + """Method will union all tags of a single user for each location + + - further information is derived from the first + post for each user-location + - the result is a cleaned output containing + reduced information that is necessary for tag maps + """ + # create userid_loc_id + post_locid_userid = f'{lbsn_post.loc_id}::{lbsn_post.user_guid}' + self.distinct_locations_set.add(lbsn_post.loc_id) + # print(f'Added: {photo_locID} to distinct_locations_set ' + # f'(len: {len(self.distinct_locations_set)})') + self.distinct_userlocations_set.add(post_locid_userid) + # print(f'Added: {post_locid_userid} to distinct_userlocations_set ' + # f'(len: {len(distinct_userlocations_set)})') + if (lbsn_post.loc_name and + lbsn_post.loc_id not in self.locid_locname_dict): + # add locname to dict + self.locid_locname_dict[ + lbsn_post.loc_id] = lbsn_post.loc_name + if lbsn_post.user_guid not in \ + self.locations_per_userid_dict or \ + lbsn_post.loc_id not in \ + self.locations_per_userid_dict[ + lbsn_post.user_guid]: + # Bit wise or and assignment in one step. 
+ # -> assign locID to UserDict list + # if not already contained + self.locations_per_userid_dict[ + lbsn_post.user_guid] |= { + lbsn_post.loc_id} + # self.stats.count_loc += 1 + self.userlocations_firstpost_dict[ + post_locid_userid] = lbsn_post + # union tags/emoji per userid/unique location + if self.cfg.cluster_tags: + self.userlocation_taglist_dict[ + post_locid_userid] |= lbsn_post.hashtags + if self.cfg.cluster_emoji: + self.userlocation_emojilist_dict[ + post_locid_userid] |= lbsn_post.emoji + # get cleaned wordlist for topic modeling + cleaned_wordlist = self._get_cleaned_wordlist( + lbsn_post.post_body) + # union words per userid/unique location + self.userlocation_wordlist_dict[ + post_locid_userid] |= set( + cleaned_wordlist) + self._update_toplists(lbsn_post) + + def get_cleaned_post_dict( + self) -> Dict[str, CleanedPost]: + """Output wrapper + + - calls loop user locations method + - optionally initializes output to file + """ + if self.cfg.write_cleaned_data: + with open(f'{self.cfg.output_folder}/Output_cleaned.csv', 'w', + encoding='utf8') as csvfile: + # get headerline from class structure + headerline = ','.join(CleanedPost._fields) + csvfile.write(f'{headerline}\n') + # values will be written with CSV writer module + datawriter = csv.writer( + csvfile, delimiter=',', lineterminator='\n', + quotechar='"', quoting=csv.QUOTE_NONNUMERIC) + cleaned_post_dict = self._loop_loc_per_userid(datawriter) + else: + cleaned_post_dict = self._loop_loc_per_userid(None) + if self.cfg.topic_modeling: + self._write_topic_models() + return cleaned_post_dict + + def get_prepared_data(self) -> 'PreparedData': + """After data is loaded, this collects data and stats + + - prepare data for tag maps clustering + - store to self.data_prepared + """ + self._prepare_main_stats() + return self.prepared_data + + def _prepare_main_stats(self): + """Calculate overall tag and emoji statistics + + - write results (optionally) to file + """ + # top lists and unique + tag_stats = self._get_top_list( + self.userdict_tagcounters_global, "tags") + top_tags_list = tag_stats[0] + total_unique_tags = tag_stats[1] + tagscount_without_longtail = tag_stats[2] + + emoji_stats = self._get_top_list( + self.userdict_emojicounters_global, "emoji") + top_emoji_list = emoji_stats[0] + total_unique_emoji = emoji_stats[1] + emojicount_without_longtail = emoji_stats[2] + + location_stats = self._get_top_list( + self.userdict_locationcounters_global, "locations") + top_location_list = location_stats[0] + total_unique_locations = location_stats[1] + + # update tmax and emax from optionally long tail removal + if tagscount_without_longtail: + self.prepared_data.tmax = tagscount_without_longtail + else: + self.prepared_data.tmax = self.cfg.tmax + if emojicount_without_longtail: + self.prepared_data.emax = emojicount_without_longtail + else: + self.prepared_data.emax = self.cfg.tmax + + # collect stats in prepared_data + + # if self.cfg.cluster_locations: + total_location_count = PrepareData._get_total_count( + top_location_list, self.total_location_counter) + self.prepared_data.top_locations_list = top_location_list + self.prepared_data.total_unique_locations = total_unique_locations + self.prepared_data.total_location_count = total_location_count + self.prepared_data.locid_locname_dict = self.locid_locname_dict + + if self.cfg.cluster_tags: + # top counts + total_tag_count = PrepareData._get_total_count( + top_tags_list, self.total_tag_counter) + self.prepared_data.top_tags_list = top_tags_list + 
self.prepared_data.total_unique_tags = total_unique_tags + self.prepared_data.total_tag_count = total_tag_count + + if self.cfg.cluster_emoji: + total_emoji_count = PrepareData._get_total_count( + top_emoji_list, self.total_emoji_counter) + self.prepared_data.top_emoji_list = top_emoji_list + self.prepared_data.total_unique_emoji = total_unique_emoji + self.prepared_data.total_emoji_count = total_emoji_count + + def _update_toplists(self, lbsn_post): + """Calculate toplists for emoji and tags + + - adds tag/emojicount of this media to overall + tag/emojicount for this user, + - initialize counter for user if not already done + """ + if self.cfg.cluster_tags and lbsn_post.hashtags: + self.userdict_tagcounters_global[ + lbsn_post.user_guid].update( + lbsn_post.hashtags) + self.total_tag_counter.update(lbsn_post.hashtags) + if self.cfg.cluster_emoji and lbsn_post.emoji: + self.userdict_emojicounters_global[ + lbsn_post.user_guid].update( + lbsn_post.emoji) + self.total_emoji_counter.update( + lbsn_post.emoji) + if lbsn_post.loc_id: + # update single item hack + # there're more elegant ways to do this + self.userdict_locationcounters_global[ + lbsn_post.user_guid].update( + (lbsn_post.loc_id,)) + self.total_location_counter.update( + (lbsn_post.loc_id,)) + + def _write_toplist(self, top_list, list_name): + """Write toplists to file + + e.g.: + tag, usercount + toptag1, 1231 + toptag2, 560 + ... + """ + if len(top_list) == 0: + return + top_list_store = ''.join( + "%s,%i" % v + '\n' for v in top_list) + # overwrite, if exists: + with open(f'{self.cfg.output_folder}/' + f'Output_top{list_name}.txt', 'w', + encoding='utf8') as out_file: + out_file.write(f'{list_name}, usercount\n') + out_file.write(top_list_store) + + def _get_top_list(self, userdict_tagemoji_counters, + listname: str = "tags"): + """Get Top Tags on a per user basis, i.e. 
+
+        - the global number of distinct users who used each distinct tag
+        - this ignores duplicate tag use by the same user
+        - calculation is based on dict userdict_tagcounters_global,
+        with counters of tags for each user
+        Returns:
+        - list of top tags up to tmax [1000]
+        - count of total unique tags
+        """
+        overall_usercount_perte = collections.Counter()
+        for tagemoji_hash in userdict_tagemoji_counters.values():
+            # taghash contains unique values (= strings) for each user,
+            # thus summing up these taghashes counts each user
+            # only once per tag (or emoji)
+            overall_usercount_perte.update(tagemoji_hash)
+        total_unique = len(overall_usercount_perte)
+        # get all items for "locations"
+        # but clip list for tags and emoji
+        if listname in ("tags", "emoji"):
+            max_items = self.cfg.tmax
+        else:
+            max_items = None
+        top_list = overall_usercount_perte.most_common(max_items)
+        # long tail removal is optional, default to None
+        total_without_longtail = None
+        if self.cfg.remove_long_tail is True:
+            total_without_longtail = self._remove_long_tail(
+                top_list, listname)
+        max_to_write = min(1000, self.cfg.tmax)
+        self._write_toplist(top_list[:max_to_write], listname)
+        return top_list, total_unique, total_without_longtail
+
+    @staticmethod
+    def _get_total_count(top_list, top_counter):
+        """Calculate total count for all items in top list
+
+        Arguments:
+        top_list (Long Tail Stat)
+        top_counter (Reference to counter object)
+        """
+        total_count = 0
+        for tagemoji in top_list:
+            count = top_counter.get(tagemoji[0])
+            if count:
+                total_count += count
+        return total_count
+
+    def _remove_long_tail(self,
+                          top_list: List[Tuple[str, int]],
+                          listname: str
+                          ) -> int:
+        """Removes all items from list that are used by less
+        than x users, where x is given as input arg
+        limit_bottom_user_count
+
+        Note: since list is a mutable object, method
+        will modify top_list in place
+        """
+        if listname == 'locations':
+            # keep all locations
+            return len(top_list)
+        elif listname == 'emoji':
+            # emoji use a smaller area than tags on the map,
+            # therefore we can keep more emoji
+            # (e.g. use 2 instead of 5)
+            bottomuser_count = math.trunc(
+                self.cfg.limit_bottom_user_count/2)
+        else:
+            bottomuser_count = self.cfg.limit_bottom_user_count
+        index_min = next((i for i, (_, usercount) in enumerate(
+            top_list) if usercount < bottomuser_count
+        ), None)
+        if index_min is None:
+            # no item is below the user count threshold
+            return None
+        len_before = len(top_list)
+        # delete based on slicing
+        del top_list[index_min:]
+        len_after = len(top_list)
+        if len_before == len_after:
+            # if no change, return
+            return len_after
+        self.log.info(
+            f'Long tail removal: Filtered {len_before - len_after} '
+            f'{listname} that were used by less than '
+            f'{bottomuser_count} users.')
+        return len_after
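+
+    # Example (hypothetical numbers): with limit_bottom_user_count = 5
+    # and top_list = [('beach', 12), ('sea', 7), ('pier', 3)],
+    # _remove_long_tail(top_list, 'tags') trims top_list in place to
+    # [('beach', 12), ('sea', 7)] and returns the new length 2.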
+
+    def _loop_loc_per_userid(self, datawriter=None):
+        """Will produce final cleaned list
+        of items to be processed by clustering.
+
+        - optionally writes entries to file, if handler exists
+        """
+        cleaned_post_dict = defaultdict(CleanedPost)
+        for user_key, locationhash in \
+                self.locations_per_userid_dict.items():
+            for location in locationhash:
+                locid_userid = f'{location}::{user_key}'
+                post_latlng = location.split(':')
+
+                first_post = self.userlocations_firstpost_dict.get(
+                    locid_userid, None)
+                if first_post is None:
+                    # skip user-locations without a first post
+                    continue
+                # create tuple with cleaned photo data
+                cleaned_post_location = self._get_cleaned_location(
+                    first_post, locid_userid, post_latlng, user_key)
+                # update boundary
+                self.bounds._upd_latlng_bounds(
+                    cleaned_post_location.lat, cleaned_post_location.lng)
+                if datawriter is not None:
+                    PrepareData._write_location_tocsv(
+                        datawriter, cleaned_post_location)
+                if self.cfg.topic_modeling:
+                    self._update_topic_models(
+                        cleaned_post_location, user_key)
+                cleaned_post_dict[cleaned_post_location.guid] = \
+                    cleaned_post_location
+        return cleaned_post_dict
+
+    def _write_topic_models(self):
+        """Initialize two lists for topic modeling output
+
+        - hashed (anonymized) output (*add salt)
+        - original output
+        """
+        headerline = "topics,post_ids,user_ids\n"
+        with open(f'{self.cfg.output_folder}/'
+                  f'Output_usertopics_anonymized.csv', 'w',
+                  encoding='utf8') as csvfile_anon, \
+            open(f'{self.cfg.output_folder}/'
+                 f'Output_usertopics.csv', 'w',
+                 encoding='utf8') as csvfile:
+            dw_list = list()
+            for cfile in (csvfile, csvfile_anon):
+                cfile.write(headerline)
+                dw = csv.writer(cfile, delimiter=',',
+                                lineterminator='\n', quotechar='"',
+                                quoting=csv.QUOTE_NONNUMERIC)
+                dw_list.append(dw)
+            self._write_topic_rows(dw_list)
+
+    def _write_topic_rows(self, dw_list):
+        """Write topic models to file"""
+        dw = dw_list[0]
+        dw_anon = dw_list[1]
+
+        def _join_encode(keys):
+            joined_keys = ",".join(keys)
+            joined_encoded_keys = ",".join(
+                [Utils.encode_string(post_id) for post_id in keys])
+            return joined_keys, joined_encoded_keys
+        for user_key, topics in self.user_topiclist_dict.items():
+            joined_topics = " ".join(topics)
+            post_keys = self.user_post_ids_dict.get(user_key, None)
+            joined_keys, joined_encoded_keys = _join_encode(post_keys)
+            dw_anon.writerow([joined_topics,
+                              "{" + joined_encoded_keys + "}",
+                              Utils.encode_string(user_key)])
+            dw.writerow([joined_topics,
+                         "{" + joined_keys + "}",
+                         str(user_key)])
+
+    def _update_topic_models(self,
+                             cleaned_post_location,
+                             user_key):
+        """If Topic Modeling enabled, update
+        required dictionaries with merged words from
+        title, tags and post_body
+        """
+        if not len(
+                cleaned_post_location.hashtags) == 0:
+            self.user_topiclist_dict[user_key] |= \
+                cleaned_post_location.hashtags
+        # also use descriptions for Topic Modeling
+        self.user_topiclist_dict[user_key] |= \
+            cleaned_post_location.post_body
+        # Bit wise or and assignment in one step.
+        # -> assign PhotoGuid to UserDict list
+        # if not already contained
+        self.user_post_ids_dict[user_key] |= {
+            cleaned_post_location.guid}
+        # UserPhotoFirstThumb_dict[user_key] = photo[5]
+
+    def _get_cleaned_location(self, first_post, locid_userid,
+                              post_latlng, user_key):
+        """Merge cleaned post from all posts of a certain user
+        at a specific location
+
+        - some information is not needed, those post attributes
+        are simply skipped (e.g. location name)
+        - some information must not be merged, this can be directly copied
+        from the first post at a location/user (e.g. origin_id - will always
+        be the same for a particular user, post_create_date,
+        post_publish_date)
+        - some information (e.g. hashtags) needs to be merged with
+        duplicates removed: use prepared dictionaries
+        - some important information is type-checked (longitude, latitude)
+
+        Keyword arguments:
+        first_post   -- first post of a user_guid at a location
+        locid_userid -- user_guid and loc_id in merged format
+                        (f'{location}::{user_key}')
+        post_latlng  -- tuple with lat/lng coordinates
+        user_key     -- user_guid
+
+        Note:
+        ("",) means: substitute empty tuple as default
+        """
+
+        merged_wordlist = PrepareData._get_merged(
+            self.userlocation_wordlist_dict, locid_userid)
+        merged_emojilist = PrepareData._get_merged(
+            self.userlocation_emojilist_dict, locid_userid)
+        merged_taglist = PrepareData._get_merged(
+            self.userlocation_taglist_dict, locid_userid)
+        cleaned_post = CleanedPost(
+            origin_id=first_post.origin_id,
+            lat=float(post_latlng[0]),
+            lng=float(post_latlng[1]),
+            guid=first_post.guid,
+            user_guid=user_key,
+            post_body=merged_wordlist,
+            post_create_date=first_post.post_create_date,
+            post_publish_date=first_post.post_publish_date,
+            post_views_count=first_post.post_views_count,
+            post_like_count=first_post.post_like_count,
+            emoji=merged_emojilist,
+            hashtags=merged_taglist,
+            loc_id=first_post.loc_id
+        )
+        return cleaned_post
+
+    @staticmethod
+    def _get_merged(ref_dict: Dict, locid_userid: str) -> Set[str]:
+        """Gets set of words for userlocid from ref dictionary
+
+        Note: since using defaultdict,
+        keys not found will return empty set()
+        """
+        value = ref_dict[locid_userid]
+        return value
+
+    @staticmethod
+    def _write_location_tocsv(datawriter: TextIO,
+                              cleaned_post_location: CleanedPost) -> None:
+        """Writes a single record of cleaned posts to CSV list
+
+        - write intermediate cleaned post data to file for later use
+        Arguments:
+        datawriter            -- open file handle to the output file
+        cleaned_post_location -- cleaned post of type CleanedPost
+                                 (namedtuple)
+        """
+        ploc_list = PrepareData._cleaned_ploc_tolist(
+            cleaned_post_location)
+        datawriter.writerow(ploc_list)
+
+    @staticmethod
+    def _cleaned_ploc_tolist(cleaned_post_location: CleanedPost
+                             ) -> List[str]:
+        """Converts a cleaned post structure to list for CSV write"""
+        attr_list = list()
+        for attr in cleaned_post_location:
+            if isinstance(attr, set):
+                attr_list.append(";".join(attr))
+            else:
+                attr_list.append(attr)
+        return attr_list
+
+    def _get_cleaned_wordlist(self, post_body_string):
+        """Remove special chars and filter post body to wordlist"""
+        cleaned_post_body = Utils._remove_special_chars(post_body_string)
+        cleaned_wordlist = PrepareData._get_wordlist(cleaned_post_body)
+        return cleaned_wordlist
+
+    @staticmethod
+    def _get_wordlist(cleaned_post_body):
+        """Split by space character, filter by length"""
+        wordlist = [word for word in cleaned_post_body.lower().split(
+            ' ') if len(word) > 2]
+        return wordlist
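+
+
+# Usage sketch (illustrative only; it mirrors the pipeline wired up in
+# tagmaps/__main__.py and assumes `cfg` is the parsed config object
+# returned by Utils.init_main(), with LoadData imported from
+# tagmaps.classes.load_data):
+#
+#     lbsn_data = PrepareData(cfg)
+#     input_data = LoadData(cfg)
+#     with input_data as records:
+#         for record in records:
+#             lbsn_data.add_record(record)
+#     cleaned_post_dict = lbsn_data.get_cleaned_post_dict()
+#     prepared_data = lbsn_data.get_prepared_data()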