Skip to content

Commit

Permalink
Merge pull request #1907 from SEED-platform/meters-pm-format
Browse files Browse the repository at this point in the history
Meters pm format
  • Loading branch information
axelstudios committed Jul 1, 2019
2 parents 4762cb4 + f363177 commit 93c4bdc
Show file tree
Hide file tree
Showing 32 changed files with 1,815 additions and 825 deletions.
339 changes: 192 additions & 147 deletions seed/data_importer/meters_parser.py

Large diffs are not rendered by default.

66 changes: 41 additions & 25 deletions seed/data_importer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
ImportRecord,
STATUS_READY_TO_MERGE,
)
from seed.data_importer.utils import usage_point_id
from seed.decorators import lock_and_track
from seed.lib.mcm import cleaners, mapper, reader
from seed.lib.mcm.mapper import expand_rows
Expand Down Expand Up @@ -732,10 +733,9 @@ def _save_greenbutton_data_create_tasks(file_pk, progress_key):
readings = meter_readings['readings']
meter_only_details = {k: v for k, v in meter_readings.items() if k != "readings"}
meter, _created = Meter.objects.get_or_create(**meter_only_details)

meter_id = meter.id

meter_usage_point_id = meters_parser.usage_point_id(meter.source_id)
meter_usage_point_id = usage_point_id(meter.source_id)

chunk_size = 1000

