Skip to content

Commit

Permalink
feat: Implement TCP streaming receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
driskell committed Oct 16, 2022
1 parent afcd8e1 commit b3a3720
Show file tree
Hide file tree
Showing 17 changed files with 323 additions and 22 deletions.
26 changes: 19 additions & 7 deletions lc-lib/transports/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ type EventsEvent interface {

// StatusEvent contains information about a status change for a transport
type StatusEvent struct {
Event
context context.Context
statusChange StatusChange
err error
}

var _ Event = (*StatusEvent)(nil)

// NewStatusEvent generates a new StatusEvent for the given context
func NewStatusEvent(context context.Context, statusChange StatusChange, err error) *StatusEvent {
return &StatusEvent{
Expand All @@ -113,12 +114,13 @@ func (e *StatusEvent) Err() error {

// ConnectEvent marks the start of a new connection on a reciver
type ConnectEvent struct {
Event
context context.Context
remote string
desc string
}

var _ Event = (*ConnectEvent)(nil)

// NewConnectEvent generates a new ConnectEvent for the given Endpoint
func NewConnectEvent(context context.Context, remote string, desc string) *ConnectEvent {
return &ConnectEvent{
Expand All @@ -145,10 +147,11 @@ func (e *ConnectEvent) Desc() string {

// EndEvent marks the end of a stream of events from an endpoint
type EndEvent struct {
Event
context context.Context
}

var _ Event = (*EndEvent)(nil)

// NewEndEvent generates a new EndEvent for the given Endpoint
func NewEndEvent(context context.Context) *EndEvent {
return &EndEvent{
Expand All @@ -163,10 +166,11 @@ func (e *EndEvent) Context() context.Context {

// PongEvent is received when a transport has responded to a Ping() request
type PongEvent struct {
Event
context context.Context
}

var _ Event = (*PongEvent)(nil)

// NewPongEvent generates a new PongEvent for the given Endpoint
func NewPongEvent(context context.Context) *PongEvent {
return &PongEvent{
Expand All @@ -181,10 +185,11 @@ func (e *PongEvent) Context() context.Context {

// PingEvent is received when a transport has responded to a Ping() request
type PingEvent struct {
Event
context context.Context
}

var _ Event = (*PingEvent)(nil)

// NewPingEvent generates a new PingEvent for the given Endpoint
func NewPingEvent(context context.Context) *PingEvent {
return &PingEvent{
Expand All @@ -199,12 +204,13 @@ func (e *PingEvent) Context() context.Context {

// ackEvent contains information on which events have been acknowledged
type ackEvent struct {
AckEvent
context context.Context
nonce *string
sequence uint32
}

var _ AckEvent = (*ackEvent)(nil)

// NewAckEvent generates a new AckEvent for the given Endpoint
func NewAckEvent(context context.Context, nonce *string, sequence uint32) AckEvent {
return &ackEvent{
Expand All @@ -231,12 +237,13 @@ func (e *ackEvent) Sequence() uint32 {

// eventsEvent contains information about an events bundle
type eventsEvent struct {
EventsEvent
context context.Context
nonce *string
events []map[string]interface{}
}

var _ EventsEvent = (*eventsEvent)(nil)

// NewEventsEvent generates a new EventsEvent for the given bundle of events
func NewEventsEvent(context context.Context, nonce *string, events []map[string]interface{}) EventsEvent {
return &eventsEvent{
Expand All @@ -261,6 +268,11 @@ func (e *eventsEvent) Events() []map[string]interface{} {
return e.events
}

// Count returns the number of events in the payload
func (e *eventsEvent) Count() uint32 {
return uint32(len(e.events))
}

// ParseTLSVersion parses a TLS version string into the tls library value for min/max config
// We explicitly refuse SSLv3 to mitigate POODLE vulnerability
func ParseTLSVersion(version string, fallback uint16) (uint16, error) {
Expand Down
8 changes: 4 additions & 4 deletions lc-lib/transports/tcp/courier/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,19 @@ func (p *protocol) Read() (transports.Event, error) {
if p.isClient {
switch transportEvent := message.(type) {
case transports.AckEvent:
log.Debugf("[T %s < %s] Received acknowledgement for nonce %x with sequence %d", p.conn.LocalAddr().String(), p.conn.RemoteAddr().String(), transportEvent.Nonce(), transportEvent.Sequence())
log.Debugf("[T %s < %s] Received acknowledgement for nonce %x with sequence %d", p.conn.LocalAddr().String(), p.conn.RemoteAddr().String(), *transportEvent.Nonce(), transportEvent.Sequence())
return transportEvent, nil
case *protocolPONG:
log.Debugf("[T %s < %s] Received pong", p.conn.LocalAddr().String(), p.conn.RemoteAddr().String())
return transportEvent, nil
return transports.NewPongEvent(p.conn.Context()), nil
}
} else {
switch transportEvent := message.(type) {
case *protocolPING:
log.Debugf("[R %s < %s] Received ping", p.conn.LocalAddr().String(), p.conn.RemoteAddr().String())
return transportEvent, nil
return transports.NewPingEvent(p.conn.Context()), nil
case transports.EventsEvent:
log.Debugf("[R %s < %s] Received payload with nonce %x and %d events", p.conn.LocalAddr().String(), p.conn.RemoteAddr().String(), transportEvent.Nonce(), transportEvent.Count())
log.Debugf("[R %s < %s] Received payload with nonce %x and %d events", p.conn.LocalAddr().String(), p.conn.RemoteAddr().String(), *transportEvent.Nonce(), transportEvent.Count())
return transportEvent, nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
)

type protocolACKN struct {
transports.AckEvent
ctx context.Context
nonce *string
sequence uint32
}

var _ transports.AckEvent = (*protocolACKN)(nil)

// newProtocolACKN reads a new protocolACKN
func newProtocolACKN(conn tcp.Connection, bodyLength uint32) (tcp.ProtocolMessage, error) {
if bodyLength != 20 {
Expand All @@ -58,6 +59,16 @@ func (p *protocolACKN) Context() context.Context {
return p.ctx
}

// Nonce returns the nonce being acknowledged
func (p *protocolACKN) Nonce() *string {
return p.nonce
}

// Sequence returns the sequence being acknowledged
func (p *protocolACKN) Sequence() uint32 {
return p.sequence
}

// Write writes a payload to the connection
func (p *protocolACKN) Write(conn tcp.Connection) error {
// Encapsulate the ack into a message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import (
)

type protocolEVNT struct {
transports.EventsEvent
ctx context.Context
nonce *string
events [][]byte
}

var _ transports.EventsEvent = (*protocolEVNT)(nil)

// Reads the events from existing data
func newProtocolEVNT(conn tcp.Connection, bodyLength uint32) (tcp.ProtocolMessage, error) {
if bodyLength != math.MaxUint32 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import (
)

type protocolJDAT struct {
transports.EventsEvent
ctx context.Context
nonce *string
events [][]byte
}

var _ transports.EventsEvent = (*protocolJDAT)(nil)

// newProtocolJDAT creates a new structure from wire-bytes
func newProtocolJDAT(conn tcp.Connection, bodyLength uint32) (tcp.ProtocolMessage, error) {
if bodyLength < 17 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ package courier
import (
"fmt"

"github.com/driskell/log-courier/lc-lib/transports"
"github.com/driskell/log-courier/lc-lib/transports/tcp"
)

type protocolPING struct {
*transports.PingEvent
}

// newProtocolPING reads a new protocolPING
Expand All @@ -33,7 +31,7 @@ func newProtocolPING(conn tcp.Connection, bodyLength uint32) (tcp.ProtocolMessag
return nil, fmt.Errorf("protocol error: Corrupt message PING size %d != 0", bodyLength)
}

return &protocolPING{transports.NewPingEvent(conn.Context())}, nil
return &protocolPING{}, nil
}

// Type returns a human-readable name for the message type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ package courier
import (
"fmt"

"github.com/driskell/log-courier/lc-lib/transports"
"github.com/driskell/log-courier/lc-lib/transports/tcp"
)

type protocolPONG struct {
*transports.PingEvent
}

// newProtocolPONG reads a new protocolPONG
Expand All @@ -33,7 +31,7 @@ func newProtocolPONG(conn tcp.Connection, bodyLength uint32) (tcp.ProtocolMessag
return nil, fmt.Errorf("protocol error: Corrupt message PONG size %d != 0", bodyLength)
}

return &protocolPONG{transports.NewPingEvent(conn.Context())}, nil
return &protocolPONG{}, nil
}

// Type returns a human-readable name for the message type
Expand Down
24 changes: 24 additions & 0 deletions lc-lib/transports/tcp/stream/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2012-2020 Jason Woods and contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package stream

const (
// TransportStream is the transport name for plain text
TransportStream = "stream"
// TransportStreamTLS is the transport name for encrypted text
TransportStreamTLS = "streamtls"
)
25 changes: 25 additions & 0 deletions lc-lib/transports/tcp/stream/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2012-2020 Jason Woods and contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package stream

import "gopkg.in/op/go-logging.v1"

var log *logging.Logger

func init() {
log = logging.MustGetLogger("transports/tcp/stream")
}
84 changes: 84 additions & 0 deletions lc-lib/transports/tcp/stream/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2012-2020 Jason Woods and contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package stream

import (
"github.com/driskell/log-courier/lc-lib/event"
"github.com/driskell/log-courier/lc-lib/harvester"
"github.com/driskell/log-courier/lc-lib/transports"
"github.com/driskell/log-courier/lc-lib/transports/tcp"
)

type protocol struct {
conn tcp.Connection
lineReader *harvester.LineReader
}

// Negotiation does not happen for a plain event stream
func (p *protocol) Negotiation() (transports.Event, error) {
return nil, nil
}

// SendEvents is not implemented as this is not a transport
func (p *protocol) SendEvents(nonce string, events []*event.Event) error {
panic("Not implemented")
}

// Acknowledge is not used by a plain event stream
func (p *protocol) Acknowledge(nonce *string, sequence uint32) error {
return nil
}

// Ping is not implemented as this is not a transport
func (p *protocol) Ping() error {
panic("Not implemented")
}

// Pong is not implemented as we will never receiver a Ping
func (p *protocol) Pong() error {
panic("Not implemented")
}

// Read reads a line from the connection and calculates an event
// Returns nil event if shutdown, with an optional error
func (p *protocol) Read() (transports.Event, error) {
var event map[string]interface{}
var err error

for {
event, _, err = p.lineReader.ReadItem()
if err == nil || err == harvester.ErrMaxDataSizeTruncation {
break
}

if err == tcp.ErrIOWouldBlock {
continue
}

return nil, err
}

log.Debugf("Received event: %v", event)

// TODO: Read multiple before generating event
return transports.NewEventsEvent(p.conn.Context(), &transports.NilNonce, []map[string]interface{}{event}), nil
}

// NonBlocking returns trues because a stream does not have well defined event boundaries so we have to check periodically
func (p *protocol) NonBlocking() bool {
return true
}
Loading

0 comments on commit b3a3720

Please sign in to comment.