forked from root-project/root
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TProofPlayer.h
462 lines (380 loc) · 20.2 KB
/
TProofPlayer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
// @(#)root/proofplayer:$Id$
// Author: Maarten Ballintijn 07/01/02
/*************************************************************************
* Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
#ifndef ROOT_TProofPlayer
#define ROOT_TProofPlayer
//////////////////////////////////////////////////////////////////////////
// //
// TProofPlayer //
// //
// This internal class and its subclasses steer the processing in PROOF.//
// Instances of the TProofPlayer class are created on the worker nodes //
// per session and do the processing. //
// Instances of its subclass TProofPlayerRemote are created for each //
// query on the master(s) and on the client. On the master(s), //
// TProofPlayerRemote coordinates processing, checks the dataset, creates //
// the packetizer and takes care of merging the results of the workers. //
// The instance on the client collects information on the input //
// (dataset and selector), it invokes the Begin() method and finalizes //
// the query by calling Terminate(). //
// //
//////////////////////////////////////////////////////////////////////////
#ifndef ROOT_TVirtualProofPlayer
#include "TVirtualProofPlayer.h"
#endif
#ifndef ROOT_TArrayL64
#include "TArrayL64.h"
#endif
#ifndef ROOT_TArrayF
#include "TArrayF.h"
#endif
#ifndef ROOT_TArrayI
#include "TArrayI.h"
#endif
#ifndef ROOT_TList
#include "TList.h"
#endif
#ifndef ROOT_TSystem
#include "TSystem.h"
#endif
#ifndef ROOT_TQueryResult
#include "TQueryResult.h"
#endif
#ifndef ROOT_TProofProgressStatus
#include "TProofProgressStatus.h"
#endif
#ifndef ROOT_TError
#include "TError.h"
#endif
// Forward declarations. Within this header these classes are used only
// through pointers, so their full definitions are not required here.
// NOTE(review): TProofStats is not referenced anywhere else in this header;
// it may be relied upon by including files -- confirm before removing it.
class TSelector;
class TSocket;
class TVirtualPacketizer;
class TSlave;
class TEventIter;
class TProofStats;
class TMutex;
class TStatus;
class TTimer;
class THashList;
class TH1;
class TFile;
class TStopwatch;
//------------------------------------------------------------------------
// Base PROOF player. Extends TVirtualProofPlayer with the state kept while a
// query is being processed (input/output lists, selector, timers, progress
// status, list of query results, partial-result file settings) and with the
// processing, output-handling and progress-reporting entry points declared
// below. Only trivial accessors are defined inline in this header; all other
// member functions are merely declared here.
class TProofPlayer : public TVirtualProofPlayer {
private:
TList *fAutoBins; // Map of min/max values by name for slaves
protected:
// Data members.
// NOTE(review): several trailing comments below start with markers such as
// "//!" and "//->". These look like ROOT dictionary annotations, so keep
// each trailing comment on the same line as its member -- confirm against the
// ROOT I/O documentation before editing them.
TList *fInput; //-> list with input objects
THashList *fOutput; // list with output objects
TSelector *fSelector; //! the latest selector
Bool_t fCreateSelObj; //! kTRUE when fSelector has been created locally
TClass *fSelectorClass; //! class of the latest selector
TTimer *fFeedbackTimer; //! timer for sending intermediate results
Long_t fFeedbackPeriod; //! period (ms) for sending intermediate results
TEventIter *fEvIter; //! iterator on events or objects
TStatus *fSelStatus; //! status of query in progress
EExitStatus fExitStatus; // exit status
Long64_t fTotalEvents; // number of events requested
TProofProgressStatus *fProgressStatus; // the progress status object;
Long64_t fReadBytesRun; //! Bytes read in this run
Long64_t fReadCallsRun; //! Read calls in this run
Long64_t fProcessedRun; //! Events processed in this run
TList *fQueryResults; //List of TQueryResult
TQueryResult *fQuery; //Instance of TQueryResult currently processed
TQueryResult *fPreviousQuery; //Previous instance of TQueryResult processed
Int_t fDrawQueries; //Number of Draw queries in the list
Int_t fMaxDrawQueries; //Max number of Draw queries kept
TTimer *fStopTimer; //Timer associated with a stop request
TMutex *fStopTimerMtx; //To protect the stop timer
TTimer *fDispatchTimer; //Dispatch pending events while processing
TTimer *fProcTimeTimer; //Notifies reaching of allowed max proc time
TStopwatch *fProcTime; //Packet proc time
TString fOutputFilePath; //Path to file with (partial) results of the query
TFile *fOutputFile; //TFile object attached to fOutputFilePath
Long_t fSaveMemThreshold; //Threshold for saving output to file
Bool_t fSavePartialResults; //Whether to save the partial results
Bool_t fSaveResultsPerPacket; //Whether to save partial results after each packet
static THashList *fgDrawInputPars; // List of input parameters to be kept on drawing actions
// Returns this object as an untyped pointer.
void *GetSender() { return this; } //used to set gTQSender
// Hooks that derived players in this header re-declare (SetupFeedback,
// StopFeedback, MergeOutput).
virtual Int_t DrawCanvas(TObject *obj); // Canvas drawing via libProofDraw
virtual void SetupFeedback(); // specialized setup
virtual void MergeOutput(Bool_t savememvalues = kFALSE);
public: // fix for broken compilers so TCleanup can call StopFeedback()
virtual void StopFeedback(); // specialized teardown
protected:
// Helper whose destructor calls StopFeedback() on the player it was given.
// The stored pointer is dereferenced unconditionally in the destructor, so it
// must not be null.
class TCleanup {
private:
TProofPlayer *fPlayer;
public:
TCleanup(TProofPlayer *p) : fPlayer(p) { }
~TCleanup() { fPlayer->StopFeedback(); }
};
Int_t AssertSelector(const char *selector_file);
Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg);
void MapOutputListToDataMembers() const;
public:
// Status flags occupying bits 15 to 18 (built with the BIT() macro).
enum EStatusBits { kDispatchOneEvent = BIT(15), kIsProcessing = BIT(16),
kMaxProcTimeReached = BIT(17), kMaxProcTimeExtended = BIT(18) };
TProofPlayer(TProof *proof = 0);
virtual ~TProofPlayer();
// Process the data set 'set' with a selector passed either as a string or as
// a TSelector object; 'nentries' defaults to -1 and 'firstentry' to 0.
Long64_t Process(TDSet *set,
const char *selector, Option_t *option = "",
Long64_t nentries = -1, Long64_t firstentry = 0);
Long64_t Process(TDSet *set,
TSelector *selector, Option_t *option = "",
Long64_t nentries = -1, Long64_t firstentry = 0);
virtual Bool_t JoinProcess(TList *workers);
// This class holds no packetizer: the accessor always returns 0.
TVirtualPacketizer *GetPacketizer() const { return 0; }
Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
Long64_t Finalize(TQueryResult *qr);
Long64_t DrawSelect(TDSet *set, const char *varexp,
const char *selection, Option_t *option = "",
Long64_t nentries = -1, Long64_t firstentry = 0);
Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt,
TString &selector, TString &objname);
void HandleGetTreeHeader(TMessage *mess);
void HandleRecvHisto(TMessage *mess);
void FeedBackCanvas(const char *name, Bool_t create);
void StopProcess(Bool_t abort, Int_t timeout = -1);
// Input list, output list and query-result bookkeeping.
void AddInput(TObject *inp);
void ClearInput();
TObject *GetOutput(const char *name) const;
TList *GetOutputList() const;
TList *GetInputList() const { return fInput; }
TList *GetListOfResults() const { return fQueryResults; }
void AddQueryResult(TQueryResult *q);
TQueryResult *GetCurrentQuery() const { return fQuery; }
TQueryResult *GetQueryResult(const char *ref);
void RemoveQueryResult(const char *ref);
void SetCurrentQuery(TQueryResult *q);
void SetMaxDrawQueries(Int_t max) { fMaxDrawQueries = max; }
// Makes fPreviousQuery the current query again (plain pointer assignment).
void RestorePreviousQuery() { fQuery = fPreviousQuery; }
Int_t AddOutputObject(TObject *obj);
void AddOutput(TList *out); // Incorporate a list
void StoreOutput(TList *out); // Adopts the list
void StoreFeedback(TObject *slave, TList *out); // Adopts the list
// Progress notifications. Each overload taking a TSlave* ignores that
// argument and forwards to the matching overload without it.
void Progress(Long64_t total, Long64_t processed); // *SIGNAL*
void Progress(TSlave *, Long64_t total, Long64_t processed)
{ Progress(total, processed); }
void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
Float_t initTime, Float_t procTime,
Float_t evtrti, Float_t mbrti); // *SIGNAL*
void Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
Float_t initTime, Float_t procTime,
Float_t evtrti, Float_t mbrti)
{ Progress(total, processed, bytesread, initTime, procTime,
evtrti, mbrti); } // *SIGNAL*
void Progress(TProofProgressInfo *pi); // *SIGNAL*
void Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); } // *SIGNAL*
void Feedback(TList *objs); // *SIGNAL*
TDrawFeedback *CreateDrawFeedback(TProof *p);
void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
void DeleteDrawFeedback(TDrawFeedback *f);
TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);
Int_t ReinitSelector(TQueryResult *qr);
void UpdateAutoBin(const char *name,
Double_t& xmin, Double_t& xmax,
Double_t& ymin, Double_t& ymax,
Double_t& zmin, Double_t& zmax);
// Always kFALSE at this level.
Bool_t IsClient() const { return kFALSE; }
void SetExitStatus(EExitStatus st) { fExitStatus = st; }
EExitStatus GetExitStatus() const { return fExitStatus; }
// Both accessors below dereference fProgressStatus without a null check.
Long64_t GetEventsProcessed() const { return fProgressStatus->GetEntries(); }
void AddEventsProcessed(Long64_t ev) { fProgressStatus->IncEntries(ev); }
void SetDispatchTimer(Bool_t on = kTRUE);
void SetStopTimer(Bool_t on = kTRUE,
Bool_t abort = kFALSE, Int_t timeout = 0);
// Empty default implementations.
virtual void SetInitTime() { }
virtual void SetMerging(Bool_t = kTRUE) { }
Long64_t GetCacheSize();
Int_t GetLearnEntries();
void SetOutputFilePath(const char *fp) { fOutputFilePath = fp; }
Int_t SavePartialResults(Bool_t queryend = kFALSE, Bool_t force = kFALSE);
void SetProcessing(Bool_t on = kTRUE);
TProofProgressStatus *GetProgressStatus() const { return fProgressStatus; }
void UpdateProgressInfo();
ClassDef(TProofPlayer,0) // Basic PROOF player
};
//------------------------------------------------------------------------
// TProofPlayerLocal
//
// TProofPlayer variant whose IsClient() answer is chosen at construction time
// and whose feedback setup/teardown hooks have empty bodies.
class TProofPlayerLocal : public TProofPlayer {

private:
   // Value handed back by IsClient(); set once by the constructor.
   Bool_t fIsClient;

protected:
   // Feedback hooks: both do nothing in this class.
   void SetupFeedback() { }
   void StopFeedback() { }

public:
   // 'client' defaults to kTRUE and is stored in fIsClient.
   TProofPlayerLocal(Bool_t client = kTRUE)
      : fIsClient(client)
   {
   }

   virtual ~TProofPlayerLocal()
   {
   }

   Bool_t IsClient() const
   {
      return fIsClient;
   }

   // Overloads that take no data set; declared here, defined outside this
   // header.
   Long64_t Process(const char *selector, Long64_t nentries = -1, Option_t *option = "");
   Long64_t Process(TSelector *selector, Long64_t nentries = -1, Option_t *option = "");

   // Overloads that take a data set: each passes its arguments unchanged to
   // the corresponding TProofPlayer::Process overload.
   Long64_t Process(TDSet *set, const char *selector, Option_t *option = "",
                    Long64_t nentries = -1, Long64_t firstentry = 0)
   {
      return TProofPlayer::Process(set, selector, option, nentries, firstentry);
   }

   Long64_t Process(TDSet *set, TSelector *selector, Option_t *option = "",
                    Long64_t nentries = -1, Long64_t firstentry = 0)
   {
      return TProofPlayer::Process(set, selector, option, nentries, firstentry);
   }

   ClassDef(TProofPlayerLocal,0) // PROOF player running on client
};
//------------------------------------------------------------------------
//////////////////////////////////////////////////////////////////////////
// //
// TProofPlayerRemote //
// //
// Instances of TProofPlayerRemote are created for each query on the //
// master(s) and on the client. On the master(s), TProofPlayerRemote //
// coordinates processing, checks the dataset, creates the packetizer //
// and takes care of merging the results of the workers. //
// The instance on the client collects information on the input //
// (dataset and selector), it invokes the Begin() method and finalizes //
// the query by calling Terminate(). //
// //
//////////////////////////////////////////////////////////////////////////
// TProofPlayer specialization that keeps a link to the TProof session, the
// lists of outputs and feedback objects received from the slaves, the
// packetizer, and merging-related settings. Apart from the constructor and
// two accessors, all member functions are only declared in this header.
class TProofPlayerRemote : public TProofPlayer {
protected:
// NOTE(review): trailing comments starting with "//!" look like ROOT
// dictionary annotations; keep them on the same line as their member.
TProof *fProof; // link to associated PROOF session
TList *fOutputLists; // results returned by slaves
TList *fFeedback; // reference for use on master
TList *fFeedbackLists; // intermediate results
TVirtualPacketizer *fPacketizer; // transform TDSet into packets for slaves
Bool_t fMergeFiles; // is True when merging output files centrally is needed
TDSet *fDSet; //!tdset for current processing
ErrorHandlerFunc_t fErrorHandler; // Store previous handler when redirecting output
Bool_t fMergeTH1OneByOne; // If kTRUE forces TH1 merge one-by-one [kTRUE]
TH1 *fProcPackets; //!Histogram with packets being processed (owned by TPerfStats)
TMessage *fProcessMessage; // Process message to replay when adding new workers dynamically
TString fSelectorFileName; // Current Selector's name, set by Process()
TStopwatch *fMergeSTW; // Merging stop watch
Int_t fNumMergers; // Number of submergers
virtual Bool_t HandleTimer(TTimer *timer);
Int_t InitPacketizer(TDSet *dset, Long64_t nentries,
Long64_t first, const char *defpackunit,
const char *defpackdata);
TList *MergeFeedback();
Bool_t MergeOutputFiles();
void NotifyMemory(TObject *obj);
void SetLastMergingMsg(TObject *obj);
virtual Bool_t SendSelector(const char *selector_file); //send selector to slaves
TProof *GetProof() const { return fProof; }
void SetupFeedback(); // specialized setup
void StopFeedback(); // specialized teardown
void SetSelectorDataMembersFromOutputList();
public:
// Stores 'proof' in fProof, zero-initializes every other pointer member and
// fNumMergers, sets fMergeFiles to kFALSE and fMergeTH1OneByOne to kTRUE, and
// assigns a newly allocated TProofProgressStatus to the inherited
// fProgressStatus. fSelectorFileName is not listed and is therefore
// default-constructed. 'proof' is not forwarded to the TProofPlayer
// constructor, which receives its default argument.
// NOTE(review): if the TProofPlayer constructor already allocates
// fProgressStatus, the assignment in the body overwrites that pointer without
// freeing it -- verify against the TProofPlayer implementation.
TProofPlayerRemote(TProof *proof = 0) : fProof(proof), fOutputLists(0), fFeedback(0),
fFeedbackLists(0), fPacketizer(0),
fMergeFiles(kFALSE), fDSet(0), fErrorHandler(0),
fMergeTH1OneByOne(kTRUE), fProcPackets(0),
fProcessMessage(0), fMergeSTW(0), fNumMergers(0)
{ fProgressStatus = new TProofProgressStatus(); }
virtual ~TProofPlayerRemote(); // Owns the fOutput list
// Processing entry points; same parameter lists and defaults as the
// TProofPlayer overloads, declared virtual here.
virtual Long64_t Process(TDSet *set, const char *selector,
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0);
virtual Long64_t Process(TDSet *set, TSelector *selector,
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0);
virtual Bool_t JoinProcess(TList *workers);
virtual Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
virtual Long64_t Finalize(TQueryResult *qr);
Long64_t DrawSelect(TDSet *set, const char *varexp,
const char *selection, Option_t *option = "",
Long64_t nentries = -1, Long64_t firstentry = 0);
void RedirectOutput(Bool_t on = kTRUE);
void StopProcess(Bool_t abort, Int_t timeout = -1);
// Output and feedback handling.
void StoreOutput(TList *out); // Adopts the list
virtual void StoreFeedback(TObject *slave, TList *out); // Adopts the list
Int_t Incorporate(TObject *obj, TList *out, Bool_t &merged);
TObject *HandleHistogram(TObject *obj, Bool_t &merged);
Bool_t HistoSameAxis(TH1 *h0, TH1 *h1);
Int_t AddOutputObject(TObject *obj);
void AddOutput(TList *out); // Incorporate a list
virtual void MergeOutput(Bool_t savememvalues = kFALSE);
// Progress notifications. Each overload taking a TSlave* ignores that
// argument and forwards to the matching overload without it.
void Progress(Long64_t total, Long64_t processed); // *SIGNAL*
void Progress(TSlave*, Long64_t total, Long64_t processed)
{ Progress(total, processed); }
void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
Float_t initTime, Float_t procTime,
Float_t evtrti, Float_t mbrti); // *SIGNAL*
void Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
Float_t initTime, Float_t procTime,
Float_t evtrti, Float_t mbrti)
{ Progress(total, processed, bytesread, initTime, procTime,
evtrti, mbrti); } // *SIGNAL*
void Progress(TProofProgressInfo *pi); // *SIGNAL*
void Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); } // *SIGNAL*
void Feedback(TList *objs); // *SIGNAL*
TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);
// Returns the stored packetizer pointer (0 until one has been assigned).
TVirtualPacketizer *GetPacketizer() const { return fPacketizer; }
Bool_t IsClient() const;
void SetInitTime();
void SetMerging(Bool_t on = kTRUE);
ClassDef(TProofPlayerRemote,0) // PROOF player running on master server
};
//------------------------------------------------------------------------
// TProofPlayerSlave
//
// TProofPlayer variant that additionally keeps a TSocket pointer and a list
// of feedback objects, and declares its own timer handler and feedback
// setup/teardown. Only the constructor is defined in this header.
class TProofPlayerSlave : public TProofPlayer {

private:
   // Socket pointer handed to the constructor.
   TSocket *fSocket;
   TList *fFeedback; // List of objects to send updates of

   Bool_t HandleTimer(TTimer *timer);

protected:
   void SetupFeedback();
   void StopFeedback();

public:
   // Stores 'socket' (default 0) and starts with no feedback list.
   TProofPlayerSlave(TSocket *socket = 0)
      : fSocket(socket), fFeedback(0)
   {
   }

   void HandleGetTreeHeader(TMessage *mess);

   ClassDef(TProofPlayerSlave,0) // PROOF player running on slave server
};
//------------------------------------------------------------------------
// TProofPlayerSuperMaster
//
// TProofPlayerRemote specialization that adds per-slave bookkeeping (the
// fSlave* arrays and the fSlaves list) and declares its own versions of the
// Progress() overloads that carry a TSlave pointer.
// NOTE(review): the arrays are presumably indexed in parallel with fSlaves --
// confirm against the implementation file.
class TProofPlayerSuperMaster : public TProofPlayerRemote {

private:
   TArrayL64 fSlaveProgress;
   TArrayL64 fSlaveTotals;
   TArrayL64 fSlaveBytesRead;
   TArrayF   fSlaveInitTime;
   TArrayF   fSlaveProcTime;
   TArrayF   fSlaveEvtRti;
   TArrayF   fSlaveMBRti;
   TArrayI   fSlaveActW;
   TArrayI   fSlaveTotS;
   TArrayF   fSlaveEffS;
   TList     fSlaves;
   Bool_t    fReturnFeedback;

protected:
   Bool_t HandleTimer(TTimer *timer);
   void   SetupFeedback();

public:
   // Passes 'proof' on to TProofPlayerRemote and starts with fReturnFeedback
   // set to kFALSE; the arrays and the list are default-constructed.
   TProofPlayerSuperMaster(TProof *proof = 0)
      : TProofPlayerRemote(proof), fReturnFeedback(kFALSE)
   {
   }

   virtual ~TProofPlayerSuperMaster()
   {
   }

   // Selector-by-string variant: declared here, defined outside this header.
   Long64_t Process(TDSet *set, const char *selector, Option_t *option = "",
                    Long64_t nentries = -1, Long64_t firstentry = 0);

   // Selector-by-object variant: passes everything to TProofPlayerRemote.
   Long64_t Process(TDSet *set, TSelector *selector, Option_t *option = "",
                    Long64_t nentries = -1, Long64_t firstentry = 0)
   {
      return TProofPlayerRemote::Process(set, selector, option, nentries, firstentry);
   }

   // Progress() overloads without a TSlave argument: each calls the
   // TProofPlayerRemote overload with the same arguments.
   void Progress(Long64_t total, Long64_t processed)
   {
      TProofPlayerRemote::Progress(total, processed);
   }

   void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                 Float_t initTime, Float_t procTime,
                 Float_t evtrti, Float_t mbrti)
   {
      TProofPlayerRemote::Progress(total, processed, bytesread,
                                   initTime, procTime, evtrti, mbrti);
   }

   void Progress(TProofProgressInfo *pi)
   {
      TProofPlayerRemote::Progress(pi);
   }

   // Progress() overloads that receive the reporting slave: declared here,
   // defined outside this header.
   void Progress(TSlave *sl, Long64_t total, Long64_t processed);
   void Progress(TSlave *sl, Long64_t total, Long64_t processed, Long64_t bytesread,
                 Float_t initTime, Float_t procTime,
                 Float_t evtrti, Float_t mbrti);
   void Progress(TSlave *sl, TProofProgressInfo *pi);

   ClassDef(TProofPlayerSuperMaster,0) // PROOF player running on super master
};
#endif