/
gnosis_ref_arch.py
294 lines (242 loc) · 12.9 KB
/
gnosis_ref_arch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
"""GnosisReferenceArchitecture i.e. Reference Architecture Generator."""
from analytics_platform.kronos.src.config import SPARK_HOME_PATH, PY4J_VERSION
import os
import sys
sys.path.insert(0, SPARK_HOME_PATH + "/python")
sys.path.insert(0, os.path.join(SPARK_HOME_PATH, "python/lib/" + PY4J_VERSION))
from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowth
from analytics_platform.kronos.gnosis.src.abstract_gnosis import AbstractGnosis
from analytics_platform.kronos.gnosis.src.gnosis_package_topic_model import GnosisPackageTopicModel
import analytics_platform.kronos.gnosis.src.gnosis_constants as gnosis_constants
import util.gnosis_util as utils
class GnosisReferenceArchitecture(AbstractGnosis):
    """GnosisReferenceArchitecture i.e. Reference Architecture Generator.

    The reference architecture is kept as a plain dictionary holding four
    entries, keyed by constants from ``gnosis_constants``:

    * ``GNOSIS_RA_DICT``: merged intent -> children mapping,
    * ``GNOSIS_RA_COMPONENT_CLASS_LIST``: list of component classes,
    * ``GNOSIS_RA_INTENT_LIST``: list of intents,
    * ``GNOSIS_RA_EDGE_LIST``: list of edges.
    """

    def __init__(self, dictionary):
        """Instantiate Gnosis Reference Architecture.

        :param dictionary: the reference architecture in dictionary form.
        """
        self._dictionary = dictionary

    @classmethod
    def train(cls, data_store, additional_path="", min_support_count=None,
              min_intent_topic_count=None, fp_num_partition=None, fp_tag_intent_limit=None):
        """Generate the Gnosis Reference Architecture.

        :param data_store: input data store containing the processed package
                           topic map and list of manifest files.
        :param additional_path: path prefix joined in front of the package topic
                                model output path and the manifest file path.
        :param min_support_count: minimum support count to be used by FP Growth Algo.
                                  Must be provided, otherwise ValueError is raised.
        :param min_intent_topic_count: number of topics an itemset must contain to
                                       be turned into an intent.
        :param fp_num_partition: number of partitions used by FP Growth; the
                                 library default is used when None.
        :param fp_tag_intent_limit: initial per-component-class counter used while
                                    building intents; falls back to
                                    gnosis_constants.FP_TAG_INTENT_LIMIT when falsy.
        :return: a GnosisReferenceArchitecture object.
        :raises ValueError: if min_support_count is None or no transactions
                            could be built from the manifest files.
        """
        gnosis_ptm_obj = GnosisPackageTopicModel.load(
            data_store=data_store,
            filename=os.path.join(additional_path, gnosis_constants.GNOSIS_PTM_OUTPUT_PATH))
        ptm_dictionary = gnosis_ptm_obj.get_dictionary()
        eco_to_package_to_topic_dict = ptm_dictionary[
            gnosis_constants.GNOSIS_PTM_PACKAGE_TOPIC_MAP]

        gnosis_component_class_list = \
            cls._generate_component_class_list_for_eco_package_topic_dict(
                eco_to_package_topic_dict=eco_to_package_to_topic_dict)

        fp_growth_model = cls._train_fp_growth_model(
            data_store=data_store,
            eco_to_package_topic_dict=eco_to_package_to_topic_dict,
            min_support_count=min_support_count,
            additional_path=additional_path,
            fp_num_partition=fp_num_partition)

        gnosis_intent_to_component_class_dict = \
            cls._generate_intent_component_class_dict_fp_growth(
                model=fp_growth_model,
                min_intent_topic_count=min_intent_topic_count,
                package_list=gnosis_component_class_list,
                fp_tag_intent_limit=fp_tag_intent_limit)

        # TODO: modify this while implementing multiple levels in the reference
        # architecture
        gnosis_intent_to_intent_dict = {}

        gnosis_intent_list = cls._generate_intent_list(
            gnosis_intent_to_intent_dict=gnosis_intent_to_intent_dict,
            gnosis_intent_to_component_class_dict=gnosis_intent_to_component_class_dict)

        gnosis_edge_list = cls._generate_edge_list(
            gnosis_intent_to_component_class_dict=gnosis_intent_to_component_class_dict,
            gnosis_intent_to_intent_dict=gnosis_intent_to_intent_dict)

        return cls._generate_gnosis_model(
            gnosis_intent_to_intent_dict=gnosis_intent_to_intent_dict,
            gnosis_intent_to_component_class_dict=gnosis_intent_to_component_class_dict,
            gnosis_component_class_list=gnosis_component_class_list,
            gnosis_intent_list=gnosis_intent_list,
            gnosis_edge_list=gnosis_edge_list)

    @classmethod
    def load(cls, data_store, filename):
        """Load already saved Gnosis.

        :param data_store: Data store to load Gnosis from.
        :param filename: the file from which Gnosis is to be loaded from.
        :return: a Gnosis object.
        """
        gnosis_ra_json = data_store.read_json_file(filename=filename)
        gnosis_ra_dict = dict(gnosis_ra_json)
        return GnosisReferenceArchitecture(dictionary=gnosis_ra_dict)

    def save(self, data_store, filename):
        """Save the Gnosis object in json format.

        :param data_store: Data store to save Gnosis in
        :param filename: the file into which Gnosis is to be saved.
        """
        data_store.write_json_file(
            filename=filename, contents=self.get_dictionary())
        return None

    def get_dictionary(self):
        """Get the dictionary."""
        return self._dictionary

    @classmethod
    def _generate_component_class_list(cls, gnosis_intent_component_class_dict):
        """Generate the component class list.

        :param gnosis_intent_component_class_dict: intent-component_class dict.
        :return: the list of component classes.
        """
        assert cls is not None  # just make checkers happy
        return utils.generate_value_list_from_dict(
            dictionary=gnosis_intent_component_class_dict)

    @classmethod
    def _generate_edge_list(cls, gnosis_intent_to_component_class_dict,
                            gnosis_intent_to_intent_dict):
        """Generate the list of edges as the list of tuples.

        If a form: [(source,destination),(source,destination),...].

        :param gnosis_intent_to_component_class_dict: lowest level of Gnosis
                                                      hierarchy in dict format.
        :param gnosis_intent_to_intent_dict: all the levels except the lowest
                                             level of Gnosis hierarchy in dict format.
        :return: list of edges: intent -> component class edges followed by
                 intent -> intent edges.
        """
        intent_to_component_class_edge_list = utils.generate_key_to_value_edges(
            dictionary=gnosis_intent_to_component_class_dict)
        intent_to_intent_edge_list = utils.generate_key_to_value_edges(
            dictionary=gnosis_intent_to_intent_dict)
        return intent_to_component_class_edge_list + intent_to_intent_edge_list

    @classmethod
    def _generate_intent_list(cls, gnosis_intent_to_intent_dict,
                              gnosis_intent_to_component_class_dict):
        """Generate the list of intents.

        :param gnosis_intent_to_intent_dict: all the levels except the lowest
                                             level of Gnosis hierarchy in dict format.
        :param gnosis_intent_to_component_class_dict: lowest level of Gnosis
                                                      hierarchy in dict format.
        :return: list of unique intents (in no particular order).
        """
        # Super intents (keys), sub intents (values) and the lowest-level
        # intents are merged through a set so every intent appears only once.
        node_set = set(gnosis_intent_to_intent_dict)
        node_set.update(utils.generate_value_list_from_dict(gnosis_intent_to_intent_dict))
        node_set.update(gnosis_intent_to_component_class_dict)
        return list(node_set)

    @classmethod
    def _generate_gnosis_model(cls, gnosis_intent_to_intent_dict,
                               gnosis_intent_to_component_class_dict,
                               gnosis_component_class_list,
                               gnosis_intent_list, gnosis_edge_list):
        """Generate the Gnosis model.

        :param gnosis_intent_to_intent_dict: Intent to Intent map.
        :param gnosis_intent_to_component_class_dict: Intent to component class map.
        :param gnosis_component_class_list: Component class list.
        :param gnosis_intent_list: Intent list.
        :param gnosis_edge_list: list of edges.
        :return: Gnosis model.
        """
        # Copy + update instead of dict(a, **b): the latter rejects non-string
        # keys. On a key clash the intent -> component class entry wins, as before.
        merged_intent_dict = dict(gnosis_intent_to_intent_dict)
        merged_intent_dict.update(gnosis_intent_to_component_class_dict)

        gnosis_ra_dict = {
            gnosis_constants.GNOSIS_RA_DICT: merged_intent_dict,
            gnosis_constants.GNOSIS_RA_COMPONENT_CLASS_LIST: gnosis_component_class_list,
            gnosis_constants.GNOSIS_RA_INTENT_LIST: gnosis_intent_list,
            gnosis_constants.GNOSIS_RA_EDGE_LIST: gnosis_edge_list,
        }
        return GnosisReferenceArchitecture(dictionary=gnosis_ra_dict)

    @classmethod
    def _train_fp_growth_model(cls, data_store, eco_to_package_topic_dict, min_support_count,
                               additional_path, fp_num_partition):
        """Train the FP Growth model on the topic lists of all manifest files.

        Every package list found in the manifest files is lower-cased and
        converted into its list of topics; each topic list is one transaction.

        :param data_store: data store containing the manifest files.
        :param eco_to_package_topic_dict: ecosystem -> package -> topics map.
        :param min_support_count: minimum support count; it is divided by the
                                  number of transactions to get the minimum support.
        :param additional_path: path prefix joined in front of the manifest file path.
        :param fp_num_partition: number of partitions used by FP Growth; the
                                 library default is used when None.
        :return: the trained FP Growth model.
        :raises ValueError: if min_support_count is None or there are no transactions.
        """
        # Fail early with a clear message instead of a TypeError on the division below.
        if min_support_count is None:
            raise ValueError("min_support_count is required to train the FP Growth model")

        manifest_file_list = data_store.list_files(
            prefix=os.path.join(additional_path, gnosis_constants.MANIFEST_FILEPATH))

        # The transactions are assembled before Spark is touched, so a failure
        # while reading the manifests does not leave a SparkContext behind.
        list_of_topic_list = list()
        for manifest_file in manifest_file_list:
            eco_to_package_list_json_array = data_store.read_json_file(manifest_file)
            for eco_to_package_list_json in eco_to_package_list_json_array:
                ecosystem = eco_to_package_list_json.get(gnosis_constants.MANIFEST_ECOSYSTEM)
                list_of_package_list = eco_to_package_list_json.get(
                    gnosis_constants.MANIFEST_PACKAGE_LIST)
                for package_list in list_of_package_list:
                    package_list_lowercase = [x.lower() for x in package_list]
                    topic_list = cls.get_topic_list_for_package_list(
                        package_list_lowercase, ecosystem, eco_to_package_topic_dict)
                    list_of_topic_list.append(topic_list)

        # Without transactions the support ratio would be a division by zero.
        if not list_of_topic_list:
            raise ValueError("no package lists found in the manifest files; "
                             "cannot train the FP Growth model")

        # The context is deliberately not stopped here: the returned model is
        # queried later on. getOrCreate() re-uses an already running context,
        # so repeated training within one process keeps working.
        sc = SparkContext.getOrCreate()
        transactions = sc.parallelize(list_of_topic_list)
        transactions.cache()
        min_support = min_support_count / float(transactions.count())

        train_kwargs = {"minSupport": min_support}
        # Passing None as numPartitions is rejected by Spark; leave it out so
        # the library default applies.
        if fp_num_partition is not None:
            train_kwargs["numPartitions"] = fp_num_partition
        return FPGrowth.train(transactions, **train_kwargs)

    @classmethod
    def _generate_intent_component_class_dict_fp_growth(
            cls, model, min_intent_topic_count, package_list,
            fp_tag_intent_limit=None):
        """Generate the intent -> component class dict from frequent itemsets.

        Each retained frequent itemset becomes an intent whose name is the
        itemset's items joined by ':' and whose value is the itemset itself.

        :param model: trained FP Growth model.
        :param min_intent_topic_count: only itemsets with exactly this many
                                       items are retained.
        :param package_list: items that get a usage counter.
        :param fp_tag_intent_limit: initial value of every usage counter; falls
                                    back to gnosis_constants.FP_TAG_INTENT_LIMIT
                                    when falsy.
        :return: dict mapping intent name to its list of items.
        """
        # TODO: reduce cyclomatic complexity
        fp_tag_intent_limit = fp_tag_intent_limit or gnosis_constants.FP_TAG_INTENT_LIMIT
        result = model.freqItemsets().collect()
        itemset_freq_tuple_list = [(fi.items, fi.freq) for fi in result]

        # Group the retained itemsets by their length.
        # NOTE(review): the parameter is described as a *minimum* elsewhere,
        # but the filter is an equality check, so at most one group exists -
        # confirm the intended semantics.
        topic_num_to_itemset_dict = dict()
        for itemset_freq_tuple in itemset_freq_tuple_list:
            item_length = len(itemset_freq_tuple[0])
            if item_length == min_intent_topic_count:
                topic_num_to_itemset_dict.setdefault(
                    item_length, []).append(itemset_freq_tuple)

        # Within a group, order the itemsets by ascending frequency.
        for key in topic_num_to_itemset_dict:
            topic_num_to_itemset_dict[key] = sorted(
                topic_num_to_itemset_dict[key], key=lambda x: x[1])

        # Remaining budget for every known item.
        item_dict = {z: fp_tag_intent_limit for z in package_list}
        intent_dict = dict()
        for key_value in topic_num_to_itemset_dict:
            k_itemset_list = topic_num_to_itemset_dict[key_value]
            for index, items_support_tuple in enumerate(k_itemset_list):
                items = items_support_tuple[0]
                parent = ":".join(items)
                intent_dict[parent] = items
                for item in items:
                    if item in item_dict:
                        item_dict[item] -= 1
                keys_to_remove = list()
                for key, value in item_dict.items():
                    if value == 0:
                        # NOTE(review): presumably drops the exhausted item from
                        # the itemsets after `index`; the loop keeps iterating the
                        # original list object, so this only has an effect if the
                        # helper works in place - confirm against utils.modify_list.
                        k_itemset_list = utils.modify_list(
                            key, k_itemset_list, index)
                        keys_to_remove.append(key)
                # Deleted after the scan so the dict is not resized while iterated.
                for key in keys_to_remove:
                    del item_dict[key]
        return intent_dict

    @classmethod
    def _generate_component_class_list_for_eco_package_topic_dict(cls, eco_to_package_topic_dict):
        """Generate the de-duplicated component class list over all ecosystems.

        :param eco_to_package_topic_dict: ecosystem -> package -> topics map.
        :return: list of unique component classes (in no particular order).
        """
        # TODO raise exception when ecosystem is not there in
        # eco_to_package_topic_dict
        gnosis_component_class_list = list()
        for ecosystem in eco_to_package_topic_dict:
            component_class_list = cls._generate_component_class_list(
                gnosis_intent_component_class_dict=eco_to_package_topic_dict[ecosystem])
            gnosis_component_class_list.extend(component_class_list)
        return list(set(gnosis_component_class_list))

    @classmethod
    def get_topic_list_for_package_list(cls, package_list, ecosystem, eco_to_package_topic_dict):
        """Get list of topics for given ecosystem and package list.

        :param package_list: packages whose topics are to be collected.
        :param ecosystem: ecosystem the packages belong to.
        :param eco_to_package_topic_dict: ecosystem -> package -> topics map.
        :return: list of unique topics (in no particular order).
        :raises KeyError: if the ecosystem or one of the packages is not
                          present in eco_to_package_topic_dict.
        """
        # TODO raise exception when package or ecosystem is not there in
        # eco_to_package_topic_dict
        topic_set = set()
        for package in package_list:
            topic_set.update(eco_to_package_topic_dict[ecosystem][package])
        return list(topic_set)