This repository has been archived by the owner on Nov 16, 2022. It is now read-only.
/
apiWrapper.py
90 lines (82 loc) · 5.27 KB
/
apiWrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from __future__ import print_function
import argparse
import sys
import subprocess
import time
from datetime import datetime
def processArguments():
parser = argparse.ArgumentParser(description = 'A Python wrapper to the Veracode Java API jar providing "break the build" functionality',
epilog = 'Any additional arguments will be passed through to the API jar.', allow_abbrev = False)
parser.add_argument('-n', '--name', type = str, help = 'Application name.')
parser.add_argument('-o', '--operation', type = str, help = 'API operating to execute. Options = UploadAndScan, CreateAndSubmitDynamicRescan', default = 'uploadandscan')
parser.add_argument('-a', '--apiWrapperPath', type = str, help = 'File path to Veracode API Java wrapper', default = './vosp-api-wrappers-java-19.6.5.8.jar')
parser.add_argument('-v', '--veracodeId', type = str, help = 'Veracode API credentials ID')
parser.add_argument('-s', '--veracodeSecret', type = str, help = 'Veracode API credentials secret')
parser.add_argument('-m', '--monitor', action = "store_true", help = 'Monitor the process until the policy compliance status has been determined.')
parser.add_argument('-wi', '--checkInterval', type = int, default = 60, help = 'Time interval in seconds between scan policy status checks, default = 60s')
parser.add_argument('-wm', '--maximumWait', type = int, default = 3600, help = 'Maximum time in seconds to wait for scan to complete, default = 3600s')
args, unparsed = parser.parse_known_args()
return args, unparsed
class VeracodeAPI():
def __init__(self, veracodeId, veracodeSecret, apiWrapperPath):
self.veracodeId = veracodeId
self.veracodeSecret = veracodeSecret
self.apiWrapperPath = apiWrapperPath
self.baseCommand = ['java', '-jar', self.apiWrapperPath, '-vid', self.veracodeId, '-vkey', self.veracodeSecret]
def log(self, linesToLog: str):
exLinesToLog = "{0} - {1}".format(datetime.now().strftime('[%y.%m.%d %H:%M:%S]'), linesToLog)
print(exLinesToLog, flush = True)
def processOutput(self, outputTextToProcess: str, startsWith: str, endsWith: str) -> str:
end_of_leader = outputTextToProcess.index(startsWith) + len(startsWith)
start_of_trailer = outputTextToProcess.index(endsWith, end_of_leader)
return outputTextToProcess[end_of_leader:start_of_trailer]
def runCommand(self, operation: str, extraArgs: list):
print(extraArgs)
commandToRun = self.baseCommand + ['-action', operation] + extraArgs
self.log('Running command: ' + ' '.join(commandToRun))
commandExecution = subprocess.run(commandToRun, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 0)
self.log(commandExecution.stdout.decode())
if commandExecution.returncode == 0:
try:
appId = self.processOutput(commandExecution.stdout.decode(), 'appid=', ')')
buildId = self.processOutput(commandExecution.stdout.decode(), 'The build_id of the new build is "', '"')
return appId, buildId
except ValueError as e:
self.log(e)
sys.exit(1)
else:
sys.exit(commandExecution.returncode)
return
def checkStatus(self, appId: str, buildId: str, checkInterval: int, maximumWait: int):
commandToRun = self.baseCommand + ['-action', 'GetBuildInfo', '-appid', appId, '-buildid', buildId]
totalTime = 0
while totalTime <= maximumWait:
time.sleep(checkInterval)
commandExecution = subprocess.run(commandToRun, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
self.log('Checking scan status [' + str(totalTime // checkInterval) + '/' + str(maximumWait // checkInterval) + ']')
if 'results_ready="true"' in commandExecution.stdout.decode():
while True:
buildStatus = self.processOutput(commandExecution.stdout.decode(), 'policy_compliance_status="', '"')
if buildStatus not in ['Calculating...', 'Not Assessed']:
log('Scan complete, policy compliance status: ' + buildStatus)
if buildStatus in ['Conditional Pass', 'Pass']:
return 'Pass'
else:
return 'Fail'
else:
time.sleep(checkInterval)
commandExecution = subprocess.run(commandToRun, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
log('Scan complete, checking policy status')
else:
totalTime = totalTime + checkInterval
self.log('Scan did not complete within maximum wait time.')
return 'Time exceeded'
args, unparsed = processArguments()
if args.veracodeId and args.veracodeSecret and args.apiWrapperPath:
vcApi = VeracodeAPI(args.veracodeId, args.veracodeSecret, args.apiWrapperPath)
appId, buildId = vcApi.runCommand(args.operation, unparsed)
if args.monitor and appId and buildId and args.checkInterval and args.maximumWait:
buildStatus = vcApi.checkStatus(appId, buildId, args.checkInterval, args.maximumWait)
print(buildStatus)
else:
print('One or more required arguments not provided')