/
json.py
146 lines (114 loc) · 3.99 KB
/
json.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""JSON Serialization Utilities."""
from __future__ import annotations
import base64
import json
import uuid
from datetime import date, datetime, time
from decimal import Decimal
from typing import Any, Callable, TypeVar
textual_types = ()
try:
from django.utils.functional import Promise
textual_types += (Promise,)
except ImportError:
pass
class JSONEncoder(json.JSONEncoder):
"""Kombu custom json encoder."""
def default(self, o):
reducer = getattr(o, "__json__", None)
if reducer is not None:
return reducer()
if isinstance(o, textual_types):
return str(o)
for t, (marker, encoder) in _encoders.items():
if isinstance(o, t):
return (
encoder(o) if marker is None else _as(marker, encoder(o))
)
# Bytes is slightly trickier, so we cannot put them directly
# into _encoders, because we use two formats: bytes, and base64.
if isinstance(o, bytes):
try:
return _as("bytes", o.decode("utf-8"))
except UnicodeDecodeError:
return _as("base64", base64.b64encode(o).decode("utf-8"))
return super().default(o)
def _as(t: str, v: Any):
return {"__type__": t, "__value__": v}
def dumps(
s,
_dumps=json.dumps,
cls=JSONEncoder,
default_kwargs=None,
**kwargs
):
"""Serialize object to json string."""
default_kwargs = default_kwargs or {}
return _dumps(s, cls=cls, **dict(default_kwargs, **kwargs))
def object_hook(o: dict):
"""Hook function to perform custom deserialization."""
if o.keys() == {"__type__", "__value__"}:
decoder = _decoders.get(o["__type__"])
if decoder:
return decoder(o["__value__"])
else:
raise ValueError("Unsupported type", type, o)
else:
return o
def loads(s, _loads=json.loads, decode_bytes=True, object_hook=object_hook):
"""Deserialize json from string."""
# None of the json implementations supports decoding from
# a buffer/memoryview, or even reading from a stream
# (load is just loads(fp.read()))
# but this is Python, we love copying strings, preferably many times
# over. Note that pickle does support buffer/memoryview
# </rant>
if isinstance(s, memoryview):
s = s.tobytes().decode("utf-8")
elif isinstance(s, bytearray):
s = s.decode("utf-8")
elif decode_bytes and isinstance(s, bytes):
s = s.decode("utf-8")
return _loads(s, object_hook=object_hook)
DecoderT = EncoderT = Callable[[Any], Any]
T = TypeVar("T")
EncodedT = TypeVar("EncodedT")
def register_type(
t: type[T],
marker: str | None,
encoder: Callable[[T], EncodedT],
decoder: Callable[[EncodedT], T] = lambda d: d,
):
"""Add support for serializing/deserializing native python type.
If marker is `None`, the encoding is a pure transformation and the result
is not placed in an envelope, so `decoder` is unnecessary. Decoding must
instead be handled outside this library.
"""
_encoders[t] = (marker, encoder)
if marker is not None:
_decoders[marker] = decoder
_encoders: dict[type, tuple[str | None, EncoderT]] = {}
_decoders: dict[str, DecoderT] = {
"bytes": lambda o: o.encode("utf-8"),
"base64": lambda o: base64.b64decode(o.encode("utf-8")),
}
def _register_default_types():
# NOTE: datetime should be registered before date,
# because datetime is also instance of date.
register_type(datetime, "datetime", datetime.isoformat,
datetime.fromisoformat)
register_type(
date,
"date",
lambda o: o.isoformat(),
lambda o: datetime.fromisoformat(o).date(),
)
register_type(time, "time", lambda o: o.isoformat(), time.fromisoformat)
register_type(Decimal, "decimal", str, Decimal)
register_type(
uuid.UUID,
"uuid",
lambda o: {"hex": o.hex},
lambda o: uuid.UUID(**o),
)
_register_default_types()