Skip to content

Commit

Permalink
Fix PubSub client creation/pending topic resolving (Chatterino#3037)
Browse files Browse the repository at this point in the history
Co-authored-by: Rasmus Karlsson <rasmus.karlsson@pajlada.com>
Co-authored-by: Felanbird <41973452+Felanbird@users.noreply.github.com>
Co-authored-by: zneix <zneix@zneix.eu>
  • Loading branch information
3 people committed Jul 25, 2021
1 parent 33d1837 commit 770b9f2
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Minor: Added informative messages for recent-messages API's errors. (#3029)
- Minor: Added section with helpful Chatterino-related links to the About page. (#3068)
- Bugfix: Fixed "smiley" emotes being unable to be "Tabbed" with autocompletion, introduced in v2.3.3. (#3010)
- Bugfix: Fixed PubSub not properly trying to resolve pending listens when the pending listens list was larger than 50. (#3037)
- Bugfix: Copy buttons in usercard now show properly in light mode (#3057)
- Bugfix: Fixed comma appended to username completion when not at the beginning of the message. (#3060)
- Bugfix: Fixed bug misplacing chat when zooming on Chrome with Chatterino Native Host extension (#1936)
Expand Down
111 changes: 87 additions & 24 deletions src/providers/twitch/PubsubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "providers/twitch/PubsubActions.hpp"
#include "providers/twitch/PubsubHelpers.hpp"
#include "singletons/Settings.hpp"
#include "util/DebugCount.hpp"
#include "util/Helpers.hpp"
#include "util/RapidjsonHelpers.hpp"

Expand All @@ -23,7 +24,8 @@ namespace chatterino {

static const char *pingPayload = "{\"type\":\"PING\"}";

static std::map<QString, QString> sentMessages;
static std::map<QString, RequestMessage> sentListens;
static std::map<QString, RequestMessage> sentUnlistens;

namespace detail {

Expand Down Expand Up @@ -59,21 +61,21 @@ namespace detail {
// This PubSubClient is already at its peak listens
return false;
}

this->numListens_ += numRequestedListens;
DebugCount::increase("PubSub topic pending listens",
numRequestedListens);

for (const auto &topic : message["data"]["topics"].GetArray())
{
this->listeners_.emplace_back(
Listener{topic.GetString(), false, false, false});
}

auto uuid = generateUuid();

rj::set(message, "nonce", uuid);
auto nonce = generateUuid();
rj::set(message, "nonce", nonce);

QString payload = rj::stringify(message);
sentMessages[uuid] = payload;
sentListens[nonce] = RequestMessage{payload, numRequestedListens};

this->send(payload.toUtf8());

Expand Down Expand Up @@ -103,14 +105,19 @@ namespace detail {
return;
}

auto message = createUnlistenMessage(topics);
int numRequestedUnlistens = topics.size();

auto uuid = generateUuid();
this->numListens_ -= numRequestedUnlistens;
DebugCount::increase("PubSub topic pending unlistens",
numRequestedUnlistens);

rj::set(message, "nonce", generateUuid());
auto message = createUnlistenMessage(topics);

auto nonce = generateUuid();
rj::set(message, "nonce", nonce);

QString payload = rj::stringify(message);
sentMessages[uuid] = payload;
sentUnlistens[nonce] = RequestMessage{payload, numRequestedUnlistens};

this->send(payload.toUtf8());
}
Expand Down Expand Up @@ -865,6 +872,13 @@ PubSub::PubSub()

void PubSub::addClient()
{
if (this->addingClient)
{
return;
}

this->addingClient = true;

websocketpp::lib::error_code ec;
auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec);

Expand Down Expand Up @@ -998,6 +1012,8 @@ void PubSub::listen(rapidjson::Document &&msg)

this->requests.emplace_back(
std::make_unique<rapidjson::Document>(std::move(msg)));

DebugCount::increase("PubSub topic backlog");
}

bool PubSub::tryListen(rapidjson::Document &msg)
Expand Down Expand Up @@ -1066,7 +1082,7 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl,

if (type == "RESPONSE")
{
this->handleListenResponse(msg);
this->handleResponse(msg);
}
else if (type == "MESSAGE")
{
Expand Down Expand Up @@ -1107,6 +1123,9 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl,

void PubSub::onConnectionOpen(WebsocketHandle hdl)
{
DebugCount::increase("PubSub connections");
this->addingClient = false;

auto client =
std::make_shared<detail::PubSubClient>(this->websocketClient, hdl);

Expand All @@ -1123,17 +1142,24 @@ void PubSub::onConnectionOpen(WebsocketHandle hdl)
const auto &request = *it;
if (client->listen(*request))
{
DebugCount::decrease("PubSub topic backlog");
it = this->requests.erase(it);
}
else
{
++it;
}
}

if (!this->requests.empty())
{
this->addClient();
}
}

void PubSub::onConnectionClose(WebsocketHandle hdl)
{
DebugCount::decrease("PubSub connections");
auto clientIt = this->clients.find(hdl);

// If this assert goes off, there's something wrong with the connection
Expand Down Expand Up @@ -1169,27 +1195,64 @@ PubSub::WebsocketContextPtr PubSub::onTLSInit(websocketpp::connection_hdl hdl)
return ctx;
}

void PubSub::handleListenResponse(const rapidjson::Document &msg)
void PubSub::handleResponse(const rapidjson::Document &msg)
{
QString error;

if (rj::getSafe(msg, "error", error))
{
QString nonce;
rj::getSafe(msg, "nonce", nonce);
if (!rj::getSafe(msg, "error", error))
return;

if (error.isEmpty())
{
qCDebug(chatterinoPubsub)
<< "Successfully listened to nonce" << nonce;
// Nothing went wrong
return;
}
QString nonce;
rj::getSafe(msg, "nonce", nonce);

const bool failed = !error.isEmpty();

if (failed)
{
qCDebug(chatterinoPubsub)
<< "PubSub error:" << error << "on nonce" << nonce;
<< QString("Error %1 on nonce %2").arg(error, nonce);
}

if (auto it = sentListens.find(nonce); it != sentListens.end())
{
this->handleListenResponse(it->second, failed);
return;
}

if (auto it = sentUnlistens.find(nonce); it != sentUnlistens.end())
{
this->handleUnlistenResponse(it->second, failed);
return;
}

qCDebug(chatterinoPubsub)
<< "Response on unused" << nonce << "client/topic listener mismatch?";
}

void PubSub::handleListenResponse(const RequestMessage &msg, bool failed)
{
DebugCount::decrease("PubSub topic pending listens", msg.topicCount);
if (failed)
{
DebugCount::increase("PubSub topic failed listens", msg.topicCount);
}
else
{
DebugCount::increase("PubSub topic listening", msg.topicCount);
}
}

void PubSub::handleUnlistenResponse(const RequestMessage &msg, bool failed)
{
DebugCount::decrease("PubSub topic pending unlistens", msg.topicCount);
if (failed)
{
DebugCount::increase("PubSub topic failed unlistens", msg.topicCount);
}
else
{
DebugCount::decrease("PubSub topic listening", msg.topicCount);
}
}

void PubSub::handleMessageResponse(const rapidjson::Value &outerData)
Expand Down
10 changes: 9 additions & 1 deletion src/providers/twitch/PubsubClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ using WebsocketErrorCode = websocketpp::lib::error_code;
#define MAX_PUBSUB_LISTENS 50
#define MAX_PUBSUB_CONNECTIONS 10

struct RequestMessage {
QString payload;
int topicCount;
};

namespace detail {

struct Listener {
Expand Down Expand Up @@ -172,6 +177,7 @@ class PubSub
bool isListeningToTopic(const QString &topic);

void addClient();
std::atomic<bool> addingClient{false};

State state = State::Connected;

Expand All @@ -192,7 +198,9 @@ class PubSub
void onConnectionClose(websocketpp::connection_hdl hdl);
WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl);

void handleListenResponse(const rapidjson::Document &msg);
void handleResponse(const rapidjson::Document &msg);
void handleListenResponse(const RequestMessage &msg, bool failed);
void handleUnlistenResponse(const RequestMessage &msg, bool failed);
void handleMessageResponse(const rapidjson::Value &data);

void runThread();
Expand Down
28 changes: 28 additions & 0 deletions src/util/DebugCount.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ class DebugCount
reinterpret_cast<int64_t &>(it.value())++;
}
}
static void increase(const QString &name, const int64_t &amount)
{
auto counts = counts_.access();

auto it = counts->find(name);
if (it == counts->end())
{
counts->insert(name, amount);
}
else
{
reinterpret_cast<int64_t &>(it.value()) += amount;
}
}

static void decrease(const QString &name)
{
Expand All @@ -42,6 +56,20 @@ class DebugCount
reinterpret_cast<int64_t &>(it.value())--;
}
}
static void decrease(const QString &name, const int64_t &amount)
{
auto counts = counts_.access();

auto it = counts->find(name);
if (it == counts->end())
{
counts->insert(name, -amount);
}
else
{
reinterpret_cast<int64_t &>(it.value()) -= amount;
}
}

static QString getDebugText()
{
Expand Down

0 comments on commit 770b9f2

Please sign in to comment.