Skip to content

Commit

Permalink
Add IO object upload
Browse files Browse the repository at this point in the history
  • Loading branch information
gabifija committed Sep 29, 2020
1 parent e746a4a commit 277613c
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 92 deletions.
26 changes: 23 additions & 3 deletions README.md
Expand Up @@ -55,13 +55,33 @@ filelink = client.upload(filepath: '/path/to/localfile')
# OR

filelink = client.upload(external_url: 'http://domain.com/image.png')

# OR

file = StringIO.new
filelink = client.upload(io: file)
```

To upload a local and an external file with query parameters:
To upload a local file, an IO object or an external URL with the following optional parameters:

```ruby
filelink = client.upload(filepath: '/path/to/localfile', options: { mimetype: 'image/png' })
options = {
filename: 'string',
location: 'string',
path: 'string',
container: 'string',
mimetype: 'string',
region: 'string',
workflows: ['workflow-id-1', 'workflow-id-2'],
upload_tags: {
key: 'value',
key2: 'value'
}
}

filelink = client.upload(filepath: '/path/to/localfile', options: { mimetype: 'image/png', filename: 'custom_filename.png' })

filelink = client.upload(external_url: 'http://domain.com/image.png', options: { mimetype: 'image/jpeg' })
filelink = client.upload(external_url: 'http://domain.com/image.png', options: { mimetype: 'image/jpeg', filename: 'custom_filename.png' })
```

To store file on `dropbox`, `azure`, `gcs` or `rackspace`, you must have the chosen provider configured in the developer portal to enable this feature. By default the file is stored on `s3`. You can add more details of the storage in `options`.
Expand Down
2 changes: 2 additions & 0 deletions lib/filestack/config.rb
Expand Up @@ -22,6 +22,8 @@ class FilestackConfig
'Accept-Encoding' => "application/json"
}.freeze

DEFAULT_UPLOAD_MIMETYPE = 'application/octet-stream'

INTELLIGENT_ERROR_MESSAGES = ['BACKEND_SERVER', 'BACKEND_NETWORK', 'S3_SERVER', 'S3_NETWORK']

def self.multipart_start_url
Expand Down
20 changes: 11 additions & 9 deletions lib/filestack/models/filestack_client.rb
Expand Up @@ -23,23 +23,25 @@ def initialize(apikey, security: nil)
@security = security
end

# Upload a local file, external url or IO object
#
# Exactly one source must be given: +filepath+, +external_url+ or +io+.
# Combining a URL with a file/IO source (or a file with an IO object)
# returns an error string instead of uploading.
#
# @param [String] filepath The path of a local file
# @param [String] external_url An external URL
# @param [StringIO] io The IO object
# @param [Hash] options User-supplied upload options
# @param [Boolean] intelligent Upload file using Filestack Intelligent Ingestion
# @param [Integer] timeout Request timeout in seconds
# @param [String] storage Default storage to be used for uploads
#
# @return [Filestack::FilestackFilelink]
def upload(filepath: nil, external_url: nil, io: nil, options: {}, intelligent: false, timeout: 60, storage: 'S3')
  # NOTE(review): these guard clauses return plain strings rather than raising
  # ArgumentError — kept for backward compatibility with existing callers.
  return 'You cannot upload a URL and file at the same time' if (filepath || io) && external_url

  response = if external_url
               # URL uploads go through the single-request CDN store task.
               send_upload(@apikey, external_url, @security, options)
             else
               return 'You cannot upload IO object and file at the same time' if io && filepath

               # Local files and IO objects share the multipart upload path.
               multipart_upload(@apikey, filepath, io, @security, options, timeout, storage, intelligent)
             end

  FilestackFilelink.new(response['handle'], security: @security, apikey: @apikey)
end
# Transform an external URL
Expand Down
64 changes: 42 additions & 22 deletions lib/filestack/utils/multipart_upload_utils.rb
Expand Up @@ -13,13 +13,22 @@
include IntelligentUtils
# Includes all the utility functions for Filestack multipart uploads
module MultipartUploadUtils
# Collect the attributes needed to start a multipart upload of a local file.
#
# @param [String] file Path to the local file
# @param [Hash] options User-supplied options; +:filename+ and +:mimetype+
#   override the values detected from the file itself
#
# @return [Array(String, Integer, String)] filename, filesize, mimetype
def get_file_attributes(file, options = {})
  filename = options[:filename] || File.basename(file)
  # Block form of File.open closes the handle after MimeMagic has sniffed it
  # (the non-block form would leak an open file descriptor per upload).
  mimetype = options[:mimetype] ||
             File.open(file) { |f| MimeMagic.by_magic(f) } ||
             FilestackConfig::DEFAULT_UPLOAD_MIMETYPE
  filesize = File.size(file)

  [filename, filesize, mimetype.to_s]
end

# Collect the attributes needed to start a multipart upload of an IO object.
#
# An IO object carries no intrinsic name or mimetype, so both fall back to
# defaults unless supplied in +options+.
#
# @param [IO] io A seekable IO object (e.g. StringIO, File)
# @param [Hash] options User-supplied options; +:filename+ and +:mimetype+
#   override the defaults
#
# @return [Array(String, Integer, String)] filename, filesize, mimetype
def get_io_attributes(io, options = {})
  filename = options[:filename] || 'unnamed_file'
  mimetype = options[:mimetype] || FilestackConfig::DEFAULT_UPLOAD_MIMETYPE

  # Measure size by seeking to the end, then restore the caller's position
  # so reading the stream afterwards is unaffected by this probe.
  original_position = io.tell
  io.seek(0, IO::SEEK_END)
  filesize = io.tell
  io.seek(original_position)

  [filename, filesize, mimetype.to_s]
end

Expand All @@ -31,8 +40,10 @@ def get_file_info(file)
# @param [String] mimetype Mimetype of incoming file
# @param [FilestackSecurity] security Security object with
# policy/signature
# @param [String] storage Default storage to be used for uploads
# @param [Hash] options User-defined options for
# multipart uploads
# @param [Bool] intelligent Upload file using Filestack Intelligent Ingestion
#
# @return [Typhoeus::Response]
def multipart_start(apikey, filename, filesize, mimetype, security, storage, options = {}, intelligent)
Expand Down Expand Up @@ -67,22 +78,21 @@ def multipart_start(apikey, filename, filesize, mimetype, security, storage, opt
#
# @param [String] apikey Filestack API key
# @param [String] filename Name of incoming file
# @param [String] filepath Local path to file
# @param [Int] filesize Size of incoming file
# @param [Typhoeus::Response] start_response Response body from
# multipart_start
# @param [String] storage Default storage to be used for uploads
# @param [Hash] options User-defined options for
# multipart uploads
#
# @return [Array]
def create_upload_jobs(apikey, filename, filepath, filesize, start_response, storage, options)
def create_upload_jobs(apikey, filename, filesize, start_response, storage, options)
jobs = []
part = 1
seek_point = 0
while seek_point < filesize
part_info = {
seek_point: seek_point,
filepath: filepath,
filename: filename,
apikey: apikey,
part: part,
Expand All @@ -92,7 +102,7 @@ def create_upload_jobs(apikey, filename, filepath, filesize, start_response, sto
upload_id: start_response['upload_id'],
location_url: start_response['location_url'],
start_response: start_response,
store: { location: storage }
store: { location: storage },
}

part_info[:store].merge!(options) if options
Expand All @@ -116,15 +126,16 @@ def create_upload_jobs(apikey, filename, filepath, filesize, start_response, sto
# @param [Hash] job Hash of options needed
# to upload a chunk
# @param [String] apikey Filestack API key
# @param [String] location_url Location url given back
# @param [String] filepath Location url given back
# from endpoint
# @param [String] filepath Local path to file
# @param [StringIO] io The IO object
# @param [Hash] options User-defined options for
# multipart uploads
# @param [String] storage Default storage to be used for uploads
#
# @return [Typhoeus::Response]
def upload_chunk(job, apikey, filepath, options, storage)
file = File.open(filepath)
def upload_chunk(job, apikey, filepath, io, options, storage)
file = filepath ? File.open(filepath) : io
file.seek(job[:seek_point])
chunk = file.read(FilestackConfig::DEFAULT_CHUNK_SIZE)

Expand All @@ -139,7 +150,6 @@ def upload_chunk(job, apikey, filepath, options, storage)
region: job[:region],
upload_id: job[:upload_id],
store: { location: storage },
file: Tempfile.new(job[:filename])
}
data = data.merge!(options) if options

Expand All @@ -158,13 +168,14 @@ def upload_chunk(job, apikey, filepath, options, storage)
# @param [String] filepath Local path to file
# @param [Hash] options User-defined options for
# multipart uploads
# @param [String] storage Default storage to be used for uploads
#
# @return [Array] Array of parts/etags strings
def run_uploads(jobs, apikey, filepath, options, storage)
def run_uploads(jobs, apikey, filepath, io, options, storage)
bar = ProgressBar.new(jobs.length)
results = Parallel.map(jobs, in_threads: 4) do |job|
response = upload_chunk(
job, apikey, filepath, options, storage
job, apikey, filepath, io, options, storage
)
if response.code == 200
bar.increment!
Expand All @@ -190,6 +201,8 @@ def run_uploads(jobs, apikey, filepath, options, storage)
# part numbers
# @param [Hash] options User-defined options for
# multipart uploads
# @param [String] storage Default storage to be used for uploads
# @param [Boolean] intelligent Upload file using Filestack Intelligent Ingestion
#
# @return [Typhoeus::Response]
def multipart_complete(apikey, filename, filesize, mimetype, start_response, parts_and_etags, options, storage, intelligent = false)
Expand All @@ -215,32 +228,39 @@ def multipart_complete(apikey, filename, filesize, mimetype, start_response, par
#
# @param [String] apikey Filestack API key
# @param [String] filename Name of incoming file
# @param [StringIO] io The IO object
# @param [FilestackSecurity] security Security object with
# policy/signature
# @param [Hash] options User-defined options for
# multipart uploads
# @param [String] storage Default storage to be used for uploads
# @param [Boolean] intelligent Upload file using Filestack Intelligent Ingestion
#
# @return [Hash]
def multipart_upload(apikey, filepath, security, options, timeout, storage, intelligent: false)
filename, filesize, mimetype = get_file_info(filepath)
def multipart_upload(apikey, filepath, io, security, options, timeout, storage, intelligent = false)
filename, filesize, mimetype = if filepath
get_file_attributes(filepath, options)
else
get_io_attributes(io, options)
end

start_response = multipart_start(
apikey, filename, filesize, mimetype, security, storage, options, intelligent
)

jobs = create_upload_jobs(
apikey, filename, filepath, filesize, start_response, storage, options
apikey, filename, filesize, start_response, storage, options
)

if intelligent
state = IntelligentState.new
run_intelligent_upload_flow(jobs, state, storage)
run_intelligent_upload_flow(jobs, filepath, io, state, storage)
response_complete = multipart_complete(
apikey, filename, filesize, mimetype,
start_response, nil, options, storage, intelligent
)
else
parts_and_etags = run_uploads(jobs, apikey, filepath, options, storage)
parts_and_etags = run_uploads(jobs, apikey, filepath, io, options, storage)
response_complete = multipart_complete(
apikey, filename, filesize, mimetype,
start_response, parts_and_etags, options, storage
Expand Down
22 changes: 10 additions & 12 deletions lib/filestack/utils/utils.rb
Expand Up @@ -88,7 +88,7 @@ def build_store_task(options = {})
# @param [Hash] options User-defined options for
# multipart uploads
# @return [Hash]
def send_upload(apikey, external_url: nil, security: nil, options: nil)
def send_upload(apikey, external_url = nil, security = nil, options = nil)
base = "#{FilestackConfig::CDN_URL}/#{apikey}/#{build_store_task(options)}"

if security
Expand Down Expand Up @@ -199,15 +199,15 @@ def change_offset(working_offset, state)
# @param [IntelligentState] state An IntelligentState object
#
# @return [Array]
def run_intelligent_upload_flow(jobs, state, storage)
def run_intelligent_upload_flow(jobs, filepath, io, state, storage)
bar = ProgressBar.new(jobs.length)
generator = create_intelligent_generator(jobs)
working_offset = FilestackConfig::DEFAULT_OFFSET_SIZE
while generator.alive?
batch = get_generator_batch(generator)
# run parts
Parallel.map(batch, in_threads: 4) do |part|
state = run_intelligent_uploads(part, state, storage)
state = run_intelligent_uploads(part, filepath, io, state, storage)
# condition: a chunk has failed but we have not reached the maximum retries
while bad_state(state)
# condition: timeout to S3, requiring offset size to be changed
Expand All @@ -219,7 +219,7 @@ def run_intelligent_upload_flow(jobs, state, storage)
sleep(state.backoff)
end
state.add_retry
state = run_intelligent_uploads(part, state, storage)
state = run_intelligent_uploads(part, filepath, io, state, storage)
end
raise "Upload has failed. Please try again later." unless state.ok
bar.increment!
Expand Down Expand Up @@ -275,15 +275,14 @@ def create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize,
# multipart_start
#
# @return [Dict]
def chunk_job(job, state, apikey, filename, filepath, filesize, start_response, storage)
def chunk_job(job, state, apikey, filename, filesize, start_response, storage)
offset = 0
seek_point = job[:seek_point]
chunk_list = []

while (offset < FilestackConfig::DEFAULT_CHUNK_SIZE) && (seek_point + offset) < filesize
chunk_list.push(
seek_point: seek_point,
filepath: filepath,
filename: filename,
apikey: apikey,
part: job[:part],
Expand All @@ -307,15 +306,14 @@ def chunk_job(job, state, apikey, filename, filepath, filesize, start_response,
# @param [IntelligentState] state An IntelligentState object
#
# @return [IntelligentState]
def run_intelligent_uploads(part, state, storage)
def run_intelligent_uploads(part, filepath, io, state, storage)
failed = false
chunks = chunk_job(
part, state, part[:apikey], part[:filename], part[:filepath],
part[:filesize], part[:start_response], storage
part, state, part[:apikey], part[:filename], part[:filesize], part[:start_response], storage
)
Parallel.map(chunks, in_threads: 3) do |chunk|
begin
upload_chunk_intelligently(chunk, state, part[:apikey], part[:filepath], part[:options], storage)
upload_chunk_intelligently(chunk, state, part[:apikey], filepath, io, part[:options], storage)
rescue => e
state.error_type = e.message
failed = true
Expand Down Expand Up @@ -364,8 +362,8 @@ def run_intelligent_uploads(part, state, storage)
# multipart uploads
#
# @return [Typhoeus::Response]
def upload_chunk_intelligently(job, state, apikey, filepath, options, storage)
file = File.open(filepath)
def upload_chunk_intelligently(job, state, apikey, filepath, io, options, storage)
file = filepath ? File.open(filepath) : io
file.seek(job[:seek_point] + job[:offset])

chunk = file.read(state.offset)
Expand Down

0 comments on commit 277613c

Please sign in to comment.