-
Notifications
You must be signed in to change notification settings - Fork 14
/
res_mgr.go
1270 lines (1049 loc) · 46.5 KB
/
res_mgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// vi: sw=4 ts=4:
/*
---------------------------------------------------------------------------
Copyright (c) 2013-2015 AT&T Intellectual Property
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---------------------------------------------------------------------------
*/
/*
Mnemonic: res_mgr
Abstract: Manages the inventory of reservations.
We expect it to be executed as a goroutine and requests sent via a channel.
Date: 02 December 2013
Author: E. Scott Daniels
CFG: These config file variables are used when present:
default:alttable - The OVS table number to be used for metadata marking.
default:queue_type - If "endpoint" then we only generate endpoint queues, not intermediate
switch queues.
resmgr:ckpt_dir - name of the directory where checkpoint data is to be kept (/var/lib/tegu)
FWIW: /var/lib/tegu selected based on description:
http://www.tldp.org/LDP/Linux-Filesystem-Hierarchy/html/var.html
resmgr:verbose - Defines the initial verbose setting for reservation manager bleater
deprecated - resmgr:set_vlan - If true (default) then we flag fq-mgr to add vlan setting to flow-mods
resmgr:super_cookie - A cookie that can be used to manage any reservation.
resmgr:hto_limit - The hard timeout limit that should be used to reset flow-mods on long reservation.
resmgr:res_refresh - The rate (seconds) that reservations are refreshed if hto-limit is non-zero.
TODO: need a way to detect when skoogie/controller has been reset meaning that all
pushed reservations need to be pushed again.
need to check to ensure that a VM's IP address has not changed; repush
reservation if it has and cancel the previous one (when skoogi allows drops)
Mods: 03 Apr 2014 (sd) : Added endpoint flowmod support.
30 Apr 2014 (sd) : Enhancements to send flow-mods and reservation request to agents (Tegu-light)
13 May 2014 (sd) : Changed to support exit dscp value in reservation.
18 May 2014 (sd) : Changes to allow cross tenant reservations.
19 May 2014 (sd) : Changes to support using destination floating IP address in flow mod.
07 Jul 2014 (sd) : Changed to send network manager a delete message when deleting a reservation
rather than depending on the http manager to do that -- possible timing issues if we wait.
Added support for reservation refresh.
29 Jul 2014 : Change set user link cap such that 0 is a valid value, and -1 will delete.
27 Aug 2014 : Changes to support using Fq_req for generating flowmods (support of
meta marking of reservation traffic).
28 Aug 2014 : Added message tags to crit/err/warn messages.
29 Aug 2014 : Added code to allow alternate OVS table to be supplied from config.
03 Sep 2014 : Corrected bug introduced with fq_req changes (ignored protocol and port)
08 Sep 2014 : Fixed bugs with tcp oriented proto steering.
24 Sep 2014 : Added support for ITONS traffic class demands.
09 Oct 2014 : Added all_sys_up, and prevent checkpointing until all_sys_up is true.
29 Oct 2014 : Corrected bug -- setting vlan id when VMs are on same switch.
03 Nov 2014 : Removed straggling comments from the bidirectional fix.
General cleanup to merge with steering code.
17 Nov 2014 : Updated to support lazy data collection from openstack -- must update host
information and push to network as we load from a checkpoint file.
19 Nov 2014 : correct bug in loading reservation path.
16 Jan 2014 : Allow mask on a tcp/udp port specification and to set priority a bit higher
when a transport port is specified.
Changed when meta table flow-mods are pushed (now with queues and only to hosts in
the queue list).
01 Feb 2014 : Disables periodic checkpointing as tegu_ha depends on checkpoint files
written only when there are updates.
09 Feb 2015 : Added timeout-limit to prevent overrun of virtual switch hard timeout value.
10 Feb 2015 : Corrected bug -- reporting expired pledges in the get pledge list.
24 Feb 2015 : Added mirroring
27 Feb 2015 : Steering changes to work with lazy update.
17 Mar 2015 : lite version of resmgr brought more in line with steering.
25 Mar 2015 : Reservation pushing only happens after a new queue list is received from netmgr
and sent to fq-mgr. The exception is if the hard switch timeout pops where reservations
are pushed straight away (assumption is that queues don't change).
20 Apr 2015 : Ensured that reservations are pushed following a cancel request.
26 May 2015 : Conversion to support pledge as an interface.
01 Jun 2015 : Corrected bug in delete all which was attempting to delete expired reservations.
15 Jun 2015 : Added oneway support.
18 Jun 2015 : Added oneway delete support.
25 Jun 2015 : Corrected bug preventing mirror reservations from being deleted (they require an agent
command to be run and it wasn't.)
08 Sep 2015 : Prevent checkpoint files from being written in the same second (gh#22).
08 Oct 2015 : Added !pushed check back to active reservation pushes.
27 Jan 2015 : Changes to support passthru reservations.
06 Mar 2016 : Added a second channel interface (rmgrlu_ch) to deal with lookup requests for
mirrors since they need this from agent manager which was creating a deadlock.
07 Mar 2016 : Special tickle channel introduced. It allows only 5 tickles to be queued; tickle will
drop and not block if full. This prevents the main queue from being overrun with tickles
if a request takes a while (chkpt load).
08 Apr 2016 : Added retry cache to the inventory. Cache will track pledges which were loaded from
the datacache (checkpoint) and we've not been able to vet. They should be retried
assuming that vetting failed because of a network graph issue (unknown path etc) and that
later attempt will be successful.
12 Apr 2016 : Added support to detect when a duplicate reservation should be allowed, and the previous
one cancelled, due to a host move.
*/
package managers
import (
"bytes"
"fmt"
"os"
"strings"
"time"
"github.com/att/gopkgs/bleater"
"github.com/att/gopkgs/clike"
"github.com/att/gopkgs/chkpt"
"github.com/att/gopkgs/ipc"
"github.com/att/tegu/gizmos"
)
//var ( NO GLOBALS HERE; use globals.go )
// --------------------------------------------------------------------------------------
/*
	Manages the reservation inventory. One instance is owned by the reservation
	manager goroutine; access is single threaded through its request channel.
*/
type Inventory struct {
	cache       map[string]*gizmos.Pledge // active/known pledges keyed by reservation id
	retry       map[string]*gizmos.Pledge // pledges loaded from datacache (checkpoint) that have not vetted
	ulcap_cache map[string]int            // cache of user link capacity values (max value)
	chkpt       *chkpt.Chkpt              // checkpoint writer used by write_chkpt()
}
// --- Private --------------------------------------------------------------------------
/*
	Encapsulate all of the current (unexpired) reservations into a single
	json blob of the form { "reservations": [ ... ] }.
*/
func ( i *Inventory ) res2json( ) (json string, err error) {
	var b bytes.Buffer

	b.WriteString( `{ "reservations": [ ` )
	sep := ""
	for _, p := range i.cache {
		if !(*p).Is_expired( ) { // expired pledges are omitted from the blob
			b.WriteString( sep )
			b.WriteString( (*p).To_json( ) )
			sep = ","
		}
	}
	b.WriteString( " ] }" )

	return b.String(), nil
}
/*
	Given a name, send a request to the network manager to translate it to an
	IP address. A nil or empty name yields a nil result; this is legitimate for
	steering in the case of L* endpoint specification.
*/
func name2ip( name *string ) ( ip *string ) {
	if name == nil || *name == "" {
		return nil
	}

	ch := make( chan *ipc.Chmsg )
	defer close( ch ) // close it on return

	msg := ipc.Mk_chmsg( )
	msg.Send_req( nw_ch, ch, REQ_GETIP, name, nil )
	msg = <- ch
	if msg.State != nil { // network manager couldn't translate
		rm_sheep.Baa( 2, "name didn't translate to ip: %s", name )
		return nil
	}

	return msg.Response_data.(*string)
}
/*
Given a name, get host info (IP, mac, switch-id, switch-port) from network.
*/
func get_hostinfo( name *string ) ( *string, *string, *string, int ) {
if name != nil && *name != "" {
ch := make( chan *ipc.Chmsg );
req := ipc.Mk_chmsg( )
req.Send_req( nw_ch, ch, REQ_HOSTINFO, name, nil ) // get host info string (mac, ip, switch)
req = <- ch
if req.State == nil {
htoks := strings.Split( req.Response_data.( string ), "," ) // results are: ip, mac, switch-id, switch-port; all strings
return &htoks[0], &htoks[1], &htoks[2], clike.Atoi( htoks[3] )
} else {
rm_sheep.Baa( 1, "get_hostinfo: error from network mgr: %s", req.State )
}
}
rm_sheep.Baa( 1, "get_hostinfo: no name provided" )
return nil, nil, nil, 0
}
/*
	Handles a response from the fq-manager indicating that the attempt to send
	a proactive ingress/egress flowmod to skoogi has failed. Issues a warning
	to the log and resets the pushed flag on the associated reservation so it
	is attempted again.
*/
func (i *Inventory) failed_push( msg *ipc.Chmsg ) {
	if msg.Req_data == nil {
		rm_sheep.Baa( 0, "IER: notification of failed push had no information" )
		return
	}

	fq_data := msg.Req_data.( *Fq_req ) // data that was passed to fq_mgr; we dig the pledge id out of it
	// TODO: set a counter in pledge so that we only try to push so many times before giving up.
	rm_sheep.Baa( 1, "WRN: proactive ie reservation push failed, pledge marked unpushed: %s [TGURMG002]", *fq_data.Id )

	if p, ok := i.cache[*fq_data.Id]; ok && p != nil {
		(*p).Reset_pushed()
	}
}
/*
	Checks to see if any reservations expired in the recent past (seconds).
	Returns true on the first one found.
*/
func (i *Inventory) any_concluded( past int64 ) ( bool ) {
	for _, p := range i.cache { // run all pledges that are in the cache
		if p == nil {
			continue
		}
		if (*p).Concluded_recently( past ) { // pledge concluded within past seconds
			return true
		}
	}

	return false
}
/*
	Checks to see if any reservation became active between (now - past) and the
	current time, or will become active between now and now + future seconds.
	(Past and future are second counts on either side of the current time, NOT
	timestamps.)
*/
func (i *Inventory) any_commencing( past int64, future int64 ) ( bool ) {
	for _, p := range i.cache { // run all pledges that are in the cache
		if p == nil {
			continue
		}
		if (*p).Commenced_recently( past ) || (*p).Is_active_soon( future ) { // active now or inside the window
			return true
		}
	}

	return false
}
/*
	Deprecated -- these should no longer be set by tegu and if really needed should
	be set by the ql_bw*fmods and other agent scripts.

	Push a table 9x flow-mod. The f-mods tossed into the 90 range of tables
	serve to mark metadata in a packet, since metadata cannot be marked prior
	to a resub action (flaw in OVS). Marking metadata lets one of our f-mods
	resubmit into table 0 without triggering a loop or matching our other rules.

	table  - the table number (assumed 9x, but could be anything)
	meta   - value/mask applied on the action (e.g. 0x02/0x02)
	cookie - cookie value placed on the f-mod
*/
func table9x_fmods( rname *string, host string, table int, meta string, cookie int ) {
	swid := "br-int" // these go to br-int only

	fq_data := Mk_fqreq( rname ) // f-mod request with defaults (output==none)
	fq_data.Table = table
	fq_data.Cookie = cookie
	fq_data.Expiry = 0 // never expire
	// CAUTION: fq_mgr generic fmod needs to be changed and when it does these next three lines will need to change too
	fq_data.Espq = gizmos.Mk_spq( host, -1, -1 ) // send to specific host
	fq_data.Swid = &swid
	fq_data.Action.Meta = &meta // sole purpose is to set metadata

	msg := ipc.Mk_chmsg()
	msg.Send_req( fq_ch, nil, REQ_GEN_FMOD, fq_data, nil ) // no response right now -- eventually we want an asynch error
}
/*
	Causes all alternate table flow-mods to be sent for the hosts named in the
	given queue list. This can be expensive (1-2 seconds per flow-mod), so we
	assume it is driven only when there are queue changes. Each queue list
	entry is of the form host/..., and only the host portion is used here.
*/
func send_meta_fmods( qlist []string, alt_table int ) {
	targets := make( map[string]bool ) // hosts that are actually affected by the queue list

	for _, q := range qlist { // build the unique host set
		toks := strings.SplitN( q, "/", 2 ) // host is in front of the first slash
		if len( toks ) == 2 { // should always be, but don't choke if not
			targets[toks[0]] = true // fq-mgr will add suffix if needed
		}
	}

	for h := range targets {
		rm_sheep.Baa( 2, "sending metadata flow-mods to %s alt-table base %d", h, alt_table )
		id := "meta_" + h
		table9x_fmods( &id, h, alt_table, "0x01/0x01", 0xe5d )
		table9x_fmods( &id, h, alt_table+1, "0x02/0x02", 0xe5d )
	}
}
/*
	Runs the list of reservations in the cache and pushes out any that are about to become
	active (in the next 15 seconds). Also handles undoing any mirror reservations that have
	expired. Pref_v6 is passed to the bandwidth push helpers and will favour the IPv6
	address if a host has both addresses defined. Returns the number of reservations that
	were pushed on this pass.

	NOTE(review): only the bwow branch sets the pushed flag here; presumably the other
	push_* helpers mark the pledge themselves once the push succeeds -- confirm before
	relying on the flag immediately after this call.
*/
func (i *Inventory) push_reservations( ch chan *ipc.Chmsg, alt_table int, hto_limit int64, pref_v6 bool ) ( npushed int ) {
	var (
		bw_push_count int = 0			// counts only Pledge_bw pushes (bwow/mirror/pass are not counted here)
		st_push_count int = 0			// steering pushes
		pend_count int = 0				// pledges seen but not yet eligible to push
		pushed_count int = 0			// total pushed this pass; returned as npushed
	)

	rm_sheep.Baa( 4, "pushing reservations, %d in cache", len( i.cache ) )
	for rname, p := range i.cache { // run all pledges that are in the cache
		if p != nil {
			if (*p).Is_expired() { // some reservations need to be explicitly undone at expiry
				if (*p).Is_pushed() { // no need if not pushed
					switch (*p).(type) {
					case *gizmos.Pledge_mirror: // mirror requests need to be undone when they become inactive
						undo_mirror_reservation( p, rname, ch )
					}

					(*p).Reset_pushed()
				}
			} else {
				if ! (*p).Is_pushed() && ((*p).Is_active() || (*p).Is_active_soon( 15 )) { // not pushed, and became active while we napped, or will activate in the next 15 seconds
					switch (*p).(type) { // dispatch to the pledge-type specific push helper
					case *gizmos.Pledge_bwow:
						bwow_push_res( p, &rname, ch, hto_limit, pref_v6 )
						(*p).Set_pushed( )

					case *gizmos.Pledge_bw:
						bw_push_count++
						bw_push_res( p, &rname, ch, hto_limit, alt_table, pref_v6 )

					case *gizmos.Pledge_steer:
						st_push_count++
						push_st_reservation( p, rname, ch, hto_limit )

					case *gizmos.Pledge_mirror:
						push_mirror_reservation( p, rname, ch )

					case *gizmos.Pledge_pass:
						pass_push_res( p, &rname, ch, hto_limit )
					}

					pushed_count++
				} else { // still pending (already pushed, or not yet near its start time)
					pend_count++
				}
			}
		}
	}

	if st_push_count > 0 || bw_push_count > 0 || rm_sheep.Would_baa( 3 ) { // bleat if we pushed something, or if higher level is set in the sheep
		rm_sheep.Baa( 1, "push_reservations: %d bandwidth, %d steering, %d pending, %d already pushed", bw_push_count, st_push_count, pend_count, pushed_count )
	}

	return pushed_count
}
/*
	Turn pause mode on for every reservation in the cache. Pausing also resets
	the push flag so that all of them are pushed again when resumed.
*/
func (i *Inventory) pause_on( ) {
	for _, res := range i.cache {
		(*res).Pause( true ) // true == also reset the push flag
	}
}
/*
	Turn pause mode off for every reservation in the cache. Resuming also
	resets the push flag so that all of them get pushed again.
*/
func (i *Inventory) pause_off( ) {
	for _, res := range i.cache {
		(*res).Resume( true ) // true == also reset the push flag
	}
}
/*
	Resets the pushed flag on every reservation so that all of them are resent;
	used to periodically refresh flow-mods and avoid hard timeout limits in
	virtual switches.
*/
func (i *Inventory) reset_push() {
	for _, res := range i.cache {
		(*res).Reset_pushed( )
	}
}
/*
	Run the reservations in the cache and retry caches and write any that are not expired
	out to the checkpoint file. Expired reservations are deleted if they test positive
	for extinction (dead for more than 120 seconds).

	Because of timestamp limitations on the file system, it is possible for the start
	process to select the wrong checkpoint file if more than one checkpoint file is
	created within a second. To prevent problems this function will only write a
	checkpoint if the last one was written more than two seconds ago (avoids clock issues
	and the nano timer). If it hasn't been long enough, this function returns true
	(retry) and the calling function should call again (probably after a tickler pop) to
	issue a checkpoint. There is no need to "queue" anything because if several
	checkpoint requests are made in the same second, then all of them will be captured
	the next time a write is allowed and the inventory is parsed. If the checkpoint can
	be written, false is returned. In either case, the time that the last checkpoint
	file was written is also returned.
*/
func (i *Inventory) write_chkpt( last int64 ) ( retry bool, timestamp int64 ) {
	now := time.Now().Unix()
	if now - last < 2 {
		rm_sheep.Baa( 2, "retry checkpoint signaled" )
		return true, last // too soon; force main loop to recall us later
	}

	err := i.chkpt.Create( )
	if err != nil {
		rm_sheep.Baa( 0, "CRI: resmgr: unable to create checkpoint file: %s [TGURMG003]", err )
		return false, last
	}

	for nm, v := range i.ulcap_cache { // write out user link capacity limits that have been set
		fmt.Fprintf( i.chkpt, "ucap: %s %d\n", nm, v ) // we'll check the overall error state on close
	}

	i.chkpt_pledges( i.cache )
	i.chkpt_pledges( i.retry ) // bug fix: extinct retry pledges are now purged from retry, not from cache

	ckpt_name, err := i.chkpt.Close( )
	if err != nil {
		rm_sheep.Baa( 0, "CRI: resmgr: checkpoint write failed: %s: %s [TGURMG004]", ckpt_name, err )
	} else {
		rm_sheep.Baa( 1, "resmgr: checkpoint successful: %s", ckpt_name )
	}

	return false, time.Now().Unix() // not queued, and send back the new chkpt time
}

