Skip to content

Commit

Permalink
Merge pull request #1273 from okfn/1273-resource-uploads
Browse files Browse the repository at this point in the history
Resource uploads through multipart form data.
  • Loading branch information
nigelbabu committed Dec 5, 2013
2 parents 82cdbb8 + 9416adc commit 6dde104
Show file tree
Hide file tree
Showing 20 changed files with 359 additions and 316 deletions.
25 changes: 3 additions & 22 deletions ckan/config/deployment.ini_tmpl
Expand Up @@ -121,28 +121,9 @@ ckan.feeds.author_link =

## Storage Settings

# Local file storage:
#ofs.impl = pairtree
#ofs.storage_dir = /var/lib/ckan/default

# Google cloud storage:
#ofs.impl = google
#ofs.gs_access_key_id =
#ofs.gs_secret_access_key =

# S3 cloud storage:
#ofs.impl = s3
#ofs.aws_access_key_id = ....
#ofs.aws_secret_access_key = ....

# 'Bucket' to use for file storage
#ckan.storage.bucket = default

# Prefix for uploaded files (only used for pairtree)
#ckan.storage.key_prefix = file/

# The maximum content size, in bytes, for uploads
#ckan.storage.max_content_length = 50000000
#ckan.storage_path = /var/lib/ckan
#ckan.max_resource_size = 10
#ckan.max_image_size = 2

## Datapusher settings

Expand Down
2 changes: 2 additions & 0 deletions ckan/config/routing.py
Expand Up @@ -253,6 +253,8 @@ def make_map():
action='resource_edit', ckan_icon='edit')
m.connect('/dataset/{id}/resource/{resource_id}/download',
action='resource_download')
m.connect('/dataset/{id}/resource/{resource_id}/download/{filename}',
action='resource_download')
m.connect('/dataset/{id}/resource/{resource_id}/embed',
action='resource_embedded_dataviewer')
m.connect('/dataset/{id}/resource/{resource_id}/viewer',
Expand Down
6 changes: 4 additions & 2 deletions ckan/controllers/api.py
Expand Up @@ -839,7 +839,9 @@ def make_unicode(entity):
cls.log.debug('Retrieving request POST: %r' % request.POST)
cls.log.debug('Retrieving request GET: %r' % request.GET)
request_data = None
if request.POST:
if request.POST and request.content_type == 'multipart/form-data':
request_data = dict(request.POST)
elif request.POST:
try:
keys = request.POST.keys()
# Parsing breaks if there is a = in the value, so for now
Expand Down Expand Up @@ -873,7 +875,7 @@ def make_unicode(entity):
raise ValueError(msg)
else:
request_data = {}
if request_data:
if request_data and request.content_type != 'multipart/form-data':
try:
request_data = h.json.loads(request_data, encoding='utf8')
except ValueError, e:
Expand Down
33 changes: 26 additions & 7 deletions ckan/controllers/package.py
@@ -1,11 +1,15 @@
import logging
from urllib import urlencode
import datetime
import os
import mimetypes
import cgi

from pylons import config
from genshi.template import MarkupTemplate
from genshi.template.text import NewTextTemplate
from paste.deploy.converters import asbool
import paste.fileapp

import ckan.logic as logic
import ckan.lib.base as base
Expand All @@ -18,6 +22,7 @@
import ckan.model as model
import ckan.lib.datapreview as datapreview
import ckan.lib.plugins
import ckan.lib.uploader as uploader
import ckan.plugins as p
import ckan.lib.render

Expand Down Expand Up @@ -550,7 +555,7 @@ def resource_edit(self, id, resource_id, data=None, errors=None,
del data['save']

context = {'model': model, 'session': model.Session,
'api_version': 3,
'api_version': 3, 'for_edit': True,
'user': c.user or c.author, 'auth_user_obj': c.userobj}

data['package_id'] = id
Expand All @@ -571,7 +576,7 @@ def resource_edit(self, id, resource_id, data=None, errors=None,
id=id, resource_id=resource_id))

context = {'model': model, 'session': model.Session,
'api_version': 3,
'api_version': 3, 'for_edit': True,
'user': c.user or c.author, 'auth_user_obj': c.userobj}
pkg_dict = get_action('package_show')(context, {'id': id})
if pkg_dict['state'].startswith('draft'):
Expand Down Expand Up @@ -621,7 +626,8 @@ def new_resource(self, id, data=None, errors=None, error_summary=None):
# see if we have any data that we are trying to save
data_provided = False
for key, value in data.iteritems():
if value and key != 'resource_type':
if ((value or isinstance(value, cgi.FieldStorage))
and key != 'resource_type'):
data_provided = True
break

Expand Down Expand Up @@ -1204,10 +1210,10 @@ def _resource_preview(self, data_dict):
or datapreview.get_preview_plugin(
data_dict, return_first=True))

