Skip to content

Commit

Permalink
refactored Read and Write on channels to use common code
Browse files Browse the repository at this point in the history
  • Loading branch information
dustin committed Jul 2, 2014
1 parent 9c4740b commit 28ad1c5
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 99 deletions.
54 changes: 4 additions & 50 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,59 +241,13 @@ func (f *clientChannel) Read(b []byte) (n int, err error) {
if f.isClosed() {
return 0, errClosedReadCh
}
read := 0
for len(b) > 0 {
if f.current == nil || len(f.current) == 0 {
var ok bool
if read == 0 {
select {
case f.current, ok = <-f.incoming:
case <-f.closeMarker:
case <-f.fc.closeMarker:
}
} else {
select {
case f.current, ok = <-f.incoming:
case <-f.closeMarker:
case <-f.fc.closeMarker:
default:
return read, nil
}
}
if !ok {
return read, io.EOF
}
}
copied := copy(b, f.current)
read += copied
f.current = f.current[copied:]
b = b[copied:]
}
return read, nil
n, f.current, err = channelRead(b, f.current, f.incoming,
f.closeMarker, f.fc.closeMarker)
return n, err
}

func (f *clientChannel) Write(b []byte) (n int, err error) {
if len(b) > maxWriteLen {
b = b[0:maxWriteLen]
}

bc := make([]byte, len(b))
copy(bc, b)
pkt := &FramePacket{
Cmd: FrameData,
Channel: f.channel,
Data: bc,
rch: make(chan error, 1),
}

select {
case f.fc.egress <- pkt:
case <-f.closeMarker:
return 0, errClosedWriteCh
case <-f.fc.closeMarker:
return 0, errClosedConn
}
return len(b), <-pkt.rch
return channelWrite(b, f.channel, f.fc.egress, f.closeMarker, f.fc.closeMarker)
}

func (f *clientChannel) Close() error {
Expand Down
63 changes: 63 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package frames

import "io"

func channelRead(b []byte, current []byte, incoming chan []byte,
close1, close2 chan bool) (int, []byte, error) {

read := 0
for len(b) > 0 {
if current == nil || len(current) == 0 {
var ok bool
if read == 0 {
select {
case current, ok = <-incoming:
case <-close1:
case <-close2:
}
} else {
select {
case current, ok = <-incoming:
case <-close1:
case <-close2:
default:
return read, current, nil
}
}
if !ok {
return read, current, io.EOF
}
}
copied := copy(b, current)
read += copied
current = current[copied:]
b = b[copied:]
}
return read, current, nil
}

func channelWrite(b []byte, channel uint16, egress chan *FramePacket,
close1, close2 chan bool) (int, error) {

if len(b) > maxWriteLen {
b = b[0:maxWriteLen]
}

bc := make([]byte, len(b))
copy(bc, b)
pkt := &FramePacket{
Cmd: FrameData,
Channel: channel,
Data: bc,
rch: make(chan error, 1),
}

select {
case egress <- pkt:
case <-close1:
return 0, errClosedWriteCh
case <-close2:
return 0, errClosedConn
}
return len(b), <-pkt.rch
}
53 changes: 4 additions & 49 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,58 +204,13 @@ func (f *frameChannel) Read(b []byte) (n int, err error) {
if f.isClosed() {
return 0, errClosedReadCh
}
read := 0
for len(b) > 0 {
if f.current == nil || len(f.current) == 0 {
var ok bool
if read == 0 {
select {
case f.current, ok = <-f.incoming:
case <-f.closeMarker:
case <-f.conn.closeMarker:
}
} else {
select {
case f.current, ok = <-f.incoming:
case <-f.closeMarker:
case <-f.conn.closeMarker:
default:
return read, nil
}
}
if !ok {
return read, io.EOF
}

}
copied := copy(b, f.current)
read += copied
f.current = f.current[copied:]
b = b[copied:]
}
return read, nil
n, f.current, err = channelRead(b, f.current, f.incoming,
f.closeMarker, f.conn.closeMarker)
return n, err
}

func (f *frameChannel) Write(b []byte) (n int, err error) {
if len(b) > maxWriteLen {
b = b[0:maxWriteLen]
}

bc := make([]byte, len(b))
copy(bc, b)
pkt := &FramePacket{
Cmd: FrameData,
Channel: f.channel,
Data: bc,
rch: make(chan error, 1),
}

select {
case f.conn.egress <- pkt:
case <-f.conn.closeMarker:
return 0, errClosedWriteCh
}
return len(b), <-pkt.rch
return channelWrite(b, f.channel, f.conn.egress, f.conn.closeMarker, nil)
}

func (f *frameChannel) isClosed() bool {
Expand Down

0 comments on commit 28ad1c5

Please sign in to comment.