Skip to content

Commit

Permalink
Merge pull request #3151 from SEED-platform/Sync-Sensor-Readings-crea…
Browse files Browse the repository at this point in the history
…tion

Sync sensor readings creation
  • Loading branch information
haneslinger committed Mar 4, 2022
2 parents 98e79f2 + 3fe8b53 commit f8641e8
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 28 deletions.
104 changes: 81 additions & 23 deletions seed/data_importer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from django.utils import timezone as tz
from django.utils.timezone import make_naive
from past.builtins import basestring
from seed.models.sensors import SensorReading
from unidecode import unidecode

from seed.building_sync import validation_client
Expand Down Expand Up @@ -750,7 +749,8 @@ def finish_raw_save(results, file_pk, progress_key):
finished_progress_data = progress_data.finish_with_success(new_summary)

elif import_file.source_type == "SensorReadings":
finished_progress_data = progress_data.finish_with_success(results)
new_summary = _append_sensor_readings_import_results_to_summary(results)
finished_progress_data = progress_data.finish_with_success(new_summary)

else:
finished_progress_data = progress_data.finish_with_success()
Expand Down Expand Up @@ -892,32 +892,69 @@ def _save_sensor_readings_data_create_tasks(file_pk, progress_key):
)
sensor_readings_data = parser.sensor_readings_details

sensor_readings_summary = []
tasks = []
chunk_size = 500
for sensor_column_name, readings in sensor_readings_data.items():
readings_tuples = [t for t in readings.items()]
for batch_readings in batch(readings_tuples, chunk_size):
tasks.append(_save_sensor_readings_task.s(batch_readings, sensor_column_name, progress_data.key))

progress_data.total = len(tasks)
progress_data.save()

return chord(tasks, interval=15)(finish_raw_save.s(file_pk, progress_data.key))


@shared_task
def _save_sensor_readings_task(readings_tuples, sensor_column_name, progress_key):
progress_data = ProgressData.from_key(progress_key)

result = {}
try:
sensor = Sensor.objects.get(column_name=sensor_column_name)

num_readings = 0
if sensor:
for timestamp, value in readings.items():
sr, _ = SensorReading.objects.get_or_create(**{
"sensor": sensor,
"timestamp": timestamp
})
sr.reading = value
sr.save()
num_readings += 1

sensor_readings_summary.append({
"column_name": sensor.display_name,
"exists": sensor is not None,
"num_readings": num_readings
})
except Sensor.DoesNotExist:
result[sensor_column_name] = {'error': 'No such sensor.'}

# add in the proposed_imports into the progress key to be used later. (This used to be the summary).
progress_data.total = 0
progress_data.save()
else:
try:
with transaction.atomic():
reading_strings = [
f"({sensor.id}, '{timestamp}', '{value}')"
for timestamp, value
in readings_tuples
]
sql = (
'INSERT INTO seed_sensorreading(sensor_id, timestamp, reading)' +
' VALUES ' + ', '.join(reading_strings) +
' ON CONFLICT (sensor_id, timestamp)' +
' DO UPDATE SET reading = EXCLUDED.reading' +
' RETURNING reading;'
)
with connection.cursor() as cursor:
cursor.execute(sql)
result[sensor_column_name] = {'count': len(cursor.fetchall())}
except ProgrammingError as e:
if 'ON CONFLICT DO UPDATE command cannot affect row a second time' in str(e):
result[sensor_column_name] = {'error': 'Overlapping readings.'}
else:
progress_data.finish_with_error('data failed to import')
raise e
except DataError as e:
if "date/time field" in str(e):
result[sensor_column_name] = {'error': 'Invalid readings. Ensure timestamps are in iso format.'}
elif "invalid input syntax for type double precision" in str(e):
result[sensor_column_name] = {'error': 'Invalid readings. Ensure readings are numbers.'}
else:
result[sensor_column_name] = {'error': 'Invalid readings.'}

except Exception as e:
progress_data.finish_with_error('data failed to import')
raise e

progress_data.step()

return finish_raw_save(sensor_readings_summary, file_pk, progress_data.key)
return result


@shared_task
Expand Down Expand Up @@ -1140,6 +1177,27 @@ def _append_sensor_import_results_to_summary(import_results):
]


def _append_sensor_readings_import_results_to_summary(import_results):
summary = {}
for import_result in import_results:
sensor_name, result = list(import_result.items())[0]

if sensor_name not in summary:
summary[sensor_name] = {
"column_name": sensor_name,
"num_readings": 0,
"errors": ""
}

if "count" in result:
summary[sensor_name]["num_readings"] += result["count"]

if "error" in result:
summary[sensor_name]["errors"] = result["error"]

return list(summary.values())


@shared_task
def _save_raw_data_create_tasks(file_pk, progress_key):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ angular.module('BE.seed.controller.sensor_readings_upload_modal', [])
displayName: 'column name',
enableHiding: false,
type: 'string'
}, {
field: 'exists',
enableHiding: false
}, {
field: 'num_readings',
displayName: 'number of readings',
Expand All @@ -113,9 +110,16 @@ angular.module('BE.seed.controller.sensor_readings_upload_modal', [])

var show_confirmation_info = function () {
uploader_service.sensor_readings_preview($scope.file_id, $scope.organization_id, $scope.view_id).then(function (result) {
var addtional_columnDefs = [
{
field: 'exists',
enableHiding: false
}
]

$scope.proposed_imports_options = {
data: result,
columnDefs: base_sensor_readings_col_defs,
columnDefs: [...base_sensor_readings_col_defs, ...addtional_columnDefs],
enableColumnResizing: true,
enableHorizontalScrollbar: uiGridConstants.scrollbars.NEVER,
enableVerticalScrollbar: result.length <= 5 ? uiGridConstants.scrollbars.NEVER : uiGridConstants.scrollbars.WHEN_NEEDED,
Expand All @@ -140,9 +144,17 @@ angular.module('BE.seed.controller.sensor_readings_upload_modal', [])
};

var buildImportResults = function (message) {
var addtional_columnDefs = [
{
field: 'errors',
displayName: 'errors',
enableHiding: false
}
]

$scope.import_result_options = {
data: message,
columnDefs: base_sensor_readings_col_defs,
columnDefs: [...base_sensor_readings_col_defs, ...addtional_columnDefs],
enableColumnResizing: true,
enableHorizontalScrollbar: uiGridConstants.scrollbars.NEVER,
enableVerticalScrollbar: message.length <= 5 ? uiGridConstants.scrollbars.NEVER : uiGridConstants.scrollbars.WHEN_NEEDED,
Expand Down
6 changes: 6 additions & 0 deletions seed/tests/data/small_sensor_readings_example_bad.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
timestamp,I'm not here,my_coolness_sensor,charisma_sensor_1,dex_sensor_1,intelligence_sensor
2021-10-22 16:30Z,8700000.8,20.7,61,670.9,asdf
2021-10-22 17:00Z,90.5,21.7,56.7,704.8,207.9
2021-10-22 17:15Z,90,20.8,59.2,654.1,163.8
22/12/2021 17:30Z,87.9,20,61.8,636.2,54.9
2021-10-22 17:45Z,89.5,20.7,60,653.9,69.2

0 comments on commit f8641e8

Please sign in to comment.