def resource_download(self, id, resource_id):
def resource_download(self, id, resource_id, filename=None):
"""
Provides a direct download by redirecting the user to the url stored
against this resource.
Provides a direct download by either redirecting the user to the url stored
or downloading an uploaded file directly.
"""
context = {'model': model, 'session': model.Session,
'user': c.user or c.author, 'auth_user_obj': c.userobj}
Expand All @@ -1220,7 +1226,20 @@ def resource_download(self, id, resource_id):
except NotAuthorized:
abort(401, _('Unauthorized to read resource %s') % id)

if not 'url' in rsc:
if rsc.get('url_type') == 'upload':
upload = uploader.ResourceUpload(rsc)
filepath = upload.get_path(rsc['id'])
fileapp = paste.fileapp.FileApp(filepath)
try:
status, headers, app_iter = request.call_application(fileapp)
except OSError:
abort(404, _('Resource data not found'))
response.headers.update(dict(headers))
content_type, content_enc = mimetypes.guess_type(rsc.get('url',''))
response.headers['Content-Type'] = content_type
response.status = status
return app_iter
elif not 'url' in rsc:
abort(404, _('No download is available'))
redirect(rsc['url'])

Expand Down
42 changes: 42 additions & 0 deletions ckan/lib/cli.py
Expand Up @@ -130,6 +130,7 @@ class ManageDb(CkanCommand):
db load-only FILE_PATH - load a pg_dump from a file but don\'t do
the schema upgrade or search indexing
db create-from-model - create database from the model (indexes not made)
db migrate-filestore - migrate all uploaded data from the 2.1 filestore.
'''
summary = __doc__.split('\n')[0]
usage = __doc__
Expand Down Expand Up @@ -187,6 +188,8 @@ def command(self):
print 'Creating DB: SUCCESS'
elif cmd == 'send-rdf':
self.send_rdf()
elif cmd == 'migrate-filestore':
self.migrate_filestore()
else:
print 'Command %s not recognized' % cmd
sys.exit(1)
Expand Down Expand Up @@ -319,6 +322,45 @@ def send_rdf(self):
talis = ckan.lib.talis.Talis()
return talis.send_rdf(talis_store, username, password)

def migrate_filestore(self):
    '''Copy files held by the 2.1 filestore into the new resource storage.

    Selects every resource whose ``resource_type`` is ``file.upload``,
    whose ``url_type`` is not yet ``upload`` and whose url contains
    ``storage``. Each url is downloaded and written to the path given by
    ``ResourceUpload.get_path``; the ``resource`` row and the matching
    ``resource_revision`` row are then flagged with ``url_type = 'upload'``.

    Urls that do not answer with HTTP 200 are reported and skipped.

    Raises ``AssertionError`` if no storage path is configured.
    '''
    import errno
    from ckan.model import Session
    import requests
    from ckan.lib.uploader import ResourceUpload

    # Each fragment ends with a space so the concatenated statement stays
    # well-formed.
    results = Session.execute(
        "select id, revision_id, url from resource "
        "where resource_type = 'file.upload' "
        "and (url_type <> 'upload' or url_type is null) "
        "and url like '%storage%'")

    for resource_id, revision_id, url in results:
        response = requests.get(url, stream=True)
        if response.status_code != 200:
            print("failed to fetch %s (code %s)" % (url,
                                                    response.status_code))
            continue

        resource_upload = ResourceUpload({'id': resource_id})
        # Raised explicitly (rather than via ``assert``) so the check is
        # not stripped when Python runs with -O.
        if not resource_upload.storage_path:
            raise AssertionError("no storage configured aborting")

        directory = resource_upload.get_directory(resource_id)
        filepath = resource_upload.get_path(resource_id)
        try:
            os.makedirs(directory)
        except OSError as e:
            # An already existing directory is fine; anything else is not.
            if e.errno != errno.EEXIST:
                raise

        with open(filepath, 'wb+') as out:
            for chunk in response.iter_content(1024):
                if chunk:
                    out.write(chunk)

        # Bound parameters instead of string interpolation: the values
        # are quoted by the database driver.
        Session.execute(
            "update resource set url_type = 'upload' "
            "where id = :id",
            {'id': resource_id})
        Session.execute(
            "update resource_revision set url_type = 'upload' "
            "where id = :id and revision_id = :revision_id",
            {'id': resource_id, 'revision_id': revision_id})
        Session.commit()
        print("Saved url %s" % url)

def version(self):
from ckan.model import Session
print Session.execute('select version from migrate_version;').fetchall()
Expand Down
17 changes: 16 additions & 1 deletion ckan/lib/dictization/model_dictize.py
Expand Up @@ -139,14 +139,29 @@ def _unified_resource_format(format_):
return format_new

def resource_dictize(res, context):
    '''Return the dict form of the resource object ``res``.

    The table columns are dictized, any ``extras`` are merged into the top
    level of the dict and ``format`` is replaced by its unified form.

    Unless ``context['for_edit']`` is truthy the ``url`` is rewritten:

    * for an uploaded file (``url_type == 'upload'``) it becomes the fully
      qualified ``resource_download`` url of the owning package, ending in
      the munged last path segment of the stored url;
    * for any other url without a scheme, ``http://`` is prepended.
    '''
    model = context['model']
    resource = d.table_dictize(res, context)
    resource_group_id = resource['resource_group_id']
    extras = resource.pop("extras", None)
    if extras:
        resource.update(extras)
    resource['format'] = _unified_resource_format(res.format)
    url = resource['url']
    ## for_edit is only set when the dataset is about to be edited in the
    ## frontend. Without for_edit the whole qualified url is returned.
    for_edit = context.get('for_edit')
    if resource.get('url_type') == 'upload' and not for_edit:
        resource_group = model.Session.query(
            model.ResourceGroup).get(resource_group_id)
        last_part = url.split('/')[-1]
        cleaned_name = munge.munge_filename(last_part)
        resource['url'] = h.url_for(controller='package',
                                    action='resource_download',
                                    id=resource_group.package_id,
                                    resource_id=res.id,
                                    filename=cleaned_name,
                                    qualified=True)
    elif not urlparse.urlsplit(url).scheme and not for_edit:
        # some urls do not have the protocol, this adds http:// to these
        resource['url'] = u'http://' + url.lstrip('/')
    return resource

Expand Down
4 changes: 2 additions & 2 deletions ckan/lib/dictization/model_save.py
Expand Up @@ -41,8 +41,8 @@ def resource_dict_save(res_dict, context):
# this is an internal field so ignore
# FIXME This helps get the tests to pass but is a hack and should
# be fixed properly. basically don't update the format if not needed
if (key == 'format' and value == obj.format
or value == d.model_dictize._unified_resource_format(obj.format)):
if (key == 'format' and (value == obj.format
or value == d.model_dictize._unified_resource_format(obj.format))):
continue
setattr(obj, key, value)
else:
Expand Down
100 changes: 97 additions & 3 deletions ckan/lib/uploader.py
Expand Up @@ -6,9 +6,13 @@
import logging
import ckan.logic as logic


config = pylons.config
log = logging.getLogger(__name__)

_storage_path = None
_max_resource_size = None
_max_image_size = None


def get_storage_path():
Expand All @@ -17,9 +21,9 @@ def get_storage_path():

#None means it has not been set. False means not in config.
if _storage_path is None:
storage_path = pylons.config.get('ckan.storage_path')
ofs_impl = pylons.config.get('ofs.impl')
ofs_storage_dir = pylons.config.get('ofs.storage_dir')
storage_path = config.get('ckan.storage_path')
ofs_impl = config.get('ofs.impl')
ofs_storage_dir = config.get('ofs.storage_dir')
if storage_path:
_storage_path = storage_path
elif ofs_impl == 'pairtree' and ofs_storage_dir:
Expand All @@ -40,6 +44,20 @@ def get_storage_path():
return _storage_path


def get_max_image_size():
    '''Return ``ckan.max_image_size`` as an int (default 2).

    The config is read on the first call only; the value is then kept in
    the module-level ``_max_image_size``.
    '''
    global _max_image_size
    if _max_image_size is not None:
        return _max_image_size
    configured = config.get('ckan.max_image_size', 2)
    _max_image_size = int(configured)
    return _max_image_size


def get_max_resource_size():
    '''Return ``ckan.max_resource_size`` as an int (default 10).

    The config is read on the first call only; the value is then kept in
    the module-level ``_max_resource_size``.
    '''
    global _max_resource_size
    if _max_resource_size is not None:
        return _max_resource_size
    configured = config.get('ckan.max_resource_size', 10)
    _max_resource_size = int(configured)
    return _max_resource_size


class Upload(object):
def __init__(self, object_type, old_filename=None):
''' Setup upload by creating a subdirectory of the storage directory
Expand Down Expand Up @@ -129,3 +147,79 @@ def upload(self, max_size=2):
os.remove(self.old_filepath)
except OSError, e:
pass


