Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Composable messages #1

Merged
merged 2 commits into from
Nov 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 42 additions & 18 deletions commands.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
//go:generate stringer -type=MsgType
//go:generate stringer -type=CommandType

package chik

// CommandType is an enum of all the possible commands
type CommandType uint16
import (
"encoding/json"

"github.com/Sirupsen/logrus"
)

// Action is an enum of all the possible commands
type Action uint16

// EnabledDays days on which a TimedCommand is enabled
// used as binary flag
type EnabledDays uint16

// MsgType represent the type of the current message
type MsgType uint8
// CommandType represent the type of the current message
type CommandType uint8

// Message types used in various plugins
const (
HeartbeatType MsgType = iota
HeartbeatType CommandType = iota
DigitalCommandType
TimerCommandType
StatusRequestCommandType
Expand All @@ -28,14 +34,14 @@ const (

// Available command types
const (
SET CommandType = iota // Turn on/activate something
RESET // Turn off/deactivate something
TOGGLE // Toggle something from on/activated to off/deactivated
PUSH // Used to define actions shuch as the one of pushing a button
GET // Retrieve a value
SUNSET // Sunset related command
SUNRISE // Sunrise Related command
DELETE // Delete a value
SET Action = iota // Turn on/activate something
RESET // Turn off/deactivate something
TOGGLE // Toggle something from on/activated to off/deactivated
PUSH // Used to define actions shuch as the one of pushing a button
GET // Retrieve a value
SUNSET // Sunset related command
SUNRISE // Sunrise Related command
DELETE // Delete a value
)

// Days of the week
Expand All @@ -50,6 +56,23 @@ const (
Saturday EnabledDays = 0x40
)

// Command is the root object in every message
type Command struct {
Type CommandType
Data json.RawMessage
}

// NewCommand creates a command given a type and his content
func NewCommand(t CommandType, data interface{}) *Command {
body, err := json.Marshal(data)
if err != nil {
logrus.Error("Cannot compose command: ", err)
return nil
}

return &Command{t, json.RawMessage(body)}
}

// SimpleCommand is used to send a basic request regarding the whole system
type SimpleCommand struct {
Command JSIntArr `json:",JSIntArr"`
Expand All @@ -66,10 +89,11 @@ type DigitalCommand struct {
// if TimerID is zero it means it is a new timer. otherwise it should edit the timer with
// the corresponding id
type TimedCommand struct {
DigitalCommand `mapstructure:",squash"`
TimerID uint16 `json:",int,omitempty"`
Time JSONTime `json:",string,omitempty"`
Repeat EnabledDays `json:",int,omitempty"`
Action JSIntArr `json:",JSIntArr"`
TimerID uint16 `json:",int,omitempty"`
Time JSONTime `json:",string,omitempty"`
Repeat EnabledDays `json:",int,omitempty"`
Command *Command
}

// Status is the response to a status request Command
Expand Down
16 changes: 16 additions & 0 deletions commandtype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 2 additions & 10 deletions controller.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package chik

import (
"encoding/json"
"net"
"sync"
"time"

"github.com/Sirupsen/logrus"
"github.com/cskr/pubsub"
"github.com/gofrs/uuid"
)
Expand Down Expand Up @@ -56,15 +54,9 @@ func (c *Controller) Disconnect() {
}

// Reply sends back a reply message
func (c *Controller) Reply(request *Message, replyType MsgType, replyContent interface{}) {
rawReply, err := json.Marshal(replyContent)
if err != nil {
logrus.Error("Cannot marshal status message")
return
}

func (c *Controller) Reply(request *Message, replyType CommandType, replyContent interface{}) {
sender := request.SenderUUID()
reply := NewMessage(replyType, sender, rawReply)
reply := NewMessage(sender, NewCommand(replyType, replyContent))

// If sender is null the message is internal, otherwise it needs to go out
destination := "out"
Expand Down
5 changes: 3 additions & 2 deletions handlers/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func TestForwarding(t *testing.T) {
logrus.Debug("Sender:", client1.id, "receiver:", client2.id)

forwarded := client1.remote.PubSub.Sub(chik.DigitalCommandType.String())
client1.remote.PubSub.Pub(chik.NewMessage(chik.HeartbeatType, uuid.Nil, []byte("")), "out")
client2.remote.PubSub.Pub(chik.NewMessage(chik.DigitalCommandType, client1.id, []byte("Hello")), "out")
client1.remote.PubSub.Pub(chik.NewMessage(uuid.Nil, chik.NewCommand(chik.HeartbeatType, nil)), "out")
time.Sleep(100 * time.Millisecond) // TODO: fix the handshake
client2.remote.PubSub.Pub(chik.NewMessage(client1.id, chik.NewCommand(chik.DigitalCommandType, nil)), "out")

select {
case <-forwarded:
Expand Down
4 changes: 2 additions & 2 deletions handlers/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewHeartBeatHandler(interval time.Duration) chik.Handler {
func (h *heartbeat) sender(controller *chik.Controller) *time.Ticker {
sendHeartBeat := func() {
logrus.Debug("Sending heartbeat")
controller.PubSub.Pub(chik.NewMessage(chik.HeartbeatType, uuid.Nil, []byte{}), "out")
controller.PubSub.Pub(chik.NewMessage(uuid.Nil, chik.NewCommand(chik.HeartbeatType, nil)), "out")
}

ticker := time.NewTicker(h.interval)
Expand Down Expand Up @@ -58,7 +58,7 @@ func (h *heartbeat) Run(controller *chik.Controller) {
for data := range in {
message := data.(*chik.Message)

if message.Type() == chik.HeartbeatType {
if message.Command().Type == chik.HeartbeatType {
atomic.StoreUint32(&h.errors, 0)
}
}
Expand Down
2 changes: 1 addition & 1 deletion handlers/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (h *io) Run(remote *chik.Controller) {
message := data.(*chik.Message)

command := chik.DigitalCommand{}
err := json.Unmarshal(message.Data(), &command)
err := json.Unmarshal(message.Command().Data, &command)
if err != nil {
logrus.Error("cannot decode digital command: ", err)
continue
Expand Down
2 changes: 1 addition & 1 deletion handlers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (h *handler) Run(remote *chik.Controller) {
for rawMessage := range incoming {
message := rawMessage.(*chik.Message)
command := chik.SimpleCommand{}
err := json.Unmarshal(message.Data(), &command)
err := json.Unmarshal(message.Command().Data, &command)
if err != nil || len(command.Command) != 1 || command.Command[0] != chik.GET {
continue
}
Expand Down
53 changes: 17 additions & 36 deletions handlers/sunset.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,14 @@ func NewSunset() chik.Handler {
func requestTimerStatus(remote *chik.Controller) []chik.TimedCommand {
result := make([]chik.TimedCommand, 0)
sub := remote.PubSub.SubOnce(chik.StatusReplyCommandType.String())
statusCommand, err := json.Marshal(chik.SimpleCommand{Command: []chik.CommandType{chik.GET}})
if err != nil {
logrus.Error(err)
return result
}
statusRequest := chik.NewMessage(chik.StatusRequestCommandType, uuid.Nil, statusCommand)
statusCommand := chik.SimpleCommand{Command: []chik.Action{chik.GET}}
statusRequest := chik.NewMessage(uuid.Nil, chik.NewCommand(chik.StatusRequestCommandType, statusCommand))
remote.PubSub.Pub(statusRequest, chik.StatusRequestCommandType.String())
select {
case statusRaw := <-sub:
var status map[string]interface{}
json.Unmarshal(statusRaw.(*chik.Message).Data(), &status)
err = chik.Decode(status["timers"], &result)
json.Unmarshal(statusRaw.(*chik.Message).Command().Data, &status)
err := chik.Decode(status["timers"], &result)
if err != nil {
logrus.Error(err)
}
Expand All @@ -97,27 +93,22 @@ func (h *suntime) updateTimers(remote *chik.Controller) {
timers := requestTimerStatus(remote)
logrus.Debug("Timers: ", timers)

for _, t := range timers {
for _, timer := range timers {
send := false

if funk.Contains(t.Command, chik.SUNRISE) {
t.Time = chik.JSONTime{h.cache.sunrise.In(time.Local)}
if funk.Contains(timer.Action, chik.SUNRISE) {
timer.Time = chik.JSONTime{h.cache.sunrise.In(time.Local)}
send = true
}

if funk.Contains(t.Command, chik.SUNSET) {
t.Time = chik.JSONTime{h.cache.sunset.In(time.Local)}
if funk.Contains(timer.Action, chik.SUNSET) {
timer.Time = chik.JSONTime{h.cache.sunset.In(time.Local)}
send = true
}

if send {
logrus.Debug("Updating timer according to sun time")
timerCommand, err := json.Marshal(t)
if err != nil {
logrus.Error(err)
return
}
timerChangeRequest := chik.NewMessage(chik.TimerCommandType, uuid.Nil, timerCommand)
timerChangeRequest := chik.NewMessage(uuid.Nil, chik.NewCommand(chik.TimerCommandType, timer))
remote.PubSub.Pub(timerChangeRequest, chik.TimerCommandType.String())
}
}
Expand Down Expand Up @@ -206,21 +197,16 @@ func (h *suntime) fetchSunTime() {
}

func (h *suntime) addSunTimer(remote *chik.Controller, timer chik.TimedCommand) {
if funk.Contains(timer.Command, chik.SUNRISE) {
if funk.Contains(timer.Action, chik.SUNRISE) {
timer.Time = chik.JSONTime{h.cache.sunrise.In(time.Local)}
} else if funk.Contains(timer.Command, chik.SUNSET) {
} else if funk.Contains(timer.Action, chik.SUNSET) {
timer.Time = chik.JSONTime{h.cache.sunset.In(time.Local)}
} else {
logrus.Error("Command does not contain sunrise or sunset")
return
}

rawCommand, err := json.Marshal(timer)
if err != nil {
logrus.Error(err)
return
}
message := chik.NewMessage(chik.TimerCommandType, uuid.Nil, rawCommand)
message := chik.NewMessage(uuid.Nil, chik.NewCommand(chik.TimerCommandType, timer))
remote.PubSub.Pub(message, chik.TimerCommandType.String())
}

Expand All @@ -229,12 +215,7 @@ func (h *suntime) editSunTimer(remote *chik.Controller, timer chik.TimedCommand)
}

func (h *suntime) removeSunTimer(remote *chik.Controller, timer chik.TimedCommand) {
rawCommand, err := json.Marshal(timer)
if err != nil {
logrus.Error(err)
return
}
message := chik.NewMessage(chik.TimerCommandType, uuid.Nil, rawCommand)
message := chik.NewMessage(uuid.Nil, chik.NewCommand(chik.TimerCommandType, timer))
remote.PubSub.Pub(message, chik.TimerCommandType.String())
}

Expand All @@ -246,18 +227,18 @@ func (h *suntime) Run(remote *chik.Controller) {
for rawMessage := range sub {
message := rawMessage.(*chik.Message)
var command chik.TimedCommand
err := json.Unmarshal(message.Data(), &command)
err := json.Unmarshal(message.Command().Data, &command)
if err != nil {
logrus.Error("Command parsing failed")
continue
}

if len(command.Command) < 2 {
if len(command.Action) < 2 {
logrus.Warning("Unexpected command length, skipping")
continue
}

if funk.Contains(command.Command, chik.SET) || funk.Contains(command.Command, chik.RESET) {
if funk.Contains(command.Action, chik.SET) {
if command.TimerID == 0 {
logrus.Debug("Adding a new sun timer: ", command)
h.addSunTimer(remote, command)
Expand Down
21 changes: 4 additions & 17 deletions handlers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,7 @@ func (h *timers) timeTicker(remote *chik.Controller) *time.Ticker {
lastMinute = tick.Minute()
for _, timer := range h.timers {
if timer.Time.Hour() == tick.Hour() && timer.Time.Minute() == tick.Minute() {
timedCommand := chik.DigitalCommand{Pin: timer.Pin}
if funk.Contains(timer.Command, chik.SET) {
timedCommand.Command = []chik.CommandType{chik.SET}
} else {
timedCommand.Command = []chik.CommandType{chik.RESET}
}

command, err := json.Marshal(timedCommand)
if err != nil {
logrus.Fatal("cannot marshal a digitalcommand: ", err)
}
remote.PubSub.Pub(
chik.NewMessage(chik.DigitalCommandType, uuid.Nil, command),
chik.DigitalCommandType.String())
remote.PubSub.Pub(chik.NewMessage(uuid.Nil, timer.Command), timer.Command.Type.String())
}
}
}
Expand Down Expand Up @@ -111,13 +98,13 @@ func (h *timers) Run(remote *chik.Controller) {
for rawMessage := range incoming {
message := rawMessage.(*chik.Message)
command := chik.TimedCommand{}
err := json.Unmarshal(message.Data(), &command)
err := json.Unmarshal(message.Command().Data, &command)
if err != nil {
logrus.Warn("Failed decoding timer: ", err)
continue
}

if funk.Contains(command.Command, chik.SET) || funk.Contains(command.Command, chik.RESET) {
if funk.Contains(command.Action, chik.SET) {
if command.Time.IsZero() {
logrus.Warning("Cannot add/edit a timer with a null time")
continue
Expand All @@ -130,7 +117,7 @@ func (h *timers) Run(remote *chik.Controller) {
continue
}

if funk.Contains(command.Command, chik.DELETE) {
if funk.Contains(command.Action, chik.DELETE) {
h.deleteTimer(command)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions handlers/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (h *updater) Run(remote *chik.Controller) {
in := remote.PubSub.Sub(chik.VersionRequestCommandType.String())
for data := range in {
message := data.(*chik.Message)
command := chik.SimpleCommand{}
err := json.Unmarshal(message.Data(), &command)
var command chik.SimpleCommand
err := json.Unmarshal(message.Command().Data, &command)
if err != nil {
logrus.Warn("Unexpected message")
continue
Expand Down
Loading