Skip to content

Commit

Permalink
Merge pull request #119 from Financial-Times/feature/UPPSF-2731-lists…
Browse files Browse the repository at this point in the history
…-push-notifications

Adding a new deployment for Lists push notifications
  • Loading branch information
martin-stanchev committed Oct 20, 2021
2 parents c1b7c86 + 21ecde3 commit 9baccf2
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 42 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ kubectl port-forward kafka-0 9092

* Start the service using environment variables:

For content push notifications:
```
export NOTIFICATIONS_RESOURCE=content \
&& export KAFKA_ADDRS=localhost:2181 \
Expand All @@ -43,6 +44,19 @@ export NOTIFICATIONS_RESOURCE=content \
&& export CONTENT_URI_WHITELIST="^http://(methode|wordpress|content)-(article|collection|content-placeholder)-(transformer|mapper|unfolder)(-pr|-iw)?(-uk-.*)?\\.svc\\.ft\\.com(:\\d{2,5})?/(content)/[\\w-]+.*$" \
&& export ALLOWED_ALL_CONTENT_TYPE="Article,ContentPackage,Audio" \
&& export SUPPORTED_SUBSCRIPTION_TYPE="Annotations,Article,ContentPackage,Audio,All,LiveBlogPackage,LiveBlogPost,Content,Page"
&& export DEFAULT_SUBSCRIPTION_TYPE="Article"
&& ./notifications-push
```
or for list push notifications:
```
export NOTIFICATIONS_RESOURCE=lists \
&& export KAFKA_ADDRS=localhost:2181 \
&& export GROUP_ID=notifications-push-yourtest \
&& export TOPIC=PostPublicationEvents \
&& export NOTIFICATIONS_DELAY=10 \
&& export API_BASE_URL="http://api.ft.com" \
&& export CONTENT_TYPE_WHITELIST="application/vnd.ft-upp-list+json" \
&& export DEFAULT_SUBSCRIPTION_TYPE="List"
&& ./notifications-push
```

Expand All @@ -61,12 +75,14 @@ export NOTIFICATIONS_RESOURCE=content \
--content_uri_whitelist="^http://(methode|wordpress|content)-(article|collection|content-placeholder)-(transformer|mapper|unfolder)(-pr|-iw)?(-uk-.*)?\\.svc\\.ft\\.com(:\\d{2,5})?/(content)/[\\w-]+.*$" \
--allowed_all_contentType="Article,ContentPackage,Audio" \
--supported_subscription_type="Annotations,Article,ContentPackage,Audio,All,LiveBlogPackage,LiveBlogPost,Content,Page"
--default_subscription_type="Article"
```

NB: for the complete list of options run `./notifications-push -h`

HTTP endpoints
----------
### For content:
```curl -i --header "x-api-key: «api_key»" https://api.ft.com/content/notifications-push```

The following subscription types could be also specified for which the client would like to receive notifications by setting a "type" parameter on the request:
Expand All @@ -92,6 +108,13 @@ E.g.
You can be subscribed for multiple types:
```curl -i --header "x-api-key: «api_key»" https://api.ft.com/content/notifications-push?type=All&type=LiveBlogPost&type=LiveBlogPackage```

### For lists:

```curl -i --header "x-api-key: «api_key»" https://api.ft.com/lists/notifications-push```

The lists endpoint only supports push notifications of type `List`, so no `?type=` parameter is supported.
Lists also do not have metadata, and the lists deployment doesn't support `Annotations` or other metadata subscription types.

### Filter DELETE messages by type
When a content has been deleted (`http://www.ft.com/thing/ThingChangeType/DELETE`), the kafka payload is empty and we cannot extract the content type from the message. In this case, there are 2 possible behaviours:

