-
Notifications
You must be signed in to change notification settings - Fork 498
/
tree_entity.py
226 lines (180 loc) · 8.79 KB
/
tree_entity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""
Copyright 2019 Goldman Sachs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""
import pandas as pd
import datetime as dt
from typing import Optional
from pydash import get
from gs_quant.api.gs.assets import GsAsset, GsAssetApi
from gs_quant.data import Dataset
from gs_quant.errors import MqValueError
class AssetTreeNode:
    """
    A single node of an asset constituent tree.

    A node wraps one asset id (plus, optionally, the asset object it was resolved to) and keeps its direct
    underliers as child AssetTreeNode objects in direct_underlier_assets_as_nodes.
    """

    def __init__(self, id, depth: Optional[int] = 0, date: Optional[dt.date] = None, asset: Optional[GsAsset] = None):
        """
        :param id: identifier of the asset represented by this node
        :param depth: depth of the node with respect to the root node
        :param date: date for which the tree is constructed. If None, it is set to the latest available date
            when the underliers of this node are queried.
        :param asset: asset object from which name, xref.bbid and type are read
        """
        self.id = id
        self.date = date
        self.depth = depth
        self.asset = asset
        self.name = get(self.asset, 'name')
        self.bbid = get(get(self.asset, 'xref'), 'bbid')
        self.asset_type = get(self.asset, 'type')
        self.data = {}  # extra per-node values (filled by populate_values), keyed by column name
        self.constituents_df = pd.DataFrame()  # cache for to_frame()
        self.direct_underlier_assets_as_nodes = []  # holds the child nodes as AssetTreeNode objects.

    def __str__(self):
        result = self.bbid if self.bbid is not None else self.id
        return f'Tree Node - {result}'

    def to_frame(self) -> pd.DataFrame:
        """
        Flatten the sub-tree below this node into a dataframe with one row per parent/underlier link.

        A non-empty result is cached on the node and returned as-is on later calls.

        :return: dataframe of de-duplicated links sorted by depth, or an empty dataframe when the node has no
            child nodes
        """
        if len(self.constituents_df) > 0:
            return self.constituents_df
        frame = self.__build_constituents_df(pd.DataFrame())
        if len(frame) > 0:
            # An empty frame has no 'depth' column, so it can only be sorted when there is at least one row.
            frame = frame.drop_duplicates().sort_values(by='depth').reset_index(drop=True)
        self.constituents_df = frame
        return self.constituents_df

    def populate_values(self, dataset, value_column, underlier_column):
        """
        Recursively copy one column of a dataset into the data dict of every descendant node.

        The dataset is queried for this node's id on self.date. When rows come back, each child receives the
        value_column value of the first row whose underlier_column equals the child's id, and is then processed
        the same way.

        :param dataset: identifier of the dataset to query
        :param value_column: column to read; also used as the key in each child's data dict
        :param underlier_column: column holding the underlier asset ids
        :raises IndexError: if the query has rows but none of them matches a child node
        """
        ds = Dataset(dataset)
        query = ds.get_data(start=self.date, end=self.date, assetId=[self.id])
        if len(query) > 0:
            for node in self.direct_underlier_assets_as_nodes:
                value = query.loc[query[underlier_column] == node.id][value_column].iloc[0]
                node.data[value_column] = value
                node.populate_values(dataset, value_column, underlier_column)

    def build_tree(self, dataset, underlier_column):
        """
        Recursively build the sub-tree below this node.

        Every direct underlier found in the dataset becomes a child AssetTreeNode (one level deeper, same date),
        is built in turn and appended to direct_underlier_assets_as_nodes. Nothing is returned.

        :param dataset: identifier of the dataset that lists the underliers of an asset
        :param underlier_column: column holding the underlier asset ids
        :raises Exception: if no asset was returned for one of the underlier ids
        """
        query = self.__get_direct_underliers(self.id, dataset)
        if len(query) == 0:
            return
        all_ids = query[underlier_column].tolist()
        all_assets = GsAssetApi.get_many_assets(id=all_ids)
        # Key every asset by its own id. Pairing the requested ids with the returned assets by position would
        # silently mislabel nodes whenever the response is ordered differently or is missing an entry.
        asset_lookup = {get(asset_obj, 'id'): asset_obj for asset_obj in all_assets}
        for underlier in all_ids:
            if underlier not in asset_lookup:
                raise Exception("Unable to find {}".format(underlier))
            child_node = AssetTreeNode(underlier, self.depth + 1, self.date, asset_lookup[underlier])
            child_node.build_tree(dataset, underlier_column)
            self.direct_underlier_assets_as_nodes.append(child_node)

    def __get_direct_underliers(self, asset_id, dataset) -> pd.DataFrame:
        """
        Queries the dataset for the date passed during initialisation. If date isn't passed, returns the data of the
        latest available date and stores that date on the node.
        """
        ds = Dataset(dataset)
        if self.date:
            query = ds.get_data(start=self.date, end=self.date, assetId=[asset_id]).drop_duplicates()
        else:
            query = ds.get_data(assetId=[asset_id]).drop_duplicates()
            if len(query) > 0:
                # Keep only the rows of the most recent index value and remember it as the tree date.
                self.date = query.index.max().date()
                query = query[query.index == query.index.max()].reset_index()
        return query

    def __build_constituents_df(self, constituents_df) -> pd.DataFrame:
        """
        Append one row per descendant link (parent first, then that child's own sub-tree) to constituents_df.

        :param constituents_df: dataframe to extend; returned unchanged when this node has no child nodes
        :return: dataframe holding the incoming rows followed by the rows of this sub-tree
        """
        # Collect the pieces and concatenate once; DataFrame.append no longer exists in pandas 2.x.
        frames = [constituents_df] if len(constituents_df) > 0 else []
        for node in self.direct_underlier_assets_as_nodes:
            data = {'date': self.date, 'assetName': self.name, 'assetId': self.id, 'assetBbid': self.bbid,
                    'underlyingAssetName': node.name, 'underlyingAssetId': node.id, 'underlyingAssetBbid': node.bbid,
                    'depth': node.depth}
            data.update(node.data)
            frames.append(pd.DataFrame(data, index=[0]))
            d = node.__build_constituents_df(pd.DataFrame())
            if len(d) > 0:
                frames.append(d)
        if not frames:
            return constituents_df
        return pd.concat(frames)
