Skip to content

Commit

Permalink
Merge pull request #54 from cryptogarageinc/feature/add-log-interface
Browse files Browse the repository at this point in the history
feat: add log interface
  • Loading branch information
k-matsuzawa committed Jun 17, 2022
2 parents 70f1340 + 52cc0be commit 74939a4
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 63 deletions.
29 changes: 15 additions & 14 deletions acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -224,7 +223,7 @@ func (a *Acceptor) listenForConnections(listener net.Listener) {
}

func (a *Acceptor) invalidMessage(msg *bytes.Buffer, err error) {
a.globalLog.OnEventf("Invalid Message: %s, %v", msg.Bytes(), err.Error())
a.globalLog.OnErrorEventParams("Invalid Message", err, LogString("recvMsgBytes", msg.String()))
}

func (a *Acceptor) handleConnection(netConn net.Conn) {
Expand All @@ -234,7 +233,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
}

if err := netConn.Close(); err != nil {
a.globalLog.OnEvent(err.Error())
a.globalLog.OnErrorEvent("connection Close failed", err)
}
}()

Expand All @@ -246,7 +245,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
if err == io.EOF {
a.globalLog.OnEvent("Connection Terminated")
} else {
a.globalLog.OnEvent(err.Error())
a.globalLog.OnErrorEvent("failed to ReadMessage", err)
}
return
}
Expand Down Expand Up @@ -316,15 +315,16 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
localConnectionPort := netConn.LocalAddr().(*net.TCPAddr).Port
if expectedPort, ok := a.sessionHostPort[sessID]; ok && expectedPort != localConnectionPort {
// If it is not included in sessionHostPort, we will check if it is included in sessions in a later process.
a.globalLog.OnEventf("Session %v not found for incoming message: %s", sessID, msgBytes)
a.globalLog.OnErrorEventParams("Session not found for incoming message", nil,
LogString("sessionID", sessID.String()), LogMessage("incomingMessage", msgBytes.Bytes()))
return
}

// We have a session ID and a network connection. This seems to be a good place for any custom authentication logic.
if a.connectionValidator != nil {
if err := a.connectionValidator.Validate(netConn, sessID); err != nil {
a.globalLog.OnEventf("Unable to validate a connection %v", err.Error())
a.globalLog.OnEventf("failed incoming message: %s", strings.ReplaceAll(msgBytes.String(), "\u0001", "|"))
a.globalLog.OnErrorEventParams("Unable to validate a connection", err,
LogString("sessionID", sessID.String()), LogMessage("incomingMessage", msgBytes.Bytes()))
return
}
}
Expand All @@ -337,13 +337,14 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
session, ok := a.sessions[sessID]
if !ok {
if !a.dynamicSessions {
a.globalLog.OnEventf("Session %v not found for incoming message: %s", sessID, msgBytes)
a.globalLog.OnErrorEventParams("Session not found for incoming message", nil,
LogString("sessionID", sessID.String()), LogMessage("incomingMessage", msgBytes.Bytes()))
return
}
dynamicSession, err := a.sessionFactory.createSession(sessID, a.storeFactory, a.settings.globalSettings.clone(), a.logFactory, a.app)
if err != nil {
a.globalLog.OnEventf("Dynamic session %v failed to create: %v", sessID, err)
a.globalLog.OnEventf("failed incoming message: %s", strings.ReplaceAll(msgBytes.String(), "\u0001", "|"))
a.globalLog.OnErrorEventParams("Dynamic session failed to create", err,
LogString("sessionID", sessID.String()), LogMessage("incomingMessage", msgBytes.Bytes()))
return
}
dynamicSession.linkedAcceptor = a
Expand All @@ -359,8 +360,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
msgOut := make(chan []byte)

if err := session.connect(msgIn, msgOut); err != nil {
a.globalLog.OnEventf("Unable to accept %v", err.Error())
a.globalLog.OnEventf("failed incoming message: %s", strings.ReplaceAll(msgBytes.String(), "\u0001", "|"))
a.globalLog.OnErrorEventParams("Unable to accept", err, LogMessage("incomingMessage", msgBytes.Bytes()))
return
}
a.sessionAddr.Store(sessID, netConn.RemoteAddr())
Expand All @@ -374,7 +374,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {

if session.hasStopByDisconnect {
if tmpErr := netConn.Close(); tmpErr != nil {
session.log.OnEventf("net.Close error: %v", tmpErr)
session.log.OnErrorEvent("net.Close error", tmpErr)
}
}
session.log.OnEvent("handleConnection finish")
Expand Down Expand Up @@ -402,7 +402,8 @@ LOOP:
session.run()
err := UnregisterSession(session.sessionID)
if err != nil {
a.globalLog.OnEventf("Unregister dynamic session %v failed: %v", session.sessionID, err)
a.globalLog.OnErrorEventParams("Unregister dynamic session failed",
err, LogString("sessionID", session.sessionID.String()))
return
}
complete <- sessionID
Expand Down
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func writeLoop(connection io.Writer, messageOut chan []byte, log Log) {
}

if _, err := connection.Write(msg); err != nil {
log.OnEvent(err.Error())
log.OnErrorEvent("connection Write failed", err)
}
}
}
Expand Down
50 changes: 34 additions & 16 deletions custom_log.go
Original file line number Diff line number Diff line change
@@ -1,54 +1,72 @@
package quickfix

