/
eventsub.go
260 lines (243 loc) · 8.92 KB
/
eventsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package restclient
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"github.com/callummance/nazuna/messages"
"github.com/sirupsen/logrus"
)
const subscriptionEndpoint = apiBaseURL + "/eventsub/subscriptions"
func (c *Client) CreateSubscription(condition interface{}, transport messages.TransportOpts) (*messages.SubscriptionRequestStatus, error) {
var reqBody messages.Subscription
switch t := condition.(type) {
case messages.ConditionChannelUpdate:
reqBody.Type = messages.SubscriptionChannelUpdate
case messages.ConditionChannelFollow:
reqBody.Type = messages.SubscriptionChannelFollow
case messages.ConditionChannelSubscribe:
reqBody.Type = messages.SubscriptionChannelSubscribe
case messages.ConditionChannelCheer:
reqBody.Type = messages.SubscriptionChannelCheer
case messages.ConditionChannelBan:
reqBody.Type = messages.SubscriptionChannelBan
case messages.ConditionChannelUnban:
reqBody.Type = messages.SubscriptionChannelUnban
case messages.ConditionChannelPointsCustomRewardAdd:
reqBody.Type = messages.SubscriptionChannelPointsCustomRewardAdd
case messages.ConditionChannelPointsCustomRewardUpdate:
reqBody.Type = messages.SubscriptionChannelPointsCustomRewardUpdate
case messages.ConditionChannelPointsCustomRewardRemove:
reqBody.Type = messages.SubscriptionChannelPointsCustomRewardRemove
case messages.ConditionChannelPointsCustomRewardRedemptionAdd:
reqBody.Type = messages.SubscriptionChannelPointsCustomRewardRedemptionAdd
case messages.ConditionChannelPointsCustomRewardRedemptionUpdate:
reqBody.Type = messages.SubscriptionChannelPointsCustomRewardRedemptionUpdate
case messages.ConditionChannelHypeTrainBegin:
reqBody.Type = messages.SubscriptionChannelHypeTrainBegin
case messages.ConditionChannelHypeTrainProgress:
reqBody.Type = messages.SubscriptionChannelHypeTrainProgress
case messages.ConditionChannelHypeTrainEnd:
reqBody.Type = messages.SubscriptionChannelHypeTrainEnd
case messages.ConditionStreamOnline:
reqBody.Type = messages.SubscriptionStreamOnline
case messages.ConditionStreamOffline:
reqBody.Type = messages.SubscriptionStreamOffline
case messages.ConditionUserAuthorizationRevoke:
reqBody.Type = messages.SubscriptionUserAuthorizationRevoke
case messages.ConditionUserUpdate:
reqBody.Type = messages.SubscriptionUserUpdate
default:
logrus.Warnf("CreateSubscription call was provided with a condition of unrecognized type")
return nil, fmt.Errorf("type %v supplied to CreateSubscription is not a valid condition", t)
}
reqBody.Version = "1"
reqBody.Condition = condition
reqBody.Transport = transport
//Marshal request body
bodyBytes, err := json.Marshal(reqBody)
logrus.Debugf("Sending request for new eventsub subscription with body %q", reqBody)
if err != nil {
logrus.Warnf("Failed to marshal CreateSubscription request body due to error %v", err)
return nil, err
}
logrus.Tracef("Submitting CreateSubscription request with body %s", bodyBytes)
//Send POST request
req, err := http.NewRequest("POST", subscriptionEndpoint, bytes.NewBuffer(bodyBytes))
if err != nil {
logrus.Warnf("Failed to make CreateSubscription request due to error %v", err)
return nil, err
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Client-ID", c.clientID)
resp, err := c.httpClient.Do(req)
if err != nil {
logrus.Warnf("Failed to make CreateSubscription request due to error %v", err)
return nil, err
}
defer resp.Body.Close()
//Decode response
var result messages.SubscriptionRequestStatus
if resp.StatusCode == http.StatusConflict {
//Quietly ignore duplicate subscriptions
return nil, nil
} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
dump, _ := httputil.DumpResponse(resp, true)
logrus.Infof("Got non-OK response %s to subscription creation request", dump)
return nil, fmt.Errorf("got non-OK response %s to subscription creation request", dump)
}
err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
logrus.Warnf("Failed to decode response to CreateSubscription request due to error %v", err)
logrus.Tracef("response: %v", resp)
return nil, err
}
return &result, nil
}
type SubscriptionsParams struct {
Status string `json:"status,omitempty"`
Type string `json:"type,omitempty"`
}
func (p SubscriptionsParams) insertToValues(initialValues *url.Values) {
if p.Status != "" {
(*initialValues)["status"] = []string{p.Status}
}
if p.Type != "" {
(*initialValues)["type"] = []string{p.Type}
}
}
func (c *Client) getSubscriptionsPage(params *SubscriptionsParams, pagination *pagination) (*subscriptionsPage, error) {
logrus.Debugf("Requesting page of subscriptions with filters %#v from api.", params)
//Build query URL
var query url.Values
url, err := url.Parse(subscriptionEndpoint)
if err != nil {
logrus.Errorf("Failed to parse subscription endpoint with error %v", err)
return nil, err
}
if params != nil {
params.insertToValues(&query)
}
if pagination != nil {
pagination.insertToValues(&query)
}
url.RawQuery = query.Encode()
//Send GET request
req, err := http.NewRequest("GET", url.String(), http.NoBody)
if err != nil {
logrus.Warnf("Failed to make Subscriptions request due to error %v", err)
return nil, err
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Client-ID", c.clientID)
logrus.Tracef("Sending request %#v to retrieve subscriptions", req)
resp, err := c.httpClient.Do(req)
if err != nil {
logrus.Warnf("Failed to make Subscriptions request due to error %v", err)
return nil, err
}
defer resp.Body.Close()
//Decode response
var result subscriptionsPage
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
dump, _ := httputil.DumpResponse(resp, true)
logrus.Infof("Got non-OK response %s to subscription list request", dump)
return nil, fmt.Errorf("got non-OK response %s to subscription list request", dump)
}
err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
logrus.Warnf("Failed to decode response to subscriptions list request due to error %v", err)
logrus.Tracef("response: %v", resp)
return nil, err
}
return &result, nil
}
type SubscriptionResult struct {
Subscription *messages.Subscription
Err error
}
//Subscriptions returns a channel which will be populated with all Eventsub subscriptions owned by the current app
func (c *Client) Subscriptions(filters *SubscriptionsParams) chan SubscriptionResult {
ch := make(chan SubscriptionResult)
go func(c *Client) {
var currentPage subscriptionsPage
initialPageFetched := false
lastLoc := 0
for {
if lastLoc+1 < len(currentPage.Data) {
//We still have subscriptions from the previously fetched page
lastLoc++
ch <- SubscriptionResult{
Subscription: ¤tPage.Data[lastLoc],
Err: nil,
}
} else if currentPage.Pagination.Cursor == "" && initialPageFetched {
//Run out of subscriptions fetched and no pagination data, so we must be done
close(ch)
return
} else {
//Need to fetch more members
pagination := pagination{
After: currentPage.Pagination.Cursor,
}
nextPage, err := c.getSubscriptionsPage(filters, &pagination)
if err != nil {
logrus.Warnf("Failed to fetch page of subscriptions from API due to error %v", err)
ch <- SubscriptionResult{
Subscription: nil,
Err: err,
}
}
//If new page is empty, we must also be done
if len(nextPage.Data) == 0 {
close(ch)
return
}
initialPageFetched = true
currentPage = *nextPage
lastLoc = 0
ch <- SubscriptionResult{
Subscription: ¤tPage.Data[0],
Err: nil,
}
}
}
}(c)
return ch
}
//DeleteSubscription attempts to delete a previously-created EventSub subscription from the twitch API
func (c *Client) DeleteSubscription(subscriptionID string) error {
logrus.Debugf("Requested deletion of subscription with ID %v.", subscriptionID)
//Build query URL
url, err := url.Parse(subscriptionEndpoint)
query := url.Query()
if err != nil {
logrus.Errorf("Failed to parse subscription endpoint with error %v", err)
return err
}
query.Set("id", subscriptionID)
url.RawQuery = query.Encode()
//Send DELETE request
req, err := http.NewRequest("DELETE", url.String(), http.NoBody)
if err != nil {
logrus.Warnf("Failed to make delete subscription request due to error %v", err)
return err
}
req.Header.Add("Client-ID", c.clientID)
logrus.Tracef("Sending request %#v to delete subscriptions", req)
resp, err := c.httpClient.Do(req)
if err != nil {
logrus.Warnf("Failed to make delete subscription request due to error %v", err)
return err
}
defer resp.Body.Close()
//Decode response
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
dump, _ := httputil.DumpResponse(resp, true)
logrus.Infof("Got non-OK response %s to subscription list request", dump)
return fmt.Errorf("got non-OK response %s to subscription list request", dump)
}
return nil
}