Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🤗 [Question]: Setup of SSE Fiber with fasthttp.StreamWriter - event source is pending / never connects ... #2837

Open
3 tasks done
michealroberts opened this issue Feb 8, 2024 · 8 comments

Comments

@michealroberts
Copy link

michealroberts commented Feb 8, 2024

Question Description

Versions:

Go 1.21.5
github.com/gofiber/fiber/v2 v2.52.0
github.com/valyala/fasthttp v1.51.0


Issue

I have the following logic inside of an SSE handler:

// Peak at the incoming Accept request header:
accept := c.Request().Header.Peek("Accept")

// Check whether the Accept header is set to text/event-stream:
if c.Accepts("text/event-stream") == "text/event-stream" && strings.Contains(string(accept), "text/event-stream") {
  ctx := c.Context()

  ctx.SetContentType("text/event-stream")

  ctx.Response.Header.Set("Cache-Control", "no-cache")
  ctx.Response.Header.Set("Connection", "keep-alive")
  ctx.Response.Header.Set("Transfer-Encoding", "chunked")
  ctx.Response.Header.Set("Access-Control-Allow-Headers", "Cache-Control")
  ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true")

  ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
    defer func() {
      if r := recover(); r != nil {
        fmt.Println("Recovered in SSE writer:", r)
      }
    }()

    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
      select {
      case <-ticker.C:
        status, err := GetStatus(telescope)

        if err != nil {
          fmt.Printf("Error while getting status: %v\n", err)
          return
        }

        data, err := json.Marshal(status)

        if err != nil {
          fmt.Printf("Error while marshaling JSON: %v\n", err)
          return
        }

        fmt.Fprintf(w, "data: %s\n\n", string(data))

        fmt.Println(string(data))

        if err := w.Flush(); err != nil {
          fmt.Printf("Error while flushing: %v. Closing connection.\n", err)
          return
        }
      case <-c.Context().Done():
        fmt.Println("Client disconnected. Closing connection.")
        return
      }
    }
  }))

  return nil
}

Which to me, looks good. I can log the message without issue ...

However, when connecting from a browser side client ... the connection is stuck in the "connecting" phase of the event source connection.

I can see, that when the endpoint is called from my client, the server is logging correctly:

CleanShot 2024-02-08 at 11 50 20

But the connection remains as pending:

CleanShot 2024-02-08 at 11 49 24@2x

Also, no errors when requesting application/json (so for me, it isn't a case that the underlying code has an issue):

CleanShot 2024-02-08 at 12 03 50@2x

The front end JS code is standard for the EventSource API.


Headers

CleanShot 2024-02-08 at 11 54 39@2x

Reproduction

I can also provide access to the repository for a minimal reproduction if the issue isn't apparent from what I have supplied if needed, please just request access for your username and I can provide it (as long as you are listed as a core maintainer of this repo).

Code Snippet (optional)

/*****************************************************************************************************************/

//	@author		Michael Roberts <michael@observerly.com>
//	@package	@observerly/nox/telescope
//	@license	Copyright © 2021-2023 observerly

/*****************************************************************************************************************/

package telescope

/*****************************************************************************************************************/

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/observerly/alpacago/pkg/alpacago"
	"github.com/valyala/fasthttp"

	"nox/internal/common"
	"nox/internal/middleware"
)

/*****************************************************************************************************************/

type GetStatusHandlerResponse struct {
	Connected bool `json:"connected"`
}

/*****************************************************************************************************************/

type GetStatusChannels struct {
	Connected chan bool `json:"connected"`
}

/*****************************************************************************************************************/

func GetStatus(telescope *alpacago.Telescope) (GetStatusHandlerResponse, error) {
	// Create channels for the status values:
	channels := GetStatusChannels{}

	// Create a wait group for the status values:
	wg, channels, errors := common.SetupWaitGroupForStruct(channels)

	// Get the connection status:
	go func() {
		defer wg.Done()
		common.RetrieveAndSendToChannel(telescope.IsConnected, channels.Connected, errors)
	}()

	go func() {
		// Wait for all the goroutines to finish:
		wg.Wait()
		// Close the channels:
		common.CloseChannelsForStruct(channels)
	}()

	status := &GetStatusHandlerResponse{}

	// Extract the values from the channels:
	err := common.ExtractValueFromChannelStruct(channels, status)

	// Check if we encountered any errors while extracting the values:
	if len(errors) > 0 {
		return *status, fmt.Errorf("encountered errors while retrieving status values: %v", errors)
	}

	// If we encounter an error, return the error:
	if err != nil {
		return *status, err
	}

	// Return the status values:
	return *status, nil
}

/*****************************************************************************************************************/

