-
Notifications
You must be signed in to change notification settings - Fork 22
/
util.py
2279 lines (1920 loc) · 82.1 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Hive Appier Framework
# Copyright (c) 2008-2020 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>.
__author__ = "João Magalhães <joamag@hive.pt>"
""" The author(s) of the module """
__version__ = "1.0.0"
""" The version of the module """
__revision__ = "$LastChangedRevision$"
""" The revision number of the module """
__date__ = "$LastChangedDate$"
""" The last change date of the module """
__copyright__ = "Copyright (c) 2008-2020 Hive Solutions Lda."
""" The copyright for the module """
__license__ = "Apache License, Version 2.0"
""" The license for the module """
import os
import re
import sys
import json
import copy
import uuid
import types
import locale
import hashlib
import calendar
import datetime
import warnings
import functools
import threading
import mimetypes
import contextlib
import subprocess
import multiprocessing
from . import smtp
from . import config
from . import legacy
from . import common
from . import defines
from . import exceptions
# global counter used to preserve the declaration order of
# attributes across a set of classes (incremented per attribute)
CREATION_COUNTER = 0
""" The global static creation counter value that
will be used to create an order in the declaration
of attributes for a set of classes """

# regexes used by camel_to_underscore() to locate the camel
# case transitions where a separator should be inserted
FIRST_CAP_REGEX = re.compile("(.)([A-Z][a-z]+)")
""" Regular expression that ensures that the first
token of each camel string is properly capitalized """

ALL_CAP_REGEX = re.compile("([a-z0-9])([A-Z])")
""" The generalized transition from lower case to
upper case letter regex that will provide a way of
putting the underscore in the middle of the transition """

SORT_MAP = {
    "1" : 1,
    "-1" : -1,
    "ascending" : 1,
    "descending" : -1,
}
""" The map associating the normalized (text) way of
representing sorting with the current infra-structure
number way of representing the same information """

SEQUENCE_TYPES = (list, tuple)
""" The sequence defining the various types that are
considered to be sequence based for python """

# re-binds the defines module at module level so that it stays
# reachable through this (util) module's namespace
defines = defines
def to_limit(limit_s):
    """
    Converts the provided limit string into a non negative
    integer value, clamping negative values to zero.

    :type limit_s: String
    :param limit_s: The string based limit value to be converted.
    :rtype: int
    :return: The resulting non negative limit integer.
    """
    return max(0, int(limit_s))
def to_find(find_s):
    """
    Normalizes the provided find value into a list, wrapping
    single values and mapping empty values to an empty list.

    :type find_s: Object
    :param find_s: The find value to be normalized.
    :rtype: List
    :return: The normalized (list based) find value.
    """
    if not find_s:
        return []
    return find_s if type(find_s) == list else [find_s]
def to_sort(sort_s):
    """
    Converts the provided textual sort value ("name:direction")
    into the internal sort representation, a single element list
    of (name, direction number) tuples. The special "default"
    name resolves to an unset (None) sort.

    :type sort_s: String
    :param sort_s: The string based sort value to be converted.
    :rtype: List
    :return: The converted sort structure or None for "default".
    """
    name, sep, direction = sort_s.partition(":")
    # no explicit direction falls back to descending ordering
    if not sep: direction = "descending"
    if name == "default": return None
    return [(name, SORT_MAP.get(direction, 1))]
# normalized (external) parameter names mapped to the internal
# (appier) attribute names, used by resolve_alias()
ALIAS = {
    "context" : "find_d",
    "filters" : "find_d",
    "filters[]" : "find_d",
    "filter_def" : "find_d",
    "filter_string" : "find_s",
    "filter_name" : "find_n",
    "filter_operator" : "find_o",
    "insensitive" : "find_i",
    "order" : "sort",
    "offset" : "skip",
    "start_record" : "skip",
    "number_records" : "limit"
}
""" The map containing the various attribute alias
between the normalized manned and the appier manner """

