Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate token parsing from storage #652

Merged
merged 2 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ clean:
docker container prune -f
rm -rf ./integration/token/fungible/dlog/cmd/
rm -rf ./integration/token/fungible/dloghsm/cmd/
rm -rf ./integration/token/fungible/dlogstress/cmd/
rm -rf ./integration/token/fungible/fabtoken/cmd/
rm -rf ./integration/token/fungible/odlog/cmd/
rm -rf ./integration/token/fungible/ofabtoken/cmd/
Expand Down
6 changes: 2 additions & 4 deletions token/services/db/driver/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,8 @@ type CertificationDB interface {
}

type TokenDBTransaction interface {
// TransactionExists returns true if a token with that transaction id exists in the db
TransactionExists(id string) (bool, error)
// GetToken returns the owned tokens and their identifier keys for the passed ids.
GetToken(txID string, index uint64, includeDeleted bool) (*token.Token, []string, error)
// OwnersOf returns the list of owner of a given token
OwnersOf(txID string, index uint64) ([]string, error)
// Delete marks the passed token as deleted by a given identifier (idempotent)
Delete(txID string, index uint64, deletedBy string) error
// StoreToken stores the passed token record in relation to the passed owner identifiers, if any
Expand Down Expand Up @@ -155,6 +151,8 @@ type TokenDB interface {
GetTokens(inputs ...*token.ID) ([]string, []*token.Token, error)
// WhoDeletedTokens for each id, the function return if it was deleted and by who as per the Delete function
WhoDeletedTokens(inputs ...*token.ID) ([]string, []bool, error)
// TransactionExists returns true if a token with that transaction id exists in the db
TransactionExists(id string) (bool, error)
// StorePublicParams stores the public parameters
StorePublicParams(raw []byte) error
// PublicParams returns the stored public parameters
Expand Down
39 changes: 36 additions & 3 deletions token/services/db/sql/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"path"
"reflect"
"sync"
"testing"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/cache/secondcache"
Expand Down Expand Up @@ -72,9 +73,10 @@ var IdentityCases = []struct {
Name string
Fn func(*testing.T, *IdentityDB)
}{
{"TIdentityInfo", TIdentityInfo},
{"TSignerInfo", TSignerInfo},
{"TConfigurations", TConfigurations},
{"IdentityInfo", TIdentityInfo},
{"SignerInfo", TSignerInfo},
{"Configurations", TConfigurations},
{"SignerInfoConcurrent", TSignerInfoConcurrent},
}

func TConfigurations(t *testing.T, db *IdentityDB) {
Expand Down Expand Up @@ -146,3 +148,34 @@ func TSignerInfo(t *testing.T, db *IdentityDB) {
assert.NoError(t, err, "failed to check signer info existence for [%s]", bob)
assert.False(t, exists)
}

func TSignerInfoConcurrent(t *testing.T, db *IdentityDB) {
wg := sync.WaitGroup{}
n := 100
wg.Add(n)

for i := 0; i < n; i++ {
go func(i int) {
alice := []byte(fmt.Sprintf("alice_%d", i))
bob := []byte(fmt.Sprintf("bob_%d", i))
assert.NoError(t, db.StoreSignerInfo(alice, nil))
exists, err := db.SignerInfoExists(alice)
assert.NoError(t, err, "failed to check signer info existence for [%s]", alice)
assert.True(t, exists)

t.Log(i)
exists, err = db.SignerInfoExists(bob)
assert.NoError(t, err, "failed to check signer info existence for [%s]", bob)
assert.False(t, exists)
wg.Done()
}(i)
}
wg.Wait()

for i := 0; i < n; i++ {
alice := []byte(fmt.Sprintf("alice_%d", i))
exists, err := db.SignerInfoExists(alice)
assert.NoError(t, err, "failed to check signer info existence for [%s]", alice)
assert.True(t, exists)
}
}
61 changes: 16 additions & 45 deletions token/services/db/sql/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,22 @@ func (db *TokenDB) WhoDeletedTokens(inputs ...*token.ID) ([]string, []bool, erro
return spentBy, isSpent, nil
}

func (db *TokenDB) TransactionExists(id string) (bool, error) {
query := fmt.Sprintf("SELECT tx_id FROM %s WHERE tx_id=$1 LIMIT 1;", db.table.Tokens)
logger.Debug(query, id)

row := db.db.QueryRow(query, id)
var found string
if err := row.Scan(&found); err != nil {
if err == sql.ErrNoRows {
return false, nil
}
logger.Warnf("tried to check transaction existence for id %s, err %s", id, err)
return false, err
}
return true, nil
}

func (db *TokenDB) StorePublicParams(raw []byte) error {
now := time.Now().UTC()
query := fmt.Sprintf("INSERT INTO %s (raw, stored_at) VALUES ($1, $2)", db.table.PublicParams)
Expand Down Expand Up @@ -783,22 +799,6 @@ type TokenTransaction struct {
tx *sql.Tx
}

func (t *TokenTransaction) TransactionExists(id string) (bool, error) {
query := fmt.Sprintf("SELECT tx_id FROM %s WHERE tx_id=$1 LIMIT 1;", t.db.table.Tokens)
logger.Debug(query, id)

row := t.tx.QueryRow(query, id)
var found string
if err := row.Scan(&found); err != nil {
if err == sql.ErrNoRows {
return false, nil
}
logger.Warnf("tried to check transaction existence for id %s, err %s", id, err)
return false, err
}
return true, nil
}

func (t *TokenTransaction) GetToken(txID string, index uint64, includeDeleted bool) (*token.Token, []string, error) {
where, join, args := tokenQuerySql(driver.QueryTokenDetailsParams{
IDs: []*token.ID{{TxId: txID, Index: index}},
Expand Down Expand Up @@ -841,35 +841,6 @@ func (t *TokenTransaction) GetToken(txID string, index uint64, includeDeleted bo
}, owners, nil
}

func (t *TokenTransaction) OwnersOf(txID string, index uint64) ([]string, error) {
args := make([]interface{}, 0)
tokenIDs := []*token.ID{{TxId: txID, Index: index}}
where := whereTokenIDs(&args, tokenIDs)
query := fmt.Sprintf("SELECT enrollment_id FROM %s WHERE %s", t.db.table.Ownership, where)
logger.Debug(query, args)
rows, err := t.tx.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()

var owners []string
for rows.Next() {
var owner string
if err := rows.Scan(&owner); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}
owners = append(owners, owner)
}
if rows.Err() != nil {
return nil, rows.Err()
}
return owners, nil
}

func (t *TokenTransaction) Delete(txID string, index uint64, deletedBy string) error {
logger.Debugf("delete token [%s:%d:%s]", txID, index, deletedBy)
// We don't delete audit tokens, and we keep the 'ownership' relation.
Expand Down
66 changes: 34 additions & 32 deletions token/services/tokens/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (d *DBStorage) NewTransaction() (*transaction, error) {
}, nil
}

func (d *DBStorage) TransactionExists(id string) (bool, error) {
return d.tokenDB.TransactionExists(id)
}

func (d *DBStorage) StorePublicParams(raw []byte) error {
return d.tokenDB.StorePublicParams(raw)
}
Expand Down Expand Up @@ -105,56 +109,58 @@ func (t *transaction) DeleteTokens(deletedBy string, ids []*token2.ID) error {
return nil
}

func (t *transaction) AppendToken(
txID string,
index uint64,
tok *token2.Token,
tokenOnLedger []byte,
tokenOnLedgerMetadata []byte,
ids []string,
issuer token.Identity,
precision uint64,
flags Flags,
) error {
q, err := token2.ToQuantity(tok.Quantity, precision)
type TokenToAppend struct {
txID string
index uint64
tok *token2.Token
tokenOnLedger []byte
tokenOnLedgerMetadata []byte
owners []string
issuer token.Identity
precision uint64
flags Flags
}

func (t *transaction) AppendToken(tta TokenToAppend) error {
q, err := token2.ToQuantity(tta.tok.Quantity, tta.precision)
if err != nil {
return errors.Wrapf(err, "cannot covert [%s] with precision [%d]", tok.Quantity, precision)
return errors.Wrapf(err, "cannot covert [%s] with precision [%d]", tta.tok.Quantity, tta.precision)
}

typ, id, err := t.ote.OwnerType(tok.Owner.Raw)
typ, id, err := t.ote.OwnerType(tta.tok.Owner.Raw)
if err != nil {
logger.Errorf("could not unmarshal identity when storing token: %s", err.Error())
return errors.Wrap(err, "could not unmarshal identity when storing token")
}

err = t.tx.StoreToken(
tokendb.TokenRecord{
TxID: txID,
Index: index,
IssuerRaw: issuer,
OwnerRaw: tok.Owner.Raw,
TxID: tta.txID,
Index: tta.index,
IssuerRaw: tta.issuer,
OwnerRaw: tta.tok.Owner.Raw,
OwnerType: typ,
OwnerIdentity: id,
Ledger: tokenOnLedger,
LedgerMetadata: tokenOnLedgerMetadata,
Quantity: tok.Quantity,
Type: tok.Type,
Ledger: tta.tokenOnLedger,
LedgerMetadata: tta.tokenOnLedgerMetadata,
Quantity: tta.tok.Quantity,
Type: tta.tok.Type,
Amount: q.ToBigInt().Uint64(),
Owner: flags.Mine,
Auditor: flags.Auditor,
Issuer: flags.Issuer,
Owner: tta.flags.Mine,
Auditor: tta.flags.Auditor,
Issuer: tta.flags.Issuer,
},
ids,
tta.owners,
)
if err != nil {
return errors.Wrapf(err, "cannot store token in db")
}

for _, id := range ids {
for _, id := range tta.owners {
if len(id) == 0 {
continue
}
t.Notify(AddToken, t.tmsID, id, tok.Type, txID, index)
t.Notify(AddToken, t.tmsID, id, tta.tok.Type, tta.txID, tta.index)
}

return nil
Expand Down Expand Up @@ -186,10 +192,6 @@ func (t *transaction) Commit() error {
return t.tx.Commit()
}

func (t *transaction) TransactionExists(id string) (bool, error) {
return t.tx.TransactionExists(id)
}

type TokenProcessorEvent struct {
topic string
message TokenMessage
Expand Down
Loading
Loading