Skip to content

Commit

Permalink
Fix first part of #382 - this makes it possible to send a directory t…
Browse files Browse the repository at this point in the history
…o a client that does NOT support file transfer - it simply sends a zip-file using regular file transfer
  • Loading branch information
olabini committed Oct 23, 2017
1 parent e3af988 commit 950e40d
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 54 deletions.
30 changes: 10 additions & 20 deletions session/filetransfer/bytestreams_send.go
Expand Up @@ -65,8 +65,7 @@ func bytestreamsCalculateValidProxies(ctx *sendContext) func(key string) interfa
})

if e != nil {
ctx.control.ReportError(e)
removeInflightSend(ctx)
ctx.onError(e)
return nil
}

Expand Down Expand Up @@ -96,8 +95,7 @@ func (ctx *sendContext) bytestreamsSendData(c net.Conn) {
buffer := make([]byte, bufSize)
r, err := os.Open(ctx.file)
if err != nil {
ctx.control.ReportError(err)
removeInflightSend(ctx)
ctx.onError(err)
return
}
defer r.Close()
Expand All @@ -110,22 +108,18 @@ func (ctx *sendContext) bytestreamsSendData(c net.Conn) {
}
n, err := r.Read(buffer)
if err == io.EOF && n == 0 {
ctx.control.ReportFinished()
removeInflightSend(ctx)
ctx.onFinish()
return
} else if err != nil {
ctx.control.ReportError(err)
removeInflightSend(ctx)
ctx.onError(err)
return
}
_, err = c.Write(buffer[0:n])
if err != nil {
ctx.control.ReportError(err)
removeInflightSend(ctx)
ctx.onError(err)
return
}
ctx.totalSent += int64(n)
ctx.control.SendUpdate(ctx.totalSent)
ctx.onUpdate(n)
}
}

Expand All @@ -146,8 +140,7 @@ func bytestreamsSendDo(ctx *sendContext) {
}, &bq, func(ciq *data.ClientIQ) {
sh, ok := proxyMap[bq.StreamhostUsed.Jid]
if !ok {
ctx.control.ReportError(errors.New("Invalid streamhost to use - this is likely a developer error from the peers side"))
removeInflightSend(ctx)
ctx.onError(errors.New("Invalid streamhost to use - this is likely a developer error from the peers side"))
return
}
dstAddr := hex.EncodeToString(digests.Sha1([]byte(ctx.sid + ciq.To + ciq.From)))
Expand All @@ -159,16 +152,13 @@ func bytestreamsSendDo(ctx *sendContext) {
go ctx.bytestreamsSendData(c)
})
if e != nil {
ctx.control.ReportError(e)
removeInflightSend(ctx)
ctx.onError(e)
}
}) {
ctx.control.ReportError(fmt.Errorf("Failed at connecting to streamhost: %#v", sh))
removeInflightSend(ctx)
ctx.onError(fmt.Errorf("Failed at connecting to streamhost: %#v", sh))
}
}); err != nil {
ctx.control.ReportError(err)
removeInflightSend(ctx)
ctx.onError(err)
}
}()
}
18 changes: 6 additions & 12 deletions session/filetransfer/ibb_send.go
Expand Up @@ -40,8 +40,7 @@ func (ctx *sendContext) ibbSendDoWithBlockSize(blocksize int) {
ctx.ibbSendDoWithBlockSize(blocksize / 2)
return
}
ctx.control.ReportError(e)
removeInflightSend(ctx)
ctx.onError(e)
})
}

