Skip to content

Commit

Permalink
moved event bus into its own package.
Browse files Browse the repository at this point in the history
created models for event bus messaging.
added logger.
added source complete and sync events.
  • Loading branch information
AnalogJ committed Sep 10, 2023
1 parent b2bff9c commit 9e1c745
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 20 deletions.
20 changes: 14 additions & 6 deletions backend/pkg/database/sqlite_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/config"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/utils"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse"
sourceModel "github.com/fastenhealth/fasten-sources/clients/models"
"github.com/gin-gonic/gin"
"github.com/glebarez/sqlite"
Expand Down Expand Up @@ -62,7 +62,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger)
AppConfig: appConfig,
Logger: globalLogger,
GormClient: database,
EventBus: sse.GetEventBusServer(),
EventBus: event_bus.GetEventBusServer(globalLogger),
}

//TODO: automigrate for now, this should be replaced with a migration tool once the DB has stabilized.
Expand Down Expand Up @@ -95,7 +95,7 @@ type SqliteRepository struct {

GormClient *gorm.DB

EventBus *sse.EventBus
EventBus *event_bus.EventBus
}

func (sr *SqliteRepository) Migrate() error {
Expand Down Expand Up @@ -357,10 +357,18 @@ func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceM
//wrappedFhirResourceModel.SetResourceRaw(wrappedResourceModel.ResourceRaw)
}