Expand Down Expand Up @@ -248,6 +271,7 @@ How to Build & Run with Docker
--env CONTENT_URI_WHITELIST="^http://(methode|wordpress|content)-(article|collection)-(transformer|mapper|unfolder)(-pr|-iw)?(-uk-.*)?\\.svc\\.ft\\.com(:\\d{2,5})?/(content)/[\\w-]+.*$" \
--env ALLOWED_ALL_CONTENT_TYPE="Article,ContentPackage,Audio" \
--env SUPPORTED_SUBSCRIPTION_TYPE="Annotations,Article,ContentPackage,Audio,All,LiveBlogPackage,LiveBlogPost,Content,Page" \
--env DEFAULT_SUBSCRIPTION_TYPE="Article" \
coco/notifications-push
```

Expand Down
36 changes: 26 additions & 10 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (

const (
heartbeatPeriod = 30 * time.Second
serviceName = "notifications-push"
appDescription = "Proactively notifies subscribers about new publishes/modifications."
)

func main() {
serviceName := os.Getenv("APP_NAME")
if serviceName == "" {
serviceName = "notifications-push"
}
app := cli.App(serviceName, appDescription)
resource := app.String(cli.StringOpt{
Name: "notifications_resource",
Expand Down Expand Up @@ -101,6 +104,7 @@ func main() {
})
contentURIWhitelist := app.String(cli.StringOpt{
Name: "content_uri_whitelist",
Value: "",
Desc: `The contentURI whitelist for incoming notifications - i.e. ^http://.*-transformer-(pr|iw)-uk-.*\.svc\.ft\.com(:\d{2,5})?/content/[\w-]+.*$`,
EnvVar: "CONTENT_URI_WHITELIST",
})
Expand All @@ -112,7 +116,7 @@ func main() {
})
whitelistedMetadataOriginSystemHeaders := app.Strings(cli.StringsOpt{
Name: "whitelistedMetadataOriginSystemHeaders",
Value: []string{"http://cmdb.ft.com/systems/pac", "http://cmdb.ft.com/systems/methode-web-pub", "http://cmdb.ft.com/systems/next-video-editor"},
Value: []string{},
Desc: "Origin-System-Ids that are supported to be processed from the PostPublicationEvents queue.",
EnvVar: "WHITELISTED_METADATA_ORIGIN_SYSTEM_HEADERS",
})
Expand All @@ -126,18 +130,25 @@ func main() {

allowedAllContentType := app.Strings(cli.StringsOpt{
Name: "allowed_all_contentType",
Value: []string{"Article", "ContentPackage", "Audio", "Content"},
Value: []string{},
Desc: `Comma-separated list of ContentTypes that compose ALL (contentType) - i.e. Article,`,
EnvVar: "ALLOWED_ALL_CONTENT_TYPE",
})

supportedSubscriptionType := app.Strings(cli.StringsOpt{
Name: "supported_subscription_type",
Value: []string{"Annotations", "Article", "ContentPackage", "Audio", "All", "LiveBlogPackage", "LiveBlogPost", "Content", "Page"},
Value: []string{},
Desc: `Comma-separated list of supported subscription types`,
EnvVar: "SUPPORTED_SUBSCRIPTION_TYPE",
})

defaultSubscriptionType := app.String(cli.StringOpt{
Name: "default_subscription_type",
Value: "Article",
Desc: `The default subscription type to serve when no arguments are passed.`,
EnvVar: "DEFAULT_SUBSCRIPTION_TYPE",
})

e2eTestUUIDs := app.Strings(cli.StringsOpt{
Name: "e2e_test_ids",
Value: []string{},
Expand All @@ -156,13 +167,18 @@ func main() {
"E2E_TEST_IDS": *e2eTestUUIDs,
}).Infof("[Startup] notifications-push is starting ")

kafkaTopics := []string{*contentTopic}
if *metadataTopic != "" {
kafkaTopics = append(kafkaTopics, *metadataTopic)
}

kafkaConsumer, err := createSupervisedConsumer(log,
*consumerAddrs,
*consumerGroupID,
[]string{
*contentTopic,
*metadataTopic,
})
kafkaTopics,
serviceName,
)

if err != nil {
log.WithError(err).Fatal("could not start kafka consumer")
}
Expand Down Expand Up @@ -197,7 +213,7 @@ func main() {
}

healthCheckEndpoint = baseURL.ResolveReference(healthCheckEndpoint)
hc := resources.NewHealthCheck(kafkaConsumer, healthCheckEndpoint.String(), requestStatusCode)
hc := resources.NewHealthCheck(kafkaConsumer, healthCheckEndpoint.String(), requestStatusCode, serviceName)

dispatcher, history := createDispatcher(*delay, *historySize, log)

Expand Down Expand Up @@ -229,7 +245,7 @@ func main() {

keyProcessor := resources.NewKeyProcessor(keyValidateURL.String(), keyPoliciesURL.String(), httpClient, log)
subHandler := resources.NewSubHandler(dispatcher, keyProcessor, srv, heartbeatPeriod,
log, *allowedAllContentType, *supportedSubscriptionType)
log, *allowedAllContentType, *supportedSubscriptionType, *defaultSubscriptionType)
if err != nil {
log.WithError(err).Fatal("Could not create request handler")
}
Expand Down
4 changes: 2 additions & 2 deletions app_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func TestPushNotifications(t *testing.T) {
defer server.Close()

// handler
hc := resources.NewHealthCheck(queue, apiGatewayGTGURL, nil)
hc := resources.NewHealthCheck(queue, apiGatewayGTGURL, nil, "notifications-push")

keyProcessor := resources.NewKeyProcessor(server.URL+apiGatewayValidateURL, server.URL+apiGatewayPoliciesURL, http.DefaultClient, l)
s := resources.NewSubHandler(d, keyProcessor, reg, heartbeat, l, []string{"Article", "ContentPackage", "Audio"},
[]string{"Annotations", "Article", "ContentPackage", "Audio", "All", "LiveBlogPackage", "LiveBlogPost", "Content", "Page"})
[]string{"Annotations", "Article", "ContentPackage", "Audio", "All", "LiveBlogPackage", "LiveBlogPost", "Content", "Page"}, "Article")

initRouter(router, s, resource, d, h, hc, l)

Expand Down
2 changes: 1 addition & 1 deletion consumer/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewMessageQueueHandler(contentHandler, metadataHandler MessageQueueHandler)
}