Expand All @@ -61,12 +60,10 @@ func (ctx *sendContext) ibbSendChunk(r io.ReadCloser, buffer []byte, seq uint16)
r.Close()
// TODO[LATER]: we ignore the result of this close - maybe we should react to it in some way, if it reports failure from the other side
ctx.s.Conn().SendIQ(ctx.peer, "set", data.IBBClose{Sid: ctx.sid})
ctx.control.ReportFinished()
removeInflightSend(ctx)
ctx.onFinish()
return false
} else if err != nil {
ctx.control.ReportError(err)
removeInflightSend(ctx)
ctx.onError(err)
return false
}
encdata := base64.StdEncoding.EncodeToString(buffer[:n])
Expand All @@ -77,12 +74,10 @@ func (ctx *sendContext) ibbSendChunk(r io.ReadCloser, buffer []byte, seq uint16)
Base64: encdata,
})
if e != nil {
ctx.control.ReportError(e)
removeInflightSend(ctx)
ctx.onError(e)
return false
}
ctx.totalSent += int64(n)
ctx.control.SendUpdate(ctx.totalSent)
ctx.onUpdate(n)

go ctx.trackResultOfSend(rpl)

Expand Down Expand Up @@ -130,8 +125,7 @@ func (ctx *sendContext) ibbSendStartTransfer(blockSize int) {
buffer := make([]byte, blockSize)
f, err := os.Open(ctx.file)
if err != nil {
ctx.control.ReportError(err)
removeInflightSend(ctx)
ctx.onError(err)
return
}
ctx.ibbSendChunks(f, buffer, seq)
Expand Down
46 changes: 36 additions & 10 deletions session/filetransfer/send.go
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/coyim/coyim/xmpp/interfaces"
)

const fileTransferProfile = "http://jabber.org/protocol/si/profile/file-transfer"

