-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial implementation #1
Changes from 42 commits
ebc6fca
a83a2dc
9a716cf
65cbb4c
b52bb26
97adc47
77be817
99b12b9
1e72dd7
f677d7d
82f7418
b9d4978
1798c2e
5a7ccbc
b0046f3
22c55e9
515ac54
3ed42a7
060addc
8838165
4f1428b
6398fca
243a1ae
b2b4919
77744c9
d549372
bb42527
1168178
78596bc
8e9e808
c382da5
882ee27
3239963
2be816f
0c8215b
b6056be
f79f79a
95f1757
fff12ba
9d7d2c9
8f304a8
3141cb4
6b93fad
ac64af2
d7bfa85
5e92f90
8f19921
317b9ed
97b3733
8970b9d
5bb154e
d176846
5b56ffb
2bd3db8
0e81bed
8273c2d
6494bfd
92c1980
2392b17
3644277
9407ce5
b2774cd
dd2fa08
c113ee5
a096768
44bf95b
4f832c1
c16a9a3
eb8bbee
d521b06
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
*.iml | ||
*.exe | ||
.idea | ||
content-rw-elasticsearch | ||
vendor/*/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
FROM golang:1.8-alpine | ||
|
||
ENV PROJECT=content-rw-elasticsearch | ||
COPY . /${PROJECT}-sources/ | ||
|
||
RUN apk --no-cache --virtual .build-dependencies add git \ | ||
&& ORG_PATH="github.com/Financial-Times" \ | ||
&& REPO_PATH="${ORG_PATH}/${PROJECT}" \ | ||
&& mkdir -p $GOPATH/src/${ORG_PATH} \ | ||
# Linking the project sources in the GOPATH folder | ||
&& ln -s /${PROJECT}-sources $GOPATH/src/${REPO_PATH} \ | ||
&& cd $GOPATH/src/${REPO_PATH} \ | ||
&& BUILDINFO_PACKAGE="${ORG_PATH}/${PROJECT}/vendor/${ORG_PATH}/service-status-go/buildinfo." \ | ||
&& VERSION="version=$(git describe --tag --always 2> /dev/null)" \ | ||
&& DATETIME="dateTime=$(date -u +%Y%m%d%H%M%S)" \ | ||
&& REPOSITORY="repository=$(git config --get remote.origin.url)" \ | ||
&& REVISION="revision=$(git rev-parse HEAD)" \ | ||
&& BUILDER="builder=$(go version)" \ | ||
&& LDFLAGS="-X '"${BUILDINFO_PACKAGE}$VERSION"' -X '"${BUILDINFO_PACKAGE}$DATETIME"' -X '"${BUILDINFO_PACKAGE}$REPOSITORY"' -X '"${BUILDINFO_PACKAGE}$REVISION"' -X '"${BUILDINFO_PACKAGE}$BUILDER"'" \ | ||
&& echo "Build flags: $LDFLAGS" \ | ||
&& echo "Fetching dependencies..." \ | ||
&& go get -u github.com/kardianos/govendor \ | ||
&& $GOPATH/bin/govendor sync \ | ||
&& go get -v \ | ||
&& go build -ldflags="${LDFLAGS}" \ | ||
&& mv ${PROJECT} /${PROJECT} \ | ||
&& mv runtime / \ | ||
&& apk del .build-dependencies \ | ||
&& rm -rf $GOPATH /var/cache/apk/* | ||
|
||
WORKDIR / | ||
# Using the expanded command, so that the shell will expand the $PROJECT env var. See https://docs.docker.com/engine/reference/builder/#cmd | ||
CMD ["sh", "-c", "/${PROJECT}"] | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,85 @@ | ||
# content-rw-elasticsearch | ||
Reader-writer for ES backing replatformed SAPI v1 | ||
|
||
[![Circle CI](https://circleci.com/gh/Financial-Times/content-rw-elasticsearch/tree/master.png?style=shield)](https://circleci.com/gh/Financial-Times/content-rw-elasticsearch/tree/master)[![Go Report Card](https://goreportcard.com/badge/github.com/Financial-Times/content-rw-elasticsearch)](https://goreportcard.com/report/github.com/Financial-Times/content-rw-elasticsearch) [![Coverage Status](https://coveralls.io/repos/github/Financial-Times/content-rw-elasticsearch/badge.svg)](https://coveralls.io/github/Financial-Times/content-rw-elasticsearch) | ||
|
||
|
||
## Introduction | ||
Indexes V2 content in Elasticsearch for use by SAPI v1 | ||
|
||
## Installation | ||
Download the source code, dependencies and test dependencies: | ||
|
||
go get -u github.com/kardianos/govendor | ||
go get -u github.com/Financial-Times/content-rw-elasticsearch | ||
cd $GOPATH/src/github.com/Financial-Times/content-rw-elasticsearch | ||
govendor sync | ||
go get -t | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we still need this? Test dependencies should also come with govendor sync. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
go build . | ||
|
||
## Running locally | ||
|
||
1. Run the tests and install the binary: | ||
|
||
govendor sync | ||
govendor test -v -race | ||
go install | ||
|
||
2. Run the binary (using the `help` flag to see the available optional arguments): | ||
|
||
$GOPATH/bin/content-rw-elasticsearch [--help] | ||
|
||
Options: | ||
|
||
--app-system-code="content_rw_elasticsearch" System Code of the application ($APP_SYSTEM_CODE) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know this is a Readme, but anyway, the SystemCode of this app in Dewey is with dash instead of underscore: content-rw-elasticsearch (https://dewey.ft.com/content-rw-elasticsearch.html) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch, this was the actual default as returned by --help! |
||
--port="8080" Port to listen on ($APP_PORT) | ||
--aws-access-key="" AWS ACCES KEY ($AWS_ACCESS_KEY_ID) | ||
--aws-secret-access-key="" AWS SECRET ACCES KEY ($AWS_SECRET_ACCESS_KEY) | ||
--elasticsearch-sapi-endpoint="http://localhost:9200" AES endpoint ($ELASTICSEARCH_SAPI_ENDPOINT) | ||
--index-name="ft" The name of the elaticsearch index ($ELASTICSEARCH_SAPI_INDEX) | ||
--kafka-proxy-address="http://localhost:8080" Addresses used by the queue consumer to connect to the queue ($KAFKA_PROXY_ADDR) | ||
--kafka-consumer-group="default-consumer-group" Group used to read the messages from the queue ($KAFKA_CONSUMER_GROUP) | ||
--kafka-topic="CombinedPostPublicationEvents" The topic to read the meassages from ($KAFKA_TOPIC) | ||
--kafka-header="kafka" The header identifying the queue to read the messages from ($KAFKA_HEADER) | ||
--kafka-concurrent-processing=false Whether the consumer uses concurrent processing for the messages ($KAFKA_CONCURRENT_PROCESSING) | ||
|
||
3. Test: | ||
|
||
There are no service endpoints to test. | ||
|
||
## Build and deployment | ||
|
||
* Built by Docker Hub on merge to master: [coco/content-rw-elasticsearch](https://hub.docker.com/r/coco/content-rw-elasticsearch/) | ||
* CI provided by CircleCI: [content-rw-elasticsearch](https://circleci.com/gh/Financial-Times/content-rw-elasticsearch) | ||
|
||
## Healthchecks | ||
Admin endpoints are: | ||
|
||
`/__gtg` | ||
|
||
Returns 503 if any if the checks executed at the /__health endpoint returns false | ||
|
||
`/__health` | ||
|
||
There are several checks performed: | ||
* Elasticsearch cluster connectivity | ||
* Elasticsearch cluster health | ||
* Elastic schema validation | ||
* Kafka queue topic check | ||
|
||
|
||
`/__health-details` | ||
|
||
Shows ES cluster health details | ||
|
||
`/__build-info` | ||
|
||
|
||
## Other information | ||
An example of event structure is here [testdata/exampleEnrichedContentModel.json](testdata/exampleEnrichedContentModel.json) | ||
|
||
The reference mappings for Elasticsearch are found here [runtime/referenceSchema.json](runtime/referenceSchema.json) | ||
|
||
### Logging | ||
|
||
* The application uses [logrus](https://github.com/Sirupsen/logrus); the log file is initialised in [main.go](main.go). | ||
* NOTE: `/__build-info` and `/__gtg` endpoints are not logged as they are called every second from varnish/vulcand and this information is not needed in logs/splunk. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
machine: | ||
environment: | ||
GOPATH: "${HOME}/.go_workspace:/usr/local/go_workspace:${HOME}/.go_project" | ||
GO_PROJECT_PATH: "${HOME}/.go_project/src/github.com/${CIRCLE_PROJECT_USERNAME}/${CIRCLE_PROJECT_REPONAME}" | ||
|
||
checkout: | ||
post: | ||
- mkdir -p $(dirname ${GO_PROJECT_PATH}) | ||
- ln -fs $HOME/$CIRCLE_PROJECT_REPONAME $GO_PROJECT_PATH | ||
|
||
dependencies: | ||
pre: | ||
- go get -u github.com/kardianos/govendor | ||
override: | ||
- cd $GO_PROJECT_PATH && govendor sync | ||
- cd $GO_PROJECT_PATH && go build -v | ||
|
||
test: | ||
pre: | ||
- go get -u github.com/jstemmer/go-junit-report | ||
- go get -u github.com/mattn/goveralls | ||
override: | ||
- mkdir -p $CIRCLE_TEST_REPORTS/golang | ||
- cd $GO_PROJECT_PATH && govendor test -race -v +local | go-junit-report > $CIRCLE_TEST_REPORTS/golang/junit.xml | ||
- cd $GO_PROJECT_PATH && govendor test -v -cover -race -coverprofile=$CIRCLE_ARTIFACTS/coverage.out +local | ||
- cd $CIRCLE_ARTIFACTS && sed -i '1d' *.out | ||
- | | ||
echo "mode: atomic" > $CIRCLE_ARTIFACTS/overall-coverage.result | ||
- cd $CIRCLE_ARTIFACTS && cat *.out >> overall-coverage.result | ||
post: | ||
- goveralls -coverprofile=$CIRCLE_ARTIFACTS/overall-coverage.result -service=circle-ci -repotoken=$COVERALLS_TOKEN |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
package main | ||
|
||
import ( | ||
"encoding/json" | ||
health "github.com/Financial-Times/go-fthealth/v1_1" | ||
"github.com/Financial-Times/http-handlers-go/httphandlers" | ||
"github.com/Financial-Times/message-queue-gonsumer/consumer" | ||
status "github.com/Financial-Times/service-status-go/httphandlers" | ||
log "github.com/Sirupsen/logrus" | ||
"github.com/gorilla/mux" | ||
"github.com/kr/pretty" | ||
"github.com/rcrowley/go-metrics" | ||
"net" | ||
"net/http" | ||
"os" | ||
"os/signal" | ||
"strings" | ||
"sync" | ||
"syscall" | ||
"time" | ||
) | ||
|
||
const syntheticRequestPrefix = "SYNTHETIC-REQ-MON" | ||
|
||
type contentIndexer struct { | ||
esServiceInstance esServiceI | ||
} | ||
|
||
func waitForSignal() { | ||
ch := make(chan os.Signal) | ||
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) | ||
<-ch | ||
} | ||
|
||
func (indexer *contentIndexer) start(appSystemCode string, indexName string, port string, accessConfig esAccessConfig, queueConfig consumer.QueueConfig) { | ||
channel := make(chan esClientI) | ||
go func() { | ||
defer close(channel) | ||
for { | ||
ec, err := newAmazonClient(accessConfig) | ||
if err == nil { | ||
log.Infof("connected to ElasticSearch") | ||
channel <- ec | ||
return | ||
} | ||
log.Errorf("could not connect to ElasticSearch: %s", err.Error()) | ||
time.Sleep(time.Minute) | ||
} | ||
}() | ||
|
||
//create writer service | ||
indexer.esServiceInstance = newEsService(indexName) | ||
|
||
go func() { | ||
for ec := range channel { | ||
indexer.esServiceInstance.setClient(ec) | ||
indexer.startMessageConsumer(queueConfig) | ||
} | ||
}() | ||
|
||
go func() { | ||
indexer.serveAdminEndpoints(appSystemCode, port, queueConfig) | ||
}() | ||
} | ||
|
||
func (indexer *contentIndexer) serveAdminEndpoints(appSystemCode string, port string, queueConfig consumer.QueueConfig) { | ||
healthService := newHealthService(indexer.esServiceInstance, queueConfig.Topic, queueConfig.Addrs[0]) | ||
var monitoringRouter http.Handler = mux.NewRouter() | ||
monitoringRouter = httphandlers.TransactionAwareRequestLoggingHandler(log.StandardLogger(), monitoringRouter) | ||
monitoringRouter = httphandlers.HTTPMetricsHandler(metrics.DefaultRegistry, monitoringRouter) | ||
|
||
serveMux := http.NewServeMux() | ||
|
||
hc := health.HealthCheck{SystemCode: appSystemCode, Name: appSystemCode, Description: "Content Read Writer for Elasticsearch", Checks: healthService.checks} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Name shouldn't be identical with SystemCode, I know this looks kind of duplicate because most of the time they are almost the same (there are some that are pretty different), but that's the standard agreed for FT healthchecks, i.e. to have both system code and name in the response: https://docs.google.com/document/d/18hefJjImF5IFp9WvPAm9Iq5_GmWzI9ahlKSzShpQl1s/edit So I think the name should be the name it has in Dewey (without the UPP maybe? or just as it is in Dewey, not really sure about that): Content RW Elasticsearch I personally created another env var for name, like for the system code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
serveMux.HandleFunc("/__health", health.Handler(hc)) | ||
serveMux.HandleFunc("/__health-details", healthService.HealthDetails) | ||
serveMux.HandleFunc(status.GTGPath, status.NewGoodToGoHandler(healthService.gtgCheck)) | ||
serveMux.HandleFunc(status.BuildInfoPath, status.BuildInfoHandler) | ||
|
||
serveMux.Handle("/", monitoringRouter) | ||
|
||
if err := http.ListenAndServe(":"+port, serveMux); err != nil { | ||
log.Fatalf("Unable to start: %v", err) | ||
} | ||
} | ||
|
||
func (indexer *contentIndexer) startMessageConsumer(config consumer.QueueConfig) { | ||
client := &http.Client{ | ||
Transport: &http.Transport{ | ||
Proxy: http.ProxyFromEnvironment, | ||
DialContext: (&net.Dialer{ | ||
Timeout: 30 * time.Second, | ||
KeepAlive: 30 * time.Second, | ||
}).DialContext, | ||
MaxIdleConnsPerHost: 20, | ||
TLSHandshakeTimeout: 3 * time.Second, | ||
ExpectContinueTimeout: 1 * time.Second, | ||
}, | ||
} | ||
messageConsumer := consumer.NewConsumer(config, indexer.handleMessage, client) | ||
log.Printf("[Startup] Consumer: %# v", pretty.Formatter(messageConsumer)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you use Infof instead of Printf? Just for consistency, basically it is the same. (There are some other occurrences as well.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
var consumerWaitGroup sync.WaitGroup | ||
consumerWaitGroup.Add(1) | ||
|
||
go func() { | ||
messageConsumer.Start() | ||
consumerWaitGroup.Done() | ||
}() | ||
|
||
waitForSignal() | ||
messageConsumer.Stop() | ||
consumerWaitGroup.Wait() | ||
} | ||
|
||
func (indexer *contentIndexer) handleMessage(msg consumer.Message) { | ||
|
||
tid := msg.Headers["X-Request-Id"] | ||
|
||
if strings.Contains(tid, syntheticRequestPrefix) { | ||
log.Infof("[%s] Ignoring synthetic message", tid) | ||
return | ||
} | ||
|
||
var combinedPostPublicationEvent enrichedContentModel | ||
err := json.Unmarshal([]byte(msg.Body), &combinedPostPublicationEvent) | ||
if err != nil { | ||
log.Errorf("[%s] Cannot unmarshal message body:[%v]", tid, err.Error()) | ||
return | ||
} | ||
|
||
uuid := combinedPostPublicationEvent.Content.UUID | ||
log.Printf("[%s] Processing combined post publication event for uuid [%s]", tid, uuid) | ||
|
||
var contentType string | ||
|
||
for _, identifier := range combinedPostPublicationEvent.Content.Identifiers { | ||
if strings.HasPrefix(identifier.Authority, "http://api.ft.com/system/FT-LABS-WP") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The authority urls could be extracted as constants. Maybe event the content types and origins. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
contentType = "blogPost" | ||
} else if strings.HasPrefix(identifier.Authority, "http://api.ft.com/system/FTCOM-METHODE") { | ||
contentType = "article" | ||
} else if strings.HasPrefix(identifier.Authority, "http://api.ft.com/system/BRIGHTCOVE") { | ||
contentType = "video" | ||
} | ||
} | ||
|
||
if contentType == "" { | ||
origin := msg.Headers["Origin-System-Id"] | ||
if strings.Contains(origin, "methode-web-pub") { | ||
contentType = "article" | ||
} else if strings.Contains(origin, "wordpress") { | ||
contentType = "blogPost" | ||
} else if strings.Contains(origin, "brightcove") { | ||
contentType = "video" | ||
} else { | ||
log.Errorf("[%s] Failed to index content with UUID %s. Could not infer type of content.", tid, uuid) | ||
return | ||
} | ||
} | ||
|
||
if combinedPostPublicationEvent.Content.MarkedDeleted { | ||
_, err = indexer.esServiceInstance.deleteData(contentTypeMap[contentType].collection, uuid) | ||
if err != nil { | ||
log.Errorf(err.Error()) | ||
return | ||
} | ||
} else { | ||
payload := convertToESContentModel(combinedPostPublicationEvent, contentType) | ||
|
||
_, err = indexer.esServiceInstance.writeData(contentTypeMap[contentType].collection, uuid, payload) | ||
if err != nil { | ||
log.Errorf(err.Error()) | ||
return | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we want the app PID inside the container to be 1, so that we avoid the duplicate container/orphan container issue on update.
Using this, it's not 1:
See mail
Phantom/orphaned container issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed