/
eventsub.cpp
436 lines (393 loc) · 16.2 KB
/
eventsub.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
#include <QNetworkInterface>
#include <QJsonDocument>
#include <QJsonArray>
#include <QStringBuilder>
#include <QUuid>
#include "globals.h"
#include "network.h"
#include "twitch.h"
#include "eventsub.h"
// Names of the JSON fields and message types exchanged with Twitch's EventSub service.

// sections ParseMessage() requires in every incoming message
const char *JSON_KEY_METADATA="metadata";
const char *JSON_KEY_METADATA_TYPE="message_type";
const char *JSON_KEY_PAYLOAD="payload";

// session details read from the welcome message's payload
const char *JSON_KEY_PAYLOAD_SESSION="session";
const char *JSON_KEY_PAYLOAD_SESSION_ID="id";
const char *JSON_KEY_PAYLOAD_SESSION_KEEPALIVE_TIMEOUT="keepalive_timeout_seconds";

// subscription details read from a notification's payload
const char *JSON_KEY_PAYLOAD_SUBSCRIPTION="subscription";
const char *JSON_KEY_PAYLOAD_SUBSCRIPTION_TYPE="type";

const char *JSON_KEY_CHALLENGE="challenge";

// fields of a notification's event object
const char *JSON_KEY_EVENT="event";
const char *JSON_KEY_EVENT_REWARD="reward";
const char *JSON_KEY_EVENT_REWARD_TITLE="title";
const char *JSON_KEY_EVENT_FOLLOW="followed_at"; // Twitch spells this field with an underscore; a space can never match
const char *JSON_KEY_EVENT_USER_NAME="user_name";
const char *JSON_KEY_EVENT_USER_LOGIN="user_login";
const char *JSON_KEY_EVENT_USER_INPUT="user_input";
const char *JSON_KEY_EVENT_VIEWERS="viewers";
const char *JSON_KEY_EVENT_MESSAGE="message";
const char *JSON_KEY_EVENT_CHEER_AMOUNT="bits";
const char *JSON_KEY_EVENT_HYPE_TRAIN_LEVEL="level";
const char *JSON_KEY_EVENT_HYPE_TRAIN_PROGRESS="progress";
const char *JSON_KEY_EVENT_HYPE_TRAIN_TOTAL="goal";

