Handle racing conditions for common meta-data #31

Open
vkuznet opened this issue Apr 20, 2022 · 1 comment

Comments

@vkuznet
Contributor

vkuznet commented Apr 20, 2022

The bulkblocks API has internal race conditions; see the full discussion in dmwm/WMCore#11106.

To resolve this, we can use a queueing system to insert common metadata:

  • Create independent queues for individual DBS database tables, e.g. ProcessedDatasetQueue.
  • Adjust the bulkblocks API to use the appropriate queue for a given kind of metadata, e.g. change getProcessedDatasetID to use ProcessedDatasetQueue instead of the GetRecID call, see line-354.
  • Each queue processes its requests sequentially, in FIFO order. This eliminates the race condition among independent, competing HTTP requests, because a given piece of common metadata is either injected by its queue or has its ID obtained from its queue.

We need:

  • a new queueProcess injector which consumes injection requests and processes them in FIFO order
  • a blocking channel which the caller waits on before it uses the injected data

For instance,

type QueueDataRecord struct {
	Record dbs.DBRecord // record to inject
	Out    chan bool    // signals that the record has been processed
}

// queue is a shared channel, e.g. var queue = make(chan QueueDataRecord)
// rec represents the data record we received from the bulkblocks API

// start the process engine which handles records in the queue
go processQueue()

// this channel will block us until the record is injected
ch := make(chan bool)

// send the data record to the queue
queue <- QueueDataRecord{Record: rec, Out: ch}

// wait until the record has been processed
<-ch
// call GetIDs of common meta-data

and the processQueue function will do something like:

func processQueue() {
	// ranging over the channel blocks until a new record arrives,
	// so no polling with time.Sleep is needed; records are handled one by one (FIFO)
	for rec := range queue {
		// process given record
		// send out its status that it is ready
		rec.Out <- true
	}
}
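The two fragments above depend on a package-level queue and on dbs.DBRecord. Below is a self-contained variant, assuming a stand-in Record type in place of dbs.DBRecord and passing the queue explicitly; the processed slice and done channel are hypothetical additions used only to observe the order and to stop the worker. Closing the queue lets processQueue return, which a polling loop with time.Sleep never does.

```go
package main

import "fmt"

// Record is a stand-in for dbs.DBRecord so the sketch compiles on its own.
type Record struct {
	Name string
}

// QueueDataRecord mirrors the struct above: the record plus a channel
// on which the queue signals that the record has been processed.
type QueueDataRecord struct {
	Record Record
	Out    chan bool
}

// processQueue handles records one at a time in FIFO order, appending each
// processed name to processed. It closes done once queue is closed and drained.
func processQueue(queue <-chan QueueDataRecord, processed *[]string, done chan<- struct{}) {
	for rec := range queue {
		*processed = append(*processed, rec.Record.Name) // "inject" the record
		rec.Out <- true                                   // report that it is ready
	}
	close(done)
}

func main() {
	queue := make(chan QueueDataRecord)
	done := make(chan struct{})
	var processed []string
	go processQueue(queue, &processed, done)

	for _, name := range []string{"first", "second", "third"} {
		ch := make(chan bool)
		queue <- QueueDataRecord{Record: Record{Name: name}, Out: ch}
		<-ch // block until the record has been injected
		// here the caller would look up the IDs of the common metadata
	}
	close(queue) // no more records: let processQueue return
	<-done
	fmt.Println(processed) // [first second third]
}
```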
@vkuznet
Contributor Author

vkuznet commented Apr 20, 2022

Further thoughts on this:

  • this approach will not work if we have multiple independent DBSWriter instances (k8s pods), since they do not share the queue
  • therefore, to make it work we need either persistent queue storage shared among all k8s pods, or one additional dedicated server which processes these records sequentially
