-
Notifications
You must be signed in to change notification settings - Fork 248
/
executor.go
807 lines (752 loc) · 30.2 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package executor xxx
package executor
import (
"encoding/base64"
"fmt"
"os"
"strconv"
"sync"
"time"
"github.com/Tencent/bk-bcs/bcs-common/common/blog"
"github.com/Tencent/bk-bcs/bcs-common/pkg/scheduler/mesosproto/mesos"
conn "github.com/Tencent/bk-bcs/bcs-runtime/bcs-mesos/bcs-container-executor/connection"
"github.com/Tencent/bk-bcs/bcs-runtime/bcs-mesos/bcs-container-executor/logs"
exec "github.com/Tencent/bk-bcs/bcs-runtime/bcs-mesos/bcs-container-executor/mesos/executor"
"github.com/golang/protobuf/proto"
"github.com/mesos/mesos-go/api/v0/upid"
"github.com/pborman/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"golang.org/x/net/context"
// "github.com/gogo/protobuf/test/fuzztests"
)
const (
	// DefaultMetricsTextFile is the directory where the executor periodically
	// dumps its Prometheus metrics in text format so that node_exporter's
	// textfile collector can report them (see metricsToText).
	DefaultMetricsTextFile = "/data/bcs/export_data"
)
// DriverEnv holds the environment variables the mesos agent sets for an
// executor at startup, which the executor uses to subscribe and reconnect:
//
//	MESOS_FRAMEWORK_ID: FrameworkID of the scheduler needed as part of the SUBSCRIBE call.
//	MESOS_EXECUTOR_ID: ExecutorID of the executor needed as part of the SUBSCRIBE call.
//	MESOS_DIRECTORY: path to the working directory for the executor on the host filesystem (deprecated).
//	MESOS_SANDBOX: path to the mapped sandbox inside of the container (determined by the
//	    agent flag sandbox_directory) for either mesos container with image or docker
//	    container. For a command task without image it is the sandbox path on the host
//	    filesystem, identical to MESOS_DIRECTORY.
//	MESOS_AGENT_ENDPOINT: agent endpoint (ip:port) used by the executor to connect to the agent.
//	MESOS_CHECKPOINT: if set to true, denotes that the framework has checkpointing enabled.
//	MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD: amount of time the agent would wait for an
//	    executor to shut down (e.g. 60secs, 3mins) after sending a SHUTDOWN event.
//
// If MESOS_CHECKPOINT is set (framework checkpointing enabled), these are
// additionally available for retrying after a disconnection from the agent:
//
//	MESOS_RECOVERY_TIMEOUT: total duration the executor should spend retrying before
//	    shutting itself down when disconnected (agent flag --recovery_timeout).
//	MESOS_SUBSCRIPTION_BACKOFF_MAX: maximum backoff duration between two retries when
//	    disconnected (agent flag --executor_reregistration_timeout).
type DriverEnv struct {
	MesosSlavePID            string // agent slave pid
	MesosSlaveID             string // agent slave uniq id
	MesosAgentEndpoint       string // agent ip:port endpoint to connect to the agent
	MesosFrameworkID         string // frameworkid from agent
	MesosExecutorID          string // executor id from agent
	SSLEnabled               bool   // true if agent enables https
	MesosSandBox             string // path to the mapped sandbox inside of the container
	MesosCheckpoint          bool   // if set to true, denotes that framework has checkpointing enabled
	MesosRecoveryTimeout     int    // total duration the executor should spend retrying before shutting itself down when disconnected from the agent
	MesosSubscriptionBackoff int    // maximum backoff duration between two retries when disconnected
	MesosShutdownGracePeriod int    // amount of time the agent would wait for an executor to shut down after sending a SHUTDOWN event
}
// GetAllEnvs get all info from environment.
// It fills every field of ee from the process environment and returns an
// error naming the first mandatory variable that is missing. MESOS_SLAVE_ID
// and the numeric timeout variables are optional; unparsable numeric values
// silently default to 0 (as before).
func (ee *DriverEnv) GetAllEnvs() error {
	// required reads key and reports a descriptive error when it is unset.
	required := func(key string) (string, error) {
		value := os.Getenv(key)
		if value == "" {
			return "", fmt.Errorf("Expecting %s to be set in the environment", key)
		}
		return value, nil
	}
	var err error
	if ee.MesosSlavePID, err = required("MESOS_SLAVE_PID"); err != nil {
		return err
	}
	// slave id is optional
	ee.MesosSlaveID = os.Getenv("MESOS_SLAVE_ID")
	if ee.MesosFrameworkID, err = required("MESOS_FRAMEWORK_ID"); err != nil {
		return err
	}
	if ee.MesosExecutorID, err = required("MESOS_EXECUTOR_ID"); err != nil {
		return err
	}
	switch os.Getenv("SSL_ENABLED") {
	case "1", "true":
		ee.SSLEnabled = true
	}
	if ee.MesosSandBox, err = required("MESOS_SANDBOX"); err != nil {
		return err
	}
	if ee.MesosAgentEndpoint, err = required("MESOS_AGENT_ENDPOINT"); err != nil {
		return err
	}
	ee.MesosCheckpoint = false
	switch os.Getenv("MESOS_CHECKPOINT") {
	case "1", "true":
		ee.MesosCheckpoint = true
		// recovery settings only matter when checkpointing is on
		ee.MesosRecoveryTimeout, _ = strconv.Atoi(os.Getenv("MESOS_RECOVERY_TIMEOUT"))
		ee.MesosSubscriptionBackoff, _ = strconv.Atoi(os.Getenv("MESOS_SUBSCRIPTION_BACKOFF_MAX"))
	}
	ee.MesosShutdownGracePeriod, _ = strconv.Atoi(os.Getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD"))
	return nil
}
// DriverConfig hold all custom info for ExecutorDriver.
type DriverConfig struct {
	Executor Executor // Executor implementation the driver dispatches events to
}
// BcsExecutorDriver BCS implementation for ExecutorDriver.
// It owns the long-lived connection to the mesos slave, tracks driver state,
// and keeps the set of not-yet-acknowledged tasks for resubscription.
type BcsExecutorDriver struct {
	lock              sync.RWMutex               // lock for status/data
	executor          Executor                   // custom executor
	exeEnv            *DriverEnv                 // executor environment required
	status            mesos.Status               // driver status
	connected         bool                       // flag for connection
	stateReConnected  bool                       // flag for re-connecting
	reConCxt          context.Context            // context for reconnection
	frameworkID       *mesos.FrameworkID         // scheduler frameworkid from environment
	agentID           *mesos.AgentID             // mesos slave ID from environment
	agentPID          *upid.UPID                 // mesos slave upid for identify
	executorID        *mesos.ExecutorID          // self executor id from environment
	connection        conn.Connection            // network connection to mesos slave
	tasks             map[string]*mesos.TaskInfo // key is taskID value; resent as unacknowledged when re-registered
	updates           *exec.Call_Update          // pending Call_Update; NOTE(review): never assigned in this file (assignment in SendStatusUpdate is commented out)
	currentTaskStatus map[string]*mesos.TaskState // last known state per task
	stopCxt           context.Context            // context for cancel
	canceler          context.CancelFunc         // function for cancel
}
// NewExecutorDriver create BcsExecutorDriver with ExecutorConfig.
// It reads the mandatory agent environment variables, parses the slave UPID
// and wires the cancellation contexts. Returns nil (and logs to stderr) when
// the environment is incomplete or the slave PID cannot be parsed.
func NewExecutorDriver(bcsExe Executor) ExecutorDriver {
	// parse all config item from environment
	envs := new(DriverEnv)
	if envErr := envs.GetAllEnvs(); envErr != nil {
		fmt.Fprintf(os.Stderr, "Get environments failed, %s\n", envErr.Error())
		return nil
	}
	slaveUpid, err := upid.Parse(envs.MesosSlavePID)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Can't not parse mesos slave upid: %s\n", envs.MesosSlavePID)
		return nil
	}
	// bugfix: the previous code derived two child contexts from rootCxt and
	// discarded their cancel functions (go vet: lostcancel), leaking the
	// children until the root was cancelled. Since both were only ever
	// cancelled via rootCancel anyway, using rootCxt directly for stopCxt and
	// reConCxt has identical semantics without the leak.
	rootCxt, rootCancel := context.WithCancel(context.Background())
	return &BcsExecutorDriver{
		executor:         bcsExe,
		exeEnv:           envs,
		status:           mesos.Status_DRIVER_NOT_STARTED,
		connected:        false,
		stateReConnected: false,
		reConCxt:         rootCxt,
		frameworkID:      &mesos.FrameworkID{Value: proto.String(envs.MesosFrameworkID)},
		agentID:          &mesos.AgentID{Value: proto.String(envs.MesosSlaveID)},
		agentPID:         slaveUpid,
		executorID:       &mesos.ExecutorID{Value: proto.String(envs.MesosExecutorID)},
		tasks:            make(map[string]*mesos.TaskInfo),
		currentTaskStatus: make(map[string]*mesos.TaskState),
		stopCxt:          rootCxt,
		canceler:         rootCancel,
	}
}
// ExecutorID get ExecutorID from mesos slave.
// The value originates from the MESOS_EXECUTOR_ID environment variable
// captured at construction time.
func (driver *BcsExecutorDriver) ExecutorID() string {
	return driver.exeEnv.MesosExecutorID
}
// Start the executor driver. This needs to be called before any
// other driver calls are made.
//
// Start installs a handler for every executor-API event on a fresh slave
// connection, opens the HTTP connection to MESOS_AGENT_ENDPOINT, sends the
// initial SUBSCRIBE call and, on success, moves the driver to DRIVER_RUNNING
// and spawns the metrics-export goroutine. It returns the (possibly
// unchanged) driver status plus any startup error.
func (driver *BcsExecutorDriver) Start() (mesos.Status, error) {
	fmt.Fprintln(os.Stdout, "Starting BcsExecutorDriver...")
	// Start is only legal from the freshly-constructed state.
	if driver.status != mesos.Status_DRIVER_NOT_STARTED {
		return driver.status, fmt.Errorf("Unable start driver, expecting %s, but got %s", mesos.Status_DRIVER_NOT_STARTED,
			driver.status)
	}
	// create connection for driver
	// driver.connection = conn.NewFakeConnection()
	driver.connection = conn.NewConnection()
	// install all message handler, only one handler can execute each time
	// (every handler below serializes on handlerLock)
	var handlerLock sync.Mutex
	// subscribe
	// NOTE(review): unlike the other handlers, SUBSCRIBED runs WITHOUT taking
	// handlerLock (the calls are commented out) — confirm this is intentional.
	driver.connection.Install(exec.Event_SUBSCRIBED, func(from *upid.UPID, event *exec.Event) {
		subs := event.GetSubscribed()
		// handlerLock.Lock()
		// defer handlerLock.Unlock()
		driver.subscribed(from, subs)
		// gauge: 1 == connected to slave
		executorSlaveConnection.Set(1)
	})
	// launch: single-task launch request from the slave
	driver.connection.Install(exec.Event_LAUNCH, func(from *upid.UPID, event *exec.Event) {
		launch := event.GetLaunch()
		handlerLock.Lock()
		defer handlerLock.Unlock()
		driver.runTask(from, launch)
	})
	// launchgroup: pod-style multi-task launch request
	driver.connection.Install(exec.Event_LAUNCH_GROUP, func(from *upid.UPID, event *exec.Event) {
		launchGroup := event.GetLaunchGroup()
		handlerLock.Lock()
		defer handlerLock.Unlock()
		driver.runTaskGroup(from, launchGroup)
	})
	// kill: scheduler asked to kill one task
	driver.connection.Install(exec.Event_KILL, func(from *upid.UPID, event *exec.Event) {
		kill := event.GetKill()
		handlerLock.Lock()
		defer handlerLock.Unlock()
		driver.killTask(from, kill)
	})
	// framework message: free-form data from the scheduler
	driver.connection.Install(exec.Event_MESSAGE, func(from *upid.UPID, event *exec.Event) {
		ack := event.GetMessage()
		handlerLock.Lock()
		defer handlerLock.Unlock()
		driver.frameworkMessage(from, ack)
	})
	// acknowledged: slave confirmed receipt of a status update
	driver.connection.Install(exec.Event_ACKNOWLEDGED, func(from *upid.UPID, event *exec.Event) {
		ack := event.GetAcknowledged()
		handlerLock.Lock()
		defer handlerLock.Unlock()
		driver.acknowledgementMessage(from, ack)
	})
	// shutdown
	driver.connection.Install(exec.Event_SHUTDOWN, func(from *upid.UPID, event *exec.Event) {
		// no message body for shutdown
		handlerLock.Lock()
		defer handlerLock.Unlock()
		driver.shutdown(from, event)
	})
	// error message
	driver.connection.Install(exec.Event_ERROR, func(from *upid.UPID, event *exec.Event) {
		err := event.GetError()
		handlerLock.Lock()
		defer handlerLock.Unlock()
		driver.frameworkError(from, err)
	})
	// http connection close callback: mark disconnected and start the
	// background resubscription loop
	driver.connection.Install(conn.Event_CONNECTION_CLOSE, func(from *upid.UPID, event *exec.Event) {
		// from & event all nil, do not use them
		handlerLock.Lock()
		defer handlerLock.Unlock()
		driver.connected = false
		driver.stateReConnected = true
		// create goroutine reconnect
		go driver.reconnectLoop()
		// gauge: 0 == connection lost
		executorSlaveConnection.Set(0)
		// subscribe success, setting state in reConnected
	})
	// fix by developerJim, 2016-12-31
	// check http or https
	if driver.exeEnv.SSLEnabled {
		// todo(developerJim): add SSL INFO for connection
		return driver.status, fmt.Errorf("HTTPS Unimplementation")
	}
	if err := driver.connection.Start("http://"+driver.exeEnv.MesosAgentEndpoint, "/api/v1/executor"); err != nil {
		fmt.Fprintf(os.Stderr, "BcsExecutorDriver starting connection to slave failed: %s\n", err.Error())
		return driver.status, err
	}
	// todo(developerJim): get upid from connection
	// ready to send subcribe message to mesos slave
	if err := driver.subscribe(); err != nil {
		fmt.Fprintf(os.Stderr, "BcsExecutorDriver send Call_Subscribe message failed: %s\n", err.Error())
		return driver.status, err
	}
	// sending subscribe message success, wait launch or launchGroup
	driver.status = mesos.Status_DRIVER_RUNNING
	fmt.Fprintf(os.Stdout, "BcsExecutorDriver starting with ExecutorID: %s\n", driver.executorID.GetValue())
	// handle prometheus metrics to text file
	go driver.metricsToText()
	return driver.status, nil
}
// metricsToText periodically (once a minute) dumps the process's Prometheus
// metrics into a text file under DefaultMetricsTextFile so that the
// node_exporter textfile collector can report them.
//
// bugfix: the loop previously ran forever (time.Sleep with no exit
// condition), leaking the goroutine after Stop/Abort; it now terminates when
// the driver's root context is cancelled. The OpenFile+Truncate(0) pair is
// replaced with O_TRUNC, which truncates atomically at open time.
func (driver *BcsExecutorDriver) metricsToText() {
	if err := os.MkdirAll(DefaultMetricsTextFile, 0755); err != nil {
		blog.Errorf("mkdir dir %s error %s", DefaultMetricsTextFile, err.Error())
	}
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-driver.stopCxt.Done():
			// driver stopped or aborted: stop exporting
			return
		case <-ticker.C:
		}
		mfs, err := prometheus.DefaultGatherer.Gather()
		if err != nil {
			blog.Errorf("prometheus gather %s", err.Error())
			continue
		}
		fPath := fmt.Sprintf("%s/%s.prom", DefaultMetricsTextFile, driver.executorID.GetValue())
		f, err := os.OpenFile(fPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0664)
		if err != nil {
			blog.Errorf("openfile %s error %s", fPath, err.Error())
			continue
		}
		for _, mf := range mfs {
			if _, err = expfmt.MetricFamilyToText(f, mf); err != nil {
				blog.Errorf("file %s MetricFamilyToText error %s", fPath, err.Error())
			}
		}
		// close file before next tick
		f.Close()
	}
}
// Stop the executor driver.
// executor will exited.
//
// When still connected it waits — polling every 500µs, bounded by a
// 5-second timeout — for outstanding status updates to be acknowledged
// before tearing down the connection, then cancels the root context and
// moves the driver to DRIVER_STOPPED.
func (driver *BcsExecutorDriver) Stop() (mesos.Status, error) {
	fmt.Fprintln(os.Stdout, "Stop ExecutorDriver")
	if driver.status != mesos.Status_DRIVER_RUNNING {
		return driver.status, fmt.Errorf("Unable Stop, status is Not RUNNING")
	}
	// ready to stop connection
	if driver.connected {
		logs.Infoln("ExecutorDriver is under connection, wait slave reply acknowledged")
		// check all update info acknowledged
		checkTick := time.NewTicker(500 * time.Microsecond)
		defer checkTick.Stop()
		timeoutTick := time.NewTicker(5 * time.Second)
		defer timeoutTick.Stop()
		for driver.updates != nil && driver.connected {
			// if connection lost, no need to wait acknowledgement
			select {
			case <-timeoutTick.C:
				fmt.Fprintln(os.Stdout, "ExecutorDriver wait acknowledgement from slave timeout(5 seconds)")
				goto StopConnection
			case <-checkTick.C:
				// bugfix: condition was inverted — it previously declared "all
				// acknowledgement accepted" while updates were still pending.
				// Leave the loop only once every update is acknowledged or the
				// connection has been lost.
				if driver.updates == nil || !driver.connected {
					fmt.Fprintln(os.Stdout, "ExecutorDriver accepts all acknowledgement, ready to exit")
					goto StopConnection
				}
			}
		}
	StopConnection:
		fmt.Fprintln(os.Stdout, "ExecutorDriver is stopping Connection...")
		driver.connection.Stop(true)
		driver.connected = false
	}
	logs.Infoln("ExecutorDriver connection to slave handle done, Stop flow done")
	driver.canceler()
	driver.status = mesos.Status_DRIVER_STOPPED
	return driver.status, nil
}
// Abort the driver so that no more callbacks can be made to the
// executor. The semantics of abort and stop have deliberately been
// separated so that code can detect an aborted driver (i.e., via
// the return status of ExecutorDriver.Join, see below), and
// instantiate and start another driver if desired (from within the
// same process ... although this functionality is currently not
// supported for executors).
func (driver *BcsExecutorDriver) Abort() (mesos.Status, error) {
	fmt.Fprintln(os.Stdout, "Abort ExecutorDriver")
	if driver.status != mesos.Status_DRIVER_RUNNING {
		return driver.status, fmt.Errorf("Unable Abort, status is Not RUNNING")
	}
	// tear the slave connection down first so no further events arrive
	if driver.connected {
		fmt.Fprintln(os.Stdout, "ExecutorDriver is stopping Connection...")
		driver.connection.Stop(true)
		driver.connected = false
	}
	// cancel the root context (unblocks Join and background loops),
	// then record the terminal state
	driver.canceler()
	driver.status = mesos.Status_DRIVER_ABORTED
	return driver.status, nil
}
// Join Waits for the driver to be stopped or aborted, possibly
// blocking the calling goroutine indefinitely. The return status of
// this function can be used to determine if the driver was aborted
// (see package mesos for a description of Status).
func (driver *BcsExecutorDriver) Join() (mesos.Status, error) {
	fmt.Println("join ExecutorDriver, wait for ExecutorDriver stop")
	if driver.status != mesos.Status_DRIVER_RUNNING {
		// bugfix: the message previously said "Unable Abort" (copy-paste from Abort)
		return driver.status, fmt.Errorf("Unable Join, status is Not RUNNING")
	}
	// Block until Stop/Abort cancels the root context. A single-case select
	// is just a channel receive (staticcheck S1000).
	<-driver.stopCxt.Done()
	fmt.Fprintf(os.Stderr, "ExecutorDriver exit in join...\n")
	return driver.status, nil
}
// Run Starts and immediately joins (i.e., blocks on) the driver.
//
// bugfix: on Start failure the original returned driver.Stop()'s result,
// whose "Unable Stop, status is Not RUNNING" error masked the real cause;
// the Start error is now propagated to the caller.
func (driver *BcsExecutorDriver) Run() (mesos.Status, error) {
	status, err := driver.Start()
	if err != nil {
		// best-effort cleanup; its error is intentionally discarded in favor
		// of the original Start error
		stopStatus, _ := driver.Stop()
		return stopStatus, err
	}
	if status != mesos.Status_DRIVER_RUNNING {
		return status, fmt.Errorf("Unable to Run, expect status %s, but got %s", mesos.Status_DRIVER_RUNNING, status)
	}
	return driver.Join()
}
// SendStatusUpdate a status update to the framework scheduler, retrying as
// necessary until an acknowledgement has been received or the
// executor is terminated (in which case, a TASK_LOST status update
// will be sent). See Scheduler.StatusUpdate for more information
// about status update acknowledgements.
func (driver *BcsExecutorDriver) SendStatusUpdate(taskStatus *mesos.TaskStatus) (mesos.Status, error) {
	if driver.status != mesos.Status_DRIVER_RUNNING {
		fmt.Fprintf(os.Stderr, "Unable to SendStatusUpdate, expecting Status %s, but got %s", mesos.Status_DRIVER_RUNNING,
			driver.status)
		return driver.status, fmt.Errorf("ExecutorDriver status Not RUNNING")
	}
	// TASK_STAGING is owned by the master; an executor must never report it.
	// Doing so aborts the whole driver.
	if taskStatus.GetState() == mesos.TaskState_TASK_STAGING {
		stagingErr := fmt.Errorf("Executor is Not Allowed to send Staging Task")
		fmt.Fprintf(os.Stderr, "Send Error: %s, ExecutorDriver Abort\n", stagingErr.Error())
		if _, abortErr := driver.Abort(); abortErr != nil {
			fmt.Fprintf(os.Stderr, "ExecutorDriver Abort in SendStatusUpdate failed: %s", abortErr.Error())
		}
		return driver.status, stagingErr
	}
	// stamp the update with identity, wall-clock time and a fresh UUID
	updateID := uuid.NewUUID()
	taskStatus.Timestamp = proto.Float64(float64(time.Now().Unix()))
	taskStatus.AgentId = driver.agentID
	taskStatus.ExecutorId = driver.executorID
	taskStatus.Uuid = updateID
	// wrap the status into a Call_UPDATE message
	call := &exec.Call{
		FrameworkId: driver.frameworkID,
		ExecutorId:  driver.executorID,
		Type:        exec.Call_UPDATE.Enum(),
		Update: &exec.Call_Update{
			Status: taskStatus,
		},
	}
	fmt.Fprintf(os.Stdout, "ExecutorDriver send task %s, UUID: %s\n", taskStatus.GetTaskId().GetValue(), updateID.String())
	// send message to slave
	if err := driver.connection.Send(call, false); err != nil {
		logs.Errorf("ExecutorDriver send Call_Update failed: %s\n", err.Error())
		return driver.status, err
	}
	// executorID = taskgroupid
	taskgroupReportTotal.WithLabelValues(driver.executorID.GetValue()).Inc()
	return driver.status, nil
}
// SendFrameworkMessage send a message to the framework scheduler. These messages are
// best effort; do not expect a framework message to be
// retransmitted in any reliable fashion.
func (driver *BcsExecutorDriver) SendFrameworkMessage(data string) (mesos.Status, error) {
	logs.Infof("Sending Framework message: %s", data)
	if driver.status != mesos.Status_DRIVER_RUNNING {
		fmt.Fprintf(os.Stderr, "Unable to SendFramworkMessage, expecting status %s, but Got %s\n",
			mesos.Status_DRIVER_RUNNING, driver.status)
		return driver.status, fmt.Errorf("ExecutorDriver is Not Running")
	}
	// wrap the payload into a Call_MESSAGE envelope
	msg := &exec.Call_Message{Data: []byte(data)}
	call := &exec.Call{
		FrameworkId: driver.frameworkID,
		ExecutorId:  driver.executorID,
		Type:        exec.Call_MESSAGE.Enum(),
		Message:     msg,
	}
	err := driver.connection.Send(call, false)
	if err != nil {
		fmt.Fprintf(os.Stderr, "ExecutorDriver send Call_Message failed: %s\n", err.Error())
	}
	return driver.status, err
}
// subscribe send subscribe message to mesos slave.
// Any TaskInfo still present in driver.tasks is considered unacknowledged
// (e.g. after a disconnect) and is attached to the Call_Subscribe so the
// slave can reconcile it on (re)registration.
func (driver *BcsExecutorDriver) subscribe() error {
	subscribe := new(exec.Call_Subscribe)
	// snapshot the unacknowledged tasks under the lock
	driver.lock.Lock()
	for _, task := range driver.tasks {
		subscribe.UnacknowledgedTasks = append(subscribe.UnacknowledgedTasks, task)
	}
	driver.lock.Unlock()
	call := &exec.Call{
		ExecutorId:  driver.executorID,
		FrameworkId: driver.frameworkID,
		Type:        exec.Call_SUBSCRIBE.Enum(),
		Subscribe:   subscribe,
	}
	// true: this send (re)establishes the long-lived event stream
	if err := driver.connection.Send(call, true); err != nil {
		return err
	}
	fmt.Fprintln(os.Stderr, "ExecutorDriver is waiting callback...")
	return nil
}
/*
* all register functions
* only one callback handler can be call each time, see BcsExecutorDriver.Start()
*/
// subscribed handles the SUBSCRIBED event from the slave: it marks the
// driver connected and notifies the executor via Registered (first
// subscription) or Reregistered (resubscription after a dropped connection).
func (driver *BcsExecutorDriver) subscribed(from *upid.UPID, pbMsg *exec.Event_Subscribed) {
	agentInfo := pbMsg.GetAgentInfo()
	if driver.status == mesos.Status_DRIVER_ABORTED {
		fmt.Fprintf(os.Stdout, "ignoring subscribed message from slave %s because aborted\n", agentInfo.GetHostname())
		return
	}
	if driver.isStopped() {
		fmt.Fprintf(os.Stdout, "ignoring subcribed message from slave %s because stopped\n", agentInfo.GetHostname())
		return
	}
	// todo(developerJim): check frameworkID & ExecutorID are equal locals
	driver.connected = true
	fmt.Fprintf(os.Stdout, "ExecutorDriver registered with slave %s/%s success\n", agentInfo.GetHostname(),
		agentInfo.GetId().GetValue())
	if !driver.stateReConnected {
		// first registration
		driver.executor.Registered(driver, pbMsg.GetExecutorInfo(), pbMsg.GetFrameworkInfo(), agentInfo)
		return
	}
	// successful resubscription: clear the reconnecting flag
	driver.stateReConnected = false
	driver.executor.Reregistered(driver, agentInfo)
}
// reconnect backoff strategy implementation for MESOS_RECOVERY_TIMEOUT, If it is
// not able to establish a subscription with the agent within this duration, it should gracefully exit.
// NOTE: currently a stub — actual reconnection is performed by reconnectLoop.
func (driver *BcsExecutorDriver) reconnect(from *upid.UPID, pbMsg proto.Message) {
	fmt.Fprintln(os.Stderr, "Reconnect backoff strategy is Not Implemented!")
}
// runTask handles an Event_Launch from the slave: it records the TaskInfo
// (so it can be resent as unacknowledged on resubscription) and hands it to
// the executor. A duplicated TaskID is fatal and exits the process.
//
// bugfix: driver.tasks is read/written by acknowledgementMessage and
// subscribe under driver.lock; the map access here is now guarded by the
// same lock to avoid a data race.
func (driver *BcsExecutorDriver) runTask(from *upid.UPID, pbMsg *exec.Event_Launch) {
	if driver.status == mesos.Status_DRIVER_ABORTED {
		fmt.Fprintln(os.Stdout, "Ignore Launch message from slave because ExecutorDriver Abort")
		return
	}
	if driver.status == mesos.Status_DRIVER_STOPPED {
		fmt.Fprintln(os.Stdout, "Ignore Launch message from slave because ExecutorDriver Stop.")
		return
	}
	task := pbMsg.GetTask()
	taskID := task.GetTaskId()
	agentID := task.GetAgentId()
	fmt.Fprintf(os.Stdout, "BcsExecutorDriver get task %s from slave %s.\n", taskID.GetValue(), agentID.GetValue())
	// check taskInfo is duplicated; record it if new (single critical section)
	driver.lock.Lock()
	_, exist := driver.tasks[taskID.GetValue()]
	if !exist {
		driver.tasks[taskID.GetValue()] = task
	}
	driver.lock.Unlock()
	if exist {
		fmt.Fprintf(os.Stderr, "BcsExecutorDriver get duplicated task from slave %s, Executor Exit\n", agentID.GetValue())
		os.Exit(255)
	}
	driver.executor.LaunchTask(driver, task)
}
// runTaskGroup handles an Event_LaunchGroup from the slave: it records every
// TaskInfo of the group and hands the group to the executor. An empty group
// or a duplicated TaskID is fatal and exits the process.
//
// bugfix: driver.tasks is accessed elsewhere under driver.lock; the map
// reads/writes here are now guarded by the same lock to avoid a data race.
func (driver *BcsExecutorDriver) runTaskGroup(from *upid.UPID, pbMsg *exec.Event_LaunchGroup) {
	if driver.status == mesos.Status_DRIVER_ABORTED {
		fmt.Fprintln(os.Stdout, "Ignore LaunchTasks message from slave because ExecutorDriver Abort")
		return
	}
	if driver.status == mesos.Status_DRIVER_STOPPED {
		fmt.Fprintln(os.Stdout, "Ignore LaunchTasks message from slave because ExecutorDriver Stop.")
		return
	}
	taskGroup := pbMsg.GetTaskGroup()
	tasks := taskGroup.GetTasks()
	if len(tasks) == 0 {
		// Error, No tasks in LaunchGroup, ExecutorDriver exit
		fmt.Fprintln(os.Stderr, "ExecutorDriver Get 0 task in LaunchGroup message.")
		os.Exit(255)
	}
	agentID := tasks[0].GetAgentId()
	fmt.Fprintf(os.Stdout, "BcsExecutorDriver get %d tasks from salve %s\n", len(tasks), agentID.GetValue())
	// record all tasks, rejecting duplicates, in one critical section
	driver.lock.Lock()
	for _, task := range tasks {
		if _, exist := driver.tasks[task.GetTaskId().GetValue()]; exist {
			driver.lock.Unlock()
			fmt.Fprintf(os.Stderr, "BcsExecutorDriver get duplicated TaskId %s from slave\n", task.GetTaskId())
			os.Exit(255)
		}
		// update TaskInfo in local
		driver.tasks[task.GetTaskId().GetValue()] = task
	}
	driver.lock.Unlock()
	// ready to post Executor
	driver.executor.LaunchTaskGroup(driver, taskGroup)
}
// killTask forwards a KILL event for one task to the executor, unless the
// driver has already been aborted or stopped.
func (driver *BcsExecutorDriver) killTask(from *upid.UPID, pbMsg *exec.Event_Kill) {
	switch driver.status {
	case mesos.Status_DRIVER_ABORTED:
		fmt.Fprintln(os.Stdout, "Ignore Kill message from slave because ExecutorDriver Abort")
		return
	case mesos.Status_DRIVER_STOPPED:
		fmt.Fprintln(os.Stdout, "Ignore Kill message from slave because ExecutorDriver Stop")
		return
	}
	taskID := pbMsg.GetTaskId()
	fmt.Fprintf(os.Stdout, "BcsExecutorDriver ready to kill task %s\n", taskID.GetValue())
	driver.executor.KillTask(driver, taskID)
}
// acknowledgementMessage handles an ACKNOWLEDGED event: the slave confirmed
// receipt of a status update, so the task is removed from the local
// unacknowledged set (and will no longer be resent on resubscription).
func (driver *BcsExecutorDriver) acknowledgementMessage(from *upid.UPID, pbMsg *exec.Event_Acknowledged) {
	if driver.status == mesos.Status_DRIVER_ABORTED {
		fmt.Fprintln(os.Stdout, "Ignore Acknowledged message from slave because ExecutorDriver Abort")
		return
	}
	taskID := pbMsg.GetTaskId()
	updateUuID := uuid.UUID(pbMsg.GetUuid())
	if driver.status == mesos.Status_DRIVER_STOPPED {
		fmt.Fprintf(
			os.Stdout,
			"Ignore Acknowledged taskId %s, uuid %s from slave because ExecutorDriver Stop",
			taskID.GetValue(),
			updateUuID.String(),
		)
		return
	}
	// drop the acknowledged task from the unacknowledged set
	driver.lock.Lock()
	delete(driver.tasks, taskID.GetValue())
	driver.lock.Unlock()
	fmt.Fprintf(os.Stdout, "ExecutorDriver get acknowledgement from slave, taskId %s, uuid %s\n", taskID.GetValue(),
		updateUuID.String())
	taskgroupAckTotal.WithLabelValues(driver.executorID.GetValue()).Inc()
}
// frameworkMessage handles a MESSAGE event from the scheduler: the payload
// arrives base64 encoded and is decoded before being delivered synchronously
// to the executor.
func (driver *BcsExecutorDriver) frameworkMessage(from *upid.UPID, pbMsg *exec.Event_Message) {
	switch driver.status {
	case mesos.Status_DRIVER_ABORTED:
		fmt.Fprintln(os.Stdout, "Ignore Message info because ExecutorDriver Abort")
		return
	case mesos.Status_DRIVER_STOPPED:
		fmt.Fprintln(os.Stdout, "Ignore Message info because ExecutorDriver Stop")
		return
	}
	raw, err := base64.StdEncoding.DecodeString(string(pbMsg.GetData()))
	if err != nil {
		fmt.Fprintf(os.Stderr, "Decode base64 FrameworkMessage err: %s\n", err.Error())
		return
	}
	driver.executor.FrameworkMessage(driver, string(raw))
}
// shutdown the executor, and killall containers.
// The executor's own Shutdown hook runs first, then the driver stops itself.
func (driver *BcsExecutorDriver) shutdown(from *upid.UPID, pbMsg *exec.Event) {
	switch driver.status {
	case mesos.Status_DRIVER_ABORTED:
		fmt.Fprintln(os.Stdout, "Ignore Shutdown info because ExecutorDriver Abort")
		return
	case mesos.Status_DRIVER_STOPPED:
		fmt.Fprintln(os.Stdout, "Ignore Shutdown info because ExecutorDriver Stop")
		return
	}
	fmt.Fprintln(os.Stdout, "BcsExecutorDriver asked to shutdown")
	driver.executor.Shutdown(driver)
	driver.Stop()
}
// frameworkError forwards an ERROR event from the slave to the executor's
// Error callback, unless the driver has already been aborted.
func (driver *BcsExecutorDriver) frameworkError(from *upid.UPID, pbMsg *exec.Event_Error) {
	if driver.status == mesos.Status_DRIVER_ABORTED {
		fmt.Fprintln(os.Stdout, "Ignore Error message because ExecutorDriver Abort")
		return
	}
	fmt.Fprintln(os.Stdout, "BcsExecutorDriver received Error message")
	msg := pbMsg.GetMessage()
	driver.executor.Error(driver, msg)
}
// networkError is a placeholder hook for network failures; it only logs
// that the handler is not implemented.
func (driver *BcsExecutorDriver) networkError(from *upid.UPID, pbMsg proto.Message) {
	fmt.Fprintln(os.Stderr, "ExecutorDriver Not Implemented")
}
// driver status access