// values of the metadata message type field that ParseMessage() recognizes
const char *MESSAGE_TYPE_WELCOME="session_welcome";
const char *MESSAGE_TYPE_KEEPALIVE="session_keepalive";
const char *MESSAGE_TYPE_NOTIFICATION="notification";
// Settings category name; the constructor passes it when creating the "WebsocketURL" setting.
const char *EventSub::SETTINGS_CATEGORY_EVENTS="Events";
// WebSocket close codes in the 4000 range that SocketClosed() reports on.
// Dead() also uses NETWORK_TIMEOUT when it closes the socket itself.
enum class TwitchCloseCode
{
	INTERNAL_SERVER_ERROR=4000,        // failure on Twitch's end
	CLIENT_SENT_INBOUND_TRAFFIC=4001,  // client sent something other than a pong
	CLIENT_FAILED_PING_PONG=4002,      // client did not answer a ping
	CONNECTION_UNUSED=4003,            // no subscription was created in time
	RECONNECT_GRACE_TIME_EXPIRED=4004, // old connection not closed in time after a reconnect request
	NETWORK_TIMEOUT=4005,              // transient; SocketClosed() reconnects on this one
	NETWORK_ERROR=4006,                // transient network error
	INVALID_RECONNECT=4007             // reconnect URL was invalid
};
// Sets up the socket and keepalive signal handlers, fills the tables that translate
// Twitch's message and subscription type names into enum values, then starts connecting.
EventSub::EventSub(Security &security,QObject *parent) :
	QObject(parent),
	security(security),
	settingURL(SETTINGS_CATEGORY_EVENTS,"WebsocketURL","wss://eventsub.wss.twitch.tv/ws")
{
	// socket traffic and keepalive expiry
	connect(&socket,&QWebSocket::textMessageReceived,this,&EventSub::ParseMessage);
	connect(&socket,&QWebSocket::disconnected,this,&EventSub::SocketClosed);
	connect(&keepalive,&QTimer::timeout,this,&EventSub::Dead);

	// message types understood by ParseMessage()
	messageTypes.insert({MESSAGE_TYPE_WELCOME,MessageType::WELCOME});
	messageTypes.insert({MESSAGE_TYPE_KEEPALIVE,MessageType::KEEPALIVE});
	messageTypes.insert({MESSAGE_TYPE_NOTIFICATION,MessageType::NOTIFICATION});

	// subscription types understood by ParseNotification(); several Twitch types
	// deliberately share one enum value because they are handled identically
	subscriptionTypes.insert({SUBSCRIPTION_TYPE_FOLLOW,SubscriptionType::CHANNEL_FOLLOW});
	subscriptionTypes.insert({SUBSCRIPTION_TYPE_REDEMPTION,SubscriptionType::CHANNEL_REDEMPTION});
	subscriptionTypes.insert({SUBSCRIPTION_TYPE_CHEER,SubscriptionType::CHANNEL_CHEER});
	subscriptionTypes.insert({SUBSCRIPTION_TYPE_RAID,SubscriptionType::CHANNEL_RAID});

	subscriptionTypes.insert({SUBSCRIPTION_TYPE_SUBSCRIPTION,SubscriptionType::CHANNEL_SUBSCRIPTION});
	subscriptionTypes.insert({SUBSCRIPTION_TYPE_RESUBSCRIPTION,SubscriptionType::CHANNEL_SUBSCRIPTION});

	subscriptionTypes.insert({SUBSCRIPTION_TYPE_HYPE_TRAIN_START,SubscriptionType::CHANNEL_HYPE_TRAIN});
	subscriptionTypes.insert({SUBSCRIPTION_TYPE_HYPE_TRAIN_PROGRESS,SubscriptionType::CHANNEL_HYPE_TRAIN});
	subscriptionTypes.insert({SUBSCRIPTION_TYPE_HYPE_TRAIN_END,SubscriptionType::CHANNEL_HYPE_TRAIN});

	// everything is wired up, so it is safe to open the socket now
	Connect();
}
// Opens the WebSocket using the URL held by the settingURL setting.
// Called from the constructor and again from SocketClosed() to reconnect after a timeout.
void EventSub::Connect()
{
socket.open(settingURL);
}
// Slot for the socket's disconnected signal. Reports why the connection ended based on
// the close code. A network timeout triggers an immediate reconnect and does NOT emit
// Disconnected(); every other outcome (including codes not listed here) emits it.
void EventSub::SocketClosed()
{
	static const char *TWITCH_API_OPERATION_SOCKET_CLOSED="socket closed";

	const int closeCode=static_cast<int>(socket.closeCode());

	// the only recoverable case: try again right away and keep quiet about the disconnect
	if (closeCode == static_cast<int>(TwitchCloseCode::NETWORK_TIMEOUT))
	{
		emit Print("Transient network timeout",TWITCH_API_OPERATION_SOCKET_CLOSED);
		Connect();
		return;
	}

	switch (closeCode)
	{
	case QWebSocketProtocol::CloseCodeNormal:
		// orderly shutdown, nothing to report
		break;
	case static_cast<int>(TwitchCloseCode::INTERNAL_SERVER_ERROR):
		emit Print("Internal server error on Twitch's end",TWITCH_API_OPERATION_SOCKET_CLOSED);
		break;
	case static_cast<int>(TwitchCloseCode::CLIENT_SENT_INBOUND_TRAFFIC):
		emit Print("Sending outgoing messages to the server is prohibited with the exception of pong messages.",TWITCH_API_OPERATION_SOCKET_CLOSED);
		break;
	case static_cast<int>(TwitchCloseCode::CLIENT_FAILED_PING_PONG):
		emit Print("You must respond to ping messages with a pong message.",TWITCH_API_OPERATION_SOCKET_CLOSED);
		break;
	case static_cast<int>(TwitchCloseCode::CONNECTION_UNUSED):
		emit Print("You must create a subscription within 10 seconds.",TWITCH_API_OPERATION_SOCKET_CLOSED);
		break;
	case static_cast<int>(TwitchCloseCode::RECONNECT_GRACE_TIME_EXPIRED):
		emit Print("When you receive a session_reconnect message, you have 30 seconds to reconnect to the server and close the old connection.",TWITCH_API_OPERATION_SOCKET_CLOSED);
		break;
	case static_cast<int>(TwitchCloseCode::NETWORK_ERROR):
		emit Print("Transient network error",TWITCH_API_OPERATION_SOCKET_CLOSED);
		break;
	case static_cast<int>(TwitchCloseCode::INVALID_RECONNECT):
		emit Print("The reconnect URL is invalid",TWITCH_API_OPERATION_SOCKET_CLOSED);
		break;
	}

	emit Disconnected();
}
// Slot for the keepalive timer's timeout signal (connected in the constructor).
// Closes the socket with the NETWORK_TIMEOUT code, which is the code SocketClosed()
// answers with a reconnect.
// NOTE(review): relies on socket.closeCode() reporting this locally supplied code afterwards - confirm
void EventSub::Dead()
{
socket.close(static_cast<QWebSocketProtocol::CloseCode>(TwitchCloseCode::NETWORK_TIMEOUT));
}
// Queues the default set of subscription types and requests the first one.
// The remaining types are requested one at a time by NextSubscription() as each
// request is answered.
void EventSub::Subscribe()
{
	// A previous run may have stopped partway (failed requests don't pop the queue).
	// Start from a clean slate so stale entries aren't retried ahead of, and stacked
	// underneath, the fresh set.
	while (!defaultTypes.empty()) defaultTypes.pop();

	defaultTypes.push(SUBSCRIPTION_TYPE_REDEMPTION);
	defaultTypes.push(SUBSCRIPTION_TYPE_RAID);
	defaultTypes.push(SUBSCRIPTION_TYPE_SUBSCRIPTION);
	defaultTypes.push(SUBSCRIPTION_TYPE_RESUBSCRIPTION);
	defaultTypes.push(SUBSCRIPTION_TYPE_CHEER);
	defaultTypes.push(SUBSCRIPTION_TYPE_HYPE_TRAIN_START);
	defaultTypes.push(SUBSCRIPTION_TYPE_HYPE_TRAIN_PROGRESS);
	defaultTypes.push(SUBSCRIPTION_TYPE_HYPE_TRAIN_END);

	Subscribe(defaultTypes.front());
}
void EventSub::Subscribe(const QString &type)
{
static const char *TWITCH_API_OPERATION_SUBSCRIBE="subscribe to event";
emit Print(u"Requesting subscription to %1"_s.arg(type),TWITCH_API_OPERATION_SUBSCRIBE);
Network::Request::Send({Twitch::Endpoint(Twitch::ENDPOINT_EVENTSUB)},Network::Method::POST,[this,type](QNetworkReply *reply) {
emit Print(StringConvert::Dump(reply->readAll()),TWITCH_API_OPERATION_SUBSCRIBE);
switch (reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt())
{
case 202:
emit Print(u"Successfully subscribed to %1"_s.arg(type),TWITCH_API_OPERATION_SUBSCRIBE);
NextSubscription();
return;
case 400:
emit Print(u"The subscription request was malformatted"_s,TWITCH_API_OPERATION_SUBSCRIBE);
break;
case 401:
emit Print(u"Invalid OAuth token or authorization header was malformatted"_s,TWITCH_API_OPERATION_SUBSCRIBE);
emit Unauthorized();
return;
case 403:
emit Print(u"Access token is missing the required scopes"_s,TWITCH_API_OPERATION_SUBSCRIBE);
break;
case 409:
emit Print(u"Subscription already exists"_s,TWITCH_API_OPERATION_SUBSCRIBE);
NextSubscription();
return;
case 429:
emit Print(u"Too many subscription requests"_s,TWITCH_API_OPERATION_SUBSCRIBE);
emit RateLimitHit();
return;
}
emit EventSubscriptionFailed(type);
},{},{
{NETWORK_HEADER_AUTHORIZATION,security.Bearer(security.OAuthToken())},
{NETWORK_HEADER_CLIENT_ID,security.ClientID()},
{Network::CONTENT_TYPE,Network::CONTENT_TYPE_JSON}
},QJsonDocument(QJsonObject({
{u"type"_s,type},
{u"version"_s,"1"}, // NOTE: Twitch has already started using this, so going to need to know max versions for subscriptions eventually
{
u"condition"_s,
QJsonObject({{type == SUBSCRIPTION_TYPE_RAID ? u"to_broadcaster_user_id"_s : u"broadcaster_user_id"_s,security.AdministratorID()}})
},
{
u"transport"_s,
QJsonObject({
{u"method"_s,"websocket"},
{u"session_id"_s,sessionID}
})
}
})).toJson(QJsonDocument::Compact));
}
// Drops the subscription type that was just handled from the queue and, if another
// one is waiting, requests it. Does nothing when the queue is already empty.
void EventSub::NextSubscription()
{
	if (defaultTypes.empty()) return;

	defaultTypes.pop();
	if (defaultTypes.empty()) return; // that was the last one

	Subscribe(defaultTypes.front());
}
// Slot for text messages arriving on the socket. Parses the message as JSON, checks
// that it has metadata and payload sections, and dispatches on the metadata's message
// type. Messages that fail any check are reported through Print() and dropped.
//
// message: raw text received from the WebSocket
//
// Throws std::logic_error if a type is registered in messageTypes but has no case here.
void EventSub::ParseMessage(QString message)
{
	static const char *OPERATION_PARSE_MESSAGE="parse message";

	const JSON::ParseResult parsedJSON=JSON::Parse(StringConvert::ByteArray(message.trimmed()));
	if (!parsedJSON)
	{
		emit Print(u"Invalid JSON: %1"_s.arg(parsedJSON.error),OPERATION_PARSE_MESSAGE);
		return;
	}

	QJsonObject messageObject=parsedJSON().object();
	auto metadata=messageObject.find(JSON_KEY_METADATA);
	auto payload=messageObject.find(JSON_KEY_PAYLOAD);
	if (metadata == messageObject.end() || payload == messageObject.end())
	{
		emit Print(u"Malformatted message"_s,OPERATION_PARSE_MESSAGE);
		return;
	}

	QJsonObject metadataObject=metadata->toObject();
	auto type=metadataObject.find(JSON_KEY_METADATA_TYPE);
	if (type == metadataObject.end())
	{
		emit Print(u"Missing message type"_s,OPERATION_PARSE_MESSAGE);
		return;
	}

	auto messageType=messageTypes.find(type->toString());
	if (messageType == messageTypes.end())
	{
		emit Print(u"Unknown message type (%1)"_s.arg(type->toString()),OPERATION_PARSE_MESSAGE);
		return;
	}

	switch (messageType->second)
	{
	case MessageType::WELCOME:
		ParseWelcome(payload->toObject());
		break;
	case MessageType::KEEPALIVE:
		keepalive.start();
		break;
	case MessageType::NOTIFICATION:
		// Twitch only sends keepalive messages while no notifications are being
		// delivered, so a notification is itself proof the connection is alive.
		// Without this restart, a steady stream of events lets the timer run out
		// and Dead() tears down a healthy connection. Only restart a timer the
		// welcome message already armed.
		if (keepalive.isActive()) keepalive.start();
		ParseNotification(payload->toObject());
		break;
	default:
		throw std::logic_error("Websocket message type recognized but unimplemented");
	}
}
// Handles the payload of a welcome message: stores the session ID, announces the
// connection, and arms the keepalive timer if Twitch supplied a timeout.
//
// payload: the "payload" object of the welcome message
//
// Emits Connected() once a session ID has been stored. A payload without session
// data or without a session ID is reported through Print() and otherwise ignored.
void EventSub::ParseWelcome(QJsonObject payload)
{
	static const char *OPERATION_PARSE_WELCOME="parse welcome";

	auto sessionField=payload.find(JSON_KEY_PAYLOAD_SESSION);
	if (sessionField == payload.end())
	{
		emit Print("Ignoring welcome message with no session data",OPERATION_PARSE_WELCOME);
		return;
	}

	QJsonObject sessionObject=sessionField->toObject();
	auto idField=sessionObject.find(JSON_KEY_PAYLOAD_SESSION_ID);
	if (idField == sessionObject.end())
	{
		emit Print("Unable to create EventSub connection because Twitch did not provide a session ID",OPERATION_PARSE_WELCOME);
		return;
	}

	sessionID=idField->toString();
	emit Connected();

	auto timeoutField=sessionObject.find(JSON_KEY_PAYLOAD_SESSION_KEEPALIVE_TIMEOUT);
	if (timeoutField == sessionObject.end()) return; // no timeout given, leave the timer alone

	// wait twice as long as Twitch promises before declaring the connection dead
	const std::chrono::seconds deadline(timeoutField->toInt()*2);
	keepalive.setInterval(std::chrono::duration_cast<std::chrono::milliseconds>(deadline));
	keepalive.start();
}
// Handles the payload of a notification message. Identifies the subscription type,
// wraps the event in a heap-allocated JSON::SignalPayload, and hands that payload to
// ParseCommand() along with the user's display name and login. The event-specific
// signal (Follow, Redemption, Cheer, ...) is emitted later, when the payload fires
// its Deliver signal.
//
// notification: the "payload" object of the notification message
//
// Notifications with no subscription data or no event are reported through Print();
// ones with an unrecognized subscription type are dropped silently.
// NOTE(review): nothing in this function deletes the payload; presumably whoever
// receives ParseCommand() takes ownership - confirm
void EventSub::ParseNotification(QJsonObject notification)
{
	static const char *OPERATION_PARSE_NOTIFICATION="parse notification";

	auto subscriptionField=notification.find(JSON_KEY_PAYLOAD_SUBSCRIPTION);
	if (subscriptionField == notification.end())
	{
		emit Print("Ignoring notification payload with no subscription data",OPERATION_PARSE_NOTIFICATION);
		return;
	}

	// translate Twitch's type name into the enum; anything unrecognized stays UNKNOWN
	QJsonObject subscriptionObject=subscriptionField->toObject();
	SubscriptionType subscriptionType=SubscriptionType::UNKNOWN;
	auto typeField=subscriptionObject.find(JSON_KEY_PAYLOAD_SUBSCRIPTION_TYPE);
	if (typeField != subscriptionObject.end())
	{
		auto knownType=subscriptionTypes.find(typeField->toString());
		if (knownType != subscriptionTypes.end()) subscriptionType=knownType->second;
	}
	if (subscriptionType == SubscriptionType::UNKNOWN) return;

	auto event=notification.find(JSON_KEY_EVENT);
	if (event == notification.end())
	{
		emit Print("Event field missing from notification");
		return;
	}

	QJsonObject eventObject=event->toObject();
	JSON::SignalPayload *payload=new JSON::SignalPayload(event->toObject());
	const std::optional<QString> prompt=ExtractPrompt(subscriptionType,eventObject);
	if (prompt.has_value()) payload->context=*prompt;

	const QString userName=eventObject.value(JSON_KEY_EVENT_USER_NAME).toString();
	const QString userLogin=eventObject.value(JSON_KEY_EVENT_USER_LOGIN).toString();

	// translate the delivered event into the signal matching its subscription type
	connect(payload,&JSON::SignalPayload::Deliver,this,[this,subscriptionType,userName,userLogin,payload](const QJsonObject &delivered) {
		switch (subscriptionType)
		{
		case SubscriptionType::CHANNEL_FOLLOW:
			emit Follow();
			break;
		case SubscriptionType::CHANNEL_REDEMPTION:
		{
			const QString rewardTitle=delivered.value(JSON_KEY_EVENT_REWARD).toObject().value(JSON_KEY_EVENT_REWARD_TITLE).toString();
			emit Redemption(userLogin,userName,rewardTitle,payload->context.toString());
			break;
		}
		case SubscriptionType::CHANNEL_CHEER:
			// section(" ",1) drops the first space-separated word of the cheer message
			emit Cheer(userName,delivered.value(JSON_KEY_EVENT_CHEER_AMOUNT).toVariant().toUInt(),delivered.value(JSON_KEY_EVENT_MESSAGE).toString().section(" ",1));
			break;
		case SubscriptionType::CHANNEL_RAID:
			emit Raid(delivered.value("from_broadcaster_user_name").toString(),delivered.value(JSON_KEY_EVENT_VIEWERS).toVariant().toUInt());
			break;
		case SubscriptionType::CHANNEL_SUBSCRIPTION:
			emit ChannelSubscription(delivered.value(JSON_KEY_EVENT_USER_LOGIN).toString(),delivered.value(JSON_KEY_EVENT_USER_NAME).toString());
			break;
		case SubscriptionType::CHANNEL_HYPE_TRAIN:
		{
			// progress is reported as a fraction of the goal; skip events without a usable goal
			const double goal=delivered.value(JSON_KEY_EVENT_HYPE_TRAIN_TOTAL).toDouble();
			if (goal > 0) emit HypeTrain(delivered.value(JSON_KEY_EVENT_HYPE_TRAIN_LEVEL).toInt(),delivered.value(JSON_KEY_EVENT_HYPE_TRAIN_PROGRESS).toDouble()/goal);
			break;
		}
		default:
			throw std::logic_error("Subscription type recognized but unimplemented");
		}
	});

	emit ParseCommand(payload,userName,userLogin);
}
void EventSub::RequestEventSubscriptionList()
{
static const char *TWITCH_API_OPERATION_SUBSCRIPTION_LIST="list subscriptions";
Network::Request::Send({Twitch::Endpoint(Twitch::ENDPOINT_EVENTSUB_SUBSCRIPTIONS)},Network::Method::GET,[this](QNetworkReply *reply) {
switch (reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt())
{
case 400:
emit Print(u"The subscription request was malformatted"_s,TWITCH_API_OPERATION_SUBSCRIPTION_LIST);
break;
case 401:
emit Print(u"Invalid OAuth token or authorization header was malformatted"_s,TWITCH_API_OPERATION_SUBSCRIPTION_LIST);
return;
}
const QByteArray data=reply->readAll();
emit Print(StringConvert::Dump(data),TWITCH_API_OPERATION_SUBSCRIPTION_LIST);
const JSON::ParseResult parsedJSON=JSON::Parse(StringConvert::ByteArray(data.trimmed()));
if (!parsedJSON)
{
Print(QString("Invalid JSON: %1").arg(parsedJSON.error),TWITCH_API_OPERATION_SUBSCRIPTION_LIST);
return;
}
QJsonObject jsonObject=parsedJSON().object();
if (auto subscriptionList=jsonObject.find(JSON::Keys::DATA); subscriptionList != jsonObject.end())
{
for (const QJsonValue &eventSubscription : subscriptionList->toArray())
{
const QJsonObject entry=eventSubscription.toObject();
const QString id=entry.value("id").toString();
const QString type=entry.value("type").toString();
const QDateTime date=entry.value("created_at").toVariant().toDateTime();
if (auto transport=entry.find("transport"); transport != entry.end())
{
QJsonObject transportObject=transport->toObject();
auto disconnectTime=transportObject.find("disconnected_at");
if (disconnectTime == transportObject.end()) continue;
if (disconnectTime->toVariant().toDateTime().toLocalTime() < QDateTime::currentDateTime()) continue; // FIXME: Not sure what's happening here, but the expiration date on EVERY subscription is older than the current date
}
if (id.isEmpty() || type.isEmpty() || !date.isValid()) continue;
emit EventSubscription(id,type,date,QString());
}
}
},{},{
{NETWORK_HEADER_AUTHORIZATION,security.Bearer(security.OAuthToken())},
{NETWORK_HEADER_CLIENT_ID,security.ClientID()}
});
}
// Sends a request to delete one event subscription.
//
// id: identifier of the subscription to remove, passed as the "id" query parameter
//
// Emits EventSubscriptionRemoved(id) when the reply status is 204. The 400, 401 and
// 404 statuses are only reported through Print(); any other status is ignored.
void EventSub::RemoveEventSubscription(const QString &id)
{
	static const char *TWITCH_API_OPERATION_SUBSCRIPTION_DELETE="delete subscription";
	Network::Request::Send({Twitch::Endpoint(Twitch::ENDPOINT_EVENTSUB_SUBSCRIPTIONS)},Network::Method::DELETE,[this,id](QNetworkReply *reply) {
		const int status=reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
		if (status == 204)
		{
			emit Print(u"Removed event "_s.append(id),TWITCH_API_OPERATION_SUBSCRIPTION_DELETE);
			emit EventSubscriptionRemoved(id);
		}
		else if (status == 400)
		{
			emit Print(u"The subscription request was malformatted"_s,TWITCH_API_OPERATION_SUBSCRIPTION_DELETE);
		}
		else if (status == 401)
		{
			emit Print(u"Invalid OAuth token or authorization header was malformatted"_s,TWITCH_API_OPERATION_SUBSCRIPTION_DELETE);
		}
		else if (status == 404)
		{
			emit Print(u"Invalid subscription ID"_s,TWITCH_API_OPERATION_SUBSCRIPTION_DELETE);
		}
	},{
		{u"id"_s,id}
	},{
		{NETWORK_HEADER_AUTHORIZATION,security.Bearer(security.OAuthToken())},
		{NETWORK_HEADER_CLIENT_ID,security.ClientID()}
	});
}
// Builds the text that ParseNotification() stores as the payload's context.
//
// type:  subscription type of the notification
// event: the notification's event object
//
// Returns a string only for cheers: "!uptime " followed by the cheer message with its
// first space-separated word removed. Every other type yields std::nullopt.
// NOTE(review): the hard-coded "!uptime " prefix looks like it may be a leftover from testing - confirm
std::optional<QString> EventSub::ExtractPrompt(SubscriptionType type,const QJsonObject &event) const
{
	if (type != SubscriptionType::CHANNEL_CHEER) return std::nullopt;
	return "!uptime "+event.value(JSON_KEY_EVENT_MESSAGE).toString().section(" ",1);
}