Merge pull request #3037 from SEED-platform/2515-refactor/data-upload-progress-bar

2515 refactor/data upload progress bar
macintoshpie committed Dec 22, 2021
2 parents 56f2000 + a511584 commit 2650dda
Showing 12 changed files with 251 additions and 59 deletions.
107 changes: 87 additions & 20 deletions seed/data_importer/match.py
@@ -9,6 +9,7 @@
from celery.utils.log import get_task_logger

import datetime as dt
import math

from django.contrib.postgres.aggregates.general import ArrayAgg

@@ -46,6 +47,7 @@
match_merge_link,
matching_filter_criteria,
matching_criteria_column_names,
update_sub_progress_total,
)
from seed.utils.merge import merge_states_with_views

@@ -58,7 +60,7 @@ def log_debug(message):

@shared_task
@lock_and_track
def match_and_link_incoming_properties_and_taxlots(file_pk, progress_key):
def match_and_link_incoming_properties_and_taxlots(file_pk, progress_key, sub_progress_key):
"""
    Match the incoming properties and taxlots. Then, search for links for them.
@@ -83,6 +85,7 @@ def match_and_link_incoming_properties_and_taxlots(file_pk, progress_key):

import_file = ImportFile.objects.get(pk=file_pk)
progress_data = ProgressData.from_key(progress_key)
update_sub_progress_total(100, sub_progress_key)

# Don't query the org table here, just get the organization from the import_record
org = import_file.import_record.super_organization
@@ -118,54 +121,79 @@ def match_and_link_incoming_properties_and_taxlots(file_pk, progress_key):
# Within the ImportFile, filter out the duplicates.
log_debug("Start Properties filter_duplicate_states")
promoted_property_ids, property_duplicates_within_file_count = filter_duplicate_states(
incoming_properties
incoming_properties,
sub_progress_key,
)

# Within the ImportFile, merge -States together based on user defined matching_criteria
log_debug('Start Properties inclusive_match_and_merge')
promoted_property_ids, property_merges_within_file_count = inclusive_match_and_merge(promoted_property_ids, org, PropertyState)
promoted_property_ids, property_merges_within_file_count = inclusive_match_and_merge(
promoted_property_ids,
org,
PropertyState,
sub_progress_key,
)

# Filter Cycle-wide duplicates then merge and/or assign -States to -Views
log_debug('Start Properties states_to_views')
merged_property_views, property_duplicates_against_existing_count, property_new_count, property_merges_against_existing_count, property_merges_between_existing_count = states_to_views(
promoted_property_ids,
org,
import_file.cycle,
PropertyState
PropertyState,
sub_progress_key,
)

# Look for links across Cycles
log_debug('Start Properties link_views')
merged_linked_property_views = link_views(merged_property_views, PropertyView)
merged_linked_property_views = link_views(
merged_property_views,
PropertyView,
sub_progress_key,
)

if incoming_tax_lots.exists():
# Within the ImportFile, filter out the duplicates.
log_debug("Start TaxLots filter_duplicate_states")
promoted_tax_lot_ids, tax_lot_duplicates_within_file_count = filter_duplicate_states(
incoming_tax_lots
incoming_tax_lots,
sub_progress_key,
)

# Within the ImportFile, merge -States together based on user defined matching_criteria
log_debug('Start TaxLots inclusive_match_and_merge')
promoted_tax_lot_ids, tax_lot_merges_within_file_count = inclusive_match_and_merge(promoted_tax_lot_ids, org, TaxLotState)
promoted_tax_lot_ids, tax_lot_merges_within_file_count = inclusive_match_and_merge(
promoted_tax_lot_ids,
org,
TaxLotState,
sub_progress_key,
)

# Filter Cycle-wide duplicates then merge and/or assign -States to -Views
log_debug('Start TaxLots states_to_views')
merged_linked_taxlot_views, tax_lot_duplicates_against_existing_count, tax_lot_new_count, tax_lot_merges_against_existing_count, tax_lot_merges_between_existing_count = states_to_views(
promoted_tax_lot_ids,
org,
import_file.cycle,
TaxLotState
TaxLotState,
sub_progress_key,
)

# Look for links across Cycles
log_debug('Start TaxLots link_views')
merged_linked_taxlot_views = link_views(merged_linked_taxlot_views, TaxLotView)
merged_linked_taxlot_views = link_views(
merged_linked_taxlot_views,
TaxLotView,
sub_progress_key,
)

log_debug('Start pair_new_states')
progress_data.step('Pairing data')
pair_new_states(merged_linked_property_views, merged_linked_taxlot_views)
log_debug('End pair_new_states')
pair_new_states(
merged_linked_property_views,
merged_linked_taxlot_views,
sub_progress_key,
)

return {
'import_file_records': import_file.num_rows,
@@ -186,7 +214,7 @@ def match_and_link_incoming_properties_and_taxlots(file_pk, progress_key):
}
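
For orientation, a hedged usage sketch (not part of this commit) of how a caller might allocate the two trackers and enqueue the task. The `func_name` values and `import_file_id` are made-up examples, and `ProgressData(func_name=..., unique_id=...)`, `.key`, and `.delay(...)` are assumed from SEED's existing progress-data and Celery conventions:

```python
from seed.lib.progress_data.progress_data import ProgressData
from seed.data_importer.match import match_and_link_incoming_properties_and_taxlots

import_file_id = 123  # hypothetical ImportFile pk

# One tracker for the coarse pipeline steps, one for the fine-grained
# sub-steps ("Matching Data (1/6)" ... "(6/6)") introduced by this PR.
progress_data = ProgressData(func_name='match', unique_id=import_file_id)
sub_progress_data = ProgressData(func_name='match_sub_progress', unique_id=import_file_id)

match_and_link_incoming_properties_and_taxlots.delay(
    import_file_id,
    progress_data.key,
    sub_progress_data.key,
)
```

A front end can then poll both keys to render the stacked main/sub progress bars.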


def filter_duplicate_states(unmatched_states):
def filter_duplicate_states(unmatched_states, sub_progress_key):
"""
    Takes a QuerySet of -States, then flags and separates exact duplicates. This
method returns two items:
@@ -202,25 +230,31 @@ def filter_duplicate_states(unmatched_states):
:param unmatched_states: QS
:return: canonical_state_ids, duplicate_count
"""
sub_progress_data = update_sub_progress_total(4, sub_progress_key)
sub_progress_data.step('Matching Data (1/6): Filtering Duplicate States')

ids_grouped_by_hash = unmatched_states.\
values('hash_object').\
annotate(duplicate_sets=ArrayAgg('id')).\
values_list('duplicate_sets', flat=True)

sub_progress_data.step('Matching Data (1/6): Filtering Duplicate States')
    # For consistency, take the lowest ID in each duplicate set as the canonical member
canonical_state_ids = [
ids.pop(ids.index(min(ids)))
for ids
in ids_grouped_by_hash
]
sub_progress_data.step('Matching Data (1/6): Filtering Duplicate States')
duplicate_state_ids = reduce(lambda x, y: x + y, ids_grouped_by_hash)
duplicate_count = unmatched_states.filter(pk__in=duplicate_state_ids).update(data_state=DATA_STATE_DELETE)

sub_progress_data.step('Matching Data (1/6): Filtering Duplicate States')
sub_progress_data.finish_with_success()
return canonical_state_ids, duplicate_count
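
As a pure-Python stand-in for the `ArrayAgg` grouping above (a worked example, not part of the commit):

```python
from functools import reduce

# Stand-in for the ArrayAgg result: ids grouped by identical hash_object.
ids_grouped_by_hash = [[7, 3], [5], [9, 2, 4]]

# The lowest id in each duplicate set is kept as the canonical record...
canonical_state_ids = [ids.pop(ids.index(min(ids))) for ids in ids_grouped_by_hash]

# ...and everything left in the sets is flagged for deletion.
duplicate_state_ids = reduce(lambda x, y: x + y, ids_grouped_by_hash)

print(canonical_state_ids)  # [3, 5, 2]
print(duplicate_state_ids)  # [7, 9, 4]
```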


def inclusive_match_and_merge(unmatched_state_ids, org, StateClass):
def inclusive_match_and_merge(unmatched_state_ids, org, StateClass, sub_progress_key):
"""
Takes a list of unmatched_state_ids, combines matches of the corresponding
-States, and returns a set of IDs of the remaining -States.
@@ -232,6 +266,8 @@ def inclusive_match_and_merge(unmatched_state_ids, org, StateClass):
"""
column_names = matching_criteria_column_names(org.id, StateClass.__name__)

sub_progress_data = update_sub_progress_total(100, sub_progress_key)

    # IDs of -States with all matching criteria equal to None are initially promoted
# as they're not eligible for matching.
promoted_ids = list(
@@ -256,7 +292,8 @@ def inclusive_match_and_merge(unmatched_state_ids, org, StateClass):
# Collapse groups of matches found in the previous step into 1 -State per group
merges_within_file = 0
priorities = Column.retrieve_priorities(org)
for ids in matched_id_groups:
batch_size = math.ceil(len(matched_id_groups) / 100)
for idx, ids in enumerate(matched_id_groups):
if len(ids) == 1:
# If there's only 1, no merging is needed, so just promote the ID.
promoted_ids += ids
@@ -271,14 +308,18 @@ def inclusive_match_and_merge(unmatched_state_ids, org, StateClass):
merge_state = save_state_match(merge_state, newer_state, priorities)

promoted_ids.append(merge_state.id)
if batch_size > 0 and idx % batch_size == 0:
sub_progress_data.step('Matching Data (2/6): Inclusive Matching and Merging')

sub_progress_data.finish_with_success()

    # Flag the soon-to-be-promoted -States as having gone through matching
StateClass.objects.filter(pk__in=promoted_ids).update(data_state=DATA_STATE_MATCHING)

return promoted_ids, merges_within_file
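
The batch-stepping idiom introduced here — and repeated in `states_to_views` and `link_views` below — bounds the number of progress writes at roughly 100 no matter how many records are processed. A generic, self-contained sketch of the same arithmetic, where `report` stands in for `sub_progress_data.step(...)`:

```python
import math


def iterate_with_progress(items, report, max_updates=100):
    """Yield items while calling report() at most ~max_updates times.

    With batch_size = ceil(len(items) / max_updates), the condition
    idx % batch_size == 0 fires roughly max_updates times, so progress
    writes stay cheap even for very large imports.
    """
    batch_size = math.ceil(len(items) / max_updates)
    for idx, item in enumerate(items):
        if batch_size > 0 and idx % batch_size == 0:
            report()
        yield item


updates = []
for _ in iterate_with_progress(range(25000), lambda: updates.append(1)):
    pass
print(len(updates))  # 100 -> batch_size 250, one update every 250 items
```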


def states_to_views(unmatched_state_ids, org, cycle, StateClass):
def states_to_views(unmatched_state_ids, org, cycle, StateClass, sub_progress_key):
"""
    The purpose of this method is to take incoming -States and apply them to a
-View. In the process of doing so, -States could be flagged for "deletion"
@@ -300,6 +341,8 @@ def states_to_views(unmatched_state_ids, org, cycle, StateClass):
"""
table_name = StateClass.__name__

sub_progress_data = update_sub_progress_total(100, sub_progress_key)

if table_name == 'PropertyState':
ViewClass = PropertyView
elif table_name == 'TaxLotState':
@@ -340,7 +383,8 @@ def states_to_views(unmatched_state_ids, org, cycle, StateClass):
# Otherwise, add current -State to be promoted as is.
merged_between_existing_count = 0
merge_state_pairs = []
for state in unmatched_states:
batch_size = math.ceil(len(unmatched_states) / 100)
for idx, state in enumerate(unmatched_states):
matching_criteria = matching_filter_criteria(state, column_names)
existing_state_matches = StateClass.objects.filter(
pk__in=Subquery(existing_cycle_views.values('state_id')),
@@ -359,6 +403,11 @@ def states_to_views(unmatched_state_ids, org, cycle, StateClass):
else:
promote_states = promote_states | StateClass.objects.filter(pk=state.id)

if batch_size > 0 and idx % batch_size == 0:
sub_progress_data.step('Matching Data (3/6): Merging Unmatched States')

sub_progress_data = update_sub_progress_total(100, sub_progress_key, finish=True)

# Process -States into -Views either directly (promoted_ids) or post-merge (merge_state_pairs).
_log.debug("There are %s merge_state_pairs and %s promote_states" % (len(merge_state_pairs), promote_states.count()))
priorities = Column.retrieve_priorities(org.pk)
@@ -367,7 +416,8 @@ def states_to_views(unmatched_state_ids, org, cycle, StateClass):
merged_state_ids = []
try:
with transaction.atomic():
for state_pair in merge_state_pairs:
batch_size = math.ceil(len(merge_state_pairs) / 100)
for idx, state_pair in enumerate(merge_state_pairs):
existing_state, newer_state = state_pair
existing_view = ViewClass.objects.get(state_id=existing_state.id)

@@ -378,11 +428,20 @@ def states_to_views(unmatched_state_ids, org, cycle, StateClass):

processed_views.append(existing_view)
merged_state_ids.append(merged_state.id)
if batch_size > 0 and idx % batch_size == 0:
sub_progress_data.step('Matching Data (4/6): Merging State Pairs')

sub_progress_data = update_sub_progress_total(100, sub_progress_key, finish=True)

for state in promote_states:
batch_size = math.ceil(len(promote_states) / 100)
for idx, state in enumerate(promote_states):
promoted_ids.append(state.id)
created_view = state.promote(cycle)
processed_views.append(created_view)
if batch_size > 0 and idx % batch_size == 0:
sub_progress_data.step('Matching Data (5/6): Promoting States')
sub_progress_data.finish_with_success()

except IntegrityError as e:
raise IntegrityError("Could not merge results with error: %s" % (e))

@@ -399,26 +458,34 @@ def states_to_views(unmatched_state_ids, org, cycle, StateClass):
return list(set(processed_views)), duplicate_count, new_count, matched_count, merged_between_existing_count
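
Because `states_to_views` drives three labelled sub-phases (3/6, 4/6, 5/6) off a single key, it resets the tracker between phases with `finish=True`. A condensed sketch of that sequencing, reusing the hypothetical `update_sub_progress_total` from the earlier sketch; `run_phase` is a stand-in for the batched loops above and `sub_progress_key` is the key allocated in the usage sketch:

```python
import math


def run_phase(message, sub_progress_data, n_items=1000):
    # Stand-in for the real batched loops: ~100 step() calls per phase.
    batch_size = math.ceil(n_items / 100)
    for idx in range(n_items):
        if batch_size > 0 and idx % batch_size == 0:
            sub_progress_data.step(message)


sub_progress_data = update_sub_progress_total(100, sub_progress_key)
run_phase('Matching Data (3/6): Merging Unmatched States', sub_progress_data)

# finish=True closes out phase 3/6 before the same counter is reused for 4/6
sub_progress_data = update_sub_progress_total(100, sub_progress_key, finish=True)
run_phase('Matching Data (4/6): Merging State Pairs', sub_progress_data)

sub_progress_data = update_sub_progress_total(100, sub_progress_key, finish=True)
run_phase('Matching Data (5/6): Promoting States', sub_progress_data)
sub_progress_data.finish_with_success()
```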


def link_views(merged_views, ViewClass):
def link_views(merged_views, ViewClass, sub_progress_key):
"""
Run each of the given -Views through a linking round.
    For details on the actual linking logic, please refer to the
    match_merge_link() method.
"""

sub_progress_data = update_sub_progress_total(100, sub_progress_key)

if ViewClass == PropertyView:
state_class_name = "PropertyState"
else:
state_class_name = "TaxLotState"

processed_views = []
for view in merged_views:

batch_size = math.ceil(len(merged_views) / 100)
for idx, view in enumerate(merged_views):
_merge_count, _link_count, view_id = match_merge_link(view.id, state_class_name)

if view_id is not None:
processed_views.append(ViewClass.objects.get(pk=view_id))
else:
processed_views.append(view)
if batch_size > 0 and idx % batch_size == 0:
sub_progress_data.step('Matching Data (6/6): Merging Views')
sub_progress_data.finish_with_success()

return processed_views
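
On the client side, the secondary bar can be driven by polling the sub-progress key. A minimal sketch, assuming `ProgressData.result()` returns a dict with at least `progress` (0-100) and `status_message` (the 'Matching Data (n/6)' labels above) — both key names are assumptions, not confirmed by this diff:

```python
import time

from seed.lib.progress_data.progress_data import ProgressData


def poll_sub_progress(sub_progress_key, interval=1.0):
    """Illustrative poller for the nested progress bar (not part of the PR)."""
    while True:
        result = ProgressData.from_key(sub_progress_key).result()
        print(f"{result.get('status_message', '')}: {result.get('progress', 0)}%")
        if result.get('progress', 0) >= 100:
            break
        time.sleep(interval)
```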