# per attribute casting callables applied by find_types();
# values may be plain types or conversion functions
FIND_TYPES = dict(
    skip = int,
    limit = to_limit,
    find_s = legacy.UNICODE,
    find_d = to_find,
    find_i = bool,
    find_t = legacy.UNICODE,
    find_n = legacy.UNICODE,
    find_o = legacy.UNICODE,
    sort = to_sort,
    meta = bool,
    fields = list
)
""" The map associating the various find fields with
their respective types, note that in case a special
conversion operation is required the associated value
may represent a conversion function instead """

FIND_DEFAULTS = dict(
    limit = 10
)
""" The map that defines the various default values
for a series of find related attributes """
def is_iterable(object):
    """
    Checks if the given value belongs to one of the sequence
    oriented data types registered by the framework.

    :type object: Object
    :param object: The value to be tested for sequence based
    (iterable) data type compliance.
    :rtype: bool
    :return: Whether the value is considered iterable.
    """
    return True if isinstance(object, defines.ITERABLES) else False
def is_mobile(user_agent):
    """
    Verifies if the provided user agent string represents a
    mobile agent, by matching it against the mobile detection
    regular expressions (full string and four character prefix).

    :type user_agent: String
    :param user_agent: The user agent string to be verified.
    :rtype: bool
    :return: If the user agent represents a mobile browser
    instead of a regular (desktop) one.
    """
    if not user_agent: return False
    matched = defines.MOBILE_REGEX.search(user_agent) or\
        defines.MOBILE_PREFIX_REGEX.search(user_agent[:4])
    return bool(matched)
def is_tablet(user_agent):
    """
    Verifies if the provided user agent string represents a
    tablet agent, by matching it against the tablet detection
    regular expression and the mobile prefix regex (first four
    characters only).

    :type user_agent: String
    :param user_agent: The user agent string to be verified.
    :rtype: bool
    :return: If the user agent represents a tablet browser
    instead of a regular (desktop) one.
    """
    if not user_agent: return False
    matched = defines.TABLET_REGEX.search(user_agent) or\
        defines.MOBILE_PREFIX_REGEX.search(user_agent[:4])
    return bool(matched)
def is_browser(user_agent):
    """
    Verifies if the provided user agent string represents an
    interactive browser agent, using the browser information
    resolution process for the verification.

    :type user_agent: String
    :param user_agent: The user agent string to be verified
    for (interactive) browser presence.
    :rtype: bool
    :return: If the user agent represents an interactive browser.
    """
    info = browser_info(user_agent)
    return bool(info and info.get("interactive", False))
def is_bot(user_agent):
    """
    Verifies if the provided user agent string represents an
    automated (bot) agent, using the browser information
    resolution process for the verification.

    :type user_agent: String
    :param user_agent: The user agent string to be verified
    for bot presence.
    :rtype: bool
    :return: If the user agent represents an automated bot.
    """
    info = browser_info(user_agent = user_agent)
    return bool(info and info.get("bot", False))
