Skip to content
Merged
5 changes: 5 additions & 0 deletions backend/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ func main() {
orderTopic := order_topic.NewSendTopic()
loggerTopic := logger_topic.NewEnableTopic()
loggerTopic.SetDataLogger(subloggers[data_logger.Name].(*data_logger.Logger))
loggerHandler.SetOnStart(func() {
if err := loggerTopic.NotifyStarted(); err != nil {
trace.Error().Err(err).Msg("failed to notify logger started")
}
})

messageTopic := message_topic.NewUpdateTopic()
stateOrderTopic := order_topic.NewState(idToBoard, trace.Logger)
Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/broker/topics/logger/enable.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func (enable *Enable) handleVariables(_ websocket.ClientId, message *websocket.M
return nil
}

func (enable *Enable) NotifyStarted() error {
enable.isRunning.Store(true)
return enable.broadcastState()
}

func (enable *Enable) broadcastState() error {
payload, err := json.Marshal(enable.isRunning.Load())
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions backend/pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Logger struct {
subloggers map[abstraction.LoggerName]abstraction.Logger

trace zerolog.Logger

onStart func()
}

/**************
Expand Down Expand Up @@ -83,9 +85,17 @@ func (logger *Logger) Start() error {
}

logger.trace.Info().Msg("started")

if logger.onStart != nil {
logger.onStart()
}
return nil
}

func (logger *Logger) SetOnStart(cb func()) {
logger.onStart = cb
}

// PushRecord works as a proxy for the PushRecord method of the subloggers
func (logger *Logger) PushRecord(record abstraction.LoggerRecord) error {

Expand Down
170 changes: 109 additions & 61 deletions backend/pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,44 +119,17 @@ func (transport *Transport) HandleServer(config tcp.ServerConfig, local string)
// handleTCPConn is used to handle the specific TCP connections to the boards. It detects errors caused
// on concurrent reads and writes, so other routines should not worry about closing or handling errors
func (transport *Transport) handleTCPConn(conn net.Conn) error {
if tcpConn, ok := conn.(*net.TCPConn); ok {
transport.logger.Trace().Str("remoteAddress", conn.RemoteAddr().String()).Msg("setting connection linger")
err := tcpConn.SetLinger(0)
if err != nil {
transport.errChan <- err
transport.logger.Error().Stack().Err(err).Str("remoteAddress", conn.RemoteAddr().String()).Msg("set linger")
}

transport.logger.Trace().Str("remoteAddress", conn.RemoteAddr().String()).Msg("setting connection no delay")
err = tcpConn.SetNoDelay(true)
if err != nil {
transport.errChan <- err
transport.logger.Error().Stack().Err(err).Str("remoteAddress", conn.RemoteAddr().String()).Msg("set no delay")
}
}
transport.configureTCPConn(conn)

target, ok := transport.ipToTarget[conn.RemoteAddr().(*net.TCPAddr).IP.String()]
if !ok {
conn.Close()
transport.logger.Warn().Str("remoteAddress", conn.RemoteAddr().(*net.TCPAddr).IP.String()).Msg("ip target not found")
err := ErrUnknownTarget{Remote: conn.RemoteAddr()}
transport.errChan <- err
target, err := transport.targetFromTCPConn(conn)
if err != nil {
return err
}

connectionLogger := transport.logger.With().Str("remoteAddress", conn.RemoteAddr().String()).Str("target", string(target)).Logger()
connectionLogger.Info().Msg("new connection")

if err := func() error {
transport.connectionsMx.Lock()
defer transport.connectionsMx.Unlock()
if _, ok := transport.connections[target]; ok {
conn.Close()
connectionLogger.Debug().Msg("already connected")
return ErrTargetAlreadyConnected{Target: target}
}
return nil
}(); err != nil {
if err := transport.rejectIfConnectedTCPConn(target, conn, connectionLogger); err != nil {
transport.errChan <- err
return err
}
Expand All @@ -167,56 +140,125 @@ func (transport *Transport) handleTCPConn(conn net.Conn) error {
connectionLogger.Info().Msg("close")
}()

func() {
transport.connectionsMx.Lock()
defer transport.connectionsMx.Unlock()
connectionLogger.Debug().Msg("added connection")
transport.connections[target] = conn
}()
defer func() {
transport.connectionsMx.Lock()
defer transport.connectionsMx.Unlock()
connectionLogger.Debug().Msg("removed connection")
delete(transport.connections, target)
}()
cleanupConn := transport.registerTCPConn(target, conn, connectionLogger)
defer cleanupConn()

transport.api.ConnectionUpdate(target, true)
defer transport.api.ConnectionUpdate(target, false)

transport.readLoopTCPConn(conn, connectionLogger)

err = <-errChan
if err != nil {
connectionLogger.Error().Stack().Err(err).Msg("")
transport.errChan <- err
}
return err
}

// configureTCPConn sets TCP-level options like linger and no-delay.
func (transport *Transport) configureTCPConn(conn net.Conn) {
tcpConn, ok := conn.(*net.TCPConn)
if !ok {
return
}

remote := conn.RemoteAddr().String()

transport.logger.Trace().Str("remoteAddress", remote).Msg("setting connection linger")
err := tcpConn.SetLinger(0)
if err != nil {
transport.errChan <- err
transport.logger.Error().Stack().Err(err).Str("remoteAddress", remote).Msg("set linger")
}

transport.logger.Trace().Str("remoteAddress", remote).Msg("setting connection no delay")
err = tcpConn.SetNoDelay(true)
if err != nil {
transport.errChan <- err
transport.logger.Error().Stack().Err(err).Str("remoteAddress", remote).Msg("set no delay")
}
}

// targetFromTCPConn maps the remote IP address of the connection to a TransportTarget
// using the ipToTarget map.
func (transport *Transport) targetFromTCPConn(conn net.Conn) (abstraction.TransportTarget, error) {
remoteAddr := conn.RemoteAddr().(*net.TCPAddr)
ip := remoteAddr.IP.String()

target, ok := transport.ipToTarget[ip]
if !ok {
conn.Close()
transport.logger.Warn().Str("remoteAddress", ip).Msg("ip target not found")
err := ErrUnknownTarget{Remote: conn.RemoteAddr()}
transport.errChan <- err
var zero abstraction.TransportTarget
return zero, err

}
return target, nil
}

// rejectIfConnectedTCPConn closes and rejects conn if target already has an active connection.
func (transport *Transport) rejectIfConnectedTCPConn(target abstraction.TransportTarget, conn net.Conn, logger zerolog.Logger,) error {
transport.connectionsMx.Lock()
defer transport.connectionsMx.Unlock()

if _, ok := transport.connections[target]; ok {
conn.Close()
logger.Debug().Msg("already connected")
err := ErrTargetAlreadyConnected{Target: target}
transport.errChan <- err
return err
}
return nil
}

// registerTCPConn stores conn for target and returns a cleanup that removes it.
func (transport *Transport) registerTCPConn(target abstraction.TransportTarget, conn net.Conn, logger zerolog.Logger) func() {
transport.connectionsMx.Lock()
logger.Debug().Msg("added connection")
transport.connections[target] = conn
transport.connectionsMx.Unlock()

return func() {
transport.connectionsMx.Lock()
logger.Debug().Msg("removed connection")
delete(transport.connections, target)
transport.connectionsMx.Unlock()
}
}

// readLoopTCPConn reads packets from conn and forwards notifications until an error occurs.
func (transport *Transport) readLoopTCPConn(conn net.Conn, logger zerolog.Logger) {
from := conn.RemoteAddr().String()
to := conn.LocalAddr().String()

go func() {
for {
packet, err := transport.decoder.DecodeNext(conn)
if err != nil {
connectionLogger.Error().Stack().Err(err).Msg("decode")
logger.Error().Stack().Err(err).Msg("decode")
transport.errChan <- err
transport.SendFault()
return
}

if transport.propagateFault && packet.Id() == 0 {
connectionLogger.Info().Msg("replicating packet with id 0 to all boards")
logger.Info().Msg("replicating packet with id 0 to all boards")
err := transport.handlePacketEvent(NewPacketMessage(packet))
if err != nil {
connectionLogger.Error().Err(err).Msg("failed to replicate packet")
logger.Error().Err(err).Msg("failed to replicate packet")
}
}

from := conn.RemoteAddr().String()
to := conn.LocalAddr().String()

connectionLogger.Trace().Type("type", packet).Msg("packet")
logger.Trace().Type("type", packet).Msg("packet")
transport.api.Notification(NewPacketNotification(packet, from, to, time.Now()))
}
}()

err := <-errChan
if err != nil {
connectionLogger.Error().Stack().Err(err).Msg("")
transport.errChan <- err
}
return err
}


// SendMessage triggers an event to send something to the vehicle. Some messages
// might additional means to pass information around (e.g. file read and write)
func (transport *Transport) SendMessage(message abstraction.TransportMessage) error {
Expand All @@ -233,8 +275,10 @@ func (transport *Transport) SendMessage(message abstraction.TransportMessage) er
err = ErrUnrecognizedEvent{message.Event()}
}
// handlePacketEvent already sends the error through the channel, so this avoids duplicates
if _, ok := err.(ErrConnClosed); !ok {
transport.errChan <- err
if err != nil {
if _, ok := err.(ErrConnClosed); !ok {
transport.errChan <- err
}
}
return err
}
Expand Down Expand Up @@ -326,14 +370,18 @@ func (transport *Transport) handlePacketEvent(message PacketMessage) error {
// handleFileWrite writes a file through tftp to the blcu
func (transport *Transport) handleFileWrite(message FileWriteMessage) error {
_, err := transport.tftp.WriteFile(message.Filename(), tftp.BinaryMode, message)
transport.errChan <- err
if err != nil {
transport.errChan <- err
}
return err
}

// handleFileRead reads a file through tftp from the blcu
func (transport *Transport) handleFileRead(message FileReadMessage) error {
_, err := transport.tftp.ReadFile(message.Filename(), tftp.BinaryMode, message)
transport.errChan <- err
if err != nil {
transport.errChan <- err
}
return err
}

Expand Down
Loading
Loading