This repository has been archived by the owner on Mar 15, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
/
vt_query.py
94 lines (74 loc) · 3.21 KB
/
vt_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
''' VTQuery worker '''
import os
import requests
import collections
import ConfigParser
import pprint
class VTQuery(object):
    ''' This worker queries VirusTotal for the report on a sample (looked up by md5).
        An API key needs to be provided via the 'vt_apikey' option in the [workbench]
        section of ../server/config.ini (path relative to this file). '''

    # execute() reads its md5 and file_type from input_data['meta']
    dependencies = ['meta']

    # VirusTotal v2 file report endpoint
    report_url = 'https://www.virustotal.com/vtapi/v2/file/report'

    # Seconds to wait on VirusTotal before giving up, so execute() cannot block forever
    timeout = 30

    def __init__(self):
        ''' VTQuery Init

            Reads the API key from the config file and sets the list of report
            fields that execute() leaves out of its output.

            Raises:
                RuntimeError: if the configured key is still the dummy value '123'.
        '''

        # Grab API key from configuration file
        config_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../server/config.ini')
        conf = ConfigParser.ConfigParser()
        conf.read(config_path)
        self.apikey = conf.get('workbench', 'vt_apikey')

        # Make sure key isn't the dummy value
        if self.apikey == '123':
            raise RuntimeError('VTQuery: Invalid api_key, put your VT api key in the config.ini file.')

        # Change this if you want these fields
        self.exclude = ['scan_id', 'md5', 'sha1', 'sha256', 'resource', 'response_code', 'permalink',
                        'verbose_msg', 'scans']

    def execute(self, input_data):
        ''' Execute the VTQuery worker

            Args:
                input_data: dict with a 'meta' entry that provides 'md5' and 'file_type'.

            Returns:
                A dict. On success: the non-excluded report fields plus 'file_type' and
                'scan_results' (the five most common non-empty scanner results with their
                counts). When no field survives the exclusion list: 'file_type' plus
                'not_found': True. On a failed request or an unusable response: a single
                'vt_error' message.
        '''
        meta = input_data['meta']

        # A network failure or timeout is reported like any other query error
        # instead of propagating out of the worker
        try:
            response = requests.get(self.report_url,
                                    params={'apikey': self.apikey, 'resource': meta['md5'], 'allinfo': 1},
                                    timeout=self.timeout)
        except requests.exceptions.RequestException as error:
            return {'vt_error': 'VirusTotal Query Error, request failed: %s' % error}

        # Make sure we got a json blob back
        try:
            vt_output = response.json()
        except ValueError:
            return {'vt_error': 'VirusTotal Query Error, no valid response... past per min quota?'}

        return self._build_output(vt_output, meta['file_type'])

    def _build_output(self, vt_output, file_type):
        ''' Reduce a decoded VirusTotal report to the fields this worker returns '''

        # Anything other than a json object cannot be filtered by field name
        if not isinstance(vt_output, dict):
            return {'vt_error': 'VirusTotal Query Error, unexpected response format'}

        # Just pull some of the fields
        output = {field: value for field, value in vt_output.items() if field not in self.exclude}

        # Check for not-found (must happen before file_type is added below)
        not_found = not output

        # Add in file_type
        output['file_type'] = file_type

        # Toss back a not found
        if not_found:
            output['not_found'] = True
            return output

        # Organize the scans fields (a report without a 'scans' section gives an empty list)
        output['scan_results'] = self._top_scan_results(vt_output.get('scans'))
        return output

    @staticmethod
    def _top_scan_results(scans, limit=5):
        ''' Count the non-empty 'result' values across scanners, return the most common ones '''
        counts = collections.Counter()
        for scan in (scans or {}).values():
            result = scan.get('result') if isinstance(scan, dict) else None
            if result:
                counts[result] += 1
        return counts.most_common(limit)
# Unit test: Create the class, the proper input and run the execute() method for a test
def test():
    ''' -- vt_query.py test --

        Runs the worker on the bundled bad PDF sample twice, once directly (unit test)
        and once through the server (server test), and pretty-prints both results.
        Needs a workbench server listening on tcp://127.0.0.1:4242 and a real VT api
        key in the config file (VTQuery() raises on the dummy key).
    '''

    # This worker test requires a local server running
    import zerorpc
    workbench = zerorpc.Client()
    workbench.connect("tcp://127.0.0.1:4242")

    # Generate input for the worker
    data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                             '../data/pdf/bad/067b3929f096768e864f6a04f04d4e54')

    # Read the sample inside a with-block so the file handle is always closed
    with open(data_path, 'rb') as sample_file:
        sample_bytes = sample_file.read()
    md5 = workbench.store_sample('bad_pdf', sample_bytes, 'pdf')
    input_data = workbench.get_sample(md5)
    input_data.update(workbench.work_request('meta', md5))

    # Execute the worker (unit test)
    worker = VTQuery()
    output = worker.execute(input_data)
    print('\n<<< Unit Test >>>')
    pprint.pprint(output)

    # Execute the worker (server test)
    output = workbench.work_request('vt_query', md5)
    print('\n<<< Server Test >>>')
    pprint.pprint(output)
# Run the worker test when this file is executed directly as a script
if __name__ == "__main__":
    test()