-
Notifications
You must be signed in to change notification settings - Fork 120
/
exceptions.py
166 lines (121 loc) · 4.7 KB
/
exceptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
##################################################################
# Copyright 2018 Open Source Geospatial Foundation and others #
# licensed under MIT, Please consult LICENSE.txt for details #
##################################################################
"""
OGC OWS and WPS Exceptions
Based on OGC OWS, WPS and
http://lists.opengeospatial.org/pipermail/wps-dev/2013-October/000335.html
"""
from werkzeug.wrappers import Response
from werkzeug.exceptions import HTTPException
from markupsafe import escape
import logging
from pywps import __version__
# Module author credit.
__author__ = "Alex Morega & Calin Ciociu"
# Logger registered under the fixed name 'PYWPS'; NoApplicableCode.__init__
# writes one record to it for every exception instance that is created.
LOGGER = logging.getLogger('PYWPS')
class NoApplicableCode(HTTPException):
    """No applicable code exception implementation.

    Also the base class of the other exceptions in this module. An instance
    renders itself as an ``ows:ExceptionReport`` XML document whose
    ``exceptionCode`` attribute is the name of the concrete class.

    Note: ``__init__`` always assigns ``self.code`` from its ``code``
    argument (default 400), so the class-level ``code`` below is shadowed on
    every instance.
    """

    code = 500
    locator = ""

    def __init__(self, description, locator="", code=400):
        """Store the exception details and log them.

        :param description: text for the ``ows:ExceptionText`` element; a
                            falsy value leaves the element out.
        :param locator: value of the ``locator`` attribute of the report.
        :param code: HTTP status code used for the generated response.
        """
        import sys  # local import: only needed for the exc_info check below

        self.code = code
        self.description = description
        self.locator = locator
        msg = 'Exception: code: {}, description: {}, locator: {}'.format(self.code, self.description, self.locator)
        # Attach a traceback only while an exception is actually being
        # handled. ``LOGGER.exception`` called outside an ``except`` block
        # appends a meaningless "NoneType: None" to every record.
        LOGGER.error(msg, exc_info=sys.exc_info()[0] is not None)

        HTTPException.__init__(self)

    @property
    def name(self):
        """The status name, i.e. the name of the concrete exception class."""
        return self.__class__.__name__

    def get_description(self, environ=None):
        """Get the description wrapped in an ``ows:ExceptionText`` element.

        :param environ: accepted for interface compatibility; not used here.
        :return: the escaped description inside the element, or an empty
                 string when the description is falsy.
        """
        if not self.description:
            return ''
        return '<ows:ExceptionText>{}</ows:ExceptionText>'.format(escape(self.description))

    def get_response(self, environ=None):
        """Build the XML exception report response.

        :param environ: passed through to :meth:`get_description`.
        :return: a ``Response`` with mimetype ``text/xml`` and status
                 ``self.code``.
        """
        args = {
            'version': __version__,
            # locator and name end up inside XML attributes, so escape them
            'locator': escape(self.locator),
            'name': escape(self.name),
            'description': self.get_description(environ)
        }
        doc = (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<!-- PyWPS {version} -->\n'
            '<ows:ExceptionReport xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.opengis.net/ows/1.1 http://schemas.opengis.net/ows/1.1.0/owsExceptionReport.xsd" version="1.0.0">\n'  # noqa
            ' <ows:Exception exceptionCode="{name}" locator="{locator}" >\n'
            ' {description}\n'
            ' </ows:Exception>\n'
            '</ows:ExceptionReport>'
        ).format(**args)

        return Response(doc, self.code, mimetype='text/xml')
class InvalidParameterValue(NoApplicableCode):
    """Exception implementation for an invalid parameter value."""

    code = 400
class MissingParameterValue(NoApplicableCode):
    """Exception implementation for a missing parameter value."""

    code = 400
class FileSizeExceeded(NoApplicableCode):
    """Exception implementation for an exceeded file size."""

    code = 400
class VersionNegotiationFailed(NoApplicableCode):
    """Exception implementation for a failed version negotiation."""

    code = 400
class OperationNotSupported(NoApplicableCode):
    """Exception implementation for an operation that is not supported."""

    # NOTE(review): NoApplicableCode.__init__ assigns self.code from its
    # ``code`` argument (default 400), so instances only carry 501 when the
    # caller passes code=501 explicitly -- confirm that this is intended.
    code = 501
class StorageNotSupported(NoApplicableCode):
    """Exception implementation for a storage that is not supported."""

    code = 400
class NotEnoughStorage(NoApplicableCode):
    """Exception implementation for insufficient storage."""

    code = 400
class FileStorageError(NoApplicableCode):
    """Exception implementation for a file storage error."""

    code = 400
class ServerBusy(NoApplicableCode):
    """Max number of operations exceeded.

    When no description is given, the class-level default text
    'Maximum number of processes exceeded' is reported.
    """

    code = 400
    description = 'Maximum number of processes exceeded'

    def __init__(self, description="", locator="", code=400):
        """Create the exception, defaulting the description.

        The inherited constructor requires ``description`` and always
        overwrites the attribute, which made the class-level default
        unreachable. Same approach as ``FileURLNotSupported.__init__``.

        :param description: exception text; a falsy value selects the
                            class-level default.
        :param locator: value of the ``locator`` attribute of the report.
        :param code: HTTP status code used for the generated response.
        """
        description = description or self.description
        NoApplicableCode.__init__(self, description=description, locator=locator, code=code)

    def get_body(self, environ=None):
        """Get the XML body.

        :param environ: passed through to ``get_description``.
        :return: an ``ows:ExceptionReport`` document as a string, without a
                 locator attribute.
        """
        args = {
            'name': escape(self.name),
            'description': self.get_description(environ)
        }
        return (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<ows:ExceptionReport xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.opengis.net/ows/1.1 ../../../ows/1.1.0/owsExceptionReport.xsd" version="1.0.0">'  # noqa
            '<ows:Exception exceptionCode="{name}">'
            '{description}'
            '</ows:Exception>'
            '</ows:ExceptionReport>'
        ).format(**args)
class FileURLNotSupported(NoApplicableCode):
    """Exception implementation for a file URL that is not supported.

    Reports 'File URL not supported as input.' unless the caller supplies
    its own description.
    """

    code = 400
    description = 'File URL not supported as input.'

    def __init__(self, description="", locator="", code=400):
        """Create the exception, defaulting the description.

        :param description: exception text; a falsy value selects the
                            class-level default.
        :param locator: value of the ``locator`` attribute of the report.
        :param code: HTTP status code used for the generated response.
        """
        # Any falsy description falls back to the class-level default text.
        if not description:
            description = self.description
        NoApplicableCode.__init__(self, description, locator, code)
class SchedulerNotAvailable(NoApplicableCode):
    """Exception implementation for an unavailable job scheduler."""

    code = 400