-
Notifications
You must be signed in to change notification settings - Fork 2k
/
datapreview.py
152 lines (120 loc) · 4.95 KB
/
datapreview.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# coding=UTF-8
"""Data previewer functions
Functions and data structures that are needed for the ckan data preview.
"""
import urlparse
import logging
import pylons.config as config
import ckan.plugins as p
# Fallback list of directly embeddable formats, returned by direct() when
# the ``ckan.preview.direct`` config option is unset or empty.
DEFAULT_DIRECT_EMBED = ['png', 'jpg', 'jpeg', 'gif']
# Fallback list of iframe-loadable formats, returned by loadable() when
# the ``ckan.preview.loadable`` config option is unset or empty.
DEFAULT_LOADABLE_IFRAME = ['html', 'htm', 'rdf+xml', 'owl+xml', 'xml',
'n3', 'n-triples', 'turtle', 'plain',
'atom', 'rss', 'txt']
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def direct():
    '''Return the list of directly embeddable formats.

    The ``ckan.preview.direct`` config option is split on whitespace; if
    that yields at least one entry it is returned, otherwise
    ``DEFAULT_DIRECT_EMBED`` is returned.
    '''
    configured = config.get('ckan.preview.direct', '').split()
    if configured:
        return configured
    return DEFAULT_DIRECT_EMBED
def loadable():
    '''Return the list of formats that can be loaded in an iframe.

    The ``ckan.preview.loadable`` config option is split on whitespace; if
    that yields at least one entry it is returned, otherwise
    ``DEFAULT_LOADABLE_IFRAME`` is returned.
    '''
    configured = config.get('ckan.preview.loadable', '').split()
    if configured:
        return configured
    return DEFAULT_LOADABLE_IFRAME
def res_format(resource):
    '''Return the assumed format of a resource, lower-cased.

    :param resource: mapping with ``url`` and ``format`` keys
    :returns: ``None`` when the resource has no URL; otherwise the
        explicit ``format`` if it is truthy, else whatever follows the
        last ``.`` in the URL, in lower case
    '''
    url = resource['url']
    if not url:
        return None

    declared = resource['format']
    if declared:
        return declared.lower()
    # No explicit format: fall back to the text after the last dot.
    return url.split('.')[-1].lower()
def compare_domains(urls):
    '''Return True if all of the provided urls share the same domain.

    Two urls match when their (scheme, hostname, port) triples are equal
    after lower-casing. A url without a scheme is treated as absolute
    (``http`` is assumed) unless it starts with ``/``. A url that makes
    ``urlparse`` raise ``ValueError`` yields False.

    :param urls: iterable of url strings
    :returns: True if every url has the same triple as the first one
        (also True for an empty iterable), False otherwise
    '''
    reference = None
    for candidate in urls:
        try:
            # Scheme-less urls are interpreted as absolute urls, except
            # for those starting with a slash.
            if not (urlparse.urlparse(candidate).scheme
                    or candidate.startswith('/')):
                candidate = '//' + candidate
            parts = urlparse.urlparse(candidate.lower(), 'http')
            origin = (parts.scheme, parts.hostname, parts.port)
        except ValueError:
            # The url is too malformed for urlparse to handle.
            return False

        if reference is None:
            # First url seen: everything else is compared against it.
            reference = origin
        elif origin != reference:
            return False
    return True
def on_same_domain(data_dict):
    '''Return True if the resource url is on the same domain as CKAN.

    Compares the ``ckan.site_url`` config option (``//localhost:5000``
    when unset) with ``data_dict['resource']['url']`` using
    ``compare_domains``.
    '''
    site_url = config.get('ckan.site_url', '//localhost:5000')
    urls = [site_url, data_dict['resource']['url']]
    return compare_domains(urls)
def get_preview_plugin(data_dict, return_first=False):
    '''Determine which extension, if any, can preview the resource.

    ``data_dict['resource']['on_same_domain']`` is set by this function
    before the plugins are consulted.

    Each ``IResourcePreview`` plugin's ``can_preview`` may return either a
    bool (old style) or a dict (new style). The dict is read for the keys
    ``can_preview``, ``fixable`` and ``quality``; ``quality`` defaults
    to 1.

    :param data_dict: contains a resource and package dict.
    :type data_dict: dictionary
    :param return_first: If True return the first plugin that can preview,
        without comparing it against the remaining plugins
    :type return_first: bool

    :returns: the plugin that should render the preview, or ``None`` if no
        plugin can preview the resource. When several plugins can preview,
        the one with the highest ``quality`` is returned.
    '''
    data_dict['resource']['on_same_domain'] = on_same_domain(data_dict)

    plugins_that_can_preview = []
    plugins_fixable = []
    for plugin in p.PluginImplementations(p.IResourcePreview):
        p_info = {'plugin': plugin, 'quality': 1}
        data = plugin.can_preview(data_dict)
        if isinstance(data, bool):
            # old school plugins return True/False
            p_info['can_preview'] = data
        else:
            # new school plugins provide a dict
            p_info.update(data)

        if p_info['can_preview']:
            if return_first:
                # The original had a bare ``plugin`` expression here, which
                # did nothing; the caller asked for the first match.
                return plugin
            plugins_that_can_preview.append(p_info)
        elif p_info.get('fixable'):
            plugins_fixable.append(p_info)

    num_plugins = len(plugins_that_can_preview)
    if num_plugins == 0:
        # we didn't find any. see if any could be made to work
        for plug in plugins_fixable:
            log.info('%s would allow previews to fix: %s',
                     plug['plugin'], plug['fixable'])
        preview_plugin = None
    elif num_plugins == 1:
        # just one available
        preview_plugin = plugins_that_can_preview[0]['plugin']
    else:
        # multiple plugins so get the best one
        plugs = [pl['plugin'] for pl in plugins_that_can_preview]
        log.warning('Multiple previews are possible. %s', plugs)
        preview_plugin = max(plugins_that_can_preview,
                             key=lambda x: x['quality'])['plugin']
    return preview_plugin
def get_view_plugin(view_type):
    '''
    Return the first IResourceView plugin whose ``info()['name']`` equals
    the given view_type, or None if no plugin matches.
    '''
    matching = (
        candidate
        for candidate in p.PluginImplementations(p.IResourceView)
        if candidate.info().get('name') == view_type
    )
    return next(matching, None)
def get_allowed_view_plugins(data_dict):
    '''Return the list of IResourceView plugins whose ``can_view`` returns
    a truthy value for the given data_dict, in plugin iteration order.
    '''
    return [
        candidate
        for candidate in p.PluginImplementations(p.IResourceView)
        if candidate.can_view(data_dict)
    ]
def get_new_resources(context, data_dict):
    '''Return the dictized new resources of the current commit.

    Needs to be run in the extension points after_create and after_update.

    The objects stored under ``'new'`` in the session's ``_object_cache``
    are filtered down to ``model.Resource`` instances and dictized with
    ``resource_list_dictize``. ``data_dict`` is not used.
    '''
    import ckan.lib.dictization.model_dictize as model_dictize

    model = context['model']
    created = []
    for candidate in model.Session()._object_cache['new']:
        if isinstance(candidate, model.Resource):
            created.append(candidate)
    return model_dictize.resource_list_dictize(created, context)