Skip to content

Commit

Permalink
Add utils to create and prepare socket for tcp client (#31009)
Browse files Browse the repository at this point in the history
* Add utils to create and prepare socket for tcp client

* Automated change: Fix sanity tests

* review comments

* review comments

* regenerate projects

* regenerate projects

* review comments

* comments

* fix typo

* remove unused parameter

* review comments

* remove static

* re-introduce forward declaration

Co-authored-by: Vignesh2208 <Vignesh2208@users.noreply.github.com>
  • Loading branch information
Vignesh2208 and Vignesh2208 committed Sep 22, 2022
1 parent eada386 commit 9bcee18
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 7 deletions.
3 changes: 3 additions & 0 deletions BUILD
Expand Up @@ -804,6 +804,7 @@ grpc_cc_library(
"ref_counted_ptr",
"slice",
"slice_refcount",
"status_helper",
"transport_fwd",
],
)
Expand Down Expand Up @@ -2803,6 +2804,7 @@ grpc_cc_library(
"src/core/lib/event_engine/posix_engine/tcp_socket_utils.h",
],
external_deps = [
"absl/cleanup",
"absl/status",
"absl/status:statusor",
"absl/strings",
Expand All @@ -2818,6 +2820,7 @@ grpc_cc_library(
"ref_counted_ptr",
"resource_quota",
"socket_mutator",
"status_helper",
"useful",
],
)
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -32,12 +32,7 @@
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"

#define RETURN_IF_ERROR(expr) \
do { \
const absl::Status status = (expr); \
if (!status.ok()) return status; \
} while (0)
#include "src/core/lib/gprpp/status_helper.h"

namespace grpc_binder {
namespace {
Expand Down
75 changes: 74 additions & 1 deletion src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
Expand Up @@ -14,11 +14,15 @@

#include <grpc/support/port_platform.h>

#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"

#include <arpa/inet.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>

#include "absl/cleanup/cleanup.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
Expand Down Expand Up @@ -49,8 +53,8 @@
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>

#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/status_helper.h"

#ifdef GRPC_HAVE_UNIX_SOCKET
#include <sys/un.h>
Expand Down Expand Up @@ -100,6 +104,32 @@ int CreateSocket(std::function<int(int, int, int)> socket_factory, int family,

const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};

absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock,
const EventEngine::ResolvedAddress& addr,
const PosixTcpOptions& options) {
bool close_fd = true;
auto sock_cleanup = absl::MakeCleanup([&close_fd, &sock]() -> void {
if (close_fd and sock.Fd() >= 0) {
close(sock.Fd());
}
});
RETURN_IF_ERROR(sock.SetSocketNonBlocking(1));
RETURN_IF_ERROR(sock.SetSocketCloexec(1));

if (reinterpret_cast<const sockaddr*>(addr.address())->sa_family != AF_UNIX) {
// If its not a unix socket address.
RETURN_IF_ERROR(sock.SetSocketLowLatency(1));
RETURN_IF_ERROR(sock.SetSocketReuseAddr(1));
sock.TrySetSocketTcpUserTimeout(options, true);
}
RETURN_IF_ERROR(sock.SetSocketNoSigpipeIfPossible());
RETURN_IF_ERROR(sock.ApplySocketMutatorInOptions(
GRPC_FD_CLIENT_CONNECTION_USAGE, options));
// No errors. Set close_fd to false to ensure the socket is not closed.
close_fd = false;
return absl::OkStatus();
}

#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */

} // namespace
Expand Down Expand Up @@ -771,6 +801,42 @@ absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
return PosixSocketWrapper(newfd);
}

absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult>
PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
const PosixTcpOptions& options,
const EventEngine::ResolvedAddress& target_addr) {
PosixSocketWrapper::DSMode dsmode;
EventEngine::ResolvedAddress mapped_target_addr;

// Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
// v6.
if (!SockaddrToV4Mapped(&target_addr, &mapped_target_addr)) {
// addr is v4 mapped to v6 or just v6.
mapped_target_addr = target_addr;
}
absl::StatusOr<PosixSocketWrapper> posix_socket_wrapper =
PosixSocketWrapper::CreateDualStackSocket(nullptr, mapped_target_addr,
SOCK_STREAM, 0, dsmode);
if (!posix_socket_wrapper.ok()) {
return posix_socket_wrapper.status();
}

if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) {
// Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4.
if (!SockaddrIsV4Mapped(&target_addr, &mapped_target_addr)) {
mapped_target_addr = target_addr;
}
}

auto error = PrepareTcpClientSocket(*posix_socket_wrapper, mapped_target_addr,
options);
if (!error.ok()) {
return error;
}
return PosixSocketWrapper::PosixSocketCreateResult{*posix_socket_wrapper,
mapped_target_addr};
}

#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */

bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/,
Expand Down Expand Up @@ -875,6 +941,13 @@ PosixSocketWrapper::CreateDualStackSocket(
int /*protocol*/, DSMode& /*dsmode*/) {
GPR_ASSERT(false && "unimplemented");
}

absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult>
PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
const PosixTcpOptions& /*options*/,
const EventEngine::ResolvedAddress& /*target_addr*/) {
GPR_ASSERT(false && "unimplemented");
}
}

#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */
Expand Down
25 changes: 25 additions & 0 deletions src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
Expand Up @@ -171,6 +171,8 @@ class PosixSocketWrapper {
public:
explicit PosixSocketWrapper(int fd) : fd_(fd) { GPR_ASSERT(fd_ > 0); }

PosixSocketWrapper() : fd_(-1){};

~PosixSocketWrapper() = default;

// Instruct the kernel to wait for specified number of bytes to be received on
Expand Down Expand Up @@ -301,10 +303,33 @@ class PosixSocketWrapper {
const experimental::EventEngine::ResolvedAddress& addr, int type,
int protocol, DSMode& dsmode);

struct PosixSocketCreateResult;
// Return a PosixSocketCreateResult which manages a configured, unbound,
// unconnected TCP client fd.
// options: may contain custom tcp settings for the fd.
// target_addr: the destination address.
//
// Returns: Not-OK status on error. Otherwise it returns a
// PosixSocketWrapper::PosixSocketCreateResult type which includes a sock
// of type PosixSocketWrapper and a mapped_target_addr which is
// target_addr mapped to an address appropriate to the type of socket FD
// created. For example, if target_addr is IPv4 and dual stack sockets are
// available, mapped_target_addr will be an IPv4-mapped IPv6 address.
//
static absl::StatusOr<PosixSocketCreateResult>
CreateAndPrepareTcpClientSocket(
const PosixTcpOptions& options,
const EventEngine::ResolvedAddress& target_addr);

private:
int fd_;
};

struct PosixSocketWrapper::PosixSocketCreateResult {
PosixSocketWrapper sock;
EventEngine::ResolvedAddress mapped_target_addr;
};

} // namespace posix_engine
} // namespace grpc_event_engine

Expand Down
6 changes: 6 additions & 0 deletions src/core/lib/gprpp/status_helper.h
Expand Up @@ -38,6 +38,12 @@ struct google_rpc_Status;
struct upb_Arena;
}

#define RETURN_IF_ERROR(expr) \
do { \
const absl::Status status = (expr); \
if (!status.ok()) return status; \
} while (0)

namespace grpc_core {

/// This enum should have the same value of grpc_error_ints
Expand Down

0 comments on commit 9bcee18

Please sign in to comment.