def browser_info(user_agent):
    """
    Retrieves a dictionary containing information about the browser
    and the operative system associated with the provided user agent.

    The retrieval of the information depends on the kind of user
    agent string provided, as coverage is limited.

    :type user_agent: String
    :param user_agent: The HTTP based user agent string to be processed.
    :rtype: Dictionary
    :return: The dictionary/map containing the information processed from
    the provided user agent or None when nothing could be resolved.
    """
    if not user_agent: return None
    info = dict()
    # tries to match the user agent against each of the known
    # browser definitions, stopping at the first match
    for browser_i in defines.BROWSER_INFO:
        identity = browser_i["identity"]
        sub_string = browser_i.get("sub_string", identity)
        version_search = browser_i.get("version_search", sub_string + "/")
        interactive = browser_i.get("interactive", True)
        bot = browser_i.get("bot", False)
        if not sub_string in user_agent: continue
        if not version_search in user_agent: continue
        # extracts the version token that follows the version
        # search marker, trimming trailing separators
        version_i = user_agent.index(version_search) + len(version_search)
        version = user_agent[version_i:].split(" ", 1)[0].strip(" ;")
        # builds both the float and integer representations of
        # the version, gracefully falling back to zero values
        try: version_f = float(".".join(version.split(".")[:2]))
        except ValueError: version_f = 0.0
        # note: previously the except branch (wrongly) reset
        # version_f instead of version_i, leaving version_i as
        # the string index computed above
        try: version_i = int(version_f)
        except ValueError: version_i = 0
        info.update(
            name = identity,
            version = version,
            version_f = version_f,
            version_i = version_i,
            interactive = interactive,
            bot = bot
        )
        break
    # tries to resolve the operative system by simple sub
    # string matching against the known definitions
    for os_i in defines.OS_INFO:
        identity = os_i["identity"]
        sub_string = os_i.get("sub_string", identity)
        if not sub_string in user_agent: continue
        info.update(os = identity)
        break
    return info if info else None
def email_parts(base, strip = True):
    """
    Unpacks the complete set of parts (name and email) from the
    provided generalized email string. The provided string may
    be a single email or the more complex form (eg: Name <email>).

    When a sequence of strings is provided a list of parsed
    tuples is returned instead of a single tuple.

    :type base: String/List
    :param base: The base value that is going to be parsed as an
    email string or a sequence of such values.
    :type strip: bool
    :param strip: If the provided base value should be stripped
    of any extra space characters before processing.
    :rtype: Tuple/List
    :return: The resulting (name, email) tuple or list of such
    tuples for sequence based input.
    """
    # sequence input is handled through simple recursion,
    # one parsing operation per contained value
    if type(base) in SEQUENCE_TYPES:
        return [email_parts(item, strip = strip) for item in base]
    if not base: return (None, None)
    if strip: base = base.strip()
    match = defines.EMAIL_REGEX.match(base)
    if not match: return (None, None)
    # the email may come from either of the two capture groups
    # and the name defaults to the email when not present
    email = match.group("email_a") or match.group("email_b")
    name = match.group("name") or email
    return (name, email)
def email_mime(base, encoding = "utf-8"):
    """
    Converts the provided email value (or sequence of values) into
    the complete MIME compliant representation ("Name <email>"),
    encoding the name component using the SMTP header rules.
    Invalid values resolve to None (or are filtered from sequences).
    """
    # under python 3 the header creation takes no explicit encoding
    if legacy.PYTHON_3: encoding = None
    if type(base) in SEQUENCE_TYPES:
        results = (email_mime(item, encoding = encoding) for item in base)
        return [result for result in results if result]
    name, email = email_parts(base)
    if not (name and email): return None
    return "%s <%s>" % (smtp.header(name, encoding = encoding), email)
def email_name(base):
    """
    Extracts the name part from the provided email string or
    from each element of the provided sequence of email strings,
    filtering out empty results for sequence based input.

    :type base: String/List
    :param base: The email value or sequence of values from which
    the name part is going to be extracted.
    :rtype: String/List
    :return: The extracted name or the list of (non empty) names.
    """
    if type(base) in SEQUENCE_TYPES:
        # computes each name exactly once (the previous version
        # evaluated email_name() twice per element)
        names = [email_name(item) for item in base]
        return [name for name in names if name]
    name, _email = email_parts(base)
    return name
def email_base(base):
    """
    Extracts the email (address) part from the provided email
    string or from each element of the provided sequence of email
    strings, filtering out empty results for sequence based input.

    :type base: String/List
    :param base: The email value or sequence of values from which
    the email part is going to be extracted.
    :rtype: String/List
    :return: The extracted email or the list of (non empty) emails.
    """
    if type(base) in SEQUENCE_TYPES:
        # computes each email exactly once (the previous version
        # evaluated email_base() twice per element)
        emails = [email_base(item) for item in base]
        return [email for email in emails if email]
    _name, email = email_parts(base)
    return email
