Skip to content


Fixes #19338: Make a script to help visualize global variables define…
Browse files Browse the repository at this point in the history
…d in the configuration
  • Loading branch information
Fdall authored and Jenkins CI committed Jun 21, 2021
1 parent c802266 commit 0fb9782
Show file tree
Hide file tree
Showing 5 changed files with 571 additions and 0 deletions.
157 changes: 157 additions & 0 deletions scripts/rca/
@@ -0,0 +1,157 @@
import rule
import directive
import sys
from tabulate import tabulate
sys.path.insert(0, "/usr/share/rudder-api-client/")
sys.path.insert(0, "/opt/rudder/share/python")
from rudder import RudderEndPoint, RudderError

with open('/var/rudder/run/api-token') as ftoken:
endpoint = RudderEndPoint(RUDDER_URL, TOKEN, verify=False)

## Get all variable definition
rawDirectives = endpoint.list_techniques_directives("genericVariableDefinition")["directives"]
directives = [ directive.Directive(x) for x in rawDirectives ]

## Get all rules
rawRules = endpoint.list_rules()["rules"]
rules = [ rule.Rule(x) for x in rawRules ]

## Get all groups
rawGroups = endpoint.list_groups()["groups"]
groups = [ x["id"] for x in rawGroups ]

#### Functions
Return all directives defined
def get_directives():
return directives

Return all rules defined
def get_rules():
return rules

Return all groups defined
def get_groups():
return groups

Return a list with the ids of all directives applied to the target group
def get_directives_applied_to(group):
appliedDirectives = set()
for iRule in rules:
if group in iRule.getIncludeTargets():
for iDirective in iRule.directives:
appliedDirectives |= set([ x for x in directives if == iDirective ])
return list(appliedDirectives)

Sort the table to display on the given column.
toSort must respect the same input than the display function
def sortToDisplay(toSort, criterion):
sortedList = []
columnNames = [x['title'] for x in toSort]
if criterion not in columnNames:
print("Unknown column name '%s' to sort by, available columns are: %s"%(criterion, columnNames))
refColumn = next(x for x in toSort if x['title'] == criterion)['value']
for iColumn in toSort:
zipped = zip(refColumn, iColumn['value'])
sortedvalue = [j for (i, j) in zipped]
'title' : iColumn['title'],
'value' : sortedvalue
return sortedList

Print dict list in a fancy manner
Assume the list is following the format:
{ "title": "strkey1", "value" : [ "str", "str2", ... ] },
{ "tilte": "strkey2", "value" : [ "str", "str2", ... ] },
def dictToAsciiTable(data):
# Get maximum text length to print
lengths = [ len(max([str(i) for i in [column["title"]] + column["value"]], key=len)) for column in data ]
lengths = [50, 50, 50]
lines = []
sep = "|"
nbElem = len(data[0]['value'])
for i in range(0, nbElem):
line = []
for c in data:

headers = []
for c in data:

# Print headers
toPrint = generateSepLine(lengths)
toPrint += generateLine(headers, lengths, sep)
for i in lines:
toPrint += generateSepLine(lengths)
toPrint += generateLine(i, lengths, sep)
toPrint += generateSepLine(lengths)
return toPrint

def generateSepLine(lengths):
sepBar = ""
for iSep in lengths:
fieldLength = int(iSep) -1
sepBar += "+" + "-" * fieldLength
return sepBar + "+\n"

Parse the line array to split each element in multiline
["line 1 of A", "line 2 of A" ],
["line 1 of B", "line 2 of B" ],
["line 1 of C"
Transform it to something like:
["line 1 of A", "line 1 of B", "line 1 of C", ... ],
["line 2 of A", " ", "line 2 of C", ... ],
[ ... ]
def generateLine(line, lengths, sep):
sublines = []
for i in range(len(line)):
string = str(line[i])
width = lengths[i] -1
sublines.append([string[x:x+width] for x in range(0, len(string), width)])

printables = []
for i in range(max([ len(x) for x in sublines])):
for j in range(len(sublines)):
width = lengths[j] -1
if i < len(sublines[j]):
subfield = '{0: <{1}}'.format(sublines[j][i], width)
subfield = ' ' * width
printables[i].append(subfield.replace('\n', ' '))
printedLine = ""
for i in printables:
printedLine += "|"
printedLine += "|".join(i) + "|\n"
return printedLine
64 changes: 64 additions & 0 deletions scripts/rca/
@@ -0,0 +1,64 @@
import json
from pprint import pprint
class Directive:
def __init__(self, data): = data["id"]
self.displayName = data["displayName"]
self.shortDescription = data["shortDescription"]
self.longDescription = data["longDescription"]
self.techniqueName = data["techniqueName"]
self.techniqueVersion = data["techniqueVersion"]
self.priority = data["priority"]
self.system = data["system"]
self.enabled = data["enabled"]
self.parameters = data["parameters"]
self.tags = data["tags"]
self.policyMode = data["policyMode"]

def toJson(self):
data = {
"displayName" : self.displayName,
"shortDescription" : self.shortDescription,
"longDescription" : self.longDescription,
"techniqueName" : self.techniqueName,
"techniqueVersion" : self.techniqueVersion,
"priority" : self.priority,
"system" : self.system,
"enabled" : self.enabled,
"parameters" : self.parameters,
"tags" : self.tags,
"policyMode" : self.policyMode

return json.dumps(data)

def usedIn(self, rules):
results = []
for iRule in rules:
if in iRule.directives:
return results

class GenericVariableDefinitionDirective:
def __init__(self, directive):
self.directive = directive

def getVariables(self):
variables = []
for iSection in self.directive.parameters["section"]["sections"]:
for iVar in iSection["section"]["vars"]:
if iVar["var"]["name"] == "GENERIC_VARIABLE_NAME":
name = "generic_variable_definition." + iVar["var"]["value"]
elif iVar["var"]["name"] == "GENERIC_VARIABLE_CONTENT":
value = iVar["var"]["value"]
variables.append({ "name": name, "value": value })
return variables

def getValues(self, varName):
variables = self.getVariables()
return [ x["value"] for x in variables if x["name"] == varName ]

def defines(self, varName):
variables = self.getVariables()
return any([ True for x in variables if x["name"] == varName ])

0 comments on commit 0fb9782

Please sign in to comment.