Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PubSub client creation/pending topic resolving #3037

Merged
merged 7 commits into from
Jul 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Minor: Added informative messages for recent-messages API's errors. (#3029)
- Minor: Added section with helpful Chatterino-related links to the About page. (#3068)
- Bugfix: Fixed "smiley" emotes being unable to be "Tabbed" with autocompletion, introduced in v2.3.3. (#3010)
- Bugfix: Fixed PubSub not properly trying to resolve pending listens when the pending listens list was larger than 50. (#3037)
- Bugfix: Copy buttons in usercard now show properly in light mode (#3057)
- Bugfix: Fixed comma appended to username completion when not at the beginning of the message. (#3060)
- Bugfix: Fixed bug misplacing chat when zooming on Chrome with Chatterino Native Host extension (#1936)
Expand Down
111 changes: 87 additions & 24 deletions src/providers/twitch/PubsubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "providers/twitch/PubsubActions.hpp"
#include "providers/twitch/PubsubHelpers.hpp"
#include "singletons/Settings.hpp"
#include "util/DebugCount.hpp"
#include "util/Helpers.hpp"
#include "util/RapidjsonHelpers.hpp"

Expand All @@ -23,7 +24,8 @@ namespace chatterino {

static const char *pingPayload = "{\"type\":\"PING\"}";

static std::map<QString, QString> sentMessages;
static std::map<QString, RequestMessage> sentListens;
static std::map<QString, RequestMessage> sentUnlistens;

namespace detail {

Expand Down Expand Up @@ -59,21 +61,21 @@ namespace detail {
// This PubSubClient is already at its peak listens
return false;
}

this->numListens_ += numRequestedListens;
DebugCount::increase("PubSub topic pending listens",
numRequestedListens);

for (const auto &topic : message["data"]["topics"].GetArray())
{
this->listeners_.emplace_back(
Listener{topic.GetString(), false, false, false});
}

auto uuid = generateUuid();

rj::set(message, "nonce", uuid);
auto nonce = generateUuid();
rj::set(message, "nonce", nonce);

QString payload = rj::stringify(message);
sentMessages[uuid] = payload;
sentListens[nonce] = RequestMessage{payload, numRequestedListens};

this->send(payload.toUtf8());

Expand Down Expand Up @@ -103,14 +105,19 @@ namespace detail {
return;
}

auto message = createUnlistenMessage(topics);
int numRequestedUnlistens = topics.size();

auto uuid = generateUuid();
this->numListens_ -= numRequestedUnlistens;
DebugCount::increase("PubSub topic pending unlistens",
numRequestedUnlistens);

rj::set(message, "nonce", generateUuid());
auto message = createUnlistenMessage(topics);

auto nonce = generateUuid();
rj::set(message, "nonce", nonce);

QString payload = rj::stringify(message);
sentMessages[uuid] = payload;
sentUnlistens[nonce] = RequestMessage{payload, numRequestedUnlistens};

this->send(payload.toUtf8());
}
Expand Down Expand Up @@ -865,6 +872,13 @@ PubSub::PubSub()

void PubSub::addClient()
{
if (this->addingClient)
{
return;
}

this->addingClient = true;

websocketpp::lib::error_code ec;
auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec);

Expand Down Expand Up @@ -998,6 +1012,8 @@ void PubSub::listen(rapidjson::Document &&msg)

this->requests.emplace_back(
std::make_unique<rapidjson::Document>(std::move(msg)));

DebugCount::increase("PubSub topic backlog");
}

bool PubSub::tryListen(rapidjson::Document &msg)
Expand Down Expand Up @@ -1066,7 +1082,7 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl,

if (type == "RESPONSE")
{
this->handleListenResponse(msg);
this->handleResponse(msg);
}
else if (type == "MESSAGE")
{
Expand Down Expand Up @@ -1107,6 +1123,9 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl,

void PubSub::onConnectionOpen(WebsocketHandle hdl)
{
DebugCount::increase("PubSub connections");
this->addingClient = false;

auto client =
std::make_shared<detail::PubSubClient>(this->websocketClient, hdl);

Expand All @@ -1123,17 +1142,24 @@ void PubSub::onConnectionOpen(WebsocketHandle hdl)
const auto &request = *it;
if (client->listen(*request))
{
DebugCount::decrease("PubSub topic backlog");
it = this->requests.erase(it);
}
else
{
++it;
}
}

if (!this->requests.empty())
{
this->addClient();
}
}

void PubSub::onConnectionClose(WebsocketHandle hdl)
{
DebugCount::decrease("PubSub connections");
auto clientIt = this->clients.find(hdl);

// If this assert goes off, there's something wrong with the connection
Expand Down Expand Up @@ -1169,27 +1195,64 @@ PubSub::WebsocketContextPtr PubSub::onTLSInit(websocketpp::connection_hdl hdl)
return ctx;
}

void PubSub::handleListenResponse(const rapidjson::Document &msg)
void PubSub::handleResponse(const rapidjson::Document &msg)
{
QString error;

if (rj::getSafe(msg, "error", error))
{
QString nonce;
rj::getSafe(msg, "nonce", nonce);
if (!rj::getSafe(msg, "error", error))
return;

if (error.isEmpty())
{
qCDebug(chatterinoPubsub)
<< "Successfully listened to nonce" << nonce;
// Nothing went wrong
return;
}
QString nonce;
rj::getSafe(msg, "nonce", nonce);

const bool failed = !error.isEmpty();

if (failed)
{
qCDebug(chatterinoPubsub)
<< "PubSub error:" << error << "on nonce" << nonce;
<< QString("Error %1 on nonce %2").arg(error, nonce);
}

if (auto it = sentListens.find(nonce); it != sentListens.end())
{
this->handleListenResponse(it->second, failed);
return;
}

if (auto it = sentUnlistens.find(nonce); it != sentUnlistens.end())
{
this->handleUnlistenResponse(it->second, failed);
return;
}

qCDebug(chatterinoPubsub)
<< "Response on unused" << nonce << "client/topic listener mismatch?";
}

void PubSub::handleListenResponse(const RequestMessage &msg, bool failed)
{
DebugCount::decrease("PubSub topic pending listens", msg.topicCount);
if (failed)
{
DebugCount::increase("PubSub topic failed listens", msg.topicCount);
}
else
{
DebugCount::increase("PubSub topic listening", msg.topicCount);
}
}

void PubSub::handleUnlistenResponse(const RequestMessage &msg, bool failed)
{
DebugCount::decrease("PubSub topic pending unlistens", msg.topicCount);
if (failed)
{
DebugCount::increase("PubSub topic failed unlistens", msg.topicCount);
}
else
{
DebugCount::decrease("PubSub topic listening", msg.topicCount);
}
}

void PubSub::handleMessageResponse(const rapidjson::Value &outerData)
Expand Down
10 changes: 9 additions & 1 deletion src/providers/twitch/PubsubClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ using WebsocketErrorCode = websocketpp::lib::error_code;
#define MAX_PUBSUB_LISTENS 50
#define MAX_PUBSUB_CONNECTIONS 10

struct RequestMessage {
QString payload;
int topicCount;
};

namespace detail {

struct Listener {
Expand Down Expand Up @@ -172,6 +177,7 @@ class PubSub
bool isListeningToTopic(const QString &topic);

void addClient();
std::atomic<bool> addingClient{false};

State state = State::Connected;

Expand All @@ -192,7 +198,9 @@ class PubSub
void onConnectionClose(websocketpp::connection_hdl hdl);
WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl);

void handleListenResponse(const rapidjson::Document &msg);
void handleResponse(const rapidjson::Document &msg);
void handleListenResponse(const RequestMessage &msg, bool failed);
void handleUnlistenResponse(const RequestMessage &msg, bool failed);
void handleMessageResponse(const rapidjson::Value &data);

void runThread();
Expand Down
28 changes: 28 additions & 0 deletions src/util/DebugCount.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ class DebugCount
reinterpret_cast<int64_t &>(it.value())++;
}
}
static void increase(const QString &name, const int64_t &amount)
{
auto counts = counts_.access();

auto it = counts->find(name);
if (it == counts->end())
{
counts->insert(name, amount);
}
else
{
reinterpret_cast<int64_t &>(it.value()) += amount;
}
}

static void decrease(const QString &name)
{
Expand All @@ -42,6 +56,20 @@ class DebugCount
reinterpret_cast<int64_t &>(it.value())--;
}
}
static void decrease(const QString &name, const int64_t &amount)
{
auto counts = counts_.access();

auto it = counts->find(name);
if (it == counts->end())
{
counts->insert(name, -amount);
}
else
{
reinterpret_cast<int64_t &>(it.value()) -= amount;
}
}

static QString getDebugText()
{
Expand Down