import (
"bytes"
"fmt"
)

func makeReadable(s []byte) string {
return string(bytes.Replace(s, []byte("\x01"), []byte("|"), -1))
}
const (
LogIncomingMessage string = "FIX incoming"
LogOutgoingMessage string = "FIX outgoing"
LogPrefixGlobal string = "GLOBAL"
)

type customLog struct {
sessionPrefix string
logFunc func(msg string, keysAndValues ...interface{})
logFunc func(prefix, msg string, keysAndValues ...LogParam)
logErrorFunc func(prefix, msg string, err error, keysAndValues ...LogParam)
}

func (l customLog) OnIncoming(s []byte) {
msg := fmt.Sprintf("FIX incoming [%v |%v]", l.sessionPrefix, makeReadable(s))
l.logFunc(msg)
l.logFunc(l.sessionPrefix, LogIncomingMessage, LogString("incomingMessage", makeReadable(s)))
}

func (l customLog) OnOutgoing(s []byte) {
msg := fmt.Sprintf("FIX outgoing [%v - |%v]", l.sessionPrefix, makeReadable(s))
l.logFunc(msg)
l.logFunc(l.sessionPrefix, LogOutgoingMessage, LogString("outgoingMessage", makeReadable(s)))
}

func (l customLog) OnEvent(s string) {
msg := fmt.Sprintf("FIX event [%v - %v]", l.sessionPrefix, s)
l.logFunc(msg)
l.logFunc(l.sessionPrefix, s)
}

func (l customLog) OnEventf(format string, a ...interface{}) {
l.OnEvent(fmt.Sprintf(format, a...))
}

func (l customLog) OnErrorEvent(message string, err error) {
l.logErrorFunc(l.sessionPrefix, message, err)
}

func (l customLog) OnEventParams(message string, v ...LogParam) {
l.logFunc(l.sessionPrefix, message, v...)
}

func (l customLog) OnErrorEventParams(message string, err error, v ...LogParam) {
l.logErrorFunc(l.sessionPrefix, message, err, v...)
}

type customLogFactory struct {
logFunc func(msg string, keysAndValues ...interface{})
logFunc func(prefix, msg string, keysAndValues ...LogParam)
logErrorFunc func(prefix, msg string, err error, keysAndValues ...LogParam)
}

func (f customLogFactory) Create() (Log, error) {
log := customLog{"GLOBAL", f.logFunc}
log := customLog{LogPrefixGlobal, f.logFunc, f.logErrorFunc}
return log, nil
}

func (f customLogFactory) CreateSessionLog(sessionID SessionID) (Log, error) {
log := customLog{sessionID.String(), f.logFunc}
log := customLog{sessionID.String(), f.logFunc, f.logErrorFunc}
return log, nil
}