func GetStatusHandler(c *fiber.Ctx) error {
	// Get the telescope client from the context middleware:
	telescope, ok := c.Locals("telescope").(*alpacago.Telescope)

	// If the telescope client is not available in the context, return an error:
	if !ok {
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
			"error": "Telescope is not available in context",
		})
	}

	// Peak at the incoming Accept request header:
	accept := c.Request().Header.Peek("Accept")

	// Check whether the Accept header is set to text/event-stream:
	if c.Accepts("text/event-stream") == "text/event-stream" && strings.Contains(string(accept), "text/event-stream") {
		ctx := middleware.TextEventStream(c)

		ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("Recovered in SSE writer:", r)
				}
			}()

			ticker := time.NewTicker(1 * time.Second)
			defer ticker.Stop()

			for {
				select {
				case <-ticker.C:
					status, err := GetStatus(telescope)

					if err != nil {
						fmt.Printf("Error while getting status: %v\n", err)
						return
					}

					data, err := json.Marshal(status)

					if err != nil {
						fmt.Printf("Error while marshaling JSON: %v\n", err)
						return
					}

					fmt.Fprintf(w, "data: %s\n\n", string(data))

					fmt.Println(string(data))

					if err := w.Flush(); err != nil {
						fmt.Printf("Error while flushing: %v. Closing connection.\n", err)
						return
					}
				case <-c.Context().Done():
					fmt.Println("Client disconnected. Closing connection.")
					return
				}
			}
		}))

		return nil
	}

	// Get the telescope status
	status, err := GetStatus(telescope)

	if err != nil {
		return c.Status(fiber.StatusInternalServerError).JSON(
			common.ErrorResponse{
				Error: err.Error(),
			},
		)
	}

	// Return the telescope status:
	return c.JSON(status)
}

/*****************************************************************************************************************/

Checklist:

  • I agree to follow Fiber's Code of Conduct.
  • I have checked for existing issues that describe my questions prior to opening this one.
  • I understand that improperly formatted questions may be closed without explanation.
@michealroberts michealroberts changed the title 🤗 [Question]: Setup of Fiber 🤗 [Question]: Setup of SSE Fiber with fasthttp.StreamWriter - event source is pending / never connects ... Feb 8, 2024
@efectn
Copy link
Member

efectn commented Feb 9, 2024

@michealroberts is your endpoint working if you remove

case <-c.Context().Done():
    fmt.Println("Client disconnected. Closing connection.")
    return

@michealroberts
Copy link
Author

michealroberts commented Feb 9, 2024

@efectn Yeh, so it still doesn't work when removing that unfortunately 😞

I have a branch here: https://github.com/observerly/nox/pull/48 for a full reproduction that you should be able to access. In that code, I've added the basic example given in the gofiber/examples repo for SSE setup, line for line, and unfortunately it still doesn't work ...

Are you able to work with SSE on the latest versions of Fiber and fasthttp?

@efectn
Copy link
Member

efectn commented Feb 9, 2024

@efectn Yeh, so it still doesn't work when removing that unfortunately 😞

I have a branch here: observerly/nox#48 for a full reproduction that you should be able to access. In that code, I've added the basic example given in the gofiber/examples repo for SSE setup, line for line, and unfortunately it still doesn't work ...

Are you able to work with SSE on the latest versions of Fiber and fasthttp?

This one works for me https://paste.laravel.io/8c6a1464-4f52-46c1-b362-ab49f5ad60cf

2024-02-09_18-40

@michealroberts
Copy link
Author

@efectn Aye aye aye, ok. I feel like I have narrowed it down to be able to replicate it.

I have the ETag middleware installed from github.com/gofiber/fiber/v2/middleware/etag ... I guess this somehow causes issues in terms of headers 🤔

What are your thoughts on this? I should be able to get a minimal reproduction.

@efectn
Copy link
Member

efectn commented Feb 9, 2024

@efectn Aye aye aye, ok. I feel like I have narrowed it down to be able to replicate it.

I have the ETag middleware installed from github.com/gofiber/fiber/v2/middleware/etag ... I guess this somehow causes issues in terms of headers 🤔

What are your thoughts on this? I should be able to get a minimal reproduction.

Yes it seems. You can disable etag for specific path like:

app.Use(etag.New(etag.Config{
	Next: func(c *fiber.Ctx) bool {
		return c.Path() == "/sse"
	},
}))

@michealroberts
Copy link
Author

@efectn I think I will disable ETags globally for now, I have quite a number of SSE routes.

I wonder if I should open up a separate minimal, reproducible, example of the SSE + ETag issue, and maybe start to work on a possible fix ...

@sdaduanbilei
Copy link

I have the same problem.

@michealroberts
Copy link
Author

I will start working on this issue this week.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants