Skip to content

Commit

Permalink
Revert "Replace per-IC sockaddr formatting with generic"
Browse files Browse the repository at this point in the history
This reverts commit bf7f992.
  • Loading branch information
huansong committed May 18, 2022
1 parent 956a966 commit cab494f
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 49 deletions.
34 changes: 0 additions & 34 deletions src/backend/cdb/motion/ic_common.c
Expand Up @@ -872,37 +872,3 @@ interconnect_abort_callback(ResourceReleasePhase phase,
}
}
}

/*
* format_sockaddr
* Format a sockaddr to a human readable string
*
* This function must be kept threadsafe, elog/ereport/palloc etc are not
* allowed within this function.
*/
char *
format_sockaddr(struct sockaddr_storage *sa, char *buf, size_t len)
{
int ret;
char remote_host[NI_MAXHOST];
char remote_port[NI_MAXSERV];

ret = pg_getnameinfo_all(sa, sizeof(struct sockaddr_storage),
remote_host, sizeof(remote_host),
remote_port, sizeof(remote_port),
NI_NUMERICHOST | NI_NUMERICSERV);

if (ret != 0)
snprintf(buf, len, "?host?:?port?");
else
{
#ifdef HAVE_IPV6
if (sa->ss_family == AF_INET6)
snprintf(buf, len, "[%s]:%s", remote_host, remote_port);
else
#endif
snprintf(buf, len, "%s:%s", remote_host, remote_port);
}

return buf;
}
98 changes: 87 additions & 11 deletions src/backend/cdb/motion/ic_tcp.c
Expand Up @@ -68,6 +68,8 @@ static ChunkTransportStateEntry *startOutgoingConnections(ChunkTransportState *t
int *pOutgoingCount);

static void format_fd_set(StringInfo buf, int nfds, mpp_fd_set *fds, char *pfx, char *sfx);
static char *format_sockaddr(struct sockaddr *sa, char *buf, int bufsize);