func (h *MessageQueueRouter) HandleMessage(queueMsg kafka.FTMessage) error {
if isAnnotationMessage(queueMsg.Headers) {
if h.metadataHandler != nil && isAnnotationMessage(queueMsg.Headers) {
return h.metadataHandler.HandleMessage(queueMsg)
}
return h.contentHandler.HandleMessage(queueMsg)
Expand Down
9 changes: 8 additions & 1 deletion consumer/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,19 @@ func (n NotificationMapper) MapNotification(event ContentMessage, transactionID
resource = "pages"
}

var standout *dispatch.Standout
if contentType != dispatch.ListType {
standout = &dispatch.Standout{Scoop: scoop}
}

return dispatch.NotificationModel{
Type: eventType,
ID: "http://www.ft.com/thing/" + UUID,
APIURL: n.APIBaseURL + "/" + resource + "/" + UUID,
PublishReference: transactionID,
LastModified: event.LastModified,
Title: title,
Standout: &dispatch.Standout{Scoop: scoop},
Standout: standout,
SubscriptionType: contentType,
}, nil
}
Expand Down Expand Up @@ -110,6 +115,8 @@ func resolveTypeFromMessageHeader(contentTypeHeader string) string {
return dispatch.LiveBlogPackageType
case "application/vnd.ft-upp-page+json":
return dispatch.PageType
case "application/vnd.ft-upp-list+json":
return dispatch.ListType
default:
return ""
}
Expand Down
30 changes: 30 additions & 0 deletions consumer/mapper_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consumer

import (
"fmt"
"testing"

"github.com/Financial-Times/notifications-push/v5/dispatch"
Expand Down Expand Up @@ -261,3 +262,32 @@ func TestNotificationMappingMetadata(t *testing.T) {
})
}
}

