Skip to content

Commit

Permalink
Merge pull request #15 from Financial-Times/feature/improve-query
Browse files Browse the repository at this point in the history
Improve Mongo Query
  • Loading branch information
peteclark-ft committed Nov 23, 2016
2 parents f40a775 + b9b88ca commit c80cdd4
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 31 deletions.
20 changes: 20 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ func (db *MongoDB) Open() (TX, error) {
return &MongoTX{db.session.Copy()}, nil
}

func (tx *MongoTX) EnsureIndices() error {
collection := tx.session.DB("upp-store").C("list-notifications")
lastModifiedIndex := mgo.Index{
Name: "last-modified-index",
Key: []string{"-lastModified"},
}
err := collection.EnsureIndex(lastModifiedIndex)

if err != nil {
return err
}

uuidIndex := mgo.Index{
Name: "uuid-index",
Key: []string{"uuid"},
}
return collection.EnsureIndex(uuidIndex)
}

// Close closes the entire database connection
func (db *MongoDB) Close() {
db.session.Close()
Expand Down Expand Up @@ -90,6 +109,7 @@ type DB interface {
type TX interface {
WriteNotification(notification *model.InternalNotification)
ReadNotifications(offset int, since time.Time) (*[]model.InternalNotification, error)
EnsureIndices() error
Ping() error
Close()
}
Expand Down
61 changes: 32 additions & 29 deletions db/query.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,59 @@
package db

import (
"encoding/json"
"time"

"github.com/Sirupsen/logrus"

"gopkg.in/mgo.v2/bson"
)

func generateQuery(offset int, since time.Time) []bson.M {
match := getMatch(offset, since)

group := bson.M{
"$group": bson.M{
"_id": bson.M{
"uuid": "$uuid",
},
"lastModified": bson.M{
"$max": "$lastModified",
},
"uuid": bson.M{
"$first": "$uuid",
},
"title": bson.M{
"$first": "$title",
},
"eventType": bson.M{
"$first": "$eventType",
},
"publishReference": bson.M{
"$first": "$publishReference",
},
},
}

pipeline := []bson.M{
match,
match, // get all records that exist between the start and end dates
{
"$sort": bson.M{
"lastModified": -1,
"uuid": 1,
},
},
group,
}, // sort most recent notifications first
{
"$group": bson.M{
"_id": "$uuid", // group all notifications together by uuid...
"uuid": bson.M{
"$first": "$uuid",
},
"title": bson.M{
"$first": "$title",
},
"eventType": bson.M{
"$first": "$eventType",
},
"publishReference": bson.M{
"$first": "$publishReference",
},
"lastModified": bson.M{
"$first": "$lastModified",
},
},
}, //... and create one notification based on the most recent fields (the "first" notification's fields)
{
"$sort": bson.M{
"lastModified": 1,
"_id": 1,
"uuid": 1,
},
},
}, // sort by oldest first, and to ensure strict ordering, also sort by uuid when lastModified dates match
{"$skip": offset},
{"$limit": maxLimit + 1},
}

j, err := json.Marshal(pipeline)
if err == nil { // Use /__log/debug endpoint to see the full query.
logrus.WithField("query", string(j)).Debug("Full query.")
}

return pipeline
}

Expand Down
7 changes: 6 additions & 1 deletion db/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ import (
"testing"
"time"

"github.com/Sirupsen/logrus"
"github.com/stretchr/testify/assert"
"gopkg.in/mgo.v2/bson"
)

func init() {
logrus.SetLevel(logrus.DebugLevel)
}

