Skip to content

Commit

Permalink
Update plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
terryyylim committed Mar 2, 2023
1 parent 367247b commit 4bd1afa
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 60 deletions.
17 changes: 15 additions & 2 deletions api/schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,26 @@ components:
topic_name:
type: string
description: Topic name of the PubSub subscription
MessageQueueKind:
description: Kind of message queue
type: string
enum:
- noop
- pubsub
MessageQueueConfig:
type: object
properties:
kind:
$ref: '#/components/schemas/MessageQueueKind'
pub_sub:
$ref: '#/components/schemas/PubSub'
SegmenterConfig:
type: object
TreatmentServiceConfig:
type: object
properties:
pub_sub:
$ref: '#/components/schemas/PubSub'
message_queue_config:
$ref: '#/components/schemas/MessageQueueConfig'
segmenter_config:
$ref: '#/components/schemas/SegmenterConfig'
SelectedTreatmentData:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 54 additions & 35 deletions common/api/schema/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 19 additions & 4 deletions management-service/services/configuration_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,30 @@ type configurationService struct {
func NewConfigurationService(cfg *config.Config) ConfigurationService {
var segmenterConfig schema.SegmenterConfig = cfg.SegmenterConfig

return &configurationService{
var messageQueueKind schema.MessageQueueKind
switch cfg.MessageQueueConfig.Kind {
case "pubsub":
messageQueueKind = schema.MessageQueueKindPubsub
case "":
messageQueueKind = schema.MessageQueueKindNoop
}

configurationSvc := &configurationService{
treatmentServiceConfig: schema.TreatmentServiceConfig{
PubSub: &schema.PubSub{
Project: &cfg.MessageQueueConfig.PubSubConfig.Project,
TopicName: &cfg.MessageQueueConfig.PubSubConfig.TopicName,
MessageQueueConfig: &schema.MessageQueueConfig{
Kind: &messageQueueKind,
},
SegmenterConfig: &segmenterConfig,
},
}
if cfg.MessageQueueConfig.Kind == "pubsub" {
configurationSvc.treatmentServiceConfig.MessageQueueConfig.PubSub = &schema.PubSub{
Project: &cfg.MessageQueueConfig.PubSubConfig.Project,
TopicName: &cfg.MessageQueueConfig.PubSubConfig.TopicName,
}
}

return configurationSvc
}

func (svc configurationService) GetTreatmentServiceConfig() schema.TreatmentServiceConfig {
Expand Down
11 changes: 8 additions & 3 deletions management-service/services/configuration_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func (s *ConfigurationServiceTestSuite) SetupSuite() {

cfg := config.Config{
MessageQueueConfig: &config.MessageQueueConfig{
Kind: "pubsub",
PubSubConfig: &config.PubSubConfig{
Project: "dev",
TopicName: "xp-update",
Expand All @@ -41,13 +42,17 @@ func TestConfigurationService(t *testing.T) {
}

func (s *ConfigurationServiceTestSuite) TestGetTreatmentServicePluginConfig() {
messageQueueKind := schema.MessageQueueKindPubsub
pubSubConfigProject := "dev"
pubSubConfigTopicName := "xp-update"

expectedConfiguration := schema.TreatmentServiceConfig{
PubSub: &schema.PubSub{
Project: &pubSubConfigProject,
TopicName: &pubSubConfigTopicName,
MessageQueueConfig: &schema.MessageQueueConfig{
Kind: &messageQueueKind,
PubSub: &schema.PubSub{
Project: &pubSubConfigProject,
TopicName: &pubSubConfigTopicName,
},
},
SegmenterConfig: &schema.SegmenterConfig{
"s2_ids": map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion management-service/services/message_queue_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewMessageQueueService(mqConfig *config.MessageQueueConfig) (MessageQueueSe
case config.PubSubMQ:
mq, err = NewPubSubPublisherService(mqConfig.PubSubConfig)
default:
return nil, fmt.Errorf("invalid message queue config (%s) was provided", mqConfig.Kind)
return nil, fmt.Errorf("invalid message queue kind (%s) was provided", mqConfig.Kind)
}
if err != nil {
return nil, err
Expand Down
31 changes: 23 additions & 8 deletions plugins/turing/manager/experiment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (em *experimentManager) MakeTreatmentServicePluginConfig(
treatmentServiceConfig *schema.TreatmentServiceConfig,
projectID int,
) (*config.Config, error) {
return &config.Config{
pluginConfig := &config.Config{
Port: em.TreatmentServicePluginConfig.Port,
ProjectIds: []string{strconv.Itoa(projectID)},
AssignedTreatmentLogger: em.TreatmentServicePluginConfig.AssignedTreatmentLogger,
Expand All @@ -153,13 +153,28 @@ func (em *experimentManager) MakeTreatmentServicePluginConfig(
SwaggerConfig: em.TreatmentServicePluginConfig.SwaggerConfig,
NewRelicConfig: em.TreatmentServicePluginConfig.NewRelicConfig,
SentryConfig: em.TreatmentServicePluginConfig.SentryConfig,
PubSub: config.PubSub{
Project: *treatmentServiceConfig.PubSub.Project,
TopicName: *treatmentServiceConfig.PubSub.TopicName,
PubSubTimeoutSeconds: em.TreatmentServicePluginConfig.PubSubTimeoutSeconds,
},
SegmenterConfig: *treatmentServiceConfig.SegmenterConfig,
}, nil
SegmenterConfig: *treatmentServiceConfig.SegmenterConfig,
}
messageQueueKind := *treatmentServiceConfig.MessageQueueConfig.Kind
switch messageQueueKind {
case schema.MessageQueueKindPubsub:
pluginConfig.MessageQueueConfig = config.MessageQueueConfig{
Kind: "pubsub",
PubSubConfig: config.PubSub{
Project: *treatmentServiceConfig.MessageQueueConfig.PubSub.Project,
TopicName: *treatmentServiceConfig.MessageQueueConfig.PubSub.TopicName,
PubSubTimeoutSeconds: em.TreatmentServicePluginConfig.PubSubTimeoutSeconds,
},
}
case schema.MessageQueueKindNoop:
pluginConfig.MessageQueueConfig = config.MessageQueueConfig{
Kind: "",
}
default:
return nil, fmt.Errorf("invalid message queue kind (%s) was provided", messageQueueKind)
}

return pluginConfig, nil
}

func NewExperimentManager(configData json.RawMessage) (manager.CustomExperimentManager, error) {
Expand Down
4 changes: 2 additions & 2 deletions plugins/turing/runner/experiment_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,9 @@ func (er *experimentRunner) startBackgroundServices(
errChannel chan error,
) {
backgroundSvcCtx := context.Background()
if er.appContext.ExperimentSubscriber != nil {
if er.appContext.MessageQueueService != nil {
go func() {
err := er.appContext.ExperimentSubscriber.SubscribeToManagementService(backgroundSvcCtx)
err := er.appContext.MessageQueueService.SubscribeToManagementService(backgroundSvcCtx)
if err != nil {
errChannel <- err
}
Expand Down
6 changes: 3 additions & 3 deletions treatment-service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ type MessageQueueConfig struct {
}

type PubSub struct {
Project string `json:"project" default:"dev" validate:"required"`
TopicName string `json:"topic_name" default:"xp-update" validate:"required"`
PubSubTimeoutSeconds int `json:"pub_sub_timeout_seconds" default:"30" validate:"required"`
Project string `json:"project" default:"dev"`
TopicName string `json:"topic_name" default:"xp-update"`
PubSubTimeoutSeconds int `json:"pub_sub_timeout_seconds" default:"30"`
}

type ManagementServiceConfig struct {
Expand Down

0 comments on commit 4bd1afa

Please sign in to comment.