Skip to content

Commit

Permalink
This finishes #381 by implementing full support for IBB sending
Browse files Browse the repository at this point in the history
  • Loading branch information
olabini committed Oct 7, 2017
1 parent 16bc205 commit 75d14e3
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 35 deletions.
6 changes: 6 additions & 0 deletions session/filetransfer/ibb_recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ func IbbClose(s access.Session, stanza *data.ClientIQ) (ret interface{}, iqtype
return iqErrorNotAcceptable, "error", false
}

inflightSend, ok := getInflightSend(tag.Sid)
if ok {
inflightSend.ibbReceivedClose(s)
return data.EmptyReply{}, "", false
}

inflight, ok := getInflight(tag.Sid)

if !ok || inflight.status.opaque == nil {
Expand Down
99 changes: 68 additions & 31 deletions session/filetransfer/ibb_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package filetransfer
import (
"encoding/base64"
"errors"
"fmt"
"io"
"os"
"time"
Expand All @@ -11,109 +12,140 @@ import (
"github.com/coyim/coyim/xmpp/data"
)

// TODO - 2 - make cancel work based on different failures
// TODO - 4 - multiplex the close tag

const ibbDefaultBlockSize = 4096

func ibbSendDoWithBlockSize(s access.Session, ctx *sendContext, blocksize int) {
func ibbSendDo(s access.Session, ctx *sendContext) {
ctx.ibbSendDoWithBlockSize(s, ibbDefaultBlockSize)
}

func (ctx *sendContext) ibbSendDoWithBlockSize(s access.Session, blocksize int) {
res, _, e := s.Conn().SendIQ(ctx.peer, "set", data.IBBOpen{
BlockSize: ibbDefaultBlockSize,
Sid: ctx.sid,
Stanza: "iq",
})
if e != nil {
ctx.control.ReportError(e)
removeInflightSend(ctx)
return
}
go ibbSendWaitForConfirmationOfOpen(s, ctx, res, blocksize)
go ctx.ibbSendWaitForConfirmationOfOpen(s, res, blocksize)
}

func ibbSendDo(s access.Session, ctx *sendContext) {
ibbSendDoWithBlockSize(s, ctx, ibbDefaultBlockSize)
}

// TODO: Print everything received and sent
// TODO: We need to multiplex the Close IBB tag...

func ibbSendChunk(s access.Session, ctx *sendContext, r io.ReadCloser, buffer []byte, seq uint16) bool {
// TODO: check for cancel here
func (ctx *sendContext) ibbSendChunk(s access.Session, r io.ReadCloser, buffer []byte, seq uint16) bool {
if ctx.weWantToCancel {
s.Conn().SendIQ(ctx.peer, "set", data.IBBClose{Sid: ctx.sid})
removeInflightSend(ctx)
return false
} else if ctx.theyWantToCancel {
close(ctx.control.TransferFinished)
close(ctx.control.Update)
close(ctx.control.ErrorOccurred)
removeInflightSend(ctx)
return false
}

n, err := r.Read(buffer)
if err == io.EOF && n == 0 {
r.Close()
// TODO[LATER]: we ignore the result of this close - maybe we should react to it in some way, if it reports failure from the other side
s.Conn().SendIQ(ctx.peer, "set", data.IBBClose{Sid: ctx.sid})
ctx.control.ReportFinished()
removeInflightSend(ctx)
return false
} else if err != nil {
ctx.control.ReportError(err)
removeInflightSend(ctx)
return false
}
encdata := base64.StdEncoding.EncodeToString(buffer[:n])

// TODO: we should keep track of each response here
_, _, e := s.Conn().SendIQ(ctx.peer, "set", data.IBBData{
rpl, _, e := s.Conn().SendIQ(ctx.peer, "set", data.IBBData{
Sid: ctx.sid,
Sequence: seq,
Base64: encdata,
})
if e != nil {
ctx.control.ReportError(e)
removeInflightSend(ctx)
return false
}
ctx.totalSent += int64(n)
ctx.control.Update <- ctx.totalSent

go ctx.trackResultOfSend(s, rpl)

return true
}

func ibbScheduleNextSend(s access.Session, ctx *sendContext, r io.ReadCloser, buffer []byte, seq uint16) bool {
func (ctx *sendContext) trackResultOfSend(s access.Session, reply <-chan data.Stanza) {
select {
case r := <-reply:
switch ciq := r.Value.(type) {
case *data.ClientIQ:
if ciq.Type == "result" {
return
}
}
s.Info(fmt.Sprintf("Received unhappy response to IBB data sent: %#v", r))
ctx.theyWantToCancel = true
case <-time.After(time.Minute * 5):
// Ignore timeout
}
}

func (ctx *sendContext) ibbScheduleNextSend(s access.Session, r io.ReadCloser, buffer []byte, seq uint16) bool {
time.AfterFunc(time.Duration(200)*time.Millisecond, func() {
ibbSendChunks(s, ctx, r, buffer, seq)
ctx.ibbSendChunks(s, r, buffer, seq)
})

return true
}

func ibbSendChunks(s access.Session, ctx *sendContext, r io.ReadCloser, buffer []byte, seq uint16) {
func (ctx *sendContext) ibbSendChunks(s access.Session, r io.ReadCloser, buffer []byte, seq uint16) {
// The seq variable can wrap around here - THAT IS ON PURPOSE
// See XEP-0047 for details
ignore := ibbSendChunk(s, ctx, r, buffer, seq) &&
ibbSendChunk(s, ctx, r, buffer, seq+1) &&
ibbSendChunk(s, ctx, r, buffer, seq+2) &&
ibbSendChunk(s, ctx, r, buffer, seq+3) &&
ibbSendChunk(s, ctx, r, buffer, seq+4) &&
ibbScheduleNextSend(s, ctx, r, buffer, seq+5)
ignore := ctx.ibbSendChunk(s, r, buffer, seq) &&
ctx.ibbSendChunk(s, r, buffer, seq+1) &&
ctx.ibbSendChunk(s, r, buffer, seq+2) &&
ctx.ibbSendChunk(s, r, buffer, seq+3) &&
ctx.ibbSendChunk(s, r, buffer, seq+4) &&
ctx.ibbScheduleNextSend(s, r, buffer, seq+5)
ignore = ignore
}

func ibbSendStartTransfer(s access.Session, ctx *sendContext, blockSize int) {
// cancel := make(chan bool)
func (ctx *sendContext) ibbSendStartTransfer(s access.Session, blockSize int) {
seq := uint16(0)
buffer := make([]byte, blockSize)
f, err := os.Open(ctx.file)
if err != nil {
ctx.control.ReportError(err)
removeInflightSend(ctx)
return
}
ibbSendChunks(s, ctx, f, buffer, seq)
ctx.ibbSendChunks(s, f, buffer, seq)
}

func ibbSendWaitForConfirmationOfOpen(s access.Session, ctx *sendContext, reply <-chan data.Stanza, blockSize int) {
func (ctx *sendContext) ibbSendWaitForConfirmationOfOpen(s access.Session, reply <-chan data.Stanza, blockSize int) {
r, ok := <-reply
if !ok {
ctx.control.ReportError(errors.New("We didn't receive a response when trying to open IBB file transfer with peer"))
removeInflightSend(ctx)
return
}

switch ciq := r.Value.(type) {
case *data.ClientIQ:
if ciq.Type == "result" {
go ibbSendStartTransfer(s, ctx, blockSize)
go ctx.ibbSendStartTransfer(s, blockSize)
return
} else if ciq.Type == "error" {
if ciq.Error.Type == "cancel" {
ctx.control.ReportErrorNonblocking(errors.New("The peer canceled the file transfer"))
} else if ciq.Error.Type == "modify" &&
ciq.Error.Any.XMLName.Local == "resource-constraint" &&
ciq.Error.Any.XMLName.Space == "urn:ietf:params:xml:ns:xmpp-stanzas" {
ibbSendDoWithBlockSize(s, ctx, blockSize/2)
ctx.ibbSendDoWithBlockSize(s, blockSize/2)
} else {
ctx.control.ReportErrorNonblocking(errors.New("Invalid error type - this shouldn't happen"))
}
Expand All @@ -123,4 +155,9 @@ func ibbSendWaitForConfirmationOfOpen(s access.Session, ctx *sendContext, reply
default:
ctx.control.ReportErrorNonblocking(errors.New("Invalid stanza type - this shouldn't happen"))
}
removeInflightSend(ctx)
}

func (ctx *sendContext) ibbReceivedClose(s access.Session) {
ctx.theyWantToCancel = true
}
51 changes: 47 additions & 4 deletions session/filetransfer/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"

"github.com/coyim/coyim/session/access"
sdata "github.com/coyim/coyim/session/data"
Expand Down Expand Up @@ -101,10 +102,13 @@ var supportedSendingMechanisms = map[string]func(access.Session, *sendContext){
}

type sendContext struct {
peer string
file string
sid string
control sdata.FileTransferControl
peer string
file string
sid string
weWantToCancel bool
theyWantToCancel bool
totalSent int64
control sdata.FileTransferControl
}

func (ctx *sendContext) notifyUserThatSendStarted(s access.Session) {
Expand All @@ -118,6 +122,15 @@ func isValidSubmitForm(siq data.SI) bool {
len(siq.Feature.Form.Fields[0].Values) == 1
}

func (ctx *sendContext) listenForCancellation() {
if cancel, ok := <-ctx.control.CancelTransfer; ok && cancel {
ctx.weWantToCancel = true
close(ctx.control.TransferFinished)
close(ctx.control.Update)
close(ctx.control.ErrorOccurred)
}
}

func (ctx *sendContext) waitForResultToStartFileSend(s access.Session, reply <-chan data.Stanza) {
r, ok := <-reply
if ok {
Expand All @@ -140,6 +153,7 @@ func (ctx *sendContext) waitForResultToStartFileSend(s access.Session, reply <-c
prof := siq.Feature.Form.Fields[0].Values[0]
if f, ok := supportedSendingMechanisms[prof]; ok {
ctx.notifyUserThatSendStarted(s)
addInflightSend(ctx)
f(s, ctx)
return
}
Expand Down Expand Up @@ -176,6 +190,35 @@ func InitSend(s access.Session, peer string, file string) sdata.FileTransferCont
return ctx.control
}

go ctx.listenForCancellation()
ctx.offerSend(s)
return ctx.control
}

var inflightSends struct {
sync.RWMutex
transfers map[string]*sendContext
}

func init() {
inflightSends.transfers = make(map[string]*sendContext)
}

func addInflightSend(ctx *sendContext) {
inflightSends.Lock()
defer inflightSends.Unlock()
inflightSends.transfers[ctx.sid] = ctx
}

func getInflightSend(id string) (result *sendContext, ok bool) {
inflightSends.RLock()
defer inflightSends.RUnlock()
result, ok = inflightSends.transfers[id]
return
}

func removeInflightSend(ctx *sendContext) {
inflightSends.Lock()
defer inflightSends.Unlock()
delete(inflightSends.transfers, ctx.sid)
}

0 comments on commit 75d14e3

Please sign in to comment.