This repository has been archived by the owner on Jan 2, 2024. It is now read-only.
/
dbtools.py
152 lines (123 loc) · 5.11 KB
/
dbtools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""Helper functions for managing the false positives database"""
"""
Copyright (c) 2014 F-Secure
See LICENSE for details
"""
import os
import datetime
import socket
import json
from sqlalchemy import create_engine, Table, Column, MetaData, exc, types
from sqlalchemy import sql, and_
def open_database(context):
"""Opens the database specified in the feature file and creates
tables if not already created
:param context: The Behave context
:return: A database handle, or None if no database in use
"""
if hasattr(context, 'dburl') is False:
return None # No false positives database is in use
dbconn = None
# Try to connect to the database
try:
db_engine = create_engine(context.dburl)
dbconn = db_engine.connect()
except (IOError, exc.OperationalError):
assert False, "Cannot connect to database '%s'" % context.dburl
# Set up the database table to store new findings and false positives.
# We use LargeBinary to store the message, because it can potentially
# be big.
db_metadata = MetaData()
db_metadata.bind = db_engine
context.headlessscanner_issues = Table(
'headlessscanner_issues',
db_metadata,
Column('new_issue', types.Boolean),
Column('issue_no', types.Integer, primary_key=True, nullable=False), # Implicit autoincrement
Column('timestamp', types.DateTime(timezone=True)),
Column('test_runner_host', types.Text),
Column('scenario_id', types.Text),
Column('url', types.Text),
Column('severity', types.Text),
Column('issuetype', types.Text),
Column('issuename', types.Text),
Column('issuedetail', types.Text),
Column('confidence', types.Text),
Column('host', types.Text),
Column('port', types.Text),
Column('protocol', types.Text),
Column('messages', types.LargeBinary)
)
# Create the table if it doesn't exist
# and otherwise no effect
db_metadata.create_all(db_engine)
return dbconn
def known_false_positive(context, issue):
"""Check whether a finding already exists in the database (usually
a "false positive" if it does exist)
:param context: The Behave context
:param issue: A finding from the scanner (see steps.py)
:return: True or False, depending on whether this is a known issue
"""
dbconn = open_database(context)
if dbconn is None:
# No false positive db is in use, all findings are treated as new
return False
# Check whether we already know about this. A finding is a duplicate if:
# - It has the same scenario id, AND
# - It was found in the same URL, AND
# - It has the same issue type.
db_select = sql.select([context.headlessscanner_issues]).where(
and_(
context.headlessscanner_issues.c.scenario_id == issue['scenario_id'], # Text
context.headlessscanner_issues.c.url == issue['url'], # Text
context.headlessscanner_issues.c.issuetype == issue['issuetype'])) # Text
db_result = dbconn.execute(db_select)
# If none found with these criteria, we did not know about this
if len(db_result.fetchall()) == 0:
return False # No, we did not know about this
db_result.close()
dbconn.close()
return True
def add_false_positive(context, issue):
"""Add a finding into the database as a new finding
:param context: The Behave context
:param response: An issue data structure (see steps.py)
"""
dbconn = open_database(context)
if dbconn is None:
# There is no false positive db in use, and we cannot store the data,
# so we will assert a failure.
assert False, "Issues were found in scan, but no false positive database is in use."
# Add the finding into the database
db_insert = context.headlessscanner_issues.insert().values(
new_issue=True, # Boolean
# The result from Burp Extender does not include a timestamp,
# so we add the current time
timestamp=datetime.datetime.utcnow(), # DateTime
test_runner_host=socket.gethostbyname(socket.getfqdn()), # Text
scenario_id=issue['scenario_id'], # Text
url=issue['url'], # Text
severity=issue['severity'], # Text
issuetype=issue['issuetype'], # Text
issuename=issue['issuename'], # Text
issuedetail=issue['issuedetail'], # Text
confidence=issue['confidence'], # Text
host=issue['host'], # Text
port=issue['port'], # Text
protocol=issue['protocol'], # Text
messages=json.dumps(issue['messages'])) # Blob
dbconn.execute(db_insert)
dbconn.close()
def number_of_new_in_database(context):
dbconn = open_database(context)
if dbconn is None: # No database in use
return 0
true_value = True # SQLAlchemy cannot have "is True" in where clause
db_select = sql.select([context.headlessscanner_issues]).where(
context.headlessscanner_issues.c.new_issue == true_value)
db_result = dbconn.execute(db_select)
findings = len(db_result.fetchall())
db_result.close()
dbconn.close()
return findings