func TestNotificationMappingEmptyStandoutForLists(t *testing.T) {
t.Parallel()

var standout *dispatch.Standout
payload := map[string]interface{}{"title": "This is a title", "standout": standout, "type": "List", "publishCount": "2"}
id, _ := uuid.NewV4()
event := ContentMessage{
ContentURI: "http://list-transformer-pr-uk-up.svc.ft.com:8081/list/blah/" + id.String(),
LastModified: "2016-11-02T10:54:22.234Z",
Payload: payload,
}

mapper := NotificationMapper{
APIBaseURL: "test.api.ft.com",
Resource: "lists",
}

n, err := mapper.MapNotification(event, "tid_test1")

mappedAPIURL := fmt.Sprintf("test.api.ft.com/lists/%s", id.String())

assert.Nil(t, err, "The mapping should not return an error")
assert.Equal(t, "http://www.ft.com/thing/ThingChangeType/UPDATE", n.Type, "It is an UPDATE notification")
assert.Equal(t, "This is a title", n.Title, "Title should be mapped correctly")
assert.Nil(t, n.Standout, "Scoop field should be mapped correctly")
assert.Equal(t, "List", n.SubscriptionType, "SubscriptionType field should be mapped correctly")
assert.Equal(t, mappedAPIURL, n.APIURL, "API URL field should be mapped correctly")
}
1 change: 1 addition & 0 deletions dispatch/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
ContentPlaceholderType = "Content"
PageType = "Page"
AllContentType = "All"
ListType = "List"
)

// notification types
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ services:
image: local/notifications-push:latest
environment:
KAFKA_ADDRS: "zookeeper:2181"
API_BASE_URL: "http://test.api.ft.com"
API_BASE_URL: "http://api-t.ft.com"
GROUP_ID: "notifications-push-test1"
TOPIC: "PostPublicationEvents"
METADATA_TOPIC: "PostConceptAnnotations"
Expand All @@ -38,6 +38,7 @@ services:
ALLOWED_ALL_CONTENT_TYPE: "Article,ContentPackage,Audio"
SUPPORTED_SUBSCRIPTION_TYPE: "Annotations,Article,ContentPackage,Audio,All,LiveBlogPackage,LiveBlogPost,Content,Page"
WHITELISTED_METADATA_ORIGIN_SYSTEM_HEADERS: "http://cmdb.ft.com/systems/pac, http://cmdb.ft.com/systems/methode-web-pub, http://cmdb.ft.com/systems/next-video-editor"
DEFAULT_SUBSCRIPTION_TYPE: "Article"
APP_PORT: 8080
ports:
- 8000:8080
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Values used for the deployed application.
replicaCount: 2
service:
name: list-notifications-push
env:
TOPIC: "PublicationNotificationEvents"
CONSUMER_BACKOFF: "2"
NOTIFICATIONS_RESOURCE: "lists"
CONTENT_TYPE_WHITELIST: "application/vnd.ft-upp-list+json"

# We don't want to match any content URIs for this deployment.
CONTENT_URI_WHITELIST: "$."
PUSH_PORT: "8599"
DEFAULT_SUBSCRIPTION_TYPE: "List"
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ env:
SUPPORTED_SUBSCRIPTION_TYPE: "Annotations,Article,ContentPackage,Audio,All,LiveBlogPackage,LiveBlogPost,Content,Page"
PUSH_PORT: "8599"
E2E_TEST_IDS: "427f2a19-2ae7-47c1-b580-c225fa0a0199"
DEFAULT_SUBSCRIPTION_TYPE: "Article"
6 changes: 5 additions & 1 deletion helm/notifications-push/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Chart.Version }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
env:
- name: APP_NAME
value: {{ .Values.service.name }}
- name: KAFKA_ADDRS
valueFrom:
configMapKeyRef:
Expand Down Expand Up @@ -68,7 +70,9 @@ spec:
- name: ALLOWED_ALL_CONTENT_TYPE
value: {{ .Values.env.ALLOWED_ALL_CONTENT_TYPE }}
- name: SUPPORTED_SUBSCRIPTION_TYPE
value: {{ .Values.env.SUPPORTED_SUBSCRIPTION_TYPE }}
value: {{ .Values.env.SUPPORTED_SUBSCRIPTION_TYPE }}
- name: DEFAULT_SUBSCRIPTION_TYPE
value: {{ .Values.env.DEFAULT_SUBSCRIPTION_TYPE }}
- name: WHITELISTED_METADATA_ORIGIN_SYSTEM_HEADERS
value: {{ .Values.env.WHITELISTED_METADATA_ORIGIN_SYSTEM_HEADERS }}
- name: E2E_TEST_IDS
Expand Down
11 changes: 7 additions & 4 deletions push_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *supervisedConsumer) ConnectivityCheck() error {
return s.c.ConnectivityCheck()
}

