Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FFM-2096 Add Interface for hooking into SSE Events #69

Merged
merged 3 commits into from
Jan 24, 2022
Merged

Conversation

jcox250
Copy link
Contributor

@jcox250 jcox250 commented Jan 18, 2022

What

  • Creates an Interface that clients can implement in order or receive the SSE Events that the SDK gets from Feature Flags
  • Adds an option func so clients can pass their own EventStreamListener in, default behaviour is to not have one.

Why

In order to implement streaming in the Proxy we need to be able to get the SSE events after the SDK has updated the cache. This change only forwards the Event to the EventListener after the cache has been updated.

Testing

Did the following locally

  • Created a basic EventListener in the Proxy and passed it in to the embedded SDK using WithEventStreamListener.
  • Changed the Proxy stream handler to just return the Grip Control headers for pushpin
  • Ran the Proxy and pointed and SDK at it that was running in streaming mode
  • Toggled flags on/off and the SDK got the event and then fetched the new value from the Proxy's cache

Basic Event Listener

type EventListener struct {
	gpc *gripcontrol.GripPubControl
}

func NewEventListener() EventListener {
	gpc := gripcontrol.NewGripPubControl([]map[string]interface{}{
		{
			"control_uri": "http://localhost:5561",
		},
	})
	return EventListener{gpc}
}

func (e EventListener) Pub(ctx context.Context, event stream.Event) error {
	fmt.Println("Got SSE Event From SDK")

	h := hash.NewSha256()
	channel := h.Hash(event.APIKey)

	content := fmt.Sprintf("event: *\ndata: %s\n\n", event.Event.Data)
	if err := e.gpc.PublishHttpStream(channel, content, "", ""); err != nil {
		fmt.Printf("Error sending data: %s, to pushpin with err %s", event.Event.Data, err)
	}

	fmt.Printf("Pushing event to pushpin %+v", event)
	return nil
}

Basic Stream Handler

h.router.GET("/stream", func(c echo.Context) error {
	w := c.Response().Writer
	apiKey := c.Request().Header.Get("API-Key")
	h := hash.NewSha256()

	w.Header().Add("Content-Type", "text/event-stream")
	w.Header().Add("Grip-Hold", "stream")
	w.Header().Add("Grip-Channel", h.Hash(apiKey))
	w.Header().Add("Grip-Keep-Alive", "format=cstring,timeout=30")

	return nil
})

Update
After the refactor I got my Proxy to use the latest commit on this branch, reran the same manual tests and streaming still works as expected

Copy link
Contributor

@davejohnston davejohnston left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jcox250 jcox250 merged commit df6fa51 into main Jan 24, 2022
@jcox250 jcox250 deleted the FFM-2096 branch January 24, 2022 12:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants