Permalink
Browse files

NetPlay: Implement chunked data transfer

This sends arbitrary packets in chunks to be reassembled at the other
end, allowing large data transfers to be speed-limited and interleaved
with other packets being sent. It also enables tracking the progress of
large data transfers.
  • Loading branch information...
Techjar committed Oct 18, 2018
1 parent e6b2758 commit d94922002b8a5b37d0dcb30b5d2662abc20a6187
@@ -10,6 +10,7 @@
#include "Common/Logging/Log.h"
#include "Common/MsgHandler.h"
#include "Common/Random.h"
#include "Core/NetPlayProto.h"
TraversalClient::TraversalClient(ENetHost* netHost, const std::string& server, const u16 port)
: m_NetHost(netHost), m_Server(server), m_port(port)
@@ -316,11 +317,11 @@ bool EnsureTraversalClient(const std::string& server, u16 server_port, u16 liste
g_OldListenPort = listen_port;
ENetAddress addr = {ENET_HOST_ANY, listen_port};
ENetHost* host = enet_host_create(&addr, // address
50, // peerCount
1, // channelLimit
0, // incomingBandwidth
0); // outgoingBandwidth
ENetHost* host = enet_host_create(&addr, // address
50, // peerCount
NetPlay::CHANNEL_COUNT, // channelLimit
0, // incomingBandwidth
0); // outgoingBandwidth
if (!host)
{
g_MainNetHost.reset();
@@ -33,6 +33,11 @@ const ConfigInfo<bool> NETPLAY_USE_UPNP{{System::Main, "NetPlay", "UseUPNP"}, fa
const ConfigInfo<bool> NETPLAY_ENABLE_QOS{{System::Main, "NetPlay", "EnableQoS"}, true};
const ConfigInfo<bool> NETPLAY_ENABLE_CHUNKED_UPLOAD_LIMIT{
{System::Main, "NetPlay", "EnableChunkedUploadLimit"}, false};
const ConfigInfo<u32> NETPLAY_CHUNKED_UPLOAD_LIMIT{{System::Main, "NetPlay", "ChunkedUploadLimit"},
3000};
const ConfigInfo<u32> NETPLAY_BUFFER_SIZE{{System::Main, "NetPlay", "BufferSize"}, 5};
const ConfigInfo<u32> NETPLAY_CLIENT_BUFFER_SIZE{{System::Main, "NetPlay", "BufferSizeClient"}, 1};
@@ -30,6 +30,9 @@ extern const ConfigInfo<bool> NETPLAY_USE_UPNP;
extern const ConfigInfo<bool> NETPLAY_ENABLE_QOS;
extern const ConfigInfo<bool> NETPLAY_ENABLE_CHUNKED_UPLOAD_LIMIT;
extern const ConfigInfo<u32> NETPLAY_CHUNKED_UPLOAD_LIMIT;
extern const ConfigInfo<u32> NETPLAY_BUFFER_SIZE;
extern const ConfigInfo<u32> NETPLAY_CLIENT_BUFFER_SIZE;
@@ -112,7 +112,7 @@ NetPlayClient::NetPlayClient(const std::string& address, const u16 port, NetPlay
if (!traversal_config.use_traversal)
{
// Direct Connection
m_client = enet_host_create(nullptr, 1, 3, 0, 0);
m_client = enet_host_create(nullptr, 1, CHANNEL_COUNT, 0, 0);
if (m_client == nullptr)
{
@@ -124,7 +124,7 @@ NetPlayClient::NetPlayClient(const std::string& address, const u16 port, NetPlay
enet_address_set_host(&addr, address.c_str());
addr.port = port;
m_server = enet_host_connect(m_client, &addr, 3, 0);
m_server = enet_host_connect(m_client, &addr, CHANNEL_COUNT, 0);
if (m_server == nullptr)
{
@@ -338,6 +338,61 @@ unsigned int NetPlayClient::OnData(sf::Packet& packet)
}
break;
case NP_MSG_CHUNKED_DATA_START:
{
u32 cid;
packet >> cid;
std::string title;
packet >> title;
u64 data_size = Common::PacketReadU64(packet);
m_chunked_data_receive_queue.emplace(cid, sf::Packet{});
std::vector<int> players;
players.push_back(m_local_player->pid);
m_dialog->ShowChunkedProgressDialog(title, data_size, players);
}
break;
case NP_MSG_CHUNKED_DATA_END:
{
u32 cid;
packet >> cid;
OnData(m_chunked_data_receive_queue[cid]);
m_chunked_data_receive_queue.erase(m_chunked_data_receive_queue.find(cid));
m_dialog->HideChunkedProgressDialog();
sf::Packet complete_packet;
complete_packet << static_cast<MessageId>(NP_MSG_CHUNKED_DATA_COMPLETE);
complete_packet << cid;
Send(complete_packet, CHUNKED_DATA_CHANNEL);
}
break;
case NP_MSG_CHUNKED_DATA_PAYLOAD:
{
u32 cid;
packet >> cid;
while (!packet.endOfPacket())
{
u8 byte;
packet >> byte;
m_chunked_data_receive_queue[cid] << byte;
}
m_dialog->SetChunkedProgress(m_local_player->pid,
m_chunked_data_receive_queue[cid].getDataSize());
sf::Packet progress_packet;
progress_packet << static_cast<MessageId>(NP_MSG_CHUNKED_DATA_PROGRESS);
progress_packet << cid;
progress_packet << sf::Uint64{m_chunked_data_receive_queue[cid].getDataSize()};
Send(progress_packet, CHUNKED_DATA_CHANNEL);
}
break;
case NP_MSG_PAD_MAPPING:
{
for (PadMapping& mapping : m_pad_map)
@@ -1050,11 +1105,11 @@ unsigned int NetPlayClient::OnData(sf::Packet& packet)
return 0;
}
void NetPlayClient::Send(const sf::Packet& packet)
void NetPlayClient::Send(const sf::Packet& packet, const u8 channel_id)
{
ENetPacket* epac =
enet_packet_create(packet.getData(), packet.getDataSize(), ENET_PACKET_FLAG_RELIABLE);
enet_peer_send(m_server, 0, epac);
enet_peer_send(m_server, channel_id, epac);
}
void NetPlayClient::DisplayPlayersPing()
@@ -1104,11 +1159,11 @@ void NetPlayClient::Disconnect()
m_server = nullptr;
}
void NetPlayClient::SendAsync(sf::Packet&& packet)
void NetPlayClient::SendAsync(sf::Packet&& packet, const u8 channel_id)
{
{
std::lock_guard<std::recursive_mutex> lkq(m_crit.async_queue_write);
m_async_queue.Push(std::move(packet));
m_async_queue.Push(AsyncQueueEntry{std::move(packet), channel_id});
}
ENetUtil::WakeupThread(m_client);
}
@@ -1136,7 +1191,10 @@ void NetPlayClient::ThreadFunc()
net = enet_host_service(m_client, &netEvent, 250);
while (!m_async_queue.Empty())
{
Send(m_async_queue.Front());
{
auto& e = m_async_queue.Front();
Send(e.packet, e.channel_id);
}
m_async_queue.Pop();
}
if (net > 0)
@@ -1557,7 +1615,7 @@ void NetPlayClient::OnConnectReady(ENetAddress addr)
if (m_connection_state == ConnectionState::WaitingForTraversalClientConnectReady)
{
m_connection_state = ConnectionState::Connecting;
enet_host_connect(m_client, &addr, 0, 0);
enet_host_connect(m_client, &addr, CHANNEL_COUNT, 0);
}
}
@@ -13,6 +13,8 @@
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "Common/CommonTypes.h"
@@ -60,6 +62,11 @@ class NetPlayUI
virtual void SetMD5Progress(int pid, int progress) = 0;
virtual void SetMD5Result(int pid, const std::string& result) = 0;
virtual void AbortMD5() = 0;
virtual void ShowChunkedProgressDialog(const std::string& title, u64 data_size,
const std::vector<int>& players) = 0;
virtual void HideChunkedProgressDialog() = 0;
virtual void SetChunkedProgress(int pid, u64 progress) = 0;
};
enum class PlayerGameStatus
@@ -85,7 +92,7 @@ class NetPlayClient : public TraversalClientClient
{
public:
void ThreadFunc();
void SendAsync(sf::Packet&& packet);
void SendAsync(sf::Packet&& packet, u8 channel_id = DEFAULT_CHANNEL);
NetPlayClient(const std::string& address, const u16 port, NetPlayUI* dialog,
const std::string& name, const NetTraversalConfig& traversal_config);
@@ -130,6 +137,12 @@ class NetPlayClient : public TraversalClientClient
void AdjustPadBufferSize(unsigned int size);
protected:
struct AsyncQueueEntry
{
sf::Packet packet;
u8 channel_id;
};
void ClearBuffers();
struct
@@ -140,7 +153,7 @@ class NetPlayClient : public TraversalClientClient
std::recursive_mutex async_queue_write;
} m_crit;
Common::SPSCQueue<sf::Packet, false> m_async_queue;
Common::SPSCQueue<AsyncQueueEntry, false> m_async_queue;
std::array<Common::SPSCQueue<GCPadStatus>, 4> m_pad_buffer;
std::array<Common::SPSCQueue<NetWiimote>, 4> m_wiimote_buffer;
@@ -203,7 +216,7 @@ class NetPlayClient : public TraversalClientClient
void AddPadStateToPacket(int in_game_pad, const GCPadStatus& np, sf::Packet& packet);
void SendWiimoteState(int in_game_pad, const NetWiimote& nw);
unsigned int OnData(sf::Packet& packet);
void Send(const sf::Packet& packet);
void Send(const sf::Packet& packet, u8 channel_id = DEFAULT_CHANNEL);
void Disconnect();
bool Connect();
void ComputeMD5(const std::string& file_identifier);
@@ -233,6 +246,7 @@ class NetPlayClient : public TraversalClientClient
u16 m_sync_ar_codes_count = 0;
u16 m_sync_ar_codes_success_count = 0;
bool m_sync_ar_codes_complete = false;
std::unordered_map<u32, sf::Packet> m_chunked_data_receive_queue;
u64 m_initial_rtc = 0;
u32 m_timebase_frame = 0;
@@ -110,6 +110,12 @@ enum
NP_MSG_CHAT_MESSAGE = 0x30,
NP_MSG_CHUNKED_DATA_START = 0x40,
NP_MSG_CHUNKED_DATA_END = 0x41,
NP_MSG_CHUNKED_DATA_PAYLOAD = 0x42,
NP_MSG_CHUNKED_DATA_PROGRESS = 0x43,
NP_MSG_CHUNKED_DATA_COMPLETE = 0x44,
NP_MSG_PAD_DATA = 0x60,
NP_MSG_PAD_MAPPING = 0x61,
NP_MSG_PAD_BUFFER = 0x62,
@@ -179,6 +185,10 @@ enum
constexpr u32 NETPLAY_LZO_IN_LEN = 1024 * 64;
constexpr u32 NETPLAY_LZO_OUT_LEN = NETPLAY_LZO_IN_LEN + (NETPLAY_LZO_IN_LEN / 16) + 64 + 3;
constexpr size_t CHUNKED_DATA_UNIT_SIZE = 16384;
constexpr u8 CHANNEL_COUNT = 2;
constexpr u8 DEFAULT_CHANNEL = 0;
constexpr u8 CHUNKED_DATA_CHANNEL = 1;
using NetWiimote = std::vector<u8>;
using MessageId = u8;
Oops, something went wrong.

0 comments on commit d949220

Please sign in to comment.