-
Notifications
You must be signed in to change notification settings - Fork 246
/
schema.go
140 lines (117 loc) · 4.33 KB
/
schema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package v1
import (
"context"
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
grpcvalidate "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/internal/middleware"
datastoremw "github.com/authzed/spicedb/internal/middleware/datastore"
"github.com/authzed/spicedb/internal/middleware/usagemetrics"
"github.com/authzed/spicedb/internal/services/shared"
"github.com/authzed/spicedb/pkg/datastore"
dispatchv1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
"github.com/authzed/spicedb/pkg/schemadsl/compiler"
"github.com/authzed/spicedb/pkg/schemadsl/generator"
"github.com/authzed/spicedb/pkg/schemadsl/input"
"github.com/authzed/spicedb/pkg/zedtoken"
)
// NewSchemaServer creates a SchemaServiceServer instance.
//
// The returned server carries its own unary and stream interceptor chains
// (request validation followed by usage metrics). additiveOnly is stored on
// the server and later handed to schema-change validation in WriteSchema.
func NewSchemaServer(additiveOnly bool) v1.SchemaServiceServer {
	// Build both interceptor chains up front so the struct literal below
	// stays flat and easy to scan.
	unaryChain := middleware.ChainUnaryServer(
		grpcvalidate.UnaryServerInterceptor(),
		usagemetrics.UnaryServerInterceptor(),
	)
	streamChain := middleware.ChainStreamServer(
		grpcvalidate.StreamServerInterceptor(),
		usagemetrics.StreamServerInterceptor(),
	)

	return &schemaServer{
		WithServiceSpecificInterceptors: shared.WithServiceSpecificInterceptors{
			Unary:  unaryChain,
			Stream: streamChain,
		},
		additiveOnly: additiveOnly,
	}
}
// schemaServer is the v1.SchemaServiceServer implementation returned by
// NewSchemaServer. It serves ReadSchema and WriteSchema.
type schemaServer struct {
	// Embedded so that RPCs not implemented here fall back to the generated
	// "unimplemented" stubs.
	v1.UnimplementedSchemaServiceServer
	// Embedded holder for the unary/stream interceptor chains configured in
	// NewSchemaServer.
	shared.WithServiceSpecificInterceptors
	// additiveOnly is forwarded to shared.ValidateSchemaChanges on every
	// WriteSchema call. NOTE(review): presumably it rejects schema changes
	// that remove definitions — confirm against shared.ValidateSchemaChanges.
	additiveOnly bool
}
// rewriteError passes err through shared.RewriteError so that every error
// returned by this service goes through the same translation step. The third
// argument is always nil here. NOTE(review): presumably an optional
// configuration value — confirm against shared.RewriteError.
func (ss *schemaServer) rewriteError(ctx context.Context, err error) error {
	return shared.RewriteError(ctx, err, nil)
}
// ReadSchema returns the text of the schema currently stored in the datastore.
//
// The datastore is taken from ctx. All namespace and caveat definitions are
// listed at the head revision and rendered to schema text, with caveats placed
// ahead of namespaces in the generator input. If no namespaces exist, a
// codes.NotFound status error is returned. Any other failure is passed through
// rewriteError. On success the number of definitions read is recorded as the
// dispatch count in the usage metrics, and the response carries a ZedToken
// minted from the head revision.
func (ss *schemaServer) ReadSchema(ctx context.Context, _ *v1.ReadSchemaRequest) (*v1.ReadSchemaResponse, error) {
	store := datastoremw.MustFromContext(ctx)

	// Schema is always read from the head revision.
	head, err := store.HeadRevision(ctx)
	if err != nil {
		return nil, ss.rewriteError(ctx, err)
	}
	snapshot := store.SnapshotReader(head)

	namespaces, err := snapshot.ListAllNamespaces(ctx)
	if err != nil {
		return nil, ss.rewriteError(ctx, err)
	}

	caveats, err := snapshot.ListAllCaveats(ctx)
	if err != nil {
		return nil, ss.rewriteError(ctx, err)
	}

	// A schema without any namespaces is treated as "no schema at all",
	// regardless of how many caveats were found.
	if len(namespaces) == 0 {
		return nil, status.Error(codes.NotFound, "No schema has been defined; please call WriteSchema to start")
	}

	// Caveats go first, then namespaces; the generator receives them in
	// exactly this order.
	total := len(namespaces) + len(caveats)
	definitions := make([]compiler.SchemaDefinition, 0, total)
	for i := range caveats {
		definitions = append(definitions, caveats[i].Definition)
	}
	for i := range namespaces {
		definitions = append(definitions, namespaces[i].Definition)
	}

	// The generator's second return value is not needed here.
	text, _, err := generator.GenerateSchema(definitions)
	if err != nil {
		return nil, ss.rewriteError(ctx, err)
	}

	// One dispatch is accounted for per definition read.
	usagemetrics.SetInContext(ctx, &dispatchv1.ResponseMeta{
		DispatchCount: uint32(total),
	})

	resp := &v1.ReadSchemaResponse{
		SchemaText: text,
		ReadAt:     zedtoken.MustNewFromRevision(head),
	}
	return resp, nil
}
// WriteSchema compiles the schema text carried by the request, validates the
// resulting changes, and applies them to the datastore found in ctx inside a
// single read-write transaction.
//
// Compilation and validation happen before the transaction is opened. The
// server's additiveOnly flag is passed to validation. Every failure is passed
// through rewriteError. When the changes are applied, their total operation
// count is recorded as the dispatch count in the usage metrics. On success the
// response carries a ZedToken minted from the revision returned by the
// transaction.
func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaRequest) (*v1.WriteSchemaResponse, error) {
	schemaText := in.GetSchema()
	log.Ctx(ctx).Trace().
		Str("schema", schemaText).
		Msg("requested Schema to be written")

	store := datastoremw.MustFromContext(ctx)

	// Compile the schema into the namespace definitions, using an empty
	// default prefix.
	noPrefix := ""
	source := compiler.InputSchema{
		Source:       input.Source("schema"),
		SchemaString: schemaText,
	}
	compiled, err := compiler.Compile(source, &noPrefix)
	if err != nil {
		return nil, ss.rewriteError(ctx, err)
	}
	log.Ctx(ctx).Trace().
		Int("objectDefinitions", len(compiled.ObjectDefinitions)).
		Int("caveatDefinitions", len(compiled.CaveatDefinitions)).
		Msg("compiled namespace definitions")

	// Do as much validation as we can before talking to the datastore.
	validated, err := shared.ValidateSchemaChanges(ctx, compiled, ss.additiveOnly)
	if err != nil {
		return nil, ss.rewriteError(ctx, err)
	}

	// applyChanges runs inside the transaction. The usage metrics are set
	// here, right after the changes are applied, rather than after commit.
	applyChanges := func(rwt datastore.ReadWriteTransaction) error {
		applied, applyErr := shared.ApplySchemaChanges(ctx, rwt, validated)
		if applyErr != nil {
			return applyErr
		}

		usagemetrics.SetInContext(ctx, &dispatchv1.ResponseMeta{
			DispatchCount: applied.TotalOperationCount,
		})
		return nil
	}

	// Update the schema.
	writtenRev, err := store.ReadWriteTx(ctx, applyChanges)
	if err != nil {
		return nil, ss.rewriteError(ctx, err)
	}

	resp := &v1.WriteSchemaResponse{
		WrittenAt: zedtoken.MustNewFromRevision(writtenRev),
	}
	return resp, nil
}