def date_to_timestamp(value, format = "%d/%m/%Y"):
    """
    Converts a date string in the given strptime compatible
    format into a UTC based timestamp (seconds since epoch),
    returning None for empty or unparsable values.

    :type value: String
    :param value: The date string that is going to be converted.
    :type format: String
    :param format: The strptime compatible format of the value.
    :rtype: int
    :return: The UTC timestamp or None when conversion fails.
    """
    if not value: return None
    # narrows the exception handling to the parse related errors
    # (previously a broad Exception was silently swallowed)
    try: date = datetime.datetime.strptime(value, format)
    except (ValueError, TypeError): return None
    return calendar.timegm(date.utctimetuple())
def obfuscate(value, display_l = 3, token = "*"):
    """
    Obfuscates the provided value keeping only the first
    display_l characters visible, replacing the remainder
    with the provided token character.

    :type value: String
    :param value: The value that is going to be obfuscated.
    :type display_l: int
    :param display_l: The number of leading characters to keep.
    :type token: String
    :param token: The character used for the hidden portion.
    :rtype: String
    :return: The obfuscated value (same length as the input).
    """
    # clamps the display length to the value's own length so that
    # short values are returned unchanged (no padding tokens)
    display_l = min(len(value), display_l)
    return value[:display_l] + token * (len(value) - display_l)
def import_pip(name, package = None, default = None):
    """
    Imports the module with the given name, trying a pip based
    installation of the associated package on (import) failure
    and retrying the import afterwards. Returns the default
    value when the module cannot be made available.
    """
    package = package or name
    try:
        return __import__(name)
    except ImportError:
        # installation fallback, any install error resolves
        # immediately to the default value
        try: install_pip_s(package)
        except Exception: return default
        try: return __import__(name)
        except ImportError: return default
def ensure_pip(name, package = None, delayed = False):
    """
    Ensures that the module with the given name is importable,
    installing the associated package (via pip) when it is not.
    """
    try:
        __import__(name)
    except ImportError:
        install_pip_s(package or name, delayed = delayed)
def install_pip(package, delayed = False, isolated = True, user = None):
    """
    Installs the provided package using pip, optionally running
    the operation in a separate (isolated) process or in a
    delayed (asynchronous, fire and forget) fashion.

    :type package: String
    :param package: The name of the package to be installed.
    :type delayed: bool
    :param delayed: If the install should run asynchronously.
    :type isolated: bool
    :param isolated: If the install should run in a child process.
    :type user: bool
    :param user: If a user level install should be performed,
    defaults to the PIP_USER configuration when not provided.
    :raises OperationalError: If pip is not available or the
    install operation fails.
    """
    # resolves the best possible entry point for pip, preferring
    # the most internal (modern) module structure available
    pip = None
    pip_internal = None
    try:
        import pip
        pip_internal = pip
    except ImportError:
        pass
    try:
        import pip._internal
        pip_internal = pip._internal
    except ImportError:
        pass
    try:
        import pip._internal.main
        pip_internal = pip._internal.main
    except ImportError:
        pass
    # only falls back to the global configuration when the caller
    # did not provide an explicit user flag (previously the
    # parameter was unconditionally overridden, breaking the
    # install_pip_s() user level fallback)
    if user == None: user = config.conf("PIP_USER", False, cast = bool)
    args = ["install", package]
    if pip_internal and hasattr(pip_internal, "main"): pip_main = pip_internal.main
    elif pip and hasattr(pip, "main"): pip_main = pip.main #@UndefinedVariable
    else: raise exceptions.OperationalError(message = "pip not found")
    if user: args.insert(1, "--user")
    if delayed:
        # fire and forget execution, the result is assumed
        # to be successful (no join on the child process)
        process = multiprocessing.Process(
            target = pip_main,
            args = (args,)
        )
        process.start()
        result = 0
    elif isolated:
        # runs the install in a child process so that the current
        # interpreter state is not polluted by pip's execution
        process = multiprocessing.Process(
            target = pip_main,
            args = (args,)
        )
        process.start()
        process.join()
        result = process.exitcode
    else:
        result = pip_main(args)
    if result == 0: return
    raise exceptions.OperationalError(message = "pip error")
