Skip to content
Permalink
Browse files

bundleFile abstraction to hide gzip/non-gzip differences

  • Loading branch information...
mlapshin committed Sep 24, 2018
1 parent b5ed854 commit dc3623506cbe369d129a6d91f4f18578db5ba7fc
Showing with 140 additions and 90 deletions.
  1. +139 −89 load.go
  2. +1 −1 load_test.go
228 load.go
@@ -26,6 +26,59 @@ import (

type bundleType int

// bundleFile is a readable bundle file that may or may not be gzip-compressed.
// Its Read, Close and Rewind methods pick the right underlying reader, so
// callers do not need to know which kind of file they were handed.
type bundleFile struct {
// file is the underlying OS file handle.
file *os.File
// gzr is the gzip reader layered over file; nil when the file is read as-is.
gzr *gzip.Reader
}

// openFile opens fileName read-only and wraps it in a bundleFile.
//
// The start of the file is probed with gzip.NewReader. When the probe
// succeeds, the returned bundleFile reads through the gzip layer. When it
// fails for any reason, the file is treated as uncompressed: it is rewound to
// offset zero (the probe may already have read from it) and returned with a
// nil gzr.
//
// An error is returned when the file cannot be opened, or when rewinding after
// a failed probe fails; in the latter case the file is closed before
// returning so the handle does not leak.
func openFile(fileName string) (*bundleFile, error) {
	f, err := os.Open(fileName)
	if err != nil {
		return nil, errors.Wrap(err, "cannot open bundle file")
	}

	gzr, err := gzip.NewReader(f)
	if err == nil {
		return &bundleFile{file: f, gzr: gzr}, nil
	}

	// Not readable as gzip: fall back to plain reads from the very beginning.
	// A failed rewind would leave the read offset unknown, so it is reported
	// rather than silently returning a misaligned reader.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		f.Close()
		return nil, errors.Wrap(err, "cannot rewind bundle file")
	}

	return &bundleFile{file: f}, nil
}

// Read fills p from the bundle: straight from the file when there is no gzip
// layer, otherwise from the gzip reader. The byte count and error of the
// chosen reader are returned unchanged.
func (bf *bundleFile) Read(p []byte) (n int, err error) {
	if bf.gzr == nil {
		return bf.file.Read(p)
	}

	return bf.gzr.Read(p)
}

// Close shuts down the gzip layer, if there is one, and then the underlying
// file. Errors from either close call are discarded.
func (bf *bundleFile) Close() {
	// Deferred so the file is closed last, after the gzip reader on top of it.
	defer bf.file.Close()

	if bf.gzr == nil {
		return
	}

	bf.gzr.Close()
}

// Rewind repositions the bundle at its beginning so it can be read again.
//
// The underlying file is seeked back to offset zero; when a gzip layer is
// present it is closed and then reset onto the rewound file. Errors from the
// seek and from the gzip reset are discarded, as the method has no result.
func (bf *bundleFile) Rewind() {
	bf.file.Seek(0, io.SeekStart)

	if bf.gzr == nil {
		return
	}

	bf.gzr.Close()
	bf.gzr.Reset(bf.file)
}

