From 8986950670788b310a007ea535d93f8a59c06768 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Mon, 16 Mar 2020 18:30:02 +0200 Subject: [PATCH] UPPSF-1100 Added spark and cct origins and authorities --- cmd/content-rw-elasticsearch/main.go | 12 +-- configs/app.yml | 37 ++++--- go.mod | 1 + go.sum | 3 + pkg/config/config.go | 45 +++++---- pkg/es/client.go | 5 +- pkg/es/service.go | 4 + pkg/mapper/mapper.go | 4 +- pkg/message/message_handler.go | 138 ++++++++++++++------------- pkg/message/message_handler_test.go | 14 ++- 10 files changed, 153 insertions(+), 110 deletions(-) diff --git a/cmd/content-rw-elasticsearch/main.go b/cmd/content-rw-elasticsearch/main.go index 294b33f..14b4c0c 100644 --- a/cmd/content-rw-elasticsearch/main.go +++ b/cmd/content-rw-elasticsearch/main.go @@ -3,7 +3,7 @@ package main import ( - nethttp "net/http" + "net/http" "os" "time" @@ -11,7 +11,7 @@ import ( "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/config" "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/es" "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/health" - "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/http" + pkghttp "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/http" "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/mapper" "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/message" "github.com/Financial-Times/go-logger/v2" @@ -129,7 +129,7 @@ func main() { Endpoint: *esEndpoint, } - httpClient := http.NewHTTPClient() + httpClient := pkghttp.NewHTTPClient() appConfig, err := config.ParseConfig("app.yml") if err != nil { @@ -137,7 +137,7 @@ func main() { } esService := es.NewService(*indexName) - + concordanceAPIService := concept.NewConcordanceAPIService(*publicConcordancesEndpoint, httpClient) mapperHandler := mapper.NewMapperHandler( @@ -160,9 +160,9 @@ func main() { healthService := health.NewHealthService(&queueConfig, esService, httpClient, concordanceAPIService, *appSystemCode, log) // - serveMux := nethttp.NewServeMux() + serveMux := http.NewServeMux() serveMux = healthService.AttachHTTPEndpoints(serveMux, *appName, config.AppDescription) - http.StartServer(log, serveMux, *port) + pkghttp.StartServer(log, serveMux, *port) handler.Stop() } diff --git a/configs/app.yml b/configs/app.yml index e5a54b3..e6c574a 100644 --- a/configs/app.yml +++ b/configs/app.yml @@ -19,19 +19,32 @@ predicates: hasAuthor: "http://www.ft.com/ontology/annotation/hasAuthor" hasContributor: "http://www.ft.com/ontology/hasContributor" -content: - origin: - methode: "methode-web-pub" - wordpress: "wordpress" - video: "next-video-editor" - pac: "http://cmdb.ft.com/systems/pac" - -authorities: - article: "http://api.ft.com/system/FTCOM-METHODE" - blog: "http://api.ft.com/system/FT-LABS-WP" - video: "http://api.ft.com/system/NEXT-VIDEO-EDITOR" +contentMetadata: + methode: + origin: "methode-web-pub" + authority: "http://api.ft.com/system/FTCOM-METHODE" + contentType: "article" + wordpress: + origin: "wordpress" + authority: "http://api.ft.com/system/FT-LABS-WP" + contentType: "blog" + video: + origin: "next-video-editor" + authority: "http://api.ft.com/system/NEXT-VIDEO-EDITOR" + contentType: "video" + cct: + origin: "http://cmdb.ft.com/systems/cct" + authority: "http://api.ft.com/system/cct" + contentType: "article" + spark: + origin: "http://cmdb.ft.com/systems/spark" + authority: "http://api.ft.com/system/spark" + contentType: "article" + pac: + origin: "http://cmdb.ft.com/systems/pac" + contentType: "article" -esContentTypeMap: +esContentTypeMetadata: article: collection: "FTCom" format: "Articles" diff --git a/go.mod b/go.mod index 1ee7950..ffae900 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/Financial-Times/go-logger/v2 v2.0.1 github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566 github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d + github.com/Financial-Times/transactionid-utils-go v0.2.0 github.com/Financial-Times/uuid-utils-go v0.0.0-20170516110427-e22658edd0f1 github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9 github.com/fortytw2/leaktest v1.3.0 // indirect diff --git a/go.sum b/go.sum index ea6b2a8..eb3ff42 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566/go.mod h1:A88i3psx3Zm80Ai2OYTrwzKkZGKj+x5KL02z+YrRd10= github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d h1:USNBTIof6vWGM49SYrxvC5Y8NqyDL3YuuYmID81ORZQ= github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d/go.mod h1:7zULC9rrq6KxFkpB3Y5zNVaEwrf1g2m3dvXJBPDXyvM= +github.com/Financial-Times/transactionid-utils-go v0.2.0 h1:YcET5Hd1fUGWWpQSVszYUlAc15ca8tmjRetUuQKRqEQ= +github.com/Financial-Times/transactionid-utils-go v0.2.0/go.mod h1:tPAcAFs/dR6Q7hBDGNyUyixHRvg/n9NW/JTq8C58oZ0= github.com/Financial-Times/uuid-utils-go v0.0.0-20170516110427-e22658edd0f1 h1:FXM7cqqPyGh2QZ8BRJA16Gr65/+/91KEFSPKyRM+Nd8= github.com/Financial-Times/uuid-utils-go v0.0.0-20170516110427-e22658edd0f1/go.mod h1:i62wLwNq+NmRCQpZS5BLTKsOVYsTOxs9bSx7FgtxXwM= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -168,6 +170,7 @@ golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco= golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= diff --git a/pkg/config/config.go b/pkg/config/config.go index 2bd7152..9cd9a90 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -24,23 +24,33 @@ const ( AudioType = "audio" ) -type ContentTypeMap map[string]schema.ContentType +type ESContentTypeMetadataMap map[string]schema.ContentType type Map map[string]string +type ContentMetadataMap map[string]ContentMetadata + +type ContentMetadata struct { + Origin string + Authority string + ContentType string +} func (c Map) Get(key string) string { return c[strings.ToLower(key)] } -func (c ContentTypeMap) Get(key string) schema.ContentType { +func (c ESContentTypeMetadataMap) Get(key string) schema.ContentType { + return c[strings.ToLower(key)] +} + +func (c ContentMetadataMap) Get(key string) ContentMetadata { return c[strings.ToLower(key)] } type AppConfig struct { - Predicates Map - ConceptTypes Map - Origins Map - Authorities Map - ContentTypeMap ContentTypeMap + Predicates Map + ConceptTypes Map + ContentMetadataMap ContentMetadataMap + ESContentTypeMetadataMap ESContentTypeMetadataMap } func ParseConfig(configFileName string) (AppConfig, error) { @@ -55,22 +65,25 @@ func ParseConfig(configFileName string) (AppConfig, error) { return AppConfig{}, err } - origins := v.Sub("content").GetStringMapString("origin") + var contentMetadataMap ContentMetadataMap + err = v.UnmarshalKey("contentMetadata", &contentMetadataMap) + if err != nil { + return AppConfig{}, fmt.Errorf("unable to unmarshal %w", err) + } + predicates := v.GetStringMapString("predicates") concepts := v.GetStringMapString("conceptTypes") - authorities := v.GetStringMapString("authorities") - var contentTypeMap ContentTypeMap - err = v.UnmarshalKey("esContentTypeMap", &contentTypeMap) + var contentTypeMetadataMap ESContentTypeMetadataMap + err = v.UnmarshalKey("esContentTypeMetadata", &contentTypeMetadataMap) if err != nil { return AppConfig{}, fmt.Errorf("unable to unmarshal %w", err) } return AppConfig{ - Predicates: predicates, - ConceptTypes: concepts, - Origins: origins, - Authorities: authorities, - ContentTypeMap: contentTypeMap, + Predicates: predicates, + ConceptTypes: concepts, + ContentMetadataMap: contentMetadataMap, + ESContentTypeMetadataMap: contentTypeMetadataMap, }, nil } diff --git a/pkg/es/client.go b/pkg/es/client.go index 6e2a19d..11e7834 100644 --- a/pkg/es/client.go +++ b/pkg/es/client.go @@ -32,7 +32,7 @@ func (a AWSSigningTransport) RoundTrip(req *http.Request) (*http.Response, error return a.HTTPClient.Do(awsauth.Sign4(req, a.Credentials)) } -func NewClient(config AccessConfig, c *http.Client) (Client, error) { +func NewClient(config AccessConfig, c *http.Client, log *logger.UPPLogger) (Client, error) { signingTransport := AWSSigningTransport{ Credentials: awsauth.Credentials{ AccessKeyID: config.AccessKey, @@ -47,6 +47,7 @@ func NewClient(config AccessConfig, c *http.Client) (Client, error) { elastic.SetScheme("https"), elastic.SetHttpClient(signingClient), elastic.SetSniff(false), // needs to be disabled due to EAS behavior. Healthcheck still operates as normal. - elastic.SetErrorLog(logger.NewUnstructuredLogger()), + // elastic.SetErrorLog(logger.NewUnstructuredLogger()), + elastic.SetErrorLog(log), ) } diff --git a/pkg/es/service.go b/pkg/es/service.go index e02bcaf..d147d3b 100644 --- a/pkg/es/service.go +++ b/pkg/es/service.go @@ -107,6 +107,8 @@ func (s *ElasticsearchService) SetClient(client Client) { } func (s *ElasticsearchService) WriteData(conceptType string, uuid string, payload interface{}) (*elastic.IndexResult, error) { + s.mu.Lock() + defer s.mu.Unlock() return s.ElasticClient.Index(). Index(s.IndexName). Type(conceptType). @@ -116,6 +118,8 @@ func (s *ElasticsearchService) WriteData(conceptType string, uuid string, payloa } func (s *ElasticsearchService) DeleteData(conceptType string, uuid string) (*elastic.DeleteResult, error) { + s.mu.Lock() + defer s.mu.Unlock() return s.ElasticClient.Delete(). Index(s.IndexName). Type(conceptType). diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 1914b6d..ffd61cd 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -173,9 +173,9 @@ func (h *Handler) populateContentRelatedFields(model *schema.IndexModel, enriche model.InternalContentType = new(string) *model.InternalContentType = contentType model.Category = new(string) - *model.Category = h.Config.ContentTypeMap.Get(contentType).Category + *model.Category = h.Config.ESContentTypeMetadataMap.Get(contentType).Category model.Format = new(string) - *model.Format = h.Config.ContentTypeMap.Get(contentType).Format + *model.Format = h.Config.ESContentTypeMetadataMap.Get(contentType).Format model.UID = &(enrichedContent.Content.UUID) model.LeadHeadline = new(string) *model.LeadHeadline = html.TransformText(enrichedContent.Content.Title, diff --git a/pkg/message/message_handler.go b/pkg/message/message_handler.go index c75b2ec..4e8e46e 100644 --- a/pkg/message/message_handler.go +++ b/pkg/message/message_handler.go @@ -7,15 +7,12 @@ import ( "time" "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/config" + "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/es" "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/mapper" "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/schema" - - "github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/es" "github.com/Financial-Times/go-logger/v2" "github.com/Financial-Times/message-queue-gonsumer/consumer" - "github.com/dchest/uniuri" - - "github.com/stretchr/stew/slice" + transactionid "github.com/Financial-Times/transactionid-utils-go" ) const ( @@ -27,10 +24,7 @@ const ( articleContentTypeHeader = "ft-upp-article" ) -// Empty type added for older content. Placeholders - which are subject of exclusion - have type Content. -var allowedTypes = []string{"Article", "Video", "MediaResource", "Audio", ""} - -type ESClient func(config es.AccessConfig, c *http.Client) (es.Client, error) +type ESClient func(config es.AccessConfig, c *http.Client, log *logger.UPPLogger) (es.Client, error) type Handler struct { esService es.Service @@ -49,26 +43,20 @@ func NewMessageHandler(service es.Service, mapper *mapper.Handler, httpClient *h func (h *Handler) Start(baseAPIURL string, accessConfig es.AccessConfig) { h.Mapper.BaseAPIURL = baseAPIURL - channel := make(chan es.Client) go func() { - defer close(channel) for { - ec, err := h.esClient(accessConfig, h.httpClient) - if err == nil { - h.log.Info("Connected to Elasticsearch") - channel <- ec - return + ec, err := h.esClient(accessConfig, h.httpClient, h.log) + if err != nil { + h.log.Error("Could not connect to Elasticsearch") + time.Sleep(time.Minute) + // channel <- ec + continue } - h.log.Error("Could not connect to Elasticsearch") - time.Sleep(time.Minute) - } - }() - - go func() { - for ec := range channel { h.esService.SetClient(ec) + h.log.Info("Connected to Elasticsearch") // this is a blocking method h.messageConsumer.Start() + return } }() } @@ -81,20 +69,23 @@ func (h *Handler) Stop() { func (h *Handler) handleMessage(msg consumer.Message) { tid := msg.Headers[transactionIDHeader] + log := h.log.WithTransactionID(tid) + if tid == "" { - tid = "tid_" + uniuri.NewLen(10) + "_content-rw-elasticsearch" - h.log.WithTransactionID(tid).Info("Generated tid") + tid = transactionid.NewTransactionID() + log = h.log.WithTransactionID(tid) + log.Info("Generated tid") } if strings.Contains(tid, syntheticRequestPrefix) { - h.log.WithTransactionID(tid).Info("Ignoring synthetic message") + log.Info("Ignoring synthetic message") return } var combinedPostPublicationEvent schema.EnrichedContent err := json.Unmarshal([]byte(msg.Body), &combinedPostPublicationEvent) if err != nil { - h.log.WithTransactionID(tid).WithError(err).Error("Cannot unmarshal message body") + log.WithError(err).Error("Cannot unmarshal message body") return } @@ -103,71 +94,82 @@ func (h *Handler) handleMessage(msg consumer.Message) { combinedPostPublicationEvent.Content.BodyXML = "" } - if !slice.ContainsString(allowedTypes, combinedPostPublicationEvent.Content.Type) { - h.log.WithTransactionID(tid).Infof("Ignoring message of type %s", combinedPostPublicationEvent.Content.Type) + if !isAllowedType(combinedPostPublicationEvent.Content.Type) { + log.Infof("Ignoring message of type %s", combinedPostPublicationEvent.Content.Type) return } uuid := combinedPostPublicationEvent.UUID - h.log.WithTransactionID(tid).WithUUID(uuid).Info("Processing combined post publication event") + log = log.WithUUID(uuid) + log.Info("Processing combined post publication event") - var contentType string - typeHeader := msg.Headers[contentTypeHeader] - if strings.Contains(typeHeader, audioContentTypeHeader) { - contentType = config.AudioType - } else if strings.Contains(typeHeader, articleContentTypeHeader) { - contentType = config.ArticleType - } else { - for _, identifier := range combinedPostPublicationEvent.Content.Identifiers { - if strings.HasPrefix(identifier.Authority, h.Mapper.Config.Authorities.Get(config.BlogType)) { - contentType = config.BlogType - } else if strings.HasPrefix(identifier.Authority, h.Mapper.Config.Authorities.Get(config.ArticleType)) { - contentType = config.ArticleType - } else if strings.HasPrefix(identifier.Authority, h.Mapper.Config.Authorities.Get(config.VideoType)) { - contentType = config.VideoType - } - } + contentType := h.readContentType(msg, combinedPostPublicationEvent) + pacOrigin := h.Mapper.Config.ContentMetadataMap.Get("pac").Origin + if contentType == "" && msg.Headers[originHeader] != pacOrigin { + log.Error("Failed to index content. Could not infer type of content") + return } - if contentType == "" { - originHeader := msg.Headers[originHeader] - origins := h.Mapper.Config.Origins - - if strings.Contains(originHeader, origins.Get("methode")) { - contentType = config.ArticleType - } else if strings.Contains(originHeader, origins.Get("wordpress")) { - contentType = config.BlogType - } else if strings.Contains(originHeader, origins.Get("video")) { - contentType = config.VideoType - } else if originHeader != origins.Get("pac") { - h.log.WithTransactionID(tid).WithUUID(uuid).WithError(err).Error("Failed to index content. Could not infer type of content") - return - } - } + log = log.WithMonitoringEvent("ContentDeleteElasticsearch", tid, contentType) - conceptType := h.Mapper.Config.ContentTypeMap.Get(contentType).Collection + conceptType := h.Mapper.Config.ESContentTypeMetadataMap.Get(contentType).Collection if combinedPostPublicationEvent.MarkedDeleted == "true" { _, err = h.esService.DeleteData(conceptType, uuid) if err != nil { - h.log.WithTransactionID(tid).WithUUID(uuid).WithError(err).Error("Failed to delete indexed content") + log.WithError(err).Error("Failed to delete indexed content") return } - h.log.WithMonitoringEvent("ContentDeleteElasticsearch", tid, contentType).WithUUID(uuid).Info("Successfully deleted") + log.Info("Successfully deleted") return } if combinedPostPublicationEvent.Content.UUID == "" || contentType == "" { - h.log.WithTransactionID(tid).WithUUID(combinedPostPublicationEvent.UUID).Info("Ignoring message with no content") + log.Info("Ignoring message with no content") return } payload := h.Mapper.ToIndexModel(combinedPostPublicationEvent, contentType, tid) - h.log.Info(conceptType) _, err = h.esService.WriteData(conceptType, uuid, payload) if err != nil { - h.log.WithTransactionID(tid).WithUUID(uuid).WithError(err).Error("Failed to index content") + log.WithError(err).Error("Failed to index content") return } - h.log.WithMonitoringEvent("ContentWriteElasticsearch", tid, contentType).WithUUID(uuid).Info("Successfully saved") + log.Info("Successfully saved") +} + +func (h *Handler) readContentType(msg consumer.Message, event schema.EnrichedContent) string { + typeHeader := msg.Headers[contentTypeHeader] + if strings.Contains(typeHeader, audioContentTypeHeader) { + return config.AudioType + } + if strings.Contains(typeHeader, articleContentTypeHeader) { + return config.ArticleType + } + contentMetadata := h.Mapper.Config.ContentMetadataMap + for _, identifier := range event.Content.Identifiers { + for _, t := range contentMetadata { + if t.Authority != "" && strings.Contains(identifier.Authority, t.Authority) { + return t.ContentType + } + } + } + originHeader := msg.Headers[originHeader] + for _, t := range contentMetadata { + if t.Origin != "" && strings.Contains(originHeader, t.Origin) { + return t.ContentType + } + } + return "" +} + +func isAllowedType(s string) bool { + // Empty type added for older content. Placeholders - which are subject of exclusion - have type Content. + var allowedTypes = [...]string{"Article", "Video", "MediaResource", "Audio", ""} + for _, value := range allowedTypes { + if value == s { + return true + } + } + return false } diff --git a/pkg/message/message_handler_test.go b/pkg/message/message_handler_test.go index 7f97118..88ab94b 100644 --- a/pkg/message/message_handler_test.go +++ b/pkg/message/message_handler_test.go @@ -87,11 +87,11 @@ type concordanceAPIMock struct { mock.Mock } -var defaultESClient = func(config es.AccessConfig, c *http.Client) (es.Client, error) { +var defaultESClient = func(config es.AccessConfig, c *http.Client, log *logger.UPPLogger) (es.Client, error) { return &elasticClientMock{}, nil } -var errorESClient = func(config es.AccessConfig, c *http.Client) (es.Client, error) { +var errorESClient = func(config es.AccessConfig, c *http.Client, log *logger.UPPLogger) (es.Client, error) { return nil, elastic.ErrNoClient } @@ -327,8 +327,14 @@ func TestHandleWriteMessageNoUUIDForMetadataPublish(t *testing.T) { serviceMock := &esServiceMock{} - _, handler := mockMessageHandler(defaultESClient, serviceMock) - handler.handleMessage(consumer.Message{Body: string(inputJSON), Headers: map[string]string{originHeader: handler.Mapper.Config.Origins.Get("methode")}}) + _, h := mockMessageHandler(defaultESClient, serviceMock) + methodeOrigin := h.Mapper.Config.ContentMetadataMap.Get("methode").Origin + h.handleMessage(consumer.Message{ + Body: string(inputJSON), + Headers: map[string]string{ + originHeader: methodeOrigin, + }, + }) serviceMock.AssertNotCalled(t, "WriteData", mock.Anything, "b17756fe-0f62-4cf1-9deb-ca7a2ff80172", mock.Anything) serviceMock.AssertNotCalled(t, "DeleteData", mock.Anything, "b17756fe-0f62-4cf1-9deb-ca7a2ff80172")