Skip to content

Commit

Permalink
Merge pull request #3547 from TheThingsNetwork/fix/issue/3515-lbs-rtt
Browse files Browse the repository at this point in the history
Fix RTT handling for LBS gateways
  • Loading branch information
KrishnaIyer committed Dec 8, 2020
2 parents 05fbbc6 + b1c2bc4 commit d6f0d02
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ For details about compatibility between different releases, see the **Commitment

- Simulated uplinks visibility in webhook messages.
- Retransmission handling.
- RTT recording for LBS gateways. The maximum round trip delay for RTT calculation is configurable via `--gs.basic-station.max-valid-round-trip-delay`.

### Security

Expand Down
7 changes: 4 additions & 3 deletions cmd/internal/shared/gatewayserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ var DefaultGatewayServerConfig = gatewayserver.Config{
PublicTLSAddress: fmt.Sprintf("%s:8882", shared.DefaultPublicHost),
},
BasicStation: gatewayserver.BasicStationConfig{
Config: ws.DefaultConfig,
Listen: ":1887",
ListenTLS: ":8887",
Config: ws.DefaultConfig,
MaxValidRoundTripDelay: 10 * time.Second,
Listen: ":1887",
ListenTLS: ":8887",
},
UpdateConnectionStatsDebounceTime: 3 * time.Second,
}
7 changes: 4 additions & 3 deletions pkg/gatewayserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ type UDPConfig struct {
// BasicStationConfig defines the LoRa Basics Station configuration of the Gateway Server.
type BasicStationConfig struct {
ws.Config `name:",squash"`
FallbackFrequencyPlanID string `name:"fallback-frequency-plan-id" description:"Fallback frequency plan ID for non-registered gateways"`
Listen string `name:"listen" description:"Address for the Basic Station frontend to listen on"`
ListenTLS string `name:"listen-tls" description:"Address for the Basic Station frontend to listen on (with TLS)"`
MaxValidRoundTripDelay time.Duration `name:"max-valid-round-trip-delay" description:"Maximum valid round trip delay to qualify for RTT calculations"`
FallbackFrequencyPlanID string `name:"fallback-frequency-plan-id" description:"Fallback frequency plan ID for non-registered gateways"`
Listen string `name:"listen" description:"Address for the Basic Station frontend to listen on"`
ListenTLS string `name:"listen-tls" description:"Address for the Basic Station frontend to listen on (with TLS)"`
}

// Config represents the Gateway Server configuration.
Expand Down
2 changes: 1 addition & 1 deletion pkg/gatewayserver/gatewayserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func New(c *component.Component, conf *Config, opts ...Option) (gs *GatewayServe
}{
{
Name: "basicstation",
Formatter: lbslns.NewFormatter(),
Formatter: lbslns.NewFormatter(conf.BasicStation.MaxValidRoundTripDelay),
listenerConfig: listenerConfig{
fallbackFreqPlanID: conf.BasicStation.FallbackFrequencyPlanID,
listen: conf.BasicStation.Listen,
Expand Down
10 changes: 7 additions & 3 deletions pkg/gatewayserver/io/ws/lbslns/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package lbslns

import (
"fmt"
"time"

"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/gatewayserver/io"
Expand All @@ -33,12 +34,15 @@ type State struct {
}

type lbsLNS struct {
tokens io.DownlinkTokens
maxRoundTripDelay time.Duration
tokens io.DownlinkTokens
}

// NewFormatter returns a new LoRa Basic Station LNS formatter.
func NewFormatter() ws.Formatter {
return &lbsLNS{}
func NewFormatter(maxRoundTripDelay time.Duration) ws.Formatter {
return &lbsLNS{
maxRoundTripDelay: maxRoundTripDelay,
}
}

func (f *lbsLNS) Endpoints() ws.Endpoints {
Expand Down
26 changes: 18 additions & 8 deletions pkg/gatewayserver/io/ws/lbslns/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,9 @@ func (f *lbsLNS) HandleUp(ctx context.Context, raw []byte, ids ttnpb.GatewayIden
"upstream_type", typ,
))

recordTime := func(refTime float64, xTime int64, server time.Time) {
sec, nsec := math.Modf(refTime)
if sec != 0 {
ref := time.Unix(int64(sec), int64(nsec*1e9))
conn.RecordRTT(server.Sub(ref), server)
recordTime := func(recordRTT bool, refTime float64, xTime int64, server time.Time) {
if recordRTT {
conn.RecordRTT(server.Sub(getTimeFromFloat64(refTime)), server)
}
conn.SyncWithGatewayConcentrator(
// The concentrator timestamp is the 32 LSB.
Expand Down Expand Up @@ -485,7 +483,7 @@ func (f *lbsLNS) HandleUp(ctx context.Context, raw []byte, ids ttnpb.GatewayIden
ID: int32(jreq.UpInfo.XTime >> 48),
}
session.DataMu.Unlock()
recordTime(jreq.RefTime, jreq.UpInfo.XTime, receivedAt)
recordTime(false, jreq.RefTime, jreq.UpInfo.XTime, receivedAt)

case TypeUpstreamUplinkDataFrame:
var updf UplinkDataFrame
Expand All @@ -512,7 +510,7 @@ func (f *lbsLNS) HandleUp(ctx context.Context, raw []byte, ids ttnpb.GatewayIden
ID: int32(updf.UpInfo.XTime >> 48),
}
session.DataMu.Unlock()
recordTime(updf.RefTime, updf.UpInfo.XTime, receivedAt)
recordTime(false, updf.RefTime, updf.UpInfo.XTime, receivedAt)

case TypeUpstreamTxConfirmation:
var txConf TxConfirmation
Expand All @@ -533,7 +531,14 @@ func (f *lbsLNS) HandleUp(ctx context.Context, raw []byte, ids ttnpb.GatewayIden
ID: int32(txConf.XTime >> 48),
}
session.DataMu.Unlock()
recordTime(txConf.RefTime, txConf.XTime, receivedAt)
recordRTT := true
refTime := getTimeFromFloat64(txConf.RefTime)
delta := receivedAt.Sub(refTime)
if delta > f.maxRoundTripDelay {
logger.WithField("delta", delta).Warn("Gateway reported reftime greater than the valid maximum. Skip RTT measurement")
recordRTT = false
}
recordTime(recordRTT, txConf.RefTime, txConf.XTime, receivedAt)

case TypeUpstreamProprietaryDataFrame, TypeUpstreamRemoteShell, TypeUpstreamTimeSync:
logger.WithField("message_type", typ).Debug("Message type not implemented")
Expand All @@ -544,3 +549,8 @@ func (f *lbsLNS) HandleUp(ctx context.Context, raw []byte, ids ttnpb.GatewayIden
}
return nil, nil
}

func getTimeFromFloat64(timeInFloat float64) time.Time {
sec, nsec := math.Modf(timeInFloat)
return time.Unix(int64(sec), int64(nsec*1e9))
}
91 changes: 64 additions & 27 deletions pkg/gatewayserver/io/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net"
"net/http"
"testing"
Expand Down Expand Up @@ -58,12 +59,14 @@ var (

testTrafficEndPoint = "/traffic/eui-0101010101010101"

timeout = (1 << 5) * test.Delay
timeout = (1 << 7) * test.Delay
defaultConfig = Config{
WSPingInterval: (1 << 3) * test.Delay,
AllowUnauthenticated: true,
UseTrafficTLSAddress: false,
}

maxValidRoundTripDelay = (1 << 4) * test.Delay
)

func eui64Ptr(eui types.EUI64) *types.EUI64 { return &eui }
Expand Down Expand Up @@ -108,7 +111,7 @@ func TestClientTokenAuth(t *testing.T) {
} {
cfg := defaultConfig
cfg.AllowUnauthenticated = ttc.AllowUnauthenticated
bsWebServer := New(ctx, gs, lbslns.NewFormatter(), cfg)
bsWebServer := New(ctx, gs, lbslns.NewFormatter(maxValidRoundTripDelay), cfg)
lis, err := net.Listen("tcp", serverAddress)
if !a.So(err, should.BeNil) {
t.FailNow()
Expand Down Expand Up @@ -223,7 +226,7 @@ func TestDiscover(t *testing.T) {
mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY)
gs := mock.NewServer(c)

bsWebServer := New(ctx, gs, lbslns.NewFormatter(), defaultConfig)
bsWebServer := New(ctx, gs, lbslns.NewFormatter(maxValidRoundTripDelay), defaultConfig)
lis, err := net.Listen("tcp", serverAddress)
if !a.So(err, should.BeNil) {
t.FailNow()
Expand Down Expand Up @@ -460,7 +463,7 @@ func TestVersion(t *testing.T) {
mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY)
gs := mock.NewServer(c)

bsWebServer := New(ctx, gs, lbslns.NewFormatter(), defaultConfig)
bsWebServer := New(ctx, gs, lbslns.NewFormatter(maxValidRoundTripDelay), defaultConfig)
lis, err := net.Listen("tcp", serverAddress)
if !a.So(err, should.BeNil) {
t.FailNow()
Expand Down Expand Up @@ -722,7 +725,7 @@ func TestTraffic(t *testing.T) {
mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY)
gs := mock.NewServer(c)

bsWebServer := New(ctx, gs, lbslns.NewFormatter(), defaultConfig)
bsWebServer := New(ctx, gs, lbslns.NewFormatter(maxValidRoundTripDelay), defaultConfig)
lis, err := net.Listen("tcp", serverAddress)
if !a.So(err, should.BeNil) {
t.FailNow()
Expand Down Expand Up @@ -1032,6 +1035,19 @@ func TestTraffic(t *testing.T) {
}
}

type testTime struct {
Mux, Rx *time.Time
}

func (t testTime) getRefTime(drift time.Duration) float64 {
time.Sleep(1 << 3 * test.Delay)
now := time.Now()
offset := now.Sub(*t.Rx)
refTime := t.Mux.Add(offset)
refTime = refTime.Add(-drift)
return float64(refTime.UnixNano()) / float64(time.Second)
}

func TestRTT(t *testing.T) {
a := assertions.New(t)
ctx := log.NewContext(test.Context(), test.GetLogger(t))
Expand All @@ -1057,7 +1073,7 @@ func TestRTT(t *testing.T) {
mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY)
gs := mock.NewServer(c)

bsWebServer := New(ctx, gs, lbslns.NewFormatter(), defaultConfig)
bsWebServer := New(ctx, gs, lbslns.NewFormatter(maxValidRoundTripDelay), defaultConfig)
lis, err := net.Listen("tcp", serverAddress)
if !a.So(err, should.BeNil) {
t.FailNow()
Expand All @@ -1083,13 +1099,20 @@ func TestRTT(t *testing.T) {
t.Fatal("Connection timeout")
}

var MuxTime, RxTime float64
testTime := testTime{}

getTimeFromFloat64 := func(timeInFloat float64) *time.Time {
sec, nsec := math.Modf(timeInFloat)
retTime := time.Unix(int64(sec), int64(nsec*1e9))
return &retTime
}

for _, tc := range []struct {
Name string
InputBSUpstream interface{}
InputNetworkDownstream *ttnpb.DownlinkMessage
InputDownlinkPath *ttnpb.DownlinkPath
WaitTime time.Duration
GatewayClockDrift time.Duration
ExpectedRTTStatsCount int
}{
{
Expand Down Expand Up @@ -1152,7 +1175,6 @@ func TestRTT(t *testing.T) {
XTime: 1548059982,
},
ExpectedRTTStatsCount: 1,
WaitTime: 1 << 4 * test.Delay,
},
{
Name: "RepeatedTxAck",
Expand All @@ -1161,7 +1183,6 @@ func TestRTT(t *testing.T) {
XTime: 1548059982,
},
ExpectedRTTStatsCount: 2,
WaitTime: 0,
},
{
Name: "UplinkFrame",
Expand All @@ -1185,19 +1206,33 @@ func TestRTT(t *testing.T) {
},
},
},
ExpectedRTTStatsCount: 2,
},
{
Name: "TxAckWithSmallClockDrift",
InputBSUpstream: lbslns.TxConfirmation{
Diid: 1,
XTime: 1548059982,
},
ExpectedRTTStatsCount: 3,
WaitTime: 1 << 3 * test.Delay,
},
{
Name: "TxAckWithClockDriftAboveThreshold",
InputBSUpstream: lbslns.TxConfirmation{
Diid: 1,
XTime: 1548059982,
},
ExpectedRTTStatsCount: 3,
GatewayClockDrift: (1 << 5 * test.Delay),
},
} {
t.Run(tc.Name, func(t *testing.T) {
a := assertions.New(t)
if tc.InputBSUpstream != nil {
switch v := tc.InputBSUpstream.(type) {
case lbslns.TxConfirmation:
if MuxTime != 0 {
time.Sleep(tc.WaitTime)
now := float64(time.Now().UnixNano()) / float64(time.Second)
v.RefTime = now - RxTime + MuxTime
if testTime.Mux != nil {
v.RefTime = testTime.getRefTime(tc.GatewayClockDrift)
}
req, err := json.Marshal(v)
if err != nil {
Expand All @@ -1216,10 +1251,8 @@ func TestRTT(t *testing.T) {
}

case lbslns.UplinkDataFrame:
if MuxTime != 0 {
time.Sleep(tc.WaitTime)
now := float64(time.Now().UnixNano()) / float64(time.Second)
v.RefTime = now - RxTime + MuxTime
if testTime.Mux != nil {
v.RefTime = testTime.getRefTime(tc.GatewayClockDrift)
}
req, err := json.Marshal(v)
if err != nil {
Expand All @@ -1240,10 +1273,8 @@ func TestRTT(t *testing.T) {
}

case lbslns.JoinRequest:
if MuxTime != 0 {
time.Sleep(tc.WaitTime)
now := float64(time.Now().Unix()) + float64(time.Now().Nanosecond())/(1e9)
v.RefTime = now - RxTime + MuxTime
if testTime.Mux != nil {
v.RefTime = testTime.getRefTime(tc.GatewayClockDrift)
}
req, err := json.Marshal(v)
if err != nil {
Expand All @@ -1264,7 +1295,10 @@ func TestRTT(t *testing.T) {
}
}

if MuxTime > 0 {
if testTime.Mux != nil {
// Wait for stats to get updated
time.Sleep(1 << 2 * test.Delay)

// Atleast one downlink is needed for the first muxtime.
min, max, median, _, count := gsConn.RTTStats(90, time.Now())
if !a.So(count, should.Equal, tc.ExpectedRTTStatsCount) {
Expand Down Expand Up @@ -1303,8 +1337,11 @@ func TestRTT(t *testing.T) {
if err := json.Unmarshal(res, &msg); err != nil {
t.Fatalf("Failed to unmarshal response `%s`: %v", string(res), err)
}
MuxTime = msg.MuxTime
RxTime = float64(time.Now().Unix()) + float64(time.Now().Nanosecond())/(1e9)
testTime.Mux = getTimeFromFloat64(msg.MuxTime)
// Simulate downstream delay
time.Sleep(1 << 2 * test.Delay)
now := time.Now()
testTime.Rx = &now
case <-time.After(timeout):
t.Fatalf("Read message timeout")
}
Expand Down Expand Up @@ -1338,7 +1375,7 @@ func TestPingPong(t *testing.T) {
mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY)
gs := mock.NewServer(c)

bsWebServer := New(ctx, gs, lbslns.NewFormatter(), defaultConfig)
bsWebServer := New(ctx, gs, lbslns.NewFormatter(maxValidRoundTripDelay), defaultConfig)
lis, err := net.Listen("tcp", serverAddress)
if !a.So(err, should.BeNil) {
t.FailNow()
Expand Down

0 comments on commit d6f0d02

Please sign in to comment.