def install_pip_s(package, delayed = False):
    """
    Safe version of the pip install operation, trying first a
    system level install and falling back to a user level one
    on (operational) failure.
    """
    try: install_pip(package, delayed = delayed, user = False)
    except exceptions.OperationalError:
        install_pip(package, delayed = delayed, user = True)
def request_json(request = None, encoding = "utf-8"):
    """
    Retrieves the JSON payload of the provided request (or of the
    current global request when none is given), caching the parsed
    value in the request properties. Invalid or non JSON payloads
    gracefully resolve to an empty dictionary.
    """
    request = request or common.base().get_request()
    # re-uses the previously parsed (cached) value when available
    if "_data_j" in request.properties: return request.properties["_data_j"]
    # decodes and parses the raw request data, falling back to an
    # empty map on any decoding or parsing failure
    data = request.data
    try:
        if legacy.is_bytes(data): data = data.decode(encoding)
        data_j = json.loads(data)
    except Exception:
        data_j = {}
    # caches the parsed payload so that subsequent calls are cheap
    request.properties["_data_j"] = data_j
    return data_j
def get_context(self):
    """
    Retrieves the "best" possible context object for the current
    execution life-cycle, typically this should be an "attached"
    (non mock) request object.

    Multiple strategies are tried in sequence: the instance level
    request first, then the global request (not thread safe).
    """
    # small predicate verifying that a request object is a valid
    # (non mock) context candidate
    def valid(request):
        return hasattr(request, "is_mock") and not request.is_mock()
    # first strategy: the request attached to the provided instance
    if hasattr(self, "request"):
        request = self.request
        if valid(request): return request
    # second strategy: the globally registered request
    request = common.base().get_request()
    if valid(request): return request
    # no valid execution context could be retrieved
    return None
def get_object(
    object = None,
    alias = False,
    page = False,
    find = False,
    norm = True,
    **kwargs
):
    """
    Builds a structured object from the data available in the
    current request (JSON payload, files, post and query string
    parameters), optionally resolving attribute aliases, page
    oriented attributes, find type casting and normalization.
    """
    request = common.base().get_request()
    # starts from a shallow copy of the provided object (when any)
    # so that the caller's value is never mutated here
    object = copy.copy(object) if object else {}
    # merges every request data source into the object, later
    # sources overriding earlier ones for duplicated names
    sources = (
        request_json(),
        request.files_s,
        request.post_s,
        request.params_s
    )
    for source in sources:
        for name, value in source.items():
            object[name] = value
    # optional post processing stages, applied by flag
    if alias: resolve_alias(object)
    if page: page_types(object)
    if find: find_types(object)
    if find: find_defaults(object, kwargs)
    if norm: norm_object(object)
    return object
def resolve_alias(object):
    """
    Renames (in place) any attribute of the provided object whose
    name is a known alias, using the canonical attribute name.
    """
    for name, value in legacy.eager(object.items()):
        if name not in ALIAS: continue
        canonical = ALIAS[name]
        object[canonical] = value
        del object[name]
def page_types(object, size = 50):
    """
    Converts the page oriented attributes (page, size, sorter and
    direction) in the provided object into the equivalent
    skip/limit/sort attributes (set in place).
    """
    page = int(object.get("page", 1))
    size = int(object.get("size", size))
    object["skip"] = (page - 1) * size
    object["limit"] = size
    sorter = object.get("sorter", None)
    if sorter:
        direction = object.get("direction", "descending")
        object["sort"] = "%s:%s" % (sorter, direction)
