Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions .ameba.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
Lint/NotNil:
Description: Identifies usage of `not_nil!` calls
Enabled: false
Style/QueryBoolMethods:
Enabled: false
Severity: Warning

Naming/BlockParameterName:
Description: Disallows non-descriptive block parameter names
Enabled: false
Severity: Convention

Lint/SpecFilename:
Description: Enforces spec filenames to have `_spec` suffix
Enabled: false
Severity: Warning

Naming/AccessorMethodName:
Description: Makes sure that accessor methods are named properly
Enabled: false
Severity: Convention
5 changes: 0 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ x-deployment-env:
&deployment-env
ENV: ${ENV:-development}
SG_ENV: ${SG_ENV:-development}
TZ: $TZ

x-influxdb-api-key: &influxdb-api-key .env.influxdb

Expand Down Expand Up @@ -86,15 +85,11 @@ services:
restart: always
volumes:
- ${PWD}/config/mosquitto.conf:/etc/mosquitto/mosquitto.conf
environment:
TZ: $TZ

redis:
image: eqalpha/keydb
restart: always
hostname: redis
environment:
TZ: $TZ

postgres:
hostname: postgres
Expand Down
6 changes: 3 additions & 3 deletions scripts/init-influxdb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ echo "=== Initialising InfluxDB API"
# Wait for the service to be available
wait=0
max_wait=10
until [ $wait -eq $max_wait ] || docker-compose exec -T $instance influx ping > /dev/null; do
until [ $wait -eq $max_wait ] || docker compose exec -T $instance influx ping > /dev/null; do
sleep $(( wait++ ))
done

Expand All @@ -33,7 +33,7 @@ if [ $wait -eq $max_wait ]; then
fi

# Ensure the bucket in the environment is configured
docker-compose exec -T "$instance" influx setup --force \
docker compose exec -T "$instance" influx setup --force \
--username "${username}" \
--password "${password}" \
--org "${org}" \
Expand All @@ -42,7 +42,7 @@ docker-compose exec -T "$instance" influx setup --force \
--retention "${retention}" 2> /dev/null || echo "Already initialised"

# List buckets in the InfluxDB instance
docker-compose exec -T -e INFLUX_TOKEN="${INFLUX_API_KEY}" "$instance" influx bucket list -o "${org}" -n "${bucket}" --hide-headers > /dev/null
docker compose exec -T -e INFLUX_TOKEN="${INFLUX_API_KEY}" "$instance" influx bucket list -o "${org}" -n "${bucket}" --hide-headers > /dev/null
bucket_check_status=$?

if [ $bucket_check_status -eq 0 ]; then
Expand Down
14 changes: 7 additions & 7 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ shards:

bindata:
git: https://github.com/spider-gazelle/bindata.git
version: 2.0.0
version: 2.1.0

connect-proxy:
git: https://github.com/spider-gazelle/connect-proxy.git
version: 2.0.1
version: 2.0.2

crc16:
git: https://github.com/maiha/crc16.cr.git
Expand All @@ -43,7 +43,7 @@ shards:

csuuid:
git: https://github.com/wyhaines/csuuid.cr.git
version: 1.0.1+git.commit.b71cf5c899dd5cde6aff8e922bdabd5e2dfab585
version: 1.0.1+git.commit.4cb8656a9214aede9c1840cad4acf8e55e658f2f

db:
git: https://github.com/crystal-lang/crystal-db.git
Expand Down Expand Up @@ -135,7 +135,7 @@ shards:

mqtt:
git: https://github.com/spider-gazelle/crystal-mqtt.git
version: 1.2.2
version: 1.2.3

murmur3:
git: https://github.com/aca-labs/murmur3.git
Expand Down Expand Up @@ -187,7 +187,7 @@ shards:

pg-orm:
git: https://github.com/spider-gazelle/pg-orm.git
version: 1.1.1+git.commit.282c353b676806eef73786645f8cd852cdb60bc3
version: 1.1.1+git.commit.b64df7fb114f391a4010bac5f337d5ac2c088992

place_calendar:
git: https://github.com/placeos/calendar.git
Expand All @@ -207,7 +207,7 @@ shards:

placeos-models:
git: https://github.com/placeos/models.git
version: 9.53.3
version: 9.57.4

placeos-resource:
git: https://github.com/place-labs/resource.git
Expand Down Expand Up @@ -259,7 +259,7 @@ shards:

