Skip to content

Commit

Permalink
generic file transfer mechanism using either SFTP or SCP
Browse files Browse the repository at this point in the history
  • Loading branch information
jamis committed Apr 25, 2008
1 parent d099e45 commit 06bfd02
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 158 deletions.
1 change: 1 addition & 0 deletions capistrano.gemspec
Expand Up @@ -16,6 +16,7 @@ Gem::Specification.new do |s|

s.add_dependency 'net-ssh', ">= 1.99.2"
s.add_dependency 'net-sftp', ">= 1.99.1"
s.add_dependency 'net-scp', ">= 0.99.0"
s.add_dependency 'net-ssh-gateway', ">= 0.99.0"
s.add_dependency 'highline'

Expand Down
22 changes: 1 addition & 21 deletions lib/capistrano/command.rb
@@ -1,31 +1,11 @@
require 'capistrano/errors'
require 'capistrano/processable'

module Capistrano

# This class encapsulates a single command to be executed on a set of remote
# machines, in parallel.
class Command
module Processable
def process_iteration(wait=nil, &block)
sessions.each { |session| session.preprocess }
return false if block && !block.call(self)

readers = sessions.map { |session| session.listeners.keys }.flatten.reject { |io| io.closed? }
writers = readers.select { |io| io.respond_to?(:pending_write?) && io.pending_write? }

readers, writers, = IO.select(readers, writers, nil, wait)

if readers
sessions.each do |session|
ios = session.listeners.keys
session.postprocess(ios & readers, ios & writers)
end
end

true
end
end

include Processable

attr_reader :command, :sessions, :options
Expand Down
31 changes: 19 additions & 12 deletions lib/capistrano/configuration/actions/file_transfer.rb
@@ -1,4 +1,4 @@
require 'capistrano/upload'
require 'capistrano/transfer'

module Capistrano
class Configuration
Expand All @@ -9,22 +9,29 @@ module FileTransfer
# by the current task. If <tt>:mode</tt> is specified it is used to
# set the mode on the file.
def put(data, path, options={})
execute_on_servers(options) do |servers|
targets = servers.map { |s| sessions[s] }
Upload.process(targets, path, :data => data, :mode => options[:mode], :logger => logger)
end
upload(StringIO.new(data), path, options)
end

# Get file remote_path from FIRST server targetted by
# Get file remote_path from FIRST server targeted by
# the current task and transfer it to local machine as path.
#
# get "#{deploy_to}/current/log/production.log", "log/production.log.web"
def get(remote_path, path, options = {})
execute_on_servers(options.merge(:once => true)) do |servers|
logger.info "downloading `#{servers.first.host}:#{remote_path}' to `#{path}'"
sftp = sessions[servers.first].sftp
sftp.download! remote_path, path
logger.debug "download finished"
def get(remote_path, path, options={}, &block)
download(remote_path, path, options.merge(:once => true), &block)
end

def upload(from, to, options={}, &block)
transfer(:up, from, to, options, &block)
end

def download(from, to, options={}, &block)
transfer(:down, from, to, options, &block)
end

def transfer(direction, from, to, options={}, &block)
execute_on_servers(options) do |servers|
targets = servers.map { |s| sessions[s] }
Transfer.process(direction, from, to, targets, options.merge(:logger => logger), &block)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/capistrano/errors.rb
Expand Up @@ -10,6 +10,6 @@ class RemoteError < Error
end

class ConnectionError < RemoteError; end
class UploadError < RemoteError; end
class TransferError < RemoteError; end
class CommandError < RemoteError; end
end
53 changes: 53 additions & 0 deletions lib/capistrano/processable.rb
@@ -0,0 +1,53 @@
module Capistrano
module Processable
module SessionAssociation
def self.on(exception, session)
unless exception.respond_to?(:session)
exception.extend(self)
exception.session = session
end

return exception
end

attr_accessor :session
end

def process_iteration(wait=nil, &block)
ensure_each_session { |session| session.preprocess }

return false if block && !block.call(self)

readers = sessions.map { |session| session.listeners.keys }.flatten.reject { |io| io.closed? }
writers = readers.select { |io| io.respond_to?(:pending_write?) && io.pending_write? }

if readers.any? || writers.any?
readers, writers, = IO.select(readers, writers, nil, wait)
end