class TreeHelper:
    """
    Owns the root AssetTreeNode of an asset and lazily builds, enriches, flattens and visualises its tree.
    """

    def __init__(self,
                 id,
                 date: Optional[dt.date] = None,
                 tree_underlier_dataset: Optional[str] = None,
                 underlier_column: Optional[str] = 'underlyingAssetId'):
        """
        :param id: identifier of the root asset; the asset itself is fetched with GsAssetApi.get_asset
        :param date: date for which the tree is constructed (passed on to the root node)
        :param tree_underlier_dataset: identifier of the dataset that lists the underliers of an asset
        :param underlier_column: column of the datasets that holds the underlier asset ids
        """
        self.id = id
        self.root = AssetTreeNode(self.id, 0, date, GsAssetApi.get_asset(asset_id=self.id))
        self.date = self.root.date
        self.update_time = dt.datetime.now()
        self.constituents_df = pd.DataFrame()
        self.tree_built = False
        self.__tree_underlier_dataset = tree_underlier_dataset
        self.__underlier_column = underlier_column

    def populate_weights(self,
                         dataset,
                         weight_column: Optional[str] = 'weight'):
        """
        Store the value of weight_column from the dataset on every node below the root; the root itself gets 1.

        :param dataset: identifier of the dataset to read the weights from
        :param weight_column: column to read, also used as the key in each node's data dict
        """
        self.__populate_column(dataset, weight_column)

    def populate_attribution(self,
                             dataset,
                             attribution_column: Optional[str] = 'absoluteAttribution'):
        """
        Store the value of attribution_column from the dataset on every node below the root; the root itself
        gets 1.

        :param dataset: identifier of the dataset to read the attribution from
        :param attribution_column: column to read, also used as the key in each node's data dict
        """
        self.__populate_column(dataset, attribution_column)

    def __populate_column(self, dataset, column):
        """Build the tree if needed, seed the root with 1 under column and fill all descendants from dataset."""
        if not self.tree_built:
            self.build_tree()
        # Use the requested column as the key so the root agrees with the keys written on its descendants.
        self.root.data[column] = 1
        self.root.populate_values(dataset, column, self.__underlier_column)
        # A frame cached by an earlier to_frame() call would not contain the values that were just added.
        self.root.constituents_df = pd.DataFrame()

    def to_frame(self) -> pd.DataFrame:
        """
        Retrieve constituents of the full tree. If it has already been fetched once, it is stored and returned when
        called later in the future.

        :return: dataframe with constituents of the full tree, with parent AssetID, underlying AssetID, depth and
            any values populated on the nodes
        :raises MqValueError: if the tree has no constituents

        **Usage**

        Retrieve constituents of the full tree.
        """
        if not self.tree_built:
            self.build_tree()
        self.constituents_df = self.root.to_frame()
        if len(self.constituents_df) > 0:
            return self.constituents_df
        raise MqValueError('No constituents found for the asset')

    def build_tree(self):
        """
        Build the tree below the root node once; later calls do nothing.
        """
        if self.tree_built:
            return
        self.root.build_tree(self.__tree_underlier_dataset, self.__underlier_column)
        # The root may have resolved its date while querying its underliers; keep the helper in sync with it.
        self.date = self.root.date
        self.tree_built = True
        self.update_time = dt.datetime.now()

    def get_tree(self) -> AssetTreeNode:
        """
        Build the full tree and return the root node of the full-fledged tree.
        If the tree has been built already, return it on future calls.

        :return: AssetTreeNode object of the root node, with a list attribute direct_underlier_assets_as_nodes that
            holds the child AssetTreeNode object.

        **Usage**

        Root AssetTreeNode object of the tree entity
        """
        if not self.tree_built:
            self.build_tree()
        return self.root

    def get_visualisation(self, visualise_by: str = 'name'):
        """
        Render the tree with treelib, labelling every node by the chosen attribute.

        :param visualise_by: node attribute used as label; one of 'name', 'bbid' or 'id'
        :return: whatever treelib's Tree.show() returns
        :raises RuntimeError: if treelib is not installed
        :raises MqValueError: if visualise_by is not one of the supported attributes
        """
        try:
            from treelib import Tree
        except ModuleNotFoundError as err:
            raise RuntimeError('You must install treelib to be able use this function.') from err
        from collections import deque

        # Reject a bad argument before the (potentially expensive) tree build.
        if visualise_by not in ('name', 'bbid', 'id'):
            raise MqValueError('visualise_by argument has to be either name, id or bbid')
        if not self.tree_built:
            self.build_tree()

        # Each entry in the BFS queue holds the node and the prefix value for that node.
        # The prefix is the path from root to the parent of that node, and is empty for the root node.
        # This definition of prefix allows using the same node in multiple branches.
        bfs_queue = deque([(self.root, '')])
        tree_vis = Tree()
        while bfs_queue:
            node, prefix = bfs_queue.popleft()
            node_id = prefix + '-' + node.id
            node_name = getattr(node, visualise_by)
            if str(node_name) == 'None':
                node_name = f'NA ({node.id})'
            if prefix == '':
                tree_vis.create_node(node_name, node_id)
            else:
                tree_vis.create_node(node_name, node_id, parent=prefix)
            for c in node.direct_underlier_assets_as_nodes:
                bfs_queue.append((c, node_id))
        return tree_vis.show()