static void setupOutgoingConnection(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn);
static void updateOutgoingConnection(ChunkTransportState *transportStates,
Expand Down Expand Up @@ -596,10 +598,10 @@ setupOutgoingConnection(ChunkTransportState *transportStates, ChunkTransportStat
#ifdef ENABLE_IC_PROXY
if (Gp_interconnect_type == INTERCONNECT_TYPE_PROXY)
{
/*
/*
* Using libuv pipe to register backend to proxy.
* ic_proxy_backend_connect only appends the connect request into
* connection queue and waits for the libuv_run_loop to handle the queue.
* connection queue and waits for the libuv_run_loop to handle the queue.
*/
ic_proxy_backend_connect(transportStates->proxyContext,
pEntry, conn, true);
Expand Down Expand Up @@ -805,7 +807,7 @@ sendRegisterMessage(ChunkTransportState *transportStates, ChunkTransportStateEnt
errdetail("getsockname sockfd=%d remote=%s: %m",
conn->sockfd, conn->remoteHostAndPort)));
}
format_sockaddr(&localAddr, conn->localHostAndPort,
format_sockaddr((struct sockaddr *) &localAddr, conn->localHostAndPort,
sizeof(conn->localHostAndPort));

if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
Expand Down Expand Up @@ -1200,7 +1202,7 @@ acceptIncomingConnection(void)
conn->remoteContentId = -2;

/* Save remote and local host:port strings for error messages. */
format_sockaddr(&remoteAddr, conn->remoteHostAndPort,
format_sockaddr((struct sockaddr *) &remoteAddr, conn->remoteHostAndPort,
sizeof(conn->remoteHostAndPort));
addrsize = sizeof(localAddr);
if (getsockname(newsockfd, (struct sockaddr *) &localAddr, &addrsize))
Expand All @@ -1211,7 +1213,7 @@ acceptIncomingConnection(void)
errdetail("getsockname sockfd=%d remote=%s: %m",
newsockfd, conn->remoteHostAndPort)));
}
format_sockaddr(&localAddr, conn->localHostAndPort,
format_sockaddr((struct sockaddr *) &localAddr, conn->localHostAndPort,
sizeof(conn->localHostAndPort));

/* make socket non-blocking */
Expand Down Expand Up @@ -1332,11 +1334,11 @@ SetupTCPInterconnect(EState *estate)
{
incoming_count++;

/*
/*
* Using libuv pipe to register backend to proxy.
* ic_proxy_backend_connect only appends the connect request
* into connection queue and waits for the libuv_run_loop to
* handle the queue.
* handle the queue.
*/
ic_proxy_backend_connect(interconnect_context->proxyContext,
pEntry, conn, false /* isSender */);
Expand Down Expand Up @@ -2210,6 +2212,80 @@ format_fd_set(StringInfo buf, int nfds, mpp_fd_set *fds, char *pfx, char *sfx)
appendStringInfoString(buf, sfx);
}

static char *
format_sockaddr(struct sockaddr *sa, char *buf, int bufsize)
{
/* Save remote host:port string for error messages. */
if (sa->sa_family == AF_INET)
{
struct sockaddr_in *sin = (struct sockaddr_in *) sa;
uint32 saddr = ntohl(sin->sin_addr.s_addr);

snprintf(buf, bufsize, "%d.%d.%d.%d:%d",
(saddr >> 24) & 0xff,
(saddr >> 16) & 0xff,
(saddr >> 8) & 0xff,
saddr & 0xff,
ntohs(sin->sin_port));
}
#ifdef HAVE_IPV6
else if (sa->sa_family == AF_INET6)
{
char remote_port[32] = {'\0'};

if (bufsize > 10)
{
buf[0] = '[';

/*
* inet_ntop isn't portable. //inet_ntop(AF_INET6,
* &sin6->sin6_addr, buf, bufsize - 8);
*
* postgres has a standard routine for converting addresses to
* printable format, which works for IPv6, IPv4, and Unix domain
* sockets. I've changed this routine to use that, but I think
* the entire format_sockaddr routine could be replaced with it.
*/
int ret = pg_getnameinfo_all((const struct sockaddr_storage *) sa, sizeof(struct sockaddr_storage),
buf + 1, bufsize - 10,
remote_port, sizeof(remote_port),
NI_NUMERICHOST | NI_NUMERICSERV);

if (ret != 0)
{
elog(LOG, "getnameinfo returned %d: %s, and says %s port %s", ret, gai_strerror(ret), buf, remote_port);

/*
* Fall back to using our internal inet_ntop routine, which
* really is for inet datatype This is because of a bug in
* solaris, where getnameinfo sometimes fails Once we find out
* why, we can remove this
*/
snprintf(remote_port, sizeof(remote_port), "%d", ((struct sockaddr_in6 *) sa)->sin6_port);

/*
* This is nasty: our internal inet_net_ntop takes
* PGSQL_AF_INET6, not AF_INET6, which is very odd... They are
* NOT the same value (even though PGSQL_AF_INET == AF_INET
*/
#define PGSQL_AF_INET6 (AF_INET + 1)
inet_net_ntop(PGSQL_AF_INET6, sa, sizeof(struct sockaddr_in6), buf + 1, bufsize - 10);
elog(LOG, "Our alternative method says %s]:%s", buf, remote_port);

}
buf += strlen(buf);
strcat(buf, "]");
buf++;
}
snprintf(buf, 8, ":%s", remote_port);
}
#endif
else
snprintf(buf, bufsize, "?host?:?port?");

return buf;
} /* format_sockaddr */

static void
flushInterconnectListenerBacklog(void)
{
Expand Down Expand Up @@ -2250,7 +2326,7 @@ flushInterconnectListenerBacklog(void)
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
{
/* Get remote and local host:port strings for message. */
format_sockaddr(&remoteAddr, remoteHostAndPort,
format_sockaddr((struct sockaddr *) &remoteAddr, remoteHostAndPort,
sizeof(remoteHostAndPort));
addrsize = sizeof(localAddr);
if (getsockname(newfd, (struct sockaddr *) &localAddr, &addrsize))
Expand All @@ -2263,7 +2339,7 @@ flushInterconnectListenerBacklog(void)
}
else
{
format_sockaddr(&localAddr, localHostAndPort,
format_sockaddr((struct sockaddr *) &localAddr, localHostAndPort,
sizeof(localHostAndPort));
ereport(DEBUG2, (errmsg("Interconnect clearing incoming connection "
"from remote=%s to local=%s. sockfd=%d.",
Expand Down Expand Up @@ -2546,7 +2622,7 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates,

struct timeval timeout = tval;
int nfds = pEntry->highReadSock;

/* make sure we check for these. */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

Expand Down Expand Up @@ -2575,7 +2651,7 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates,
if (skipSelect)
break;

/*
/*
* Also monitor the events on dispatch fds, eg, errors or sequence
* request from QEs.
*/
Expand Down
95 changes: 93 additions & 2 deletions src/backend/cdb/motion/ic_udpifc.c
Expand Up @@ -671,6 +671,7 @@ static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState
int *pOutgoingCount);
static void setupOutgoingUDPConnection(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn);
static char *formatSockAddr(struct sockaddr *sa, char *buf, int bufsize);

/* Connection hash table functions. */
static bool initConnHashTable(ConnHashTable *ht, MemoryContext ctx);
Expand Down Expand Up @@ -2807,7 +2808,7 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS
getSockAddr(&conn->peer, &conn->peer_len, cdbProc->listenerAddr, cdbProc->listenerPort);

/* Save the destination IP address */
format_sockaddr(&conn->peer, conn->remoteHostAndPort,
formatSockAddr((struct sockaddr *) &conn->peer, conn->remoteHostAndPort,
sizeof(conn->remoteHostAndPort));

Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6);
Expand Down Expand Up @@ -5742,6 +5743,96 @@ doSendStopMessageUDPIFC(ChunkTransportState *transportStates, int16 motNodeID)
pthread_mutex_unlock(&ic_control_info.lock);
}

/*
* formatSockAddr
* Format sockaddr.
*
* NOTE: Because this function can be called in a thread (rxThreadFunc),
* it must not use services such as elog, ereport, palloc/pfree and StringInfo.
* elog is NOT thread-safe. Developers should instead use something like:
*
* if (DEBUG3 >= log_min_messages)
* write_log("my brilliant log statement here.");
*/
char *
formatSockAddr(struct sockaddr *sa, char *buf, int bufsize)
{
/* Save remote host:port string for error messages. */
if (sa->sa_family == AF_INET)
{
struct sockaddr_in *sin = (struct sockaddr_in *) sa;
uint32 saddr = ntohl(sin->sin_addr.s_addr);

snprintf(buf, bufsize, "%d.%d.%d.%d:%d",
(saddr >> 24) & 0xff,
(saddr >> 16) & 0xff,
(saddr >> 8) & 0xff,
saddr & 0xff,
ntohs(sin->sin_port));
}
#ifdef HAVE_IPV6
else if (sa->sa_family == AF_INET6)
{
char remote_port[32];

remote_port[0] = '\0';
buf[0] = '\0';

if (bufsize > 10)
{
buf[0] = '[';
buf[1] = '\0'; /* in case getnameinfo fails */

/*
* inet_ntop isn't portable. //inet_ntop(AF_INET6,
* &sin6->sin6_addr, buf, bufsize - 8);
*
* postgres has a standard routine for converting addresses to
* printable format, which works for IPv6, IPv4, and Unix domain
* sockets. I've changed this routine to use that, but I think
* the entire formatSockAddr routine could be replaced with it.
*/
int ret = pg_getnameinfo_all((const struct sockaddr_storage *) sa, sizeof(struct sockaddr_in6),
buf + 1, bufsize - 10,
remote_port, sizeof(remote_port),
NI_NUMERICHOST | NI_NUMERICSERV);

if (ret != 0)
{
write_log("getnameinfo returned %d: %s, and says %s port %s",
ret, gai_strerror(ret), buf, remote_port);

/*
* Fall back to using our internal inet_ntop routine, which
* really is for inet datatype This is because of a bug in
* solaris, where getnameinfo sometimes fails Once we find out
* why, we can remove this
*/
snprintf(remote_port, sizeof(remote_port), "%d", ((struct sockaddr_in6 *) sa)->sin6_port);

/*
* This is nasty: our internal inet_net_ntop takes
* PGSQL_AF_INET6, not AF_INET6, which is very odd... They are
* NOT the same value (even though PGSQL_AF_INET == AF_INET
*/
#define PGSQL_AF_INET6 (AF_INET + 1)
inet_net_ntop(PGSQL_AF_INET6, sa, sizeof(struct sockaddr_in6), buf + 1, bufsize - 10);
write_log("Our alternative method says %s]:%s", buf, remote_port);

}
buf += strlen(buf);
strcat(buf, "]");
buf++;
}
snprintf(buf, 8, ":%s", remote_port);
}
#endif
else
snprintf(buf, bufsize, "?host?:?port?");

return buf;
} /* formatSockAddr */

/*
* dispatcherAYT
* Check the connection from the dispatcher to verify that it is still there.
Expand Down Expand Up @@ -6511,7 +6602,7 @@ handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
write_log("ACKING PACKET WITH FLAGS: pkt->seq %d 0x%x [pkt->icId %d last-teardown %d interconnect_id %d]",
pkt->seq, dummyconn.conn_info.flags, pkt->icId, rx_control_info.lastTornIcId, ic_control_info.ic_instance_id);

format_sockaddr(&dummyconn.peer, buf, sizeof(buf));
formatSockAddr((struct sockaddr *) &dummyconn.peer, buf, sizeof(buf));

if (DEBUG1 >= log_min_messages)
write_log("ACKING PACKET TO %s", buf);
Expand Down
2 changes: 0 additions & 2 deletions src/include/cdb/ml_ipc.h
Expand Up @@ -322,6 +322,4 @@ extern void TeardownUDPIFCInterconnect(ChunkTransportState *transportStates,
extern uint32 getActiveMotionConns(void);
extern void adjustMasterRouting(Slice *recvSlice);

extern char *format_sockaddr(struct sockaddr_storage *sa, char *buf, size_t len);

#endif /* ML_IPC_H */

0 comments on commit cab494f

Please sign in to comment.