1717package client
1818
1919import (
20- "context"
2120 "fmt"
22- "log"
23- "strings"
24- "sync"
25- "sync/atomic"
2621 "time"
2722
28- "google.golang.org/grpc"
29- "google.golang.org/grpc/codes"
30-
3123 "github.com/dgraph-io/dgraph/protos"
3224 "github.com/dgraph-io/dgraph/types"
3325 "github.com/dgraph-io/dgraph/x"
26+ geom "github.com/twpayne/go-geom"
27+ "github.com/twpayne/go-geom/encoding/geojson"
3428)
3529
3630type Op int
@@ -71,6 +65,19 @@ func checkNQuad(nq protos.NQuad) error {
7165 return nil
7266}
7367
68+ func checkSchema (schema protos.SchemaUpdate ) error {
69+ typ := types .TypeID (schema .ValueType )
70+ if typ == types .UidID && schema .Directive == protos .SchemaUpdate_INDEX {
71+ // index on uid type
72+ return x .Errorf ("Index not allowed on predicate of type uid on predicate %s" ,
73+ schema .Predicate )
74+ } else if typ != types .UidID && schema .Directive == protos .SchemaUpdate_REVERSE {
75+ // reverse on non-uid type
76+ return x .Errorf ("Cannot reverse for non-uid type on predicate %s" , schema .Predicate )
77+ }
78+ return nil
79+ }
80+
7481// SetQuery sets a query with graphQL variables as part of the request.
7582func (req * Req ) SetQuery (q string ) {
7683 req .gr .Query = q
@@ -82,46 +89,31 @@ func (req *Req) SetQueryWithVariables(q string, vars map[string]string) {
8289 req .gr .Vars = vars
8390}
8491
85- func (req * Req ) addMutation (nq protos. NQuad , op Op ) {
92+ func (req * Req ) addMutation (e Edge , op Op ) {
8693 if req .gr .Mutation == nil {
8794 req .gr .Mutation = new (protos.Mutation )
8895 }
8996
9097 if op == SET {
91- req .gr .Mutation .Set = append (req .gr .Mutation .Set , & nq )
98+ req .gr .Mutation .Set = append (req .gr .Mutation .Set , & e . nq )
9299 } else if op == DEL {
93- req .gr .Mutation .Del = append (req .gr .Mutation .Del , & nq )
100+ req .gr .Mutation .Del = append (req .gr .Mutation .Del , & e . nq )
94101 }
95102}
96103
97- // AddMutation adds (but does not send) a mutation to the Req object. Mutations
98- // are sent when client.Run() is called.
99- func (req * Req ) AddMutation (nq protos.NQuad , op Op ) error {
100- if err := checkNQuad (nq ); err != nil {
104+ func (req * Req ) Set (e Edge ) error {
105+ if err := checkNQuad (e .nq ); err != nil {
101106 return err
102107 }
103- req .addMutation (nq , op )
104- return nil
105- }
106-
107- func AddFacet (key string , val string , nq * protos.NQuad ) error {
108- nq .Facets = append (nq .Facets , & protos.Facet {
109- Key : key ,
110- Val : val ,
111- })
108+ req .addMutation (e , SET )
112109 return nil
113110}
114111
115- func checkSchema (schema protos.SchemaUpdate ) error {
116- typ := types .TypeID (schema .ValueType )
117- if typ == types .UidID && schema .Directive == protos .SchemaUpdate_INDEX {
118- // index on uid type
119- return x .Errorf ("Index not allowed on predicate of type uid on predicate %s" ,
120- schema .Predicate )
121- } else if typ != types .UidID && schema .Directive == protos .SchemaUpdate_REVERSE {
122- // reverse on non-uid type
123- return x .Errorf ("Cannot reverse for non-uid type on predicate %s" , schema .Predicate )
112+ func (req * Req ) Delete (e Edge ) error {
113+ if err := checkNQuad (e .nq ); err != nil {
114+ return err
124115 }
116+ req .addMutation (e , DEL )
125117 return nil
126118}
127119
@@ -149,159 +141,179 @@ func (req *Req) reset() {
149141}
150142
151143type nquadOp struct {
152- nq protos. NQuad
144+ e Edge
153145 op Op
154146}
155147
156- // BatchMutation is used to batch mutations and send them off to the server
157- // concurrently. It is useful while doing migrations and bulk data loading.
158- // It is possible to control the batch size and the number of concurrent requests
159- // to make.
160- type BatchMutation struct {
161- size int
162- pending int
163-
164- nquads chan nquadOp
165- schema chan protos.SchemaUpdate
166- dc protos.DgraphClient
167- wg sync.WaitGroup
168-
169- // Miscellaneous information to print counters.
170- // Num of RDF's sent
171- rdfs uint64
172- // Num of mutations sent
173- mutations uint64
174- // To get time elapsed.
175- start time.Time
148+ type Node uint64
149+
150+ func (n Node ) String () string {
151+ return fmt .Sprintf ("%#x" , uint64 (n ))
176152}
177153
178- func (batch * BatchMutation ) request (req * Req ) {
179- counter := atomic .AddUint64 (& batch .mutations , 1 )
180- RETRY:
181- _ , err := batch .dc .Run (context .Background (), & req .gr )
182- if err != nil {
183- errString := err .Error ()
184- // Irrecoverable
185- if strings .Contains (errString , "x509" ) || grpc .Code (err ) == codes .Internal {
186- log .Fatal (errString )
187- }
188- fmt .Printf ("Retrying req: %d. Error: %v\n " , counter , errString )
189- time .Sleep (5 * time .Millisecond )
190- goto RETRY
191- }
192- req .reset ()
154+ func (n * Node ) ConnectTo (pred string , n1 Node ) Edge {
155+ e := Edge {}
156+ e .nq .Subject = n .String ()
157+ e .nq .Predicate = pred
158+ e .ConnectTo (n1 )
159+ return e
160+ }
161+
162+ func (n * Node ) Edge (pred string ) Edge {
163+ e := Edge {}
164+ e .nq .Subject = n .String ()
165+ e .nq .Predicate = pred
166+ return e
193167}
194168
195- func (batch * BatchMutation ) makeRequests () {
196- req := new (Req )
169+ type Edge struct {
170+ nq protos.NQuad
171+ }
197172
198- for n := range batch .nquads {
199- req .addMutation (n .nq , n .op )
200- if req .size () == batch .size {
201- batch .request (req )
202- }
203- }
173+ func NewEdge (nq protos.NQuad ) Edge {
174+ return Edge {nq }
175+ }
204176
205- if req .size () > 0 {
206- batch .request (req )
177+ func (e * Edge ) ConnectTo (n Node ) error {
178+ if e .nq .ObjectType > 0 {
179+ return ErrValue
207180 }
208- batch .wg .Done ()
181+ e .nq .ObjectId = n .String ()
182+ return nil
209183}
210184
211- func (batch * BatchMutation ) makeSchemaRequests () {
212- req := new (Req )
213- LOOP:
214- for {
215- select {
216- case s , ok := <- batch .schema :
217- if ! ok {
218- break LOOP
219- }
220- req .addSchema (s )
221- default :
222- start := time .Now ()
223- if req .size () > 0 {
224- batch .request (req )
225- }
226- elapsedMillis := time .Since (start ).Seconds () * 1e3
227- if elapsedMillis < 10 {
228- time .Sleep (time .Duration (int64 (10 - elapsedMillis )) * time .Millisecond )
229- }
185+ func validateStr (val string ) error {
186+ for idx , c := range val {
187+ if c == '"' && (idx == 0 || val [idx - 1 ] != '\\' ) {
188+ return fmt .Errorf (`" must be preceded by a \ in object value` )
230189 }
231190 }
191+ return nil
192+ }
232193
233- if req .size () > 0 {
234- batch .request (req )
194+ func (e * Edge ) SetValueString (val string ) error {
195+ if len (e .nq .ObjectId ) > 0 {
196+ return ErrConnected
235197 }
236- batch .wg .Done ()
198+ if err := validateStr (val ); err != nil {
199+ return err
200+ }
201+
202+ v , err := types .ObjectValue (types .StringID , val )
203+ if err != nil {
204+ return err
205+ }
206+ e .nq .ObjectValue = v
207+ e .nq .ObjectType = int32 (types .StringID )
208+ return nil
237209}
238210
239- // NewBatchMutation is used to create a new batch.
240- // size is the number of RDF's that are sent as part of one request to Dgraph.
241- // pending is the number of concurrent requests to make to Dgraph server.
242- func NewBatchMutation (ctx context.Context , client protos.DgraphClient ,
243- size int , pending int ) * BatchMutation {
244- bm := BatchMutation {
245- size : size ,
246- pending : pending ,
247- nquads : make (chan nquadOp , 2 * size ),
248- schema : make (chan protos.SchemaUpdate , 2 * size ),
249- start : time .Now (),
250- dc : client ,
251- }
252-
253- for i := 0 ; i < pending ; i ++ {
254- bm .wg .Add (1 )
255- go bm .makeRequests ()
256- }
257- bm .wg .Add (1 )
258- go bm .makeSchemaRequests ()
259- return & bm
211+ func (e * Edge ) SetValueInt (val int64 ) error {
212+ if len (e .nq .ObjectId ) > 0 {
213+ return ErrConnected
214+ }
215+ v , err := types .ObjectValue (types .IntID , val )
216+ if err != nil {
217+ return err
218+ }
219+ e .nq .ObjectValue = v
220+ e .nq .ObjectType = int32 (types .IntID )
221+ return nil
260222}
261223
262- // AddMutation is used to add a NQuad to a batch. It can either have SET or
263- // DEL as Op(operation).
264- func (batch * BatchMutation ) AddMutation (nq protos.NQuad , op Op ) error {
265- if err := checkNQuad (nq ); err != nil {
224+ func (e * Edge ) SetValueFloat (val float64 ) error {
225+ if len (e .nq .ObjectId ) > 0 {
226+ return ErrConnected
227+ }
228+ v , err := types .ObjectValue (types .FloatID , val )
229+ if err != nil {
266230 return err
267231 }
268- batch . nquads <- nquadOp { nq : nq , op : op }
269- atomic . AddUint64 ( & batch . rdfs , 1 )
232+ e . nq . ObjectValue = v
233+ e . nq . ObjectType = int32 ( types . FloatID )
270234 return nil
271235}
272236
273- // Flush waits for all pending requests to complete. It should always be called
274- // after adding all the NQuads using batch.AddMutation().
275- func (batch * BatchMutation ) Flush () {
276- close (batch .nquads )
277- close (batch .schema )
278- batch .wg .Wait ()
237+ func (e * Edge ) SetValueBool (val bool ) error {
238+ if len (e .nq .ObjectId ) > 0 {
239+ return ErrConnected
240+ }
241+ v , err := types .ObjectValue (types .BoolID , val )
242+ if err != nil {
243+ return err
244+ }
245+ e .nq .ObjectValue = v
246+ e .nq .ObjectType = int32 (types .BoolID )
247+ return nil
279248}
280249
281- // AddSchema is used to add a schema mutation.
282- func (batch * BatchMutation ) AddSchema (s protos.SchemaUpdate ) error {
283- if err := checkSchema (s ); err != nil {
250+ func (e * Edge ) SetValuePassword (val string ) error {
251+ if len (e .nq .ObjectId ) > 0 {
252+ return ErrConnected
253+ }
254+ v , err := types .ObjectValue (types .PasswordID , val )
255+ if err != nil {
284256 return err
285257 }
286- batch .schema <- s
258+ e .nq .ObjectValue = v
259+ e .nq .ObjectType = int32 (types .PasswordID )
287260 return nil
288261}
289262
290- // Counter keeps a track of various parameters about a batch mutation.
291- type Counter struct {
292- // Number of RDF's processed by server.
293- Rdfs uint64
294- // Number of mutations processed by the server.
295- Mutations uint64
296- // Time elapsed sinze the batch started.
297- Elapsed time.Duration
263+ func (e * Edge ) SetValueDatetime (dateTime time.Time ) error {
264+ if len (e .nq .ObjectId ) > 0 {
265+ return ErrConnected
266+ }
267+ d , err := types .ObjectValue (types .DateTimeID , dateTime )
268+ if err != nil {
269+ return err
270+ }
271+ e .nq .ObjectValue = d
272+ e .nq .ObjectType = int32 (types .DateTimeID )
273+ return nil
298274}
299275
300- // Counter returns the current state of the BatchMutation.
301- func (batch * BatchMutation ) Counter () Counter {
302- return Counter {
303- Rdfs : atomic .LoadUint64 (& batch .rdfs ),
304- Mutations : atomic .LoadUint64 (& batch .mutations ),
305- Elapsed : time .Since (batch .start ),
276+ func (e * Edge ) SetValueGeoJson (json string ) error {
277+ if len (e .nq .ObjectId ) > 0 {
278+ return ErrConnected
306279 }
280+ var g geom.T
281+ // Parse the json
282+ err := geojson .Unmarshal ([]byte (json ), & g )
283+ if err != nil {
284+ return err
285+ }
286+
287+ geo , err := types .ObjectValue (types .GeoID , g )
288+ if err != nil {
289+ return err
290+ }
291+
292+ e .nq .ObjectValue = geo
293+ e .nq .ObjectType = int32 (types .GeoID )
294+ return nil
295+ }
296+
297+ func (e * Edge ) SetValueDefault (val string ) error {
298+ if len (e .nq .ObjectId ) > 0 {
299+ return ErrConnected
300+ }
301+ if err := validateStr (val ); err != nil {
302+ return err
303+ }
304+
305+ v , err := types .ObjectValue (types .DefaultID , val )
306+ if err != nil {
307+ return err
308+ }
309+ e .nq .ObjectValue = v
310+ e .nq .ObjectType = int32 (types .StringID )
311+ return nil
312+ }
313+
314+ func (e * Edge ) AddFacet (key , val string ) {
315+ e .nq .Facets = append (e .nq .Facets , & protos.Facet {
316+ Key : key ,
317+ Val : val ,
318+ })
307319}
0 commit comments