splay_tree_map:
git: https://github.com/wyhaines/splay_tree_map.cr.git
version: 0.2.2
version: 0.3.0

ssh2:
git: https://github.com/spider-gazelle/ssh2.cr.git
Expand Down
9 changes: 9 additions & 0 deletions spec/publishing/influx_publisher_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module PlaceOS::Source

point.tags.should eq({
"pos_org" => "org-donor",
"pos_region" => "_",
"pos_building" => "cards",
"pos_level" => "nek",
"pos_area" => "2042",
Expand Down Expand Up @@ -87,6 +88,7 @@ module PlaceOS::Source

point.tags.should eq({
"pos_org" => "org-donor",
"pos_region" => "_",
"pos_building" => "cards",
"pos_level" => "nek",
"pos_area" => "2042",
Expand Down Expand Up @@ -172,6 +174,7 @@ module PlaceOS::Source

point.tags.should eq({
"pos_org" => "org-donor",
"pos_region" => "_",
"pos_building" => "pack",
"pos_level" => "nek",
"pos_area" => "2042",
Expand Down Expand Up @@ -212,6 +215,7 @@ module PlaceOS::Source

point.tags.should eq({
"pos_org" => "org-donor",
"pos_region" => "_",
"pos_building" => "pack",
"pos_level" => "nek",
"pos_area" => "2042",
Expand Down Expand Up @@ -287,6 +291,7 @@ module PlaceOS::Source

point.tags.should eq({
"pos_org" => "org-donor",
"pos_region" => "_",
"pos_building" => "cards",
"pos_level" => "nek",
"pos_area" => "2042",
Expand Down Expand Up @@ -327,6 +332,7 @@ module PlaceOS::Source

point.tags.should eq({
"pos_org" => "org-donor",
"pos_region" => "_",
"pos_building" => "cards",
"pos_level" => "nek",
"pos_area" => "2042",
Expand Down Expand Up @@ -394,6 +400,7 @@ module PlaceOS::Source

point.tags.should eq({
"pos_org" => "org-donor",
"pos_region" => "_",
"pos_building" => "cards",
"pos_level" => "nek",
"pos_area" => "2042",
Expand Down Expand Up @@ -480,6 +487,7 @@ module PlaceOS::Source

point.tags.should eq({
"pos_org" => "org-donor",
"pos_region" => "_",
"pos_building" => "cards",
"pos_level" => "nek",
"pos_area" => "2042",
Expand Down Expand Up @@ -521,6 +529,7 @@ module PlaceOS::Source

point.tags.should eq({
"pos_org" => "org-donor",
"pos_region" => "_",
"pos_building" => "cards",
"pos_level" => "nek",
"pos_area" => "2042",
Expand Down
4 changes: 2 additions & 2 deletions spec/publishing/mqtt_publisher_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ module PlaceOS::Source
status_event = Mappings.new(state).status_events?("mod-1234", "power").not_nil!.first
key = MqttPublisher.generate_key(status_event)
key.should_not be_nil
key.not_nil!.should eq "placeos/org-donor/state/cards/nek/2042/cs-9445/12345/M'Odule/1/power"
key.not_nil!.should eq "placeos/org-donor/state/_/cards/nek/2042/cs-9445/12345/M'Odule/1/power"
end

it "doesn't create topics for Modules without a top-level scope Zone" do
Expand Down Expand Up @@ -89,7 +89,7 @@ module PlaceOS::Source

key = MqttPublisher.generate_key(status_event)
key.should_not be_nil
key.not_nil!.should eq "placeos/org-donor/state/_/nek/2042/cs-9445/12345/M'Odule/1/power"
key.not_nil!.should eq "placeos/org-donor/state/_/_/nek/2042/cs-9445/12345/M'Odule/1/power"
end

it "generates a metadata key" do
Expand Down
7 changes: 4 additions & 3 deletions spec/router/control_system_router_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ module PlaceOS::Source
cs.zones = zones.compact_map &.id
Router::ControlSystem.system_zones(cs, zones).should eq({
"org" => "zone-0",
"building" => "zone-1",
"level" => "zone-2",
"area" => "zone-3",
"region" => "zone-1",
"building" => "zone-2",
"level" => "zone-3",
"area" => "zone-4",
})
end
end
Expand Down
29 changes: 28 additions & 1 deletion spec/status_events_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,36 @@ module PlaceOS::Source

key = MqttPublisher.generate_key(message.data)

key.should eq "placeos/org-donor/state/cards/nek/2042/cs-9445/12345/M'Odule/1/#{status_key}"
key.should eq "placeos/org-donor/state/_/cards/nek/2042/cs-9445/12345/M'Odule/1/#{status_key}"
message.payload.should eq expected_payload("on")
events.stop
end

it "overwrites and keep only a single copy of unprocessed event" do
module_id = "mod-hello_hello"
status_key = "power"
mock_mappings_state = mock_state(module_id: module_id)

mock_mappings = Mappings.new(mock_mappings_state)
mock_publisher_manager = MockManager.new
managers : Array(PlaceOS::Source::PublisherManager) = [mock_publisher_manager] of PlaceOS::Source::PublisherManager

events = StatusEvents.new(mock_mappings, managers)
spawn(same_thread: true) { events.start }

sleep 0.1

Redis.open(url: REDIS_URL) do |client|
client.publish("status/#{module_id}/#{status_key}", expected_payload("on"))
client.publish("status/#{module_id}/#{status_key}", expected_payload("off"))
end

sleep 0.1
mock_publisher_manager.messages.size.should eq(1)
message = mock_publisher_manager.messages.first?
message.should_not be_nil
message = message.not_nil!
message.payload.should eq expected_payload("off")
end
end
end
5 changes: 3 additions & 2 deletions src/source/publishing/influx_publisher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ module PlaceOS::Source
end

protected def self.build_custom_point(measurement, data, fields, local_tags, compacted, timestamp, ts_map, ts_tag_keys)
measurement_value = measurement
# Add the fields
local_fields = fields.dup
compacted.each do |sub_key, value|
sub_key = (ts_map[sub_key]? || sub_key).gsub(/\W/, '_')
if sub_key == "measurement" && value.is_a?(String)
measurement = value
measurement_value = value
else
local_fields[sub_key] = value
end
Expand All @@ -218,7 +219,7 @@ module PlaceOS::Source
end

Flux::Point.new!(
measurement: measurement,
measurement: measurement_value,
timestamp: timestamp,
tags: local_tags,
pos_driver: data.driver_id,
Expand Down
34 changes: 31 additions & 3 deletions src/source/status_events.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require "mutex"
require "redis"
require "simple_retry"

Expand All @@ -18,12 +19,19 @@ module PlaceOS::Source

private property? stopped : Bool = true

private getter sync_lock = Mutex.new(:reentrant)

alias EventKey = NamedTuple(source: Symbol, mod_id: String, status: String)
alias EventValue = NamedTuple(pattern: String, payload: String)
private getter event_container = Hash(EventKey, EventValue).new

def initialize(@mappings : Mappings, @publisher_managers : Array(PublisherManager))
end

def start
self.stopped = false
spawn(same_thread: true) { update_values }
spawn(same_thread: true) { process_events }

SimpleRetry.try_to(
base_interval: 500.milliseconds,
Expand Down Expand Up @@ -65,7 +73,7 @@ module PlaceOS::Source
store.each do |key, value|
status_updated += 1_u64
begin
process_pevent(pattern, module_id, key, value)
synchronize { event_container[{source: :db, mod_id: module_id, status: key}] = {pattern: pattern, payload: value} }
rescue error
Log.error(exception: error) { {
message: "publishing initial state",
Expand Down Expand Up @@ -96,7 +104,21 @@ module PlaceOS::Source
return
end

process_pevent(pattern, module_id, status, payload)
synchronize { event_container[{source: :redis, mod_id: module_id, status: status}] = {pattern: pattern, payload: payload} }
end

protected def process_events
loop do
break if stopped?
task = synchronize { event_container.shift? }
unless task
sleep 0.1
next
end
key = task.first
value = task.last
process_pevent(value[:pattern], key[:mod_id], key[:status], value[:payload]) rescue nil
end
end

protected def process_pevent(pattern : String, module_id : String, status : String, payload : String)
Expand All @@ -113,7 +135,7 @@ module PlaceOS::Source
message = Publisher::Message.new(event, payload)
publisher_managers.each do |manager|
Log.trace { "broadcasting message to #{manager.class}" }
spawn { manager.broadcast(message) }
manager.broadcast(message)
end
end
end
Expand All @@ -126,5 +148,11 @@ module PlaceOS::Source
protected def self.new_redis
Redis.new(url: REDIS_URL)
end

private def synchronize(&)
sync_lock.synchronize do
yield
end
end
end
end
Loading