Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update events logic to allow mapping specific events to specific tags #333

Merged
merged 2 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/abstractions/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type CmdContainerService struct {
rdb *common.RedisClient
tailscale *network.Tailscale
config types.AppConfig
eventRepo repository.EventRepository
}

type ContainerServiceOpts struct {
Expand All @@ -50,6 +51,7 @@ type ContainerServiceOpts struct {
Tailscale *network.Tailscale
Scheduler *scheduler.Scheduler
RedisClient *common.RedisClient
EventRepo repository.EventRepository
}

func NewContainerService(
Expand All @@ -69,6 +71,7 @@ func NewContainerService(
keyEventManager: keyEventManager,
tailscale: opts.Tailscale,
config: opts.Config,
eventRepo: opts.EventRepo,
}

return cs, nil
Expand All @@ -86,6 +89,8 @@ func (cs *CmdContainerService) ExecuteCommand(in *pb.CommandExecutionRequest, st
return err
}

go cs.eventRepo.PushRunStubEvent(authInfo.Workspace.ExternalId, &stub.Stub)

task, err := cs.backendRepo.CreateTask(ctx, &types.TaskParams{
WorkspaceId: authInfo.Workspace.Id,
StubId: stub.Stub.Id,
Expand Down
3 changes: 3 additions & 0 deletions pkg/abstractions/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type HttpEndpointService struct {
scheduler *scheduler.Scheduler
backendRepo repository.BackendRepository
containerRepo repository.ContainerRepository
eventRepo repository.EventRepository
taskRepo repository.TaskRepository
endpointInstances *common.SafeMap[*endpointInstance]
tailscale *network.Tailscale
Expand All @@ -63,6 +64,7 @@ type EndpointServiceOpts struct {
RouteGroup *echo.Group
Tailscale *network.Tailscale
TaskDispatcher *task.Dispatcher
EventRepo repository.EventRepository
}

func NewHTTPEndpointService(
Expand Down Expand Up @@ -92,6 +94,7 @@ func NewHTTPEndpointService(
endpointInstances: common.NewSafeMap[*endpointInstance](),
tailscale: opts.Tailscale,
taskDispatcher: opts.TaskDispatcher,
eventRepo: opts.EventRepo,
}

// Listen for container events with a certain prefix
Expand Down
2 changes: 2 additions & 0 deletions pkg/abstractions/endpoint/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func (es *HttpEndpointService) StartEndpointServe(in *pb.StartEndpointServeReque
return err
}

go es.eventRepo.PushServeStubEvent(instance.Workspace.ExternalId, &instance.Stub.Stub)

// Set lock (used by autoscaler to scale up the single serve container)
instance.Rdb.SetEx(
context.Background(),
Expand Down
13 changes: 13 additions & 0 deletions pkg/abstractions/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type RunCFunctionService struct {
keyEventManager *common.KeyEventManager
rdb *common.RedisClient
routeGroup *echo.Group
eventRepo repository.EventRepository
}

type FunctionServiceOpts struct {
Expand All @@ -62,6 +63,7 @@ type FunctionServiceOpts struct {
Tailscale *network.Tailscale
RouteGroup *echo.Group
TaskDispatcher *task.Dispatcher
EventRepo repository.EventRepository
}

func NewRuncFunctionService(ctx context.Context,
Expand All @@ -83,6 +85,7 @@ func NewRuncFunctionService(ctx context.Context,
keyEventManager: keyEventManager,
taskDispatcher: opts.TaskDispatcher,
routeGroup: opts.RouteGroup,
eventRepo: opts.EventRepo,
}

// Register task dispatcher
Expand All @@ -106,6 +109,16 @@ func (fs *RunCFunctionService) FunctionInvoke(in *pb.FunctionInvokeRequest, stre
return err
}

go func() {
stub, err := fs.backendRepo.GetStubByExternalId(ctx, in.StubId)
if err != nil {
log.Printf("error getting stub: %v", err)
return
}

go fs.eventRepo.PushRunStubEvent(authInfo.Workspace.ExternalId, &stub.Stub)
}()

return fs.stream(ctx, stream, authInfo, task)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/abstractions/taskqueue/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func (tq *RedisTaskQueue) StartTaskQueueServe(in *pb.StartTaskQueueServeRequest,
return err
}

go tq.eventRepo.PushServeStubEvent(instance.Workspace.ExternalId, &instance.Stub.Stub)

// Set lock (used by autoscaler to scale up the single serve container)
instance.Rdb.SetEx(
context.Background(),
Expand Down
3 changes: 3 additions & 0 deletions pkg/abstractions/taskqueue/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type TaskQueueServiceOpts struct {
Tailscale *network.Tailscale
RouteGroup *echo.Group
TaskDispatcher *task.Dispatcher
EventRepo repository.EventRepository
}

const (
Expand All @@ -67,6 +68,7 @@ type RedisTaskQueue struct {
keyEventManager *common.KeyEventManager
queueClient *taskQueueClient
tailscale *network.Tailscale
eventRepo repository.EventRepository
}

func NewRedisTaskQueueService(
Expand Down Expand Up @@ -98,6 +100,7 @@ func NewRedisTaskQueueService(
queueClient: newRedisTaskQueueClient(opts.RedisClient, opts.TaskRepo),
queueInstances: common.NewSafeMap[*taskQueueInstance](),
tailscale: opts.Tailscale,
eventRepo: opts.EventRepo,
}

// Listen for container events with a certain prefix
Expand Down
5 changes: 4 additions & 1 deletion pkg/common/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,15 @@ monitoring:
port: 9090
fluentbit:
events:
endpoint: http://fluent-bit.beta9:9880
endpoint: http://fluent-bit.monitoring:9880
maxConns: 0
maxIdleConns: 30
idleConnTimeout: 10s
dialTimeout: 2s
keepAlive: 30s
# mapping:
# - name: container.lifecycle
# tag: internal_api
openmeter:
serverUrl: ""
apiKey: ""
8 changes: 8 additions & 0 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Gateway struct {
ContainerRepo repository.ContainerRepository
BackendRepo repository.BackendRepository
ProviderRepo repository.ProviderRepository
EventRepo repository.EventRepository
Tailscale *network.Tailscale
metricsRepo repository.MetricsRepository
Storage storage.Storage
Expand Down Expand Up @@ -115,6 +116,7 @@ func NewGateway() (*Gateway, error) {
return nil, err
}

eventRepo := repository.NewTCPEventClientRepo(config.Monitoring.FluentBit.Events)
containerRepo := repository.NewContainerRedisRepository(redisClient)
providerRepo := repository.NewProviderRedisRepository(redisClient)
taskRepo := repository.NewTaskRedisRepository(redisClient)
Expand All @@ -133,6 +135,7 @@ func NewGateway() (*Gateway, error) {
gateway.Tailscale = tailscale
gateway.TaskDispatcher = taskDispatcher
gateway.metricsRepo = metricsRepo
gateway.EventRepo = eventRepo

return gateway, nil
}
Expand Down Expand Up @@ -233,6 +236,7 @@ func (g *Gateway) registerServices() error {
Tailscale: g.Tailscale,
RouteGroup: g.rootRouteGroup,
TaskDispatcher: g.TaskDispatcher,
EventRepo: g.EventRepo,
})
if err != nil {
return err
Expand All @@ -250,6 +254,7 @@ func (g *Gateway) registerServices() error {
Tailscale: g.Tailscale,
RouteGroup: g.rootRouteGroup,
TaskDispatcher: g.TaskDispatcher,
EventRepo: g.EventRepo,
})
if err != nil {
return err
Expand All @@ -266,6 +271,7 @@ func (g *Gateway) registerServices() error {
RouteGroup: g.rootRouteGroup,
Tailscale: g.Tailscale,
TaskDispatcher: g.TaskDispatcher,
EventRepo: g.EventRepo,
})
if err != nil {
return err
Expand All @@ -289,6 +295,7 @@ func (g *Gateway) registerServices() error {
Tailscale: g.Tailscale,
Scheduler: g.Scheduler,
RedisClient: g.RedisClient,
EventRepo: g.EventRepo,
},
)
if err != nil {
Expand Down Expand Up @@ -324,6 +331,7 @@ func (g *Gateway) registerServices() error {
Scheduler: g.Scheduler,
TaskDispatcher: g.TaskDispatcher,
RedisClient: g.RedisClient,
EventRepo: g.EventRepo,
})
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/gateway/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type GatewayService struct {
scheduler *scheduler.Scheduler
taskDispatcher *task.Dispatcher
redisClient *common.RedisClient
eventRepo repository.EventRepository
pb.UnimplementedGatewayServiceServer
}

Expand All @@ -28,6 +29,7 @@ type GatewayServiceOpts struct {
Scheduler *scheduler.Scheduler
TaskDispatcher *task.Dispatcher
RedisClient *common.RedisClient
EventRepo repository.EventRepository
}

func NewGatewayService(opts *GatewayServiceOpts) (*GatewayService, error) {
Expand All @@ -39,5 +41,6 @@ func NewGatewayService(opts *GatewayServiceOpts) (*GatewayService, error) {
scheduler: opts.Scheduler,
taskDispatcher: opts.TaskDispatcher,
redisClient: opts.RedisClient,
eventRepo: opts.EventRepo,
}, nil
}
2 changes: 2 additions & 0 deletions pkg/gateway/services/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func (gws *GatewayService) DeployStub(ctx context.Context, in *pb.DeployStubRequ
}, nil
}

go gws.eventRepo.PushDeployStubEvent(authInfo.Workspace.ExternalId, &stub.Stub)

return &pb.DeployStubResponse{
Ok: true,
DeploymentId: deployment.ExternalId,
Expand Down
3 changes: 3 additions & 0 deletions pkg/repository/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ type EventRepository interface {
PushContainerStoppedEvent(containerID string, workerID string)
PushWorkerStartedEvent(workerID string)
PushWorkerStoppedEvent(workerID string)
PushDeployStubEvent(workspaceId string, stub *types.Stub)
PushServeStubEvent(workspaceId string, stub *types.Stub)
PushRunStubEvent(workspaceId string, stub *types.Stub)
}

type MetricsRepository interface {
Expand Down
59 changes: 56 additions & 3 deletions pkg/repository/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
type TCPEventClientRepo struct {
config types.FluentBitEventConfig
endpointAvailable bool
eventTagMap map[string]string
}

func NewTCPEventClientRepo(config types.FluentBitEventConfig) EventRepository {
Expand All @@ -25,9 +26,16 @@ func NewTCPEventClientRepo(config types.FluentBitEventConfig) EventRepository {
log.Println("[WARNING] fluentbit host does not appear to be up, events will be dropped")
}

// Parse event mapping
eventTagMap := make(map[string]string)
luke-lombardi marked this conversation as resolved.
Show resolved Hide resolved
for _, mapping := range config.Mapping {
eventTagMap[mapping.Name] = mapping.Tag
}

return &TCPEventClientRepo{
config: config,
endpointAvailable: endpointAvailable,
eventTagMap: eventTagMap,
}
}

Expand All @@ -50,7 +58,7 @@ func (t *TCPEventClientRepo) createEventObject(eventName string, schemaVersion s

event := cloudevents.NewEvent()
event.SetID(objectId)
event.SetSource("beam-cloud")
event.SetSource("beta9-cluster")
event.SetType(eventName)
event.SetSpecVersion(schemaVersion)
event.SetTime(time.Now())
Expand All @@ -76,7 +84,13 @@ func (t *TCPEventClientRepo) pushEvent(eventName string, schemaVersion string, d
return
}

resp, err := http.Post(t.config.Endpoint, "application/json", bytes.NewBuffer(eventBytes))
var tag string
tag, ok := t.eventTagMap[eventName]
if !ok {
tag = ""
}

resp, err := http.Post(t.config.Endpoint+"/"+tag, "application/json", bytes.NewBuffer(eventBytes))
if err != nil {
log.Println("failed to send payload to event server:", err)
return
Expand Down Expand Up @@ -152,10 +166,49 @@ func (t *TCPEventClientRepo) PushWorkerStartedEvent(workerID string) {
func (t *TCPEventClientRepo) PushWorkerStoppedEvent(workerID string) {
t.pushEvent(
types.EventWorkerLifecycle,
types.EventWorkerLifecycle,
types.EventWorkerLifecycleSchemaVersion,
types.EventWorkerLifecycleSchema{
WorkerID: workerID,
Status: types.EventWorkerLifecycleStopped,
},
)
}

func (t *TCPEventClientRepo) PushDeployStubEvent(workspaceId string, stub *types.Stub) {
t.pushEvent(
types.EventStubDeploy,
types.EventStubSchemaVersion,
types.EventStubSchema{
ID: stub.ExternalId,
StubType: stub.Type,
StubConfig: stub.Config,
WorkspaceID: workspaceId,
},
)
}

func (t *TCPEventClientRepo) PushServeStubEvent(workspaceId string, stub *types.Stub) {
t.pushEvent(
types.EventStubServe,
types.EventStubSchemaVersion,
types.EventStubSchema{
ID: stub.ExternalId,
StubType: stub.Type,
StubConfig: stub.Config,
WorkspaceID: workspaceId,
},
)
}

func (t *TCPEventClientRepo) PushRunStubEvent(workspaceId string, stub *types.Stub) {
t.pushEvent(
types.EventStubRun,
types.EventStubSchemaVersion,
types.EventStubSchema{
ID: stub.ExternalId,
StubType: stub.Type,
StubConfig: stub.Config,
WorkspaceID: workspaceId,
},
)
}
1 change: 1 addition & 0 deletions pkg/types/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ const (
StubTypeFunction string = "function"
StubTypeFunctionDeployment string = "function/deployment"
StubTypeFunctionServe string = "function/serve"
StubTypeContainer string = "container"
StubTypeTaskQueue string = "taskqueue"
StubTypeTaskQueueDeployment string = "taskqueue/deployment"
StubTypeTaskQueueServe string = "taskqueue/serve"
Expand Down
18 changes: 12 additions & 6 deletions pkg/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,17 @@ type FluentBitConfig struct {
Events FluentBitEventConfig `key:"events" json:"events"`
}

type FluentBitEventMapping struct {
Name string `key:"name" json:"name"`
Tag string `key:"tag" json:"tag"`
}

type FluentBitEventConfig struct {
Endpoint string `key:"endpoint" json:"endpoint"`
MaxConns int `key:"maxConns" json:"max_conns"`
MaxIdleConns int `key:"maxIdleConns" json:"max_idle_conns"`
IdleConnTimeout time.Duration `key:"idleConnTimeout" json:"idle_conn_timeout"`
DialTimeout time.Duration `key:"dialTimeout" json:"dial_timeout"`
KeepAlive time.Duration `key:"keepAlive" json:"keep_alive"`
Endpoint string `key:"endpoint" json:"endpoint"`
MaxConns int `key:"maxConns" json:"max_conns"`
MaxIdleConns int `key:"maxIdleConns" json:"max_idle_conns"`
IdleConnTimeout time.Duration `key:"idleConnTimeout" json:"idle_conn_timeout"`
DialTimeout time.Duration `key:"dialTimeout" json:"dial_timeout"`
KeepAlive time.Duration `key:"keepAlive" json:"keep_alive"`
Mapping []FluentBitEventMapping `key:"mapping" json:"mapping"`
}
Loading
Loading