Skip to content

Commit

Permalink
n2k-reader can filter by source, embbed canboat database into n2k-rea…
Browse files Browse the repository at this point in the history
…der, addressmapper behaviour is configurable
  • Loading branch information
aldas committed Jul 18, 2023
1 parent 520158f commit f14ebf9
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 33 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@

# Ignore Canboat PGNs file https://github.com/canboat/canboat/blob/master/docs/canboat.json
canboat/testdata/canboat.json
cmd/n2kreader/canboat.json

n2k-reader*
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ help: ## Display this help screen

download-canboat-pgns: # Downloads Canboat PNG definitions (pgns.json) from Canboat repository
# download canboat v4.10.0 PGNs
@wget -O canboat/testdata/canboat.json https://raw.githubusercontent.com/canboat/canboat/v4.10.0/docs/canboat.json
@wget -O canboat/testdata/canboat.json -O cmd/n2kreader/canboat.json https://raw.githubusercontent.com/canboat/canboat/v4.10.0/docs/canboat.json
15 changes: 14 additions & 1 deletion actisense/eblreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@ import (
//
// 1b 01 <-- start of data frame (ESC+SOH)
//
// 07 95 <-- "95" is maybe row type. Actisense EBL Reader v2.027 says "now has added support for the new CAN-Raw (BST-95) message format that is used for all data logging on Actisense W2K-1"
// 07 95 <-- "95" is maybe frame type. Actisense EBL Reader v2.027 says "now has added support for the new CAN-Raw (BST-95) message format that is used for all data logging on Actisense W2K-1"
// 0e <-- lengths 14 bytes till end
// 28 9a <-- timestamp 39464 (hex 9A28) (little endian)
// 00 01 f8 09 <--- 0x09f80100 = src:0, dst:255, pgn:129025 (1f801), prio:2 (little endian)
// 3d 0d b3 22 48 32 59 0d <-- CAN payload (N2K endian rules), lat(32bit) 22b30d3d = 582159677, lon(32bit) 0d593248 = 223949384
// 1b 0a <-- end of data frame (ESC+LF)
//
// Timestamp is offset from time found in first data frame in file
// Example: first frame in file:
// 1B 01 03 00 10 E7 A7 84 83 D9 01 1B 0A
//
// 03 <--- "03" maybe frame type
// 00 10 E7 A7 84 83 D9 01 <-- 8 byte little endian unsigned number,
const (
// SOH is start of data frame byte for Actisense BST-95 (EBL file created by Actisense W2K-1 device)
SOH = 0x01
Expand Down Expand Up @@ -126,9 +133,15 @@ func (d *EBLFormatDevice) ReadRawMessage(ctx context.Context) (nmea.RawMessage,
if d.config.DebugLogRawMessageBytes && d.config.LogFunc != nil {
d.config.LogFunc("# DEBUG read raw actisense ELB message: %x\n", msg)
}
//if msg[0] == 0x3 { // 0x03 seems to be time since start of day (8 bytes)
// d.config.LogFunc("# TIME: %x\n", msg)
//}
if msg[0] == 0x7 && msg[1] == cmdRAWActisenseMessageReceived { // 0x07+0x95 seems to identify BST-95 message
return fromActisenseBST95Message(msg[2:], now)
}
//if msg[0] != 0x3 && msg[0] != 0x7 { // all other messages
// d.config.LogFunc("# XXX: %x\n", msg)
//}
if d.config.LogFunc != nil {
d.config.LogFunc("# ERROR unknown message type read: %x\n", msg)
}
Expand Down
36 changes: 31 additions & 5 deletions addressmapper/addressmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,21 @@ type ConfigurationInfo struct {
ManufacturerInfo string
}

// Config configures how AddressMapper instance behaves
type Config struct {
// RequestProductInfo decides if Product Info (126996) is requested after processing AddressClaim (60928)
RequestProductInfo bool
// RequestConfigurationInformation decides if Configuration Information (126998) is requested after processing Product Info (126996)
RequestConfigurationInformation bool
// RequestPGNList decides if PGN List (126464) is requested after processing Configuration Information (126998)
RequestPGNList bool
}

type AddressMapper struct {
mutex sync.Mutex

config Config

// when new device is detected we use this channel to send additional PGNs to query information about that node
// ask for device name, product info, configuration info, pgn list
requestsChan chan nmea.RawMessage
Expand All @@ -183,14 +195,28 @@ type AddressMapper struct {
now func() time.Time
}

// NewAddressMapper creates new instance of AddressMapper with default configuration
func NewAddressMapper(nmeaDevice nmea.RawMessageWriter) *AddressMapper {
return NewAddressMapperWithConfig(
nmeaDevice,
Config{
RequestProductInfo: false,
RequestConfigurationInformation: false,
RequestPGNList: false,
},
)
}

// NewAddressMapperWithConfig creates new instance of AddressMapper with given configuration
func NewAddressMapperWithConfig(nmeaDevice nmea.RawMessageWriter, config Config) *AddressMapper {
return &AddressMapper{
mutex: sync.Mutex{},
now: time.Now,

toggleWriteChan: make(chan bool),
requestsChan: make(chan nmea.RawMessage, addressMapperWriteChannelSize), // TODO: messages could come in bursts. 100+ in very short window for broadcasts (dst=255)
requestsChan: make(chan nmea.RawMessage, addressMapperWriteChannelSize),
nmeaDevice: nmeaDevice,
config: config,

knownNodes: make(map[uint64]*Node),
address2node: [255]*busSlot{},
Expand Down Expand Up @@ -233,7 +259,7 @@ func (m *AddressMapper) Run(ctx context.Context) error {
case writeEnabled := <-m.toggleWriteChan:
enabled = writeEnabled
if enabled {
writeTimer.Reset(10 * time.Millisecond)
writeTimer.Reset(40 * time.Millisecond) // throttle sending to 40ms not to overflow the bus
} else {
writeTimer.Stop()
}
Expand Down Expand Up @@ -358,7 +384,7 @@ func (m *AddressMapper) processISOAddressClaim(slot *busSlot, raw nmea.RawMessag
}

// if we already have not requested, then request product info for that device
if m.writeEnabled && slot.productInfoRequested.IsZero() {
if m.writeEnabled && m.config.RequestProductInfo && slot.productInfoRequested.IsZero() {
slot.productInfoRequested = m.now()
m.requestsChan <- createISORequest(nmea.PGNProductInfo, source)
}
Expand All @@ -378,7 +404,7 @@ func (m *AddressMapper) processProductInfo(slot *busSlot, raw nmea.RawMessage) e
slot.node.ValidProductInfo = true

// if we already have not requested, then request configuration info for that node
if m.writeEnabled && slot.configInfoRequested.IsZero() {
if m.writeEnabled && m.config.RequestConfigurationInformation && slot.configInfoRequested.IsZero() {
slot.configInfoRequested = m.now()
m.requestsChan <- createISORequest(nmea.PGNConfigurationInformation, raw.Header.Source)
}
Expand All @@ -398,7 +424,7 @@ func (m *AddressMapper) processConfigurationInfo(slot *busSlot, raw nmea.RawMess
slot.node.ValidConfigurationInfo = true

// if we already have not requested, then request PGN list for that node
if m.writeEnabled && slot.pgnListRequested.IsZero() {
if m.writeEnabled && m.config.RequestPGNList && slot.pgnListRequested.IsZero() {
slot.pgnListRequested = m.now()
m.requestsChan <- createISORequest(nmea.PGNPGNList, raw.Header.Source)
}
Expand Down
126 changes: 113 additions & 13 deletions cmd/n2kreader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"embed"
"encoding/base64"
"encoding/hex"
"encoding/json"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/aldas/go-nmea-client/socketcan"
"github.com/tarm/serial"
"io"
"io/fs"
"log"
"net"
"os"
Expand All @@ -28,6 +30,9 @@ import (
"time"
)

//go:embed `canboat.json`

Check failure on line 33 in cmd/n2kreader/main.go

View workflow job for this annotation

GitHub Actions / test (1.19, ubuntu-latest)

pattern canboat.json: no matching files found

Check failure on line 33 in cmd/n2kreader/main.go

View workflow job for this annotation

GitHub Actions / test (1.20, ubuntu-latest)

pattern canboat.json: no matching files found
var canboatDB embed.FS

func main() {
printRaw := flag.Bool("raw", false, "prints raw message")
onlyRead := flag.Bool("read-only", false, "only reads device/file and does not write into it")
Expand All @@ -38,6 +43,7 @@ func main() {
inputFormat := flag.String("input-format", "ngt", "in which format packet are read (ngt, n2k-bin, n2k-ascii, n2k-raw-ascii, canboat-raw, ebl)")
deviceAddr := flag.String("device", "/dev/ttyUSB0", "path to Actisense NGT-1 USB device")
pgnsPath := flag.String("pgns", "", "path to Canboat pgns.json file")
sources := flag.String("source", "", "comma separated list of Source addresses to filter")
pgnFilter := flag.String("filter", "", "comma separated list of PGNs to filter")
csvFieldsRaw := flag.String("csv-fields", "", "list of PGNs and their fields to be written in CSV. `129025:time_ms,latitude,longitude;65280:time_ms,manufacturerCode,industryCode`")
outputFormat := flag.String("output-format", "json", "in which format raw and decoded packet should be printed out (json, canboat, hex, base64)")
Expand All @@ -55,11 +61,17 @@ func main() {
var decoder *canboat.Decoder
var fastPacketPGNs []uint32
if !*onlyRaw {
if pgnsPath == nil || *pgnsPath == "" {
log.Fatal("# missing pgns.json path\n")
var canboatDBFS fs.FS
var canboatDBPath string
if pgnsPath != nil && *pgnsPath != "" {
canboatDBFS = os.DirFS(".")
canboatDBPath = *pgnsPath
} else {
canboatDBFS = canboatDB
canboatDBPath = "canboat.json"
}

schema, err := canboat.LoadCANBoatSchema(os.DirFS("."), *pgnsPath)
schema, err := canboat.LoadCANBoatSchema(canboatDBFS, canboatDBPath)
if err != nil {
log.Fatal(err)
}
Expand All @@ -70,14 +82,22 @@ func main() {
}

var err error
var filter []uint32
var filter msgFilters
if pgnFilter != nil && *pgnFilter != "" {
filter, err = string2intSlice(*pgnFilter)
filter, err = parseMsgFilters(*pgnFilter)
if err != nil {
log.Fatalf("invalid pgn filter given, %v\n", err)
}
fmt.Printf("# Using PGN filter: %v\n", filter)
}
var sourceAllowFilter []uint8
if sources != nil && *sources != "" {
sourceAllowFilter, err = string2intSlice[uint8](*sources)
if err != nil {
log.Fatalf("invalid source address filter given, %v\n", err)
}
fmt.Printf("# Using Source address filter: %v\n", filter)
}

var csvFields csvPGNs
isCSV := false
Expand All @@ -87,12 +107,13 @@ func main() {
log.Fatalf("%v\n", err)
}
for _, cf := range csvFields {
filter = append(filter, cf.PGN)
filter = filter.appendPGN(cf.PGN)
}
if len(csvFields) > 0 {
isCSV = true
}
}
sort.Sort(mfSorter(filter))

switch *outputFormat {
case "json", "canboat", "hex", "base64":
Expand Down Expand Up @@ -173,9 +194,9 @@ func main() {
if err := device.Initialize(); err != nil {
log.Fatal(err)
}
time.Sleep(1 * time.Second) // give some time to "warm up"
}
fmt.Printf("# Starting to read device: %v\n", *deviceAddr)
time.Sleep(1 * time.Second)

isAddressMapperEnabled := noAddressMapper == nil || !*noAddressMapper
var addressMapper *addressmapper.AddressMapper
Expand Down Expand Up @@ -244,7 +265,10 @@ func main() {
}
}

if filter != nil && !contains(filter, rawMessage.Header.PGN) {
if sourceAllowFilter != nil && !contains(sourceAllowFilter, rawMessage.Header.Source) {
continue
}
if !filter.matches(rawMessage.Header) {
continue
}

Expand Down Expand Up @@ -332,7 +356,8 @@ func handleSTDIO(ctx context.Context, device nmea.RawMessageWriter, addressMappe
if line == "" {
continue
}
if strings.HasPrefix(line, "!nodes") {

if strings.HasPrefix(line, "!nodes") && addressMapper != nil {
nodes := addressMapper.Nodes()
sort.Sort(nodesBySrc(nodes))

Expand All @@ -346,8 +371,9 @@ func handleSTDIO(ctx context.Context, device nmea.RawMessageWriter, addressMappe
}
}
continue
} else if strings.HasPrefix(line, "!addr-claim") {
} else if strings.HasPrefix(line, "!addr-claim") && addressMapper != nil {
addressMapper.BroadcastIsoAddressClaimRequest()
continue
}
msg, err := parseLine(line)
if err != nil {
Expand Down Expand Up @@ -439,14 +465,14 @@ func parseUint8(raw string, min int, max int, name string) (uint8, error) {
return uint8(n), nil
}

func string2intSlice(s string) ([]uint32, error) {
result := make([]uint32, 0, 10)
func string2intSlice[T uint8 | uint32](s string) ([]T, error) {
result := make([]T, 0, 10)
for _, p := range strings.Split(s, ",") {
pgn, err := strconv.Atoi(p)
if err != nil {
return nil, err
}
result = append(result, uint32(pgn))
result = append(result, T(pgn))
}
return result, nil
}
Expand All @@ -465,3 +491,77 @@ type nodesBySrc addressmapper.Nodes
func (v nodesBySrc) Len() int { return len(v) }
func (v nodesBySrc) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
func (v nodesBySrc) Less(i, j int) bool { return v[i].Source < v[j].Source }

type msgFilter struct {
PGN uint32
Source uint8
HasSource bool
}

type msgFilters []msgFilter

func parseMsgFilters(s string) (msgFilters, error) {
result := make([]msgFilter, 0)
for _, p := range strings.Split(s, ",") {
parts := strings.Split(p, ":")
pgn, err := strconv.Atoi(parts[0])
if err != nil {
return nil, fmt.Errorf("failed to parse PGN in filter, err: %w", err)
}
f := msgFilter{
PGN: uint32(pgn),
Source: 0,
HasSource: false,
}
if len(parts) > 1 {
tmpSource, err := strconv.ParseUint(parts[1], 10, 8)
if err != nil {
return nil, fmt.Errorf("failed to parse source in filter, err: %w", err)
}
f.Source = uint8(tmpSource)
f.HasSource = true
}
result = append(result, f)
}
return result, nil
}

func (mf msgFilters) appendPGN(pgn uint32) msgFilters {
// in case existing filters already have same PGN with source filter we do not add thing PGN
for _, f := range mf {
if f.PGN == pgn && f.HasSource {
return mf
}
}

return append(mf, msgFilter{PGN: pgn})
}

func (mf *msgFilters) matches(header nmea.CanBusHeader) bool {
if mf == nil || len(*mf) == 0 {
return true // no filter means match everything
}
for _, f := range *mf {
if f.PGN != header.PGN {
continue
}
if !f.HasSource {
return true
}
if f.Source == header.Source {
return true
}
}
return false
}

type mfSorter msgFilters

func (mf mfSorter) Len() int { return len(mf) }
func (mf mfSorter) Swap(i, j int) { mf[i], mf[j] = mf[j], mf[i] }
func (mf mfSorter) Less(i, j int) bool {
if mf[i].PGN == mf[j].PGN {
return mf[i].Source > mf[j].Source
}
return mf[i].PGN > mf[j].PGN
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module github.com/aldas/go-nmea-client
go 1.19

require (
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.4
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07
golang.org/x/sys v0.6.0
golang.org/x/sys v0.10.0
)

require (
Expand Down
Loading

0 comments on commit f14ebf9

Please sign in to comment.