Skip to content

Commit

Permalink
History (#440)
Browse files Browse the repository at this point in the history
  • Loading branch information
ostcar committed Mar 19, 2022
1 parent 743303f commit 77644ab
Show file tree
Hide file tree
Showing 51 changed files with 1,239 additions and 172 deletions.
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ without a newline:
{"user/1/name":"value","user/2/name":"value"}
```

With the query parameter `single` the server writes the first response and
closes the request immediately. So there are not autoupdates:

`curl -N localhost:9012/system/autoupdate?k=user/1/username&single=1`

With the query parameter `position=XX` it is possible to request the data at a
specific position from the datastore. This implieds `single`:

`curl -N localhost:9012/system/autoupdate?k=user/1/username&position=42`


### Updates via redis

Expand Down Expand Up @@ -142,6 +152,26 @@ curl -N localhost:9012/system/autoupdate -d '
]'
```

### History Information

To get all history information for an fqid call

`curl localhost:9012/system/autoupdate/history_information?fqid=motion/42`

It returns a list of all changes to the requested fqid. Each element in the list
is an object like this:

```
{
"position": 23,
"user_id": 5,
"information": "motion was created",
"timestamp: 1234567
}
```



## Configuration

### Environment variables
Expand Down
17 changes: 11 additions & 6 deletions cmd/autoupdate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ func run() error {
}
go datastoreService.ListenOnUpdates(ctx, errHandler)

// Create http mux to add urls.
mux := http.NewServeMux()

// Auth Service.
authService, err := buildAuth(ctx, env, messageBus, errHandler)
if err != nil {
Expand All @@ -146,8 +143,12 @@ func run() error {
go service.PruneOldData(ctx)
go service.ResetCache(ctx)

// Create http mux to add urls.
mux := http.NewServeMux()

autoupdateHttp.Health(mux)
autoupdateHttp.Autoupdate(mux, authService, service, messageBus)
autoupdateHttp.HistoryInformation(mux, authService, service)
autoupdateHttp.MetricRequest(mux, messageBus)

// Projector Service.
Expand Down Expand Up @@ -201,9 +202,13 @@ func buildDatastore(env map[string]string, mb messageBus) (*datastore.Datastore,

voteCountSource := datastore.NewVoteCountSource(env["VOTE_PROTOCAL"] + "://" + env["VOTE_HOST"] + ":" + env["VOTE_PORT"])

return datastore.New(datastoreSource, map[string]datastore.Source{
"poll/vote_count": voteCountSource,
}), nil
return datastore.New(
datastoreSource,
map[string]datastore.Source{
"poll/vote_count": voteCountSource,
},
datastoreSource,
), nil
}

// buildMessagebus builds the receiver needed by the datastore service. It uses
Expand Down
87 changes: 87 additions & 0 deletions internal/autoupdate/autoupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ package autoupdate
import (
"context"
"fmt"
"io"
"strconv"
"strings"
"time"

"github.com/OpenSlides/openslides-autoupdate-service/internal/restrict"
"github.com/OpenSlides/openslides-autoupdate-service/internal/restrict/collection"
"github.com/OpenSlides/openslides-autoupdate-service/internal/restrict/perm"
"github.com/OpenSlides/openslides-autoupdate-service/pkg/datastore"
"github.com/ostcar/topic"
)
Expand Down Expand Up @@ -93,6 +99,29 @@ func (a *Autoupdate) Connect(userID int, kb KeysBuilder) DataProvider {
return c.Next
}

// SingleData returns the data for the kb. It is the same as calling Connect and
// then Next for the first time.
func (a *Autoupdate) SingleData(ctx context.Context, userID int, kb KeysBuilder, position int) (map[string][]byte, error) {
var getter datastore.Getter = a.datastore
var restricter datastore.Getter = a.restricter(getter, userID)

if position != 0 {
getter = datastore.NewGetPosition(a.datastore, position)
restricter = restrict.NewHistory(userID, a.datastore, getter)
}

if err := kb.Update(ctx, restricter); err != nil {
return nil, fmt.Errorf("create keys for keysbuilder: %w", err)
}

data, err := restricter.Get(ctx, kb.Keys()...)
if err != nil {
return nil, fmt.Errorf("get restricted data: %w", err)
}

return data, nil
}

// LastID returns the id of the last data update.
func (a *Autoupdate) LastID() uint64 {
return a.topic.LastID()
Expand Down Expand Up @@ -131,3 +160,61 @@ func (a *Autoupdate) ResetCache(ctx context.Context) {
}
}
}

// HistoryInformation writes the history information for an fqid.
func (a *Autoupdate) HistoryInformation(ctx context.Context, uid int, fqid string, w io.Writer) error {
coll, rawID, found := strings.Cut(fqid, "/")
if !found {
return fmt.Errorf("invalid fqid")
}

id, err := strconv.Atoi(rawID)
if err != nil {
return fmt.Errorf("invalid fqid. ID part is not an int")
}

ds := datastore.NewRequest(a.datastore)

meetingID, hasMeeting, err := collection.Collection(coll).MeetingID(ctx, ds, id)
if err != nil {
return fmt.Errorf("getting meeting id for collection %s id %d: %w", coll, id, err)
}

if !hasMeeting {
hasOML, err := perm.HasOrganizationManagementLevel(ctx, ds, uid, perm.OMLCanManageOrganization)
if err != nil {
return fmt.Errorf("getting organization management level: %w", err)
}

if !hasOML {
return permissionDeniedError{fmt.Errorf("you are not allowed to use history information on %s", fqid)}
}
} else {
p, err := perm.New(ctx, ds, uid, meetingID)
if err != nil {
return fmt.Errorf("getting meeting permissions: %w", err)
}

if !p.Has(perm.MeetingCanSeeHistory) {
return permissionDeniedError{fmt.Errorf("you are not allowed to use history information on %s", fqid)}
}
}

if err := a.datastore.HistoryInformation(ctx, fqid, w); err != nil {
return fmt.Errorf("getting history information: %w", err)
}

return nil
}

type permissionDeniedError struct {
err error
}

func (e permissionDeniedError) Error() string {
return fmt.Sprintf("permissoin denied: %v", e.err)
}

func (e permissionDeniedError) Type() string {
return "permission_denied"
}
3 changes: 3 additions & 0 deletions internal/autoupdate/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ package autoupdate

import (
"context"
"io"

"github.com/OpenSlides/openslides-autoupdate-service/pkg/datastore"
)

// Datastore gets values for keys and informs, if they change.
type Datastore interface {
Get(ctx context.Context, keys ...string) (map[string][]byte, error)
GetPosition(ctx context.Context, position int, keys ...string) (map[string][]byte, error)
RegisterChangeListener(f func(map[string][]byte) error)
ResetCache()
RegisterCalculatedField(
field string,
f func(ctx context.Context, key string, changed map[string][]byte) ([]byte, error),
)
HistoryInformation(ctx context.Context, fqid string, w io.Writer) error
}

// KeysBuilder holds the keys that are requested by a user.
Expand Down
84 changes: 58 additions & 26 deletions internal/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"log"
"net/http"
"strconv"
"strings"

"github.com/OpenSlides/openslides-autoupdate-service/internal/autoupdate"
Expand All @@ -24,6 +25,7 @@ const (
// Connecter returns an connect object.
type Connecter interface {
Connect(userID int, kb autoupdate.KeysBuilder) autoupdate.DataProvider
SingleData(ctx context.Context, userID int, kb autoupdate.KeysBuilder, position int) (map[string][]byte, error)
}

// RequestMetricer saves metrics about requests.
Expand Down Expand Up @@ -68,12 +70,37 @@ func Autoupdate(mux *http.ServeMux, auth Authenticater, connecter Connecter, met

builder := keysbuilder.FromBuilders(queryBuilder, bodyBuilder)

sender := sendMessages
if r.URL.Query().Has("single") {
sender = sendSingleMessage
rawPosition := r.URL.Query().Get("position")
position := 0
if rawPosition != "" {
p, err := strconv.Atoi(rawPosition)
if err != nil {
handleError(w, invalidRequestError{fmt.Errorf("position has to be a number, not %s", rawPosition)}, true)
return
}
position = p
}

if r.URL.Query().Has("single") || position != 0 {
data, err := connecter.SingleData(r.Context(), uid, builder, position)
if err != nil {
handleError(w, fmt.Errorf("getting single data: %w", err), true)
return
}

converted := make(map[string]json.RawMessage, len(data))
for k, v := range data {
converted[k] = v
}

if err := json.NewEncoder(w).Encode(converted); err != nil {
handleError(w, fmt.Errorf("encoding end sending next message: %w", err), true)
return
}
return
}

if err := sender(r.Context(), w, uid, builder, connecter); err != nil {
if err := sendMessages(r.Context(), w, uid, builder, connecter); err != nil {
handleError(w, err, false)
return
}
Expand All @@ -82,6 +109,33 @@ func Autoupdate(mux *http.ServeMux, auth Authenticater, connecter Connecter, met
mux.Handle(prefixPublic, validRequest(authMiddleware(handler, auth)))
}

// HistoryInformationer is an object, that can write the history information for
// an object.
type HistoryInformationer interface {
HistoryInformation(ctx context.Context, uid int, fqid string, w io.Writer) error
}

// HistoryInformation registers the route to return the history information info
// for an fqid.
func HistoryInformation(mux *http.ServeMux, auth Authenticater, hi HistoryInformationer) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
uid := auth.FromContext(r.Context())

fqid := r.URL.Query().Get("fqid")
if fqid == "" {
handleError(w, invalidRequestError{fmt.Errorf("History Information needs an fqid")}, true)
return
}

if err := hi.HistoryInformation(r.Context(), uid, fqid, w); err != nil {
handleError(w, fmt.Errorf("getting history information: %w", err), true)
return
}
})

mux.Handle(prefixPublic+"/history_information", authMiddleware(handler, auth))
}

// MetricRequest returns the request metrics.
func MetricRequest(mux *http.ServeMux, metric RequestMetricer) {
mux.Handle(prefixInternal+"/metric/request", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -118,28 +172,6 @@ func sendMessages(ctx context.Context, w io.Writer, uid int, kb autoupdate.KeysB
return ctx.Err()
}

func sendSingleMessage(ctx context.Context, w io.Writer, uid int, kb autoupdate.KeysBuilder, connecter Connecter) error {
next := connecter.Connect(uid, kb)
encoder := json.NewEncoder(w)

// conn.Next() blocks, until there is new data. It also unblocks,
// when the client context or the server is closed.
data, err := next(ctx)
if err != nil {
return fmt.Errorf("getting next message: %w", err)
}

converted := make(map[string]json.RawMessage, len(data))
for k, v := range data {
converted[k] = v
}

if err := encoder.Encode(converted); err != nil {
return fmt.Errorf("encoding end sending next message: %w", err)
}
return nil
}

// Health tells, if the service is running.
func Health(mux *http.ServeMux) {
url := prefixPublic + "/health"
Expand Down
Loading

0 comments on commit 77644ab

Please sign in to comment.