perf: batch insertion of subject and subject digests
In some test attestation collections I ran into some that had roughly
50k subjects. MySQL was sometimes taking a minute and a half to insert
these collections.

Using the batch functionality speeds this up greatly, reducing those
insertions to around 1-2 seconds.

Signed-off-by: Mikhail Swift <mikhail@testifysec.com>
mikhailswift committed Aug 4, 2022
1 parent 1cb368b commit 5bcf80b
Showing 1 changed file with 62 additions and 13 deletions.
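
The speedup comes from collapsing tens of thousands of single-row INSERTs, each with its own round trip, into a handful of multi-row statements. The patch does this through ent's generated CreateBulk builders, shown in the diff below; the sketch here only illustrates the same idea directly against database/sql, with an invented `subjects` table and column names that are not taken from this repository's schema.

```go
package sketch

import (
	"context"
	"database/sql"
	"strings"
)

// insertSubjectsBatched inserts every subject for one statement with a single
// multi-row INSERT instead of one INSERT (and one round trip) per subject.
// Table and column names are illustrative only.
func insertSubjectsBatched(ctx context.Context, db *sql.DB, statementID int64, names []string) error {
	if len(names) == 0 {
		return nil
	}
	placeholders := make([]string, 0, len(names))
	args := make([]any, 0, len(names)*2)
	for _, name := range names {
		placeholders = append(placeholders, "(?, ?)") // two bind parameters per subject
		args = append(args, statementID, name)
	}
	query := "INSERT INTO subjects (statement_id, name) VALUES " + strings.Join(placeholders, ", ")
	_, err := db.ExecContext(ctx, query, args...)
	return err
}
```

A single statement can only carry so many bind parameters, which is why the change below also introduces batch-size constants and a generic chunking helper rather than flushing everything in one statement.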
internal/metadatastorage/mysqlstore/mysql.go (62 additions, 13 deletions)
@@ -41,6 +41,14 @@ import (
 	_ "github.com/go-sql-driver/mysql"
 )
 
+// mysql has a limit of 65535 parameters in a single query. each subject has ~2 parameters [statement id and name],
+// so we can theoretically jam 65535/2 subjects in a single batch. but we probably want some breathing room just in case.
+const subjectBatchSize = 30000
+
+// mysql has a limit of 65535 parameters in a single query. each subject digest has ~3 parameters [subject id, algorithm, value],
+// so we can theoretically jam 65535/3 subject digests in a single batch. but we probably want some breathing room just in case.
+const subjectDigestBatchSize = 20000
+
 type Store struct {
 	client *ent.Client
 }
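
For reference on the headroom these constants leave: MySQL caps a prepared statement at 65,535 bind parameters, so at ~2 parameters per subject the hard ceiling is 65535/2 = 32,767 rows per batch, and at ~3 parameters per digest it is 65535/3 = 21,845; the chosen 30,000 and 20,000 sit safely below those limits.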
@@ -269,25 +277,40 @@ func (s *Store) Store(ctx context.Context, gitoid string, obj []byte) error {
 		return err
 	}
 
+	bulkSubject := make([]*ent.SubjectCreate, 0)
 	for _, subject := range payload.Subject {
-		storedSubject, err := tx.Subject.Create().
-			SetName(subject.Name).
-			SetStatement(stmt).
-			Save(ctx)
-		if err != nil {
-			return err
-		}
+		bulkSubject = append(bulkSubject,
+			tx.Subject.Create().
+				SetName(subject.Name).
+				SetStatement(stmt),
+		)
+	}
 
+	subjects, err := batch(ctx, subjectBatchSize, bulkSubject, func(digests ...*ent.SubjectCreate) saver[*ent.Subject] {
+		return tx.Subject.CreateBulk(digests...)
+	})
+	if err != nil {
+		return err
+	}
+
+	bulkSubjectDigests := make([]*ent.SubjectDigestCreate, 0)
+	for i, subject := range payload.Subject {
 		for algorithm, value := range subject.Digest {
-			if err := tx.SubjectDigest.Create().
-				SetAlgorithm(algorithm).
-				SetValue(value).SetSubject(storedSubject).
-				Exec(ctx); err != nil {
-				return err
-			}
+			bulkSubjectDigests = append(bulkSubjectDigests,
+				tx.SubjectDigest.Create().
+					SetAlgorithm(algorithm).
+					SetValue(value).
+					SetSubject(subjects[i]),
+			)
 		}
 	}
 
+	if _, err := batch(ctx, subjectDigestBatchSize, bulkSubjectDigests, func(digests ...*ent.SubjectDigestCreate) saver[*ent.SubjectDigest] {
+		return tx.SubjectDigest.CreateBulk(digests...)
+	}); err != nil {
+		return err
+	}
+
 	collection, err := tx.AttestationCollection.Create().
 		SetStatementID(stmt.ID).
 		SetName(parsedCollection.Name).
@@ -319,3 +342,29 @@ func (s *Store) Store(ctx context.Context, gitoid string, obj []byte) error {
 func (s *Store) GetClient() *ent.Client {
 	return s.client
 }
+
+type saver[T any] interface {
+	Save(context.Context) ([]T, error)
+}
+
+func batch[TCreate any, TResult any](ctx context.Context, batchSize int, create []TCreate, saveFn func(...TCreate) saver[TResult]) ([]TResult, error) {
+	results := make([]TResult, 0, len(create))
+	for i := 0; i < len(create); i += batchSize {
+		var batch []TCreate
+		if i+batchSize > len(create) {
+			batch = create[i:]
+		} else {
+			batch = create[i : i+batchSize]
+		}
+
+		batchSaver := saveFn(batch...)
+		batchResults, err := batchSaver.Save(ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		results = append(results, batchResults...)
+	}
+
+	return results, nil
+}
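
The new batch helper is generic over anything that exposes Save, so its chunking behaviour can be seen with a throwaway saver. The toy program below runs as-is if the saver and batch definitions from the hunk above are copied into the same file; fakeSaver and the numbers are invented for illustration and stand in for the ent CreateBulk builders used in the patch.

```go
package main

import (
	"context"
	"fmt"
)

// fakeSaver stands in for an ent bulk-create builder: "saving" just returns the inputs.
type fakeSaver struct{ items []int }

func (f fakeSaver) Save(_ context.Context) ([]int, error) { return f.items, nil }

func main() {
	// Seven creates with a batch size of three are flushed as chunks of 3, 3 and 1,
	// and batch stitches the per-chunk results back together in their original order.
	creates := []int{1, 2, 3, 4, 5, 6, 7}
	results, err := batch(context.Background(), 3, creates, func(c ...int) saver[int] {
		return fakeSaver{items: c}
	})
	fmt.Println(results, err) // [1 2 3 4 5 6 7] <nil>
}
```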