const (
ndjsonBundleType bundleType = iota
fhirBundleType
@@ -229,21 +282,19 @@ func (s *copyFromBundleSource) Err() error {
return s.err
}

type multilineBundle struct {
count int
fileName string
file *os.File
gzr *gzip.Reader
reader *bufio.Reader
curline int
type ndjsonBundle struct {
count int
file *bundleFile
gzr *gzip.Reader
reader *bufio.Reader
curline int
}

type fhirBundle struct {
count int
fileName string
file *os.File
curline int
iter *jsoniter.Iterator
count int
file *bundleFile
curline int
iter *jsoniter.Iterator
}

func (b *fhirBundle) Close() {
@@ -283,33 +334,24 @@ func (b *fhirBundle) Next() (map[string]interface{}, error) {
return nil, fmt.Errorf("got non-object value at entry.resource")
}

fmt.Printf("%v\n\n", resMap)

return resMap, nil
}

func newFhirBundle(fileName string) (*fhirBundle, error) {
func newFhirBundle(f *bundleFile) (*fhirBundle, error) {
var result fhirBundle
result.fileName = fileName

file, err := os.Open(fileName)

if err != nil {
return nil, err
}

result.file = file
result.file = f
result.iter = jsoniter.Parse(jsoniter.ConfigFastest, result.file, 32*1024)

err = goToEntriesInFhirBundle(result.iter)
err := goToEntriesInFhirBundle(result.iter)

if err != nil {
return nil, errors.Wrap(err, "cannot find `entry` key in the bundle")
}

linesCount, err := countEntriesInBundle(result.iter)

result.file.Seek(0, 0)
result.file.Rewind()
result.iter.Reset(result.file)

if err != nil {
@@ -327,19 +369,15 @@ func newFhirBundle(fileName string) (*fhirBundle, error) {
return &result, nil
}

func (b *multilineBundle) Close() {
defer b.file.Close()

if b.gzr != nil {
b.gzr.Close()
}
// Close closes the bundle's underlying file via its Close method.
func (b *ndjsonBundle) Close() {
b.file.Close()
}

func (b *multilineBundle) Count() int {
// Count returns the value stored in the bundle's count field.
func (b *ndjsonBundle) Count() int {
return b.count
}

func (b *multilineBundle) Next() (map[string]interface{}, error) {
func (b *ndjsonBundle) Next() (map[string]interface{}, error) {
line, err := b.reader.ReadBytes('\n')

iter := jsoniter.ConfigDefault.BorrowIterator(line)
@@ -360,40 +398,18 @@ func (b *multilineBundle) Next() (map[string]interface{}, error) {
return result.(map[string]interface{}), iter.Error
}

func newMultilineBundle(fileName string) (*multilineBundle, error) {
var result multilineBundle
result.fileName = fileName

file, err := os.Open(fileName)

if err != nil {
return nil, err
}

result.file = file

zr, err := gzip.NewReader(file)

if err == gzip.ErrHeader {
file.Seek(0, 0)
result.gzr = nil
result.reader = bufio.NewReader(result.file)
} else {
result.gzr = zr
result.reader = bufio.NewReader(zr)
}
func newNdjsonBundle(f *bundleFile) (*ndjsonBundle, error) {
var result ndjsonBundle
result.file = f
result.reader = bufio.NewReader(result.file)

linesCount, err := countLinesInReader(result.reader)
result.file.Seek(0, 0)

if err != nil {
return nil, err
return nil, errors.Wrap(err, "cannot count lines in ndjson bundle")
}

if result.gzr != nil {
result.gzr.Close()
result.gzr.Reset(result.file)
}
result.file.Rewind()

result.count = linesCount

@@ -402,26 +418,49 @@ func newMultilineBundle(fileName string) (*multilineBundle, error) {

type multifileBundle struct {
count int
fileNames []string
bundles []bundle
currentBndlIdx int
currentBndl bundle
}

func newMultifileBundle(fileNames []string) (*multifileBundle, error) {
var result multifileBundle
result.fileNames = fileNames
result.bundles = make([]bundle, 0, len(fileNames))
result.count = 0
result.currentBndlIdx = -1
result.currentBndlIdx = 0

for _, fileName := range fileNames {
f, err := openFile(fileName)

if err != nil {
fmt.Printf("Cannot open %s: %v\n", fileName, err)
}

bndlType, err := guessBundleType(f)

if err != nil {
fmt.Printf("Cannot determine type of %s: %v\n", fileName, err)
}

f.Rewind()

for _, fileName := range result.fileNames {
bndl, err := newFhirBundle(fileName)
var bndl bundle

if bndlType == ndjsonBundleType {
bndl, err = newNdjsonBundle(f)
} else if bndlType == fhirBundleType {
bndl, err = newFhirBundle(f)
} else {
fmt.Printf("cannot create bundle for %s\n", fileName)
}

if err != nil {
return nil, err
return nil, errors.Wrap(err, "cannot create bundle")
}

result.count = result.count + bndl.Count()
bndl.Close()
if bndl != nil {
result.bundles = append(result.bundles, bndl)
result.count = result.count + bndl.Count()
}
}

return &result, nil
@@ -432,39 +471,50 @@ func (b *multifileBundle) Count() int {
}

func (b *multifileBundle) Close() {
if b.currentBndl != nil {
b.currentBndl.Close()
b.currentBndl = nil
b.currentBndlIdx = -1
for _, bndl := range b.bundles {
if bndl != nil {
b.Close()
}
}

b.currentBndlIdx = -1
}

func (b *multifileBundle) Next() (map[string]interface{}, error) {
if b.currentBndl == nil {
b.currentBndlIdx = b.currentBndlIdx + 1
if b.currentBndlIdx >= len(b.bundles) {
return nil, io.EOF
}

if b.currentBndlIdx > len(b.fileNames)-1 {
return nil, io.EOF
}
currentBndl := b.bundles[b.currentBndlIdx]

currentBndl, err := newFhirBundle(b.fileNames[b.currentBndlIdx])
// if b.currentBndl == nil {
// b.currentBndlIdx = b.currentBndlIdx + 1

if err != nil {
b.currentBndlIdx = b.currentBndlIdx + 1
return nil, errors.Wrap(err, "cannot create bundle")
}
// if b.currentBndlIdx > len(b.fileNames)-1 {
// return nil, io.EOF
// }

b.currentBndl = currentBndl
}
// currentBndl, err := newFhirBundle(b.fileNames[b.currentBndlIdx])

res, err := b.currentBndl.Next()
// if err != nil {
// b.currentBndlIdx = b.currentBndlIdx + 1
// return nil, errors.Wrap(err, "cannot create bundle")
// }

// b.currentBndl = currentBndl
// }

res, err := currentBndl.Next()

if err != nil {
if err == io.EOF {
b.currentBndl.Close()
b.currentBndl = nil
currentBndl.Close()
b.bundles[b.currentBndlIdx] = nil
b.currentBndlIdx = b.currentBndlIdx + 1

return b.Next()
}

return nil, errors.Wrap(err, "cannot read next entry from bundle")
}

@@ -14,7 +14,7 @@ var fileTypeCases = map[string]bundleType{
"{\"foo\": \"{{\\\"}bar\", \"resourceType\": \"Patient\"}": singleResourceBundleType,
}

func TestGuessFileType(t *testing.T) {
func TestGuessBundleType(t *testing.T) {
i := 0
for str, tpe := range fileTypeCases {
i++

0 comments on commit dc36235

Please sign in to comment.
You can’t perform that action at this time.