def find_types(object):
    """
    Casts (in place) the find related attributes of the provided
    object to their expected types, removing attributes that are
    not part of the find specification.
    """
    for name, value in legacy.eager(object.items()):
        converter = FIND_TYPES.get(name, None)
        if converter: object[name] = converter(value)
        else: del object[name]
def find_defaults(object, kwargs):
    """
    Applies (in place) the keyword provided values and then the
    global defaults for any missing find related attribute.
    """
    # keyword values only apply to known find attributes that
    # are still missing from the object
    for name, value in legacy.iteritems(kwargs):
        if name in object: continue
        if name not in FIND_TYPES: continue
        object[name] = value
    # global defaults are applied last (lowest priority)
    for name, value in legacy.iteritems(FIND_DEFAULTS):
        if name in object: continue
        object[name] = value
def norm_object(object):
    """
    Normalizes (in place) the sequence oriented attributes of the
    provided object, the ones whose name ends with "[]", converting
    their leaf based structure into proper lists of dictionaries.

    :type object: Dictionary
    :param object: The object whose sequence based attributes are
    going to be normalized in place.
    """
    # iterates over a snapshot of the items as keys are removed
    # and added during the normalization (mutating a dictionary
    # while iterating its live view is not safe)
    for name, value in list(object.items()):
        # ignores any attribute not marked with the sequence suffix
        if not name.endswith("[]"): continue
        # removes the raw entry and computes the normalized name
        # (without the "[]" suffix) for the generated structure
        del object[name]
        name = name[:-2]
        # an empty value normalizes into an empty sequence
        if not value:
            object[name] = []
            continue
        # gathers the leafs of the value and uses the size of the
        # first leaf's value sequence to determine how many
        # dictionaries the normalized sequence is going to contain
        leafs_l = leafs(value)
        _fqn, values = leafs_l[0] if leafs_l else (None, [])
        size = len(values)
        # avoids shadowing the list builtin (previously named list)
        items = [dict() for _index in range(size)]
        object[name] = items
        # distributes each leaf value by the corresponding
        # dictionary in the sequence (index based association)
        for _name, _value in leafs_l:
            _name_l = _name.split(".")
            for index in range(size):
                set_object(items[index], _name_l, _value[index])
def set_object(object, name_l, value):
    """
    Sets a composite value in an object, allowing for
    dynamic setting of arbitrarily deep key values.

    This method is useful for situations where one wants
    to set a value at a dynamically defined depth inside
    an object without manually creating the inner dictionaries.

    :type object: Dictionary
    :param object: The target object that is going to be
    changed and set with the target value.
    :type name_l: List
    :param name_l: The list of names that defines the fully
    qualified name to be used in the setting of the value,
    for example path.to.end will be a three sized list containing
    each of the partial names.
    :type value: Object
    :param value: The value that is going to be set in the
    defined target of the object.
    """
    name, remainder = name_l[0], name_l[1:]
    # base case: a single name remaining means the value is
    # set directly at the current level
    if not remainder:
        object[name] = value
        return
    # recursive case: ensures an intermediate dictionary exists
    # at the current level and descends into it (note: renamed
    # from "map" to avoid shadowing the builtin)
    child = object.get(name, {})
    object[name] = child
    set_object(child, remainder, value)
def leafs(object):
    """
    Retrieves a list containing a series of tuples that
    each represent a leaf of the current object structure.

    A leaf is the last element of an object that is not a
    map, the other intermediary maps are considered to be
    trunks and are percolated recursively.

    This is a recursive function that takes some memory for
    the construction of the list, and so should be used with
    the proper care to avoid bottlenecks.

    :type object: Dictionary
    :param object: The object for which the leafs list
    structure is meant to be retrieved.
    :rtype: List
    :return: The list of (fully qualified name, values) tuples
    for the provided object, values always as lists.
    """
    leafs_l = []
    for name, value in object.items():
        # dictionary values are trunks: recurse and prefix each
        # resulting fully qualified name with the current name
        if type(value) == dict:
            leafs_l.extend(
                (name + "." + _name, _value)
                for _name, _value in leafs(value)
            )
        # any other value is a leaf: wrap non list values so that
        # the resulting values are always sequence based
        else:
            if not type(value) == list: value = [value]
            leafs_l.append((name, value))
    return leafs_l
