Navigation Menu

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New txn operation #3197

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
187a485
saving unfinished state
Mar 8, 2019
275ccd0
parser mutation with txn working, saving state
Mar 12, 2019
ae49bf8
lex/lexer.go: remove debug print
Mar 13, 2019
160bedf
gql/parser_mutation.go: cleanup txn.query parsing code
Mar 13, 2019
9c143e3
gql/parser_mutation.go: return early and avoid break
Mar 13, 2019
d8f7df2
gql/parser_mutation.go: parse query and examine error
Mar 14, 2019
1ed0c26
chunker/rdf/parse.go: add varname to nquad
Mar 23, 2019
a54a3e2
edgraph/server.go: add txn query into mutation and get uid vars
Mar 23, 2019
9a48c97
gql/parser.go: remote unbalanced needs/defined vars test for now
Mar 23, 2019
80d2119
gql/parser_mutation.go: move query processing to Server.doMutate()
Mar 23, 2019
f5a3ce5
query/mutation.go: add subject substitution from txn query vars uid f…
Mar 23, 2019
0cf1444
query/query.go: add method to get uids after query processing without…
Mar 23, 2019
5a85690
gql/parser_test.go: move ParseMutation tests to their own file
Mar 27, 2019
5163ce5
gql/parser_mutation_test.go: new file for all ParseMutation tests
Mar 27, 2019
c25c7a3
gql/parser_mutation.go: fix parse errors
Mar 27, 2019
8abb00a
gql/state.go: eat space before txn query block
Mar 27, 2019
51faac5
add reminders
Mar 27, 2019
f1c8ceb
edgraph/server.go: add check and rename to CondQuery
Mar 28, 2019
9688e32
gql/parser_mutation.go: rename TxnQuery to CondQuery, update tests
Mar 28, 2019
de9bf43
query/mutation.go: fix panic if no query vars were defined
Mar 28, 2019
1bfedfd
query/mutation.go: fix panic on uninitialized query vars
Mar 28, 2019
22e171e
edgraph/server.go: add cond query in error for debugging
Mar 28, 2019
cda25f5
query/query5_test.go: add txn mutation tests
Mar 28, 2019
cd5c72c
edgraph/server.go: minor change
Mar 28, 2019
639e2f8
Merge branch 'master' of github.com:/dgraph-io/dgraph into srfrog/iss…
Apr 1, 2019
bd16a1d
edgraph/server.go: move query conditional processing out of doMutate
Apr 2, 2019
e1ea674
edgraph/server.go: use assertion to check for startTs, it should neve…
Apr 2, 2019
2e7e196
edgraph/server.go: remove error when no queryVars are returned,
Apr 4, 2019
1a28735
gql/parser.go: add comment to checkDependency about change that affec…
Apr 4, 2019
808db4f
gql/state.go: rename lexTextQuery to lexTextCondQuery
Apr 4, 2019
af59259
gql/parser_mutation.go: rename itemMutationContent to itemMutationOpC…
Apr 4, 2019
e730520
gql/state.go: make error a bit clearer when unbalanced braces are det…
Apr 4, 2019
754fa49
gql/parser_mutation_test.go: update test with new error text
Apr 4, 2019
ebd0c23
query/mutation.go: add support for n-quad object uid vars
Apr 5, 2019
b8c727a
chunker/rdf/state.go: emit specific var name for subject or object
Apr 5, 2019
b9d6384
chunker/rdf/parse.go: add support for object vars in mutation, update…
Apr 5, 2019
5421819
gql/parser_mutation_test.go: update tests for subject vars and add ob…
Apr 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 11 additions & 5 deletions chunker/rdf/parse.go
Expand Up @@ -78,7 +78,13 @@ L:
return rnq, x.Errorf("Expected '(', found: %s", item.Val)
}
it.Next()
if item = it.Item(); item.Typ != itemVarName {
item = it.Item()
switch item.Typ {
case itemSubjectVarName:
rnq.SubjectVar = item.Val
case itemObjectVarName:
rnq.ObjectVar = item.Val
default:
return rnq, x.Errorf("Expected variable name, found: %s", item.Val)
}

Expand Down Expand Up @@ -183,14 +189,14 @@ L:
if seenOval && rnq.ObjectValue == nil {
rnq.ObjectValue = &api.Value{Val: &api.Value_DefaultVal{DefaultVal: oval}}
}
if len(rnq.Subject) == 0 || len(rnq.Predicate) == 0 {
if (len(rnq.Subject) == 0 && len(rnq.SubjectVar) == 0) || len(rnq.Predicate) == 0 {
return rnq, x.Errorf("Empty required fields in NQuad. Input: [%s]", line)
}
if len(rnq.ObjectId) == 0 && rnq.ObjectValue == nil {
if len(rnq.ObjectId) == 0 && rnq.ObjectValue == nil && len(rnq.ObjectVar) == 0 {
return rnq, x.Errorf("No Object in NQuad. Input: [%s]", line)
}
if !sane(rnq.Subject) || !sane(rnq.Predicate) ||
!sane(rnq.ObjectId) || !sane(rnq.Label) {
if (!sane(rnq.Subject) && len(rnq.SubjectVar) == 0) || !sane(rnq.Predicate) ||
(!sane(rnq.ObjectId) && len(rnq.ObjectVar) == 0) || !sane(rnq.Label) {
return rnq, x.Errorf("NQuad failed sanity check:%+v", rnq)
}

Expand Down
41 changes: 23 additions & 18 deletions chunker/rdf/state.go
Expand Up @@ -26,23 +26,24 @@ import (

// The constants represent different types of lexed Items possible for an rdf N-Quad.
const (
itemText lex.ItemType = 5 + iota // plain text
itemSubject // subject, 6
itemPredicate // predicate, 7
itemObject // object, 8
itemLabel // label, 9
itemLiteral // literal, 10
itemLanguage // language, 11
itemObjectType // object type, 12
itemValidEnd // end with dot, 13
itemComment // comment, 14
itemComma // comma, 15
itemEqual // equal, 16
itemLeftRound // '(', 17
itemRightRound // ')', 18
itemStar // *, 19
itemVarKeyword // var, 20
itemVarName // 21
itemText lex.ItemType = 5 + iota // plain text
itemSubject // subject, 6
itemPredicate // predicate, 7
itemObject // object, 8
itemLabel // label, 9
itemLiteral // literal, 10
itemLanguage // language, 11
itemObjectType // object type, 12
itemValidEnd // end with dot, 13
itemComment // comment, 14
itemComma // comma, 15
itemEqual // equal, 16
itemLeftRound // '(', 17
itemRightRound // ')', 18
itemStar // *, 19
itemVarKeyword // var, 20
itemSubjectVarName // 21
itemObjectVarName // 22
)

// These constants keep a track of the depth while parsing an rdf N-Quad.
Expand Down Expand Up @@ -459,7 +460,11 @@ func lexVariable(l *lex.Lexer) lex.StateFn {
break
}
}
l.Emit(itemVarName)
if l.Depth == atSubject {
l.Emit(itemSubjectVarName)
} else {
l.Emit(itemObjectVarName)
}

l.IgnoreRun(isSpace)

Expand Down
67 changes: 66 additions & 1 deletion edgraph/server.go
Expand Up @@ -384,6 +384,60 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
return s.doMutate(ctx, mu)
}

// doCondQuery processes a conditional query within the same transaction of a mutation.
// Returns a map with the vars created from the query, otherwise nil and an error.
func doCondQuery(ctx context.Context, l *query.Latency, req *api.Request,
) (map[string][]string, error) {
var queryVars map[string][]string

if ctx.Err() != nil {
return nil, ctx.Err()
}

// Nothing to do
if req.Query == "" {
return queryVars, nil
}
x.AssertTruef(req.StartTs != 0, "Transaction timestamp is zero")

parsedReq, err := gql.Parse(gql.Request{
Str: req.Query,
Variables: make(map[string]string),
})
if err != nil {
return nil, err
}

if err = validateQuery(parsedReq.Query); err != nil {
return nil, err
}

qr := query.QueryRequest{
Latency: l,
GqlQuery: &parsedReq,
ReadTs: req.StartTs,
}
if err = qr.ProcessQuery(ctx); err != nil {
return nil, x.Wrapf(err, "while processing query: %q", req.Query)
}

queryVars = qr.GetUids()
if len(queryVars) == 0 {
glog.V(2).Infof("No variables defined in conditional query: %q", req.Query)
return nil, nil
}
glog.V(3).Infof("CondQuery: qr=%+v vars=%v", qr, queryVars)

// NOTE: This is a temporary check until the mutation logic is defined clearly.
for varName, uids := range queryVars {
if len(uids) > 1 {
return nil, x.Errorf("Too many Uids for %q, expected 1 got %d", varName, len(uids))
}
}

return queryVars, nil
}

func (s *Server) doMutate(ctx context.Context, mu *api.Mutation) (resp *api.Assigned, rerr error) {
if ctx.Err() != nil {
return nil, ctx.Err()
Expand Down Expand Up @@ -433,6 +487,17 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation) (resp *api.Assi

var l query.Latency
l.Start = time.Now()

// Parse query and process
queryVars, err := doCondQuery(ctx, &l, &api.Request{
StartTs: mu.StartTs,
Query: mu.CondQuery,
})
// Quit early if we got an error or expected queryVars and got none.
if err != nil || (mu.CondQuery != "" && queryVars == nil) {
return resp, err
}

gmu, err := parseMutationObject(mu)
if err != nil {
return resp, err
Expand All @@ -449,7 +514,7 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation) (resp *api.Assi
}
}()

newUids, err := query.AssignUids(ctx, gmu.Set)
newUids, err := query.AssignUids(ctx, gmu.Set, queryVars)
if err != nil {
return resp, err
}
Expand Down
8 changes: 7 additions & 1 deletion gql/parser.go
Expand Up @@ -586,8 +586,14 @@ func checkDependency(vl []*Vars) error {
}

if len(defines) > len(needs) {
return x.Errorf("Some variables are defined but not used\nDefined:%v\nUsed:%v\n",
// NOTE: This error prevents conditional mutations to work, because we expect variables
// created from a query that are used in a following mutation block. For now,
// we notify the user in the output. I will revisit this later.
// return x.Errorf("Some variables are defined but not used\nDefined:%v\nUsed:%v\n",
// defines, needs)
glog.Warningf("Some variables are defined but not used\nDefined:%v\nUsed:%v\n",
defines, needs)
return nil
}

if len(defines) < len(needs) {
Expand Down
62 changes: 60 additions & 2 deletions gql/parser_mutation.go
Expand Up @@ -24,16 +24,35 @@ import (
"github.com/dgraph-io/dgraph/x"
)

// ParseMutation parses a block of text into a mutation.
// TODO: add examples.
// Returns an object with a mutation or a transaction query with mutation, otherwise returns
// nil with an error.
func ParseMutation(mutation string) (*api.Mutation, error) {
lexer := lex.NewLexer(mutation)
lexer.Run(lexInsideMutation)
it := lexer.NewIterator()
var mu api.Mutation
var inTxn bool

if !it.Next() {
return nil, errors.New("Invalid mutation")
}
item := it.Item()
// Inside txn{ ... } block.
// Here we switch into txn mode and try to fetch any query inside a txn block.
// If no query text is found, this txn is a no-op.
if item.Typ == itemMutationTxn {
var err error
// Get the query text: txn{ query { ... }}
mu.CondQuery, err = parseMutationCondQuery(it)
if err != nil {
return nil, err
}
inTxn = true
item = it.Item()
// fallthrough to regular mutation parsing.
}
if item.Typ != itemLeftCurl {
return nil, x.Errorf("Expected { at the start of block. Got: [%s]", item.Val)
}
Expand All @@ -45,7 +64,7 @@ func ParseMutation(mutation string) (*api.Mutation, error) {
}
if item.Typ == itemRightCurl {
// mutations must be enclosed in a single block.
if it.Next() && it.Item().Typ != lex.ItemEOF {
if !inTxn && it.Next() && it.Item().Typ != lex.ItemEOF {
return nil, x.Errorf("Unexpected %s after the end of the block.", it.Item().Val)
}
return &mu, nil
Expand All @@ -59,6 +78,45 @@ func ParseMutation(mutation string) (*api.Mutation, error) {
return nil, x.Errorf("Invalid mutation.")
}

// parseMutationCondQuery gets the text inside a txn query block. It is possible that there's
// no query to be found, in that case it's the caller's responsbility to fail.
// Returns the query text if any is found, otherwise an empty string with error.
func parseMutationCondQuery(it *lex.ItemIterator) (string, error) {
var query string
var parse bool
for it.Next() {
item := it.Item()
switch item.Typ {
case itemLeftCurl:
continue
case itemMutationOpContent:
if !parse {
return "", x.Errorf("Invalid query block.")
}
query = item.Val
case itemMutationTxnOp:
if item.Val == "query" {
if parse {
return "", x.Errorf("Too many query blocks in txn")
}
parse = true
continue
}
// TODO: mutation conditionals
if item.Val != "mutation" {
return "", x.Errorf("Invalid txn operator %q.", item.Val)
}
if !it.Next() {
return "", errors.New("Invalid mutation block")
}
return query, nil
default:
return "", x.Errorf("Unexpected %q inside of txn block.", item.Val)
}
}
return query, nil
}

// parseMutationOp parses and stores set or delete operation string in Mutation.
func parseMutationOp(it *lex.ItemIterator, op string, mu *api.Mutation) error {
parse := false
Expand All @@ -73,7 +131,7 @@ func parseMutationOp(it *lex.ItemIterator, op string, mu *api.Mutation) error {
}
parse = true
}
if item.Typ == itemMutationContent {
if item.Typ == itemMutationOpContent {
if !parse {
return x.Errorf("Mutation syntax invalid.")
}
Expand Down