From cab494fbdd8bcb0cc7fe3fb7255cb1e343a541cd Mon Sep 17 00:00:00 2001 From: Huansong Fu Date: Wed, 18 May 2022 07:34:32 -0700 Subject: [PATCH] Revert "Replace per-IC sockaddr formatting with generic" This reverts commit bf7f9922c0b9594ce492031c00338e47cf0f041f. --- src/backend/cdb/motion/ic_common.c | 34 ----------- src/backend/cdb/motion/ic_tcp.c | 98 ++++++++++++++++++++++++++---- src/backend/cdb/motion/ic_udpifc.c | 95 ++++++++++++++++++++++++++++- src/include/cdb/ml_ipc.h | 2 - 4 files changed, 180 insertions(+), 49 deletions(-) diff --git a/src/backend/cdb/motion/ic_common.c b/src/backend/cdb/motion/ic_common.c index a2c23b0bcfbc..7ca1ee3084f7 100644 --- a/src/backend/cdb/motion/ic_common.c +++ b/src/backend/cdb/motion/ic_common.c @@ -872,37 +872,3 @@ interconnect_abort_callback(ResourceReleasePhase phase, } } } - -/* - * format_sockaddr - * Format a sockaddr to a human readable string - * - * This function must be kept threadsafe, elog/ereport/palloc etc are not - * allowed within this function. - */ -char * -format_sockaddr(struct sockaddr_storage *sa, char *buf, size_t len) -{ - int ret; - char remote_host[NI_MAXHOST]; - char remote_port[NI_MAXSERV]; - - ret = pg_getnameinfo_all(sa, sizeof(struct sockaddr_storage), - remote_host, sizeof(remote_host), - remote_port, sizeof(remote_port), - NI_NUMERICHOST | NI_NUMERICSERV); - - if (ret != 0) - snprintf(buf, len, "?host?:?port?"); - else - { -#ifdef HAVE_IPV6 - if (sa->ss_family == AF_INET6) - snprintf(buf, len, "[%s]:%s", remote_host, remote_port); - else -#endif - snprintf(buf, len, "%s:%s", remote_host, remote_port); - } - - return buf; -} diff --git a/src/backend/cdb/motion/ic_tcp.c b/src/backend/cdb/motion/ic_tcp.c index 0d1004cbeed8..0a2fdcfac1b4 100644 --- a/src/backend/cdb/motion/ic_tcp.c +++ b/src/backend/cdb/motion/ic_tcp.c @@ -68,6 +68,8 @@ static ChunkTransportStateEntry *startOutgoingConnections(ChunkTransportState *t int *pOutgoingCount); static void format_fd_set(StringInfo buf, int nfds, mpp_fd_set *fds, char *pfx, char *sfx); +static char *format_sockaddr(struct sockaddr *sa, char *buf, int bufsize); + static void setupOutgoingConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn); static void updateOutgoingConnection(ChunkTransportState *transportStates, @@ -596,10 +598,10 @@ setupOutgoingConnection(ChunkTransportState *transportStates, ChunkTransportStat #ifdef ENABLE_IC_PROXY if (Gp_interconnect_type == INTERCONNECT_TYPE_PROXY) { - /* + /* * Using libuv pipe to register backend to proxy. * ic_proxy_backend_connect only appends the connect request into - * connection queue and waits for the libuv_run_loop to handle the queue. + * connection queue and waits for the libuv_run_loop to handle the queue. */ ic_proxy_backend_connect(transportStates->proxyContext, pEntry, conn, true); @@ -805,7 +807,7 @@ sendRegisterMessage(ChunkTransportState *transportStates, ChunkTransportStateEnt errdetail("getsockname sockfd=%d remote=%s: %m", conn->sockfd, conn->remoteHostAndPort))); } - format_sockaddr(&localAddr, conn->localHostAndPort, + format_sockaddr((struct sockaddr *) &localAddr, conn->localHostAndPort, sizeof(conn->localHostAndPort)); if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE) @@ -1200,7 +1202,7 @@ acceptIncomingConnection(void) conn->remoteContentId = -2; /* Save remote and local host:port strings for error messages. */ - format_sockaddr(&remoteAddr, conn->remoteHostAndPort, + format_sockaddr((struct sockaddr *) &remoteAddr, conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort)); addrsize = sizeof(localAddr); if (getsockname(newsockfd, (struct sockaddr *) &localAddr, &addrsize)) @@ -1211,7 +1213,7 @@ acceptIncomingConnection(void) errdetail("getsockname sockfd=%d remote=%s: %m", newsockfd, conn->remoteHostAndPort))); } - format_sockaddr(&localAddr, conn->localHostAndPort, + format_sockaddr((struct sockaddr *) &localAddr, conn->localHostAndPort, sizeof(conn->localHostAndPort)); /* make socket non-blocking */ @@ -1332,11 +1334,11 @@ SetupTCPInterconnect(EState *estate) { incoming_count++; - /* + /* * Using libuv pipe to register backend to proxy. * ic_proxy_backend_connect only appends the connect request * into connection queue and waits for the libuv_run_loop to - * handle the queue. + * handle the queue. */ ic_proxy_backend_connect(interconnect_context->proxyContext, pEntry, conn, false /* isSender */); @@ -2210,6 +2212,80 @@ format_fd_set(StringInfo buf, int nfds, mpp_fd_set *fds, char *pfx, char *sfx) appendStringInfoString(buf, sfx); } +static char * +format_sockaddr(struct sockaddr *sa, char *buf, int bufsize) +{ + /* Save remote host:port string for error messages. */ + if (sa->sa_family == AF_INET) + { + struct sockaddr_in *sin = (struct sockaddr_in *) sa; + uint32 saddr = ntohl(sin->sin_addr.s_addr); + + snprintf(buf, bufsize, "%d.%d.%d.%d:%d", + (saddr >> 24) & 0xff, + (saddr >> 16) & 0xff, + (saddr >> 8) & 0xff, + saddr & 0xff, + ntohs(sin->sin_port)); + } +#ifdef HAVE_IPV6 + else if (sa->sa_family == AF_INET6) + { + char remote_port[32] = {'\0'}; + + if (bufsize > 10) + { + buf[0] = '['; + + /* + * inet_ntop isn't portable. //inet_ntop(AF_INET6, + * &sin6->sin6_addr, buf, bufsize - 8); + * + * postgres has a standard routine for converting addresses to + * printable format, which works for IPv6, IPv4, and Unix domain + * sockets. I've changed this routine to use that, but I think + * the entire format_sockaddr routine could be replaced with it. + */ + int ret = pg_getnameinfo_all((const struct sockaddr_storage *) sa, sizeof(struct sockaddr_storage), + buf + 1, bufsize - 10, + remote_port, sizeof(remote_port), + NI_NUMERICHOST | NI_NUMERICSERV); + + if (ret != 0) + { + elog(LOG, "getnameinfo returned %d: %s, and says %s port %s", ret, gai_strerror(ret), buf, remote_port); + + /* + * Fall back to using our internal inet_ntop routine, which + * really is for inet datatype This is because of a bug in + * solaris, where getnameinfo sometimes fails Once we find out + * why, we can remove this + */ + snprintf(remote_port, sizeof(remote_port), "%d", ((struct sockaddr_in6 *) sa)->sin6_port); + + /* + * This is nasty: our internal inet_net_ntop takes + * PGSQL_AF_INET6, not AF_INET6, which is very odd... They are + * NOT the same value (even though PGSQL_AF_INET == AF_INET + */ +#define PGSQL_AF_INET6 (AF_INET + 1) + inet_net_ntop(PGSQL_AF_INET6, sa, sizeof(struct sockaddr_in6), buf + 1, bufsize - 10); + elog(LOG, "Our alternative method says %s]:%s", buf, remote_port); + + } + buf += strlen(buf); + strcat(buf, "]"); + buf++; + } + snprintf(buf, 8, ":%s", remote_port); + } +#endif + else + snprintf(buf, bufsize, "?host?:?port?"); + + return buf; +} /* format_sockaddr */ + static void flushInterconnectListenerBacklog(void) { @@ -2250,7 +2326,7 @@ flushInterconnectListenerBacklog(void) if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE) { /* Get remote and local host:port strings for message. */ - format_sockaddr(&remoteAddr, remoteHostAndPort, + format_sockaddr((struct sockaddr *) &remoteAddr, remoteHostAndPort, sizeof(remoteHostAndPort)); addrsize = sizeof(localAddr); if (getsockname(newfd, (struct sockaddr *) &localAddr, &addrsize)) @@ -2263,7 +2339,7 @@ flushInterconnectListenerBacklog(void) } else { - format_sockaddr(&localAddr, localHostAndPort, + format_sockaddr((struct sockaddr *) &localAddr, localHostAndPort, sizeof(localHostAndPort)); ereport(DEBUG2, (errmsg("Interconnect clearing incoming connection " "from remote=%s to local=%s. sockfd=%d.", @@ -2546,7 +2622,7 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates, struct timeval timeout = tval; int nfds = pEntry->highReadSock; - + /* make sure we check for these. */ ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive); @@ -2575,7 +2651,7 @@ RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates, if (skipSelect) break; - /* + /* * Also monitor the events on dispatch fds, eg, errors or sequence * request from QEs. */ diff --git a/src/backend/cdb/motion/ic_udpifc.c b/src/backend/cdb/motion/ic_udpifc.c index 289576953a97..e9d29966f3ff 100644 --- a/src/backend/cdb/motion/ic_udpifc.c +++ b/src/backend/cdb/motion/ic_udpifc.c @@ -671,6 +671,7 @@ static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState int *pOutgoingCount); static void setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn); +static char *formatSockAddr(struct sockaddr *sa, char *buf, int bufsize); /* Connection hash table functions. */ static bool initConnHashTable(ConnHashTable *ht, MemoryContext ctx); @@ -2807,7 +2808,7 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS getSockAddr(&conn->peer, &conn->peer_len, cdbProc->listenerAddr, cdbProc->listenerPort); /* Save the destination IP address */ - format_sockaddr(&conn->peer, conn->remoteHostAndPort, + formatSockAddr((struct sockaddr *) &conn->peer, conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort)); Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6); @@ -5742,6 +5743,96 @@ doSendStopMessageUDPIFC(ChunkTransportState *transportStates, int16 motNodeID) pthread_mutex_unlock(&ic_control_info.lock); } +/* + * formatSockAddr + * Format sockaddr. + * + * NOTE: Because this function can be called in a thread (rxThreadFunc), + * it must not use services such as elog, ereport, palloc/pfree and StringInfo. + * elog is NOT thread-safe. Developers should instead use something like: + * + * if (DEBUG3 >= log_min_messages) + * write_log("my brilliant log statement here."); + */ +char * +formatSockAddr(struct sockaddr *sa, char *buf, int bufsize) +{ + /* Save remote host:port string for error messages. */ + if (sa->sa_family == AF_INET) + { + struct sockaddr_in *sin = (struct sockaddr_in *) sa; + uint32 saddr = ntohl(sin->sin_addr.s_addr); + + snprintf(buf, bufsize, "%d.%d.%d.%d:%d", + (saddr >> 24) & 0xff, + (saddr >> 16) & 0xff, + (saddr >> 8) & 0xff, + saddr & 0xff, + ntohs(sin->sin_port)); + } +#ifdef HAVE_IPV6 + else if (sa->sa_family == AF_INET6) + { + char remote_port[32]; + + remote_port[0] = '\0'; + buf[0] = '\0'; + + if (bufsize > 10) + { + buf[0] = '['; + buf[1] = '\0'; /* in case getnameinfo fails */ + + /* + * inet_ntop isn't portable. //inet_ntop(AF_INET6, + * &sin6->sin6_addr, buf, bufsize - 8); + * + * postgres has a standard routine for converting addresses to + * printable format, which works for IPv6, IPv4, and Unix domain + * sockets. I've changed this routine to use that, but I think + * the entire formatSockAddr routine could be replaced with it. + */ + int ret = pg_getnameinfo_all((const struct sockaddr_storage *) sa, sizeof(struct sockaddr_in6), + buf + 1, bufsize - 10, + remote_port, sizeof(remote_port), + NI_NUMERICHOST | NI_NUMERICSERV); + + if (ret != 0) + { + write_log("getnameinfo returned %d: %s, and says %s port %s", + ret, gai_strerror(ret), buf, remote_port); + + /* + * Fall back to using our internal inet_ntop routine, which + * really is for inet datatype This is because of a bug in + * solaris, where getnameinfo sometimes fails Once we find out + * why, we can remove this + */ + snprintf(remote_port, sizeof(remote_port), "%d", ((struct sockaddr_in6 *) sa)->sin6_port); + + /* + * This is nasty: our internal inet_net_ntop takes + * PGSQL_AF_INET6, not AF_INET6, which is very odd... They are + * NOT the same value (even though PGSQL_AF_INET == AF_INET + */ +#define PGSQL_AF_INET6 (AF_INET + 1) + inet_net_ntop(PGSQL_AF_INET6, sa, sizeof(struct sockaddr_in6), buf + 1, bufsize - 10); + write_log("Our alternative method says %s]:%s", buf, remote_port); + + } + buf += strlen(buf); + strcat(buf, "]"); + buf++; + } + snprintf(buf, 8, ":%s", remote_port); + } +#endif + else + snprintf(buf, bufsize, "?host?:?port?"); + + return buf; +} /* formatSockAddr */ + /* * dispatcherAYT * Check the connection from the dispatcher to verify that it is still there. @@ -6511,7 +6602,7 @@ handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len) write_log("ACKING PACKET WITH FLAGS: pkt->seq %d 0x%x [pkt->icId %d last-teardown %d interconnect_id %d]", pkt->seq, dummyconn.conn_info.flags, pkt->icId, rx_control_info.lastTornIcId, ic_control_info.ic_instance_id); - format_sockaddr(&dummyconn.peer, buf, sizeof(buf)); + formatSockAddr((struct sockaddr *) &dummyconn.peer, buf, sizeof(buf)); if (DEBUG1 >= log_min_messages) write_log("ACKING PACKET TO %s", buf); diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h index f69ebd54c6bc..86fef772cc6d 100644 --- a/src/include/cdb/ml_ipc.h +++ b/src/include/cdb/ml_ipc.h @@ -322,6 +322,4 @@ extern void TeardownUDPIFCInterconnect(ChunkTransportState *transportStates, extern uint32 getActiveMotionConns(void); extern void adjustMasterRouting(Slice *recvSlice); -extern char *format_sockaddr(struct sockaddr_storage *sa, char *buf, size_t len); - #endif /* ML_IPC_H */