diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e75402..d17482d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## Change Log +### v0.3.0 + +- Add support for OSS Callback + ### v0.2.0 - Add aliyun/sts diff --git a/README.md b/README.md index b5644e3..7db03c2 100644 --- a/README.md +++ b/README.md @@ -185,6 +185,35 @@ Object的common prefix,包含在`list_objects`的结果中。 Common prefix让用户不需要遍历所有的object(可能数量巨大)而找出前缀, 在模拟目录结构时非常有用。 +## 上传回调 + +在`put_object`和`resumable_upload`时可以指定一个`Callback`,这样在文件 +成功上传到OSS之后,OSS会向用户提供的服务器地址发起一个HTTP POST请求, +以通知用户相应的事件发生了。用户可以在收到这个通知之后进行相应的动作, +例如更新数据库、统计行为等。更多有关上传回调的内容请参考[OSS上传回调][oss-callback]。 + +下面的例子将演示如何使用上传回调: + + callback = Aliyun::OSS::Callback.new( + url: 'http://10.101.168.94:1234/callback', + query: {user: 'put_object'}, + body: 'bucket=${bucket}&object=${object}' + ) + + begin + bucket.put_object('files/hello', callback: callback) + rescue Aliyun::OSS::CallbackError => e + puts "Callback failed: #{e.message}" + end + +**注意** + +1. callback的url**不能**包含query string,而应该在`:query`参数中指定 +2. 可能出现文件上传成功,但是执行回调失败的情况,此时client会抛出 + `CallbackError`,用户如果要忽略此错误,需要显示接住这个异常。 +3. 详细的例子可以参考[callback.rb](examples/aliyun/oss/callback.rb) +4. 接受回调的server可以参考[callback_server.rb](rails/aliyun_oss_callback_server.rb) + ## 断点上传/下载 OSS支持大文件的存储,用户如果上传/下载大文件(Object)的时候中断了(网络 @@ -386,3 +415,4 @@ SDK采用rspec进行测试,如果要对SDK进行修改,请确保没有break [custom-domain]: https://help.aliyun.com/document_detail/oss/user_guide/oss_concept/oss_cname.html [aliyun-sts]: https://help.aliyun.com/document_detail/ram/intro/concepts.html [sdk-api]: http://www.rubydoc.info/gems/aliyun-sdk/ +[oss-callback]: https://help.aliyun.com/document_detail/oss/user_guide/upload_object/upload_callback.html diff --git a/aliyun-sdk.gemspec b/aliyun-sdk.gemspec index 1ce9313..bda389e 100644 --- a/aliyun-sdk.gemspec +++ b/aliyun-sdk.gemspec @@ -30,6 +30,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rspec', '~> 3.3' spec.add_development_dependency 'webmock', '~> 1.22' spec.add_development_dependency 'simplecov', '~> 0.10' + spec.add_development_dependency 'minitest', '~> 5.8' spec.required_ruby_version = '>= 1.9.3' end diff --git a/examples/aliyun/oss/callback.rb b/examples/aliyun/oss/callback.rb new file mode 100644 index 0000000..529d923 --- /dev/null +++ b/examples/aliyun/oss/callback.rb @@ -0,0 +1,61 @@ +# -*- encoding: utf-8 -*- + +$LOAD_PATH.unshift(File.expand_path("../../../../lib", __FILE__)) +require 'yaml' +require 'json' +require 'aliyun/oss' + +## +# 用户在上传文件时可以指定“上传回调”,这样在文件上传成功后OSS会向用户 +# 提供的服务器地址发起一个HTTP POST请求,相当于一个异步的通知机制。用 +# 户可以在收到回调的时候做相应的动作。 +# 1. 如何接受OSS的回调可以参考代码目录下的 +# rails/aliyun_oss_callback_server.rb +# 2. 只有put_object和resumable_upload支持上传回调 + +# 初始化OSS client +Aliyun::Common::Logging.set_log_level(Logger::DEBUG) +conf_file = '~/.oss.yml' +conf = YAML.load(File.read(File.expand_path(conf_file))) +bucket = Aliyun::OSS::Client.new( + :endpoint => conf['endpoint'], + :cname => conf['cname'], + :access_key_id => conf['access_key_id'], + :access_key_secret => conf['access_key_secret']).get_bucket(conf['bucket']) + +# 辅助打印函数 +def demo(msg) + puts "######### #{msg} ########" + puts + yield + puts "-------------------------" + puts +end + +demo "put object with callback" do + callback = Aliyun::OSS::Callback.new( + url: 'http://10.101.168.94:1234/callback', + query: {user: 'put_object'}, + body: 'bucket=${bucket}&object=${object}' + ) + + begin + bucket.put_object('files/hello', callback: callback) + rescue Aliyun::OSS::CallbackError => e + puts "Callback failed: #{e.message}" + end +end + +demo "resumable upload with callback" do + callback = Aliyun::OSS::Callback.new( + url: 'http://10.101.168.94:1234/callback', + query: {user: 'resumable_upload'}, + body: 'bucket=${bucket}&object=${object}' + ) + + begin + bucket.resumable_upload('files/world', '/tmp/x', callback: callback) + rescue Aliyun::OSS::CallbackError => e + puts "Callback failed: #{e.message}" + end +end diff --git a/lib/aliyun/oss/bucket.rb b/lib/aliyun/oss/bucket.rb index ae80bd7..2b48f94 100644 --- a/lib/aliyun/oss/bucket.rb +++ b/lib/aliyun/oss/bucket.rb @@ -178,6 +178,9 @@ def list_objects(opts = {}) # @option opts [Hash] :metas 设置object的meta,这是一些用户自定 # 义的属性,它们会和object一起存储,在{#get_object}的时候会 # 返回这些meta。属性的key不区分大小写。例如:{ 'year' => '2015' } + # @option opts [Callback] :callback 指定操作成功后OSS的 + # 上传回调,上传成功后OSS会向用户的应用服务器发一个HTTP POST请 + # 求,`:callback`参数指定这个请求的相关参数 # @yield [HTTP::StreamWriter] 如果调用的时候传递了block,则写入 # 到object的数据由block指定 # @example 流式上传数据 @@ -188,7 +191,20 @@ def list_objects(opts = {}) # @example 指定Content-Type和metas # put_object('x', :file => '/tmp/x', :content_type => 'text/html', # :metas => {'year' => '2015', 'people' => 'mary'}) - # @note 如果opts中指定了:file,则block会被忽略 + # @example 指定Callback + # callback = Aliyun::OSS::Callback.new( + # url: 'http://10.101.168.94:1234/callback', + # query: {user: 'put_object'}, + # body: 'bucket=${bucket}&object=${object}' + # ) + # + # bucket.put_object('files/hello', callback: callback) + # @raise [CallbackError] 如果文件上传成功而Callback调用失败,抛 + # 出此错误 + # @note 如果opts中指定了`:file`,则block会被忽略 + # @note 如果指定了`:callback`,则可能文件上传成功,但是callback + # 执行失败,此时会抛出{OSS::CallbackError},用户可以选择接住这 + # 个异常,以忽略Callback调用错误 def put_object(key, opts = {}, &block) args = opts.dup @@ -420,15 +436,31 @@ def get_object_cors(key) # 果设置为true,则在上传的过程中不会写checkpoint文件,这意味着 # 上传失败后不能断点续传,而只能重新上传整个文件。如果这个值为 # true,则:cpt_file会被忽略。 + # @option opts [Callback] :callback 指定文件上传成功后OSS的 + # 上传回调,上传成功后OSS会向用户的应用服务器发一个HTTP POST请 + # 求,`:callback`参数指定这个请求的相关参数 # @yield [Float] 如果调用的时候传递了block,则会将上传进度交由 # block处理,进度值是一个0-1之间的小数 # @raise [CheckpointBrokenError] 如果cpt文件被损坏,则抛出此错误 # @raise [FileInconsistentError] 如果指定的文件与cpt中记录的不一 # 致,则抛出此错误 + # @raise [CallbackError] 如果文件上传成功而Callback调用失败,抛 + # 出此错误 # @example # bucket.resumable_upload('my-object', '/tmp/x') do |p| # puts "Progress: #{(p * 100).round(2)} %" # end + # @example 指定Callback + # callback = Aliyun::OSS::Callback.new( + # url: 'http://10.101.168.94:1234/callback', + # query: {user: 'put_object'}, + # body: 'bucket=${bucket}&object=${object}' + # ) + # + # bucket.resumable_upload('files/hello', '/tmp/x', callback: callback) + # @note 如果指定了`:callback`,则可能文件上传成功,但是callback + # 执行失败,此时会抛出{OSS::CallbackError},用户可以选择接住这 + # 个异常,以忽略Callback调用错误 def resumable_upload(key, file, opts = {}, &block) args = opts.dup diff --git a/lib/aliyun/oss/exception.rb b/lib/aliyun/oss/exception.rb index fafba41..0526f40 100644 --- a/lib/aliyun/oss/exception.rb +++ b/lib/aliyun/oss/exception.rb @@ -54,6 +54,9 @@ def get_request_id(response) end # ServerError + class CallbackError < ServerError + end # CallbackError + ## # ClientError represents client exceptions caused mostly by # invalid parameters. diff --git a/lib/aliyun/oss/http.rb b/lib/aliyun/oss/http.rb index 1e419c0..fd8d5a9 100644 --- a/lib/aliyun/oss/http.rb +++ b/lib/aliyun/oss/http.rb @@ -277,7 +277,7 @@ def do_request(verb, resources = {}, http_options = {}, &block) logger.debug("Received HTTP response, code: #{r.code}, headers: " \ "#{r.headers}, body: #{r.body}") - [r.headers, r.body] + r end def get_user_agent diff --git a/lib/aliyun/oss/protocol.rb b/lib/aliyun/oss/protocol.rb index c1fb6ed..c675860 100644 --- a/lib/aliyun/oss/protocol.rb +++ b/lib/aliyun/oss/protocol.rb @@ -14,6 +14,7 @@ module OSS class Protocol STREAM_CHUNK_SIZE = 16 * 1024 + CALLBACK_HEADER = 'x-oss-callback' include Common::Logging @@ -49,8 +50,8 @@ def list_buckets(opts = {}) 'max-keys' => opts[:limit] }.reject { |_, v| v.nil? } - _, body = @http.get( {}, {:query => params}) - doc = parse_xml(body) + r = @http.get( {}, {:query => params}) + doc = parse_xml(r.body) buckets = doc.css("Buckets Bucket").map do |node| Bucket.new( @@ -135,9 +136,9 @@ def get_bucket_acl(name) logger.info("Begin get bucket acl, name: #{name}") sub_res = {'acl' => nil} - _, body = @http.get({:bucket => name, :sub_res => sub_res}) + r = @http.get({:bucket => name, :sub_res => sub_res}) - doc = parse_xml(body) + doc = parse_xml(r.body) acl = get_node_text(doc.at_css("AccessControlList"), 'Grant') logger.info("Done get bucket acl") @@ -182,9 +183,9 @@ def get_bucket_logging(name) logger.info("Begin get bucket logging, name: #{name}") sub_res = {'logging' => nil} - _, body = @http.get({:bucket => name, :sub_res => sub_res}) + r = @http.get({:bucket => name, :sub_res => sub_res}) - doc = parse_xml(body) + doc = parse_xml(r.body) opts = {:enable => false} logging_node = doc.at_css("LoggingEnabled") @@ -249,10 +250,10 @@ def get_bucket_website(name) logger.info("Begin get bucket website, name: #{name}") sub_res = {'website' => nil} - _, body = @http.get({:bucket => name, :sub_res => sub_res}) + r = @http.get({:bucket => name, :sub_res => sub_res}) opts = {:enable => true} - doc = parse_xml(body) + doc = parse_xml(r.body) opts.update( :index => get_node_text(doc.at_css('IndexDocument'), 'Suffix'), :error => get_node_text(doc.at_css('ErrorDocument'), 'Key') @@ -307,9 +308,9 @@ def get_bucket_referer(name) logger.info("Begin get bucket referer, name: #{name}") sub_res = {'referer' => nil} - _, body = @http.get({:bucket => name, :sub_res => sub_res}) + r = @http.get({:bucket => name, :sub_res => sub_res}) - doc = parse_xml(body) + doc = parse_xml(r.body) opts = { :allow_empty => get_node_text(doc.root, 'AllowEmptyReferer', &:to_bool), @@ -370,9 +371,9 @@ def get_bucket_lifecycle(name) logger.info("Begin get bucket lifecycle, name: #{name}") sub_res = {'lifecycle' => nil} - _, body = @http.get({:bucket => name, :sub_res => sub_res}) + r = @http.get({:bucket => name, :sub_res => sub_res}) - doc = parse_xml(body) + doc = parse_xml(r.body) rules = doc.css("Rule").map do |n| days = n.at_css("Expiration Days") date = n.at_css("Expiration Date") @@ -443,9 +444,9 @@ def get_bucket_cors(name) logger.info("Begin get bucket cors, bucket: #{name}") sub_res = {'cors' => nil} - _, body = @http.get({:bucket => name, :sub_res => sub_res}) + r = @http.get({:bucket => name, :sub_res => sub_res}) - doc = parse_xml(body) + doc = parse_xml(r.body) rules = [] doc.css("CORSRule").map do |n| @@ -505,6 +506,8 @@ def delete_bucket(name) # @option opts [Hash] :metas key-value pairs # that serve as the object meta which will be stored together # with the object + # @option opts [Callback] :callback the HTTP callback performed + # by OSS after `put_object` succeeds # @yield [HTTP::StreamWriter] a stream writer is # yielded to the caller to which it can write chunks of data # streamingly @@ -516,13 +519,23 @@ def put_object(bucket_name, object_name, opts = {}, &block) "#{object_name}, options: #{opts}") headers = {'Content-Type' => opts[:content_type]} + if opts.key?(:callback) + headers[CALLBACK_HEADER] = opts[:callback].serialize + end + (opts[:metas] || {}) .each { |k, v| headers["x-oss-meta-#{k.to_s}"] = v.to_s } - @http.put( + r = @http.put( {:bucket => bucket_name, :object => object_name}, {:headers => headers, :body => HTTP::StreamPayload.new(&block)}) + if r.code == 203 + e = CallbackError.new(r) + logger.error(e.to_s) + raise e + end + logger.debug('Done put object') end @@ -557,13 +570,13 @@ def append_object(bucket_name, object_name, position, opts = {}, &block) (opts[:metas] || {}) .each { |k, v| headers["x-oss-meta-#{k.to_s}"] = v.to_s } - h, _ = @http.post( - {:bucket => bucket_name, :object => object_name, :sub_res => sub_res}, - {:headers => headers, :body => HTTP::StreamPayload.new(&block)}) + r = @http.post( + {:bucket => bucket_name, :object => object_name, :sub_res => sub_res}, + {:headers => headers, :body => HTTP::StreamPayload.new(&block)}) logger.debug('Done append object') - wrap(h[:x_oss_next_append_position], &:to_i) || -1 + wrap(r.headers[:x_oss_next_append_position], &:to_i) || -1 end # List objects in a bucket. @@ -614,12 +627,10 @@ def list_objects(bucket_name, opts = {}) 'encoding-type' => opts[:encoding] }.reject { |_, v| v.nil? } - _, body = @http.get({:bucket => bucket_name}, {:query => params}) - - doc = parse_xml(body) + r = @http.get({:bucket => bucket_name}, {:query => params}) + doc = parse_xml(r.body) encoding = get_node_text(doc.root, 'EncodingType') - objects = doc.css("Contents").map do |node| Object.new( :key => get_node_text(node, "Key") { |x| decode_key(x, encoding) }, @@ -728,12 +739,13 @@ def get_object(bucket_name, object_name, opts = {}, &block) rewrites[:expires].httpdate if rewrites.key?(:expires) end - h, _ = @http.get( - {:bucket => bucket_name, :object => object_name, - :sub_res => sub_res}, - {:headers => headers} - ) { |chunk| yield chunk if block_given? } + r = @http.get( + {:bucket => bucket_name, :object => object_name, + :sub_res => sub_res}, + {:headers => headers} + ) { |chunk| yield chunk if block_given? } + h = r.headers metas = {} meta_prefix = 'x_oss_meta_' h.select { |k, _| k.to_s.start_with?(meta_prefix) } @@ -772,10 +784,11 @@ def get_object_meta(bucket_name, object_name, opts = {}) headers = {} headers.merge!(get_conditions(opts[:condition])) if opts[:condition] - h, _ = @http.head( - {:bucket => bucket_name, :object => object_name}, - {:headers => headers}) + r = @http.head( + {:bucket => bucket_name, :object => object_name}, + {:headers => headers}) + h = r.headers metas = {} meta_prefix = 'x_oss_meta_' h.select { |k, _| k.to_s.start_with?(meta_prefix) } @@ -839,11 +852,11 @@ def copy_object(bucket_name, src_object_name, dst_object_name, opts = {}) headers.merge!(get_copy_conditions(opts[:condition])) if opts[:condition] - _, body = @http.put( + r = @http.put( {:bucket => bucket_name, :object => dst_object_name}, {:headers => headers}) - doc = parse_xml(body) + doc = parse_xml(r.body) copy_result = { :last_modified => get_node_text( doc.root, 'LastModified') { |x| Time.parse(x) }, @@ -897,13 +910,13 @@ def batch_delete_objects(bucket_name, object_names, opts = {}) query = {} query['encoding-type'] = opts[:encoding] if opts[:encoding] - _, body = @http.post( + r = @http.post( {:bucket => bucket_name, :sub_res => sub_res}, {:query => query, :body => body}) deleted = [] unless opts[:quiet] - doc = parse_xml(body) + doc = parse_xml(r.body) encoding = get_node_text(doc.root, 'EncodingType') doc.css("Deleted").map do |n| deleted << get_node_text(n, 'Key') { |x| decode_key(x, encoding) } @@ -942,10 +955,10 @@ def get_object_acl(bucket_name, object_name) "object: #{object_name}") sub_res = {'acl' => nil} - _, body = @http.get( - {bucket: bucket_name, object: object_name, sub_res: sub_res}) + r = @http.get( + {bucket: bucket_name, object: object_name, sub_res: sub_res}) - doc = parse_xml(body) + doc = parse_xml(r.body) acl = get_node_text(doc.at_css("AccessControlList"), 'Grant') logger.debug("Done get object acl") @@ -974,18 +987,18 @@ def get_object_cors(bucket_name, object_name, origin, method, headers = []) 'Access-Control-Request-Headers' => headers.join(',') } - return_headers, _ = @http.options( - {:bucket => bucket_name, :object => object_name}, - {:headers => h}) + r = @http.options( + {:bucket => bucket_name, :object => object_name}, + {:headers => h}) logger.debug("Done get object cors") CORSRule.new( - :allowed_origins => return_headers[:access_control_allow_origin], - :allowed_methods => return_headers[:access_control_allow_methods], - :allowed_headers => return_headers[:access_control_allow_headers], - :expose_headers => return_headers[:access_control_expose_headers], - :max_age_seconds => return_headers[:access_control_max_age] + :allowed_origins => r.headers[:access_control_allow_origin], + :allowed_methods => r.headers[:access_control_allow_methods], + :allowed_headers => r.headers[:access_control_allow_headers], + :expose_headers => r.headers[:access_control_expose_headers], + :max_age_seconds => r.headers[:access_control_max_age] ) end @@ -1014,12 +1027,12 @@ def initiate_multipart_upload(bucket_name, object_name, opts = {}) (opts[:metas] || {}) .each { |k, v| headers["x-oss-meta-#{k.to_s}"] = v.to_s } - _, body = @http.post( - {:bucket => bucket_name, :object => object_name, - :sub_res => sub_res}, - {:headers => headers}) + r = @http.post( + {:bucket => bucket_name, :object => object_name, + :sub_res => sub_res}, + {:headers => headers}) - doc = parse_xml(body) + doc = parse_xml(r.body) txn_id = get_node_text(doc.root, 'UploadId') logger.info("Done initiate multipart upload: #{txn_id}.") @@ -1040,13 +1053,13 @@ def upload_part(bucket_name, object_name, txn_id, part_no, &block) "#{object_name}, txn id: #{txn_id}, part No: #{part_no}") sub_res = {'partNumber' => part_no, 'uploadId' => txn_id} - headers, _ = @http.put( + r = @http.put( {:bucket => bucket_name, :object => object_name, :sub_res => sub_res}, {:body => HTTP::StreamPayload.new(&block)}) logger.debug("Done upload part") - Multipart::Part.new(:number => part_no, :etag => headers[:etag]) + Multipart::Part.new(:number => part_no, :etag => r.headers[:etag]) end # Upload a part in a multipart uploading transaction by copying @@ -1084,26 +1097,31 @@ def upload_part_by_copy( sub_res = {'partNumber' => part_no, 'uploadId' => txn_id} - headers, _ = @http.put( + r = @http.put( {:bucket => bucket_name, :object => object_name, :sub_res => sub_res}, {:headers => headers}) logger.debug("Done upload part by copy: #{source_object}.") - Multipart::Part.new(:number => part_no, :etag => headers[:etag]) + Multipart::Part.new(:number => part_no, :etag => r.headers[:etag]) end # Complete a multipart uploading transaction # @param bucket_name [String] the bucket name # @param object_name [String] the object name # @param txn_id [String] the upload id - # @param parts [Array] all the - # parts in this transaction - def complete_multipart_upload(bucket_name, object_name, txn_id, parts) + # @param parts [Array] all the parts in this + # transaction + # @param callback [Callback] the HTTP callback performed by OSS + # after this operation succeeds + def complete_multipart_upload( + bucket_name, object_name, txn_id, parts, callback = nil) logger.debug("Begin complete multipart upload, "\ "txn id: #{txn_id}, parts: #{parts.map(&:to_s)}") sub_res = {'uploadId' => txn_id} + headers = {} + headers[CALLBACK_HEADER] = callback.serialize if callback body = Nokogiri::XML::Builder.new do |xml| xml.CompleteMultipartUpload { @@ -1116,9 +1134,15 @@ def complete_multipart_upload(bucket_name, object_name, txn_id, parts) } end.to_xml - @http.post( + r = @http.post( {:bucket => bucket_name, :object => object_name, :sub_res => sub_res}, - {:body => body}) + {:headers => headers, :body => body}) + + if r.code == 203 + e = CallbackError.new(r) + logger.error(e.to_s) + raise e + end logger.debug("Done complete multipart upload: #{txn_id}.") end @@ -1188,14 +1212,12 @@ def list_multipart_uploads(bucket_name, opts = {}) 'encoding-type' => opts[:encoding] }.reject { |_, v| v.nil? } - _, body = @http.get( + r = @http.get( {:bucket => bucket_name, :sub_res => sub_res}, {:query => params}) - doc = parse_xml(body) - + doc = parse_xml(r.body) encoding = get_node_text(doc.root, 'EncodingType') - txns = doc.css("Upload").map do |node| Multipart::Transaction.new( :id => get_node_text(node, "UploadId"), @@ -1259,11 +1281,11 @@ def list_parts(bucket_name, object_name, txn_id, opts = {}) 'encoding-type' => opts[:encoding] }.reject { |_, v| v.nil? } - _, body = @http.get( + r = @http.get( {:bucket => bucket_name, :object => object_name, :sub_res => sub_res}, {:query => params}) - doc = parse_xml(body) + doc = parse_xml(r.body) parts = doc.css("Part").map do |node| Multipart::Part.new( :number => get_node_text(node, 'PartNumber', &:to_i), diff --git a/lib/aliyun/oss/struct.rb b/lib/aliyun/oss/struct.rb index 6f1ac5c..f1a65d7 100644 --- a/lib/aliyun/oss/struct.rb +++ b/lib/aliyun/oss/struct.rb @@ -1,5 +1,9 @@ # -*- encoding: utf-8 -*- +require 'base64' +require 'json' +require 'uri' + module Aliyun module OSS @@ -146,5 +150,59 @@ class CORSRule < Common::Struct::Base end # CORSRule + ## + # Callback represents a HTTP call made by OSS to user's + # application server after an event happens, such as an object is + # successfully uploaded to OSS. See: {https://help.aliyun.com/document_detail/oss/api-reference/object/Callback.html} + # Attributes: + # * url [String] the URL *WITHOUT* the query string + # * query [Hash] the query to generate query string + # * body [String] the body of the request + # * content_type [String] the Content-Type of the request + # * host [String] the Host in HTTP header for this request + class Callback < Common::Struct::Base + + attrs :url, :query, :body, :content_type, :host + + include Common::Logging + + def serialize + query_string = (query || {}).map { |k, v| + [CGI.escape(k.to_s), CGI.escape(v.to_s)].join('=') }.join('&') + + cb = { + 'callbackUrl' => "#{normalize_url(url)}?#{query_string}", + 'callbackBody' => body, + 'callbackBodyType' => content_type || default_content_type + } + cb['callbackHost'] = host if host + + logger.debug("Callback json: #{cb}") + + Base64.strict_encode64(cb.to_json) + end + + private + def normalize_url(url) + uri = URI.parse(url) + uri = URI.parse("http://#{url}") unless uri.scheme + + if uri.scheme != 'http' and uri.scheme != 'https' + fail ClientError, "Only HTTP and HTTPS endpoint are accepted." + end + + unless uri.query.nil? + fail ClientError, "Query parameters should not appear in URL." + end + + uri.to_s + end + + def default_content_type + "application/x-www-form-urlencoded" + end + + end # Callback + end # OSS end # Aliyun diff --git a/lib/aliyun/oss/upload.rb b/lib/aliyun/oss/upload.rb index 7cee56c..2a66dc2 100644 --- a/lib/aliyun/oss/upload.rb +++ b/lib/aliyun/oss/upload.rb @@ -116,7 +116,8 @@ def commit parts = sync_get_all_parts.map{ |p| Part.new(:number => p[:number], :etag => p[:etag]) } - @protocol.complete_multipart_upload(bucket, object, id, parts) + @protocol.complete_multipart_upload( + bucket, object, id, parts, @options[:callback]) File.delete(@cpt_file) unless options[:disable_cpt] diff --git a/lib/aliyun/version.rb b/lib/aliyun/version.rb index d546af5..0d7cc20 100644 --- a/lib/aliyun/version.rb +++ b/lib/aliyun/version.rb @@ -2,6 +2,6 @@ module Aliyun - VERSION = "0.2.0" + VERSION = "0.3.0" end # Aliyun diff --git a/rails/aliyun_oss_callback_server.rb b/rails/aliyun_oss_callback_server.rb new file mode 100644 index 0000000..27c8af1 --- /dev/null +++ b/rails/aliyun_oss_callback_server.rb @@ -0,0 +1,59 @@ +# coding: utf-8 + +require 'sinatra' +require 'base64' +require 'open-uri' +require 'cgi' +require 'openssl' +require 'json' + +# 接受OSS上传回调的server示例,利用RSA公钥验证请求来自OSS,而非其 +# 他恶意请求。具体签名/验证过程请参考: +# https://help.aliyun.com/document_detail/oss/api-reference/object/Callback.html + +def get_header(name) + key = "http_#{name.gsub('-', '_')}".upcase + request.env[key] +end + +PUB_KEY_URL_PREFIX = 'http://gosspublic.alicdn.com/' +PUB_KEY_URL_PREFIX_S = 'https://gosspublic.alicdn.com/' + +# 1. Public key is cached so that we don't need fetching it for every +# request +# 2. Ensure pub_key_url is an authentic URL by asserting it starts +# with the offical domain +def get_public_key(pub_key_url, reload = false) + unless pub_key_url.start_with?(PUB_KEY_URL_PREFIX) || + pub_key_url.start_with?(PUB_KEY_URL_PREFIX_S) + fail "Invalid public key URL: #{pub_key_url}" + end + + if reload || @pub_key.nil? + @pub_key = open(pub_key_url) { |f| f.read } + end + + @pub_key +end + +post '/*' do + pub_key_url = Base64.decode64(get_header('x-oss-pub-key-url')) + pub_key = get_public_key(pub_key_url) + rsa = OpenSSL::PKey::RSA.new(pub_key) + + authorization = Base64.decode64(get_header('authorization')) + req_body = request.body.read + + auth_str = CGI.unescape(request.path) + + '?' + request.query_string + "\n" + + req_body + + valid = rsa.public_key.verify( + OpenSSL::Digest::MD5.new, authorization, auth_str) + + if valid + body({'Status' => 'OK'}.to_json) + else + halt 400, "Authorization failed!" + end +end diff --git a/spec/aliyun/oss/client/bucket_spec.rb b/spec/aliyun/oss/client/bucket_spec.rb index 358e430..12710b9 100644 --- a/spec/aliyun/oss/client/bucket_spec.rb +++ b/spec/aliyun/oss/client/bucket_spec.rb @@ -104,6 +104,16 @@ def mock_acl(acl) end.to_xml end + def mock_error(code, message) + Nokogiri::XML::Builder.new do |xml| + xml.Error { + xml.Code code + xml.Message message + xml.RequestId '0000' + } + end.to_xml + end + def err(msg, reqid = '0000') "#{msg} RequestId: #{reqid}" end @@ -229,6 +239,43 @@ def err(msg, reqid = '0000') .with(:body => content, :query => {}) end + it "should put object with callback" do + key = 'ruby' + stub_request(:put, object_url(key)) + + callback = Callback.new( + url: 'http://app.server.com/callback', + query: {'id' => 1, 'name' => '杭州'}, + body: 'hello world', + host: 'server.com' + ) + @bucket.put_object(key, callback: callback) + + expect(WebMock).to have_requested(:put, object_url(key)) + .with { |req| req.headers.key?('X-Oss-Callback') } + end + + it "should raise CallbackError when callback failed" do + key = 'ruby' + code = 'CallbackFailed' + message = 'Error status: 502.' + stub_request(:put, object_url(key)) + .to_return(:status => 203, :body => mock_error(code, message)) + + callback = Callback.new( + url: 'http://app.server.com/callback', + query: {'id' => 1, 'name' => '杭州'}, + body: 'hello world', + host: 'server.com' + ) + expect { + @bucket.put_object(key, callback: callback) + }.to raise_error(CallbackError, err(message)) + + expect(WebMock).to have_requested(:put, object_url(key)) + .with { |req| req.headers.key?('X-Oss-Callback') } + end + it "should get object to file" do key = 'ruby' # 100 KB diff --git a/spec/aliyun/oss/client/client_spec.rb b/spec/aliyun/oss/client/client_spec.rb index ea786bb..6727dad 100644 --- a/spec/aliyun/oss/client/client_spec.rb +++ b/spec/aliyun/oss/client/client_spec.rb @@ -54,7 +54,7 @@ module OSS expect(WebMock) .to have_requested(:get, "#{bucket}.#{endpoint}/#{object}") - .with{ |req| not req.headers.has_key?('x-oss-security-token') } + .with{ |req| req.headers.key?('X-Oss-Security-Token') } end it "should construct different client" do diff --git a/spec/aliyun/oss/client/resumable_upload_spec.rb b/spec/aliyun/oss/client/resumable_upload_spec.rb index d870da9..26f93a2 100644 --- a/spec/aliyun/oss/client/resumable_upload_spec.rb +++ b/spec/aliyun/oss/client/resumable_upload_spec.rb @@ -66,6 +66,10 @@ def mock_error(code, message) end.to_xml end + def err(msg, reqid = '0000') + "#{msg} RequestId: #{reqid}" + end + it "should upload file when all goes well" do stub_request(:post, /#{object_url}\?uploads.*/) .to_return(:body => mock_txn_id('upload_id')) @@ -99,6 +103,98 @@ def mock_error(code, message) expect(prg.size).to eq(10) end + it "should upload file with callback" do + stub_request(:post, /#{object_url}\?uploads.*/) + .to_return(:body => mock_txn_id('upload_id')) + stub_request(:put, /#{object_url}\?partNumber.*/) + stub_request(:post, /#{object_url}\?uploadId.*/) + + callback = Callback.new( + url: 'http://app.server.com/callback', + query: {'id' => 1, 'name' => '杭州'}, + body: 'hello world', + host: 'server.com' + ) + prg = [] + @bucket.resumable_upload( + @object_key, @file, + :part_size => 10, :callback => callback) { |p| prg << p } + + expect(WebMock).to have_requested( + :post, /#{object_url}\?uploads.*/).times(1) + + part_numbers = Set.new([]) + upload_ids = Set.new([]) + + expect(WebMock).to have_requested( + :put, /#{object_url}\?partNumber.*/).with{ |req| + query = parse_query_from_uri(req.uri) + part_numbers << query['partNumber'] + upload_ids << query['uploadId'] + }.times(10) + + expect(part_numbers.to_a).to match_array((1..10).map{ |x| x.to_s }) + expect(upload_ids.to_a).to match_array(['upload_id']) + + expect(WebMock) + .to have_requested( + :post, /#{object_url}\?uploadId.*/) + .with { |req| req.headers.key?('X-Oss-Callback') } + .times(1) + + expect(File.exist?("#{@file}.cpt")).to be false + expect(prg.size).to eq(10) + end + + it "should raise CallbackError when callback failed" do + stub_request(:post, /#{object_url}\?uploads.*/) + .to_return(:body => mock_txn_id('upload_id')) + stub_request(:put, /#{object_url}\?partNumber.*/) + + code = 'CallbackFailed' + message = 'Error status: 502.' + stub_request(:post, /#{object_url}\?uploadId.*/) + .to_return(:status => 203, :body => mock_error(code, message)) + + callback = Callback.new( + url: 'http://app.server.com/callback', + query: {'id' => 1, 'name' => '杭州'}, + body: 'hello world', + host: 'server.com' + ) + prg = [] + expect { + @bucket.resumable_upload( + @object_key, @file, + :part_size => 10, :callback => callback) { |p| prg << p } + }.to raise_error(CallbackError, err(message)) + + expect(WebMock).to have_requested( + :post, /#{object_url}\?uploads.*/).times(1) + + part_numbers = Set.new([]) + upload_ids = Set.new([]) + + expect(WebMock).to have_requested( + :put, /#{object_url}\?partNumber.*/).with{ |req| + query = parse_query_from_uri(req.uri) + part_numbers << query['partNumber'] + upload_ids << query['uploadId'] + }.times(10) + + expect(part_numbers.to_a).to match_array((1..10).map{ |x| x.to_s }) + expect(upload_ids.to_a).to match_array(['upload_id']) + + expect(WebMock) + .to have_requested( + :post, /#{object_url}\?uploadId.*/) + .with { |req| req.headers.key?('X-Oss-Callback') } + .times(1) + + expect(File.exist?("#{@file}.cpt")).to be true + expect(prg.size).to eq(10) + end + it "should restart when begin txn fails" do code = 'Timeout' message = 'Request timeout.' diff --git a/spec/aliyun/oss/object_spec.rb b/spec/aliyun/oss/object_spec.rb index 5fc2417..1297c52 100644 --- a/spec/aliyun/oss/object_spec.rb +++ b/spec/aliyun/oss/object_spec.rb @@ -737,6 +737,26 @@ def err(msg, reqid = '0000') end end # cors + context "callback" do + it "should encode callback" do + callback = Callback.new( + url: 'http://app.server.com/callback', + query: {'id' => 1, 'name' => '杭州'}, + body: 'hello world', + host: 'server.com' + ) + + encoded = "eyJjYWxsYmFja1VybCI6Imh0dHA6Ly9hcHAuc2VydmVyLmNvbS9jYWxsYmFjaz9pZD0xJm5hbWU9JUU2JTlEJUFEJUU1JUI3JTlFIiwiY2FsbGJhY2tCb2R5IjoiaGVsbG8gd29ybGQiLCJjYWxsYmFja0JvZHlUeXBlIjoiYXBwbGljYXRpb24veC13d3ctZm9ybS11cmxlbmNvZGVkIiwiY2FsbGJhY2tIb3N0Ijoic2VydmVyLmNvbSJ9" + expect(callback.serialize).to eq(encoded) + end + + it "should not accept url with query string" do + expect { + Callback.new(url: 'http://app.server.com/callback?id=1').serialize + }.to raise_error(ClientError, "Query parameters should not appear in URL.") + end + + end end # Object end # OSS diff --git a/tests/test_content_type.rb b/tests/test_content_type.rb index b79a39f..be2720b 100644 --- a/tests/test_content_type.rb +++ b/tests/test_content_type.rb @@ -5,7 +5,7 @@ class TestContentType < Minitest::Test def setup - Aliyun::OSS::Logging.set_log_level(Logger::DEBUG) + Aliyun::Common::Logging.set_log_level(Logger::DEBUG) conf_file = '~/.oss.yml' conf = YAML.load(File.read(File.expand_path(conf_file))) client = Aliyun::OSS::Client.new( diff --git a/tests/test_encoding.rb b/tests/test_encoding.rb index 486dc8e..8469963 100644 --- a/tests/test_encoding.rb +++ b/tests/test_encoding.rb @@ -6,7 +6,7 @@ class TestEncoding < Minitest::Test def setup - Aliyun::OSS::Logging.set_log_level(Logger::DEBUG) + Aliyun::Common::Logging.set_log_level(Logger::DEBUG) conf_file = '~/.oss.yml' conf = YAML.load(File.read(File.expand_path(conf_file))) client = Aliyun::OSS::Client.new( diff --git a/tests/test_object_key.rb b/tests/test_object_key.rb index 9e09800..e5980ca 100644 --- a/tests/test_object_key.rb +++ b/tests/test_object_key.rb @@ -6,7 +6,7 @@ class TestObjectKey < Minitest::Test def setup - Aliyun::OSS::Logging.set_log_level(Logger::DEBUG) + Aliyun::Common::Logging.set_log_level(Logger::DEBUG) conf_file = '~/.oss.yml' conf = YAML.load(File.read(File.expand_path(conf_file))) client = Aliyun::OSS::Client.new( diff --git a/tests/test_resumable.rb b/tests/test_resumable.rb index b41e3e8..f682051 100644 --- a/tests/test_resumable.rb +++ b/tests/test_resumable.rb @@ -5,7 +5,7 @@ class TestResumable < Minitest::Test def setup - Aliyun::OSS::Logging.set_log_level(Logger::DEBUG) + Aliyun::Common::Logging.set_log_level(Logger::DEBUG) conf_file = '~/.oss.yml' conf = YAML.load(File.read(File.expand_path(conf_file))) client = Aliyun::OSS::Client.new(