Bulk Loader: Fix memory usage by JSON parser #3794

Merged
21 commits merged on Aug 13, 2019
Changes from 11 commits

Commits (21)
ded891b
Create a new NQuads struct and move Parse under it.
manishrjain Aug 10, 2019
9ac7d50
Move all nquads to the new NQuads struct.
manishrjain Aug 10, 2019
a6047b1
Make parse tests work
manishrjain Aug 10, 2019
6832162
Code runs and all
manishrjain Aug 10, 2019
f4cf4be
Run GC every 5s
manishrjain Aug 10, 2019
39a00b3
GC every 10s
manishrjain Aug 10, 2019
b81b207
Set worker goroutines aka mappers to 1/4th of the number of cores.
manishrjain Aug 10, 2019
fa831be
Only run GC if it hadn't been run. Also mention the RAM usage being p…
manishrjain Aug 10, 2019
70eb295
Files moved to one directory to allow easier sharing of the code.
manishrjain Aug 11, 2019
b7a299b
Add batchSize option to Chunker and NQuadBuffer to simplify loader an…
manishrjain Aug 11, 2019
17b557b
Bring the edgraph/server.go code back. Move multiple RDF parsing to c…
manishrjain Aug 12, 2019
3f89c68
Hook up NQuadBuffer to RDFChunker.NQuads
manishrjain Aug 12, 2019
5e12649
Address golint issues
manishrjain Aug 12, 2019
948bfde
Fix the live loader test failure.
manishrjain Aug 12, 2019
3e25781
Don't think we need a for loop around chunker.Parse
manishrjain Aug 12, 2019
541c9be
Move FacetDelimiter to x package to avoid a cyclic import loop.
manishrjain Aug 12, 2019
297651f
Fix gql package test
manishrjain Aug 12, 2019
8f7f8cf
Fix a test failure by handling io.EOF
manishrjain Aug 12, 2019
1d1f529
Don't return io.EOF unnecessarily
manishrjain Aug 12, 2019
de862d1
Address comments by PR folks. Also, no need to handle io.EOF because …
manishrjain Aug 12, 2019
5c420fb
Address PR folks comments
manishrjain Aug 13, 2019
File renamed without changes.
56 changes: 34 additions & 22 deletions chunker/chunk.go
@@ -28,10 +28,7 @@ import (
"strings"
"unicode"

"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgo/x"
"github.com/dgraph-io/dgraph/chunker/json"
"github.com/dgraph-io/dgraph/chunker/rdf"
"github.com/dgraph-io/dgraph/lex"

"github.com/pkg/errors"
@@ -42,14 +39,27 @@ type Chunker interface {
Begin(r *bufio.Reader) error
Chunk(r *bufio.Reader) (*bytes.Buffer, error)
End(r *bufio.Reader) error
Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error)
Parse(chunkBuf *bytes.Buffer) error
NQuads() *NQuadBuffer
}

type rdfChunker struct {
lexer *lex.Lexer
nqs *NQuadBuffer
}

type jsonChunker struct{}
func (rdfChunker) NQuads() *NQuadBuffer {
// TODO: Build this.
return nil
}

type jsonChunker struct {
Nqs *NQuadBuffer
}

func (jc *jsonChunker) NQuads() *NQuadBuffer {
return jc.Nqs
}

// InputFormat represents the multiple formats supported by Chunker.
type InputFormat byte
@@ -64,12 +74,17 @@ const (
)

// NewChunker returns a new chunker for the specified format.
func NewChunker(inputFormat InputFormat) Chunker {
func NewChunker(inputFormat InputFormat, batchSize int) Chunker {
switch inputFormat {
case RdfFormat:
return &rdfChunker{lexer: &lex.Lexer{}}
return &rdfChunker{
nqs: NewNQuadBuffer(batchSize),
lexer: &lex.Lexer{},
}
case JsonFormat:
return &jsonChunker{}
return &jsonChunker{
Nqs: NewNQuadBuffer(batchSize),
}
default:
panic("unknown input format")
}
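
With Parse no longer returning []*api.NQuad, the caller's loop also changes: chunks are parsed into the chunker's NQuadBuffer and picked up via NQuads(), which is what keeps memory bounded to roughly one batch at a time. A hedged sketch of driving the reshaped interface (the file name, read loop, and error handling here are assumptions, not code from this PR):

package main

import (
	"bufio"
	"io"
	"log"
	"os"

	"github.com/dgraph-io/dgraph/chunker"
)

func main() {
	f, err := os.Open("data.json") // hypothetical input file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	r := bufio.NewReader(f)
	// batchSize of 1000 matches the value used in the updated tests below.
	ck := chunker.NewChunker(chunker.JsonFormat, 1000)
	if err := ck.Begin(r); err != nil {
		log.Fatal(err)
	}
	for {
		buf, cerr := ck.Chunk(r)
		if buf != nil && buf.Len() > 0 {
			// Parse feeds the chunker's NQuadBuffer instead of returning a slice.
			if perr := ck.Parse(buf); perr != nil && perr != io.EOF {
				log.Fatal(perr)
			}
		}
		if cerr == io.EOF {
			break
		} else if cerr != nil {
			log.Fatal(cerr)
		}
	}
	if err := ck.End(r); err != nil {
		log.Fatal(err)
	}
	nqbuf := ck.NQuads() // parsed NQuads are drained from this buffer in batches
	_ = nqbuf
}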
@@ -117,28 +132,26 @@ func (c *rdfChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
}

// Parse is not thread-safe. Only call it serially, because it reuses lexer object.
func (c *rdfChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
func (c *rdfChunker) Parse(chunkBuf *bytes.Buffer) error {

Review comment: receiver name c should be consistent with previous receiver name rc for rdfChunker (from golint)

if chunkBuf.Len() == 0 {
return nil, io.EOF
return io.EOF
}

nqs := make([]*api.NQuad, 0)
for chunkBuf.Len() > 0 {
str, err := chunkBuf.ReadString('\n')
if err != nil && err != io.EOF {
x.Check(err)
}

nq, err := rdf.Parse(str, c.lexer)
if err == rdf.ErrEmpty {
nq, err := ParseRDF(str, c.lexer)
if err == ErrEmpty {
continue // blank line or comment
} else if err != nil {
return nil, errors.Wrapf(err, "while parsing line %q", str)
return errors.Wrapf(err, "while parsing line %q", str)
}
nqs = append(nqs, &nq)
c.nqs.Push(&nq)
}

return nqs, nil
return nil
}

// RDF files don't require any special processing at the end of the file.
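
rdfChunker.Parse now pushes each NQuad into c.nqs rather than growing and returning a slice per chunk. The NQuadBuffer internals are not part of this diff; the sketch below is only an assumed illustration of the batching idea — accumulate up to batchSize NQuads, then hand the batch off and drop the reference — not dgraph's actual implementation:

// Illustrative batching buffer (assumed shape, not the real chunker.NQuadBuffer).
package chunker

import "github.com/dgraph-io/dgo/protos/api"

// NQuadBuffer collects parsed NQuads and emits them in batches so callers
// never hold an unbounded slice of NQuads for a whole file.
type NQuadBuffer struct {
	batchSize int
	nquads    []*api.NQuad
	nqCh      chan []*api.NQuad
}

func NewNQuadBuffer(batchSize int) *NQuadBuffer {
	return &NQuadBuffer{
		batchSize: batchSize,
		nqCh:      make(chan []*api.NQuad, 10),
	}
}

// Push appends NQuads and flushes a batch once batchSize is reached.
func (buf *NQuadBuffer) Push(nqs ...*api.NQuad) {
	buf.nquads = append(buf.nquads, nqs...)
	if buf.batchSize > 0 && len(buf.nquads) >= buf.batchSize {
		buf.nqCh <- buf.nquads
		buf.nquads = nil
	}
}

// Flush sends any remaining NQuads and closes the batch channel.
func (buf *NQuadBuffer) Flush() {
	if len(buf.nquads) > 0 {
		buf.nqCh <- buf.nquads
		buf.nquads = nil
	}
	close(buf.nqCh)
}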
@@ -231,18 +244,17 @@ func (jsonChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
return out, nil
}

func (jsonChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
func (jc *jsonChunker) Parse(chunkBuf *bytes.Buffer) error {
if chunkBuf.Len() == 0 {
return nil, io.EOF
return io.EOF
}

nqs, err := json.Parse(chunkBuf.Bytes(), json.SetNquads)
err := jc.Nqs.ParseJSON(chunkBuf.Bytes(), SetNquads)
if err != nil && err != io.EOF {
x.Check(err)
}
chunkBuf.Reset()

return nqs, err
return err
}

func (jsonChunker) End(r *bufio.Reader) error {
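
The JSON path mirrors the RDF path: ParseJSON fills the same buffer, and chunkBuf.Reset() lets the raw chunk bytes be released immediately. How batches leave the buffer is not visible in this diff; assuming the channel-based sketch above, a loader-side consumer could drain it roughly like this (again an assumption, not the PR's loader code):

// Hypothetical consumer for the sketched buffer above: handle one batch at a
// time so at most ~batchSize NQuads are held in memory per consumer.
func consume(buf *NQuadBuffer, handle func([]*api.NQuad)) {
	for batch := range buf.nqCh {
		handle(batch)
	}
}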
8 changes: 4 additions & 4 deletions chunker/chunk_test.go
@@ -46,7 +46,7 @@ func TestJSONLoadStart(t *testing.T) {
}

for _, test := range tests {
chunker := NewChunker(JsonFormat)
chunker := NewChunker(JsonFormat, 1000)
require.Error(t, chunker.Begin(bufioReader(test.json)), test.desc)
}
}
@@ -64,7 +64,7 @@ func TestJSONLoadReadNext(t *testing.T) {
{"[{}", "malformed array"},
}
for _, test := range tests {
chunker := NewChunker(JsonFormat)
chunker := NewChunker(JsonFormat, 1000)
reader := bufioReader(test.json)
require.NoError(t, chunker.Begin(reader), test.desc)

@@ -113,7 +113,7 @@ func TestJSONLoadSuccessFirst(t *testing.T) {
},
}
for _, test := range tests {
chunker := NewChunker(JsonFormat)
chunker := NewChunker(JsonFormat, 1000)
reader := bufioReader(test.json)
require.NoError(t, chunker.Begin(reader), test.desc)

@@ -176,7 +176,7 @@ func TestJSONLoadSuccessAll(t *testing.T) {
}`,
}

chunker := NewChunker(JsonFormat)
chunker := NewChunker(JsonFormat, 1000)
reader := bufioReader(testDoc)

var json *bytes.Buffer