func createSupervisedConsumer(log *logger.UPPLogger, address string, groupID string, topics []string) (*supervisedConsumer, error) {
func createSupervisedConsumer(log *logger.UPPLogger, address string, groupID string, topics []string, serviceName string) (*supervisedConsumer, error) {
errCh := make(chan error, 2)
var fatalErrs = []error{kazoo.ErrPartitionNotClaimed, zk.ErrNoServer}
fatalErrHandler := func(err error, serviceName string) {
Expand Down Expand Up @@ -139,9 +139,12 @@ func createMessageHandler(config msgHandlerCfg, dispatcher *dispatch.Dispatcher,
}

contentHandler := queueConsumer.NewContentQueueHandler(whitelistR, ctWhitelist, config.E2ETestUUIDs, mapper, dispatcher, log)
metadataHandler := queueConsumer.NewMetadataQueueHandler(config.MetadataHeaders, mapper, dispatcher, log)
handler := queueConsumer.NewMessageQueueHandler(contentHandler, metadataHandler)
return handler, nil

var metadataHandler *queueConsumer.MetadataQueueHandler
if len(config.MetadataHeaders) > 0 {
metadataHandler = queueConsumer.NewMetadataQueueHandler(config.MetadataHeaders, mapper, dispatcher, log)
}
return queueConsumer.NewMessageQueueHandler(contentHandler, metadataHandler), nil
}

func requestStatusCode(ctx context.Context, url string) (int, error) {
Expand Down
6 changes: 4 additions & 2 deletions resources/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ type HealthCheck struct {
consumer KafkaConsumer
StatusFunc RequestStatusFn
apiGatewayGTGAddress string
serviceName string
}

func NewHealthCheck(kafkaConsumer KafkaConsumer, apiGatewayGTGAddress string, statusFunc RequestStatusFn) *HealthCheck {
func NewHealthCheck(kafkaConsumer KafkaConsumer, apiGatewayGTGAddress string, statusFunc RequestStatusFn, serviceName string) *HealthCheck {
return &HealthCheck{
consumer: kafkaConsumer,
apiGatewayGTGAddress: apiGatewayGTGAddress,
StatusFunc: statusFunc,
serviceName: serviceName,
}
}

Expand All @@ -40,7 +42,7 @@ func (h *HealthCheck) Health() func(w http.ResponseWriter, r *http.Request) {
hc := fthealth.TimedHealthCheck{
HealthCheck: fthealth.HealthCheck{
SystemCode: "upp-notifications-push",
Name: "Notifications Push",
Name: h.serviceName,
Description: "Checks if all the dependent services are reachable and healthy.",
Checks: checks,
},
Expand Down
2 changes: 1 addition & 1 deletion resources/healthchecks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestHealthcheck(t *testing.T) {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
hc := NewHealthCheck(test.kafkaConsumerMock, "randomAddress", test.statusFn)
hc := NewHealthCheck(test.kafkaConsumerMock, "randomAddress", test.statusFn, "notifications-push")

req, err := http.NewRequest("GET", "/__health", nil)
if err != nil {
Expand Down
Loading

0 comments on commit 9baccf2

Please sign in to comment.