/
lrp_events.go
164 lines (137 loc) · 4 KB
/
lrp_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package commands
import (
"encoding/json"
"io"
"code.cloudfoundry.org/bbs"
"code.cloudfoundry.org/bbs/events"
"code.cloudfoundry.org/bbs/models"
"code.cloudfoundry.org/cfdot/commands/helpers"
multierror "github.com/hashicorp/go-multierror"
"github.com/spf13/cobra"
)
var (
lrpEventsCellIdFlag string
lrpEventsExcludeActualLRPGroups bool
)
var lrpEventsCmd = &cobra.Command{
Use: "lrp-events",
Short: "Subscribe to BBS LRP events",
Long: "Subscribe to BBS LRP events",
RunE: lrpEvents,
}
type LRPEvent struct {
Type string `json:"type"`
Data interface{} `json:"data"`
}
func init() {
AddBBSFlags(lrpEventsCmd)
lrpEventsCmd.Flags().StringVarP(&lrpEventsCellIdFlag, "cell-id", "c", "", "retrieve only events for the given cell id")
lrpEventsCmd.Flags().BoolVarP(&lrpEventsExcludeActualLRPGroups, "exclude-actual-lrp-groups", "x", false, "exclude actual lrp group events")
RootCmd.AddCommand(lrpEventsCmd)
}
func lrpEvents(cmd *cobra.Command, args []string) error {
err := validateLRPEventsArguments(args)
if err != nil {
return NewCFDotValidationError(cmd, err)
}
if !lrpEventsExcludeActualLRPGroups {
err = printLRPGroupEventsWarning(cmd.OutOrStderr())
if err != nil {
return NewCFDotError(cmd, err)
}
}
bbsClient, err := helpers.NewBBSClient(cmd, Config)
if err != nil {
return NewCFDotError(cmd, err)
}
err = LRPEvents(cmd.OutOrStdout(), cmd.OutOrStderr(), bbsClient, lrpEventsCellIdFlag, lrpEventsExcludeActualLRPGroups)
if err != nil {
return NewCFDotError(cmd, err)
}
return nil
}
func validateLRPEventsArguments(args []string) error {
if len(args) > 0 {
return errExtraArguments
}
return nil
}
func LRPEvents(stdout, stderr io.Writer, bbsClient bbs.Client, cellID string, excludeActualLRPGroups bool) error {
logger := globalLogger.Session("lrp-events")
oldEventStream := make(chan models.Event)
newEventStream := make(chan models.Event)
oldErrChan := make(chan error)
newErrChan := make(chan error)
readEvent := func(es events.EventSource, eventStreamChan chan models.Event, errChan chan error) {
for {
event, err := es.Next()
if err != nil {
errChan <- err
return
}
eventStreamChan <- event
}
}
encoder := json.NewEncoder(stdout)
eventStreamCount := 1
if !excludeActualLRPGroups {
//lint:ignore SA1019 - if this flag is set, we're intentionally using this deprecated behavior in conjunction with the new behavior
oldES, err := bbsClient.SubscribeToEventsByCellID(logger, cellID)
if err != nil {
return models.ConvertError(err)
}
defer oldES.Close()
eventStreamCount += 1
go readEvent(oldES, oldEventStream, oldErrChan)
}
instanceES, err := bbsClient.SubscribeToInstanceEventsByCellID(logger, cellID)
if err != nil {
return models.ConvertError(err)
}
defer instanceES.Close()
go readEvent(instanceES, newEventStream, newErrChan)
ret := &multierror.Error{}
var event models.Event
var lrpEvent LRPEvent
for {
var err error
select {
case e := <-oldEventStream:
switch e.EventType() {
//lint:ignore SA1019 - cfdot needs to process deprecated ActualLRP data until it is removed from BBS
case models.EventTypeActualLRPCreated, models.EventTypeActualLRPChanged, models.EventTypeActualLRPRemoved:
event = e
default:
continue
}
case event = <-newEventStream:
case err = <-oldErrChan:
multierror.Append(ret, err)
case err = <-newErrChan:
multierror.Append(ret, err)
}
if len(ret.Errors) >= eventStreamCount {
for _, err := range ret.Errors {
if err != io.EOF {
return ret.ErrorOrNil()
}
}
return nil
}
if err != nil {
continue
}
lrpEvent.Type = event.EventType()
lrpEvent.Data = event
err = encoder.Encode(lrpEvent)
if err != nil {
logger.Error("failed-to-marshal", err)
}
}
}
func printLRPGroupEventsWarning(stderr io.Writer) error {
_, err := io.WriteString(stderr,
`Event types "actual_lrp_created", "actual_lrp_changed" and "actual_lrp_removed" are deprecated. `+
`Use "--exclude-actual-lrp-groups" flag to exclude them.`+"\n")
return err
}