Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chrony protocol #12

Merged
merged 1 commit into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions protocol/chrony/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Chrony network protocol used for command and monitoring of the timeserver

Native Go implementation of Chrony communication protocol v6.

As of now, only monitoring part of protocol that is used to communicate between `chronyc` and `chronyd` is implemented.
112 changes: 112 additions & 0 deletions protocol/chrony/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright (c) Facebook, Inc. and its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package chrony

import (
"bytes"
"encoding/binary"
"fmt"
"io"

log "github.com/sirupsen/logrus"
)

// Client talks to chronyd
type Client struct {
Connection io.ReadWriter
Sequence uint32
}

// Communicate sends the packet to chronyd, parse response into something usable
func (n *Client) Communicate(packet RequestPacket) (ResponsePacket, error) {
n.Sequence++
var err error
packet.SetSequence(n.Sequence)
err = binary.Write(n.Connection, binary.BigEndian, packet)
if err != nil {
return nil, err
}
response := make([]uint8, 1024)
read, err := n.Connection.Read(response)
if err != nil {
return nil, err
}
log.Debugf("Read %d bytes", read)
r := bytes.NewReader(response)
head := new(replyHead)
if err = binary.Read(r, binary.BigEndian, head); err != nil {
return nil, err
}
log.Debugf("response head: %+v", head)
if head.Status == sttUnauth {
return nil, ErrNotAuthorized
}
if head.Status != sttSuccess {
return nil, fmt.Errorf("got status %s", StatusDesc[head.Status])
}
switch head.Reply {
case rpyNSources:
data := new(replySourcesContent)
if err = binary.Read(r, binary.BigEndian, data); err != nil {
return nil, err
}
log.Debugf("response data: %+v", data)
return &ReplySources{
replyHead: *head,
NSources: int(data.NSources),
}, nil
case rpySourceData:
data := new(replySourceDataContent)
if err = binary.Read(r, binary.BigEndian, data); err != nil {
return nil, err
}
log.Debugf("response data: %+v", data)
return &ReplySourceData{
replyHead: *head,
sourceData: *newSourceData(data),
}, nil
case rpyTracking:
data := new(replyTrackingContent)
if err = binary.Read(r, binary.BigEndian, data); err != nil {
return nil, err
}
log.Debugf("response data: %+v", data)
return &ReplyTracking{
replyHead: *head,
tracking: *newTracking(data),
}, nil
case rpyServerStats:
data := new(serverStats)
if err = binary.Read(r, binary.BigEndian, data); err != nil {
return nil, err
}
log.Debugf("response data: %+v", data)
return &ReplyServerStats{
replyHead: *head,
serverStats: *data,
}, nil
case rpyNTPData:
data := new(replyNTPDataContent)
if err = binary.Read(r, binary.BigEndian, data); err != nil {
return nil, err
}
log.Debugf("response data: %+v", data)
return &ReplyNTPData{
replyHead: *head,
ntpData: *newNTPData(data),
}, nil
default:
return nil, fmt.Errorf("not implemented reply type %d from %+v", head.Reply, head)
}
}
197 changes: 197 additions & 0 deletions protocol/chrony/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
Copyright (c) Facebook, Inc. and its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package chrony

