Skip to content

Commit

Permalink
Finishing test_create tests, moved mimetype logic back into uploader.py,
Browse files Browse the repository at this point in the history
test_model_dictize.py had one test that now expects size to be 0, fixing
formatting for pip8
  • Loading branch information
jrods committed Oct 25, 2016
1 parent f75cffa commit 96053f2
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 21 deletions.
28 changes: 20 additions & 8 deletions ckan/lib/uploader.py
Expand Up @@ -5,6 +5,7 @@
import datetime
import logging
import magic
import mimetypes

import ckan.lib.munge as munge
import ckan.logic as logic
Expand Down Expand Up @@ -97,7 +98,7 @@ def __init__(self, object_type, old_filename=None):
self.storage_path = None
self.filename = None
self.filepath = None
self.filesize = 0 # bytes
self.filesize = 0 # bytes
self.mimetype = None
path = get_storage_path()
if not path:
Expand Down Expand Up @@ -203,13 +204,16 @@ def __init__(self, resource):
if e.errno != 17:
raise
self.filename = None
self.filesize = 0 # bytes
self.filesize = 0 # bytes
self.mimetype = None

url = resource.get('url')

upload_field_storage = resource.pop('upload', None)
self.clear = resource.pop('clear_upload', None)

self.mimetype = mimetypes.guess_type(url)[0]

if isinstance(upload_field_storage, cgi.FieldStorage):
self.filename = upload_field_storage.filename
self.filename = munge.munge_filename(self.filename)
Expand All @@ -222,12 +226,20 @@ def __init__(self, resource):
self.filesize = self.upload_file.tell()
# go back to the beginning of the file buffer
self.upload_file.seek(0, os.SEEK_SET)
try:
self.mimetype = magic.from_buffer(self.upload_file.read(), mime=True)
self.upload_file.seek(0, os.SEEK_SET)
except IOError, e:
# Not that important if call above fails
self.mimetype = None

# check if the mimetype failed from guessing with the url
if not self.mimetype:
self.mimetype = mimetypes.guess_type(self.filename)[0]

# check again if guessing with the filename failed
if not self.mimetype:
try:
self.mimetype = magic.from_buffer(self.upload_file.read(),
mime=True)
self.upload_file.seek(0, os.SEEK_SET)
except IOError, e:
# Not that important if call above fails
self.mimetype = None

elif self.clear:
resource['url_type'] = ''
Expand Down
4 changes: 1 addition & 3 deletions ckan/logic/action/create.py
Expand Up @@ -298,9 +298,7 @@ def resource_create(context, data_dict):
upload = uploader.get_resource_uploader(data_dict)

if not 'mimetype' in data_dict:
if 'url' in data_dict:
data_dict['mimetype'] = mimetypes.guess_type(data_dict['url'])[0]
elif hasattr(upload, 'mimetype'):
if hasattr(upload, 'mimetype'):
data_dict['mimetype'] = upload.mimetype

if not 'size' in data_dict:
Expand Down
4 changes: 1 addition & 3 deletions ckan/logic/action/update.py
Expand Up @@ -97,9 +97,7 @@ def resource_update(context, data_dict):
upload.upload(id, uploader.get_max_resource_size())

if not 'mimetype' in data_dict:
if 'url' in data_dict:
data_dict['mimetype'] = mimetypes.guess_type(data_dict['url'])[0]
elif hasattr(upload, 'mimetype'):
if hasattr(upload, 'mimetype'):
data_dict['mimetype'] = upload.mimetype

if not 'size' in data_dict:
Expand Down
2 changes: 1 addition & 1 deletion ckan/tests/lib/dictization/test_model_dictize.py
Expand Up @@ -415,7 +415,7 @@ def test_package_dictize_resource(self):
u'name': u'test_pkg_dictize',
u'position': 0,
u'resource_type': None,
u'size': None,
u'size': 0,
u'state': u'active',
u'url': u'http://link.to.some.data',
u'url_type': None
Expand Down
99 changes: 93 additions & 6 deletions ckan/tests/logic/action/test_create.py
Expand Up @@ -395,13 +395,13 @@ def test_add_default_views_to_resource_no_dataset_passed(self):

class TestResourceCreate(object):
import cgi

class FakeFileStorage(cgi.FieldStorage):
def __init__(self, fp, filename):
self.file = fp
self.filename = filename
self.name = 'upload'


@classmethod
def setup_class(cls):
helpers.reset_db()
Expand Down Expand Up @@ -451,6 +451,12 @@ def test_doesnt_require_url(self):
assert not stored_resource['url']

