Skip to content

Commit 12b604b

Browse files
committed
Fix BufferedChannelQueue.PutWithTimeout() functionalities.
# Conflicts: # queue.go
1 parent 9f1f2ca commit 12b604b

File tree

1 file changed

+30
-12
lines changed

1 file changed

+30
-12
lines changed

queue.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ func (q *LinkedListQueue) KeepNodePoolCount(n int) {
347347
last.Next = nil
348348
}
349349

350-
// CLear Clear all data
350+
// Clear Clear all data
351351
func (q *LinkedListQueue) Clear() {
352352
q.nodePoolFirst = q.first
353353
q.nodeCount = q.count
@@ -694,17 +694,35 @@ func (q *BufferedChannelQueue) Put(val interface{}) error {
694694
return q.Offer(val)
695695
}
696696

697-
// // PutWithTimeout Put the val(blocking), with timeout
698-
// func (q BufferedChannelQueue) PutWithTimeout(val interface{}, timeout time.Duration) error {
699-
// // q.lock.Lock()
700-
// // defer q.lock.Unlock()
701-
//
702-
// if q.isClosed.Get() {
703-
// return ErrQueueIsClosed
704-
// }
705-
//
706-
// return q.blockingQueue.PutWithTimeout(val, timeout)
707-
// }
697+
// PutWithTimeout Put the T val(blocking), with timeout
698+
func (q *BufferedChannelQueue) PutWithTimeout(val interface{}, timeout time.Duration) error {
699+
// q.lock.Lock()
700+
// defer q.lock.Unlock()
701+
702+
if q.isClosed.Get() {
703+
return ErrQueueIsClosed
704+
}
705+
706+
deadline := time.Now().Add(timeout)
707+
for time.Now().Before(deadline) {
708+
//fmt.Println("iteration")
709+
q.loadWorkerCh.Offer(1)
710+
711+
err := q.Offer(val)
712+
if err == nil {
713+
return nil
714+
}
715+
if errors.Is(err, ErrQueueIsFull) {
716+
return err
717+
}
718+
q.loadWorkerCh.Offer(1)
719+
time.Sleep(q.loadFromPoolDuration)
720+
}
721+
722+
//return q.blockingQueue.PutWithTimeout(val, timeout)
723+
724+
return ErrQueuePutTimeout
725+
}
708726

709727
// Take Take the val(blocking)
710728
func (q *BufferedChannelQueue) Take() (interface{}, error) {

0 commit comments

Comments
 (0)