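This concurrent Go program implements a bounded priority queue as a goroutine. It uses message passing: producers send priority messages to the queue over one channel, and the queue delivers the message texts over a second channel, lowest priority value first.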

In [26]:
import ("math/rand"; "strconv"; "sort"; "fmt")

// PriorityMessage is the unit sent to the priority queue: a message text
// together with the priority that decides its position in the queue.
type PriorityMessage struct {
    Priority int // non-negative; priorityQueue delivers smaller values first
    Message string // text that priorityQueue forwards on its output channel
}

// priorityQueue runs a bounded priority queue as a message-passing process.
//
// It receives PriorityMessage values on west and sends their Message strings
// on east in ascending Priority order; messages with equal priority leave in
// the order in which they arrived. At most capacity messages are held at any
// time: an empty queue only receives, a full queue only sends, and in between
// whichever of the two communications is ready proceeds. A capacity below 1
// is treated as 1.
//
// The function loops until west is closed, so it is meant to be started with
// `go`. After west is closed the messages still queued are delivered on east,
// then east is closed and the function returns.
func priorityQueue(capacity int, west chan PriorityMessage, east chan string) {
    var priorities []int // kept in ascending order, parallel to messages
    var messages []string

    // enqueue inserts m behind every queued message whose priority is less
    // than or equal to m.Priority, which keeps equal priorities in FIFO order.
    enqueue := func(m PriorityMessage) {
        at := sort.Search(len(priorities), func(i int) bool {
            return priorities[i] > m.Priority
        })
        // Grow by one element, shift the tail right, and fill the gap.
        priorities = append(priorities, 0)
        copy(priorities[at+1:], priorities[at:])
        priorities[at] = m.Priority
        messages = append(messages, "")
        copy(messages[at+1:], messages[at:])
        messages[at] = m.Message
    }

    // dequeue removes the head of the queue after it has been delivered.
    dequeue := func() {
        messages[0] = "" // drop the reference still held by the backing array
        messages = messages[1:]
        priorities = priorities[1:]
    }

    for {
        n := len(messages)
        switch {
        case n == 0:
            // Nothing to deliver: the only possible action is to receive.
            m, ok := <-west
            if !ok {
                close(east)
                return
            }
            enqueue(m)
        case n >= capacity:
            // Full: refuse further input until one message has been delivered.
            east <- messages[0]
            dequeue()
        default:
            select {
            case m, ok := <-west:
                if !ok {
                    // Input is finished: flush what is left, then signal the end.
                    for _, s := range messages {
                        east <- s
                    }
                    close(east)
                    return
                }
                enqueue(m)
            case east <- messages[0]:
                dequeue()
            }
        }
    }
}
// sendMessages produces n test messages (0 <= n <= 90). Each message gets a
// random one-digit priority and a two-digit serial number starting at 10, and
// its text has the form "<priority>.<serial>". Every message is first sent as
// a PriorityMessage on ch0 and then its text is sent on ch1.
func sendMessages(n int, ch0 chan PriorityMessage, ch1 chan string) {
    limit := n + 10 // serial numbers run from 10 up to, but excluding, limit
    for serial := 10; serial < limit; serial++ {
        priority := rand.Intn(10)
        text := strconv.Itoa(priority) + "." + strconv.Itoa(serial)
        ch0 <- PriorityMessage{Priority: priority, Message: text}
        ch1 <- text
    }
}
// main starts one priority queue of capacity C and tests it in two phases of
// R rounds each, printing the received messages of every round on one line.
//
// Phase 1 sends exactly C messages before reading any, so the queue holds all
// of them and must deliver them in ascending order. Phase 2 sends 2*C
// messages while receiving concurrently, so the delivery order may not be
// fully ascending, but the received messages must be exactly the sent ones.
// A failed check panics. The queue goroutine is still running when main
// returns because west is never closed.
func main() {
    const C, R = 20, 10 // capacity, rounds of testing
    west := make(chan PriorityMessage)
    south, east := make(chan string), make(chan string)

    go priorityQueue(C, west, east)

    // testing priority queue exactly at capacity: received messages must be sent messages in ascending order
    for t := 0; t < R; t++ {
        var in, out [C]string
        go sendMessages(C, west, south) // priority queue is filled up
        for i := 0; i < C; i++ { // messages sent to priority queue are copied to array in
            in[i] = <-south
        }
        for i := 0; i < C; i++ { // messages received from priority queue are stored in array out
            out[i] = <-east
            fmt.Print(out[i], " ") // printed in ascending order
        }
        sort.Strings(in[:]) // sort the sent messages
        if in != out {
            panic("received messages must be sent messages in ascending order")
        }
        fmt.Println()
    }

    // testing with more messages than capacity: received messages may not always be in ascending order
    for t := 0; t < R; t++ {
        var in, out [2 * C]string
        collected := make(chan struct{}) // closed once every sent message is stored in array in
        go sendMessages(2*C, west, south)
        go func() {
            defer close(collected)
            for i := range in { // messages sent to priority queue are copied to array in
                in[i] = <-south
            }
        }()
        for i := range out { // messages received from priority queue are stored in array out
            out[i] = <-east
            fmt.Print(out[i], " ") // printed in not necessarily ascending order
        }
        // Wait for the collector: without this, reading in below would race
        // with the collector goroutine's last write.
        <-collected
        sort.Strings(in[:]) // sort the sent and received messages
        sort.Strings(out[:])
        if in != out {
            panic("all sent messages must be received (in some order)")
        }
        fmt.Println()
    }
}

In [27]:
// Run the test driver defined above; it panics if a check fails.
main()

0.14 0.25 1.12 2.27 4.17 4.23 4.26 5.15 5.18 5.22 5.24 6.20 6.28 7.13 8.16 8.29 9.10 9.11 9.19 9.21 
0.18 0.22 0.25 0.27 1.10 1.13 1.19 1.26 2.20 3.12 3.24 3.29 4.15 5.11 6.16 7.23 8.17 8.28 9.14 9.21 
0.20 0.22 0.27 1.10 1.24 1.26 3.17 3.25 4.15 4.19 4.23 4.28 5.12 5.13 6.16 7.14 8.18 8.29 9.11 9.21 
0.10 0.12 1.16 1.17 2.14 2.22 4.28 5.18 5.21 5.27 6.11 6.25 7.13 7.24 7.26 8.19 8.20 8.23 9.15 9.29 
0.14 0.15 0.24 0.26 2.11 2.17 2.23 2.27 3.12 4.19 4.20 5.29 6.13 6.25 8.10 8.16 8.22 8.28 9.18 9.21 
0.11 0.20 0.21 1.12 1.15 1.22 1.26 3.10 3.17 4.14 4.25 4.27 4.28 6.18 6.19 6.29 8.24 9.13 9.16 9.23 
0.20 2.10 2.12 2.18 2.25 3.11 3.14 3.15 3.23 4.16 4.21 4.27 5.13 7.17 7.19 8.28 8.29 9.22 9.24 9.26 
0.17 0.19 0.22 1.20 2.14 3.10 3.11 3.18 4.25 4.26 5.13 5.16 5.24 6.12 7.23 7.27 7.28 8.21 8.29 9.15 
0.25 1.26 1.28 2.10 2.20 3.13 3.19 3.22 4.11 4.14 4.15 4.17 5.12 6.21 8.23 8.24 8.29 9.16 9.18 9.27 
1.16 1.27 2.21 2.22 3.10 3.12 4.11 4.18 4.20 6.25 6.26 6.28 7.23 7.24 8.13 8.14 8.17 8.29 9

1 goroutine canceled
