-
Notifications
You must be signed in to change notification settings - Fork 56
/
evtsubscription.go
1295 lines (1203 loc) · 51.1 KB
/
evtsubscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//(C) Copyright [2020] Hewlett Packard Enterprise Development LP
//
//Licensed under the Apache License, Version 2.0 (the "License"); you may
//not use this file except in compliance with the License. You may obtain
//a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
//WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
//License for the specific language governing permissions and limitations
// under the License.
// Package events provides the functionality to
// - Create Event Subscription
// - Delete Event Subscription
// - Get Event Subscription
// - Post Event Subscription to destination
// - Post TestEvent (SubmitTestEvent)
// along with the corresponding unit test cases
package events
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"sync"
"github.com/ODIM-Project/ODIM/lib-dmtf/model"
"github.com/ODIM-Project/ODIM/lib-utilities/common"
"github.com/ODIM-Project/ODIM/lib-utilities/config"
"github.com/ODIM-Project/ODIM/lib-utilities/errors"
l "github.com/ODIM-Project/ODIM/lib-utilities/logs"
eventsproto "github.com/ODIM-Project/ODIM/lib-utilities/proto/events"
errResponse "github.com/ODIM-Project/ODIM/lib-utilities/response"
"github.com/ODIM-Project/ODIM/lib-utilities/services"
"github.com/ODIM-Project/ODIM/svc-events/evcommon"
"github.com/ODIM-Project/ODIM/svc-events/evmodel"
"github.com/ODIM-Project/ODIM/svc-events/evresponse"
"github.com/google/uuid"
)
// FillInSubTaskID is the empty sub task ID. It is passed to functions that
// expect a sub task ID parameter when no sub task has been created for the
// operation being performed.
var FillInSubTaskID = ""
// ValidateRequest validates the input request for creating an event subscription.
//
// The checks are performed in this order:
//   - req.PostBody is checked against postRequest by common.RequestParamsCaseValidator,
//   - the mandatory fields are checked by validateFields,
//   - postRequest.Destination is checked by common.URIValidator,
//   - the destination must not already have a subscription.
//
// It returns the HTTP status code, the status message, the message arguments
// and an error describing the first failed check. On success it returns
// http.StatusOK, common.OK, an empty argument list and a nil error.
func (e *ExternalInterfaces) ValidateRequest(ctx context.Context, req *eventsproto.EventSubRequest,
	postRequest model.EventDestination) (int32, string, []interface{}, error) {
	invalidProperties, err := common.RequestParamsCaseValidator(req.PostBody, postRequest)
	if err != nil {
		errMsg := "error while validating request parameters: " + err.Error()
		// The message is passed as an argument and never as the format string,
		// so a '%' inside it cannot be interpreted as a formatting verb.
		return http.StatusInternalServerError, errResponse.InternalError, nil, fmt.Errorf("%s", errMsg)
	}
	if invalidProperties != "" {
		errorMessage := "error: one or more properties given in the request body are not valid, ensure properties are listed in upper camel case "
		return http.StatusBadRequest, errResponse.PropertyUnknown, []interface{}{invalidProperties}, fmt.Errorf("%s", errorMessage)
	}

	// check mandatory fields
	statusCode, statusMessage, messageArgs, invalidFieldError := validateFields(&postRequest)
	if invalidFieldError != nil {
		return statusCode, statusMessage, messageArgs, invalidFieldError
	}

	// validate destination URI in the request
	if !common.URIValidator(postRequest.Destination) {
		// Destination comes from the request body and may legitimately contain
		// '%' (URL escapes), hence the explicit "%s" format below.
		errorMessage := "error: request body contains invalid value for Destination field, " + postRequest.Destination
		return http.StatusBadRequest, errResponse.PropertyValueFormatError, []interface{}{postRequest.Destination, "Destination"}, fmt.Errorf("%s", errorMessage)
	}

	// check any of the subscription present for the destination from the request
	// if errored out or no subscriptions then add subscriptions else return an error
	// (the lookup error is deliberately ignored for that reason)
	subscriptionDetails, _ := e.GetEvtSubscriptions(postRequest.Destination)
	if len(subscriptionDetails) > 0 {
		return http.StatusConflict, errResponse.ResourceInUse, []interface{}{postRequest.Destination, "Destination"}, fmt.Errorf("subscription already present for the requested destination")
	}
	return http.StatusOK, common.OK, []interface{}{}, nil
}
// CreateEventSubscription is the API to create an event subscription.
//
// It unmarshals and validates req.PostBody, creates one sub task per origin
// resource (and one per member of an origin collection), runs
// createEventSubscription for each of them, stores the resulting subscription
// through SaveSubscription and keeps the task identified by taskID up to date
// through UpdateTask. The returned RPC response is the same response that is
// recorded on the task.
//
// When the request carries no OriginResources, the Systems, Chassis, Fabrics,
// Managers and Tasks collections are subscribed to.
func (e *ExternalInterfaces) CreateEventSubscription(ctx context.Context, taskID string, sessionUserName string, req *eventsproto.EventSubRequest) errResponse.RPC {
	var (
		err             error
		resp            errResponse.RPC
		postRequest     model.EventDestination
		percentComplete int32 = 100
		targetURI             = "/redfish/v1/EventService/Subscriptions"
	)
	if err = json.Unmarshal(req.PostBody, &postRequest); err != nil {
		l.LogWithFields(ctx).Error(err.Error())
		evcommon.GenErrorResponse(err.Error(), errResponse.MalformedJSON, http.StatusBadRequest, []interface{}{}, &resp)
		e.UpdateTask(ctx, fillTaskData(taskID, targetURI, string(req.PostBody), resp, common.Exception, common.Critical, percentComplete, http.MethodPost))
		return resp
	}

	// Validate the input request for create subscription
	statusCode, statusMessage, messageArgs, validationErr := e.ValidateRequest(ctx, req, postRequest)
	if validationErr != nil {
		evcommon.GenErrorResponse(validationErr.Error(), statusMessage, statusCode,
			messageArgs, &resp)
		l.LogWithFields(ctx).Error(validationErr.Error())
		e.UpdateTask(ctx, fillTaskData(taskID, targetURI, string(req.PostBody),
			resp, common.Exception, common.Critical, percentComplete, http.MethodPost))
		return resp
	}

	// Get the target device details from the origin resources
	// Loop through all origin list and form individual event subscription request,
	// Which will then forward to plugin to make subscription with target device
	var wg, taskCollectionWG sync.WaitGroup
	var result = &evresponse.MutexLock{
		Response: make(map[string]evresponse.EventResponse),
		Hosts:    make(map[string]string),
		Lock:     &sync.Mutex{},
	}

	// remove odataid in the origin resources
	originResources := removeOdataIDfromOriginResources(postRequest.OriginResources)

	// check and remove if duplicate OriginResources exist in the request
	removeDuplicatesFromSlice(&originResources)

	// If origin resource is nil then subscribe to all collection
	if len(originResources) == 0 {
		originResources = []string{
			"/redfish/v1/Systems",
			"/redfish/v1/Chassis",
			"/redfish/v1/Fabrics",
			"/redfish/v1/Managers",
			"/redfish/v1/TaskService/Tasks",
		}
	}

	var collectionList = make([]string, 0)
	subTaskChan := make(chan int32, len(originResources))
	taskCollectionWG.Add(1)
	bubbleUpStatusCode := int32(http.StatusCreated)
	go func() {
		defer taskCollectionWG.Done()
		// Collect the sub task status codes from the channel and update
		// percentComplete in the task until the channel is closed.
		for i := 1; ; i++ {
			statusCode, chanActive := <-subTaskChan
			if !chanActive {
				return
			}
			if statusCode > bubbleUpStatusCode {
				bubbleUpStatusCode = statusCode
			}
			if i <= len(originResources) && statusCode != http.StatusAccepted {
				// Multiply before dividing: with integer arithmetic
				// (i / len) * 100 is 0 for every i < len, which made the
				// reported progress jump straight from 0 to 100.
				percentComplete = int32(i * 100 / len(originResources))
				if resp.StatusCode == 0 {
					resp.StatusCode = http.StatusAccepted
				}
				e.UpdateTask(ctx, fillTaskData(taskID, targetURI, string(req.PostBody), resp, common.Running, common.OK, percentComplete, http.MethodPost))
			}
		}
	}()

	var isServerAdded = false
	for _, origin := range originResources {
		_, err := getUUID(origin)
		if err != nil {
			collection, collectionName, collectionFlag, aggregateResource, isAggregate, _ := e.checkCollection(origin)
			wg.Add(1)
			// for origin is collection
			subTaskID := e.CreateSubTask(ctx, sessionUserName, taskID)
			if len(collection) > 0 {
				// This means we are handling a collection. For collections we are creating event subscriptions synchronously
				// as the task status for collections is updated in event services but the task status for resources under collection is updated
				// in task service through task events. In order to update the parent task all child tasks should be completed.
				// To ensure this, we complete the event subscription for collection first and after completing it, we create event subscriptions
				// for resources under collection asynchronously
				e.createEventSubscription(ctx, taskID, subTaskID, subTaskChan, sessionUserName, targetURI, postRequest, origin,
					result, &wg, collectionFlag, collectionName, aggregateResource, isAggregate)
			} else {
				// creating event subscriptions for resources under a collection
				go e.createEventSubscription(ctx, taskID, subTaskID, subTaskChan, sessionUserName, targetURI, postRequest, origin,
					result, &wg, collectionFlag, collectionName, aggregateResource, isAggregate)
			}
			for i := 0; i < len(collection); i++ {
				isServerAdded = true
				wg.Add(1)
				// for subordinate origin
				subTaskID = e.CreateSubTask(ctx, sessionUserName, taskID)
				go e.createEventSubscription(ctx, taskID, subTaskID, subTaskChan, sessionUserName, targetURI, postRequest, collection[i],
					result, &wg, false, "", aggregateResource, isAggregate)
			}
			if !isAggregate {
				collectionList = append(collectionList, collection...)
			}
		} else {
			isServerAdded = true
			wg.Add(1)
			subTaskID := e.CreateSubTask(ctx, sessionUserName, taskID)
			go e.createEventSubscription(ctx, taskID, subTaskID, subTaskChan, sessionUserName, targetURI,
				postRequest, origin, result, &wg, false, "", "", false)
		}
	}

	wg.Wait()
	// close channel once all sub-routines created have exited
	close(subTaskChan)
	// wait till all the subtasks are collected and routine is exited
	taskCollectionWG.Wait()

	var (
		locationHeader             string
		successfulSubscriptionList = make([]model.Link, 0)
	)
	result.Lock.Lock()
	originResourceProcessedCount := len(result.Response)
	successfulSubscriptionList, result.Response = getSuccessfulResponse(result.Response)
	result.Lock.Unlock()

	// remove the underlying resource uri's from successfulSubscriptionList
	for i := 0; i < len(collectionList); i++ {
		for j := 0; j < len(successfulSubscriptionList); j++ {
			if collectionList[i] == successfulSubscriptionList[j].Oid {
				originResourceProcessedCount--
				successfulSubscriptionList = append(successfulSubscriptionList[:j], successfulSubscriptionList[j+1:]...)
				break
			}
		}
	}

	if len(successfulSubscriptionList) > 0 {
		subscriptionID := uuid.New().String()
		var hosts []string
		resp, hosts = result.ReadResponse(subscriptionID)
		// A request without OriginResources is stored with empty origin
		// resource and host lists.
		if len(postRequest.OriginResources) == 0 {
			successfulSubscriptionList = []model.Link{}
			hosts = []string{}
		}
		statusCode, statusMessage, messageArgs, err = e.SaveSubscription(ctx, sessionUserName, subscriptionID,
			hosts, successfulSubscriptionList, postRequest)
		if err != nil {
			l.LogWithFields(ctx).Error(err.Error())
			evcommon.GenErrorResponse(err.Error(), statusMessage, statusCode,
				messageArgs, &resp)
			e.UpdateTask(ctx, fillTaskData(taskID, targetURI, string(req.PostBody),
				resp, common.Exception, common.Critical, percentComplete, http.MethodPost))
			return resp
		}
		locationHeader = resp.Header["Location"]
	}

	// if plugin returns the response code status accepted, then the task and child tasks will be updated by task service
	if bubbleUpStatusCode == http.StatusAccepted {
		if !isServerAdded {
			e.UpdateTask(ctx, fillTaskData(taskID, targetURI, string(req.PostBody), resp, common.Completed, common.OK, percentComplete, http.MethodPost))
			return resp
		}
		task := fillTaskData(taskID, targetURI, string(req.PostBody), resp, common.Running, common.OK, percentComplete-1, http.MethodPost)
		task.FinalResponse = resp
		e.UpdateTask(ctx, task)
		return resp
	}

	l.LogWithFields(ctx).Debug("Process Count,", originResourceProcessedCount,
		" successOriginResourceCount ", len(successfulSubscriptionList))
	percentComplete = 100
	if originResourceProcessedCount == len(successfulSubscriptionList) {
		resp.StatusCode = http.StatusCreated
		e.UpdateTask(ctx, fillTaskData(taskID, targetURI, string(req.PostBody), resp, common.Completed, common.OK, percentComplete, http.MethodPost))
	} else {
		args := errResponse.Args{
			Code:    errResponse.GeneralError,
			Message: "event subscription for one or more origin resource(s) failed, check sub tasks for more info.",
		}
		resp.Body = args.CreateGenericErrorResponse()
		resp.StatusCode = bubbleUpStatusCode
		if locationHeader != "" {
			resp.Header["Location"] = locationHeader
		}
		e.UpdateTask(ctx, fillTaskData(taskID, targetURI, string(req.PostBody), resp, common.Exception, common.Critical, percentComplete, http.MethodPost))
	}
	return resp
}
// SaveSubscription stores the event subscription in the database.
//
// The stored record is built from sessionUserName, subscriptionID, hosts and
// the fields of postRequest, except that its OriginResources are taken from
// successfulSubscriptionList. It returns the HTTP status code, the status
// message, the message arguments and the error reported by
// SaveEventSubscription (nil on success).
func (e *ExternalInterfaces) SaveSubscription(ctx context.Context, sessionUserName, subscriptionID string,
	hosts []string, successfulSubscriptionList []model.Link, postRequest model.EventDestination) (int32, string, []interface{}, error) {
	// Copy only the persisted properties of the request.
	destination := model.EventDestination{
		Destination:          postRequest.Destination,
		Name:                 postRequest.Name,
		Context:              postRequest.Context,
		EventTypes:           postRequest.EventTypes,
		MessageIds:           postRequest.MessageIds,
		ResourceTypes:        postRequest.ResourceTypes,
		EventFormatType:      postRequest.EventFormatType,
		SubordinateResources: postRequest.SubordinateResources,
		Protocol:             postRequest.Protocol,
		SubscriptionType:     postRequest.SubscriptionType,
		OriginResources:      successfulSubscriptionList,
		DeliveryRetryPolicy:  postRequest.DeliveryRetryPolicy,
	}
	record := evmodel.SubscriptionResource{
		UserName:         sessionUserName,
		SubscriptionID:   subscriptionID,
		EventDestination: &destination,
		Hosts:            hosts,
	}

	saveErr := e.SaveEventSubscription(record)
	if saveErr == nil {
		return http.StatusOK, common.OK, []interface{}{}, nil
	}
	return http.StatusInternalServerError, errResponse.InternalError, []interface{}{}, saveErr
}
// eventSubscription creates the event subscription for a single origin.
//
// For a non-collection origin it resolves the target device and its plugin
// (origins containing "Fabrics" are delegated to createFabricSubscription),
// prepares the plugin credentials, calls IsEventsSubscribed and finally
// hands over to SaveSubscriptionOnDevice. For a collection origin no plugin is
// contacted: after IsEventsSubscribed the collection is recorded through
// saveDeviceSubscriptionDetails, except for "AggregateCollections" which is
// not recorded.
//
// It returns the host (or collection name) the subscription belongs to and
// the event response; the host is empty on failure.
func (e *ExternalInterfaces) eventSubscription(ctx context.Context, postRequest model.EventDestination, origin,
	collectionName string, collectionFlag bool, subTaskID string) (string, evresponse.EventResponse) {
	var (
		resp           evresponse.EventResponse
		err            error
		plugin         *common.Plugin
		target         *common.Target
		contactRequest evcommon.PluginContactRequest
	)

	if !collectionFlag {
		if strings.Contains(origin, "Fabrics") {
			return e.createFabricSubscription(ctx, postRequest, origin, collectionName, collectionFlag)
		}
		if target, resp, err = e.getTargetDetails(origin); err != nil {
			return "", resp
		}

		var pluginErr *errors.Error
		plugin, pluginErr = e.GetPluginData(target.PluginID)
		if pluginErr != nil {
			errorMessage := "error while getting plugin data: " + pluginErr.Error()
			evcommon.GenEventErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
				&resp, []interface{}{})
			l.LogWithFields(ctx).Error(errorMessage)
			return "", resp
		}
		contactRequest.Plugin = plugin

		// Token based plugins need a session, all others get basic credentials.
		if !strings.EqualFold(plugin.PreferredAuthType, "XAuthToken") {
			contactRequest.LoginCredential = map[string]string{
				"UserName": plugin.Username,
				"Password": string(plugin.Password),
			}
		} else {
			pluginToken := e.getPluginToken(ctx, plugin)
			if pluginToken == "" {
				evcommon.GenEventErrorResponse("error: Unable to create session with plugin "+plugin.ID, errResponse.NoValidSession, http.StatusUnauthorized,
					&resp, []interface{}{})
				return "", resp
			}
			contactRequest.Token = pluginToken
		}
	}

	// Request forwarded to the device; IsEventsSubscribed may update its
	// event types, message IDs and resource types.
	subscriptionPost := model.EventDestination{
		Name:                 postRequest.Name,
		Destination:          postRequest.Destination,
		EventTypes:           postRequest.EventTypes,
		MessageIds:           postRequest.MessageIds,
		ResourceTypes:        postRequest.ResourceTypes,
		Protocol:             postRequest.Protocol,
		SubscriptionType:     postRequest.SubscriptionType,
		EventFormatType:      postRequest.EventFormatType,
		SubordinateResources: postRequest.SubordinateResources,
		Context:              postRequest.Context,
		DeliveryRetryPolicy:  postRequest.DeliveryRetryPolicy,
	}

	checkResp, err := e.IsEventsSubscribed(ctx, "", origin, &subscriptionPost, plugin, target, collectionFlag, collectionName, false, "", false)
	if err != nil {
		resp.Response = checkResp.Body
		resp.StatusCode = int(checkResp.StatusCode)
		return "", resp
	}

	if !collectionFlag {
		return e.SaveSubscriptionOnDevice(ctx, origin, target, plugin, contactRequest, subscriptionPost, subTaskID)
	}

	l.LogWithFields(ctx).Info("Saving device subscription details of collection subscription")
	if collectionName != "AggregateCollections" {
		collectionSubscription := common.DeviceSubscription{
			Location:       "",
			EventHostIP:    collectionName,
			OriginResource: origin,
		}
		if err = e.saveDeviceSubscriptionDetails(collectionSubscription); err != nil {
			errorMessage := "error while trying to save event subscription of device data: " + err.Error()
			evcommon.GenEventErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
				&resp, []interface{}{})
			l.LogWithFields(ctx).Error(errorMessage)
			return "", resp
		}
	}
	resp.StatusCode = http.StatusCreated
	resp.Response = createEventSubscriptionResponse()
	return collectionName, resp
}
// SaveSubscriptionOnDevice creates the subscription on the device through its
// plugin and records the device subscription details.
//
// subscriptionPost is marshalled to JSON, every key of
// config.Data.URLTranslation.SouthBoundURL found in it is replaced by the
// corresponding value, and the result is posted to the plugin at
// "/ODIM/v1/Subscriptions". If the call fails it is retried once when
// evcommon.GetPluginStatus reports true for the plugin.
//
// A 202 response with a non-empty subTaskID stores the plugin task info so the
// sub task can be tracked. A response above 202 is returned to the caller as
// the failure. Otherwise the device subscription is saved via
// saveDeviceSubscriptionDetails.
//
// It returns the device IP address and the event response; the address is
// empty on failure.
func (e *ExternalInterfaces) SaveSubscriptionOnDevice(ctx context.Context, origin string, target *common.Target,
	plugin *common.Plugin, contactRequest evcommon.PluginContactRequest,
	subscriptionPost model.EventDestination, subTaskID string) (string, evresponse.EventResponse) {
	var resp evresponse.EventResponse
	var pluginTaskInfo evmodel.PluginTaskInfo

	postBody, err := json.Marshal(subscriptionPost)
	if err != nil {
		errorMessage := "error while marshaling: " + err.Error()
		evcommon.GenEventErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
			&resp, []interface{}{})
		l.LogWithFields(ctx).Error(errorMessage)
		return "", resp
	}

	// replacing the request url with south bound translation URL.
	// The replacements are applied cumulatively on reqData: starting over from
	// postBody on every iteration would keep only one translation, and would
	// leave the request body empty when no translation is configured.
	reqData := string(postBody)
	for key, value := range config.Data.URLTranslation.SouthBoundURL {
		reqData = strings.Replace(reqData, key, value, -1)
	}

	target.PostBody = []byte(reqData)
	contactRequest.URL = "/ODIM/v1/Subscriptions"
	contactRequest.HTTPMethodType = http.MethodPost
	contactRequest.PostBody = target

	l.LogWithFields(ctx).Debug("Subscription Request: " + reqData)
	response, err := e.callPlugin(context.TODO(), contactRequest)
	if err != nil {
		// retry once if the plugin is reported as reachable
		if evcommon.GetPluginStatus(ctx, plugin) {
			response, err = e.callPlugin(context.TODO(), contactRequest)
		}
		if err != nil {
			errorMessage := "error while contact plugin : " + err.Error()
			evcommon.GenEventErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
				&resp, []interface{}{})
			l.LogWithFields(ctx).Error(errorMessage)
			return "", resp
		}
	}
	defer response.Body.Close()

	l.LogWithFields(ctx).Debug("Subscription Response StatusCode: " + strconv.Itoa(int(response.StatusCode)))
	if response.StatusCode == http.StatusAccepted && subTaskID != FillInSubTaskID {
		pluginTaskInfo.Location = response.Header.Get("Location")
		pluginTaskInfo.PluginIP = response.Header.Get(common.XForwardedFor)
		services.SavePluginTaskInfo(ctx, pluginTaskInfo.PluginIP, plugin.IP,
			subTaskID, pluginTaskInfo.Location)
	} else if response.StatusCode > http.StatusAccepted {
		body, err := ioutil.ReadAll(response.Body)
		if err != nil {
			errorMessage := "error while trying to read response body: " + err.Error()
			evcommon.GenEventErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
				&resp, []interface{}{})
			l.LogWithFields(ctx).Error(errorMessage)
			return "", resp
		}
		l.LogWithFields(ctx).Info("Subscription Response: " + string(body))
		var res interface{}
		err = json.Unmarshal(body, &res)
		if err != nil {
			errorMessage := "error while unmarshal the body : " + err.Error()
			evcommon.GenEventErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
				&resp, []interface{}{})
			l.LogWithFields(ctx).Error(errorMessage)
			return "", resp
		}
		// the plugin's own error body and status are passed back unchanged
		errorMessage := "error while trying to create event subscription"
		resp.Response = res
		resp.StatusCode = response.StatusCode
		l.LogWithFields(ctx).Error(errorMessage)
		return "", resp
	}

	// if Subscription location is empty then don't store event details in DB
	locationHdr := response.Header.Get("location")
	if locationHdr == "" {
		errorMessage := "Subscription Location is missing in the response header"
		evcommon.GenEventErrorResponse(errorMessage, errors.InternalError, http.StatusInternalServerError,
			&resp, []interface{}{})
		l.LogWithFields(ctx).Error(errorMessage)
		return "", resp
	}
	// if status is 202 location will contain the task url
	if response.StatusCode == http.StatusAccepted {
		locationHdr = ""
	}

	// get the ip address from the host name
	deviceIPAddress, err := common.GetIPFromHostName(target.ManagerAddress)
	if err != nil {
		evcommon.GenEventErrorResponse(err.Error(), errResponse.ResourceNotFound, http.StatusNotFound,
			&resp, []interface{}{"ManagerAddress", target.ManagerAddress})
		l.LogWithFields(ctx).Error(err.Error())
		return "", resp
	}
	l.LogWithFields(ctx).Debug("Saving device subscription details : ", deviceIPAddress)
	evtSubscription := common.DeviceSubscription{
		Location:       locationHdr,
		EventHostIP:    deviceIPAddress,
		OriginResource: origin,
	}

	// ManagerAddress may or may not carry a port; fall back to the full
	// address when it cannot be split.
	host, _, err := net.SplitHostPort(target.ManagerAddress)
	if err != nil {
		host = target.ManagerAddress
	}
	// a location that does not mention the host is prefixed with the device address
	if !(strings.Contains(locationHdr, host)) {
		evtSubscription.Location = "https://" + target.ManagerAddress + locationHdr
	}

	err = e.saveDeviceSubscriptionDetails(evtSubscription)
	if err != nil {
		errorMessage := "error while trying to save event subscription of device data: " + err.Error()
		evcommon.GenEventErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
			&resp, []interface{}{})
		l.LogWithFields(ctx).Error(errorMessage)
		return "", resp
	}

	var outBody interface{}
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		errorMessage := "error while reading body : " + err.Error()
		evcommon.GenEventErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
			&resp, []interface{}{})
		l.LogWithFields(ctx).Error(errorMessage)
		return "", resp
	}
	err = json.Unmarshal(body, &outBody)
	if err != nil {
		errorMessage := "error while unmarshal the body : " + err.Error()
		evcommon.GenEventErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
			&resp, []interface{}{})
		l.LogWithFields(ctx).Error(errorMessage)
		return "", resp
	}
	resp.Response = outBody
	resp.StatusCode = response.StatusCode
	resp.Location = response.Header.Get("location")
	return deviceIPAddress, resp
}
// IsEventsSubscribed checks whether events are already subscribed for the
// host of the given origin.
//
// The existing subscriptions are looked up in the db by collection name (for
// a collection) or by the device IP resolved from target.ManagerAddress. If
// any of them lists the host, their event types, message IDs and resource
// types are combined with those of the new subscription, the current
// subscription is deleted from the device (non-collection origins only) and
// subscription is updated in place with the de-duplicated lists.
//
// subscription: New Subscription; its EventTypes, MessageIds and
// ResourceTypes are overwritten when an existing subscription is found.
// subscriptionDetails: subscription details stored in db for the particular device.
//
// It returns a populated RPC response together with a non-nil error on
// failure. When nothing is subscribed yet it returns a zero response and nil.
func (e *ExternalInterfaces) IsEventsSubscribed(ctx context.Context, token, origin string, subscription *model.EventDestination, plugin *common.Plugin, target *common.Target, collectionFlag bool, collectionName string, isAggregate bool, aggregateID string, isRemove bool) (errResponse.RPC, error) {
	var resp errResponse.RPC
	var err error
	var host, originResource, searchKey string

	// if Origin is collection then setting host with collection name
	if collectionFlag {
		host = collectionName
		searchKey = collectionName
	} else {
		// Assign to the function-level host: declaring it with := here would
		// shadow it and leave the host empty for the checks further down.
		addr, lookupErr := GetIPFromHostNameFunc(target.ManagerAddress)
		if lookupErr != nil {
			evcommon.GenErrorResponse(lookupErr.Error(), errResponse.ResourceNotFound, http.StatusNotFound,
				[]interface{}{"ManagerAddress", target.ManagerAddress}, &resp)
			l.LogWithFields(ctx).Error(lookupErr.Error())
			return resp, lookupErr
		}
		host = addr
		l.LogWithFields(ctx).Info("After look up, manager address is: ", host)
		searchKey = evcommon.GetSearchKey(host, evmodel.SubscriptionIndex)
	}

	var (
		eventTypes    = subscription.EventTypes
		messageIDs    = subscription.MessageIds
		resourceTypes = subscription.ResourceTypes
	)
	originResource = origin

	subscriptionDetails, err := e.GetEvtSubscriptions(searchKey)
	// "No data found" only means there is nothing subscribed yet
	if err != nil && !strings.Contains(err.Error(), "No data found for the key") {
		errorMessage := "error while get subscription details: " + err.Error()
		evcommon.GenErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
			[]interface{}{}, &resp)
		l.LogWithFields(ctx).Error(errorMessage)
		return resp, err
	}
	if isAggregate {
		subscriptionDetails = append(subscriptionDetails, e.GetAggregateSubscriptionList(ctx, host, aggregateID, isRemove)...)
	}

	// if there is no subscription happened then create event subscription
	if len(subscriptionDetails) < 1 {
		return resp, nil
	}

	var subscriptionPresent bool
	for index, evtSubscriptions := range subscriptionDetails {
		if isHostPresent(evtSubscriptions.Hosts, host) {
			subscriptionPresent = true
			// Each list is extended only while both sides are non-empty;
			// otherwise it is reset to an empty list.
			if len(evtSubscriptions.EventDestination.EventTypes) > 0 && (index == 0 || len(eventTypes) > 0) {
				eventTypes = append(eventTypes, evtSubscriptions.EventDestination.EventTypes...)
			} else {
				eventTypes = []string{}
			}
			if len(evtSubscriptions.EventDestination.MessageIds) > 0 && (index == 0 || len(messageIDs) > 0) {
				messageIDs = append(messageIDs, evtSubscriptions.EventDestination.MessageIds...)
			} else {
				messageIDs = []string{}
			}
			if len(evtSubscriptions.EventDestination.ResourceTypes) > 0 && (index == 0 || len(resourceTypes) > 0) {
				resourceTypes = append(resourceTypes, evtSubscriptions.EventDestination.ResourceTypes...)
			} else {
				resourceTypes = []string{}
			}
		}
	}
	if !subscriptionPresent {
		return resp, nil
	}

	if !collectionFlag {
		l.LogWithFields(ctx).Debug("Delete Subscription from device")
		if strings.Contains(originResource, "Fabrics") {
			resp, err = e.DeleteFabricsSubscription(ctx, originResource, plugin)
			if err != nil {
				return resp, err
			}
		} else {
			resp, err = e.DeleteSubscriptions(ctx, originResource, token, plugin, target)
			if err != nil {
				return resp, err
			}
		}
	}

	// updating the subscription information
	removeDuplicatesFromSlice(&eventTypes)
	removeDuplicatesFromSlice(&messageIDs)
	removeDuplicatesFromSlice(&resourceTypes)
	subscription.EventTypes = eventTypes
	subscription.MessageIds = messageIDs
	subscription.ResourceTypes = resourceTypes
	return resp, nil
}
// CreateDefaultEventSubscription creates the subscription with the event
// types which will be required to rediscover the inventory after computer
// system restarts. This will be triggered from aggregation service whenever
// a computer system is added.
//
// Only the first entry of originResources is subscribed. protocol defaults to
// "Redfish" when empty. An empty originResources yields a 400 response
// instead of a panic.
//
// The response body is the body returned for the device subscription.
// NOTE(review): the response status is always reported as 201 Created, even
// when the device subscription returned a different status; this keeps the
// behaviour callers see today. Confirm whether the real status should be
// bubbled up instead.
func (e *ExternalInterfaces) CreateDefaultEventSubscription(ctx context.Context, originResources, eventTypes, messageIDs, resourceTypes []string, protocol string) errResponse.RPC {
	var resp errResponse.RPC

	// originResources[0] is dereferenced below, so reject an empty list up front.
	if len(originResources) == 0 {
		errorMessage := "error: no origin resource provided for the default event subscription"
		args := errResponse.Args{
			Code:    errResponse.GeneralError,
			Message: errorMessage,
		}
		resp.Body = args.CreateGenericErrorResponse()
		resp.StatusCode = http.StatusBadRequest
		l.LogWithFields(ctx).Error(errorMessage)
		return resp
	}

	l.LogWithFields(ctx).Info("Creation of default subscriptions started for: " + strings.Join(originResources, "::"))
	if protocol == "" {
		protocol = "Redfish"
	}

	var postRequest model.EventDestination
	postRequest.Destination = ""
	postRequest.EventTypes = eventTypes
	postRequest.MessageIds = messageIDs
	postRequest.ResourceTypes = resourceTypes
	postRequest.Context = "Creating the Default Event Subscription"
	postRequest.Protocol = protocol
	postRequest.SubscriptionType = evmodel.SubscriptionType
	postRequest.SubordinateResources = true

	_, response := e.eventSubscription(ctx, postRequest, originResources[0], "", false, FillInSubTaskID)
	e.checkCollectionSubscription(ctx, originResources[0], protocol)
	if response.StatusCode != http.StatusCreated {
		// The status is not reflected in the response (see NOTE above), so at
		// least leave a trace of the failure in the logs.
		l.LogWithFields(ctx).Error("default subscription for ", originResources[0],
			" returned status code ", response.StatusCode)
	}

	resp.Body = response.Response
	resp.StatusCode = http.StatusCreated
	l.LogWithFields(ctx).Info("Creation of default subscriptions completed for : " + strings.Join(originResources, "::"))
	return resp
}
// saveDeviceSubscriptionDetails stores the device subscription details.
//
// It first checks whether details for the host (evtSubscription.EventHostIP)
// are already present:
//   - if present, the record is updated through UpdateDeviceSubscriptionLocation
//     with the new location and with evtSubscription.OriginResource added to the
//     already stored origin resources (without duplicating it);
//   - otherwise a new record is added through SaveDeviceSubscription.
//
// It returns the error of whichever of the two operations was performed.
func (e *ExternalInterfaces) saveDeviceSubscriptionDetails(evtSubscription common.DeviceSubscription) error {
	searchKey := evcommon.GetSearchKey(evtSubscription.EventHostIP, evmodel.DeviceSubscriptionIndex)
	// The lookup error is intentionally ignored: when nothing could be read
	// the details are treated as not present and a new record is created.
	deviceSubscription, _ := e.GetDeviceSubscriptions(searchKey)

	newDevSubscription := common.DeviceSubscription{
		EventHostIP:     evtSubscription.EventHostIP,
		Location:        evtSubscription.Location,
		OriginResources: []string{evtSubscription.OriginResource},
	}

	// no details for the device in db yet: add a new entry
	if deviceSubscription == nil {
		return e.SaveDeviceSubscription(newDevSubscription)
	}

	// Details are present: keep every stored origin resource, the new one is
	// already first in the list and must not be added twice. The record is
	// only updated, never saved again, even when it had no origin resources.
	for _, originResource := range deviceSubscription.OriginResources {
		if originResource != evtSubscription.OriginResource {
			newDevSubscription.OriginResources = append(newDevSubscription.OriginResources, originResource)
		}
	}
	if err := e.UpdateDeviceSubscriptionLocation(newDevSubscription); err != nil {
		return err
	}
	return nil
}
// getTargetDetails resolves the target device referenced by origin and returns
// it with its password decrypted.
//
// The device UUID is extracted from origin with getUUID, the stored target is
// fetched with GetTarget, and its password is run through
// DecryptWithPrivateKeyFunc. On any failure the returned target is nil, the
// EventResponse is populated through GenEventErrorResponse (called with
// http.StatusNotFound for a UUID or lookup failure and with
// http.StatusInternalServerError for a decryption failure) and the underlying
// error is returned. On success the EventResponse is left at its zero value.
func (e *ExternalInterfaces) getTargetDetails(origin string) (*common.Target, evresponse.EventResponse, error) {
	var eventResp evresponse.EventResponse

	deviceUUID, err := getUUID(origin)
	if err != nil {
		evcommon.GenEventErrorResponse(err.Error(), errResponse.ResourceNotFound, http.StatusNotFound,
			&eventResp, []interface{}{"System", origin})
		return nil, eventResp, err
	}

	// Fetch the stored credentials of the device identified by the UUID.
	device, err := e.GetTarget(deviceUUID)
	if err != nil {
		msg := "error while getting Systems(Target device Credentials) table details: " + err.Error()
		evcommon.GenEventErrorResponse(msg, errResponse.ResourceNotFound, http.StatusNotFound,
			&eventResp, []interface{}{"Systems", origin})
		return nil, eventResp, err
	}

	// The password is stored encrypted; callers receive it decrypted.
	plainPassword, err := DecryptWithPrivateKeyFunc(device.Password)
	if err != nil {
		msg := "error while trying to decrypt device password: " + err.Error()
		evcommon.GenEventErrorResponse(msg, errResponse.InternalError, http.StatusInternalServerError,
			&eventResp, []interface{}{})
		return nil, eventResp, err
	}
	device.Password = plainPassword

	return device, eventResp, nil
}
// DeleteSubscriptions asks the plugin to delete the event subscription held on
// the target device.
//
// The manager address of target is resolved to an IP and the device
// subscription stored for that address is looked up. When the lookup reports
// "No data found for the key" there is nothing to delete, so an empty response
// and a nil error are returned. Otherwise target.Location is set to the stored
// subscription location and a DELETE for /ODIM/v1/Subscriptions is sent to the
// plugin with target as the request body.
//
// Authentication follows plugin.PreferredAuthType: "XAuthToken" (compared
// case-insensitively) uses a plugin session token obtained through
// getPluginToken; any other value sends the plugin username and password.
//
// The originResource and token parameters are not used by this method; they
// are kept so existing callers continue to compile.
//
// On failure the returned RPC response is populated through GenErrorResponse
// (or is whatever PluginCall returned) and the error is non-nil.
func (e *ExternalInterfaces) DeleteSubscriptions(ctx context.Context, originResource, token string, plugin *common.Plugin, target *common.Target) (errResponse.RPC, error) {
	var resp errResponse.RPC

	addr, err := common.GetIPFromHostName(target.ManagerAddress)
	if err != nil {
		evcommon.GenErrorResponse(err.Error(), errResponse.ResourceNotFound, http.StatusNotFound,
			[]interface{}{"ManagerAddress", target.ManagerAddress}, &resp)
		l.LogWithFields(ctx).Error(err.Error())
		return resp, err
	}

	searchKey := evcommon.GetSearchKey(addr, evmodel.DeviceSubscriptionIndex)
	deviceSubscription, err := e.GetDeviceSubscriptions(searchKey)
	if err != nil {
		// No stored device subscription means nothing was subscribed on the
		// device, so there is nothing to delete.
		if strings.Contains(err.Error(), "No data found for the key") {
			return resp, nil
		}
		errorMessage := "Error while get subscription details: " + err.Error()
		evcommon.GenErrorResponse(errorMessage, errResponse.InternalError, http.StatusInternalServerError,
			[]interface{}{}, &resp)
		l.LogWithFields(ctx).Error(errorMessage)
		return resp, err
	}

	var contactRequest evcommon.PluginContactRequest
	contactRequest.Plugin = plugin
	if strings.EqualFold(plugin.PreferredAuthType, "XAuthToken") {
		// Named pluginToken so it does not shadow the token parameter.
		pluginToken := e.getPluginToken(ctx, plugin)
		if pluginToken == "" {
			errorMessage := "error: Unable to create session with plugin " + plugin.ID
			evcommon.GenErrorResponse(errorMessage, errResponse.NoValidSession, http.StatusUnauthorized,
				[]interface{}{}, &resp)
			// The message is passed as an argument rather than as the format
			// string so that a '%' in the plugin ID cannot be misread as a
			// formatting verb.
			return resp, fmt.Errorf("%s", errorMessage)
		}
		contactRequest.Token = pluginToken
	} else {
		contactRequest.LoginCredential = map[string]string{
			"UserName": plugin.Username,
			"Password": string(plugin.Password),
		}
	}

	// The plugin needs the location of the subscription on the device.
	target.Location = deviceSubscription.Location

	// Call to delete subscription to plugin
	contactRequest.URL = "/ODIM/v1/Subscriptions"
	contactRequest.HTTPMethodType = http.MethodDelete
	contactRequest.PostBody = target
	resp, _, _, _, err = e.PluginCall(ctx, contactRequest)
	return resp, err
}
// createEventSubscription creates the event subscription for a single origin
// resource (or collection) as one subtask and reports the outcome.
//
// Steps, in order:
//  1. The subtask is marked Running with status 202 through UpdateTask.
//  2. eventSubscription is invoked for originResource / collectionName.
//  3. The response is stored in result. For an aggregate collection it is
//     stored under aggregateResource, and a 409 Conflict is recorded as 201
//     Created; otherwise it is stored under originResource with the host
//     returned by eventSubscription.
//  4. The subtask is updated according to the final status code:
//     - neither 201 nor 202: Exception / Critical;
//     - 201 with collectionFlag: Suspended, and the status reported on
//     subTaskChan becomes 202;
//     - 201 otherwise: Completed;
//     - 202: no further task update is made here.
//  5. The final status code is sent on subTaskChan.
//
// wg.Done is deferred, so it runs after the send on subTaskChan completes.
// The taskID and reqSessionToken parameters are not used by this method.
func (e *ExternalInterfaces) createEventSubscription(ctx context.Context, taskID string, subTaskID string, subTaskChan chan<- int32, reqSessionToken string,
	targetURI string, request model.EventDestination, originResource string, result *evresponse.MutexLock,
	wg *sync.WaitGroup, collectionFlag bool, collectionName string, aggregateResource string, isAggregateCollection bool) {
	var (
		reqBody         []byte
		reqJSON         string
		err             error
		resp            errResponse.RPC
		percentComplete int32
	)
	defer wg.Done()

	// A marshalling failure is only logged; the task payload is then empty.
	reqBody, err = json.Marshal(request)
	if err != nil {
		l.LogWithFields(ctx).Error("error while trying to marshal create event request: " + err.Error())
	}
	reqJSON = string(reqBody)

	resp.StatusCode = http.StatusAccepted
	e.UpdateTask(ctx, fillTaskData(subTaskID, targetURI, reqJSON, resp, common.Running, common.OK, percentComplete, http.MethodPost))

	host, response := e.eventSubscription(ctx, request, originResource, collectionName, collectionFlag, subTaskID)
	l.LogWithFields(ctx).Debugf("Event subscription response for originResource: %s, collectionName: %s, subTaskID: %s "+
		"is %v with the status code: %v", originResource, collectionName, subTaskID, response, response.StatusCode)

	// resp carries the status code returned by eventSubscription into the task
	// data; the adjustments made to response below do not alter it.
	resp.Body = response.Response
	resp.StatusCode = int32(response.StatusCode)

	if isAggregateCollection {
		// A conflict for a member of an aggregate is recorded as created.
		if resp.StatusCode == http.StatusConflict {
			response.StatusCode = http.StatusCreated
		}
		result.AddResponse(aggregateResource, getAggregateID(aggregateResource), response)
	} else {
		result.AddResponse(originResource, host, response)
	}

	percentComplete = 100
	switch {
	case response.StatusCode != http.StatusCreated && response.StatusCode != http.StatusAccepted:
		// got error while subscribing event. updating task status immediately
		e.UpdateTask(ctx, fillTaskData(subTaskID, targetURI, reqJSON, resp, common.Exception, common.Critical, percentComplete, http.MethodPost))
	case response.StatusCode == http.StatusCreated && collectionFlag:
		// handling the event subscription for collection. The status of the task will be temporarily updated to suspended
		// This will be updated to completed by task service upon the completion of all resources under the collection
		response.StatusCode = http.StatusAccepted
		e.UpdateTask(ctx, fillTaskData(subTaskID, targetURI, reqJSON, resp, common.Suspended, common.OK, percentComplete, http.MethodPost))
	case response.StatusCode == http.StatusCreated:
		// updating the task status to completed if plugin (plugin in which queue and prioritization is not implemented)
		// returns the status code 201
		e.UpdateTask(ctx, fillTaskData(subTaskID, targetURI, reqJSON, resp, common.Completed, common.OK, percentComplete, http.MethodPost))
	}

	subTaskChan <- int32(response.StatusCode)
}
// CreateSubTask creates a child task under parentTask through CreateChildTask
// and returns the ID of the new subtask, which is the last path segment of the
// returned subtask URI (one trailing "/" is ignored).
//
// A failure to create the child task is logged together with its cause but is
// not returned; the ID is still derived from whatever URI CreateChildTask
// returned, which yields "" for an empty URI.
func (e *ExternalInterfaces) CreateSubTask(ctx context.Context, reqSessionToken string, parentTask string) string {
	subTaskURI, err := e.CreateChildTask(ctx, reqSessionToken, parentTask)
	if err != nil {
		l.LogWithFields(ctx).Error("Error while creating the SubTask: " + err.Error())
	}
	trimmedURI := strings.TrimSuffix(subTaskURI, "/")
	// LastIndex returns -1 when there is no "/", so the slice then starts at 0
	// and the whole string is returned.
	return trimmedURI[strings.LastIndex(trimmedURI, "/")+1:]
}
// checkCollectionSubscription checks whether any collection-level subscription
// covers origin and, if so, subscribes origin using the collated event filters
// of those subscriptions.
//
// Origins containing "Fabrics" are matched against subscriptions on
// /redfish/v1/Fabrics. Every other origin is treated as a BMC resource: the
// subscriptions on /redfish/v1/Systems, /redfish/v1/Chassis and
// /redfish/v1/Managers are considered, and they match only when origin
// contains "Systems".
//
// The event types, message IDs and resource types of all matching
// subscriptions are merged and de-duplicated. For each of the three filters,
// an empty list on any matching subscription makes the merged list empty. The
// destination of the last matching subscription is used.
//
// For a BMC origin whose subscription was created (201), the stored device
// subscription of the host is extended with the chassis and manager URIs that
// match the system ID taken from origin.
//
// Nothing is returned: failures either end the method silently or are logged.
func (e *ExternalInterfaces) checkCollectionSubscription(ctx context.Context, origin, protocol string) {
	const (
		systemsCollection  = "/redfish/v1/Systems"
		chassisCollection  = "/redfish/v1/Chassis"
		managersCollection = "/redfish/v1/Managers"
		fabricsCollection  = "/redfish/v1/Fabrics"
		systemsPrefix      = "/redfish/v1/Systems/"
	)

	isFabric := strings.Contains(origin, "Fabrics")
	isSystem := strings.Contains(origin, "Systems")
	// Every origin that is not a fabric is handled as a BMC resource.
	bmcFlag := !isFabric

	// Creating key to get all the System Collection subscription
	searchKey := systemsCollection
	if isFabric {
		searchKey = fabricsCollection
	}
	subscriptions, err := e.GetEvtSubscriptions(searchKey)
	if err != nil {
		return
	}
	if bmcFlag {
		// Best effort: a failed lookup simply contributes no subscriptions.
		chassisSubscriptions, _ := e.GetEvtSubscriptions(chassisCollection)
		subscriptions = append(subscriptions, chassisSubscriptions...)
		managersSubscriptions, _ := e.GetEvtSubscriptions(managersCollection)
		subscriptions = append(subscriptions, managersSubscriptions...)
	}

	// Checking the collection based subscription. A subscription is added once
	// per matching origin resource; duplicates are removed after collation.
	var collectionSubscription = make([]evmodel.SubscriptionResource, 0)
	for _, evtSubscription := range subscriptions {
		for _, originResource := range evtSubscription.EventDestination.OriginResources {
			switch originResource.Oid {
			case systemsCollection, chassisCollection, managersCollection:
				if isSystem {
					collectionSubscription = append(collectionSubscription, evtSubscription)
				}
			case fabricsCollection:
				if isFabric {
					collectionSubscription = append(collectionSubscription, evtSubscription)
				}
			}
		}
	}
	if len(collectionSubscription) == 0 {
		return
	}

	// collate merges one subscription's filter list into the accumulated one.
	// An empty incoming list resets the accumulator to empty, and once the
	// accumulator is empty after the first subscription it stays empty.
	collate := func(acc, next []string, first bool) []string {
		if len(next) > 0 && (first || len(acc) > 0) {
			return append(acc, next...)
		}
		return []string{}
	}

	// using the one of the destination
	var destination string
	var eventTypes, messageIDs, resourceTypes []string
	for index, evtSubscription := range collectionSubscription {
		first := index == 0
		destination = evtSubscription.EventDestination.Destination
		eventTypes = collate(eventTypes, evtSubscription.EventDestination.EventTypes, first)
		messageIDs = collate(messageIDs, evtSubscription.EventDestination.MessageIds, first)
		resourceTypes = collate(resourceTypes, evtSubscription.EventDestination.ResourceTypes, first)
	}
	removeDuplicatesFromSlice(&eventTypes)
	removeDuplicatesFromSlice(&messageIDs)
	removeDuplicatesFromSlice(&resourceTypes)

	// Context is left at its zero value; no context is collated from the
	// matching subscriptions. Subordinate resources are requested for fabric
	// origins only.
	subscriptionPost := model.EventDestination{
		EventTypes:           eventTypes,
		MessageIds:           messageIDs,
		ResourceTypes:        resourceTypes,
		Destination:          destination,
		Protocol:             protocol,
		SubordinateResources: isFabric,
		OriginResources:      []model.Link{{Oid: origin}},
	}

	// Subscribing newly added server with collated event list
	// passing FillInSubTaskID as a subtask is not created for this operation
	host, response := e.eventSubscription(ctx, subscriptionPost, origin, "", false, FillInSubTaskID)
	if response.StatusCode != http.StatusCreated {
		return
	}
	if !bmcFlag {
		return
	}

	// Get Device Subscription Details if collection is bmc and update chassis and managers uri
	deviceKey := evcommon.GetSearchKey(host, evmodel.DeviceSubscriptionIndex)
	deviceSubscription, err := e.GetDeviceSubscriptions(deviceKey)
	if err != nil {
		l.LogWithFields(ctx).Error("error while getting device subscription details for " + origin + ": " + err.Error())
		return
	}
	if deviceSubscription == nil {
		// Guard against dereferencing a missing entry.
		l.LogWithFields(ctx).Error("no device subscription details found for " + origin)
		return
	}

	// The system ID is whatever follows the Systems collection prefix; without
	// the prefix there is no second element to read.
	data := strings.Split(origin, systemsPrefix)
	if len(data) < 2 {
		l.LogWithFields(ctx).Error("unable to extract system id from origin resource " + origin)
		return
	}

	// Best effort: a failed lookup contributes no URIs.
	chassisList, _ := e.GetAllMatchingDetails("Chassis", data[1], common.InMemory)
	managersList, _ := e.GetAllMatchingDetails("Managers", data[1], common.InMemory)

	var newDevSubscription = common.DeviceSubscription{
		EventHostIP:     deviceSubscription.EventHostIP,
		Location:        deviceSubscription.Location,
		OriginResources: deviceSubscription.OriginResources,
	}
	newDevSubscription.OriginResources = append(newDevSubscription.OriginResources, chassisList...)
	newDevSubscription.OriginResources = append(newDevSubscription.OriginResources, managersList...)
	if err := e.UpdateDeviceSubscriptionLocation(newDevSubscription); err != nil {
		l.LogWithFields(ctx).Error("error while updating device subscription : " + err.Error())
	}
}
func (e *ExternalInterfaces) createFabricSubscription(ctx context.Context, postRequest model.EventDestination, origin, collectionName string, collectionFlag bool) (string, evresponse.EventResponse) {
var resp evresponse.EventResponse
var err error
var plugin *common.Plugin