@@ -21,17 +21,19 @@ import (
2121 "flag"
2222 "fmt"
2323 "io/ioutil"
24+ "log"
25+ "math/rand"
2426 "net"
2527 "net/http"
2628 "runtime"
2729 "strings"
2830 "time"
2931
3032 "golang.org/x/net/context"
33+ "golang.org/x/net/trace"
3134
3235 "google.golang.org/grpc"
3336
34- "github.com/Sirupsen/logrus"
3537 "github.com/dgraph-io/dgraph/commit"
3638 "github.com/dgraph-io/dgraph/gql"
3739 "github.com/dgraph-io/dgraph/posting"
@@ -44,8 +46,6 @@ import (
4446 "github.com/dgraph-io/dgraph/x"
4547)
4648
47- var glog = x .Log ("server" )
48-
4949var postingDir = flag .String ("postings" , "" , "Directory to store posting lists" )
5050var uidDir = flag .String ("uids" , "" , "XID UID posting lists directory" )
5151var mutationDir = flag .String ("mutations" , "" , "Directory to store mutations" )
@@ -57,6 +57,7 @@ var instanceIdx = flag.Uint64("instanceIdx", 0,
5757var workers = flag .String ("workers" , "" ,
5858 "Comma separated list of IP addresses of workers" )
5959var nomutations = flag .Bool ("nomutations" , false , "Don't allow mutations on this server." )
60+ var tracing = flag .Float64 ("trace" , 0.5 , "The ratio of queries to trace." )
6061
6162func addCorsHeaders (w http.ResponseWriter ) {
6263 w .Header ().Set ("Access-Control-Allow-Origin" , "*" )
@@ -68,7 +69,7 @@ func addCorsHeaders(w http.ResponseWriter) {
6869 w .Header ().Set ("Connection" , "close" )
6970}
7071
71- func mutationHandler (mu * gql.Mutation ) error {
72+ func mutationHandler (ctx context. Context , mu * gql.Mutation ) error {
7273 if * nomutations {
7374 return fmt .Errorf ("Mutations are forbidden on this server." )
7475 }
@@ -83,7 +84,7 @@ func mutationHandler(mu *gql.Mutation) error {
8384 }
8485 nq , err := rdf .Parse (ln )
8586 if err != nil {
86- glog . WithError ( err ). Error ( "While parsing RDF." )
87+ x . Trace ( ctx , "Error while parsing RDF: %v" , err )
8788 return err
8889 }
8990 nquads = append (nquads , nq )
@@ -99,8 +100,8 @@ func mutationHandler(mu *gql.Mutation) error {
99100 }
100101 }
101102 if len (xidToUid ) > 0 {
102- if err := worker .GetOrAssignUidsOverNetwork (& xidToUid ); err != nil {
103- glog . WithError ( err ). Error ( " GetOrAssignUidsOverNetwork" )
103+ if err := worker .GetOrAssignUidsOverNetwork (ctx , & xidToUid ); err != nil {
104+ x . Trace ( ctx , "Error while GetOrAssignUidsOverNetwork: %v" , err )
104105 return err
105106 }
106107 }
@@ -109,21 +110,21 @@ func mutationHandler(mu *gql.Mutation) error {
109110 for _ , nq := range nquads {
110111 edge , err := nq .ToEdgeUsing (xidToUid )
111112 if err != nil {
112- glog .WithField ("nquad" , nq ).WithError (err ).
113- Error ("While converting to edge" )
113+ x .Trace (ctx , "Error while converting to edge: %v %v" , nq , err )
114114 return err
115115 }
116116 edges = append (edges , edge )
117117 }
118118
119- left , err := worker .MutateOverNetwork (edges )
119+ left , err := worker .MutateOverNetwork (ctx , edges )
120120 if err != nil {
121+ x .Trace (ctx , "Error while MutateOverNetwork: %v" , err )
121122 return err
122123 }
123124 if len (left ) > 0 {
124- glog . WithField ( "left" , len ( left )). Error ( "Some edges couldn't be applied" )
125+ x . Trace ( ctx , "%d edges couldn't be applied", len ( left ) )
125126 for _ , e := range left {
126- glog . WithField ( "edge" , e ). Debug ( "Unable to apply mutation" )
127+ x . Trace ( ctx , "Unable to apply mutation for edge: %v" , e )
127128 }
128129 return fmt .Errorf ("Unapplied mutations" )
129130 }
@@ -140,28 +141,37 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
140141 return
141142 }
142143
144+ ctx , cancel := context .WithTimeout (context .Background (), time .Minute )
145+ defer cancel ()
146+
147+ if rand .Float64 () < * tracing {
148+ tr := trace .New ("Dgraph" , "Query" )
149+ defer tr .Finish ()
150+ ctx = trace .NewContext (ctx , tr )
151+ }
152+
143153 var l query.Latency
144154 l .Start = time .Now ()
145155 defer r .Body .Close ()
146156 q , err := ioutil .ReadAll (r .Body )
147157 if err != nil || len (q ) == 0 {
148- x .Err ( glog , err ). Error ( "While reading query" )
158+ x .Trace ( ctx , "Error while reading query: %v" , err )
149159 x .SetStatus (w , x .E_INVALID_REQUEST , "Invalid request encountered." )
150160 return
151161 }
152162
153- glog . WithField ( "q " , string (q )). Debug ( "Query received." )
163+ x . Trace ( ctx , "Query received: %v " , string (q ))
154164 gq , mu , err := gql .Parse (string (q ))
155165 if err != nil {
156- x .Err ( glog , err ). Error ( "While parsing query" )
166+ x .Trace ( ctx , "Error while parsing query: %v" , err )
157167 x .SetStatus (w , x .E_INVALID_REQUEST , err .Error ())
158168 return
159169 }
160170
161171 // If we have mutations, run them first.
162172 if mu != nil && len (mu .Set ) > 0 {
163- if err = mutationHandler (mu ); err != nil {
164- glog . WithError ( err ). Error ( "While handling mutations." )
173+ if err = mutationHandler (ctx , mu ); err != nil {
174+ x . Trace ( ctx , "Error while handling mutations: %v" , err )
165175 x .SetStatus (w , x .E_ERROR , err .Error ())
166176 return
167177 }
@@ -172,37 +182,33 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
172182 return
173183 }
174184
175- sg , err := query .ToSubGraph (gq )
185+ sg , err := query .ToSubGraph (ctx , gq )
176186 if err != nil {
177- x .Err ( glog , err ). Error ( "While conversion to internal format" )
187+ x .Trace ( ctx , "Error while conversion to internal format: %v" , err )
178188 x .SetStatus (w , x .E_INVALID_REQUEST , err .Error ())
179189 return
180190 }
181191 l .Parsing = time .Since (l .Start )
182- glog . WithField ( "q" , string ( q )). Debug ( "Query parsed. " )
192+ x . Trace ( ctx , "Query parsed" )
183193
184194 rch := make (chan error )
185- go query .ProcessGraph (sg , rch , time . Minute )
195+ go query .ProcessGraph (ctx , sg , rch )
186196 err = <- rch
187197 if err != nil {
188- x .Err ( glog , err ). Error ( "While executing query" )
198+ x .Trace ( ctx , "Error while executing query: %v" , err )
189199 x .SetStatus (w , x .E_ERROR , err .Error ())
190200 return
191201 }
192202 l .Processing = time .Since (l .Start ) - l .Parsing
193- glog . WithField ( "q" , string ( q )). Debug ( "Graph processed. " )
203+ x . Trace ( ctx , "Graph processed" )
194204 js , err := sg .ToJson (& l )
195205 if err != nil {
196- x .Err ( glog , err ). Error ( "While converting to Json." )
206+ x .Trace ( ctx , "Error while converting to Json: %v" , err )
197207 x .SetStatus (w , x .E_ERROR , err .Error ())
198208 return
199209 }
200- glog .WithFields (logrus.Fields {
201- "total" : time .Since (l .Start ),
202- "parsing" : l .Parsing ,
203- "process" : l .Processing ,
204- "json" : l .Json ,
205- }).Info ("Query Latencies" )
210+ x .Trace (ctx , "Latencies: Total: %v Parsing: %v Process: %v Json: %v" ,
211+ time .Since (l .Start ), l .Parsing , l .Processing , l .Json )
206212
207213 w .Header ().Set ("Content-Type" , "application/json" )
208214 fmt .Fprint (w , string (js ))
@@ -215,27 +221,34 @@ type server struct{}
215221// client as a protocol buffer message.
216222func (s * server ) Query (ctx context.Context ,
217223 req * graph.Request ) (* graph.Response , error ) {
224+
225+ if rand .Float64 () < * tracing {
226+ tr := trace .New ("Dgraph" , "GrpcQuery" )
227+ defer tr .Finish ()
228+ ctx = trace .NewContext (ctx , tr )
229+ }
230+
218231 resp := new (graph.Response )
219232 if len (req .Query ) == 0 {
220- glog . Error ( "While reading query" )
233+ x . Trace ( ctx , "Empty query" )
221234 return resp , fmt .Errorf ("Empty query" )
222235 }
223236
224237 var l query.Latency
225238 l .Start = time .Now ()
226239 // TODO(pawan): Refactor query parsing and graph processing code to a common
227240 // function used by Query and queryHandler
228- glog . WithField ( "q" , req . Query ). Debug ( "Query received." )
241+ x . Trace ( ctx , "Query received: %v" , req . Query )
229242 gq , mu , err := gql .Parse (req .Query )
230243 if err != nil {
231- x .Err ( glog , err ). Error ( "While parsing query" )
244+ x .Trace ( ctx , "Error while parsing query: %v" , err )
232245 return resp , err
233246 }
234247
235248 // If we have mutations, run them first.
236249 if mu != nil && len (mu .Set ) > 0 {
237- if err = mutationHandler (mu ); err != nil {
238- glog . WithError ( err ). Error ( "While handling mutations." )
250+ if err = mutationHandler (ctx , mu ); err != nil {
251+ x . Trace ( ctx , "Error while handling mutations: %v" , err )
239252 return resp , err
240253 }
241254 }
@@ -244,27 +257,27 @@ func (s *server) Query(ctx context.Context,
244257 return resp , err
245258 }
246259
247- sg , err := query .ToSubGraph (gq )
260+ sg , err := query .ToSubGraph (ctx , gq )
248261 if err != nil {
249- x .Err ( glog , err ). Error ( "While conversion to internal format" )
262+ x .Trace ( ctx , "Error while conversion to internal format: %v" , err )
250263 return resp , err
251264 }
252265 l .Parsing = time .Since (l .Start )
253- glog . WithField ( "q" , req . Query ). Debug ( "Query parsed. " )
266+ x . Trace ( ctx , "Query parsed" )
254267
255268 rch := make (chan error )
256- go query .ProcessGraph (sg , rch , time . Minute )
269+ go query .ProcessGraph (ctx , sg , rch )
257270 err = <- rch
258271 if err != nil {
259- x .Err ( glog , err ). Error ( "While executing query" )
272+ x .Trace ( ctx , "Error while executing query: %v" , err )
260273 return resp , err
261274 }
262275 l .Processing = time .Since (l .Start ) - l .Parsing
263- glog . WithField ( "q" , req . Query ). Debug ( "Graph processed. " )
276+ x . Trace ( ctx , "Graph processed" )
264277
265278 node , err := sg .ToProtocolBuffer (& l )
266279 if err != nil {
267- x .Err ( glog , err ). Error ( "While converting to protocol buffer." )
280+ x .Trace ( ctx , "Error while converting to ProtocolBuffer: %v" , err )
268281 return resp , err
269282 }
270283 resp .N = node
@@ -282,31 +295,29 @@ func (s *server) Query(ctx context.Context,
282295func runGrpcServer (address string ) {
283296 ln , err := net .Listen ("tcp" , address )
284297 if err != nil {
285- glog .Fatalf ("While running server for client: %v" , err )
298+ log .Fatalf ("While running server for client: %v" , err )
286299 return
287300 }
288- glog . WithField ( "address " , ln .Addr ()). Info ( "Client Worker listening" )
301+ log . Printf ( "Client worker listening: %v " , ln .Addr ())
289302
290303 s := grpc .NewServer ()
291304 graph .RegisterDGraphServer (s , & server {})
292305 if err = s .Serve (ln ); err != nil {
293- glog .Fatalf ("While serving gRpc requests" , err )
306+ log .Fatalf ("While serving gRpc requests" , err )
294307 }
295308 return
296309}
297310
298311func main () {
299312 flag .Parse ()
300313 if ! flag .Parsed () {
301- glog .Fatal ("Unable to parse flags" )
314+ log .Fatal ("Unable to parse flags" )
302315 }
303- logrus .SetLevel (logrus .InfoLevel )
304316 numCpus := * numcpu
305317 prev := runtime .GOMAXPROCS (numCpus )
306- glog .WithField ("num_cpu" , numCpus ).WithField ("prev_maxprocs" , prev ).
307- Info ("Set max procs to num cpus" )
318+ log .Printf ("num_cpu: %v. prev_maxprocs: %v. Set max procs to num cpus" , numCpus , prev )
308319 if * port % 2 != 0 {
309- glog .Fatalf ("Port should be an even number: %v" , * port )
320+ log .Fatalf ("Port should be an even number: %v" , * port )
310321 }
311322
312323 ps := new (store.Store )
@@ -343,8 +354,8 @@ func main() {
343354 go runGrpcServer (fmt .Sprintf (":%d" , * port + 1 ))
344355
345356 http .HandleFunc ("/query" , queryHandler )
346- glog . WithField ( "port" , * port ). Info ( " Listening for requests..." )
357+ log . Printf ( " Listening for requests at port: %v" , * port )
347358 if err := http .ListenAndServe (fmt .Sprintf (":%d" , * port ), nil ); err != nil {
348- x . Err ( glog , err ). Fatal ( "ListenAndServe" )
359+ log . Fatalf ( "ListenAndServe: %v" , err )
349360 }
350361}
0 commit comments