sr.EventBus.Message <- sse.EventBusMessage{
Message: fmt.Sprintf("resource.upsert %s/%s", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID),
UserID: currentUser.ID.String(),
eventSourceSync := models.NewEventSourceSync(
currentUser.ID.String(),
wrappedFhirResourceModel.GetSourceID().String(),
wrappedFhirResourceModel.GetSourceResourceType(),
wrappedFhirResourceModel.GetSourceResourceID(),
)

err = sr.EventBus.PublishMessage(eventSourceSync)
if err != nil {
sr.Logger.Warnf("ignoring: an error occurred while publishing event to EventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err)
}

createResult := sr.GormClient.WithContext(ctx).Where(models.OriginBase{
SourceID: wrappedFhirResourceModel.GetSourceID(),
SourceResourceID: wrappedFhirResourceModel.GetSourceResourceID(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package sse
package event_bus

import (
"encoding/json"
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
"github.com/sirupsen/logrus"
"log"
"sync"
)
Expand All @@ -17,13 +20,14 @@ type ClientChan chan string
// Get a reference to the EventBus singleton Start procnteessing requests
// this should be a singleton, to ensure that we're always broadcasting to the same clients
// see: https://refactoring.guru/design-patterns/singleton/go/example
func GetEventBusServer() *EventBus {
func GetEventBusServer(logger logrus.FieldLogger) *EventBus {
if singletonEventBusInstance == nil {
eventBusLock.Lock()
defer eventBusLock.Unlock()
if singletonEventBusInstance == nil {
fmt.Println("Creating single instance now.")
singletonEventBusInstance = &EventBus{
Logger: logger,
Message: make(chan EventBusMessage),
NewListener: make(chan EventBusListener),
ClosedListener: make(chan EventBusListener),
Expand All @@ -45,6 +49,8 @@ func GetEventBusServer() *EventBus {
// It keeps a list of clients those are currently attached
// and broadcasting events to those clients.
type EventBus struct {
Logger logrus.FieldLogger

// Events are pushed to this channel by the main events-gathering routine
Message chan EventBusMessage

Expand Down Expand Up @@ -114,3 +120,16 @@ func (bus *EventBus) listen() {
}
}
}

func (bus *EventBus) PublishMessage(eventMsg models.EventInterface) error {
bus.Logger.Infof("Publishing message to room: `%s`", eventMsg.GetUserID())
payload, err := json.Marshal(eventMsg)
if err != nil {
return err
}
bus.Message <- EventBusMessage{
UserID: eventMsg.GetUserID(),
Message: string(payload),
}
return nil
}
21 changes: 21 additions & 0 deletions backend/pkg/models/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package models

type EventSourceSyncStatus string

const (
EventTypeSourceSync EventSourceSyncStatus = "source_sync"
EventTypeSourceComplete EventSourceSyncStatus = "source_complete"
)

type EventInterface interface {
GetUserID() string
}

type Event struct {
UserID string `json:"-"`
EventType EventSourceSyncStatus `json:"event_type"`
}

func (e *Event) GetUserID() string {
return e.UserID
}
16 changes: 16 additions & 0 deletions backend/pkg/models/event_source_complete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package models

type EventSourceComplete struct {
*Event `json:",inline"`
SourceID string `json:"source_id"`
}

func NewEventSourceComplete(userID string, sourceID string) *EventSourceComplete {
return &EventSourceComplete{
Event: &Event{
UserID: userID,
EventType: EventTypeSourceComplete,
},
SourceID: sourceID,
}
}
20 changes: 20 additions & 0 deletions backend/pkg/models/event_source_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package models

type EventSourceSync struct {
*Event `json:",inline"`
SourceID string `json:"source_id"`
ResourceType string `json:"resource_type"`
ResourceID string `json:"resource_id"`
}

func NewEventSourceSync(userID string, sourceID string, resourceType string, resourceID string) *EventSourceSync {
return &EventSourceSync{
Event: &Event{
UserID: userID,
EventType: EventTypeSourceSync,
},
SourceID: sourceID,
ResourceType: resourceType,
ResourceID: resourceID,
}
}
4 changes: 2 additions & 2 deletions backend/pkg/web/handler/server_sent_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package handler

import (
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/gin-gonic/gin"
"io"
"log"
Expand All @@ -23,7 +23,7 @@ func SSEStream(c *gin.Context) {
log.Printf("could not get client channel from context")
return
}
listener, ok := v.(sse.EventBusListener)
listener, ok := v.(event_bus.EventBusListener)
if !ok {
return
}
Expand Down
38 changes: 36 additions & 2 deletions backend/pkg/web/handler/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/fastenhealth/fasten-onprem/backend/pkg/jwk"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
"github.com/fastenhealth/fasten-sources/clients/factory"
Expand Down Expand Up @@ -143,7 +144,7 @@ func CreateSource(c *gin.Context) {

// after creating the source, we should do a bulk import (in the background)

summary, err := SyncSourceResources(context.WithValue(c.Request.Context(), pkg.ContextKeyTypeAuthUsername, c.Value(pkg.ContextKeyTypeAuthUsername).(string)), logger, databaseRepo, &sourceCred)
summary, err := SyncSourceResources(GetBackgroundContext(c), logger, databaseRepo, &sourceCred)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"success": false})
return
Expand All @@ -166,11 +167,24 @@ func SourceSync(c *gin.Context) {
}

// after creating the source, we should do a bulk import (in the background)
summary, err := SyncSourceResources(context.WithValue(c.Request.Context(), pkg.ContextKeyTypeAuthUsername, c.Value(pkg.ContextKeyTypeAuthUsername).(string)), logger, databaseRepo, sourceCred)
summary, err := SyncSourceResources(GetBackgroundContext(c), logger, databaseRepo, sourceCred)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"success": false})
return
}

//publish event
currentUser, _ := databaseRepo.GetCurrentUser(c)
err = event_bus.GetEventBusServer(logger).PublishMessage(
models.NewEventSourceComplete(
currentUser.ID.String(),
sourceCred.ID.String(),
),
)
if err != nil {
logger.Warnf("ignoring: an error occurred while publishing sync complete event: %v", err)
}

c.JSON(http.StatusOK, gin.H{"success": true, "source": sourceCred, "data": summary})
}

Expand Down Expand Up @@ -243,7 +257,21 @@ func CreateManualSource(c *gin.Context) {
return
}

//publish event
currentUser, _ := databaseRepo.GetCurrentUser(c)

err = event_bus.GetEventBusServer(logger).PublishMessage(
models.NewEventSourceComplete(
currentUser.ID.String(),
manualSourceCredential.ID.String(),
),
)
if err != nil {
logger.Warnf("ignoring: an error occurred while publishing sync complete event: %v", err)
}

c.JSON(http.StatusOK, gin.H{"success": true, "data": summary, "source": manualSourceCredential})

}

func GetSource(c *gin.Context) {
Expand Down Expand Up @@ -314,3 +342,9 @@ func SyncSourceResources(c context.Context, logger *logrus.Entry, databaseRepo d

return summary, nil
}

//

func GetBackgroundContext(ginContext *gin.Context) context.Context {
return context.WithValue(ginContext.Request.Context(), pkg.ContextKeyTypeAuthUsername, ginContext.Value(pkg.ContextKeyTypeAuthUsername).(string))
}
9 changes: 5 additions & 4 deletions backend/pkg/web/middleware/server_sent_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)

func SSEHeaderMiddleware() gin.HandlerFunc {
Expand All @@ -18,10 +19,10 @@ func SSEHeaderMiddleware() gin.HandlerFunc {
}
}

func SSEEventBusServerMiddleware() gin.HandlerFunc {
func SSEEventBusServerMiddleware(logger *logrus.Entry) gin.HandlerFunc {

// get reference to streaming server singleton
bus := sse.GetEventBusServer()
bus := event_bus.GetEventBusServer(logger)

return func(c *gin.Context) {
//get a reference to the current user
Expand All @@ -34,7 +35,7 @@ func SSEEventBusServerMiddleware() gin.HandlerFunc {
}

// Initialize client channel
clientListener := sse.EventBusListener{
clientListener := event_bus.EventBusListener{
ResponseChan: make(chan string),
UserID: foundUser.ID.String(),
}
Expand Down
12 changes: 8 additions & 4 deletions backend/pkg/web/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"embed"
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg/config"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/handler"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/middleware"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"net/http"
Expand Down Expand Up @@ -38,8 +38,8 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) {
// check if the /web folder is populated.
// check if access to database

bus := sse.GetEventBusServer()
bus.Message <- sse.EventBusMessage{
bus := event_bus.GetEventBusServer(ae.Logger)
bus.Message <- event_bus.EventBusMessage{
UserID: "heartbeat",
Message: "sse heartbeat",
}
Expand Down Expand Up @@ -79,7 +79,11 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) {
secure.POST("/query", handler.QueryResourceFhir)

//server-side-events handler
secure.GET("/events/stream", middleware.SSEHeaderMiddleware(), middleware.SSEEventBusServerMiddleware(), handler.SSEStream)
secure.GET("/events/stream",
middleware.SSEHeaderMiddleware(),
middleware.SSEEventBusServerMiddleware(ae.Logger),
handler.SSEStream,
)
}

if ae.Config.GetBool("web.allow_unsafe_endpoints") {
Expand Down

0 comments on commit 9e1c745

Please sign in to comment.