Skip to content

Commit

Permalink
Add support for publication field and example configuration for Susta…
Browse files Browse the repository at this point in the history
…inable Views
  • Loading branch information
asparuhft authored and ManoelMilchev committed Nov 7, 2023
1 parent 4cff6ab commit 5ef0118
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 34 deletions.
25 changes: 20 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"io"
"os"
"regexp"
"slices"
)

type OriginSystemConfig struct {
ContentType string `json:"content_type,binding:required"`
Collection string `json:"collection,binding:required"`
ContentType string `json:"content_type,binding:required"`
Publication []string `json:"publication"`
Collection string `json:"collection,binding:required"`
contentTypeRegexp *regexp.Regexp
}

Expand All @@ -34,17 +36,17 @@ func (c *Configuration) validateConfig() error {
return nil
}

func (c *Configuration) GetCollection(originID string, contentType string) (string, error) {
func (c *Configuration) GetCollection(originID string, contentType string, publication []interface{}) (string, error) {
collection := c.Config[originID]
if len(collection) == 0 {
return "", errors.New("origin system not found")
}
for _, val := range collection {
if val.contentTypeRegexp.MatchString(contentType) {
if val.contentTypeRegexp.MatchString(contentType) && publicationMatch(publication, val) {
return val.Collection, nil
}
}
return "", errors.New("origin system and content type not configured")
return "", errors.New("origin system, content type and publication not configured")
}

// ReadConfigFromReader reads config as a json stream from the given reader
Expand All @@ -67,3 +69,16 @@ func ReadConfig(confPath string) (c *Configuration, e error) {
defer file.Close()
return ReadConfigFromReader(file)
}

func publicationMatch(publication []interface{}, config OriginSystemConfig) bool {
if len(config.Publication) == 0 {
return true
}

for _, pub := range publication {
if slices.Contains(config.Publication, pub.(string)) {
return true
}
}
return false
}
57 changes: 45 additions & 12 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func TestConfiguration_GetCollection(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

got, err := c.GetCollection(tt.args.originID, tt.args.contentType)
got, err := c.GetCollection(tt.args.originID, tt.args.contentType, nil)
if (err != nil) != tt.wantErr {
t.Errorf("Configuration.GetCollection() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -418,6 +418,7 @@ func TestConfigurationMetadata_GetCollection(t *testing.T) {
type args struct {
originID string
contentType string
publication []interface{}
}
c := &Configuration{
Config: map[string][]OriginSystemConfig{
Expand All @@ -431,6 +432,12 @@ func TestConfigurationMetadata_GetCollection(t *testing.T) {
Collection: "video-metadata",
},
},
"http://cmdb.ft.com/systems/cct": {
{ContentType: ".*",
Publication: []string{"8e6c705e-1132-42a2-8db0-c295e29e8658"},
Collection: "external-metadata",
},
},
},
}
err := c.validateConfig()
Expand All @@ -447,78 +454,104 @@ func TestConfigurationMetadata_GetCollection(t *testing.T) {
{
"pac json",
args{"http://cmdb.ft.com/systems/pac",
"application/json"},
"application/json",
nil},
"pac-metadata",
false,
},
{
"pac null CT",
args{"http://cmdb.ft.com/systems/pac",
""},
"",
nil},
"pac-metadata",
false,
},
{
"pac",
args{"http://cmdb.ft.com/systems/pac",
"anytype"},
"anytype",
nil},
"pac-metadata",
false,
},
{
"video wrong CT",
args{"http://cmdb.ft.com/systems/next-video-editor",
"anytype"},
"anytype",
nil},
"",
true,
},
{
"video OK",
args{"http://cmdb.ft.com/systems/next-video-editor",
"application/json"},
"application/json",
nil},
"video-metadata",
false,
},
{
"video OK long CT",
args{"http://cmdb.ft.com/systems/next-video-editor",
"application/json; utf8"},
"application/json; utf8",
nil},
"video-metadata",
false,
},
{
"audio OK",
args{"http://cmdb.ft.com/systems/next-video-editor",
"application/vnd.ft-upp-audio+json"},
"application/vnd.ft-upp-audio+json",
nil},
"",
true,
},
{
"audio long CT",
args{"http://cmdb.ft.com/systems/next-video-editor",
"application/vnd.ft-upp-audio+json;UTF8"},
"application/vnd.ft-upp-audio+json;UTF8",
nil},
"",
true,
},
{
"audio wrong CT",
args{"http://cmdb.ft.com/systems/next-video-editor",
"application/vnd.ft-upp-audio-json"},
"application/vnd.ft-upp-audio-json",
nil},
"",
true,
},
{
"wrong origin",
args{"http://cmdb.ft.com/systems/next",
"application/vnd.ft-upp-audio+json"},
"application/vnd.ft-upp-audio+json",
nil},
"",
true,
},
{
"sustainable views ok",
args{"http://cmdb.ft.com/systems/cct",
"",
[]interface{}{"8e6c705e-1132-42a2-8db0-c295e29e8658"}},
"external-metadata",
false,
},
{
"sustainable views wrong publication",
args{"http://cmdb.ft.com/systems/cct",
"",
[]interface{}{"8e6c705e-1132-42a2-8db0-c295e29e8659"}},
"",
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

got, err := c.GetCollection(tt.args.originID, tt.args.contentType)
got, err := c.GetCollection(tt.args.originID, tt.args.contentType, tt.args.publication)
if (err != nil) != tt.wantErr {
t.Errorf("Configuration.GetCollection() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
7 changes: 7 additions & 0 deletions config_metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,12 @@
"content_type": "^(application/json).*$",
"collection": "video-metadata"
}
],
"http://cmdb.ft.com/systems/cct": [
{
"content_type": ".*",
"publication": ["8e6c705e-1132-42a2-8db0-c295e29e8658"],
"collection": "external-metadata"
}
]
}
4 changes: 2 additions & 2 deletions mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type WriterMock struct {
mock.Mock
}

