Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(metadata): avoid using sync.Once #498

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func load(rootDmap *sync.Map, d *Directory, subPath, rootPath string) error {
}
// Mark this as a pending Fileinfo reference
d.datafile[leafPath] = new(io.TimeBucketInfo)
d.datafile[leafPath].IsRead = false
d.datafile[leafPath].IsInitialized = io.TbiNotInitialized
d.datafile[leafPath].Path = leafPath
yearFileBase := filepath.Base(leafPath)
yearString := yearFileBase[:len(yearFileBase)-4]
Expand Down
188 changes: 119 additions & 69 deletions utils/io/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"os"
"sync"
"sync/atomic"
"unsafe"

"fmt"
Expand All @@ -18,6 +19,13 @@ import (
const Headersize = 37024
const FileinfoVersion = int64(2.0)

// IsInitialized is the initialization state of a TimeBucketInfo.
// It is an alias of uint32 so that the state can be read and written with the
// sync/atomic Uint32 functions.
type IsInitialized = uint32

// Initialization states of a TimeBucketInfo. They are constants rather than
// variables so that no caller can reassign them and silently break the
// "already initialized" check.
const (
	// TbiNotInitialized marks a TimeBucketInfo whose parameters have not been
	// loaded from its data file yet.
	TbiNotInitialized IsInitialized = 0
	// TbiInitialized marks a TimeBucketInfo whose parameters are fully populated.
	TbiInitialized IsInitialized = 1
)

func nanosecondsInYear(year int) int64 {
start := time.Date(year, time.January, 1, 0, 0, 0, 0, time.Local)
end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.Local)
Expand All @@ -30,12 +38,12 @@ func FileSize(tf time.Duration, year int, recordSize int) int64 {
}

type TimeBucketInfo struct {
// Year, Path and IsRead are all set on catalog startup
// Year, Path and IsInitialized are all set on catalog startup
Year int16
// Path is the absolute path to the data binary file.
// (e.g. "/project/marketstore/data/TEST/1Sec/Tick/2021.bin")
Path string
IsRead bool
Path string
IsInitialized IsInitialized

version int64
description string
Expand All @@ -52,7 +60,7 @@ type TimeBucketInfo struct {
elementNames []string
elementTypes []EnumElementType

once sync.Once
mu sync.Mutex
}

func AlignedSize(unalignedSize int) (alignedSize int) {
Expand All @@ -67,26 +75,62 @@ func AlignedSize(unalignedSize int) (alignedSize int) {
func NewTimeBucketInfo(tf utils.Timeframe, path, description string, year int16, dsv []DataShape, recordType EnumRecordType) (f *TimeBucketInfo) {
elementTypes, elementNames := CreateShapesForTimeBucketInfo(dsv)
f = &TimeBucketInfo{
version: FileinfoVersion,
Path: filepath.Join(path, strconv.Itoa(int(year))+".bin"),
IsRead: true,
timeframe: tf.Duration,
description: description,
Year: year,
nElements: int32(len(elementTypes)),
Year: year,
Path: filepath.Join(path, strconv.Itoa(int(year))+".bin"),
IsInitialized: 1,
version: FileinfoVersion,
description: description,
timeframe: tf.Duration,

nElements: int32(len(elementTypes)),
recordType: recordType,

elementTypes: elementTypes,
elementNames: elementNames,
recordType: recordType,
}
if f.recordType == FIXED {
f.recordLength = int32(AlignedSize(f.getFieldRecordLength())) + 8 // add an 8-byte epoch field
f.variableRecordLength = 0
} else if f.recordType == VARIABLE {
f.recordLength = 24 // Length of the indirect data pointer {index, offset, len}
f.variableRecordLength = 0
}
return f
}

// NewTimeBucketInfoFromFile builds a TimeBucketInfo from the header stored in
// the file at path. When the header cannot be read it returns a nil
// TimeBucketInfo together with an error that wraps the readHeader failure.
func NewTimeBucketInfoFromFile(path string) (*TimeBucketInfo, error) {
	hdr, readErr := readHeader(path)
	if readErr == nil {
		return NewTimeBucketInfoFromHeader(hdr, path), nil
	}
	return nil, fmt.Errorf("read header of TimeBucketInfo path=%s:%w", path, readErr)
}

// NewTimeBucketInfoFromHeader converts an on-disk Header into a TimeBucketInfo
// for the data file at path. The returned value is marked TbiInitialized, its
// Path is the cleaned form of path, and NUL padding is stripped from the
// description and from every element name.
func NewTimeBucketInfoFromHeader(hp *Header, path string) *TimeBucketInfo {
	tbi := new(TimeBucketInfo)

	// Exported fields.
	tbi.Year = int16(hp.Year)
	tbi.Path = filepath.Clean(path)
	tbi.IsInitialized = TbiInitialized

	// Scalar header fields. variableRecordLength, elementNames and
	// elementTypes keep their zero values (0, nil, nil) at this point.
	tbi.version = hp.Version
	tbi.description = string(bytes.Trim(hp.Description[:], "\x00"))
	tbi.timeframe = time.Duration(hp.Timeframe)
	tbi.nElements = int32(hp.NElements)
	tbi.recordType = EnumRecordType(hp.RecordType)
	tbi.recordLength = int32(hp.RecordLength)

	// Copy the name and type of each declared element out of the header.
	count := int(tbi.nElements)
	for idx := 0; idx < count; idx++ {
		trimmed := bytes.Trim(hp.ElementNames[idx][:], "\x00")
		tbi.elementNames = append(tbi.elementNames, string(trimmed))
		tbi.elementTypes = append(tbi.elementTypes, EnumElementType(hp.ElementTypes[idx]))
	}
	return tbi
}

func CreateShapesForTimeBucketInfo(dsv []DataShape) (elementTypes []EnumElementType, elementNames []string) {
/*
Takes a datashape array and returns elementTypes and elementNames
Expand Down Expand Up @@ -125,11 +169,14 @@ func (f *TimeBucketInfo) getFieldRecordLength() (fieldRecordLength int) {

// GetDeepCopy returns a copy of this TimeBucketInfo.
func (f *TimeBucketInfo) GetDeepCopy() *TimeBucketInfo {
f.once.Do(f.initFromFile)
err := f.initFromFile()
if err != nil {

}
fcopy := TimeBucketInfo{
Year: f.Year,
Path: f.Path,
IsRead: f.IsRead,
IsInitialized: f.IsInitialized,
version: f.version,
description: f.description,
timeframe: f.timeframe,
Expand All @@ -145,62 +192,88 @@ func (f *TimeBucketInfo) GetDeepCopy() *TimeBucketInfo {
return &fcopy
}

func (f *TimeBucketInfo) initFromFile() {
if f.IsRead {
// initFromFile retrieves all TimeBucketInfo parameters from the file at f.Path
// if the tbi is not initialized yet.
// Context: When catalog.Directory is loaded, a new TimeBucketInfo struct is lazily constructed
// with only the f.Path and f.IsInitialized=io.TbiNotInitialized parameters
// so that it doesn't need to actually open files to get the params.
// This function is called when the TimeBucketInfo is actually used.
func (f *TimeBucketInfo) initFromFile() error {
if atomic.LoadUint32(&f.IsInitialized) == TbiInitialized {
// do nothing if we found it already done
return
return nil
}
if err := f.readHeader(f.Path); err != nil {
log.Fatal(err.Error())

f.mu.Lock()
defer f.mu.Unlock()

// if not initialized yet, read the information from the file
tbi, err := NewTimeBucketInfoFromFile(f.Path)
if err != nil {
return fmt.Errorf("failed to read TimeBucketInfo from file %s:%w", f.Path, err)
}
f.IsRead = true
f.Year = tbi.Year
f.Path = tbi.Path
f.version = tbi.version
f.description = tbi.description
f.timeframe = tbi.timeframe
f.nElements = tbi.nElements
f.recordType = tbi.recordType
f.recordLength = tbi.recordLength
f.variableRecordLength = tbi.variableRecordLength
f.elementNames = make([]string, len(tbi.elementNames))
f.elementTypes = make([]EnumElementType, len(tbi.elementTypes))
copy(f.elementNames, tbi.elementNames)
copy(f.elementTypes, tbi.elementTypes)

atomic.StoreUint32(&f.IsInitialized, TbiInitialized)
return nil
}

// GetVersion returns the version number for the given TimeBucketInfo.
func (f *TimeBucketInfo) GetVersion() int64 {
f.once.Do(f.initFromFile)
f.initFromFile()
return f.version
}

// GetDescription returns the description string contained in the
// given TimeBucketInfo.
func (f *TimeBucketInfo) GetDescription() string {
f.once.Do(f.initFromFile)
f.initFromFile()
return f.description
}

// GetTimeframe returns the duration for which each record's data is valid.
// This means for 1Min resolution data, GetTimeframe will return time.Minute.
func (f *TimeBucketInfo) GetTimeframe() time.Duration {
f.once.Do(f.initFromFile)
f.initFromFile()
return f.timeframe
}

// GetIntervals returns the number of records that can fit in a 24 hour day.
func (f *TimeBucketInfo) GetIntervals() int64 {
f.once.Do(f.initFromFile)
return int64(utils.Day.Nanoseconds()) / int64(f.timeframe.Nanoseconds())
f.initFromFile()
return utils.Day.Nanoseconds() / f.timeframe.Nanoseconds()
}

// GetNelements returns the number of elements (data fields) for a given
// TimeBucketInfo.
func (f *TimeBucketInfo) GetNelements() int32 {
f.once.Do(f.initFromFile)
f.initFromFile()
return f.nElements
}

// GetRecordLength returns the length of a single record in the file described
// by the given TimeBucketInfo
func (f *TimeBucketInfo) GetRecordLength() int32 {
f.once.Do(f.initFromFile)
f.initFromFile()
return f.recordLength
}

// GetVariableRecordLength returns the length of a single record for a variable
// length TimeBucketInfo file
func (f *TimeBucketInfo) GetVariableRecordLength() int32 {
f.once.Do(f.initFromFile)

f.initFromFile()
if f.recordType == VARIABLE && f.variableRecordLength == 0 {
// Variable records use the raw element sizes plus a 4-byte trailer for interval ticks
f.variableRecordLength = int32(f.getFieldRecordLength()) + 4 // Variable records have a 4-byte trailer
Expand All @@ -211,21 +284,21 @@ func (f *TimeBucketInfo) GetVariableRecordLength() int32 {
// GetRecordType returns the type of the file described by the TimeBucketInfo
// as an EnumRecordType
func (f *TimeBucketInfo) GetRecordType() EnumRecordType {
f.once.Do(f.initFromFile)
f.initFromFile()
return f.recordType
}

// GetElementNames returns the field names contained by the file described by
// the given TimeBucketInfo
func (f *TimeBucketInfo) GetElementNames() []string {
f.once.Do(f.initFromFile)
f.initFromFile()
return f.elementNames
}

// GetElementTypes returns the field types contained by the file described by
// the given TimeBucketInfo
func (f *TimeBucketInfo) GetElementTypes() []EnumElementType {
f.once.Do(f.initFromFile)
f.initFromFile()
return f.elementTypes
}

Expand All @@ -241,76 +314,52 @@ func (f *TimeBucketInfo) SetElementTypes(newTypes []EnumElementType) error {
return nil
}

func (f *TimeBucketInfo) readHeader(path string) (err error) {
func readHeader(path string) (header *Header, err error) {
file, err := os.Open(path)
if err != nil {
log.Error("Failed to open file: %v - Error: %v", path, err)
return err
return nil, err
}
defer file.Close()
var buffer [Headersize]byte
header := (*Header)(unsafe.Pointer(&buffer))
header = (*Header)(unsafe.Pointer(&buffer))
// Read the top part of the header, which is not dependent on the number of elements
n, err := file.Read(buffer[:312])
if err != nil || n != 312 {
log.Error("Failed to read header part1 from file: %v - Error: %v", path, err)
return err
return nil, fmt.Errorf("failed to read header part1 from file: %s - Error: %w", path, err)
}

// Second part of read element names
secondReadSize := header.NElements * 32
n, err = file.Read(buffer[312 : 312+secondReadSize])
if err != nil || n != int(secondReadSize) {
log.Error("Failed to read header part2 from file: %v - Error: %v", path, err)
return err
return nil, fmt.Errorf("failed to read header part2 from file: %s - Error: %w", path, err)
}
// Read past empty element name space
file.Seek(1024*32-secondReadSize, os.SEEK_CUR)
_, err = file.Seek(1024*32-secondReadSize, os.SEEK_CUR)
if err != nil {
return nil, fmt.Errorf("failed to seek file %v to read past empty element name space: %w", file, err)
}

// Read element types
start := 312 + 1024*32
n, err = file.Read(buffer[start : start+int(header.NElements)])
if err != nil || n != int(header.NElements) {
log.Error("Failed to read header part3 from file: %v - Error: %v", path, err)
return err
return nil, fmt.Errorf("failed to read header part3 from file: %s - Error: %w", path, err)
}
if EnumRecordType(header.RecordType) == VARIABLE {
// Read to end of header
start += int(header.NElements)
n, err = file.Read(buffer[start:Headersize])
if err != nil || n != (Headersize-start) {
log.Error("Failed to read header part4 from file: %v - Error: %v", path, err)
return err
return nil, fmt.Errorf("failed to read header part4 from file: %s - Error: %w", path, err)
}
}
f.load(header, path)
return nil
}

func (f *TimeBucketInfo) load(hp *Header, path string) {
f.version = hp.Version
f.description = string(bytes.Trim(hp.Description[:], "\x00"))
f.Year = int16(hp.Year)
f.Path = filepath.Clean(path)
f.IsRead = true
f.timeframe = time.Duration(hp.Timeframe)
f.nElements = int32(hp.NElements)
f.recordLength = int32(hp.RecordLength)
f.recordType = EnumRecordType(hp.RecordType)
f.elementNames = nil
f.elementTypes = nil
for i := 0; i < int(f.nElements); i++ {
baseName := string(bytes.Trim(hp.ElementNames[i][:], "\x00"))
f.elementNames = append(f.elementNames, baseName)
f.elementTypes = append(f.elementTypes, EnumElementType(hp.ElementTypes[i]))
}
}

// NewTimeBucketInfoFromHeader creates a TimeBucketInfo from a given Header
func NewTimeBucketInfoFromHeader(hp *Header, path string) *TimeBucketInfo {
tbi := new(TimeBucketInfo)
tbi.load(hp, path)
return tbi
return header, nil
}

// Header is the on-disk byte representation of the file header
Expand Down Expand Up @@ -354,7 +403,8 @@ func (hp *Header) Load(f *TimeBucketInfo) {
hp.RecordLength = int64(f.GetRecordLength())
hp.RecordType = int64(f.GetRecordType())
for i := 0; i < int(hp.NElements); i++ {
copy(hp.ElementNames[i][:], f.GetElementNames()[i])
copy(hp.ElementNames[i][:],
f.GetElementNames()[i])
hp.ElementTypes[i] = byte(f.GetElementTypes()[i])
}
hp.RecordType = int64(f.GetRecordType())
Expand Down