// Status return driver status (read under the driver's RWMutex).
func (driver *BcsExecutorDriver) Status() mesos.Status {
	driver.lock.RLock()
	current := driver.status
	driver.lock.RUnlock()
	return current
}
// IsRunning check driver is running.
// Delegates to Status, which performs the locked read.
func (driver *BcsExecutorDriver) IsRunning() bool {
	return driver.Status() == mesos.Status_DRIVER_RUNNING
}
// isStopped check driver is stopped.
// Delegates to Status, which performs the locked read.
func (driver *BcsExecutorDriver) isStopped() bool {
	return driver.Status() == mesos.Status_DRIVER_STOPPED
}
// isConnected reports whether the driver currently holds a live connection
// to the mesos slave (read under the driver's RWMutex).
// (Previous comment wrongly said "check driver is stopped" — copy-paste.)
func (driver *BcsExecutorDriver) isConnected() bool {
	driver.lock.RLock()
	defer driver.lock.RUnlock()
	return driver.connected
}
// reconnectLoop create loop for reconnect to slave when long connection is down.
// It retries the SUBSCRIBE call once per second until the driver is
// cancelled (reConCxt) or it leaves the re-connecting state (the SUBSCRIBED
// handler clears stateReConnected on success).
//
// bugfix: the ticker was never stopped, leaking its timer every time the
// loop exited; it is now released via defer.
func (driver *BcsExecutorDriver) reconnectLoop() {
	fmt.Fprintln(os.Stdout,
		"##################ExecutorDriver enter reconnecting loop, retry every 1 seconds#####################")
	tick := time.NewTicker(time.Second * 1)
	defer tick.Stop()
	// todo(developerJim): add retry handler according MESOS_RECOVERY_TIMEOUT,
	// a suitable backoff strategy must be implemented later
	// now is only subscribe simply, actually, we need method reconnect
	for {
		select {
		case <-driver.reConCxt.Done():
			fmt.Fprintln(os.Stdout, "ExecutorDriver ask to exit in reconnection loop")
			return
		case now := <-tick.C:
			fmt.Fprintf(os.Stdout, "ExecutorDriver tick: %s, ready to resubscribed......\n", now.String())
			if !driver.stateReConnected {
				fmt.Fprintln(os.Stdout, "ExecutorDriver is not under reconnect status, close reconnection loop.")
				return
			}
			if err := driver.subscribe(); err != nil {
				fmt.Fprintf(os.Stderr,
					"ExecutorDriver send Call_Subscribe message in reconnection loop failed: %s, wait for next tick\n", err.Error())
			} else {
				fmt.Fprintln(os.Stdout, "ExecutorDriver Send Subscribe success, wait for reply in 5 seconds")
			}
		}
	}
}