Skip to content
Permalink
Browse files

bundle type guessing code

  • Loading branch information...
mlapshin committed Sep 22, 2018
1 parent 56d2784 commit b5ed854faf2ccc62cc3655917a50bc7062c1619d
Showing with 269 additions and 13 deletions.
  1. +2 −3 Gopkg.lock
  2. +1 −1 Gopkg.toml
  3. +5 −1 bulk.go
  4. +219 −8 load.go
  5. +37 −0 load_test.go
  6. +5 −0 main.go

Some generated files are not rendered by default. Learn more.

@@ -16,7 +16,7 @@

[[constraint]]
name = "github.com/urfave/cli"
version = "1.20.0"
revision = "934abfb2f102315b5794e15ebc7949e4ca253920"

[prune]
unused-packages = true
@@ -190,6 +190,7 @@ func startDlWorker(n uint, bars *mpb.Progress, jobs chan string, results chan in

go func() {
defer wg.Done()
client := &http.Client{}

for url := range jobs {
parsedURL, err := urlPkg.Parse(url)
@@ -208,7 +209,9 @@ func startDlWorker(n uint, bars *mpb.Progress, jobs chan string, results chan in
continue
}

resp, err := http.Get(url)
req, err := http.NewRequest("GET", url, nil)
req.Header.Add("Accept-Encoding", "gzip")
resp, err := client.Do(req)

if err != nil {
results <- errors.Wrap(err, "cannot perform HTTP request")
@@ -220,6 +223,7 @@ func startDlWorker(n uint, bars *mpb.Progress, jobs chan string, results chan in
}

contentLengthHeader := resp.Header.Get("Content-Length")

size, err := strconv.ParseInt(contentLengthHeader, 10, 64)

counterDecorator := decor.CountersKibiByte("%6.1f / %6.1f", decor.WCSyncWidth)
227 load.go
@@ -24,6 +24,15 @@ import (
"github.com/vbauerster/mpb/decor"
)

type bundleType int

const (
ndjsonBundleType bundleType = iota
fhirBundleType
singleResourceBundleType
unknownBundleType
)

type bundle interface {
Next() (map[string]interface{}, error)
Close()
@@ -46,6 +55,89 @@ type copyFromBundleSource struct {
fhirVersion string
}

func isCompleteJSONObject(s string) bool {
numBraces := 0
inString := false
escaped := false

for _, b := range s {
if !escaped {
if !inString {
if b == '{' {
numBraces = numBraces + 1
} else if b == '}' {
numBraces = numBraces - 1
} else if b == '"' {
inString = true
}
} else {
if b == '"' {
inString = false
} else if b == '\\' {
escaped = true
}
}
} else {
escaped = false
}
}

return numBraces == 0
}

func guessJSONBundleType(r io.Reader) (bundleType, error) {
iter := jsoniter.Parse(jsoniter.ConfigFastest, r, 32*1024)

if iter.WhatIsNext() != jsoniter.ObjectValue {
return unknownBundleType, fmt.Errorf("Expecting to get JSON object at the root of the resource")
}

for k := iter.ReadObject(); k != ""; k = iter.ReadObject() {
if k == "resourceType" {
rt := iter.ReadString()

if rt == "Bundle" {
return fhirBundleType, nil
} else if rt != "" {
return singleResourceBundleType, nil
}

return unknownBundleType, nil
}

iter.Skip()
}

return fhirBundleType, nil
}

func guessBundleType(f io.Reader) (bundleType, error) {
rdr := bufio.NewReader(f)
firstLine, err := rdr.ReadString('\n')

if err != nil {
if err == io.EOF {
// only one line is available
return guessJSONBundleType(strings.NewReader(firstLine))
}

return unknownBundleType, err
}

secondLine, err := rdr.ReadString('\n')

if err != nil && err != io.EOF {
return unknownBundleType, err
}

if isCompleteJSONObject(firstLine) && isCompleteJSONObject(secondLine) {
return ndjsonBundleType, nil
}

return guessJSONBundleType(io.MultiReader(strings.NewReader(firstLine),
strings.NewReader(secondLine), rdr))
}

func newCopyFromBundleSource(bndl bundle, fhirVersion string, cb loaderCb) *copyFromBundleSource {
s := new(copyFromBundleSource)

@@ -146,6 +238,95 @@ type multilineBundle struct {
curline int
}

type fhirBundle struct {
count int
fileName string
file *os.File
curline int
iter *jsoniter.Iterator
}

func (b *fhirBundle) Close() {
b.file.Close()
}

func (b *fhirBundle) Count() int {
return b.count
}

func (b *fhirBundle) Next() (map[string]interface{}, error) {
if !b.iter.ReadArray() {
return nil, io.EOF
}

entry := b.iter.Read()

if entry == nil {
return nil, b.iter.Error
}

entryMap, ok := entry.(map[string]interface{})

if !ok {
return nil, fmt.Errorf("got non-object value in the entries array")
}

res, ok := entryMap["resource"]

if !ok {
return nil, fmt.Errorf("cannot get entry.resource attribute")
}

resMap, ok := res.(map[string]interface{})

if !ok {
return nil, fmt.Errorf("got non-object value at entry.resource")
}

fmt.Printf("%v\n\n", resMap)

return resMap, nil
}

func newFhirBundle(fileName string) (*fhirBundle, error) {
var result fhirBundle
result.fileName = fileName

file, err := os.Open(fileName)

if err != nil {
return nil, err
}

result.file = file
result.iter = jsoniter.Parse(jsoniter.ConfigFastest, result.file, 32*1024)

err = goToEntriesInFhirBundle(result.iter)

if err != nil {
return nil, errors.Wrap(err, "cannot find `entry` key in the bundle")
}

linesCount, err := countEntriesInBundle(result.iter)

result.file.Seek(0, 0)
result.iter.Reset(result.file)

if err != nil {
return nil, errors.Wrap(err, "cannot reset fhir bundle iterator")
}

err = goToEntriesInFhirBundle(result.iter)

if err != nil {
return nil, errors.Wrap(err, "cannot find `entry` key in the bundle")
}

result.count = linesCount

return &result, nil
}

func (b *multilineBundle) Close() {
defer b.file.Close()

@@ -233,7 +414,7 @@ func newMultifileBundle(fileNames []string) (*multifileBundle, error) {
result.currentBndlIdx = -1

for _, fileName := range result.fileNames {
bndl, err := newMultilineBundle(fileName)
bndl, err := newFhirBundle(fileName)

if err != nil {
return nil, err
@@ -266,11 +447,11 @@ func (b *multifileBundle) Next() (map[string]interface{}, error) {
return nil, io.EOF
}

currentBndl, err := newMultilineBundle(b.fileNames[b.currentBndlIdx])
currentBndl, err := newFhirBundle(b.fileNames[b.currentBndlIdx])

if err != nil {
b.currentBndlIdx = b.currentBndlIdx + 1
return nil, err
return nil, errors.Wrap(err, "cannot create bundle")
}

b.currentBndl = currentBndl
@@ -284,7 +465,7 @@ func (b *multifileBundle) Next() (map[string]interface{}, error) {
b.currentBndl = nil
return b.Next()
}
return nil, err
return nil, errors.Wrap(err, "cannot read next entry from bundle")
}

return res, nil
@@ -325,6 +506,37 @@ func countLinesInReader(r io.Reader) (int, error) {
}
}

func goToEntriesInFhirBundle(iter *jsoniter.Iterator) error {
if iter.WhatIsNext() != jsoniter.ObjectValue {
return fmt.Errorf("Expecting to get JSON object at the root of the FHIR Bundle")
}

curAttr := iter.ReadObject()

for curAttr != "" {
if curAttr == "entry" && iter.WhatIsNext() == jsoniter.ArrayValue {
return nil
}

iter.Skip()

curAttr = iter.ReadObject()
}

return io.EOF
}

func countEntriesInBundle(iter *jsoniter.Iterator) (int, error) {
count := 0

for iter.ReadArray() {
count = count + 1
iter.Skip()
}

return count, nil
}

type copyLoader struct {
fhirVersion string
}
@@ -379,7 +591,6 @@ func (l *insertLoader) Load(db *pgx.Conn, bndl bundle, cb loaderCb) error {
}

if curResource%batchSize == 0 || curResource == totalCount-1 {
// PrintMemUsage()
batch.Send(context.Background(), nil)
batch.Close()

@@ -400,7 +611,7 @@ func (l *insertLoader) Load(db *pgx.Conn, bndl bundle, cb loaderCb) error {
return nil
}

func loadFiles(files []string, ldr loader, memUsage bool) error {
func loadNdjsonFiles(files []string, ldr loader, memUsage bool) error {
db := GetConnection(nil)
defer db.Close()

@@ -508,8 +719,8 @@ func LoadCommand(c *cli.Context) error {
f.Close()
}

return loadFiles(files, ldr, memUsage)
return loadNdjsonFiles(files, ldr, memUsage)
}

return loadFiles(c.Args(), ldr, memUsage)
return loadNdjsonFiles(c.Args(), ldr, memUsage)
}
@@ -0,0 +1,37 @@
package main

import (
"fmt"
"strings"
"testing"
)

var fileTypeCases = map[string]bundleType{
"{\"foo\": \"bar\"}\n{\"foo\": \"bar\"}": ndjsonBundleType,
"{\"foo\": \"{{\\\"}bar\"}\n{\"foo\": \"bar\"}": ndjsonBundleType,
"{\"foo\": \"{{\\\"}bar\",\n\n\"resourceType\": \"Bundle\"}": fhirBundleType,
"{\"foo\": \"bar\", \n\n\n\n\n \"resourceType\": \"Observation\"}": singleResourceBundleType,
"{\"foo\": \"{{\\\"}bar\", \"resourceType\": \"Patient\"}": singleResourceBundleType,
}

func TestGuessFileType(t *testing.T) {
i := 0
for str, tpe := range fileTypeCases {
i++

t.Run(fmt.Sprintf("File type case #%v", str), func(t *testing.T) {
bt, err := guessBundleType(strings.NewReader(str))

if err != nil {
t.Error(err)
}

if bt != tpe {
t.Logf("bundle type not matched (expected %v, got %v)", tpe, bt)
t.Fail()
}

})
}

}

0 comments on commit b5ed854

Please sign in to comment.
You can’t perform that action at this time.