Skip to content

Commit cc97957

Browse files
authored
Support passing GraphQL schema to bulk loader. (#5509)
Support passing a GraphQL schema file to the bulk loader. The bulk loader will generate the triples relevant triples expected by Dgraph. Related to DGRAPH-1283
1 parent 36be40c commit cc97957

File tree

4 files changed

+87
-7
lines changed

4 files changed

+87
-7
lines changed

dgraph/cmd/bulk/loader.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"log"
2828
"os"
2929
"path/filepath"
30+
"strconv"
3031
"sync"
3132
"time"
3233

@@ -47,6 +48,7 @@ type options struct {
4748
DataFiles string
4849
DataFormat string
4950
SchemaFile string
51+
GqlSchemaFile string
5052
OutDir string
5153
ReplaceOutDir bool
5254
TmpDir string
@@ -237,6 +239,9 @@ func (ld *loader) mapStage() {
237239
}
238240
x.Check(thr.Finish())
239241

242+
// Send the graphql triples
243+
ld.processGqlSchema(loadType)
244+
240245
close(ld.readerChunkCh)
241246
mapperWg.Wait()
242247

@@ -251,6 +256,51 @@ func (ld *loader) mapStage() {
251256
ld.xids = nil
252257
}
253258

259+
func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
260+
if ld.opt.GqlSchemaFile == "" {
261+
return
262+
}
263+
264+
f, err := os.Open(ld.opt.GqlSchemaFile)
265+
x.Check(err)
266+
defer f.Close()
267+
268+
key := ld.opt.EncryptionKey
269+
if !ld.opt.Encrypted {
270+
key = nil
271+
}
272+
r, err := enc.GetReader(key, f)
273+
x.Check(err)
274+
if filepath.Ext(ld.opt.GqlSchemaFile) == ".gz" {
275+
r, err = gzip.NewReader(r)
276+
x.Check(err)
277+
}
278+
279+
buf, err := ioutil.ReadAll(r)
280+
x.Check(err)
281+
282+
rdfSchema := `_:gqlschema <dgraph.type> "dgraph.graphql" .
283+
_:gqlschema <dgraph.graphql.xid> "dgraph.graphql.schema" .
284+
_:gqlschema <dgraph.graphql.schema> %s .
285+
`
286+
287+
jsonSchema := `{
288+
"dgraph.type": "dgraph.graphql",
289+
"dgraph.graphql.xid": "dgraph.graphql.schema",
290+
"dgraph.graphql.schema": %s
291+
}`
292+
293+
gqlBuf := &bytes.Buffer{}
294+
schema := strconv.Quote(string(buf))
295+
switch loadType {
296+
case chunker.RdfFormat:
297+
x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(rdfSchema, schema))))
298+
case chunker.JsonFormat:
299+
x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(jsonSchema, schema))))
300+
}
301+
ld.readerChunkCh <- gqlBuf
302+
}
303+
254304
func (ld *loader) reduceStage() {
255305
ld.prog.setPhase(reducePhase)
256306

dgraph/cmd/bulk/run.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func init() {
5656
"Location of *.rdf(.gz) or *.json(.gz) file(s) to load.")
5757
flag.StringP("schema", "s", "",
5858
"Location of schema file.")
59+
flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.")
5960
flag.String("format", "",
6061
"Specify file format (rdf or json) instead of getting it from filename.")
6162
flag.Bool("encrypted", false,
@@ -116,6 +117,7 @@ func run() {
116117
DataFiles: Bulk.Conf.GetString("files"),
117118
DataFormat: Bulk.Conf.GetString("format"),
118119
SchemaFile: Bulk.Conf.GetString("schema"),
120+
GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
119121
Encrypted: Bulk.Conf.GetBool("encrypted"),
120122
OutDir: Bulk.Conf.GetString("out"),
121123
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),

systest/bulk_live_cases_test.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ func TestBulkSingleUid(t *testing.T) {
343343
_:erin <name> "Erin" .
344344
_:frank <name> "Frank" .
345345
_:grace <name> "Grace" .
346-
`)
346+
`, "")
347347
defer s.cleanup()
348348

349349
// Ensures that the index keys are written to disk after commit.
@@ -474,7 +474,7 @@ func TestDeleteEdgeWithStar(t *testing.T) {
474474
475475
<0x2> <name> "Alice" .
476476
<0x3> <name> "Bob" .
477-
`)
477+
`, "")
478478
defer s.cleanup()
479479

480480
_, err := s.bulkCluster.client.NewTxn().Mutate(context.Background(), &api.Mutation{
@@ -497,6 +497,28 @@ func TestDeleteEdgeWithStar(t *testing.T) {
497497

498498
}
499499

500+
func TestGqlSchema(t *testing.T) {
501+
s := newBulkOnlySuite(t, "", "", "abc")
502+
defer s.cleanup()
503+
504+
t.Run("Get GraphQL schema", s.testCase(`
505+
{
506+
schema(func: has(dgraph.graphql.schema)) {
507+
dgraph.graphql.schema
508+
dgraph.graphql.xid
509+
dgraph.type
510+
}
511+
}`, `
512+
{
513+
"schema": [{
514+
"dgraph.graphql.schema": "abc",
515+
"dgraph.graphql.xid": "dgraph.graphql.schema",
516+
"dgraph.type": ["dgraph.graphql"]
517+
}]
518+
}`))
519+
520+
}
521+
500522
// TODO: Fix this later.
501523
func DONOTRUNTestGoldenData(t *testing.T) {
502524
if testing.Short() {
@@ -506,6 +528,7 @@ func DONOTRUNTestGoldenData(t *testing.T) {
506528
s := newSuiteFromFile(t,
507529
os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/goldendata.schema"),
508530
os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/goldendata.rdf.gz"),
531+
"",
509532
)
510533
defer s.cleanup()
511534

systest/bulk_live_fixture_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type suite struct {
5353

5454
type suiteOpts struct {
5555
schema string
56+
gqlSchema string
5657
rdfs string
5758
skipBulkLoader bool
5859
skipLiveLoader bool
@@ -85,7 +86,9 @@ func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
8586
s.checkFatal(ioutil.WriteFile(rdfFile, []byte(opts.rdfs), 0644))
8687
schemaFile := filepath.Join(rootDir, "schema.txt")
8788
s.checkFatal(ioutil.WriteFile(schemaFile, []byte(opts.schema), 0644))
88-
s.setup(schemaFile, rdfFile)
89+
gqlSchemaFile := filepath.Join(rootDir, "gql_schema.txt")
90+
s.checkFatal(ioutil.WriteFile(gqlSchemaFile, []byte(opts.gqlSchema), 0644))
91+
s.setup(schemaFile, rdfFile, gqlSchemaFile)
8992
return s
9093
}
9194

@@ -97,26 +100,27 @@ func newSuite(t *testing.T, schema, rdfs string) *suite {
97100
return newSuiteInternal(t, opts)
98101
}
99102

100-
func newBulkOnlySuite(t *testing.T, schema, rdfs string) *suite {
103+
func newBulkOnlySuite(t *testing.T, schema, rdfs, gqlSchema string) *suite {
101104
opts := suiteOpts{
102105
schema: schema,
106+
gqlSchema: gqlSchema,
103107
rdfs: rdfs,
104108
skipLiveLoader: true,
105109
}
106110
return newSuiteInternal(t, opts)
107111
}
108112

109-
func newSuiteFromFile(t *testing.T, schemaFile, rdfFile string) *suite {
113+
func newSuiteFromFile(t *testing.T, schemaFile, rdfFile, gqlSchemaFile string) *suite {
110114
if testing.Short() {
111115
t.Skip("Skipping system test with long runtime.")
112116
}
113117
s := &suite{t: t}
114118

115-
s.setup(schemaFile, rdfFile)
119+
s.setup(schemaFile, rdfFile, gqlSchemaFile)
116120
return s
117121
}
118122

119-
func (s *suite) setup(schemaFile, rdfFile string) {
123+
func (s *suite) setup(schemaFile, rdfFile, gqlSchemaFile string) {
120124
var (
121125
bulkDir = filepath.Join(rootDir, "bulk")
122126
liveDir = filepath.Join(rootDir, "live")
@@ -137,6 +141,7 @@ func (s *suite) setup(schemaFile, rdfFile string) {
137141
bulkCmd := exec.Command(testutil.DgraphBinaryPath(), "bulk",
138142
"-f", rdfFile,
139143
"-s", schemaFile,
144+
"-g", gqlSchemaFile,
140145
"--http", "localhost:"+strconv.Itoa(freePort(0)),
141146
"-j=1",
142147
"--store_xids=true",

0 commit comments

Comments
 (0)