def gather_errors(lazy_dict, resolve = True):
    """
    Function responsible for the iterative gathering of
    lazy evaluation errors, allowing for a complete gathering
    of errors instead of a single evaluation.

    :type lazy_dict: LazyDict
    :param lazy_dict: The lazy dictionary that is going to be
    percolated and evaluated sequentially.
    :type resolve: bool
    :param resolve: If the lazy dictionary values should be evaluated
    even if they have already been eager loaded, by unsetting this value
    there's a risk of not gathering all of the errors.
    :rtype: Dictionary
    :return: The final dictionary containing the complete set of
    errors that have been found, indexed by parameter name.
    """
    errors = dict()
    # evaluates every key, collecting the message of any framework
    # level exception raised during the resolution
    for key in lazy_dict:
        try:
            lazy_dict.__getitem__(key, resolve = resolve)
        except (exceptions.AppierException, exceptions.BaseInternalError) as exception:
            errors.setdefault(key, []).append(exception.message)
    return errors
def gen_token(limit = None, hash = hashlib.sha256):
    """
    Generates a random cryptographic ready token according
    to the framework specification, this is generated using
    a truly random UUID based seed and hashed using the
    provided hash digest strategy (SHA256 by default).

    The resulting value is returned as an hexadecimal based
    string according to the standard.

    :type limit: int
    :param limit: The maximum number of characters allowed
    for the token to be generated.
    :type hash: Function
    :param hash: The hashing method that is going to be used
    for the hash of the generated token, this should be compliant
    with the base python hashing infra-structure.
    :rtype: String
    :return: The hexadecimal based string value
    """
    seed = str(uuid.uuid4()).encode("utf-8")
    token = hash(seed).hexdigest()
    return token[:limit] if limit else token
def html_to_text(data):
    """
    Converts the provided HTML textual data into a plain text
    representation of it. This method uses a series of heuristics
    for this conversion, and such conversion should not be considered
    to be completely reliable.

    The current implementation is not memory or processor efficient
    and should be used carefully to avoid performance problems.

    :type data: String
    :param data: The HTML string of text that is going to be used for
    the conversion into the plain text representation.
    :rtype: String
    :return: The approximate plain text representation to the provided
    HTML contents.
    """
    data = data.strip()
    # applies the simple (entity like) textual replacements
    for source, target in (("\n", "\r"), ("©", "Copyright"), ("·", "-")):
        data = data.replace(source, target)
    # extracts the body contents and strips all of the tags
    matches = re.findall(defines.BODY_REGEX, data)
    data = matches[0] if matches else ""
    data = defines.TAG_REGEX.sub("", data)
    # discards empty lines and re-joins the stripped ones
    stripped = (line.strip() for line in data.splitlines(False))
    data = "\n".join(line for line in stripped if line)
    return data.replace("\n.", ".")
def camel_to_underscore(camel, separator = "_", lower = True):
    """
    Converts the provided camel cased based value into
    a normalized underscore based string.

    An optional lower parameter may be used to avoid the case
    of the letters from being lower cased.

    This is useful as most of the python string standards
    are compliant with the underscore strategy.

    :type camel: String
    :param camel: The camel cased string that is going to be
    converted into an underscore based string.
    :type separator: String
    :param separator: The separator token that is going to
    be used in the camel to underscore conversion.
    :type lower: bool
    :param lower: If the letter casing should be changed while
    convert the value from camel to underscore.
    :rtype: String
    :return: The underscore based string resulting from the
    conversion of the provided camel cased one.
    """
    if not camel: return camel
    replacement = r"\1" + separator + r"\2"
    result = FIRST_CAP_REGEX.sub(replacement, camel)
    result = ALL_CAP_REGEX.sub(replacement, result)
    return result.lower() if lower else result