class ResourceUpload(object):
    '''Store, locate and remove the uploaded file that backs a resource.

    A file for resource ``id`` is kept at
    ``<storage path>/resources/<id[0:3]>/<id[3:6]>/<id[6:]>``.
    '''

    def __init__(self, resource):
        '''Prepare an upload from the resource dict ``resource``.

        With a storage path configured, the ``upload`` and ``clear_upload``
        keys are popped from ``resource``. If ``upload`` is a
        ``cgi.FieldStorage``, ``resource['url']`` is set to the munged
        filename and ``resource['url_type']`` to ``'upload'``; otherwise a
        truthy ``clear_upload`` resets ``resource['url_type']`` to ``''``.

        Without a storage path ``resource`` is left untouched and
        ``self.storage_path`` is ``None``.
        '''
        # Give every attribute a value up front so the instance is fully
        # formed even when we return early for lack of a storage path.
        self.storage_path = None
        self.filename = None
        self.clear = None
        self.upload_file = None

        path = get_storage_path()
        if not path:
            return
        self.storage_path = os.path.join(path, 'resources')
        self._ensure_directory(self.storage_path)

        upload_field_storage = resource.pop('upload', None)
        self.clear = resource.pop('clear_upload', None)

        if isinstance(upload_field_storage, cgi.FieldStorage):
            self.filename = munge.munge_filename(
                upload_field_storage.filename)
            resource['url'] = self.filename
            resource['url_type'] = 'upload'
            self.upload_file = upload_field_storage.file
        elif self.clear:
            resource['url_type'] = ''

    @staticmethod
    def _ensure_directory(directory):
        '''Create ``directory`` and its parents; an existing one is fine.'''
        import errno
        try:
            os.makedirs(directory)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

    def get_directory(self, id):
        '''Return the directory that holds the file of resource ``id``.'''
        directory = os.path.join(self.storage_path,
                                 id[0:3], id[3:6])
        return directory

    def get_path(self, id):
        '''Return the full path of the stored file of resource ``id``.'''
        directory = self.get_directory(id)
        filepath = os.path.join(directory, id[6:])
        return filepath

    def upload(self, id, max_size=10):
        '''Write the pending upload to disk, or remove a cleared one.

        Does nothing when no storage path is configured.

        :param id: id of the resource the file belongs to
        :param max_size: largest accepted upload, counted in 1 MB chunks
        :raises ckan.logic.ValidationError: if the upload needs more than
            ``max_size`` chunks; the partial file is removed first
        '''
        if not self.storage_path:
            return
        directory = self.get_directory(id)
        filepath = self.get_path(id)

        if self.filename:
            self._ensure_directory(directory)
            # Write next to the target and rename afterwards so a failed
            # or oversized upload never replaces an existing file.
            tmp_filepath = filepath + '~'
            self.upload_file.seek(0)
            chunks_written = 0
            too_large = False
            with open(tmp_filepath, 'wb+') as output_file:
                while True:
                    #MB chunks
                    data = self.upload_file.read(2 ** 20)
                    if not data:
                        break
                    output_file.write(data)
                    chunks_written += 1
                    if chunks_written > max_size:
                        too_large = True
                        break
            # The file is closed by now, so it can be removed safely.
            if too_large:
                os.remove(tmp_filepath)
                raise logic.ValidationError(
                    {'upload': ['File upload too large']}
                )
            os.rename(tmp_filepath, filepath)
            # A new file was just stored: never fall through to the
            # clean-up below, which would delete it again.
            return

        if self.clear:
            # Best effort: the file may already be gone.
            try:
                os.remove(filepath)
            except OSError:
                pass

0 comments on commit 6dde104

Please sign in to comment.