/*
	Writes every non-expired pledge in the given map to the open checkpoint file and
	purges pledges which are extinct (expired > 120s) and were pushed (safe to clean).
	Deleting from the map while ranging is safe in Go. The overall write error state
	is checked when the checkpoint is closed.
*/
func (i *Inventory) chkpt_pledges( m map[string]*gizmos.Pledge ) {
	for key, p := range m {
		s := (*p).To_chkpt()
		if s != "expired" {
			fmt.Fprintf( i.chkpt, "%s\n", s ) // we'll check the overall error state on close
		} else {
			if (*p).Is_extinct( 120 ) && (*p).Is_pushed( ) { // if really old and extension was pushed, safe to clean it out
				rm_sheep.Baa( 1, "extinct reservation purged: %s", key )
				delete( m, key ) // original code deleted from i.cache even when walking i.retry
			}
		}
	}
}
/*
	Given a host name, return all active pledges that involve that host as a
	list. Currently no error is detected and the list may be nil if there are
	no pledges.
*/
func (inv *Inventory) pledge_list( vmname *string ) ( []*gizmos.Pledge, error ) {
	if len( inv.cache ) == 0 {
		return nil, nil
	}

	plist := make( []*gizmos.Pledge, 0, len( inv.cache ) )
	for _, p := range inv.cache {
		if (*p).Has_host( vmname ) && !(*p).Is_expired() && !(*p).Is_paused() {
			plist = append( plist, p )
		}
	}

	return plist, nil
}
/*
	Set the user link capacity and forward it on to the network manager. We expect this
	to be a request from the far side (user/admin) or read from the chkpt file, so the
	value is passed as a string (which is also what network wants).

	Accepted values: 0-100 set the capacity (0 is legitimate); -1 deletes the cap.
	Anything else is rejected with a warning.
*/
func (inv *Inventory) add_ulcap( name *string, sval *string ) {
	val := clike.Atoi( *sval )

	pdata := make( []*string, 2 ) // parameters for message to network
	pdata[0] = name
	pdata[1] = sval

	switch {
	case val >= 0 && val < 101: // set (0 is a valid capacity)
		rm_sheep.Baa( 2, "adding user cap: %s %d", *name, val )
		inv.ulcap_cache[*name] = val
		req := ipc.Mk_chmsg( )
		req.Send_req( nw_ch, nil, REQ_SETULCAP, pdata, nil ) // push into the network environment

	case val == -1: // delete
		delete( inv.ulcap_cache, *name )
		req := ipc.Mk_chmsg( )
		req.Send_req( nw_ch, nil, REQ_SETULCAP, pdata, nil ) // push into the network environment

	default:
		// fix: message previously claimed the range was (1-100) though 0 is accepted and -1 deletes
		rm_sheep.Baa( 1, "user link capacity not set %d is out of range (0-100, or -1 to delete)", val )
	}
}
// --- Public ---------------------------------------------------------------------------
/*
	Constructor; returns an inventory with empty caches.
*/
func Mk_inventory( ) (inv *Inventory) {
	return &Inventory{
		cache:       make( map[string]*gizmos.Pledge, 4096 ), // sizes are hints, not limits
		retry:       make( map[string]*gizmos.Pledge, 2048 ),
		ulcap_cache: make( map[string]int, 64 ),
	}
}
/*
	Stuff the pledge into the cache, erroring if a pledge with the same id
	already exists. Accepts either a gizmos.Pledge or a *gizmos.Pledge.
*/
func (inv *Inventory) Add_res( pi interface{} ) (err error) {
	var p *gizmos.Pledge

	switch thing := pi.( type ) {
	case gizmos.Pledge:
		p = &thing
	case *gizmos.Pledge:
		p = thing
	default:
		err = fmt.Errorf( "internal mishap in Add_res: expected Pledge or *Pledge, got neither" )
		rm_sheep.Baa( 1, "%s", err )
		return
	}

	id := (*p).Get_id()
	if inv.cache[*id] != nil {
		rm_sheep.Baa( 2, "reservation not added to inventory, already exists: %s", *id )
		err = fmt.Errorf( "reservation already exists: %s", *id )
		return
	}

	inv.cache[*id] = p
	rm_sheep.Baa( 1, "resgmgr: added reservation: %s", (*p).To_chkpt() )
	return
}
/*
	Return the reservation that matches the name passed in, provided that the
	cookie supplied matches the cookie on the reservation. The cookie may be
	either the cookie the user supplied when the reservation was created, or
	the 'super cookie' (admin 'root' as it were) which allows access to all
	reservations. The return is nil,nil if not found; nil,state indicates an
	error (not found or not authorised).
*/
func (inv *Inventory) Get_res( name *string, cookie *string ) (p *gizmos.Pledge, state error) {
	p = inv.cache[*name]
	if p == nil {
		return nil, fmt.Errorf( "cannot find reservation: %s", *name )
	}

	if !(*p).Is_valid_cookie( cookie ) && *cookie != *super_cookie {
		rm_sheep.Baa( 2, "resgmgr: denied fetch of reservation: cookie supplied (%s) didn't match that on pledge %s", *cookie, *name )
		return nil, fmt.Errorf( "not authorised to access or delete reservation: %s", *name )
	}

	rm_sheep.Baa( 2, "resgmgr:: fetched reservation: %s", (*p).To_str() )
	return p, nil
}
/*
	Search the retry cache for the named reservation and return it when found,
	provided the given cookie matches, or the super cookie was given.
*/
func (inv *Inventory) Get_retry_res( name *string, cookie *string ) (p *gizmos.Pledge, state error) {
	p = inv.retry[*name]
	if p == nil {
		return nil, fmt.Errorf( "cannot find reservation in retry cache: %s", *name )
	}

	if !(*p).Is_valid_cookie( cookie ) && *cookie != *super_cookie {
		rm_sheep.Baa( 2, "resgmgr: denied fetch of reservation: cookie supplied (%s) didn't match that on pledge %s", *cookie, *name )
		return nil, fmt.Errorf( "not authorised to access or delete reservation: %s", *name )
	}

	rm_sheep.Baa( 2, "resgmgr:: fetched reservation from retry cache: %s", (*p).To_str() )
	return p, nil
}
/*
	Check the two pledges (old, new) to see whether the related physical hosts have
	moved. The current physical location for the hosts is fetched from the network
	manager based on the new pledge's endpoints, and compared against the anchors
	captured on the old pledge.

	NOTE(review): despite the name, this function returns the raw result of
	Same_anchors() -- i.e. it appears to return true when the anchors are the SAME,
	not when they have changed. The caller (dup_in_cache) refreshes the old pledge
	when this returns false ("not on the same places"), which is consistent with
	that reading. Confirm the semantics of Same_anchors() before renaming or
	inverting this function.
*/
func phosts_changed( old *gizmos.Pledge, new *gizmos.Pledge ) ( bool ) {
	var (
		p2 *string = nil
	)

	if old == nil || new == nil {
		return false
	}

	a1, a2 := (*new).Get_hosts( ) // get hosts from the new pledge
	ch := make( chan *ipc.Chmsg ) // do not close -- senders close channels
	req := ipc.Mk_chmsg( )
	req.Send_req( nw_ch, ch, REQ_GETPHOST, a1, nil ) // xlate hostnames to physical host location
	req = <- ch // wait for response from network
	p1 := req.Response_data.( *string ) // assumes a1 always maps to a known phost; a nil response here would panic -- TODO confirm

	if a2 != nil {
		if len( *a2) > 1 && (*a2)[0:1] != "!" { // names starting with "!" aren't known to the network; don't try to map them
			req.Send_req( nw_ch, ch, REQ_GETPHOST, a2, nil )
			req = <- ch
			if req.Response_data != nil { // for an external address this will be unknown
				p2 = req.Response_data.( *string )
			}
		}
	}

	return (*old).Same_anchors( p1, p2 )
}
/*
	Search the given cache for a duplicate of the target pledge and return the
	existing reservation id when one is found. For bandwidth oriented pledges
	(bw, bwow, passthru) the physical host placement is also checked: when the
	placement no longer matches, the existing pledge is forced to expire (15s),
	its push flag is reset, and nil is returned so the new reservation is
	allowed in as a replacement.
*/
func dup_in_cache( cache map[string]*gizmos.Pledge, target *gizmos.Pledge ) ( *string ) {
	var isbw bool

	switch (*target).( type ) { // bandwidth-like pledges get the anchor check
	case *gizmos.Pledge_bw, *gizmos.Pledge_bwow, *gizmos.Pledge_pass:
		isbw = true
	}

	for _, r := range cache {
		if (*r).Is_expired() || !(*target).Equals( r ) {
			continue // not a live duplicate
		}

		if isbw && !phosts_changed( r, target ) { // anchors no longer on the same places; refresh
			(*r).Reset_pushed( ) // we'll force this out
			(*r).Set_expiry( time.Now().Unix() + 15 ) // force expiry of old
			rm_sheep.Baa( 1, "duplicate with different anchors will be refreshed: %s", *r )
			return nil
		}

		rm_sheep.Baa( 2, "duplicate detected: %s", *r )
		return (*r).Get_id( )
	}

	return nil
}
/*
	Accept a reservation (pledge) and see if it matches any existing reservation in
	either the active cache or the retry cache. If it does, return the existing
	reservation id as data; state is set only when we encounter problems (nil
	inventory). A nil,nil return means no duplicate was found (or the duplicate was
	forced to expire by dup_in_cache so the new reservation may be accepted).

	Fix: removed the large stale commented-out implementation that previously
	followed the live code; it referenced undefined identifiers (bw1/bw2/bwr1/bwr2)
	and was fully superseded by dup_in_cache().
*/
func (inv *Inventory) dup_check( p *gizmos.Pledge ) ( rid *string, state error ) {
	if inv == nil {
		return nil, fmt.Errorf( "inventory is nil" )
	}

	if rid = dup_in_cache( inv.cache, p ); rid != nil { // check active reservations first
		return rid, nil
	}

	if rid = dup_in_cache( inv.retry, p ); rid != nil { // then the unvetted (retry) ones
		return rid, nil
	}

	return nil, nil
}
/*
	Build a space separated list of the ids of all unexpired mirror
	reservations in the cache.
*/
func (inv *Inventory) Get_mirrorlist() ( string ) {
	ids := make( []string, 0, len( inv.cache ) )

	for _, gp := range inv.cache {
		if p, ok := (*gp).(*gizmos.Pledge_mirror); ok && !p.Is_expired() {
			ids = append( ids, *p.Get_id() )
		}
	}

	return strings.Join( ids, " " )
}
/*
Looks for the named reservation and deletes it if found. The cookie must be either the
supper cookie, or the cookie that the user supplied when the reservation was created.
Deletion is affected by resetting the expiry time on the pledge to now + a few seconds.
This will cause a new set of flow-mods to be sent out with an expiry time that will
take them out post haste and without the need to send "delete" flow-mods out.
This function sends a request to the network manager to delete the related queues. This
must be done here so as to prevent any issues with the loosely coupled management of
reservation and queue settings. It is VERY IMPORTANT to delete the reservation from
the network perspective BEFORE the expiry time is reset. If it is reset first then
the network splits timeslices based on the new expiry and queues end up dangling.
*/
// Del_res looks up the named reservation, validating the caller's cookie, and marks it
// for deletion.  For bandwidth pledges the network manager is told to drop the related
// queues BEFORE the expiry is reset (ordering is critical -- see block comment above).
// Reservations found only in the retry cache are simply removed since no flow-mods or
// queues exist for them yet.  Returns nil on success, or the lookup/network error.
func (inv *Inventory) Del_res( name *string, cookie *string ) (state error) {

	gp, state := inv.Get_res( name, cookie )				// lookup fails (nil) if cookie doesn't match
	if gp != nil {
		rm_sheep.Baa( 2, "resgmgr: deleted reservation: %s", (*gp).To_str() )
		state = nil

		switch p := (*gp).(type) {
			case *gizmos.Pledge_mirror:
				p.Set_expiry( time.Now().Unix() )		// expire the mirror NOW
				p.Set_pushed()					// need this to force undo to occur

			case *gizmos.Pledge_bw, *gizmos.Pledge_bwow:		// network handles either type
				ch := make( chan *ipc.Chmsg )			// do not close -- senders close channels
				req := ipc.Mk_chmsg( )
				req.Send_req( nw_ch, ch, REQ_DEL, p, nil )	// delete from the network point of view
				req = <- ch					// wait for response from network
				state = req.State
				p.Set_expiry( time.Now().Unix() + 15 )		// set the expiry to 15s from now which will force it out
				(*gp).Reset_pushed()				// force push of flow-mods that reset the expiry

			case *gizmos.Pledge_pass:
				p.Set_expiry( time.Now().Unix() + 15 )		// set the expiry to 15s from now which will force it out
				(*gp).Reset_pushed()				// force push of flow-mods that reset the expiry
		}
	} else {
		if state == nil {						// not in main cache and no cookie error; try the retry cache
			gp, state = inv.Get_retry_res( name, cookie )		// see if it's in the retry cache and cookie was valid for it
			if gp != nil {
				// FIXME????
				// do we need to mark and continue to retry this and after it passes vetting then let it delete by pushing out
				// short term flow-mods? this would cover the case where the flow-mods were pushed, but when tegu restarted ostack
				// didn't have enough info to vet the pledge, and thus the existing flow-mods do need to be reset on the physical
				// host.
				delete( inv.retry, *name )			// for pledges on the retry cache, they can just be deleted since no flow-mods exist etc
			}
		} else {
			rm_sheep.Baa( 2, "resgmgr: unable to delete reservation: not found: %s", *name )
		}
	}

	return
}
/*
delete all of the reservations provided that the cookie is the super cookie. If cookie
is a user cookie, then deletes all reservations that match the cookie.
*/
// Del_all_res deletes every unexpired reservation that the cookie is valid for
// (all of them when cookie is the super cookie).  The id list is snapshotted first
// so that Del_res may safely remove entries from the map.  Returns the count deleted.
func (inv *Inventory) Del_all_res( cookie *string ) ( ndel int ) {
	var (
		plist []*string		// we'll create a list to avoid deletion issues with range
		i int
	)

	ndel = 0

	plist = make( []*string, len( inv.cache ) )			// build a list so we can safely remove from the map
	for _, pledge := range inv.cache {
		if ! (*pledge).Is_expired( ) {
			plist[i] = (*pledge).Get_id()
			i++
		}
	}

	plist = plist[:i]						// slice down to what was actually filled in
	for _, pname := range plist {
		rm_sheep.Baa( 2, "delete all attempt to delete: %s", *pname )
		err := inv.Del_res( pname, cookie )			// nil error means the cookie was valid and it was deleted
		if err == nil {
			ndel++
			rm_sheep.Baa( 1, "delete all deleted reservation %s", *pname )
		} else {
			rm_sheep.Baa( 1, "delete all skipped reservation %s", *pname )
		}
	}

	rm_sheep.Baa( 1, "delete all deleted %d reservations", ndel )	// was "%d reservations %s" with no matching arg; stray %s removed
	return
}
/*
Pulls the reservation from the inventory. Similar to delete, but not quite the same.
This will clone the pledge. The clone is expired and left in the inventory to force
a reset of flowmods. The network manager is sent a request to delete the queues
associated with the path and the path is removed from the original pledge. The original
pledge is returned so that it can be used to generate a new set of paths based on the
hosts, expiry and bandwidth requirements of the initial reservation.
Unlike the get/del functions, this is meant for internal support and does not
require a cookie.
It is important to delete the reservation from the network manager point of view
BEFORE the expiry is reset. If expiry is set first then the network manager will
cause queue timeslices to be split on that boundary leaving dangling queues.
*/
// yank_res pulls the named reservation from the inventory (bandwidth pledges only).
// A clone (named <name>.yank) is left in the cache and expired so that reset flow-mods
// go out; the original is removed and returned so the caller can rebuild paths from it.
// The network delete MUST happen before the clone's expiry is set (see block comment
// above) to avoid dangling queue timeslices.  Returns an error if the name is unknown.
func (inv *Inventory) yank_res( name *string ) ( p *gizmos.Pledge, state error) {

	state = nil
	p = inv.cache[*name]
	if p != nil {
		switch pldg := (*p).(type) {
			case *gizmos.Pledge_bw:					// only bandwidth pledges can be yanked
				rm_sheep.Baa( 2, "resgmgr: yanked reservation: %s", (*p).To_str() )
				cp := pldg.Clone( *name + ".yank" )		// clone but DO NOT set conclude time until after network delete!

				icp := gizmos.Pledge(cp)			// must convert to a pledge interface
				inv.cache[*name + ".yank"] = &icp		// and then insert the address of the interface

				inv.cache[*name] = nil				// yank original from the list
				delete( inv.cache, *name )
				pldg.Set_path_list( nil )			// no path list for this pledge

				ch := make( chan *ipc.Chmsg )			// do not close -- senders close channels
				req := ipc.Mk_chmsg( )
				req.Send_req( nw_ch, ch, REQ_DEL, cp, nil )	// delete from the network point of view
				req = <- ch					// wait for response from network
				state = req.State

				// now safe to set these
				cp.Set_expiry( time.Now().Unix() + 1 )		// force clone to be expired
				cp.Reset_pushed( )				// force it to go out again

			// not supported for other pledge types
		}
	} else {
		state = fmt.Errorf( "no reservation with name: %s", *name )
		rm_sheep.Baa( 2, "resgmgr: unable to yank, no reservation with name: %s", *name )
	}

	return
}
/*
Wait and respond to RMLU_ requests received on the channel.
This interface is provided because agent manager wants to look up reservations
rather than queuing the data to be attached to a reservation onto the main
repmgr queue.
*/
/*
	rm_lookup runs as a goroutine servicing RMLU_ (lookup) requests from the channel.
	It exists so that the agent manager can resolve reservations without queuing onto
	the busier main res-mgr channel.  A response is sent back only when the requester
	supplied a response channel.
*/
func rm_lookup( my_chan chan *ipc.Chmsg, inv *Inventory ) {
	for {
		req := <- my_chan						// block until the next request arrives

		switch req.Msg_type {
			case RMLU_GET_MIRRORS:					// user initiated get list of mirrors
				mlist := inv.Get_mirrorlist()
				req.Response_data = &mlist

			case RMLU_GET:						// user initiated get -- requires cookie
				parms := req.Req_data.( []*string )		// expect pointers to name and cookie
				req.Response_data, req.State = inv.Get_res( parms[0], parms[1] )

			default:
				rm_sheep.Baa( 1, "invalid request received by rm_lookup: %d", req.Msg_type )
		}

		if req.Response_ch != nil {					// requester wants the result back
			req.Response_ch <- req
		}
	}
}
//---- res-mgr main goroutine -------------------------------------------------------------------------------
/*
Executes as a goroutine to drive the reservation manager portion of tegu.
*/
func Res_manager( my_chan chan *ipc.Chmsg, cookie *string ) {
var (