Skip to content

Commit

Permalink
Add disconnection detection on send
Browse files Browse the repository at this point in the history
- bAutoReconnectOnFailedSend will auto-attempt reconnection
- OnDisconnection will emit on failed send, toggleable
- does not detect connection failures until a send is attempted
  • Loading branch information
getnamo committed Jul 29, 2022
1 parent 5da68fc commit 9792b3b
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 28 deletions.
104 changes: 78 additions & 26 deletions Source/TCPWrapper/Private/TCPClientComponent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@ UTCPClientComponent::UTCPClientComponent(const FObjectInitializer &init) : UActo
bReceiveDataOnGameThread = true;
bWantsInitializeComponent = true;
bAutoActivate = true;
bAutoDisconnectOnSendFailure = true;
bAutoReconnectOnSendFailure = true;
ConnectionIP = FString(TEXT("127.0.0.1"));
ConnectionPort = 3000;
ClientSocketName = FString(TEXT("ue4-tcp-client"));
ClientSocketName = FString(TEXT("unreal-tcp-client"));
ClientSocket = nullptr;

BufferMaxSize = 2 * 1024 * 1024; //default roughly 2mb
}

void UTCPClientComponent::ConnectToSocketAsClient(const FString& InIP /*= TEXT("127.0.0.1")*/, const int32 InPort /*= 3000*/)
{

//Already connected? attempt reconnect
if (IsConnected())
{
CloseSocket();
ConnectToSocketAsClient(InIP, InPort);
}

ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);

if (SocketSubsystem == nullptr)
Expand All @@ -47,36 +55,43 @@ void UTCPClientComponent::ConnectToSocketAsClient(const FString& InIP /*= TEXT("
}

RemoteAdress = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateInternetAddr();

//bool bIsValid;

RemoteAdress->SetRawIp(ResolveInfo->GetResolvedAddress().GetRawIp()); // todo: somewhat wasteful, we could probably use the same address object?
RemoteAdress->SetPort(InPort);

/*if (!bIsValid)
{
UE_LOG(LogTemp, Error, TEXT("TCP address is invalid <%s:%d>"), *InIP, InPort);
return ;
}*/

ClientSocket = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateSocket(NAME_Stream, ClientSocketName, false);

//Set Send Buffer Size
ClientSocket->SetSendBufferSize(BufferMaxSize, BufferMaxSize);
ClientSocket->SetReceiveBufferSize(BufferMaxSize, BufferMaxSize);

ClientSocket->Connect(*RemoteAdress);

if (IsConnected())
{
OnConnected.Broadcast();
}
bShouldReceiveData = true;

//Listen for data on our end
ClientConnectionFinishedFuture = FTCPWrapperUtility::RunLambdaOnBackGroundThread([&]()
{
double LastConnectionCheck = FPlatformTime::Seconds();

uint32 BufferSize = 0;
TArray<uint8> ReceiveBuffer;
bShouldAttemptConnection = true;

while (bShouldAttemptConnection)
{
if (ClientSocket->Connect(*RemoteAdress))
{
FTCPWrapperUtility::RunLambdaOnGameThread([&]()
{
OnConnected.Broadcast();
});
bShouldAttemptConnection = false;
continue;
}

//reconnect attempt every 3 sec
FPlatformProcess::Sleep(3.f);
}

bShouldReceiveData = true;

while (bShouldReceiveData)
{
if (ClientSocket->HasPendingData(BufferSize))
Expand All @@ -103,8 +118,26 @@ void UTCPClientComponent::ConnectToSocketAsClient(const FString& InIP /*= TEXT("
OnReceivedBytes.Broadcast(ReceiveBuffer);
}
}
//sleep until there is data or 10 ticks
ClientSocket->Wait(ESocketWaitConditions::WaitForReadOrWrite, FTimespan(1));
//sleep until there is data or 10 ticks (0.1micro seconds
ClientSocket->Wait(ESocketWaitConditions::WaitForReadOrWrite, FTimespan(10));

//Check every second if we're still connected
//NB: this doesn't really work atm, disconnects are not captured on receive pipe
//detectable on send failure though
/*double Now = FPlatformTime::Seconds();
if (Now > (LastConnectionCheck + 1.0))
{
LastConnectionCheck = Now;
if (!IsConnected())
{
bShouldReceiveData = false;
FTCPWrapperUtility::RunLambdaOnGameThread([&]()
{
OnDisconnected.Broadcast();
});
}
}*/
}
});
}
Expand All @@ -119,25 +152,44 @@ void UTCPClientComponent::CloseSocket()
ClientSocket->Close();
ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->DestroySocket(ClientSocket);
ClientSocket = nullptr;

OnDisconnected.Broadcast();
}
}

