-
Notifications
You must be signed in to change notification settings - Fork 5
/
DiffStruct.py
154 lines (129 loc) · 5.92 KB
/
DiffStruct.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import os
import re
import logging
from collections import defaultdict
from ctf.diff import git_wrapper
from ctf.utils import file_path_to_log
# Module-level logger shared by everything in this file; records are emitted
# under the "content-test-filtering.diff" logger name.
logger = logging.getLogger("content-test-filtering.diff")
class DiffStruct:
    """Record what a single changed file affects.

    An instance accumulates the rules, profiles and products touched by a
    change, a flag saying whether functionality tests are needed, and the
    human-readable log messages explaining each decision.
    """

    # "prodtype: a, b,c" -> captures "a, b,c".  Whitespace around commas is
    # tolerated; "|" is kept in the token class because the previous pattern
    # accepted it as well.
    _PRODTYPE_RE = re.compile(r"\s*prodtype:\s*([\w|]+(?:\s*,\s*[\w|]+)*)")
    # ".../<product>/profiles/<name>.profile" -> captures "<name>".
    _PROFILE_PATH_RE = re.compile(
        r".+/(?:\w|-)+/profiles/((?:\w|-)+)\.profile")

    def __init__(self, filepath):
        """Create an empty record for ``filepath``.

        Args:
            filepath: Path of the changed file; it is appended to
                ``git_wrapper.repo_path`` to build ``absolute_path``.
        """
        joined_path = git_wrapper.repo_path + "/" + filepath
        # Collapse every run of slashes (e.g. from a trailing slash in a
        # parameter).  A plain replace("//", "/") would turn "///" into "//".
        self.absolute_path = re.sub(r"/{2,}", "/", joined_path)
        self.file_type = None
        self.changed_rules = defaultdict(set)  # {"product": {"rule1", ...}, ...}
        self.changed_profiles = defaultdict(set)  # {"product": {"profile1", ...}}
        self.changed_products = set()
        # NOTE: the misspelling is kept on purpose - this attribute is part
        # of the public interface of the class.
        self.funcionality_changed = False
        self.affected_files = []
        # Log messages, grouped by the entity they talk about.
        self.rules_logging = defaultdict(set)
        self.profiles_logging = defaultdict(set)
        self.products_logging = defaultdict(set)
        self.macros_logging = defaultdict(set)
        self.functionality_logging = set()

    def get_changed_rules_with_products(self):
        """Yield ``(product, rule)`` for every recorded changed rule."""
        for product, rules in self.changed_rules.items():
            for rule in rules:
                yield product, rule

    def get_changed_profiles_with_products(self):
        """Yield ``(product, profile)`` for every recorded changed profile."""
        for product, profiles in self.changed_profiles.items():
            for profile in profiles:
                yield product, profile

    def find_rule_profiles(self, rule):
        """Yield the path of every profile file that selects ``rule``.

        Every ``products/<product>/profiles`` directory under
        ``git_wrapper.repo_path`` is scanned for a line of the form
        ``- <rule>``.  Each matching profile file is yielded once.

        Args:
            rule: Rule name; it is matched literally.

        Yields:
            Path of a matching profile file, built with "/" separators on
            top of ``git_wrapper.repo_path``.
        """
        # We know that products are in products/ folder
        products_folder = git_wrapper.repo_path + "/" + "products"
        product_folders = []
        for product_name in os.listdir(products_folder):
            product_path = products_folder + "/" + product_name
            # Only products that really have a profiles/ directory matter.
            if os.path.isdir(product_path + "/profiles"):
                product_folders.append(product_path)

        # Escape the name so that it can never be interpreted as a pattern.
        find_rule = re.compile(r"^\s*-\s*" + re.escape(rule) + r"\s*$")
        for folder in product_folders:
            for profile in os.listdir(folder + "/profiles"):
                profile_file = folder + "/profiles/" + profile
                if not os.path.isfile(profile_file):
                    continue
                # Decide inside the "with" and yield outside of it, so the
                # file is closed before the generator is suspended.
                with open(profile_file) as f:
                    selected = any(find_rule.search(line) for line in f)
                if selected:
                    yield profile_file

    def get_rule_ruleyml(self, rule):
        """Return the path of the ``rule.yml`` belonging to ``rule``.

        The repository is walked for a directory whose path ends with
        ``/<rule>`` and which contains a ``rule.yml`` file.

        Returns:
            The path of the first such ``rule.yml``, or ``None`` when no
            directory qualifies.
        """
        for root, _dirs, files in os.walk(git_wrapper.repo_path):
            if root.endswith("/" + rule) and "rule.yml" in files:
                ruleyml_path = root + "/rule.yml"
                logger.debug("rule.yml path - %s", ruleyml_path)
                return ruleyml_path
        logger.debug("rule.yml was not found for %s", rule)
        return None

    @staticmethod
    def _profile_name_from_path(profile_path):
        """Return the profile name encoded in ``profile_path``, or ``None``."""
        parsed = DiffStruct._PROFILE_PATH_RE.match(profile_path)
        return parsed.group(1) if parsed else None

    def get_rule_profiles(self, rule):
        """Return the names of all profiles that select ``rule``.

        Paths reported by ``find_rule_profiles`` that do not look like
        ``.../<product>/profiles/<name>.profile`` are skipped.
        """
        profiles = []
        for profile_path in self.find_rule_profiles(rule):
            name = self._profile_name_from_path(profile_path)
            if name is None:
                logger.debug("Skipping %s - not a profile file path",
                             profile_path)
                continue
            profiles.append(name)
        return profiles

    @staticmethod
    def _parse_prodtype_line(line):
        """Return the products declared on a ``prodtype:`` line.

        The result is ordered with rhel8 first, then rhel7, then ocp4, then
        the remaining products alphabetically.  An empty list is returned
        when ``line`` is not a ``prodtype:`` declaration or declares nothing.
        """
        declaration = DiffStruct._PRODTYPE_RE.match(line)
        if declaration is None:
            return []
        products = [name.strip() for name in declaration.group(1).split(",")]
        return sorted(
            products,
            key=lambda k: (k != "rhel8", k != "rhel7", k != "ocp4", k))

    def get_rule_products(self, rule):
        """Return the products listed in the ``prodtype`` of ``rule``.

        Returns:
            The product list from the first ``prodtype:`` declaration found
            in the rule's ``rule.yml`` (preferred products first), or
            ``None`` when the rule has no ``rule.yml`` or no usable
            ``prodtype`` declaration.
        """
        ruleyml_path = self.get_rule_ruleyml(rule)
        if ruleyml_path is None:
            return None
        with open(ruleyml_path) as f:
            for line in f:
                products = self._parse_prodtype_line(line)
                if products:
                    return products
        # rule.yml does not have prodtype
        return None

    def add_changed_rule(self, rule_name, product_name=None, msg=""):
        """Record ``rule_name`` as changed and log ``msg`` for it.

        Args:
            rule_name: Name of the changed rule.
            product_name: Product to file the rule under.  When empty, the
                first product from the rule's ``prodtype`` is used, falling
                back to "rhel8".
            msg: Log message stored for the rule.
        """
        self.add_rule_log(rule_name, msg)
        if not product_name:
            products = self.get_rule_products(rule_name)
            if products:
                product_name = products[0]
                logger.debug("Rule %s is part of %s datastream.",
                             rule_name, product_name)
            else:
                product_name = "rhel8"
                logger.debug("Rule %s is not part of any datastream. "
                             "Added default rhel8 value", rule_name)
        self.changed_rules[product_name].add(rule_name)

    def add_changed_profile(self, profile_name, product_name, msg=""):
        """Record ``profile_name`` of ``product_name`` as changed.

        ``msg`` is logged under the key ``"<profile> on <product>"``.
        """
        profile_product = "%s on %s" % (profile_name, product_name)
        self.add_profile_log(profile_product, msg)
        self.changed_profiles[product_name].add(profile_name)

    def add_changed_product_by_rule(self, rule_name, msg=""):
        """Record the preferred product of ``rule_name`` as changed.

        When the rule declares no product, nothing is recorded and a fixed
        explanatory message is logged for the rule instead.

        Args:
            rule_name: Rule whose product should be recorded.
            msg: Kept for interface compatibility; the passed value is not
                used.
        """
        products = self.get_rule_products(rule_name)
        if not products:
            msg = "The rule doesn't occur in any profile nor product."
            self.add_rule_log(rule_name, msg)
            return
        product_name = products[0]
        logger.debug("Rule %s is part of %s datastream.",
                     rule_name, product_name)
        self.changed_products.add(product_name)

    def add_functionality_test(self, msg=""):
        """Flag that functionality changed and log ``msg`` as the reason."""
        self.add_functionality_log(msg)
        self.funcionality_changed = True

    def add_rule_log(self, rule, msg):
        """Store ``msg`` among the log messages of ``rule``."""
        self.rules_logging[rule].add(msg)

    def add_profile_log(self, profile, msg):
        """Store ``msg`` among the log messages of ``profile``."""
        self.profiles_logging[profile].add(msg)

    def add_functionality_log(self, msg):
        """Store ``msg`` among the functionality log messages."""
        self.functionality_logging.add(msg)

    def add_macro_log(self, macro, msg):
        """Store ``msg`` among the log messages of ``macro``."""
        self.macros_logging[macro].add(msg)

    def add_macro_rule_log(self, macro, usage_list):
        """Log every entry of ``usage_list`` for ``macro``.

        Each entry is converted with ``file_path_to_log`` before it is
        stored.
        """
        for usage in usage_list:
            self.macros_logging[macro].add(file_path_to_log(usage))

    def add_macro_template_log(self, macro, msg):
        """Placeholder; currently does nothing."""
        pass