func registerSendFileTransferMethod(name string, dispatch func(*sendContext), isCurrentlyValid func(string, *sendContext) bool) {
supportedSendingMechanisms[name] = dispatch
isSendingMechanismCurrentlyValid[name] = isCurrentlyValid
Expand All @@ -23,14 +25,15 @@ func registerSendFileTransferMethod(name string, dispatch func(*sendContext), is
var supportedSendingMechanisms = map[string]func(*sendContext){}
var isSendingMechanismCurrentlyValid = map[string]func(string, *sendContext) bool{}

func discoverSupport(s access.Session, p string) (profiles []string, err error) {
func discoverSupport(s access.Session, p string) (profiles map[string]bool, err error) {
profiles = make(map[string]bool)
if res, ok := s.Conn().DiscoveryFeatures(p); ok {
foundSI := false
for _, feature := range res {
if feature == "http://jabber.org/protocol/si" {
foundSI = true
} else if strings.HasPrefix(feature, "http://jabber.org/protocol/si/profile/") {
profiles = append(profiles, feature)
profiles[feature] = true
}
}

Expand All @@ -42,9 +45,9 @@ func discoverSupport(s access.Session, p string) (profiles []string, err error)
return nil, errors.New("Peer doesn't support any stream initiation profiles")
}

return profiles, nil
return
}
return nil, errors.New("Problem discovering the features of the peer")
return profiles, errors.New("Problem discovering the features of the peer")
}

func genSid(c interfaces.Conn) string {
Expand All @@ -55,8 +58,6 @@ func genSid(c interfaces.Conn) string {
return fmt.Sprintf("sid%d", binary.LittleEndian.Uint64(buf[:]))
}

const fileTransferProfile = "http://jabber.org/protocol/si/profile/file-transfer"

func (ctx *sendContext) calculateAvailableSendOptions() []data.FormFieldOptionX {
res := []data.FormFieldOptionX{}
for k, _ := range supportedSendingMechanisms {
Expand Down Expand Up @@ -99,7 +100,7 @@ func (ctx *sendContext) offerSend() error {
var siq data.SI
nonblockIQ(ctx.s, ctx.peer, "set", toSend, &siq, func(*data.ClientIQ) {
if !isValidSubmitForm(siq) {
ctx.control.ReportErrorNonblocking(errors.New("Invalid data sent from peer for file sending"))
ctx.onError(errors.New("Invalid data sent from peer for file sending"))
return
}
prof := siq.Feature.Form.Fields[0].Values[0]
Expand All @@ -109,9 +110,9 @@ func (ctx *sendContext) offerSend() error {
f(ctx)
return
}
ctx.control.ReportErrorNonblocking(errors.New("Invalid sending mechanism sent from peer for file sending"))
ctx.onError(errors.New("Invalid sending mechanism sent from peer for file sending"))
}, func(_ *data.ClientIQ, e error) {
ctx.control.ReportErrorNonblocking(e)
ctx.onError(e)
})

return nil
Expand All @@ -126,6 +127,31 @@ type sendContext struct {
theyWantToCancel bool
totalSent int64
control *sdata.FileTransferControl
onFinishHook func(*sendContext)
onErrorHook func(*sendContext, error)
onUpdateHook func(*sendContext, int64)
}

func (ctx *sendContext) onFinish() {
ctx.control.ReportFinished()
removeInflightSend(ctx)
if ctx.onFinishHook != nil {
ctx.onFinishHook(ctx)
}
}
func (ctx *sendContext) onError(e error) {
ctx.control.ReportErrorNonblocking(e)
removeInflightSend(ctx)
if ctx.onErrorHook != nil {
ctx.onErrorHook(ctx, e)
}
}
func (ctx *sendContext) onUpdate(v int) {
ctx.totalSent += int64(v)
ctx.control.SendUpdate(ctx.totalSent)
if ctx.onUpdateHook != nil {
ctx.onUpdateHook(ctx, ctx.totalSent)
}
}

func (ctx *sendContext) notifyUserThatSendStarted(method string) {
Expand All @@ -148,7 +174,7 @@ func (ctx *sendContext) listenForCancellation() {
func (ctx *sendContext) initSend() {
_, err := discoverSupport(ctx.s, ctx.peer)
if err != nil {
ctx.control.ReportErrorNonblocking(err)
ctx.onError(err)
return
}

Expand Down
71 changes: 59 additions & 12 deletions session/filetransfer/send_dir.go
@@ -1,12 +1,17 @@
package filetransfer

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"

"github.com/coyim/coyim/session/access"
sdata "github.com/coyim/coyim/session/data"
)

const dirTransferProfile = "http://jabber.org/protocol/si/profile/directory-transfer"

type dirSendContext struct {
s access.Session
peer string
Expand All @@ -16,57 +21,99 @@ type dirSendContext struct {
theyWantToCancel bool
totalSent int64
control *sdata.FileTransferControl
fallback *sendContext
onErrorHook func(*dirSendContext, error)
}

func (ctx *dirSendContext) onError(e error) {
ctx.control.ReportErrorNonblocking(e)
if ctx.onErrorHook != nil {
ctx.onErrorHook(ctx, e)
}
}

func (ctx *dirSendContext) startPackingDirectory() (<-chan string, <-chan error) {
result := make(chan string)
errorResult := make(chan error)

go func() {
tmpFile, e := ioutil.TempFile("", "coyim-packing")
tmpFile, e := ioutil.TempFile("", fmt.Sprintf("%s-directory-", filepath.Base(ctx.dir)))
if e != nil {
errorResult <- e
return
}
defer tmpFile.Close()
e = pack(ctx.dir, tmpFile)
if e != nil {
errorResult <- e
tmpFile.Close()
return
}
result <- tmpFile.Name()
newName := fmt.Sprintf("%v.zip", tmpFile.Name())
tmpFile.Close()
os.Rename(tmpFile.Name(), newName)
result <- newName
}()

return result, errorResult
}

const dirTransferProfile = "http://jabber.org/protocol/si/profile/directory-transfer"

func (ctx *dirSendContext) initSend() {
result, errorResult := ctx.startPackingDirectory()

_, err := discoverSupport(ctx.s, ctx.peer)
supported, err := discoverSupport(ctx.s, ctx.peer)
if err != nil {
ctx.control.ReportErrorNonblocking(err)
ctx.onError(err)
return
}

go ctx.listenForCancellation()

select {
case tmpFile := <-result:
ctx.offerSend(tmpFile)
ctx.offerSend(tmpFile, supported)
case e := <-errorResult:
ctx.control.ReportErrorNonblocking(e)
ctx.onError(e)
}
}

func (ctx *dirSendContext) listenForCancellation() {
// TODO: fix
ctx.control.WaitForCancel(func() {
ctx.weWantToCancel = true
if ctx.fallback != nil {
ctx.fallback.weWantToCancel = true
}
})
}

func (ctx *dirSendContext) offerSendDirectory(file string) error {
// TODO: for now, only supported on CoyIM
ctx.sid = genSid(ctx.s.Conn())
return nil
}

func (ctx *dirSendContext) offerSend(file string) {
// TODO: fix
func (ctx *dirSendContext) offerSendDirectoryFallback(file string) error {
// This one is a fallback for sending to clients that don't support directory sending, but do support file sending. We will simply send the packaged .zip file to them.
fctx := &sendContext{
s: ctx.s,
peer: ctx.peer,
file: file,
control: ctx.control,
onFinishHook: func(_ *sendContext) {
os.Remove(file)
},
onErrorHook: func(_ *sendContext, _ error) {
os.Remove(file)
},
}
ctx.fallback = fctx
return fctx.offerSend()
}

func (ctx *dirSendContext) offerSend(file string, availableProfiles map[string]bool) error {
if availableProfiles[dirTransferProfile] {
return ctx.offerSendDirectory(file)
}
return ctx.offerSendDirectoryFallback(file)
}

// InitSendDir starts the process of sending a directory to a peer
Expand Down
1 change: 1 addition & 0 deletions session/session_test.go
Expand Up @@ -339,6 +339,7 @@ func (s *SessionSuite) Test_WatchStanzas_getsDiscoInfoIQ(c *C) {
"<feature xmlns=\"http://jabber.org/protocol/disco#info\" var=\"jabber:x:data\"></feature>"+
"<feature xmlns=\"http://jabber.org/protocol/disco#info\" var=\"http://jabber.org/protocol/si\"></feature>"+
"<feature xmlns=\"http://jabber.org/protocol/disco#info\" var=\"http://jabber.org/protocol/si/profile/file-transfer\"></feature>"+
"<feature xmlns=\"http://jabber.org/protocol/disco#info\" var=\"http://jabber.org/protocol/si/profile/directory-transfer\"></feature>"+
"<feature xmlns=\"http://jabber.org/protocol/disco#info\" var=\"http://jabber.org/protocol/bytestreams\"></feature>"+
"</query>"+
"</iq>")
Expand Down
1 change: 1 addition & 0 deletions xmpp/discovery.go
Expand Up @@ -130,6 +130,7 @@ func DiscoveryReply(name string) data.DiscoveryReply {
{Var: "jabber:x:data"},
{Var: "http://jabber.org/protocol/si"},
{Var: "http://jabber.org/protocol/si/profile/file-transfer"},
{Var: "http://jabber.org/protocol/si/profile/directory-transfer"},
{Var: "http://jabber.org/protocol/bytestreams"},
},
}
Expand Down
1 change: 1 addition & 0 deletions xmpp/discovery_test.go
Expand Up @@ -278,6 +278,7 @@ func (s *DiscoveryXMPPSuite) Test_DiscoveryReply_returnsSupportedValues(c *C) {
{Var: "jabber:x:data"},
{Var: "http://jabber.org/protocol/si"},
{Var: "http://jabber.org/protocol/si/profile/file-transfer"},
{Var: "http://jabber.org/protocol/si/profile/directory-transfer"},
{Var: "http://jabber.org/protocol/bytestreams"},
},
Forms: []data.Form(nil)})
Expand Down

0 comments on commit 950e40d

Please sign in to comment.