forked from igrigorik/em-synchrony
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
226 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
# CSP Experiments with Ruby | ||
|
||
Partly an exercise to help myself wrap my head around Go's concurrency, partly an experiment to see how much of the syntax & behavior of Go's CSP model can be modelled in Ruby... As it turns out, it's not hard to almost replicate the look and feel. | ||
|
||
Note: none of the Ruby examples actually give you the parallelism of Go. | ||
|
||
## Notes | ||
* Instead of explicitly using locks to mediate access to shared data, Go encourages the use of channels to pass references to data between goroutines. | ||
|
||
* Channels combine communication — the exchange of a value—with synchronization — guaranteeing that two calculations (goroutines) are in a known state. | ||
|
||
go.rb implements an (un)bounded Channel interface, and with some help from Fibers & Ruby 1.9, we can also implement the goroutine look and feel pretty easily. In fact, with CSP semantics, its not hard to imagine a MVM (multi-VM) Ruby where each VM still has a GIL, but where data sharing is done via communication of references between VM's. | ||
|
||
## Simple channel example in Go | ||
|
||
package main | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
) | ||
|
||
func main() { | ||
c := make(chan string) | ||
|
||
go func() { | ||
time.Sleep(1) | ||
c <- "go go go sleep 1!" | ||
}() | ||
|
||
fmt.Printf("%v\n", <-c) // Wait for goroutine to finish | ||
} | ||
|
||
## Equivalent in Ruby | ||
|
||
require 'go' | ||
|
||
EM.synchrony do | ||
c = Channel.new | ||
|
||
go { | ||
sleep(1) | ||
c << 'go go go sleep 1!' | ||
} | ||
|
||
puts c.pop | ||
|
||
EM.stop | ||
end | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
) | ||
|
||
func main() { | ||
c := make(chan string) | ||
|
||
go func() { | ||
time.Sleep(1) | ||
c <- "go go go sleep 1!" | ||
}() | ||
|
||
fmt.Printf("%v\n", <-c) // Wait for goroutine to finish | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
require 'go' | ||
|
||
EM.synchrony do | ||
c = Channel.new | ||
|
||
go { | ||
sleep(1) | ||
c << 'go go go sleep 1!' | ||
} | ||
|
||
puts c.pop | ||
|
||
EM.stop | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"runtime" | ||
) | ||
|
||
func producer(c chan int, N int, s chan bool) { | ||
for i := 0; i < N; i++ { | ||
fmt.Printf("producer: %d\n", i) | ||
c <- i | ||
} | ||
s <- true | ||
} | ||
|
||
func consumer(c chan int, N int, s chan bool) { | ||
for i := 0; i < N; i++ { | ||
fmt.Printf("consumer got: %d\n", <- c) | ||
} | ||
s <- true | ||
} | ||
|
||
func main() { | ||
runtime.GOMAXPROCS(2) | ||
|
||
c := make(chan int) | ||
s := make(chan bool) | ||
|
||
go producer(c, 10, s) | ||
go consumer(c, 10, s) | ||
|
||
<- s | ||
<- s | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
require 'go' | ||
|
||
EM.synchrony do | ||
def producer(c, n, s) | ||
n.times do |i| | ||
puts "producer: #{i}" | ||
c << i | ||
end | ||
|
||
s << "producer finished" | ||
end | ||
|
||
consumer = ->(c, n, s) do | ||
n.times do |i| | ||
puts "consumer 1 got: #{c.pop}" | ||
end | ||
|
||
s << "consumer finished" | ||
end | ||
|
||
c = Channel.new(size: 2) | ||
s = Channel.new | ||
n = 10 | ||
|
||
# mix the syntax, just for fun... | ||
go c,n,s, &method(:producer) | ||
go c,n-1,s, &consumer | ||
|
||
go c,s do |c, s| | ||
puts "consumer 2 got: #{c.pop}" | ||
s << "consumer 2 finished" | ||
end | ||
|
||
3.times { puts s.pop } | ||
|
||
EM.stop | ||
end | ||
|
||
# (M=6c0863) igrigorik /git/em-synchrony/examples/go> ruby consumer-publisher.rb | ||
# producer: 0 | ||
# producer: 1 | ||
# consumer 1 got: [0] | ||
# producer: 2 | ||
# consumer 2 got: [1] | ||
# producer: 3 | ||
# consumer 1 got: [2] | ||
# producer: 4 | ||
# consumer 2 finished | ||
# consumer 1 got: [3] | ||
# producer: 5 | ||
# consumer 1 got: [4] | ||
# producer: 6 | ||
# consumer 1 got: [5] | ||
# producer: 7 | ||
# consumer 1 got: [6] | ||
# producer: 8 | ||
# consumer 1 got: [7] | ||
# producer: 9 | ||
# consumer 1 got: [8] | ||
# consumer 1 got: [9] | ||
# producer finished | ||
# consumer finished |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
require 'em-synchrony' | ||
|
||
module Kernel | ||
def go(*args, &blk) | ||
EM.next_tick do | ||
Fiber.new { blk.call(*args) }.resume | ||
end | ||
end | ||
end | ||
|
||
class Channel < EM::Queue | ||
def initialize(opts = {}) | ||
@limit = opts[:size] | ||
@prodq = [] | ||
@size = 0 | ||
|
||
super() | ||
end | ||
|
||
def size; @size; end | ||
def empty?; size == 0; end | ||
|
||
def pop | ||
f = Fiber.current | ||
clb = Proc.new do |*args| | ||
@size -= 1 | ||
f.resume(args) | ||
@prodq.shift.call if !@prodq.empty? | ||
end | ||
|
||
super(&clb) | ||
Fiber.yield | ||
end | ||
|
||
def push(*items) | ||
f = Fiber.current | ||
@size += 1 | ||
|
||
EM.next_tick { super(*items) } | ||
|
||
# if the queue is bounded, then suspend the producer | ||
# until someone consumes a pending message | ||
if @limit && size >= @limit | ||
@prodq.push -> { f.resume } | ||
Fiber.yield | ||
end | ||
end | ||
alias :<< :push | ||
end |