// NewCustomLogFactory creates an instance of LogFactory that
// logs messages and events using the provided log function
func NewCustomLogFactory(logFunc func(msg string, keysAndValues ...interface{})) LogFactory {
return customLogFactory{logFunc: logFunc}
func NewCustomLogFactory(
logFunc func(prefix, msg string, keysAndValues ...LogParam),
logErrorFunc func(prefix, msg string, err error, keysAndValues ...LogParam),
) LogFactory {
return customLogFactory{
logFunc: logFunc,
logErrorFunc: logErrorFunc,
}
}
28 changes: 28 additions & 0 deletions file_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,34 @@ func (l fileLog) OnEventf(format string, v ...interface{}) {
l.eventLogger.Printf(format, v...)
}

func (l fileLog) OnErrorEvent(message string, err error) {
l.eventLogger.Printf("%s: %+v", message, err)
}

func (l fileLog) OnEventParams(message string, v ...LogParam) {
var str string
for i, val := range v {
if i == 0 {
str += ": " + val.String()
} else {
str += ", " + val.String()
}
}
l.eventLogger.Printf("%s%s", message, str)
}

func (l fileLog) OnErrorEventParams(message string, err error, v ...LogParam) {
var str string
for i, val := range v {
if i == 0 {
str += ": " + val.String()
} else {
str += ", " + val.String()
}
}
l.eventLogger.Printf("%s, %+v%s", message, err, str)
}

type fileLogFactory struct {
globalLogPath string
sessionLogPaths map[SessionID]string
Expand Down
11 changes: 6 additions & 5 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func (state inSession) Timeout(session *session, event internal.Event) (nextStat
session.peerTimer.Reset(time.Duration(float64(1.2) * float64(session.HeartBtInt)))
return pendingTimeout{state}
default:
session.log.OnEventf("receive event: %s, %v", state.String(), event)
session.log.OnEventParams("receive event",
LogString("sessionState", state.String()), LogObject("event", event))
}

return state
Expand All @@ -83,19 +84,19 @@ func (state inSession) handleLogout(session *session, msg *Message) (nextState s
session.log.OnEvent("Sending logout response")

if err := session.sendLogoutInReplyTo("", msg); err != nil {
session.logError(err)
session.logError("sendLogoutInReplyTo failed", err)
}
} else {
session.log.OnEvent("Received logout response")
}

if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
session.logError(err)
session.logError("logout increment MsgSeqNum failed", err)
}

if session.ResetOnLogout {
if err := session.dropAndReset(); err != nil {
session.logError(err)
session.logError("logout Reset failed", err)
}
}

Expand Down Expand Up @@ -211,7 +212,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int

msgs, err := session.store.GetMessages(beginSeqNo, endSeqNo)
if err != nil {
session.log.OnEventf("error retrieving messages from store: %s", err.Error())
session.log.OnErrorEvent("error retrieving messages from store", err)
return
}

Expand Down
8 changes: 4 additions & 4 deletions initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di

netConn, err := dialer.Dial("tcp", address)
if err != nil {
session.log.OnEventf("Failed to connect: %v", err)
session.log.OnErrorEvent("Failed to connect", err)
goto reconnect
} else if tlsConfig != nil {
// Unless InsecureSkipVerify is true, server name config is required for TLS
Expand All @@ -164,7 +164,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
}
tlsConn := tls.Client(netConn, tlsConfig)
if err = tlsConn.Handshake(); err != nil {
session.log.OnEventf("Failed handshake: %v", err)
session.log.OnErrorEvent("Failed handshake", err)
goto reconnect
}
netConn = tlsConn
Expand All @@ -173,7 +173,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
msgIn = make(chan fixIn)
msgOut = make(chan []byte)
if err := session.connect(msgIn, msgOut); err != nil {
session.log.OnEventf("Failed to initiate: %v", err)
session.log.OnErrorEvent("Failed to initiate", err)
goto reconnect
}

Expand All @@ -182,7 +182,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
go func() {
writeLoop(netConn, msgOut, session.log)
if err := netConn.Close(); err != nil {
session.log.OnEvent(err.Error())
session.log.OnErrorEvent("connection Close failed", err)
}
close(disconnected)
}()
Expand Down
2 changes: 1 addition & 1 deletion latent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (state latentState) FixMsgIn(session *session, msg *Message) (nextState ses
}

func (state latentState) Timeout(session *session, event internal.Event) (nextState sessionState) {
session.log.OnEventf("receive event: %s, %v", state.String(), event)
session.log.OnEventParams("receive event", LogString("sessionState", state.String()), LogObject("event", event))
return state
}

Expand Down

0 comments on commit 74939a4

Please sign in to comment.