-
Notifications
You must be signed in to change notification settings - Fork 257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Datastore v2 #581
Datastore v2 #581
Conversation
5d5ebd8
to
ff3ac89
Compare
9e5f9ac
to
6cb191f
Compare
fa897aa
to
4ecb13a
Compare
I will look into resolving any E2E test problems, but I don't think it should hold up the review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🥹
DeleteRelationships(ctx context.Context, preconditions []*v1.Precondition, filter *v1.RelationshipFilter) (Revision, error) | ||
// ReadWriteTx tarts a read/write transaction, which will be committed if no error is | ||
// returned and rolled back if an error is returned. | ||
ReadWriteTx(context.Context, TxUserFunc) (Revision, error) | ||
|
||
// OptimizedRevision gets a revision that will likely already be replicated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this comment right? I can't expand it for some reason
74291f7
to
e9b7d21
Compare
// occurred when building the transaction. | ||
type TxFactory func(context.Context) (pgx.Tx, TxCleanupFunc, error) | ||
|
||
func NewPGXExecutor(txSource TxFactory) ExecuteQueryFunc { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doc comment?
|
||
cleanup := func(ctx context.Context) { | ||
if err := tx.Rollback(ctx); err != nil { | ||
log.Ctx(ctx).Err(err).Msg("error rolling back transaction") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add additional context to the error message, since it is the same message as below, and once line numbers move, it'll be harder to pin down the exact location if we see the error occur
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i mean they will have different underlying errors.
internal/datastore/crdb/readwrite.go
Outdated
bulkTouch := queryTouchTuple | ||
var bulkTouchCount int64 | ||
|
||
// Process the actual updates var tupleCountChange int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var tupleCountChange int64
?
func (rwt *crdbReadWriteTXN) DeleteRelationships(filter *v1.RelationshipFilter) error { | ||
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "DeleteRelationships") | ||
defer span.End() | ||
// Add clauses for the ResourceFilter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newline before this line
if err != nil { | ||
return fmt.Errorf(errUnableToWriteConfig, err) | ||
} | ||
query = query.Values(newConfig.Name, serialized) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might have a risk of hitting query limits here too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, there is a hard upper bound on how big our schema can get now, but it's unlikely to hit this limit without first hitting the overall schema byte limit.
return tx, err | ||
} | ||
|
||
newRevision := revisionFromTimestamp(time.Now().UTC()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
internal/datastore/memdb/memdb.go
Outdated
} | ||
if tx != nil { | ||
for _, change := range tx.Changes() { | ||
if change.Table == "relationship" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Place the table name into a const
} | ||
|
||
// caller must already hold the concurrent access lock | ||
func (rwt *memdbReadWriteTx) delete(tx *memdb.Txn, filter *v1.RelationshipFilter) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe name to indicate the caller must hold the lock?
return nil, fmt.Errorf("failed to open connection to db: %w", err) | ||
} | ||
|
||
// The go mysql driver implements the ExecerContext interface, assert that here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExecerContext
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean you can see it on the next line. What do you want here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just seems a weird name, but its fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't name it, it's part of the go sql interface.
|
||
rwt := &mysqlReadWriteTXN{ | ||
&mysqlReader{ | ||
mds.QueryBuilder, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True... it just makes it harder to read, but I'd rather have the safety.
)) | ||
} | ||
|
||
func (r relationship) Relationship() *v1.Relationship { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the core package provides conversions that could remove the need for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this is going from the flat struct that we use to represent a row in the memdb to the relationship.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do convert to a core rel below, so we could in theory chain that way, but not sure it is worth it
ctx, span := tracer.Start(ctx, "WriteNamespaces") | ||
defer span.End() | ||
|
||
// span.SetAttributes(common.ObjNamespaceNameKey.String(newNamespace.Name)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a new key and set it to the list of namespace names?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Fixes #118