Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge quickfixgo 20220626 #58

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions field_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quickfix
import (
"bytes"
"sort"
"sync"
"time"
)

Expand Down Expand Up @@ -39,6 +40,7 @@ func (t tagSort) Less(i, j int) bool { return t.compare(t.tags[i], t.tags[j]) }
type FieldMap struct {
tagLookup map[Tag]field
tagSort
rwLock *sync.RWMutex
}

// ascending tags
Expand All @@ -49,12 +51,16 @@ func (m *FieldMap) init() {
}

func (m *FieldMap) initWithOrdering(ordering tagOrder) {
m.rwLock = &sync.RWMutex{}
m.tagLookup = make(map[Tag]field)
m.compare = ordering
}

//Tags returns all of the Field Tags in this FieldMap
func (m FieldMap) Tags() []Tag {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

tags := make([]Tag, 0, len(m.tagLookup))
for t := range m.tagLookup {
tags = append(tags, t)
Expand All @@ -70,12 +76,18 @@ func (m FieldMap) Get(parser Field) MessageRejectError {

//Has returns true if the Tag is present in this FieldMap
func (m FieldMap) Has(tag Tag) bool {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

_, ok := m.tagLookup[tag]
return ok
}

//GetField parses of a field with Tag tag. Returned reject may indicate the field is not present, or the field value is invalid.
func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

f, ok := m.tagLookup[tag]
if !ok {
return ConditionallyRequiredFieldMissing(tag)
Expand All @@ -90,6 +102,9 @@ func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError

//GetBytes is a zero-copy GetField wrapper for []bytes fields
func (m FieldMap) GetBytes(tag Tag) ([]byte, MessageRejectError) {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

f, ok := m.tagLookup[tag]
if !ok {
return nil, ConditionallyRequiredFieldMissing(tag)
Expand Down Expand Up @@ -124,6 +139,9 @@ func (m FieldMap) GetInt(tag Tag) (int, MessageRejectError) {

//GetTime is a GetField wrapper for utc timestamp fields
func (m FieldMap) GetTime(tag Tag) (t time.Time, err MessageRejectError) {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

bytes, err := m.GetBytes(tag)
if err != nil {
return
Expand All @@ -148,6 +166,9 @@ func (m FieldMap) GetString(tag Tag) (string, MessageRejectError) {

//GetGroup is a Get function specific to Group Fields.
func (m FieldMap) GetGroup(parser FieldGroupReader) MessageRejectError {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

f, ok := m.tagLookup[parser.Tag()]
if !ok {
return ConditionallyRequiredFieldMissing(parser.Tag())
Expand Down Expand Up @@ -193,6 +214,9 @@ func (m *FieldMap) SetString(tag Tag, value string) *FieldMap {

//Clear purges all fields from field map
func (m *FieldMap) Clear() {
m.rwLock.Lock()
defer m.rwLock.Unlock()

m.tags = m.tags[0:0]
for k := range m.tagLookup {
delete(m.tagLookup, k)
Expand All @@ -201,6 +225,9 @@ func (m *FieldMap) Clear() {

//CopyInto overwrites the given FieldMap with this one
func (m *FieldMap) CopyInto(to *FieldMap) {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

to.tagLookup = make(map[Tag]field)
for tag, f := range m.tagLookup {
clone := make(field, 1)
Expand All @@ -218,6 +245,9 @@ func (m *FieldMap) DeleteTag(tag Tag) {
}

func (m *FieldMap) add(f field) {
m.rwLock.Lock()
defer m.rwLock.Unlock()

t := fieldTag(f)
if _, ok := m.tagLookup[t]; !ok {
m.tags = append(m.tags, t)
Expand All @@ -227,6 +257,9 @@ func (m *FieldMap) add(f field) {
}

func (m *FieldMap) getOrCreate(tag Tag) field {
m.rwLock.Lock()
defer m.rwLock.Unlock()

if f, ok := m.tagLookup[tag]; ok {
f = f[:1]
return f
Expand All @@ -247,6 +280,9 @@ func (m *FieldMap) Set(field FieldWriter) *FieldMap {

//SetGroup is a setter specific to group fields
func (m *FieldMap) SetGroup(field FieldGroupWriter) *FieldMap {
m.rwLock.Lock()
defer m.rwLock.Unlock()

_, ok := m.tagLookup[field.Tag()]
if !ok {
m.tags = append(m.tags, field.Tag())
Expand All @@ -261,6 +297,9 @@ func (m *FieldMap) sortedTags() []Tag {
}

func (m FieldMap) write(buffer *bytes.Buffer) {
m.rwLock.Lock()
defer m.rwLock.Unlock()

for _, tag := range m.sortedTags() {
if f, ok := m.tagLookup[tag]; ok {
writeField(f, buffer)
Expand All @@ -269,6 +308,9 @@ func (m FieldMap) write(buffer *bytes.Buffer) {
}

func (m FieldMap) total() int {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

total := 0
for _, fields := range m.tagLookup {
for _, tv := range fields {
Expand All @@ -284,6 +326,9 @@ func (m FieldMap) total() int {
}

func (m FieldMap) length() int {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

length := 0
for _, fields := range m.tagLookup {
for _, tv := range fields {
Expand Down
37 changes: 0 additions & 37 deletions internal/buffer_pool.go

This file was deleted.

26 changes: 0 additions & 26 deletions message_pool.go

This file was deleted.

5 changes: 1 addition & 4 deletions parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ import (
"errors"
"io"
"time"

"github.com/cryptogarageinc/quickfix-go/internal"
)

const (
defaultBufSize = 4096
)

var bufferPool internal.BufferPool

type parser struct {
//buffer is a slice of bigBuffer
Expand Down Expand Up @@ -145,7 +142,7 @@ func (p *parser) ReadMessage() (msgBytes *bytes.Buffer, err error) {
return
}

msgBytes = bufferPool.Get()
msgBytes = new(bytes.Buffer)
msgBytes.Reset()
msgBytes.Write(p.buffer[:index])
p.buffer = p.buffer[index:]
Expand Down
5 changes: 4 additions & 1 deletion repeating_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ func (f RepeatingGroup) Write() []TagValue {

for _, group := range f.groups {
tags := group.sortedTags()

group.rwLock.RLock()
for _, tag := range tags {
if fields, ok := group.tagLookup[tag]; ok {
tvs = append(tvs, fields...)
}
}
group.rwLock.RUnlock()
}

return tvs
Expand Down Expand Up @@ -236,8 +237,10 @@ func (f *RepeatingGroup) Read(tv []TagValue) ([]TagValue, error) {
f.groups = append(f.groups, group)
}

group.rwLock.Lock()
group.tagLookup[tvRange[0].tag] = tvRange
prevTag = tvRange[0].tag
group.rwLock.Unlock()
}

if len(f.groups) != expectedGroupSize {
Expand Down
3 changes: 0 additions & 3 deletions resend_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ func (s resendState) FixMsgIn(session *session, msg *Message) (nextState session

delete(s.messageStash, targetSeqNum)

//return stashed message to pool
session.returnToPool(msg)

nextState = inSession{}.FixMsgIn(session, msg)
if !nextState.IsLoggedOn() {
return
Expand Down
9 changes: 0 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type session struct {
transportDataDictionary *datadictionary.DataDictionary
appDataDictionary *datadictionary.DataDictionary

messagePool
timestampPrecision TimestampPrecision
linkedAcceptor *Acceptor

Expand Down Expand Up @@ -671,14 +670,6 @@ type fixIn struct {
receiveTime time.Time
}

func (s *session) returnToPool(msg *Message) {
s.messagePool.Put(msg)
if msg.rawMessage != nil {
bufferPool.Put(msg.rawMessage)
msg.rawMessage = nil
}
}

func (s *session) onDisconnect() {
s.log.OnEvent("Disconnected")
if s.ResetOnDisconnect {
Expand Down
5 changes: 1 addition & 4 deletions session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (sm *stateMachine) Incoming(session *session, m fixIn) {

session.log.OnIncoming(m.bytes.Bytes())

msg := session.messagePool.Get()
msg := NewMessage()
if err := ParseMessageWithDataDictionary(msg, m.bytes, session.transportDataDictionary, session.appDataDictionary); err != nil {
session.log.OnErrorEventParams("Msg Parse Error", err,
LogStringWithSingleQuote("fixInMsg", m.bytes.String()))
Expand All @@ -79,9 +79,6 @@ func (sm *stateMachine) Incoming(session *session, m fixIn) {
sm.fixMsgIn(session, msg)
}

if !msg.keepMessage {
session.returnToPool(msg)
}
session.peerTimer.Reset(time.Duration(float64(1.2) * float64(session.HeartBtInt)))
}

Expand Down