def camel_to_readable(camel, lower = False, capitalize = False):
    """
    Converts the given camel cased oriented string value
    into a readable one meaning that the returned value
    is a set of strings separated by spaces.

    This method may be used to convert class names into
    something that is readable by an end user.

    :type camel: String
    :param camel: The camel case string value that is going
    to be used in the conversion into a readable string.
    :type lower: bool
    :param lower: If the camel based value should be lower
    cased before the conversion to readable.
    :type capitalize: bool
    :param capitalize: If all of the words should be capitalized
    or if instead only the first one should.
    :rtype: String
    :return: The final human readable string that may be
    used to display a value to an end user.
    """
    if not camel: return camel
    return underscore_to_readable(
        camel_to_underscore(camel, lower = lower),
        capitalize = capitalize
    )
def underscore_to_camel(underscore, lower = False):
    """
    Converts the provided underscore cased based value into
    a normalized camel cased string.

    An optional lower parameter may be provided to obtain a
    lower camel case version of the string.

    :type underscore: String
    :param underscore: The underscore cased string that is going to be
    converted into an camel case based string.
    :type lower: bool
    :param lower: If the first letter of the resulting camel
    case string should be lower case (lower camel case).
    :rtype: String
    :return: The camel case based string resulting from the
    conversion of the provided underscore cased one.
    """
    if not underscore: return underscore
    camel = underscore_to_readable(underscore, capitalize = True, separator = "")
    return camel if not lower else camel[0].lower() + camel[1:]
def underscore_to_readable(underscore, capitalize = False, separator = " "):
    """
    Converts the given underscore oriented string value
    into a readable one meaning that the returned value
    is a set of strings joined by the provided separator.

    :type underscore: String
    :param underscore: The underscore string value that is going
    to be used in the conversion into a readable string.
    :type capitalize: bool
    :param capitalize: If all of the words should be capitalized
    or if instead only the first one should.
    :type separator: String
    :param separator: The separator to be used to join the multiple
    parts of the resulting readable tokens.
    :rtype: String
    :return: The final human readable string that may be
    used to display a value to an end user.
    """
    if not underscore: return underscore
    parts = [part for part in underscore.split("_") if part]
    if capitalize:
        parts = [part[0].upper() + part[1:] for part in parts]
    else:
        parts[0] = parts[0][0].upper() + parts[0][1:]
    return separator.join(parts)
def quote(value, *args, **kwargs):
    """
    Quotes the passed value according to the defined
    standard for URL escaping, the value is first encoded
    into the expected UTF-8 encoding as defined by standard.

    This method should be used instead of a direct call to
    the equivalent call in the URL library.

    :type value: String
    :param value: The string value that is going to be quoted
    according to the URL escaping scheme.
    :rtype: String
    :return: The quoted value according to the URL scheme this
    value may be safely used in urls.
    """
    if isinstance(value, legacy.UNICODE): value = value.encode("utf-8")
    return legacy.quote(value, *args, **kwargs)
def unquote(value, *args, **kwargs):
    """
    Unquotes the provided value according to the URL scheme
    the resulting value should be an unicode string representing
    the same value, the intermediary string value from the decoding
    should be an UTF-8 based value.

    This method should be used instead of a direct call to
    the equivalent call in the URL library.

    :type value: String
    :param value: The string value that is going to be unquoted
    according to the URL escaping scheme.
    :rtype: String
    :return: The unquoted value extracted as an unicode
    string that the represents the same value.
    """
    result = legacy.unquote(value, *args, **kwargs)
    if isinstance(result, legacy.BYTES): result = result.decode("utf-8")
    return result
def escape(value, char, escape = "\\"):
"""
Escapes the provided string value according to the requested
target character and escape value. Meaning that all the characters
are going to be replaced by the escape plus character sequence.
:type value: String
:param value: The string that is going to have the target characters