if readers
ensure_each_session do |session|
ios = session.listeners.keys
session.postprocess(ios & readers, ios & writers)
end
end

true
end

def ensure_each_session
errors = []

sessions.each do |session|
begin
yield session
rescue Exception => error
errors << SessionAssociation.on(error, session)
end
end

raise errors.first if errors.any?
sessions
end
end
end
2 changes: 1 addition & 1 deletion lib/capistrano/shell.rb
Expand Up @@ -253,7 +253,7 @@ def process_command(scope_type, scope_value, command)
end
end

# All open sessions
# All open sessions, needed to satisfy the Command::Processable include
def sessions
configuration.sessions.values
end
Expand Down
159 changes: 159 additions & 0 deletions lib/capistrano/transfer.rb
@@ -0,0 +1,159 @@
require 'net/scp'
require 'net/sftp'

require 'capistrano/processable'

module Capistrano
class Transfer
include Processable

def self.process(direction, from, to, sessions, options={}, &block)
new(direction, from, to, sessions, options, &block).process!
end

attr_reader :sessions
attr_reader :options
attr_reader :callback

attr_reader :transport
attr_reader :direction
attr_reader :from
attr_reader :to

attr_reader :logger
attr_reader :transfers

def initialize(direction, from, to, sessions, options={}, &block)
@direction = direction
@from = from
@to = to
@sessions = sessions
@options = options
@callback = callback

@transport = options.fetch(:transport, :sftp)
@logger = options.delete(:logger)

prepare_transfers
end

def process!
loop do
begin
break unless process_iteration { active? }
rescue Exception => error
if error.respond_to?(:session)
handle_error(error)
else
raise
end
end
end

failed = transfers.select { |txfr| txfr[:failed] }
if failed.any?
hosts = failed.map { |txfr| txfr[:server] }
errors = failed.map { |txfr| "#{txfr[:error]} (#{txfr[:error].message})" }.uniq.join(", ")
error = TransferError.new("#{operation} via #{transport} failed on #{hosts.join(',')}: #{errors}")
error.hosts = hosts
raise error
end

self
end

def active?
transfers.any? { |transfer| transfer.active? }
end

def operation
"#{direction}load"
end

private

def prepare_transfers
@session_map = {}

@transfers = sessions.map do |session|
session_from = normalize(from, session)
session_to = normalize(to, session)

@session_map[session] = case transport
when :sftp
prepare_sftp_transfer(session_from, session_to, session)
when :scp
prepare_scp_transfer(session_from, session_to, session)
else
raise ArgumentError, "unsupported transport type: #{transport.inspect}"
end
end
end

def prepare_scp_transfer(from, to, session)
scp = Net::SCP.new(session)

channel = case direction
when :up
scp.upload(from, to, options, &callback)
when :down
scp.download(from, to, options, &callback)
else
raise ArgumentError, "unsupported transfer direction: #{direction.inspect}"
end

channel[:server] = session.xserver
channel[:host] = session.xserver.host
channel[:channel] = channel

return channel
end

def prepare_sftp_transfer(from, to, session)
# FIXME: connect! is a synchronous operation, do this async and then synchronize all at once
sftp = Net::SFTP::Session.new(session).connect!

real_callback = Proc.new do |event, op, *args|
callback.call(event, op, *args) if callback
op[:channel].close if event == :finish
end

operation = case direction
when :up
sftp.upload(from, to, options, &real_callback)
when :down
sftp.download(from, to, options, &real_callback)
else
raise ArgumentError, "unsupported transfer direction: #{direction.inspect}"
end

operation[:server] = session.xserver
operation[:host] = session.xserver.host
operation[:channel] = sftp.channel

return operation
end

def normalize(argument, session)
if argument.is_a?(String)
argument.gsub(/\$CAPISTRANO:HOST\$/, session.xserver.host)
elsif argument.respond_to?(:read)
pos = argument.pos
clone = StringIO.new(argument.read)
clone.pos = argument.pos = pos
clone
else
argument
end
end

def handle_error(error)
transfer = @session_map[error.session]
transfer[:channel].close
transfer[:error] = error
transfer[:failed] = true

transfer.abort! if transport == :sftp
end
end
end

0 comments on commit 06bfd02

Please sign in to comment.