func (w *WriterMock) GetCollection(originID string, contentType string) (string, error) {
args := w.Called(originID, contentType)
func (w *WriterMock) GetCollection(originID string, contentType string, publication []interface{}) (string, error) {
args := w.Called(originID, contentType, publication)
return args.String(0), args.Error(1)
}

Expand Down
21 changes: 18 additions & 3 deletions native/native_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ const (
messageTypePartialContentPublished = "cms-partial-content-published"
schemaVersionHeader = "X-Schema-Version"
contentRevisionHeader = "X-Content-Revision"
publicationBodyField = "publication"
)

// Writer provides the functionalities to write in the native store
type Writer interface {
GetCollection(originID string, contentType string) (string, error)
GetCollection(originID string, contentType string, publication []interface{}) (string, error)
WriteToCollection(msg NativeMessage, collection string) (string, string, error)
ConnectivityCheck() (string, error)
}
Expand All @@ -46,8 +47,8 @@ func NewWriter(address string, collectionsOriginIdsMap config.Configuration, par
return &nativeWriter{address, collections, http.Client{}, parser, logger}
}

func (nw *nativeWriter) GetCollection(originID string, contentType string) (string, error) {
return nw.collections.GetCollection(originID, contentType)
func (nw *nativeWriter) GetCollection(originID string, contentType string, publication []interface{}) (string, error) {
return nw.collections.GetCollection(originID, contentType, publication)
}

func (nw *nativeWriter) WriteToCollection(msg NativeMessage, collection string) (string, string, error) {
Expand Down Expand Up @@ -218,3 +219,17 @@ func (msg *NativeMessage) ContentRevision() string {
func (msg *NativeMessage) IsPartialContent() bool {
return msg.headers[messageTypeHeader] == messageTypePartialContentPublished
}

func (msg *NativeMessage) Publication() []interface{} {
publication, exists := msg.body[publicationBodyField]
if !exists {
return nil
}

publicationArray, ok := publication.([]interface{})
if !ok {
return nil
}

return publicationArray
}
10 changes: 5 additions & 5 deletions native/native_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func TestGetCollectionShort(t *testing.T) {

w := NewWriter("", *testCollectionsOriginIdsMap, p, log)

actualCollection, err := w.GetCollection(cctOriginSystemID, aContentType)
actualCollection, err := w.GetCollection(cctOriginSystemID, aContentType, nil)
assert.NoError(t, err, "It should not return an error")
assert.Equal(t, universalContentCollectionName, actualCollection, "It should return the universal-content collection")

_, err = w.GetCollection("Origin-Id-that-do-not-exist", aContentType)
_, err = w.GetCollection("Origin-Id-that-do-not-exist", aContentType, nil)
assert.EqualError(t, err, "origin system not found", "It should return a collection not found error")
p.AssertExpectations(t)
}
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestGetCollection(t *testing.T) {
}
for _, tt := range tests {
t.Run("Test", func(t *testing.T) {
actualCollection, err := w.GetCollection(cctOriginSystemID, tt.contentType)
actualCollection, err := w.GetCollection(cctOriginSystemID, tt.contentType, nil)
if (err != nil) != tt.wantErr {
t.Errorf("TestGetVideoCollection() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestGetAllCollection(t *testing.T) {
}
for _, tt := range tests {
t.Run("Test", func(t *testing.T) {
actualCollection, err := w.GetCollection(cctOriginSystemID, tt.contentType)
actualCollection, err := w.GetCollection(cctOriginSystemID, tt.contentType, nil)
if (err != nil) != tt.wantErr {
t.Errorf("TestGetVideoCollection() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestGetVideoCollection(t *testing.T) {
}
for _, tt := range tests {
t.Run("Test", func(t *testing.T) {
actualCollection, err := w.GetCollection(o, tt.contentType)
actualCollection, err := w.GetCollection(o, tt.contentType, nil)
if (err != nil) != tt.wantErr {
t.Errorf("TestGetVideoCollection() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
2 changes: 1 addition & 1 deletion queue/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (mh *MessageHandler) HandleMessage(msg kafka.FTMessage) {
return
}

collection, err := mh.writer.GetCollection(pubEvent.originSystemID(), writerMsg.ContentType())
collection, err := mh.writer.GetCollection(pubEvent.originSystemID(), writerMsg.ContentType(), writerMsg.Publication())
if err != nil {
logMonitoringEvent.
WithValidFlag(false).
Expand Down
12 changes: 6 additions & 6 deletions queue/message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var badBodyMsg = kafka.FTMessage{
func TestWriteToNativeSuccessfullyWithoutForward(t *testing.T) {
log := logger.NewUnstructuredLogger()
w := new(mocks.WriterMock)
w.On("GetCollection", cctOriginSystemID, contentType).Return(universalContentCollection, nil)
w.On("GetCollection", cctOriginSystemID, contentType, []interface{}(nil)).Return(universalContentCollection, nil)
w.On("WriteToCollection", mock.AnythingOfType("native.NativeMessage"), universalContentCollection).Return("", "", nil)

p := new(mocks.ProducerMock)
Expand All @@ -56,7 +56,7 @@ func TestWriteToNativeSuccessfullyWithoutForward(t *testing.T) {
func TestWriteToNativeSuccessfullyWithForward(t *testing.T) {
log := logger.NewUnstructuredLogger()
w := new(mocks.WriterMock)
w.On("GetCollection", cctOriginSystemID, contentType).Return(universalContentCollection, nil)
w.On("GetCollection", cctOriginSystemID, contentType, []interface{}(nil)).Return(universalContentCollection, nil)
w.On("WriteToCollection", mock.AnythingOfType("native.NativeMessage"), universalContentCollection).Return("", "", nil)

p := new(mocks.ProducerMock)
Expand All @@ -82,7 +82,7 @@ func TestWritePartialContentToNativeSuccessfullyWithForward(t *testing.T) {
Headers: goodMsgPartialUpdated.Headers,
}

w.On("GetCollection", cctOriginSystemID, contentType).Return(universalContentCollection, nil)
w.On("GetCollection", cctOriginSystemID, contentType, []interface{}(nil)).Return(universalContentCollection, nil)
w.On("WriteToCollection", mock.AnythingOfType("native.NativeMessage"), universalContentCollection).Return("", updatedBody, nil)

p := new(mocks.ProducerMock)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestWriteToNativeFailWithBadBodyMessage(t *testing.T) {
func TestWriteToNativeFailWithNotCollectionForOriginId(t *testing.T) {
log := logger.NewUnstructuredLogger()
w := new(mocks.WriterMock)
w.On("GetCollection", cctOriginSystemID, contentType).Return("", errors.New("Collection Not Found"))
w.On("GetCollection", cctOriginSystemID, contentType, []interface{}(nil)).Return("", errors.New("Collection Not Found"))

p := new(mocks.ProducerMock)

Expand All @@ -129,7 +129,7 @@ func TestWriteToNativeFailWithNotCollectionForOriginId(t *testing.T) {
func TestWriteToNativeFailBecauseOfWriter(t *testing.T) {
log := logger.NewUnstructuredLogger()
w := new(mocks.WriterMock)
w.On("GetCollection", cctOriginSystemID, contentType).Return(universalContentCollection, nil)
w.On("GetCollection", cctOriginSystemID, contentType, []interface{}(nil)).Return(universalContentCollection, nil)
w.On("WriteToCollection", mock.AnythingOfType("native.NativeMessage"), universalContentCollection).Return("", "", errors.New("today I do not want to write"))

p := new(mocks.ProducerMock)
Expand All @@ -149,7 +149,7 @@ func TestForwardFailBecauseOfProducer(t *testing.T) {
hook.SetOutput(&buf)

w := new(mocks.WriterMock)
w.On("GetCollection", cctOriginSystemID, contentType).Return(universalContentCollection, nil)
w.On("GetCollection", cctOriginSystemID, contentType, []interface{}(nil)).Return(universalContentCollection, nil)
w.On("WriteToCollection", mock.AnythingOfType("native.NativeMessage"), universalContentCollection).Return("", "", nil)

p := new(mocks.ProducerMock)
Expand Down

0 comments on commit 5ef0118

Please sign in to comment.