func TestShiftSince(t *testing.T) {
cacheDelay = 42
now := time.Now()
Expand Down Expand Up @@ -51,7 +56,7 @@ func TestQuery(t *testing.T) {

query := generateQuery(50, since)

regex := regexp.MustCompile(`\[\{"\$match":\{"lastModified":\{"\$gte":\{"\$date":".*"},"\$lte":\{"\$date":".*"}}}},\{"\$sort":\{"lastModified":-1,"uuid":1}},\{"\$group":\{"_id":\{"uuid":"\$uuid"},"eventType":\{"\$first":"\$eventType"},"lastModified":\{"\$max":"\$lastModified"},"publishReference":\{"\$first":"\$publishReference"},"title":\{"\$first":"\$title"},"uuid":\{"\$first":"\$uuid"}}},\{"\$sort":\{"_id":1,"lastModified":1}},\{"\$skip":50},\{"\$limit":103}]`)
regex := regexp.MustCompile(`\[\{"\$match":\{"lastModified":\{"\$gte":\{"\$date":".*"},"\$lte":\{"\$date":".*"}}}},\{"\$sort":\{"lastModified":-1}},\{"\$group":\{"_id":"\$uuid","eventType":\{"\$first":"\$eventType"},"lastModified":\{"\$first":"\$lastModified"},"publishReference":\{"\$first":"\$publishReference"},"title":\{"\$first":"\$title"},"uuid":\{"\$first":"\$uuid"}}},\{"\$sort":\{"lastModified":1,"uuid":1}},\{"\$skip":50},\{"\$limit":103}]`)
data, _ := bson.MarshalJSON(query)
assert.True(t, regex.MatchString(string(data)), "Query json should match!")
}
22 changes: 21 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func main() {
app.Before = altsrc.InitInputSourceWithContext(flags, altsrc.NewYamlSourceFromFlagFunc("config"))
app.Flags = flags

app.Action = func(ctx *cli.Context) {
app.Action = func(ctx *cli.Context) error {
logrus.Info("Initialising MongoDB.")
mongo := &db.MongoDB{
Urls: ctx.String("db"),
Timeout: ctx.Int("db-connect-timeout"),
Expand All @@ -89,6 +90,21 @@ func main() {

defer mongo.Close()

logrus.Info("Opening initial connection to Mongo.")
tx, err := mongo.Open()
if err != nil {
return err
}

logrus.Info("Ensuring Mongo indices are setup...")
err = tx.EnsureIndices()
if err != nil {
return err
}

logrus.Info("Finished ensuring indices.")
tx.Close()

mapper := mapping.DefaultMapper{ApiHost: ctx.String("api-host")}

nextLink := mapping.OffsetNextLink{
Expand All @@ -98,6 +114,7 @@ func main() {
}

server(ctx.Int("port"), ctx.Int("max-since-interval"), ctx.Bool("dump-requests"), mapper, nextLink, mongo)
return nil
}

app.Run(os.Args)
Expand All @@ -112,6 +129,9 @@ func server(port int, maxSinceInterval int, dumpRequests bool, mapper mapping.No
r.HandleFunc("/lists/{uuid}", write).Methods("PUT")

r.HandleFunc("/__health", resources.Health(db))

r.HandleFunc("/__log", resources.UpdateLogLevel()).Methods("POST")

r.HandleFunc(status.GTGPath, resources.GTG(db))

r.HandleFunc(status.PingPath, status.PingHandler)
Expand Down
41 changes: 41 additions & 0 deletions resources/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package resources

import (
"encoding/json"
"net/http"

"github.com/Sirupsen/logrus"
)

// UpdateLogLevel changes the logrus log level dynamically.
func UpdateLogLevel() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
level := struct {
Level string `json:"level"`
}{}

dec := json.NewDecoder(r.Body)
err := dec.Decode(&level)

if err != nil {
writeError("Please specify one of [debug, info]", 400, w)
return
}

switch level.Level {
case "debug":
logrus.SetLevel(logrus.DebugLevel)
logrus.Debug("Log level updated to debug.")
break
case "info":
logrus.SetLevel(logrus.InfoLevel)
logrus.Info("Log level updated to info.")
break
default:
writeError("Please specify one of [debug, info]", 400, w)
return
}

writeMessage(200, "Log level changed to "+level.Level, w)
}
}
53 changes: 53 additions & 0 deletions resources/logging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package resources

import (
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/Sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

func TestDebugLogLevel(t *testing.T) {
req, _ := http.NewRequest("POST", "http://our.host.name/__log", strings.NewReader(`{"level": "debug"}`))

w := httptest.NewRecorder()
UpdateLogLevel()(w, req)

assert.Equal(t, 200, w.Code)
assert.Equal(t, logrus.DebugLevel, logrus.GetLevel())
}

func TestInfoLogLevel(t *testing.T) {
req, _ := http.NewRequest("POST", "http://our.host.name/__log", strings.NewReader(`{"level": "info"}`))

w := httptest.NewRecorder()
UpdateLogLevel()(w, req)

assert.Equal(t, 200, w.Code)
assert.Equal(t, logrus.InfoLevel, logrus.GetLevel())
}

func TestUnsupportedLogLevel(t *testing.T) {
expected := logrus.GetLevel()
req, _ := http.NewRequest("POST", "http://our.host.name/__log", strings.NewReader(`{"level": "warn"}`))

w := httptest.NewRecorder()
UpdateLogLevel()(w, req)

assert.Equal(t, 400, w.Code)
assert.Equal(t, expected, logrus.GetLevel())
}

func TestLevelNotJson(t *testing.T) {
expected := logrus.GetLevel()
req, _ := http.NewRequest("POST", "http://our.host.name/__log", strings.NewReader(`{"level": "where's my closing quote?}`))

w := httptest.NewRecorder()
UpdateLogLevel()(w, req)

assert.Equal(t, 400, w.Code)
assert.Equal(t, expected, logrus.GetLevel())
}
5 changes: 5 additions & 0 deletions resources/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ func (m *MockTX) WriteNotification(notification *model.InternalNotification) {
m.Called(notification)
}

func (m *MockTX) EnsureIndices() error {
m.Called()
return nil
}

func (m *MockTX) Close() {
m.Called()
}

0 comments on commit c80cdd4

Please sign in to comment.