Skip to content

Commit

Permalink
Basic implementation of SeedLink DATA command
Browse files Browse the repository at this point in the history
  • Loading branch information
bclswl0827 committed Mar 11, 2024
1 parent c37bcb6 commit 7d6a93f
Show file tree
Hide file tree
Showing 24 changed files with 161 additions and 135 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/feature_request.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Feature request
about: Suggest an idea for this project
---

### Is your feature request related to a problem? Please describe.
### Is your feature request related to a problem? Please describe

<!-- Add a clear and concise description of what the problem is. E.g. *I'm always frustrated when [...]* -->

Expand Down
15 changes: 8 additions & 7 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
**Description**
### Description

<!--
Please explain the changes you made here.
If the feature changes current behaviour, explain why your solution is better.
Please explain the changes you made here.
If the feature changes current behaviour, explain why your solution is better.
-->

Before submitting your PR, please indicate which issues (in any of the repos) are either fixed or closed by this PR. See [GitHub Help: Closing issues using keywords](https://help.github.com/articles/closing-issues-via-commit-messages/).
Expand All @@ -12,9 +13,9 @@ Before submitting your PR, please indicate which issues (in any of the repos) ar
- [ ] DO make sure that related issues are opened in other repositories. I.e., the AnyShake Explorer, hardware RC filter cutoff frequency need to be updated accordingly.
- [ ] AVOID breaking the continuous integration build.

**Further comments**
<!--
If this is a relatively large or complex change, kick off the discussion by explaining why you chose the solution you did, what alternatives you considered, etc.
## Further comments

:heart: Thank you!
<!--
If this is a relatively large or complex change, kick off the discussion by explaining why you chose the solution you did, what alternatives you considered, etc.
:heart: Thank you!
-->
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

Starting from v2.2.5, all notable changes to this project will be documented in this file.

## v2.11.2

- Code style improvements again
- Basic implementation of SeedLink DATA command
- Fix frontend issue where the input component does not update its value

## v2.11.1

- Some frontend code style improvements
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v2.11.1
v2.11.2
2 changes: 1 addition & 1 deletion build/assets/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"lifecycle": 10
},
"seedlink_settings": {
"enable": false,
"enable": true,
"host": "0.0.0.0",
"port": 18000,
"duration": 86400
Expand Down
21 changes: 20 additions & 1 deletion driver/seedlink/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,34 @@ package seedlink

import (
"net"
"strconv"

"github.com/anyshake/observer/feature"
"github.com/anyshake/observer/utils/duration"
)

type DATA struct{}

// Callback of "DATA" command, implements SeedLinkCommandCallback interface
func (*DATA) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
_, err := conn.Write([]byte(RES_ERR))
switch len(args) {
case 3:
fallthrough
case 2:
fallthrough
case 1:
seq, err := strconv.ParseInt(args[0], 16, 64)
if err != nil {
conn.Write([]byte(RES_ERR))
return err
}
cl.Sequence = seq + 1
cl.StartTime, _ = duration.Timestamp(options.Status.System.Offset)
default:
_, err := conn.Write([]byte(RES_ERR))
return err
}
_, err := conn.Write([]byte(RES_OK))
return err
}

Expand Down
51 changes: 19 additions & 32 deletions driver/seedlink/end.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,46 +8,31 @@ import (

"github.com/anyshake/observer/feature"
"github.com/anyshake/observer/publisher"
"github.com/anyshake/observer/utils/text"
"github.com/ostafen/clover/v2/query"
)

type END struct{}

// Callback of "END" command, implements SeedLinkCommandCallback interface
func (*END) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
cl.StreamMode = true // Enter stream mode
var (
seqNum int64 = 0
channels = cl.Channels
location = cl.Location
endTime = cl.EndTime
startTime = cl.StartTime
database = sl.SeedLinkBuffer.Database
collection = sl.SeedLinkBuffer.Collection
station = text.TruncateString(cl.Station, 5)
network = text.TruncateString(cl.Network, 2)
)