Expand Down Expand Up @@ -766,6 +766,7 @@ def _save_greenbutton_data_task(readings, meter_id, meter_usage_point_id, progre
readings for that batch are saved.
"""
progress_data = ProgressData.from_key(progress_key)
meter = Meter.objects.get(pk=meter_id)

result = {}
try:
Expand All @@ -785,14 +786,15 @@ def _save_greenbutton_data_task(readings, meter_id, meter_usage_point_id, progre
)
with connection.cursor() as cursor:
cursor.execute(sql)
result['source_id'] = meter_usage_point_id
result['count'] = len(cursor.fetchall())
key = "{} - {}".format(meter_usage_point_id, meter.get_type_display())
result[key] = {'count': len(cursor.fetchall())}
except ProgrammingError as e:
if "ON CONFLICT DO UPDATE command cannot affect row a second time" in str(e):
result['source_id'] = meter_usage_point_id
result['error'] = "Overlapping readings."
except Exception:
return progress_data.finish_with_error('data failed to import')
key = "{} - {}".format(meter_usage_point_id, meter.get_type_display())
result[key] = {"error": "Overlapping readings."}
except Exception as e:
progress_data.finish_with_error('data failed to import')
raise e

# Indicate progress
progress_data.step()
Expand Down Expand Up @@ -841,14 +843,16 @@ def _save_pm_meter_usage_data_task(meter_readings, file_pk, progress_key):
)
with connection.cursor() as cursor:
cursor.execute(sql)
result['source_id'] = meter.source_id
result['count'] = len(cursor.fetchall())
key = "{} - {}".format(meter.source_id, meter.get_type_display())
result[key] = {'count': len(cursor.fetchall())}
except ProgrammingError as e:
if "ON CONFLICT DO UPDATE command cannot affect row a second time" in str(e):
result['source_id'] = meter_readings.get("source_id")
result['error'] = "Overlapping readings."
except Exception:
return progress_data.finish_with_error('data failed to import')
type_lookup = dict(Meter.ENERGY_TYPES)
key = "{} - {}".format(meter_readings.get("source_id"), type_lookup[meter_readings['type']])
result[key] = {"error": "Overlapping readings."}
except Exception as e:
progress_data.finish_with_error('data failed to import')
raise e

# Indicate progress
progress_data.step()
Expand Down Expand Up @@ -891,30 +895,42 @@ def _save_pm_meter_usage_data_create_tasks(file_pk, progress_key):
return tasks, proposed_imports


def _append_meter_import_results_to_summary(import_results, summary):
def _append_meter_import_results_to_summary(import_results, incoming_summary):
"""
This appends meter import result counts and, if applicable, error messages.
Note, import_results will be of the form:
[
{'<source_id/usage_point_id> - <type>": {'count': 100}},
{'<source_id/usage_point_id> - <type>": {'count': 100}},
{'<source_id/usage_point_id> - <type>": {'error': "<error_message>"}},
{'<source_id/usage_point_id> - <type>": {'error': "<error_message>"}},
]
"""
agg_results_summary = collections.defaultdict(lambda: 0)
error_comments = collections.defaultdict(lambda: set())

for id_count in import_results:
success_count = id_count.get("count")
# First aggregate import_results by key
for result in import_results:
key = list(result.keys())[0]

success_count = result[key].get('count')

if success_count:
agg_results_summary[id_count["source_id"]] += success_count
if success_count is None:
error_comments[key].add(result[key].get("error"))
else:
error_comments[id_count["source_id"]].add(id_count.get("error"))
agg_results_summary[key] += success_count

for import_info in summary:
pm_id = import_info["source_id"]
# Next update summary of incoming meters imports with aggregated results.
for import_info in incoming_summary:
key = "{} - {}".format(import_info['source_id'], import_info['type'])

import_info["successfully_imported"] = agg_results_summary.get(pm_id, 0)
import_info["successfully_imported"] = agg_results_summary.get(key, 0)

if error_comments:
import_info["errors"] = " ".join(list(error_comments.get(pm_id, "")))
import_info["errors"] = " ".join(list(error_comments.get(key, "")))

return summary
return incoming_summary


def _save_raw_data_create_tasks(file_pk, progress_key):
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
42 changes: 21 additions & 21 deletions seed/data_importer/tests/test_greenbutton_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@

from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.urlresolvers import reverse
from django.test import TestCase
from django.utils.timezone import (
get_current_timezone,
make_aware, # make_aware is used because inconsistencies exist in creating datetime with tzinfo
)

from pytz import timezone

from quantityfield import ureg

from seed.data_importer.models import ImportFile, ImportRecord
from seed.landing.models import SEEDUser as User
from seed.models import (
Expand All @@ -35,9 +32,10 @@
FakePropertyStateFactory,
)
from seed.utils.organizations import create_organization
from seed.tests.util import DataMappingBaseTestCase


class GreenButtonImportTest(TestCase):
class GreenButtonImportTest(DataMappingBaseTestCase):
def setUp(self):
self.user_details = {
'username': 'test_user@demo.com',
Expand Down Expand Up @@ -91,7 +89,7 @@ def test_green_button_import_base_case(self):
refreshed_property_1 = Property.objects.get(pk=self.property_1.id)
self.assertEqual(refreshed_property_1.meters.all().count(), 1)

meter_1 = refreshed_property_1.meters.get(type=Meter.ELECTRICITY)
meter_1 = refreshed_property_1.meters.get(type=Meter.ELECTRICITY_GRID)
self.assertEqual(meter_1.source, Meter.GREENBUTTON)
self.assertEqual(meter_1.source_id, 'User/6150855/UsagePoint/409483/MeterReading/1/IntervalBlock/1')
self.assertEqual(meter_1.is_virtual, False)
Expand All @@ -101,15 +99,15 @@ def test_green_button_import_base_case(self):

self.assertEqual(meter_reading_10.start_time, make_aware(datetime(2011, 3, 5, 21, 0, 0), timezone=self.tz_obj))
self.assertEqual(meter_reading_10.end_time, make_aware(datetime(2011, 3, 5, 21, 15, 0), timezone=self.tz_obj))
self.assertEqual(meter_reading_10.reading, 1790 * 3.412 / 1000 * ureg('kBtu'))
self.assertEqual(meter_reading_10.source_unit, "kWh")
self.assertEqual(meter_reading_10.conversion_factor, 3.412)
self.assertEqual(meter_reading_10.reading, 1790 * 3.41 / 1000)
self.assertEqual(meter_reading_10.source_unit, 'kWh (thousand Watt-hours)')
self.assertEqual(meter_reading_10.conversion_factor, 3.41)

self.assertEqual(meter_reading_11.start_time, make_aware(datetime(2011, 3, 5, 21, 15, 0), timezone=self.tz_obj))
self.assertEqual(meter_reading_11.end_time, make_aware(datetime(2011, 3, 5, 21, 30, 0), timezone=self.tz_obj))
self.assertEqual(meter_reading_11.reading, 1791 * 3.412 / 1000 * ureg('kBtu'))
self.assertEqual(meter_reading_11.source_unit, "kWh")
self.assertEqual(meter_reading_11.conversion_factor, 3.412)
self.assertEqual(meter_reading_11.reading, 1791 * 3.41 / 1000)
self.assertEqual(meter_reading_11.source_unit, 'kWh (thousand Watt-hours)')
self.assertEqual(meter_reading_11.conversion_factor, 3.41)

# matching_results_data gets cleared out since the field wasn't meant for this
refreshed_import_file = ImportFile.objects.get(pk=self.import_file.id)
Expand All @@ -126,7 +124,7 @@ def test_existing_meter_is_found_and_used_if_import_file_should_reference_it(sel
property=property,
source=Meter.GREENBUTTON,
source_id='User/6150855/UsagePoint/409483/MeterReading/1/IntervalBlock/1',
type=Meter.ELECTRICITY,
type=Meter.ELECTRICITY_GRID,
)
unsaved_meter.save()
existing_meter = Meter.objects.get(pk=unsaved_meter.id)
Expand All @@ -151,11 +149,11 @@ def test_existing_meter_is_found_and_used_if_import_file_should_reference_it(sel
refreshed_property_1 = Property.objects.get(pk=self.property_1.id)
self.assertEqual(refreshed_property_1.meters.all().count(), 1)

refreshed_meter = refreshed_property_1.meters.get(type=Meter.ELECTRICITY)
refreshed_meter = refreshed_property_1.meters.get(type=Meter.ELECTRICITY_GRID)

meter_reading_10, meter_reading_11, meter_reading_12 = list(refreshed_meter.meter_readings.order_by('start_time').all())
self.assertEqual(meter_reading_10.reading, 1790 * 3.412 / 1000 * ureg('kBtu'))
self.assertEqual(meter_reading_11.reading, 1791 * 3.412 / 1000 * ureg('kBtu'))
self.assertEqual(meter_reading_10.reading, 1790 * 3.41 / 1000)
self.assertEqual(meter_reading_11.reading, 1791 * 3.41 / 1000)

# Sanity check to be sure, nothing was changed with existing meter reading
self.assertEqual(meter_reading_12, existing_meter_reading)
Expand All @@ -168,7 +166,7 @@ def test_existing_meter_reading_has_reading_source_unit_and_conversion_factor_up
property=property,
source=Meter.GREENBUTTON,
source_id='User/6150855/UsagePoint/409483/MeterReading/1/IntervalBlock/1',
type=Meter.ELECTRICITY,
type=Meter.ELECTRICITY_GRID,
)
unsaved_meter.save()
existing_meter = Meter.objects.get(pk=unsaved_meter.id)
Expand All @@ -183,7 +181,7 @@ def test_existing_meter_reading_has_reading_source_unit_and_conversion_factor_up
end_time=end_time,
reading=1000,
source_unit="GJ",
conversion_factor=947.817
conversion_factor=947.82
)
unsaved_meter_reading.save()

Expand All @@ -198,13 +196,13 @@ def test_existing_meter_reading_has_reading_source_unit_and_conversion_factor_up
self.assertEqual(MeterReading.objects.all().count(), 2)

refreshed_property = Property.objects.get(pk=self.property_1.id)
refreshed_meter = refreshed_property.meters.get(type=Meter.ELECTRICITY)
refreshed_meter = refreshed_property.meters.get(type=Meter.ELECTRICITY_GRID)
meter_reading = refreshed_meter.meter_readings.get(start_time=start_time)

self.assertEqual(meter_reading.end_time, end_time)
self.assertEqual(meter_reading.reading, 1790 * 3.412 / 1000 * ureg('kBtu'))
self.assertEqual(meter_reading.source_unit, "kWh")
self.assertEqual(meter_reading.conversion_factor, 3.412)
self.assertEqual(meter_reading.reading, 1790 * 3.41 / 1000)
self.assertEqual(meter_reading.source_unit, 'kWh (thousand Watt-hours)')
self.assertEqual(meter_reading.conversion_factor, 3.41)

def test_the_response_contains_expected_and_actual_reading_counts_for_pm_ids(self):
url = reverse("api:v2:import_files-save-raw-data", args=[self.import_file.id])
Expand All @@ -220,6 +218,7 @@ def test_the_response_contains_expected_and_actual_reading_counts_for_pm_ids(sel
{
"source_id": "409483",
"incoming": 2,
"type": "Electric - Grid",
"successfully_imported": 2,
},
]
Expand Down Expand Up @@ -250,6 +249,7 @@ def test_error_noted_in_response_if_meter_has_overlapping_readings_in_the_same_b
expectation = [
{
"source_id": "409483",
"type": "Electric - Grid",
"incoming": 1002,
"successfully_imported": 1000,
"errors": 'Overlapping readings.',
Expand Down

0 comments on commit 93c4bdc

Please sign in to comment.