From 804b848c4b068cab365dd6263875d3fd92632298 Mon Sep 17 00:00:00 2001 From: theteacat Date: Tue, 13 May 2025 13:55:55 +0100 Subject: [PATCH] refactor readers --- src/bidirectional_stream.go | 63 ++++++++++++++----------------------- 1 file changed, 24 insertions(+), 39 deletions(-) diff --git a/src/bidirectional_stream.go b/src/bidirectional_stream.go index 84555c9..76b7f14 100644 --- a/src/bidirectional_stream.go +++ b/src/bidirectional_stream.go @@ -68,61 +68,46 @@ func (s *bidirectionalStream) run() { go func() { defer wg.Done() - defer s.clientToServer.Close() defer func() { if r := recover(); r != nil { slog.Error("Recovered from panic in clientToServer reader:", "Err", r) } }() - reader := bufio.NewReader(&s.clientToServer) - for { - request, err := http.ReadRequest(reader) - if err == io.EOF { - return - } else if err != nil { - continue - } - // RemoteAddr is not filled in by ReadRequest so we have to populate it ourselves - request.RemoteAddr = fmt.Sprintf("%s:%s", s.net.Src().String(), s.transport.Src().String()) - if request.ContentLength > 0 && request.ContentLength < s.maxBodySize { - responseBody := make([]byte, request.ContentLength) - if request.ContentLength > 0 { - io.ReadFull(request.Body, responseBody) - } - request.Body.Close() - request.Body = io.NopCloser(bytes.NewReader(responseBody)) - } - requestChannel <- request - + requestBytes := make([]byte, s.maxBodySize) + bytesRead, err := io.ReadFull(&s.clientToServer, requestBytes) + if err != nil && err != io.ErrUnexpectedEOF { + slog.Debug("Failed to read request bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead) + return + } + request, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(requestBytes))) + if err != nil { + slog.Debug("Failed to read request bytes:", "Err", err.Error()) + return } + // RemoteAddr is not filled in by ReadRequest so we have to populate it ourselves + request.RemoteAddr = fmt.Sprintf("%s:%s", s.net.Src().String(), s.transport.Src().String()) + requestChannel <- request }() go func() { defer wg.Done() - defer s.serverToClient.Close() defer func() { if r := recover(); r != nil { slog.Error("Recovered from panic in serverToClient reader:", "Err", r) } }() - reader := bufio.NewReader(&s.serverToClient) - for { - response, err := http.ReadResponse(reader, nil) - if err == io.ErrUnexpectedEOF { - return - } else if err != nil { - continue - } - if response.ContentLength > 0 && response.ContentLength < s.maxBodySize { - responseBody := make([]byte, response.ContentLength) - if response.ContentLength > 0 { - io.ReadFull(response.Body, responseBody) - } - response.Body.Close() - response.Body = io.NopCloser(bytes.NewReader(responseBody)) - } - responseChannel <- response + responseBytes := make([]byte, s.maxBodySize) + bytesRead, err := io.ReadFull(&s.serverToClient, responseBytes) + if err != nil && err != io.ErrUnexpectedEOF { + slog.Debug("Failed to read response bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead) + return + } + response, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(responseBytes)), nil) + if err != nil { + slog.Debug("Failed to read response bytes:", "Err", err.Error()) + return } + responseChannel <- response }() wg.Wait()