From ec73102b273a258ad01263ea0d52f0a083f41103 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Wed, 15 Feb 2023 15:21:51 +1100 Subject: [PATCH 1/3] feat(influx_publisher): allow overriding the timestamp useful for batched documentation --- shard.lock | 2 +- src/source/publishing/influx_publisher.cr | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/shard.lock b/shard.lock index 35191a2..8ea57be 100644 --- a/shard.lock +++ b/shard.lock @@ -6,7 +6,7 @@ shards: action-controller: git: https://github.com/spider-gazelle/action-controller.git - version: 5.6.0 + version: 5.6.2 active-model: git: https://github.com/spider-gazelle/active-model.git diff --git a/src/source/publishing/influx_publisher.cr b/src/source/publishing/influx_publisher.cr index bafa6bf..67e4465 100644 --- a/src/source/publishing/influx_publisher.cr +++ b/src/source/publishing/influx_publisher.cr @@ -37,6 +37,9 @@ module PlaceOS::Source property value : Array(Hash(String, FieldTypes?)) property ts_tag_keys : Array(String)? property ts_map : Hash(String, String)? + + # allow for a custom timestamp field + property timestamp_field : String? end alias Value = FieldTypes | Hash(String, FieldTypes?) | Hash(String, Hash(String, FieldTypes?)) | Array(Hash(String, FieldTypes?)) | CustomMetrics @@ -173,12 +176,19 @@ module PlaceOS::Source next if compacted.empty? measurement = default_measurement || data.module_name + override_timestamp = nil + if time_key = raw.timestamp_field + if time = fields.delete(time_key).as?(Float64) + override_timestamp = Time.from_unix time.to_i64 + end + end + # Must include a `pos_uniq` tag for seperating points # as per: https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/#add-an-arbitrary-tag local_tags = tags.dup local_tags["pos_uniq"] = index.to_s - points << build_custom_point(measurement, data, fields, local_tags, compacted, timestamp, ts_map, raw.ts_tag_keys) + points << build_custom_point(measurement, data, fields, local_tags, compacted, override_timestamp || timestamp, ts_map, raw.ts_tag_keys) end points From d7645649635b90aab37fd5082bdd58b627baaedf Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Wed, 15 Feb 2023 15:25:44 +1100 Subject: [PATCH 2/3] fix unix time --- src/source/publishing/influx_publisher.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/source/publishing/influx_publisher.cr b/src/source/publishing/influx_publisher.cr index 67e4465..f89ad8f 100644 --- a/src/source/publishing/influx_publisher.cr +++ b/src/source/publishing/influx_publisher.cr @@ -179,7 +179,7 @@ module PlaceOS::Source override_timestamp = nil if time_key = raw.timestamp_field if time = fields.delete(time_key).as?(Float64) - override_timestamp = Time.from_unix time.to_i64 + override_timestamp = Time.unix time.to_i64 end end From 320d05a7e3c77384a22b48f8add11e2439e1e7e7 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Wed, 15 Feb 2023 17:28:49 +1100 Subject: [PATCH 3/3] update specs --- spec/publishing/influx_publisher_spec.cr | 4 ++-- src/source/publishing/influx_publisher.cr | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/spec/publishing/influx_publisher_spec.cr b/spec/publishing/influx_publisher_spec.cr index 8dde535..38e87b5 100644 --- a/spec/publishing/influx_publisher_spec.cr +++ b/spec/publishing/influx_publisher_spec.cr @@ -158,6 +158,7 @@ module PlaceOS::Source ts_tags: { pos_building: "pack", }, + ts_timestamp: "last_seen", }.to_json) points = InfluxPublisher.transform(message) @@ -167,7 +168,7 @@ module PlaceOS::Source point.measurement.should eq "M'Odule" - point.timestamp.should eq Time::UNIX_EPOCH + point.timestamp.not_nil!.to_unix.should eq 1601555879_i64 point.tags.should eq({ "pos_org" => "org-donor", @@ -193,7 +194,6 @@ module PlaceOS::Source "lat" => 25.20090608906493, "mac" => "66e0fd1279ce", "variance" => 4.5194575835650745, - "last_seen" => 1601555879, "building" => "zone-EmWLJNm0i~6", "level" => "zone-Epaq-dE1DaH", "map_width" => 1234.2, diff --git a/src/source/publishing/influx_publisher.cr b/src/source/publishing/influx_publisher.cr index f89ad8f..3087634 100644 --- a/src/source/publishing/influx_publisher.cr +++ b/src/source/publishing/influx_publisher.cr @@ -39,7 +39,7 @@ module PlaceOS::Source property ts_map : Hash(String, String)? # allow for a custom timestamp field - property timestamp_field : String? + property ts_timestamp : String? end alias Value = FieldTypes | Hash(String, FieldTypes?) | Hash(String, Hash(String, FieldTypes?)) | Array(Hash(String, FieldTypes?)) | CustomMetrics @@ -177,8 +177,8 @@ module PlaceOS::Source measurement = default_measurement || data.module_name override_timestamp = nil - if time_key = raw.timestamp_field - if time = fields.delete(time_key).as?(Float64) + if time_key = raw.ts_timestamp + if time = compacted.delete(time_key).as?(Float64) override_timestamp = Time.unix time.to_i64 end end