forked from rotki/rotki
-
Notifications
You must be signed in to change notification settings - Fork 0
/
serialize.py
103 lines (93 loc) · 3.7 KB
/
serialize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from typing import Any, Dict, List, Union
from hexbytes import HexBytes
from rotkehlchen.accounting.structures import Balance
from rotkehlchen.assets.asset import Asset
from rotkehlchen.balances.manual import ManuallyTrackedBalanceWithValue
from rotkehlchen.chain.ethereum.makerdao import (
DSRAccountReport,
DSRCurrentBalances,
MakerDAOVault,
MakerDAOVaultDetails,
VaultEvent,
VaultEventType,
)
from rotkehlchen.db.settings import DBSettings
from rotkehlchen.db.utils import AssetBalance, LocationData, SingleAssetBalance
from rotkehlchen.exchanges.data_structures import Trade
from rotkehlchen.exchanges.kraken import KrakenAccountType
from rotkehlchen.fval import FVal
from rotkehlchen.serialization.deserialize import deserialize_location_from_db
from rotkehlchen.typing import EthTokenInfo, Location, TradeType
from rotkehlchen.utils.version_check import VersionCheckResult
def _process_entry(entry: Any) -> Union[str, List[Any], Dict[str, Any], Any]:
    """Recursively turn ``entry`` into plain, serializable values.

    The type checks run in the order written below and the first match wins:

    - ``FVal``, ``Location``, ``TradeType``, ``KrakenAccountType`` and
      ``VaultEventType`` become ``str(entry)``
    - lists are rebuilt with every element processed
    - dicts are rebuilt with every value processed; a key that is an
      ``Asset`` is replaced by its ``identifier``
    - ``HexBytes`` becomes ``entry.hex()``
    - ``LocationData``, ``SingleAssetBalance`` and ``AssetBalance`` become
      hand-built dicts of their fields (the field values are NOT processed
      any further here)
    - objects exposing ``serialize()`` or ``_asdict()`` (see the explicit
      type lists below) are converted with that method and the resulting
      dict is processed via ``process_result``
    - ``Asset`` becomes its ``identifier``
    - anything else is returned unchanged

    Raises:
        ValueError: if ``entry`` is a tuple that was not matched by any of
            the earlier, more specific branches.
    """
    if isinstance(entry, FVal):
        return str(entry)

    if isinstance(entry, list):
        return [_process_entry(item) for item in entry]

    if isinstance(entry, dict):
        # Only Asset keys are rewritten; every other key is kept as it is.
        return {
            (key.identifier if isinstance(key, Asset) else key): _process_entry(value)
            for key, value in entry.items()
        }

    if isinstance(entry, HexBytes):
        return entry.hex()

    if isinstance(entry, Location):
        return str(entry)

    if isinstance(entry, LocationData):
        return {
            'time': entry.time,
            'location': str(deserialize_location_from_db(entry.location)),
            'usd_value': entry.usd_value,
        }

    if isinstance(entry, SingleAssetBalance):
        return {'time': entry.time, 'amount': entry.amount, 'usd_value': entry.usd_value}

    if isinstance(entry, AssetBalance):
        return {
            'time': entry.time,
            'asset': entry.asset.identifier,
            'amount': entry.amount,
            'usd_value': entry.usd_value,
        }

    if isinstance(entry, (Trade, MakerDAOVault, DSRAccountReport, Balance)):
        return process_result(entry.serialize())

    if isinstance(entry, (
            DBSettings,
            EthTokenInfo,
            VersionCheckResult,
            DSRCurrentBalances,
            ManuallyTrackedBalanceWithValue,
            VaultEvent,
            MakerDAOVaultDetails,
    )):
        return process_result(entry._asdict())

    # This check has to stay below the branches above: any tuple subclass
    # handled there (e.g. a namedtuple) would otherwise be rejected here.
    if isinstance(entry, tuple):
        raise ValueError('Query results should not contain plain tuples')

    if isinstance(entry, Asset):
        return entry.identifier

    # Location is not repeated here since it was already handled earlier.
    if isinstance(entry, (TradeType, KrakenAccountType, VaultEventType)):
        return str(entry)

    return entry
def process_result(result: Any) -> Dict[Any, Any]:
    """Serialize a result dictionary before the server sends it out.

    The work is delegated to ``_process_entry``, which takes care of:
    - turning all Decimals into strings, so that converting to float/big
      number is left to the client application and nothing is lost in
      the transfer
    - replacing a dictionary key that is an Asset with its identifier
    - serializing all NamedTuples and Dataclasses into dicts
    - converting all enums and more

    The processed value is asserted to be a dictionary before it is returned.
    """
    serialized = _process_entry(result)
    assert isinstance(serialized, dict)
    return serialized
def process_result_list(result: List[Any]) -> List[Any]:
    """Same as process_result, but for a list instead of a dictionary.

    The processed value is asserted to be a list before it is returned.
    """
    serialized = _process_entry(result)
    assert isinstance(serialized, list)
    return serialized