Skip to content

Commit

Permalink
Merge pull request #1315 from cybozu/add-retry-limit-and-ignore-msg
Browse files Browse the repository at this point in the history
Add retry limit and ignore message flags
  • Loading branch information
terassyi committed Apr 19, 2024
2 parents 5cd476d + 8e37c65 commit 1470f1e
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 22 deletions.
14 changes: 9 additions & 5 deletions tcp-keepalive/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Flags:
-h, --help help for server
-i, --interval duration Interval to send a keepalive message (default 5s)
-l, --listen string Listen address and port (default ":8000")
--silent Server doesn't send keepalive message
--retry-limit int The limit to retry, 0 is no limit
-t, --timeout duration Deadline to receive a keepalive message (default 15s)
```

Expand All @@ -36,9 +38,11 @@ Usage:
tcp-keepalive client [flags]

Flags:
-r, --connect-retry duration Connect retry interval (default 1s)
-h, --help help for client
-i, --interval duration Interval to send a keepalive message (default 5s)
-s, --server string Server running host (default "127.0.0.1:8000")
-t, --timeout duration Deadline to receive a keepalive message (default 15s)
-h, --help help for client
--ignore-server-msg Ignore whether receiving the message from server or not
-i, --interval duration Interval to send a keepalive message (default 5s)
-y, --retry Try to connect after a previous connection is closed
-r, --retry-interval duration Connect retry interval (default 1s)
-s, --server string Server running host (default "127.0.0.1:8000")
-t, --timeout duration Deadline to receive a keepalive message (default 15s)
```
2 changes: 1 addition & 1 deletion tcp-keepalive/src/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/neco-containers/tcp-keepalive

go 1.21.3
go 1.22

require github.com/spf13/cobra v1.8.0

Expand Down
79 changes: 63 additions & 16 deletions tcp-keepalive/src/tcp-keepalive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ func init() {
ServerCmd.Flags().StringP("listen", "l", ":8000", "Listen address and port")
ServerCmd.Flags().DurationP("interval", "i", time.Second*5, "Interval to send a keepalive message")
ServerCmd.Flags().DurationP("timeout", "t", time.Second*15, "Deadline to receive a keepalive message")
ServerCmd.Flags().Int("retry-limit", 0, "The limit to retry, 0 is no limit")
ServerCmd.Flags().Bool("silent", false, "Server doesn't send keepalive message")
ClientCmd.Flags().StringP("server", "s", "127.0.0.1:8000", "Server running host")
ClientCmd.Flags().DurationP("interval", "i", time.Second*5, "Interval to send a keepalive message")
ClientCmd.Flags().DurationP("timeout", "t", time.Second*15, "Deadline to receive a keepalive message")
ClientCmd.Flags().BoolP("retry", "y", false, "Try to connect after a previous connection is closed")
ClientCmd.Flags().DurationP("retry-interval", "r", time.Second, "Connect retry interval")
ClientCmd.Flags().Bool("ignore-server-msg", false, "Ignore whether receiving the message from server or not")
rootCmd.AddCommand(&ServerCmd)
rootCmd.AddCommand(&ClientCmd)
}
Expand Down Expand Up @@ -77,6 +80,18 @@ func serverMain(cmd *cobra.Command, args []string) error {
return err
}

retryLimit, err := cmd.Flags().GetInt("retry-limit")
if err != nil {
logger.Error("Failed to get retry-limit flag", slog.Any("error", err))
return err
}

noMsg, err := cmd.Flags().GetBool("silent")
if err != nil {
logger.Error("Failed to get silent flag", slog.Any("error", err))
return err
}

logger = logger.With("local", addr)

tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
Expand All @@ -88,7 +103,9 @@ func serverMain(cmd *cobra.Command, args []string) error {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

logger.Info("Start TCP server")
logger.Info("Start TCP server", slog.Int("retry-limit", retryLimit), slog.Duration("interval", interval), slog.Duration("timeout", timeout))

retryCount := 0

listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
Expand All @@ -98,6 +115,7 @@ func serverMain(cmd *cobra.Command, args []string) error {

ctx, cancel := context.WithCancel(context.Background())
connections := make(chan *net.TCPConn)
closeNotifyChan := make(chan struct{})

go func(ctx context.Context) {
for {
Expand All @@ -109,8 +127,9 @@ func serverMain(cmd *cobra.Command, args []string) error {
if err != nil {
logger.ErrorContext(ctx, "Failed to accept new TCP connection", slog.Any("error", err))
}
logger.InfoContext(ctx, "Accept new connection.", slog.Any("remote", conn.RemoteAddr()))
logger.InfoContext(ctx, "Accept new connection.", slog.Any("remote", conn.RemoteAddr()), slog.Int("retry-count", retryCount))
connections <- conn
retryCount += 1
}
}

Expand All @@ -124,13 +143,19 @@ func serverMain(cmd *cobra.Command, args []string) error {
cancel()
return nil
case conn := <-connections:
go handleConnection(ctx, conn, interval, timeout)
go handleConnection(ctx, conn, interval, timeout, closeNotifyChan, noMsg, false)
case <-closeNotifyChan:
if retryLimit != 0 && retryCount >= retryLimit {
logger.WarnContext(ctx, "Exceed retry limit. exit.")
cancel()
return nil
}
}

}
}

func handleConnection(ctx context.Context, conn net.Conn, interval, timeout time.Duration) {
func handleConnection(ctx context.Context, conn net.Conn, interval, timeout time.Duration, closeNotifyChan chan struct{}, notSendMsg, ignoreRecvMsg bool) {
logger := initLogger().With("remote", conn.RemoteAddr())

intervalTicker := time.NewTicker(interval)
Expand All @@ -145,21 +170,37 @@ func handleConnection(ctx context.Context, conn net.Conn, interval, timeout time
case <-ctx.Done():
logger.InfoContext(ctx, "Close connection")
conn.Close()
if closeNotifyChan != nil {
closeNotifyChan <- struct{}{}
}
return
case <-closeChan:
logger.InfoContext(ctx, "Close connection")
conn.Close()
if closeNotifyChan != nil {
closeNotifyChan <- struct{}{}
}
return
case <-intervalTicker.C:
if _, err := conn.Write([]byte("keepalive")); err != nil {
logger.ErrorContext(ctx, "Failed to send keepalive message", slog.Any("error", err))
return
if !notSendMsg {
if _, err := conn.Write([]byte("keepalive")); err != nil {
logger.ErrorContext(ctx, "Failed to send keepalive message", slog.Any("error", err))
if closeNotifyChan != nil {
closeNotifyChan <- struct{}{}
}
return
}
logger.InfoContext(ctx, "Send a keepalive message")
}
logger.InfoContext(ctx, "Send a keepalive message")
case <-timeoutTicker.C:
logger.WarnContext(ctx, "Deadline exceeded to receive a keepalive message. Close connection")
conn.Close()
return
if !ignoreRecvMsg {
logger.WarnContext(ctx, "Deadline exceeded to receive a keepalive message. Close connection")
conn.Close()
if closeNotifyChan != nil {
closeNotifyChan <- struct{}{}
}
return
}
case <-receiveChan:
timeoutTicker.Reset(timeout)
}
Expand Down Expand Up @@ -187,15 +228,21 @@ func clientMain(cmd *cobra.Command, args []string) error {
return err
}

retryInterval, err := cmd.Flags().GetDuration("connect-retry-interval")
retryInterval, err := cmd.Flags().GetDuration("retry-interval")
if err != nil {
logger.Error("Failed to get retry-interval flag", slog.Any("error", err))
return err
}

retry, err := cmd.Flags().GetBool("retry")
if err != nil {
logger.Error("Failed to get connect-retry-interval flag", slog.Any("error", err))
logger.Error("Failed to get retry flag", slog.Any("error", err))
return err
}

retry, err := cmd.Flags().GetBool("connect-retry")
ignoreRecvMsg, err := cmd.Flags().GetBool("ignore-server-msg")
if err != nil {
logger.Error("Failed to get connect-retry flag", slog.Any("error", err))
logger.Error("Failed to get ignore-server-msg flag", slog.Any("error", err))
return err
}

Expand Down Expand Up @@ -248,7 +295,7 @@ func clientMain(cmd *cobra.Command, args []string) error {
}
go func() {
logger.InfoContext(ctx, "Start to handle connection")
handleConnection(ctx, conn, interval, timeout)
handleConnection(ctx, conn, interval, timeout, nil, false, ignoreRecvMsg)
waitChan <- struct{}{}
}()
case <-waitChan:
Expand Down

0 comments on commit 1470f1e

Please sign in to comment.