Skip to content

Commit

Permalink
Add poll/vote_count as calculated field(#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
ostcar authored Nov 18, 2021
1 parent adb131a commit b9c287c
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 13 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ The Service uses the following environment variables:
* `MESSAGE_BUS_PORT`: Port of the redis server. The default is `6379`.
* `REDIS_TEST_CONN`: Test the redis connection on startup. Disable on the cloud
if redis needs more time to start then this service. The default is `true`.
* `VOTE_HOST`: Host of the vote-service. The default is `localhost`.
* `VOTE_PORT`: Port of the vote-service. The default is `9013`.
* `VOTE_PROTOCAL`: Protocol of the vote-service. The default is `http`.
* `AUTH`: Sets the type of the auth service. `fake` (default) or `ticket`.
* `AUTH_HOST`: Host of the auth service. The default is `localhost`.
* `AUTH_PORT`: Port of the auth service. The default is `9004`.
Expand Down
8 changes: 7 additions & 1 deletion cmd/autoupdate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func defaultEnv() map[string]string {
"MESSAGE_BUS_PORT": "6379",
"REDIS_TEST_CONN": "true",

"VOTE_HOST": "localhost",
"VOTE_PORT": "9013",
"VOTE_PROTOCAL": "http",

"AUTH": "fake",
"AUTH_PROTOCOL": "http",
"AUTH_HOST": "localhost",
Expand Down Expand Up @@ -135,8 +139,10 @@ func run() error {
return fmt.Errorf("creating auth adapter: %w", err)
}

voteAddr := env["VOTE_PROTOCAL"] + "://" + env["VOTE_HOST"] + env["VOTE_PORT"]

// Autoupdate Service.
service := autoupdate.New(datastoreService, restrict.Middleware, ctx.Done())
service := autoupdate.New(datastoreService, restrict.Middleware, voteAddr, ctx.Done())
go service.PruneOldData(ctx)
go service.ResetCache(ctx)

Expand Down
7 changes: 6 additions & 1 deletion internal/autoupdate/autoupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Autoupdate struct {
datastore Datastore
topic *topic.Topic
restricter RestrictMiddleware
voteAddr string
}

// RestrictMiddleware is a function that can restrict data.
Expand All @@ -53,11 +54,12 @@ type RestrictMiddleware func(getter datastore.Getter, uid int) datastore.Getter
//
// The attribute closed is a channel that should be closed when the server shuts
// down. In this case, all connections get closed.
func New(datastore Datastore, restricter RestrictMiddleware, closed <-chan struct{}) *Autoupdate {
func New(datastore Datastore, restricter RestrictMiddleware, voteAddr string, closed <-chan struct{}) *Autoupdate {
a := &Autoupdate{
datastore: datastore,
topic: topic.New(topic.WithClosed(closed)),
restricter: restricter,
voteAddr: voteAddr,
}

// Update the topic when an data update is received.
Expand All @@ -71,6 +73,9 @@ func New(datastore Datastore, restricter RestrictMiddleware, closed <-chan struc
return nil
})

// Register the calculated field for vote_count.
a.datastore.RegisterCalculatedField("poll/vote_count", a.datastorePollVoteCount)

return a
}

Expand Down
10 changes: 5 additions & 5 deletions internal/autoupdate/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestConnectionEmptyData(t *testing.T) {
})
go datastore.ListenOnUpdates(shutdownCtx, datastore, func(err error) { log.Println(err) })

s := autoupdate.New(datastore, test.RestrictAllowed, shutdownCtx.Done())
s := autoupdate.New(datastore, test.RestrictAllowed, "", shutdownCtx.Done())
kb := test.KeysBuilder{K: test.Str(doesExistKey, doesNotExistKey)}

t.Run("First responce", func(t *testing.T) {
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestConnectionFilterData(t *testing.T) {
"user/1/name": []byte(`"Hello World"`),
})

s := autoupdate.New(datastore, test.RestrictAllowed, shutdownCtx.Done())
s := autoupdate.New(datastore, test.RestrictAllowed, "", shutdownCtx.Done())
kb := test.KeysBuilder{K: test.Str("user/1/name")}
next := s.Connect(1, kb)
if _, err := next(context.Background()); err != nil {
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestConntectionFilterOnlyOneKey(t *testing.T) {
})
go datastore.ListenOnUpdates(shutdownCtx, datastore, func(err error) { log.Println(err) })

s := autoupdate.New(datastore, test.RestrictAllowed, shutdownCtx.Done())
s := autoupdate.New(datastore, test.RestrictAllowed, "", shutdownCtx.Done())
kb := test.KeysBuilder{K: test.Str("user/1/name")}
next := s.Connect(1, kb)
if _, err := next(context.Background()); err != nil {
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestNextNoReturnWhenDataIsRestricted(t *testing.T) {
"user/1/name": []byte(`"Hello World"`),
})

s := autoupdate.New(datastore, test.RestrictNotAllowed, shutdownCtx.Done())
s := autoupdate.New(datastore, test.RestrictNotAllowed, "", shutdownCtx.Done())
kb := test.KeysBuilder{K: test.Str("user/1/name")}

next := s.Connect(1, kb)
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestKeyNotRequestedAnymore(t *testing.T) {
`))
go datastore.ListenOnUpdates(shutdownCtx, datastore, nil)

s := autoupdate.New(datastore, test.RestrictAllowed, shutdownCtx.Done())
s := autoupdate.New(datastore, test.RestrictAllowed, "", shutdownCtx.Done())
kb, err := keysbuilder.FromJSON(strings.NewReader(`{
"collection":"organization",
"ids":[
Expand Down
2 changes: 1 addition & 1 deletion internal/autoupdate/feature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestFeatures(t *testing.T) {
defer cancel()

datastore := dsmock.NewMockDatastore(shutdownCtx.Done(), dsmock.YAMLData(dataSet))
service := autoupdate.New(datastore, test.RestrictAllowed, shutdownCtx.Done())
service := autoupdate.New(datastore, test.RestrictAllowed, "", shutdownCtx.Done())

for _, tt := range []struct {
name string
Expand Down
2 changes: 2 additions & 0 deletions internal/autoupdate/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type Datastore interface {
Get(ctx context.Context, keys ...string) (map[string][]byte, error)
RegisterChangeListener(f func(map[string][]byte) error)
ResetCache()
RegisterCalculatedField(field string, f func(ctx context.Context, key string, changed map[string][]byte) ([]byte, error))
RequestKeys(url string, keys []string) (map[string][]byte, error)
}

// KeysBuilder holds the keys that are requested by a user.
Expand Down
2 changes: 1 addition & 1 deletion internal/autoupdate/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func getConnection(closed <-chan struct{}) (autoupdate.DataProvider, *dsmock.Moc
datastore := dsmock.NewMockDatastore(closed, map[string][]byte{
"user/1/name": []byte(`"Hello World"`),
})
s := autoupdate.New(datastore, test.RestrictAllowed, closed)
s := autoupdate.New(datastore, test.RestrictAllowed, "", closed)
kb := test.KeysBuilder{K: test.Str("user/1/name")}
next := s.Connect(1, kb)

Expand Down
20 changes: 20 additions & 0 deletions internal/autoupdate/poll_vote_count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package autoupdate

import (
"context"
"fmt"
)

const path = "/internal/vote/vote_count"

func (a *Autoupdate) datastorePollVoteCount(ctx context.Context, fqfield string, changed map[string][]byte) ([]byte, error) {
if changed != nil {
return changed[fqfield], nil
}

values, err := a.datastore.RequestKeys(a.voteAddr+path, []string{fqfield})
if err != nil {
return nil, fmt.Errorf("loading key %q from vote-service: %w", fqfield, err)
}
return values[fqfield], nil
}
8 changes: 4 additions & 4 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (d *Datastore) ListenOnUpdates(ctx context.Context, keychanger Updater, err
func (d *Datastore) loadKeys(keys []string, set func(string, []byte)) error {
calculatedKeys, normalKeys := d.splitCalculatedKeys(keys)
if len(normalKeys) > 0 {
data, err := d.requestKeys(normalKeys)
data, err := d.RequestKeys(d.url, normalKeys)
if err != nil {
return fmt.Errorf("requesting keys from datastore: %w", err)
}
Expand Down Expand Up @@ -218,19 +218,19 @@ func (d *Datastore) calculateField(field string, key string, updated map[string]

}

// requestKeys request a list of keys by the datastore.
// RequestKeys request a list of keys from the datastore.
//
// If an error happens, no key is returned.
//
// The returned map contains exacply the given keys. If a key does not exist in
// the datastore, then the value of this key is <nil>.
func (d *Datastore) requestKeys(keys []string) (map[string][]byte, error) {
func (d *Datastore) RequestKeys(url string, keys []string) (map[string][]byte, error) {
requestData, err := keysToGetManyRequest(keys)
if err != nil {
return nil, fmt.Errorf("creating GetManyRequest: %w", err)
}

req, err := http.NewRequest("POST", d.url, bytes.NewReader(requestData))
req, err := http.NewRequest("POST", url, bytes.NewReader(requestData))
if err != nil {
return nil, fmt.Errorf("creating request: %w", err)
}
Expand Down

0 comments on commit b9c287c

Please sign in to comment.