datastore: Parallel Transaction on same kind with unique keys giving concurrent transaction error #10068

Open
mehulparmariitr opened this issue Apr 30, 2024 · 4 comments
Labels: api: datastore, triage me

Comments

@mehulparmariitr

mehulparmariitr commented Apr 30, 2024

Client

datastore

Environment

Go executable on Linux

Go Environment

$ go version
go version go1.21.0 darwin/amd64
$ go env

GO111MODULE='on'
GOARCH='amd64'
GOBIN=''
GOCACHE='/Users/mehparmar/Library/Caches/go-build'
GOENV='/Users/mehparmar/Library/Application Support/go/env'
GOEXE=''
GOEXPERIMENT=''
GOFLAGS=''
GOHOSTARCH='amd64'
GOHOSTOS='darwin'
GOINSECURE=''
GOMODCACHE='/Users/mehparmar/go/pkg/mod'
GONOPROXY=''
GONOSUMDB=''
GOOS='darwin'
GOPATH='/Users/mehparmar/go'
GOPRIVATE=''
GOPROXY='https://proxy.golang.org,direct'
GOROOT='/usr/local/go'
GOSUMDB='sum.golang.org'
GOTMPDIR=''
GOTOOLCHAIN='auto'
GOTOOLDIR='/usr/local/go/pkg/tool/darwin_amd64'
GOVCS=''
GOVERSION='go1.21.0'
GCCGO='gccgo'
GOAMD64='v1'
AR='ar'
CC='clang'
CXX='clang++'
CGO_ENABLED='1'
GOMOD='/Users/mehparmar/Repos/mehparmar/abcserv/go.mod'
GOWORK=''
CGO_CFLAGS='-O2 -g'
CGO_CPPFLAGS=''
CGO_CXXFLAGS='-O2 -g'
CGO_FFLAGS='-O2 -g'
CGO_LDFLAGS='-O2 -g'
PKG_CONFIG='pkg-config'
GOGCCFLAGS='-fPIC -arch x86_64 -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -ffile-prefix-map=/var/folders/6n/kcp2mfmx0s9_lqw9y30yqq500000gq/T/go-build1900003096=/tmp/go-build -gno-record-gcc-switches -fno-common'

Code

e.g.

func processBatch(ctx context.Context, logger *l.Logger, desQSp *gds.QuerySpecs, tableName string, batch []dataanalytics.AggregationEntity, wg *sync.WaitGroup) {
	defer wg.Done() // Signal that this goroutine is done

	// Upsert calls GetMulti on the batch keys, then decides per key whether it is an insert or an update.
	err, _, _, _ := dataanalytics.Upsert(ctx, logger, (*gds2.QuerySpecs)(desQSp), tableName, batch)
	if err != nil {
		// Handle the error appropriately (logging, retries, etc.)
		log.Printf("Error during Upsert: %v", err)
	}
}

// Code in the main function:
// goroutines that write the data in parallel, one batch of unique keys each.
var wg sync.WaitGroup // To wait for goroutines to finish
// A single lookup can fetch at most 1000 keys from Google Datastore, hence batches of 1000.
for i := 0; i < len(msgs); i += 1000 {
	end := i + 1000
	if end > len(msgs) {
		end = len(msgs)
	}
	batch := msgs[i:end]
	// Register the goroutine before starting it. Without wg.Add(1),
	// wg.Wait() returns immediately and each wg.Done() panics with
	// "sync: negative WaitGroup counter".
	wg.Add(1)
	go processBatch(ctx, l.New("abc-services"), (*gds.QuerySpecs)(desQSp), preAggregationRequest.DestinationKind, batch, &wg)
}

wg.Wait() // Wait for all goroutines to complete

Expected behavior

All entities should be written; there are around 275,000 of them.

Actual behavior

2024/04/30 05:06:42 Error during Upsert: datastore: concurrent transaction
UUID-NOT-COLLECTED API-NOT-COLLECTED class.Upsert tx.Commit: datastore: concurrent transaction
2024/04/30 05:06:43 Error during Upsert: datastore: concurrent transaction
UUID-NOT-COLLECTED API-NOT-COLLECTED class.Upsert tx.Commit: rpc error: code = InvalidArgument desc = Invalid transaction.
2024/04/30 05:06:43 Error during Upsert: rpc error: code = InvalidArgument desc = Invalid transaction.
UUID-NOT-COLLECTED API-NOT-COLLECTED class.Upsert tx.Commit: rpc error: code = InvalidArgument desc = Invalid transaction.
2024/04/30 05:06:43 Error during Upsert: rpc error: code = InvalidArgument desc = Invalid transaction.

Additional context

If the keys are unique, why is the transaction failing?
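The question assumes the keys are unique across all batches, not just within one. A quick way to check that premise before fanning out is to count key names. This sketch assumes, as in Upsert further down, that every key has the same kind and namespace, so the MdInstanceHashKey string alone identifies the entity; duplicateKeyNames and the sample names are hypothetical. If any name appears more than once, two batches can target the same entity, and their concurrent transactions can conflict on it.

```go
package main

import (
	"fmt"
	"sort"
)

// duplicateKeyNames returns every key name that occurs more than once,
// mapped to the number of times it occurs.
func duplicateKeyNames(names []string) map[string]int {
	counts := make(map[string]int, len(names))
	for _, n := range names {
		counts[n]++
	}
	dups := make(map[string]int)
	for n, c := range counts {
		if c > 1 {
			dups[n] = c
		}
	}
	return dups
}

func main() {
	// Hypothetical key names; in the issue they would be the
	// MdInstanceHashKey values of all messages across all batches.
	names := []string{"a", "b", "c", "b", "d", "b", "a"}
	dups := duplicateKeyNames(names)

	// Sort for stable output, since map iteration order is random.
	keys := make([]string, 0, len(dups))
	for k := range dups {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s x%d\n", k, dups[k])
	}
}
```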

@mehulparmariitr mehulparmariitr added the "triage me" label Apr 30, 2024
@product-auto-label product-auto-label bot added the "api: datastore" label Apr 30, 2024
@mehulparmariitr mehulparmariitr changed the title datastore: Parallel Transaction on same kind with different keys giving concurrent transaction error datastore: Parallel Transaction on same kind with unique keys giving concurrent transaction error Apr 30, 2024
@bhshkh
Contributor

bhshkh commented Apr 30, 2024

On which version of Datastore client library are you facing this issue?

@mehulparmariitr
Author

mehulparmariitr commented May 1, 2024

@bhshkh It looks like it's 1.0.0.


We have also replaced this library in go.mod:

replace cloud.google.com/go/datastore => github.com/googleapis/google-cloud-go/datastore v1.0.0

@mehulparmariitr
Author

mehulparmariitr commented May 1, 2024

The error persists even with the latest version. It looks like a logic error: if the keys are unique, it shouldn't throw a concurrent transaction error, right?

go 1.12

require (
	cloud.google.com/go/bigquery v1.60.0
	cloud.google.com/go/datastore v1.16.0
	cloud.google.com/go/logging v1.9.0
	cloud.google.com/go/profiler v0.1.0
	cloud.google.com/go/pubsub v1.37.0
	cloud.google.com/go/storage v1.39.1
	github.com/dimfeld/httptreemux v5.0.1+incompatible
	github.com/dimfeld/httptreemux/v5 v5.0.2
	github.com/evanphx/json-patch v4.5.0+incompatible
	github.com/getlantern/deepcopy v0.0.0-20160317154340-7f45deb8130a
	github.com/go-redis/redis v6.15.7+incompatible
	github.com/gomodule/redigo v2.0.0+incompatible
	github.com/kelseyhightower/envconfig v1.4.0
	github.com/mitchellh/mapstructure v1.1.2
	github.com/oleiade/reflections v1.0.1
	github.com/openzipkin/zipkin-go v0.2.5
	github.com/pborman/uuid v1.2.1
	github.com/pkg/errors v0.9.1
	github.com/schwarmco/go-cartesian-product v0.0.0-20180515110546-d5ee747a6dc9
	github.com/signalfx/golib/v3 v3.3.19
	github.com/spf13/cobra v1.5.0
	github.com/stretchr/testify v1.9.0
	github.com/thedevsaddam/gojsonq v2.3.0+incompatible
	go.opencensus.io v0.24.0
	google.golang.org/api v0.176.1
	google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda
	gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
	gopkg.in/go-playground/validator.v8 v8.18.2
	gopkg.in/yaml.v2 v2.4.0
)

replace cloud.google.com/go/datastore => github.com/googleapis/google-cloud-go/datastore v1.16.0

The error:

 UUID-NOT-COLLECTED API-NOT-COLLECTED abc.Upsert tx.Commit: datastore: concurrent transaction
2024/05/01 02:46:59 Error during Upsert: datastore: concurrent transaction
 UUID-NOT-COLLECTED API-NOT-COLLECTED abc.Upsert tx.Commit: datastore: concurrent transaction
2024/05/01 02:46:59 Error during Upsert: datastore: concurrent transaction
 UUID-NOT-COLLECTED API-NOT-COLLECTED abc.Upsert tx.Commit: rpc error: code = InvalidArgument desc = The referenced transaction has expired or is no longer valid.
2024/05/01 02:47:00 Error during Upsert: rpc error: code = InvalidArgument desc = The referenced transaction has expired or is no longer valid.

@mehulparmariitr
Author

This is the method I am using to upsert into the datastore:

// Upsert writes msgs to the GDS aggregation kind in a single transaction: entries whose MdInstanceHashKey does not exist yet are inserted, and existing entries with the same MdInstanceHashKey are updated.
func Upsert(ctx context.Context, logger *slog.Logger, qSp *gds.QuerySpecs, tableName string, msgs []AggregationEntity) (error, []*datastore.Key, []*datastore.Key, []*AggregationEntity) {
	ctx, span := trace.StartSpan(ctx, "internal.dataanalytics.Update")
	defer span.End()
	logger.Infof(ctx, "Started putting entries in GDS(aggregation) in %v table", tableName)

	client, err := gds.GetDatastoreClient(ctx, logger, qSp)
	if err != nil {
		logger.Errorf(ctx, "Failed to create GDS client: %v", err)
		return err, nil, nil, nil
	}

	// Build the keys slice, using MdInstanceHashKey as each key's name.
	var keys []*datastore.Key
	for id := 0; id < len(msgs); id++ {
		newKey := datastore.NameKey(tableName, msgs[id].MdInstanceHashKey, nil)
		newKey.Namespace = qSp.Namespace
		keys = append(keys, newKey)
	}
	logger.Infof(ctx, "Length of keys to insert in data store are : %v", len(keys))
	tx, err := client.NewTransaction(ctx)
	if err != nil {
		logger.Errorf(ctx, "client.NewTransaction: %v", err)
		return err, nil, nil, nil
	}
	gdsEntry := make([]AggregationEntity, len(msgs))
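	if err := tx.GetMulti(keys, gdsEntry); err != nil {
		// Missing entities come back as a datastore.MultiError with
		// datastore.ErrNoSuchEntity in their slots; those keys are new
		// entities to create. Any other error means the read failed.
		merr, ok := err.(datastore.MultiError)
		if !ok {
			tx.Rollback()
			logger.Errorf(ctx, "tx.GetMulti: %v", err)
			return err, nil, nil, nil
		}
		for _, e := range merr {
			if e != nil && e != datastore.ErrNoSuchEntity {
				tx.Rollback()
				logger.Errorf(ctx, "tx.GetMulti: %v", e)
				return e, nil, nil, nil
			}
		}
	}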

	var createKeys []*datastore.Key
	var updateKeys []*datastore.Key
	var createValues []*AggregationEntity
	// If an entity with the same key already exists, update it; otherwise create it.
	for i := 0; i < len(msgs); i++ {

		if gdsEntry[i].GitTestRepoURL == "" {
			//create
			gdsEntry[i] = msgs[i]
			createKeys = append(createKeys, keys[i])
			createValues = append(createValues, &msgs[i])
		} else {
			//update
			if gdsEntry[i].Type == Pass {
				gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Passed)
				updateKeys = append(updateKeys, keys[i])
			}
			if gdsEntry[i].Type == Fail {
				gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Failed)
				updateKeys = append(updateKeys, keys[i])
			}
			if gdsEntry[i].Type == Skip {
				gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Skipped)
				updateKeys = append(updateKeys, keys[i])
			}
			if gdsEntry[i].Type == MostCommonFailure {
				gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Failed)
				updateKeys = append(updateKeys, keys[i])
			}
			if gdsEntry[i].Type == MostFailed {
				gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Failed)
				updateKeys = append(updateKeys, keys[i])
			}
			if gdsEntry[i].Type == MostSkipped {
				gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Skipped)
				updateKeys = append(updateKeys, keys[i])
			}
			if gdsEntry[i].Type == MostDuration {
				gdsEntry[i] = AggregateUpdateMax(logger, gdsEntry[i], msgs[i], Duration)
				updateKeys = append(updateKeys, keys[i])
			}
		}
	}

	// batch operation to put in GDS using transaction
	if _, e := tx.PutMulti(keys, gdsEntry); e != nil {
		tx.Rollback()
		logger.Errorf(ctx, "tx.PutMulti: %v", e)
		//logger.Errorf(ctx, "gdsEntry1 are: %v", gdsEntry)
		//logger.Errorf(ctx, "keys1 are: %v", gdsEntry)
		return e, nil, nil, nil
	}
	if _, er := tx.Commit(); er != nil {
		tx.Rollback()
		logger.Errorf(ctx, "tx.Commit: %v", er) // error: concurrent transaction
		//logger.Errorf(ctx, "gdsEntry2 are: %v", gdsEntry)
		//logger.Errorf(ctx, "keys2 are: %v", gdsEntry)
		return er, nil, nil, nil
	}

	logger.Infof(ctx, "Entries stored successfully")
	return nil, createKeys, updateKeys, createValues
}