import (
"bytes"
"encoding/binary"
"fmt"
"net"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// fakeConn gives us fake io.ReadWriter interace implementation for which we can provide fake outputs
type fakeConn struct {
readCount int
outputs []*bytes.Buffer
}

func newConn(outputs []*bytes.Buffer) *fakeConn {
return &fakeConn{
readCount: 0,
outputs: outputs,
}
}

func (c *fakeConn) Read(p []byte) (n int, err error) {
pos := c.readCount
if c.readCount < len(c.outputs) {
c.readCount++
return c.outputs[pos].Read(p)
}
return 0, fmt.Errorf("EOF")
}

func (c *fakeConn) Write(p []byte) (n int, err error) {
// here we may assert writes
return 0, nil
}

// Test if we have errors when there is nothing on the line to read
func TestCommunicateEOF(t *testing.T) {
require := require.New(t)
conn := newConn([]*bytes.Buffer{
bytes.NewBuffer([]byte{}),
})
client := Client{Sequence: 1, Connection: conn}
_, err := client.Communicate(NewTrackingPacket())
require.NotNil(err)
}

func TestCommunicateError(t *testing.T) {
var err error
require := require.New(t)
buf := &bytes.Buffer{}
packetHead := replyHead{
Version: protoVersionNumber,
PKTType: pktTypeCmdReply,
Res1: 0,
Res2: 0,
Command: reqTracking,
Reply: rpyTracking,
Status: sttNoSuchSource,
Pad1: 0,
Pad2: 0,
Pad3: 0,
Sequence: 2,
Pad4: 0,
Pad5: 0,
}
packetBody := replyTrackingContent{}
err = binary.Write(buf, binary.BigEndian, packetHead)
require.Nil(err)
err = binary.Write(buf, binary.BigEndian, packetBody)
require.Nil(err)
conn := newConn([]*bytes.Buffer{
buf,
})
client := Client{Sequence: 1, Connection: conn}
_, err = client.Communicate(NewTrackingPacket())
require.NotNil(err)
}

func TestCommunicateAuthError(t *testing.T) {
var err error
assert := assert.New(t)
require := require.New(t)
buf := &bytes.Buffer{}
packetHead := replyHead{
Version: protoVersionNumber,
PKTType: pktTypeCmdReply,
Res1: 0,
Res2: 0,
Command: reqTracking,
Reply: rpyTracking,
Status: sttUnauth,
Pad1: 0,
Pad2: 0,
Pad3: 0,
Sequence: 2,
Pad4: 0,
Pad5: 0,
}
packetBody := replyTrackingContent{}
err = binary.Write(buf, binary.BigEndian, packetHead)
require.Nil(err)
err = binary.Write(buf, binary.BigEndian, packetBody)
require.Nil(err)
conn := newConn([]*bytes.Buffer{
buf,
})
client := Client{Sequence: 1, Connection: conn}
_, err = client.Communicate(NewTrackingPacket())
assert.Equal(ErrNotAuthorized, err)
}

// Test if we can read reply properly
func TestCommunicateOK(t *testing.T) {
var err error
assert := assert.New(t)
require := require.New(t)
buf := &bytes.Buffer{}
packetHead := replyHead{
Version: protoVersionNumber,
PKTType: pktTypeCmdReply,
Res1: 0,
Res2: 0,
Command: reqTracking,
Reply: rpyTracking,
Status: sttSuccess,
Pad1: 0,
Pad2: 0,
Pad3: 0,
Sequence: 2,
Pad4: 0,
Pad5: 0,
}
packetBody := replyTrackingContent{
RefID: 1,
IPAddr: *newIPAddr(net.IP([]byte{192, 168, 0, 10})),
Stratum: 3,
LeapStatus: 0,
RefTime: timeSpec{},
CurrentCorrection: 0,
LastOffset: 12345,
RMSOffset: 0,
FreqPPM: 0,
ResidFreqPPM: 0,
SkewPPM: 0,
RootDelay: 0,
RootDispersion: 0,
LastUpdateInterval: 0,
}
err = binary.Write(buf, binary.BigEndian, packetHead)
require.Nil(err)
err = binary.Write(buf, binary.BigEndian, packetBody)
require.Nil(err)
conn := newConn([]*bytes.Buffer{
buf,
})
client := Client{Sequence: 1, Connection: conn}
p, err := client.Communicate(NewTrackingPacket())
require.Nil(err)
expected := &ReplyTracking{
replyHead: packetHead,
tracking: tracking{
RefID: packetBody.RefID,
IPAddr: net.IP([]byte{192, 168, 0, 10}),
Stratum: packetBody.Stratum,
LeapStatus: packetBody.LeapStatus,
RefTime: packetBody.RefTime.ToTime(),
CurrentCorrection: packetBody.CurrentCorrection.ToFloat(),
LastOffset: packetBody.LastOffset.ToFloat(),
RMSOffset: packetBody.RMSOffset.ToFloat(),
FreqPPM: packetBody.FreqPPM.ToFloat(),
ResidFreqPPM: packetBody.ResidFreqPPM.ToFloat(),
SkewPPM: packetBody.SkewPPM.ToFloat(),
RootDelay: packetBody.RootDelay.ToFloat(),
RootDispersion: packetBody.RootDispersion.ToFloat(),
LastUpdateInterval: packetBody.LastUpdateInterval.ToFloat(),
},
}
assert.Equal(expected, p)
}
Loading