Merge pull request #3104 from SEED-platform/Merge-building-sync-files
Merge building sync files
haneslinger committed Jan 31, 2022
2 parents 2ca6de2 + 1223281 commit 2643198
Showing 9 changed files with 359 additions and 76 deletions.
@@ -48,7 +48,7 @@
<auc:Address>
<auc:StreetAddressDetail>
<auc:Simplified>
<auc:StreetAddress>123 Main St</auc:StreetAddress>
<auc:StreetAddress>456 Main St</auc:StreetAddress>
</auc:Simplified>
</auc:StreetAddressDetail>
<auc:City>San Francisco</auc:City>
Binary file not shown.
39 changes: 28 additions & 11 deletions seed/data_importer/match.py
@@ -118,12 +118,22 @@ def match_and_link_incoming_properties_and_taxlots(file_pk, progress_key, sub_progress_key):
tax_lot_initial_incoming_count = incoming_tax_lots.count()

if incoming_properties.exists():
# When importing BuildingSync, we do not skip duplicates as we normally would.
# Instead, they are eventually merged into their "duplicate": even though the
# property data might be identical, the Scenarios, Measures, or Meters might
# have been updated, and the merge flow can "transfer" that data, whereas
# skipping duplicates cannot.
merge_duplicates = import_file.from_buildingsync

# Within the ImportFile, filter out the duplicates.
log_debug("Start Properties filter_duplicate_states")
promoted_property_ids, property_duplicates_within_file_count = filter_duplicate_states(
incoming_properties,
sub_progress_key,
)
if merge_duplicates:
promoted_property_ids, property_duplicates_within_file_count = incoming_properties.values_list('id', flat=True), 0
else:
promoted_property_ids, property_duplicates_within_file_count = filter_duplicate_states(
incoming_properties,
sub_progress_key,
)

# Within the ImportFile, merge -States together based on user defined matching_criteria
log_debug('Start Properties inclusive_match_and_merge')
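
The comment above captures the rationale for the whole PR: a BuildingSync re-import may carry updated Scenarios, Measures, or Meters on otherwise identical property data, so duplicates must be merged rather than skipped. A minimal, self-contained sketch of that idea, using plain dicts rather than SEED's models:

first_import = {'address_line_1': '456 Main St', 'scenarios': ['Baseline']}
second_import = {
    'address_line_1': '456 Main St',  # identical core data -> same content hash
    'scenarios': ['Baseline', 'Deep Retrofit'],
}

def merge_states(existing, incoming):
    """Keep the existing record, but transfer related models from the incoming one."""
    merged = dict(existing)
    merged['scenarios'] = sorted(set(existing['scenarios']) | set(incoming['scenarios']))
    return merged

# Skipping the "duplicate" would leave only ['Baseline']; merging keeps both.
assert merge_states(first_import, second_import)['scenarios'] == ['Baseline', 'Deep Retrofit']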
@@ -142,6 +152,7 @@ def match_and_link_incoming_properties_and_taxlots(file_pk, progress_key, sub_progress_key):
import_file.cycle,
PropertyState,
sub_progress_key,
merge_duplicates,
)

# Look for links across Cycles
@@ -319,7 +330,7 @@ def inclusive_match_and_merge(unmatched_state_ids, org, StateClass, sub_progress_key):
return promoted_ids, merges_within_file


def states_to_views(unmatched_state_ids, org, cycle, StateClass, sub_progress_key):
def states_to_views(unmatched_state_ids, org, cycle, StateClass, sub_progress_key, merge_duplicates=False):
"""
The purpose of this method is to take incoming -States and, apply them to a
-View. In the process of doing so, -States could be flagged for "deletion"
@@ -337,6 +348,8 @@ def states_to_views(unmatched_state_ids, org, cycle, StateClass, sub_progress_key, merge_duplicates=False):
:param org: Organization object
:param cycle: Cycle object
:param StateClass: PropertyState or TaxLotState
:param merge_duplicates: bool, if True, we keep the duplicates and merge them
instead of skipping them. This is used when importing BuildingSync files.
:return: processed_views, duplicate_count, new + matched counts
"""
table_name = StateClass.__name__
@@ -354,12 +367,16 @@ def states_to_views(unmatched_state_ids, org, cycle, StateClass, sub_progress_key, merge_duplicates=False):
pk__in=Subquery(existing_cycle_views.values('state_id'))
)

# Apply DATA_STATE_DELETE to incoming duplicate -States of existing -States in Cycle
duplicate_states = StateClass.objects.filter(
pk__in=unmatched_state_ids,
hash_object__in=Subquery(existing_states.values('hash_object'))
)
duplicate_count = duplicate_states.update(data_state=DATA_STATE_DELETE)
if merge_duplicates:
duplicate_states = StateClass.objects.none()
duplicate_count = 0
else:
# Apply DATA_STATE_DELETE to incoming duplicate -States of existing -States in Cycle
duplicate_states = StateClass.objects.filter(
pk__in=unmatched_state_ids,
hash_object__in=Subquery(existing_states.values('hash_object'))
)
duplicate_count = duplicate_states.update(data_state=DATA_STATE_DELETE)

column_names = matching_criteria_column_names(org.id, table_name)

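For reference, the duplicate check above keys on hash_object, a stored content hash of each -State. A stdlib-only sketch of the mechanism (hash_state and find_duplicates are hypothetical stand-ins, not SEED's API):

import hashlib
import json

def hash_state(state):
    """Stable content hash over a state's fields, analogous to hash_object."""
    return hashlib.md5(json.dumps(state, sort_keys=True).encode()).hexdigest()

def find_duplicates(incoming, existing, merge_duplicates=False):
    if merge_duplicates:
        return []  # keep everything; duplicates get merged later instead
    existing_hashes = {hash_state(s) for s in existing}
    return [s for s in incoming if hash_state(s) in existing_hashes]

existing = [{'address_line_1': '456 Main St', 'city': 'San Francisco'}]
incoming = [
    {'address_line_1': '456 Main St', 'city': 'San Francisco'},
    {'address_line_1': '789 Oak Ave', 'city': 'San Francisco'},
]
print(len(find_duplicates(incoming, existing)))                         # 1 flagged for deletion
print(len(find_duplicates(incoming, existing, merge_duplicates=True)))  # 0 flagged
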
83 changes: 32 additions & 51 deletions seed/data_importer/tasks.py
@@ -22,7 +22,7 @@
import zipfile
import tempfile

from celery import chord, shared_task
from celery import chord, shared_task, group
from celery import chain as celery_chain
from celery.utils.log import get_task_logger
from django.contrib.gis.geos import GEOSGeometry
@@ -1140,45 +1140,33 @@ def geocode_and_match_buildings_task(file_pk):
if import_file.cycle is None:
_log.warning("Import file cycle is None; This should never happen in production")

post_geocode_tasks = None
if import_file.from_buildingsync:
source_type_dict = {
'Portfolio Raw': PORTFOLIO_RAW,
'Assessed Raw': ASSESSED_RAW,
'BuildingSync Raw': BUILDINGSYNC_RAW
}
source_type = source_type_dict.get(import_file.source_type, ASSESSED_RAW)

# get the properties and chunk them into tasks
qs = PropertyState.objects.filter(
import_file=import_file,
source_type=source_type,
data_state=DATA_STATE_MAPPING,
).only('id').iterator()
# get the properties and chunk them into tasks
property_states = (
PropertyState.objects.filter(import_file_id=file_pk)
.exclude(data_state=DATA_STATE_IMPORT)
.only('id')
.iterator()
)

id_chunks = [[obj.id for obj in chunk] for chunk in batch(qs, 100)]
id_chunks = [[obj.id for obj in chunk] for chunk in batch(property_states, 100)]

post_geocode_tasks_count = len(id_chunks)
post_geocode_tasks = chord(
header=(_map_additional_models.si(ids, import_file.id, progress_data.key) for ids in id_chunks),
body=finish_mapping_additional_models.s(file_pk, progress_data.key))
else:
# Start, match, pair
post_geocode_tasks_count = 3
post_geocode_tasks = chord(
header=match_and_link_incoming_properties_and_taxlots.si(file_pk, progress_data.key, sub_progress_data.key),
body=finish_matching.s(file_pk, progress_data.key),
interval=15)

geocoding_tasks_count = 1
progress_data.total = geocoding_tasks_count + post_geocode_tasks_count
progress_data.total = (
1 # geocoding
+ len(id_chunks) # map additional models tasks
+ 2 # match and link
+ 1 # finish
)
progress_data.save()
sub_progress_data.total = 100
sub_progress_data.save()

celery_chain(
_geocode_properties_or_tax_lots.s(file_pk, progress_data.key, sub_progress_data.key),
post_geocode_tasks)()
_geocode_properties_or_tax_lots.si(file_pk, progress_data.key),
group(_map_additional_models.si(id_chunk, file_pk, progress_data.key) for id_chunk in id_chunks),
match_and_link_incoming_properties_and_taxlots.si(file_pk, progress_data.key, sub_progress_data.key),
finish_matching.s(file_pk, progress_data.key),
)()

sub_progress_data.total = 100
sub_progress_data.save()

return {'progress_data': progress_data.result(), 'sub_progress_data': sub_progress_data.result()}
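
The control-flow change above is worth spelling out: the old code picked one of two chords depending on from_buildingsync, while the new code runs a single Celery chain for every import — geocode, then a group of _map_additional_models chunks in parallel, then matching, then the finisher. When a group appears inside a chain, Celery implicitly upgrades it to a chord, so the chain still waits for every chunk before moving on. A minimal sketch of the pattern (task names and broker URL are assumptions, not SEED's actual setup):

from celery import Celery, chain, group, shared_task

app = Celery('sketch', broker='redis://localhost:6379/0')

@shared_task
def geocode(file_pk):
    return file_pk

@shared_task
def map_models(ids, file_pk):
    return len(ids)

@shared_task
def match(file_pk):
    return {'matched': file_pk}

@shared_task
def finish(result, file_pk):
    # Declared with .s() below, so the previous task's return value
    # arrives here as the first argument.
    return result

id_chunks = [[1, 2], [3, 4]]
pipeline = chain(
    geocode.si(42),                                          # .si() ignores prior results
    group(map_models.si(chunk, 42) for chunk in id_chunks),  # chunks run in parallel
    match.si(42),                                            # runs once the whole group is done
    finish.s(42),
)
# pipeline.apply_async() would dispatch the flow once a broker and worker exist.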

@@ -1367,10 +1355,7 @@ def finish_matching(result, import_file_id, progress_key):
import_file = ImportFile.objects.get(pk=import_file_id)
import_file.matching_done = True
import_file.mapping_completion = 100
if isinstance(result, list) and len(result) == 1:
import_file.matching_results_data = result[0]
else:
raise Exception('there are more than one results for matching_results, need to merge')
import_file.matching_results_data = result
import_file.save()

return progress_data.finish_with_success()
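
The deleted unwrapping above existed because finish_matching used to run as a chord body, and a chord body receives a list of all header results; as the last link of a chain it now receives the previous task's return value directly. A tiny runnable sketch of the two invocation shapes (hypothetical stand-in functions):

def finish_as_chord_body(results, import_file_id):
    # A chord body receives a LIST of every header task's result...
    assert isinstance(results, list)
    return results[0]

def finish_as_chain_link(result, import_file_id):
    # ...while a chain link receives the previous task's single return value.
    return result

match_result = {'matched': 3}
assert finish_as_chord_body([match_result], 1) == finish_as_chain_link(match_result, 1)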
@@ -1446,22 +1431,18 @@ def _map_additional_models(ids, file_pk, progress_key):
for property_state in property_states:
if source_type == BUILDINGSYNC_RAW:
# parse the rest of the models (scenarios, meters, etc) from the building file
# and create the property and property view
# Note that we choose _not_ to promote the property state (i.e. create a canonical property)
# b/c that will be handled in the match/merge/linking later on
building_file = property_state.building_files.get()
p_status, property_state, property_view, messages = building_file.process(
org.id, import_file.cycle)
success, property_state, _, messages = building_file.process(
org.id,
import_file.cycle,
promote_property_state=False
)

if not p_status or len(messages.get('errors', [])) > 0:
# something went wrong, save the messages and skip this file
progress_data.add_file_info(os.path.basename(building_file.filename), messages)
continue
elif len(messages.get('warnings', [])) > 0:
# non-fatal warnings, add the info and continue to save the file
if not success or messages.get('errors') or messages.get('warnings'):
progress_data.add_file_info(os.path.basename(building_file.filename), messages)

property_state.data_state = DATA_STATE_MATCHING
property_state.save()

progress_data.step()

return {
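One more note on the hunk above: process() is now called with promote_property_state=False, deferring creation of the canonical Property/-View to the later match/merge/link step. A self-contained sketch of that split (all functions are stand-ins; only the promote_property_state flag and the four-tuple return come from the diff):

def parse_buildingsync(filename):
    """Stand-in parser: returns a state dict with related models attached."""
    return {'filename': filename, 'scenarios': ['Baseline'], 'meters': []}

def promote(state, cycle):
    """Stand-in promotion: would create the canonical Property and -View."""
    return {'cycle': cycle, 'state': state}

def process(filename, cycle, promote_property_state=True):
    state = parse_buildingsync(filename)
    view = promote(state, cycle) if promote_property_state else None
    return True, state, view, {'errors': [], 'warnings': []}

success, state, view, messages = process('example.xml', cycle=2022,
                                         promote_property_state=False)
assert success and view is None  # promotion deferred to match/merge/link
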
11 changes: 4 additions & 7 deletions seed/data_importer/tests/integration/test_data_import.py
@@ -43,6 +43,7 @@
BuildingFile,
PropertyMeasure,
)
from seed.models.models import DATA_STATE_MAPPING
from seed.tests.util import DataMappingBaseTestCase
from seed.lib.xml_mapping.mapper import default_buildingsync_profile_mappings

@@ -287,8 +288,7 @@ def test_map_data_zip(self):

# -- Assert
self.assertEqual('success', progress_info['status'])
ps = PropertyState.objects.filter(address_line_1='123 Main St',
import_file=self.import_file)
ps = PropertyState.objects.filter(import_file=self.import_file, data_state=DATA_STATE_MAPPING)
self.assertEqual(len(ps), 2)

def test_map_all_models_zip(self):
@@ -305,16 +305,13 @@ def test_map_all_models_zip(self):
# map the data
progress_info = tasks.map_data(self.import_file.pk)
self.assertEqual('success', progress_info['status'])
ps = PropertyState.objects.filter(address_line_1='123 Main St',
import_file=self.import_file)
self.assertEqual(ps.count(), 2)

# -- Act
tasks.geocode_and_match_buildings_task(self.import_file.pk)

# -- Assert
ps = PropertyState.objects.filter(address_line_1='123 Main St', import_file=self.import_file)
self.assertEqual(ps.count(), 2)
pvs = PropertyView.objects.all()
self.assertEqual(pvs.count(), 2)

# verify there are 2 building files
bfs = BuildingFile.objects.all()