if startTime.IsZero() {
func (*END) Callback(sl *SeedLinkGlobal, client *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
if client.StartTime.IsZero() {
_, err := conn.Write([]byte(RES_ERR))
return err
}

records, err := database.FindAll(
query.NewQuery(collection).
Where(query.Field("ts").
Gt(startTime.UnixMilli()).
And(query.Field("ts").
Lt(endTime.UnixMilli()),
),
),
)
// Query from buffer database
records, err := sl.SeedLinkBuffer.Database.FindAll(query.NewQuery(sl.SeedLinkBuffer.Collection).
Where(query.Field("ts").Gt(client.StartTime.UTC().UnixMilli()).
And(query.Field("ts").Lt(client.EndTime.UTC().UnixMilli())),
))
if err != nil {
conn.Write([]byte(RES_ERR))
return err
}

// Enter stream mode
client.Streaming = true

for _, record := range records {
var recordMap map[string]any
record.Unmarshal(&recordMap)
Expand All @@ -56,16 +41,16 @@ func (*END) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.Fe
"EHE": recordMap["ehe"].(string),
"EHN": recordMap["ehn"].(string),
}
for _, channel := range channels {
for _, channel := range client.Channels {
data, ok := channelMap[channel]
if !ok {
continue
}
var (
timestamp = int64(recordMap["ts"].(float64))
bufTime = time.UnixMilli(timestamp)
bufTime = time.UnixMilli(timestamp).UTC()
)
if bufTime.After(startTime) && bufTime.Before(endTime) {
if bufTime.After(client.StartTime.UTC()) && bufTime.Before(client.EndTime.UTC()) {
var countData []int32
for _, v := range strings.Split(data, "|") {
intData, err := strconv.Atoi(v)
Expand All @@ -74,7 +59,9 @@ func (*END) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.Fe
}
countData = append(countData, int32(intData))
}
err := SendSLPacket(conn, countData, timestamp, &seqNum, network, station, channel, location)
err := SendSLPacket(conn, client, SeedLinkPacket{
Channel: channel, Timestamp: timestamp, Count: countData,
})
if err != nil {
return err
}
Expand All @@ -84,16 +71,16 @@ func (*END) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.Fe

// Subscribe to the publisher
go publisher.Subscribe(
&options.Status.Geophone, &cl.StreamMode,
&options.Status.Geophone, &client.Streaming,
func(gp *publisher.Geophone) error {
return streamer(gp, conn, channels, network, station, location, &seqNum)
return streamer(conn, client, gp)
},
)

return nil
}

// Fallback of "END" command, implements SeedLinkCommandCallback interface
func (*END) Fallback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, conn net.Conn, args ...string) {
func (*END) Fallback(sl *SeedLinkGlobal, client *SeedLinkClient, options *feature.FeatureOptions, conn net.Conn, args ...string) {
conn.Close()
}
34 changes: 17 additions & 17 deletions driver/seedlink/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,34 @@ import (
"github.com/bclswl0827/mseedio"
)

func SendSLPacket(conn net.Conn, count []int32, ts int64, seq *int64, network, station, channel, location string) error {
func SendSLPacket(conn net.Conn, client *SeedLinkClient, data SeedLinkPacket) error {
// Create data chunks to adapt to SeedLink packet size
var countGroup [][]int32
if len(count) > CHUNK_SIZE {
for i := 0; i < len(count); i += CHUNK_SIZE {
if i+CHUNK_SIZE > len(count) {
countGroup = append(countGroup, count[i:])
if len(data.Count) > CHUNK_SIZE {
for i := 0; i < len(data.Count); i += CHUNK_SIZE {
if i+CHUNK_SIZE > len(data.Count) {
countGroup = append(countGroup, data.Count[i:])
} else {
countGroup = append(countGroup, count[i:i+CHUNK_SIZE])
countGroup = append(countGroup, data.Count[i:i+CHUNK_SIZE])
}
}
} else {
countGroup = append(countGroup, count)
countGroup = append(countGroup, data.Count)
}

dataSpanMs := 1000 / float64(len(count))
dataSpanMs := 1000 / float64(len(data.Count))
for i, c := range countGroup {
// Generate MiniSEED record
var miniseed mseedio.MiniSeedData
miniseed.Init(mseedio.STEIM2, mseedio.MSBFIRST)
err := miniseed.Append(c, &mseedio.AppendOptions{
StationCode: station,
LocationCode: location,
ChannelCode: channel,
NetworkCode: network,
SampleRate: float64(len(count)),
SequenceNumber: fmt.Sprintf("%06d", *seq),
StartTime: time.UnixMilli(ts + int64(float64(i*CHUNK_SIZE)*dataSpanMs)).UTC(),
ChannelCode: data.Channel,
StationCode: client.Station,
LocationCode: client.Location,
NetworkCode: client.Network,
SampleRate: float64(len(data.Count)),
SequenceNumber: fmt.Sprintf("%06d", client.Sequence),
StartTime: time.UnixMilli(data.Timestamp + int64(float64(i*CHUNK_SIZE)*dataSpanMs)).UTC(),
})
if err != nil {
return err
Expand All @@ -49,7 +49,7 @@ func SendSLPacket(conn net.Conn, count []int32, ts int64, seq *int64, network, s
}

// Prepend and send SeedLink sequence number
slSeq := []byte(fmt.Sprintf("SL%06X", *seq))
slSeq := []byte(fmt.Sprintf("SL%06X", client.Sequence))
_, err = conn.Write(slSeq)
if err != nil {
return err
Expand All @@ -61,7 +61,7 @@ func SendSLPacket(conn net.Conn, count []int32, ts int64, seq *int64, network, s
return err
}

*seq++
client.Sequence++
}

return nil
Expand Down
5 changes: 3 additions & 2 deletions driver/seedlink/station.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"net"

"github.com/anyshake/observer/feature"
"github.com/anyshake/observer/utils/text"
)

type STATION struct{}

// Callback of "STATION <...> <...>" command, implements SeedLinkCommandCallback interface
func (*STATION) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
cl.Station = args[0]
cl.Network = args[1]
cl.Station = text.TruncateString(args[0], 5)
cl.Network = text.TruncateString(args[1], 2)
_, err := conn.Write([]byte(RES_OK))
return err
}
Expand Down
24 changes: 16 additions & 8 deletions driver/seedlink/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,24 @@ type SeedLinkCommand struct {

// SeedLink client state
type SeedLinkClient struct {
StreamMode bool
Network string
Station string
Location string
Channels []string
StartTime time.Time
EndTime time.Time
Streaming bool
Sequence int64
Network string
Station string
Location string
Channels []string
StartTime time.Time
EndTime time.Time
}

type SeedLinkStreamer func(pub *publisher.Geophone, conn net.Conn, channels []string, network, station, location string, seqNum *int64) error
// SeedLink data packet model
type SeedLinkPacket struct {
Count []int32
Channel string
Timestamp int64
}

type SeedLinkStreamer func(conn net.Conn, client *SeedLinkClient, pub *publisher.Geophone) error

// Interface for SeedLink command callback & fallback
type SeedLinkCommandCallback interface {
Expand Down
7 changes: 1 addition & 6 deletions feature/seedlink/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *SeedLink) handleCommand(options *feature.FeatureOptions, slGlobal *seed
if clientMessage != "END" &&
// An exception for INFO command
!strings.Contains(clientMessage, "INFO ") {
slClient.StreamMode = false
slClient.Streaming = false
}

// Check if command is whitelisted
Expand Down Expand Up @@ -102,10 +102,5 @@ func (s *SeedLink) handleCommand(options *feature.FeatureOptions, slGlobal *seed
cmd.Fallback(slGlobal, slClient, options, conn)
}
}

// Clear selected channels
if clientMessage == "END" {
slClient.Channels = []string{}
}
}
}
2 changes: 1 addition & 1 deletion feature/seedlink/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func (s *SeedLink) InitClient(slClient *seedlink.SeedLinkClient) {
slClient.StreamMode = false
slClient.Streaming = false
}

func (s *SeedLink) InitGlobal(slGlobal *seedlink.SeedLinkGlobal, currentTime time.Time, station, network, location string, bufferDuration int) error {
Expand Down
15 changes: 11 additions & 4 deletions feature/seedlink/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
"github.com/anyshake/observer/publisher"
)

func (s *SeedLink) handleMessage(gp *publisher.Geophone, conn net.Conn, channels []string, network, station, location string, seqNum *int64) error {
func (s *SeedLink) handleMessage(conn net.Conn, client *seedlink.SeedLinkClient, gp *publisher.Geophone) error {
if len(client.Channels) == 0 {
return fmt.Errorf("no channels selected")
}

var (
ts = gp.TS
ehz = gp.EHZ
ehe = gp.EHE
ehn = gp.EHN
Expand All @@ -18,15 +23,17 @@ func (s *SeedLink) handleMessage(gp *publisher.Geophone, conn net.Conn, channels
}
)

for _, channel := range channels {
data, ok := chMap[channel]
for _, channel := range client.Channels {
countData, ok := chMap[channel]
if !ok {
conn.Write([]byte(seedlink.RES_ERR))
err := fmt.Errorf("channel %s not found", channel)
return err
}

err := seedlink.SendSLPacket(conn, data, gp.TS, seqNum, network, station, channel, location)
err := seedlink.SendSLPacket(conn, client, seedlink.SeedLinkPacket{
Channel: channel, Timestamp: ts, Count: countData,
})
if err != nil {
return err
}
Expand Down

0 comments on commit 7d6a93f

Please sign in to comment.