bool UTCPClientComponent::Emit(const TArray<uint8>& Bytes)
{
if (ClientSocket && ClientSocket->GetConnectionState() == SCS_Connected)
if (IsConnected())
{
int32 BytesSent = 0;
return ClientSocket->Send(Bytes.GetData(), Bytes.Num(), BytesSent);
bool bDidSend = ClientSocket->Send(Bytes.GetData(), Bytes.Num(), BytesSent);


//If we're supposedly connected but failed to send
if (IsConnected() && !bDidSend)
{
UE_LOG(LogTemp, Warning, TEXT("Sending Failure detected"));

if (bAutoDisconnectOnSendFailure)
{
UE_LOG(LogTemp, Warning, TEXT("disconnecting socket."));
CloseSocket();
}

if (bAutoReconnectOnSendFailure)
{
UE_LOG(LogTemp, Warning, TEXT("reconnecting..."));
ConnectToSocketAsClient(ConnectionIP, ConnectionPort);
}
}
return bDidSend;
}
return false;
}

bool UTCPClientComponent::IsConnected()
{
if (ClientSocket && (ClientSocket->GetConnectionState() == ESocketConnectionState::SCS_Connected))
return true;

return false;
return (ClientSocket && (ClientSocket->GetConnectionState() == ESocketConnectionState::SCS_Connected));
}

void UTCPClientComponent::InitializeComponent()
Expand Down
5 changes: 5 additions & 0 deletions Source/TCPWrapper/Private/TCPWrapperUtility.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@ class FTCPWrapperUtility
{
return Async(EAsyncExecution::Thread, InFunction);
}

static TFuture<void> RunLambdaOnGameThread(TFunction< void()> InFunction)
{
return Async(EAsyncExecution::TaskGraphMainThread, InFunction);
}
};
15 changes: 14 additions & 1 deletion Source/TCPWrapper/Public/TCPClientComponent.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ class TCPWRAPPER_API UTCPClientComponent : public UActorComponent
UPROPERTY(BlueprintAssignable, Category = "TCP Events")
FTCPMessageSignature OnReceivedBytes;

/** Callback when we start listening on the TCP receive socket*/
/** Callback when we've connected to end point*/
UPROPERTY(BlueprintAssignable, Category = "TCP Events")
FTCPEventSignature OnConnected;

/** Callback when we've disconnected from end point only captured on send failure */
UPROPERTY(BlueprintAssignable, Category = "TCP Events")
FTCPEventSignature OnDisconnected;

/** Default sending socket IP string in form e.g. 127.0.0.1. */
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "TCP Connection Properties")
FString ConnectionIP;
Expand All @@ -45,6 +49,14 @@ class TCPWRAPPER_API UTCPClientComponent : public UActorComponent
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "TCP Connection Properties")
bool bReceiveDataOnGameThread;

/** When a send failure occurs, should we automatically try to disconnect? */
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "TCP Connection Properties")
bool bAutoDisconnectOnSendFailure;

/** When a send failure occurs, should we automatically try to reconnect after disconnection? */
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "TCP Connection Properties")
bool bAutoReconnectOnSendFailure;


/**
* Connect to a TCP endpoint, optional method if auto-connect is set to true.
Expand Down Expand Up @@ -82,6 +94,7 @@ class TCPWRAPPER_API UTCPClientComponent : public UActorComponent
protected:
FSocket* ClientSocket;
FThreadSafeBool bShouldReceiveData;
FThreadSafeBool bShouldAttemptConnection;
TFuture<void> ClientConnectionFinishedFuture;

//FTCPSocketReceiver* TCPReceiver;
Expand Down
2 changes: 1 addition & 1 deletion TCPWrapper.uplugin
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"FileVersion": 3,
"Version": 1,
"VersionName": "1.0.3",
"VersionName": "1.1.0",
"FriendlyName": "TCPWrapper",
"Description": "Convenience TCP wrapper plugin.",
"Category": "Networking",
Expand Down

0 comments on commit 9792b3b

Please sign in to comment.