def test_mimetype_by_url(self):
"""
The mimetype is guessed from the url
Real world usage would be externally linking the resource and the mimetype would
be guessed, based on the url
"""
context = {}
params = {
'package_id': factories.Dataset()['id'],
Expand All @@ -460,10 +466,17 @@ def test_mimetype_by_url(self):
result = helpers.call_action('resource_create', context, **params)

mimetype = result.pop('mimetype')

assert mimetype
assert_equals(mimetype, 'text/csv')
#maybe see if the mimetype is in the available list of mimetypes?

def test_mimetype_by_user(self):
"""
The mimetype is supplied by the user
Real world usage would be using the FileStore API or web UI form to create a resource
and the user wanted to specify the mimetype themselves
"""
context = {}
params = {
'package_id': factories.Dataset()['id'],
Expand All @@ -476,8 +489,32 @@ def test_mimetype_by_user(self):
mimetype = result.pop('mimetype')
assert_equals(mimetype, 'application/csv')

def test_mimetype_by_upload(self):
test_file = file('/home/vagrant/test.txt', 'rb')
def test_mimetype_by_upload_by_filename(self):
"""
The mimetype is guessed from an uploaded file with a filename
Real world usage would be using the FileStore API or web UI form to upload a file, with a filename plus extension
If there's no url or the mimetype can't be guessed by the url, mimetype will be guessed by the extension in the filename
"""
import StringIO
test_file = StringIO.StringIO()
test_file.write('''
"info": {
"title": "BC Data Catalogue API",
"description": "This API provides information about datasets in the BC Data Catalogue.",
"termsOfService": "http://www.data.gov.bc.ca/local/dbc/docs/license/API_Terms_of_Use.pdf",
"contact": {
"name": "Data BC",
"url": "http://data.gov.bc.ca/",
"email": ""
},
"license": {
"name": "Open Government License - British Columbia",
"url": "http://www.data.gov.bc.ca/local/dbc/docs/license/OGL-vbc2.0.pdf"
},
"version": "3.0.0"
}
''')
test_resource = TestResourceCreate.FakeFileStorage(test_file, 'test.json')

context = {}
Expand All @@ -490,11 +527,54 @@ def test_mimetype_by_upload(self):
result = helpers.call_action('resource_create', context, **params)

mimetype = result.pop('mimetype')

assert mimetype
assert_equals(mimetype, 'application/json')

def test_mimetype_by_upload_by_file(self):
"""
The mimetype is guessed from an uploaded file by the contents inside
Real world usage would be using the FileStore API or web UI form to upload a file, that has no extension
If the mimetype can't be guessed by the url or filename, mimetype will be guessed by the contents inside the file
"""
import StringIO
test_file = StringIO.StringIO()
test_file.write('''
Snow Course Name, Number, Elev. metres, Date of Survey, Snow Depth cm, Water Equiv. mm, Survey Code, % of Normal, Density %, Survey Period, Normal mm
SKINS LAKE,1B05,890,2015/12/30,34,53,,98,16,JAN-01,54
MCGILLIVRAY PASS,1C05,1725,2015/12/31,88,239,,87,27,JAN-01,274
NAZKO,1C08,1070,2016/01/05,20,31,,76,16,JAN-01,41
''')
test_resource = TestResourceCreate.FakeFileStorage(test_file, '')

context = {}
params = {
'package_id': factories.Dataset()['id'],
'url': 'http://data',
'name': 'A nice resource',
'upload': test_resource
}
result = helpers.call_action('resource_create', context, **params)

mimetype = result.pop('mimetype')

assert mimetype
assert_equals(mimetype, 'text/plain')

def test_size_of_resource_by_upload(self):
test_file = file('/home/vagrant/test.txt', 'rb')
test_resource = TestResourceCreate.FakeFileStorage(test_file, 'test.txt')
"""
The size of the resource determined by the uploaded file
"""
import StringIO
test_file = StringIO.StringIO()
test_file.write('''
Snow Course Name, Number, Elev. metres, Date of Survey, Snow Depth cm, Water Equiv. mm, Survey Code, % of Normal, Density %, Survey Period, Normal mm
SKINS LAKE,1B05,890,2015/12/30,34,53,,98,16,JAN-01,54
MCGILLIVRAY PASS,1C05,1725,2015/12/31,88,239,,87,27,JAN-01,274
NAZKO,1C08,1070,2016/01/05,20,31,,76,16,JAN-01,41
''')
test_resource = TestResourceCreate.FakeFileStorage(test_file, 'test.csv')

context = {}
params = {
Expand All @@ -506,9 +586,16 @@ def test_size_of_resource_by_upload(self):
result = helpers.call_action('resource_create', context, **params)

size = result.pop('size')

assert size
assert size > 0

def test_size_of_resource_by_user(self):
"""
The size of the resource is provided by the users
Real world usage would be using the FileStore API and the user provides a size for the resource
"""
context = {}
params = {
'package_id': factories.Dataset()['id'],
Expand Down
36 changes: 36 additions & 0 deletions ckan/tests/logic/action/test_update.py
Expand Up @@ -796,6 +796,42 @@ def test_datastore_active_not_present_if_not_provided_and_not_datastore_plugin_e

assert 'datastore_active' not in res_returned

def test_mimetype_by_url(self):
"""
"""
pass

def test_mimetype_by_user(self):
"""
"""
pass

def test_mimetype_by_upload_by_file(self):
"""
"""
pass

def test_mimetype_by_upload_by_filename(self):
"""
"""
pass

def test_size_of_resource_by_user(self):
"""
"""
pass

def test_size_of_resource_by_upload(self):
"""
"""
pass


class TestConfigOptionUpdate(object):

Expand Down

0 comments on commit 96053f2

Please sign in to comment.