Skip to content

Commit

Permalink
Support bulk loader use-case to import unencrypted export and encrypt…
Browse files Browse the repository at this point in the history
… the result. (#5209) (#5213)

Fixes DGRAPH-1254
(cherry-picked from commit d982be3)
  • Loading branch information
parasssh committed Apr 15, 2020
1 parent 9e8601b commit ddad5fa
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 13 deletions.
7 changes: 4 additions & 3 deletions chunker/chunk.go
Expand Up @@ -347,9 +347,10 @@ func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
}
}

// FileReader returns an open reader and file on the given file. Gzip-compressed input is detected
// and decompressed automatically even without the gz extension. The caller is responsible for
// calling the returned cleanup function when done with the reader.
// FileReader returns an open reader on the given file. Gzip-compressed input is detected
// and decompressed automatically even without the gz extension. The keyfile, if non-nil,
// is used to decrypt the file. The caller is responsible for calling the returned cleanup
// function when done with the reader.
func FileReader(file string, keyfile string) (rd *bufio.Reader, cleanup func()) {
var f *os.File
var err error
Expand Down
20 changes: 15 additions & 5 deletions dgraph/cmd/bulk/loader.go
Expand Up @@ -63,6 +63,7 @@ type options struct {
IgnoreErrors bool
CustomTokenizers string
NewUids bool
Encrypted bool

MapShards int
ReduceShards int
Expand Down Expand Up @@ -110,7 +111,7 @@ func newLoader(opt *options) *loader {
readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
writeTs: getWriteTimestamp(zero),
}
st.schema = newSchemaStore(readSchema(opt.SchemaFile, opt.BadgerKeyFile), opt, st)
st.schema = newSchemaStore(readSchema(opt), opt, st)
ld := &loader{
state: st,
mappers: make([]*mapper, opt.NumGoroutines),
Expand All @@ -137,13 +138,18 @@ func getWriteTimestamp(zero *grpc.ClientConn) uint64 {
}
}

func readSchema(filename string, keyfile string) *schema.ParsedSchema {
f, err := os.Open(filename)
func readSchema(opt *options) *schema.ParsedSchema {
f, err := os.Open(opt.SchemaFile)
x.Check(err)
defer f.Close()

keyfile := opt.BadgerKeyFile
if !opt.Encrypted {
keyfile = ""
}
r, err := enc.GetReader(keyfile, f)
x.Check(err)
if filepath.Ext(filename) == ".gz" {
if filepath.Ext(opt.SchemaFile) == ".gz" {
r, err = gzip.NewReader(r)
x.Check(err)
}
Expand Down Expand Up @@ -194,7 +200,11 @@ func (ld *loader) mapStage() {
go func(file string) {
defer thr.Done(nil)

r, cleanup := chunker.FileReader(file, ld.opt.BadgerKeyFile)
keyfile := ld.opt.BadgerKeyFile
if !ld.opt.Encrypted {
keyfile = ""
}
r, cleanup := chunker.FileReader(file, keyfile)
defer cleanup()

chunk := chunker.NewChunker(loadType, 1000)
Expand Down
19 changes: 14 additions & 5 deletions dgraph/cmd/bulk/run.go
Expand Up @@ -58,18 +58,15 @@ func init() {
"Location of schema file.")
flag.String("format", "",
"Specify file format (rdf or json) instead of getting it from filename.")
flag.Bool("encrypted", false,
"Flag to indicate whether schema and data files are encrypted.")
flag.String("out", defaultOutDir,
"Location to write the final dgraph data directories.")
flag.Bool("replace_out", false,
"Replace out directory and its contents if it exists.")
flag.String("tmp", "tmp",
"Temp directory used for on-disk scratch space. Requires free space proportional"+
" to the size of the RDF file and the amount of indexing used.")
flag.String("encryption_key_file", "",
"The file that stores the encryption key. The key size must be 16, 24, or 32 bytes long. "+
"The key size determines the corresponding block size for AES encryption "+
"(AES-128, AES-192, and AES-256 respectively). Enterprise feature.")

flag.IntP("num_go_routines", "j", int(math.Ceil(float64(runtime.NumCPU())/4.0)),
"Number of worker threads to use. MORE THREADS LEAD TO HIGHER RAM USAGE.")
flag.Int64("mapoutput_mb", 64,
Expand Down Expand Up @@ -101,13 +98,21 @@ func init() {
"Comma separated list of tokenizer plugins")
flag.Bool("new_uids", false,
"Ignore UIDs in load files and assign new ones.")

// Options around how to set up Badger.
flag.String("encryption_key_file", "",
"The file that stores the encryption key. The key size must be 16/24/32 bytes long."+
" The key size indicates the chosen AES encryption (AES-128/192/256 respectively). "+
" This key is used to encrypt the output data directories and to decrypt the input "+
" schema and data files (if encrypted). Enterprise feature.")
}

func run() {
opt := options{
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
SchemaFile: Bulk.Conf.GetString("schema"),
Encrypted: Bulk.Conf.GetBool("encrypted"),
OutDir: Bulk.Conf.GetString("out"),
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
TmpDir: Bulk.Conf.GetString("tmp"),
Expand Down Expand Up @@ -137,6 +142,10 @@ func run() {
fmt.Printf("Cannot enable encryption: %s", x.ErrNotSupported)
os.Exit(1)
}
if opt.Encrypted && opt.BadgerKeyFile == "" {
fmt.Printf("Must use --encryption_key_file option with --encrypted option.\n")
os.Exit(1)
}
if opt.SchemaFile == "" {
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
os.Exit(1)
Expand Down

0 comments on commit ddad5fa

Please sign in to comment.