Skip to content

Commit

Permalink
Merge pull request #24 from anikobartos/master
Browse files Browse the repository at this point in the history
OPSWAT Filescan Sandbox Integration
  • Loading branch information
battleoverflow authored Nov 8, 2023
2 parents a6dd33c + 78d3463 commit af7500b
Show file tree
Hide file tree
Showing 9 changed files with 12,743 additions and 92 deletions.
22 changes: 13 additions & 9 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ This library currently supports the following sandbox systems:
* `FireEye AX Series`_
* `Hatching Triage`_
* `Joe Sandbox`_
* `OPSWAT Sandbox`_
* `OPSWAT Filescan Sandbox`_
* `VMRay Analyzer`_
* `WildFire Sandbox`_

Expand Down Expand Up @@ -242,20 +242,23 @@ Example::
Currently, only the WildFire cloud sandbox is supported and not the WildFire appliance.


OPSWAT Sandbox
~~~~~~~~~~~~~~
OPSWAT Filescan Sandbox
~~~~~~~~~~~~~~~~~~~~~~~

Constructor signature::

OpswatAPI(apikey, profile, verify_ssl=True)
OPSWATSandboxAPI(api_key, url=None, verify_ssl=True)

Example::

OpswatAPI(apikey, 'windows7')
OPSWATSandboxAPI('mykey')

OPSWAT sandbox on MetaDefender Cloud. Please create an account on `OPSWAT portal`_ to receive a free MetaDefender Cloud apikey.
OPSWAT Filescan Sandbox. You can use the Activation Key that you received
from your OPSWAT Sales Representative, and follow the instructions on the
`OPSWAT Licence Activation`_ page or you can create an API key on the
`OPSWAT Filescan Community Site`_ under API Key tab.

More details in the `OPSWAT API documentation`_.
More details in the `OPSWAT Filescan Sandbox API documentation`_.


Hatching Triage
Expand Down Expand Up @@ -297,8 +300,9 @@ number of online analysis services.
.. _AX Series product page: https://www.fireeye.com/products/malware-analysis.html
.. _official Joe Sandbox library: https://github.com/joesecurity/joesandboxcloudapi
.. _official Falcon library: https://github.com/PayloadSecurity/VxAPI
.. _OPSWAT portal: https://go.opswat.com
.. _OPSWAT API documentation: https://onlinehelp.opswat.com/mdcloud/10._Dynamic_analysis.html
.. _OPSWAT Licence Activation: https://docs.opswat.com/filescan/installation/license-activation
.. _OPSWAT Filescan Community Site: https://www.filescan.io/users/profile
.. _OPSWAT Filescan Sandbox API documentation: https://docs.opswat.com/filescan/opswat-filescan
.. _malsub: https://github.com/diogo-fernan/malsub
.. _Triage public cloud: https://tria.ge/
.. _Triage API documentation: https://tria.ge/docs/
1 change: 1 addition & 0 deletions sandboxapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
'fireeye',
'joe',
'triage',
'opswat',
'vmray',
'falcon',
'wildfire',
Expand Down
188 changes: 105 additions & 83 deletions sandboxapi/opswat.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,96 @@
from __future__ import print_function

import sandboxapi
import sys
import time
import json

from requests.auth import HTTPBasicAuth

import sandboxapi
class OPSWATSandboxAPI(sandboxapi.SandboxAPI):
"""OPSWAT Filescan Sandbox API wrapper."""

class OpswatAPI(sandboxapi.SandboxAPI):
"""Opswat Sandbox API wrapper."""
def __init__(
self, api_key, url="https://www.filescan.io", verify_ssl=True, **kwargs
):
"""Initialize the interface to OPSWAT Filescan Sandbox API.
:type api_key: str
:param api_key: OPSWAT Filescan Sandbox API key
def __init__(self, apikey, profile, verify_ssl=True, **kwargs):
"""Initialize the interface to Opswat Sandbox API."""
:type url str
:param url The url (including the port) of the OPSWAT Filescan Sandbox
instance defaults to https://www.filescan.io
"""
sandboxapi.SandboxAPI.__init__(self, **kwargs)

self.api_url = "https://api.metadefender.com/v4"
self.profile = profile or 'windows7'
self.api_token = apikey
self.api_key = api_key
self.api_url = url
self.headers = {"X-Api-Key": self.api_key}
self.verify_ssl = verify_ssl

def analyze(self, handle, filename):
def analyze(self, handle, filename, password=None, is_private=False):
"""Submit a file for analysis.
:type handle: File handle
:param handle: Handle to file to upload for analysis.
:type filename: str
:param filename: File name.
:type password: str
:param password: Custom password, in case uploaded archive is protected.
:type is_private: boolean
:param is_private: If file should not be available for download by other users.
:rtype: str
:return: SHA256 as a string
:return: flow_id as a string
"""

if not self.api_token:
raise sandboxapi.SandboxError("Missing token")
if not self.api_key:
raise sandboxapi.SandboxError("Missing API key")

# multipart post files.
files = {"file": (filename, handle)}

# ensure the handle is at offset 0.
handle.seek(0)

# add submission options
headers = {
'apikey': self.api_token,
'sandbox': self.profile
}

try:
response = self._request("/file", method='POST', headers=headers, files=files)
if response.status_code == 200:
# good response
try:
if 'sha256' in response.json():
sha256 = response.json()['sha256']
response = self._request(
"/hash/{sha256}/sandbox".format(sha256=sha256), headers=headers)
if "scan_in_progress" in response.json():
return response.json()['scan_in_progress']
except (ValueError, KeyError) as e:
raise sandboxapi.SandboxError("error in analyze: {e}".format(e=e))
else:
raise sandboxapi.SandboxError("api error in analyze ({u}): {r}".format(u=response.url, r=response.content))
params = {"password": password, "is_private": is_private}

response = self._request(
"/api/scan/file",
method="POST",
params=params,
headers=self.headers,
files=files,
)

if response.status_code == 200 and response and response.json():
# send file, get flow_id
if "flow_id" in response.json():
return response.json()["flow_id"]

raise sandboxapi.SandboxError(
"api error in analyze ({u}): {r}".format(
u=response.url, r=response.content
)
)
except (ValueError, KeyError) as e:
raise sandboxapi.SandboxError("error in analyze: {e}".format(e=e))

def check(self, item_id):
"""Check if an analysis is complete.
:type item_id: str
:param item_id: SHA256 to check.
:param item_id: flow_id to check.
:rtype: bool
:return: Boolean indicating if a report is done or not.
"""
response = self._request(
"/sandbox/{sandbox_id}".format(sandbox_id=item_id))
response = self._request("/api/scan/{flow_id}/report".format(flow_id=item_id))

if response.status_code == 404:
# unknown id
return False

try:
if "scan_in_progress" not in response.json() and "scan_results" in response.json():
if "allFinished" in response.json() and response.json()["allFinished"]:
return True

except ValueError as e:
Expand All @@ -91,7 +99,7 @@ def check(self, item_id):
return False

def is_available(self):
"""Determine if the Opswat API server is alive.
"""Determine if the OPSWAT Filescan Sandbox API server is alive.
:rtype: bool
:return: True if service is available, False otherwise.
Expand All @@ -106,13 +114,12 @@ def is_available(self):
# otherwise, we have to check with the cloud.
else:
try:
response = self._request("/status")
response = self._request("/api/users/me", headers=self.headers)

# we've got opswat.
if response.status_code == 200:
if response.status_code == 200 and "accountId" in response.json():
self.server_available = True
return True

except sandboxapi.SandboxError:
pass

Expand All @@ -125,7 +132,7 @@ def report(self, item_id, report_format="json"):
Available formats include: json.
:type item_id: str
:param item_id: SHA256 number
:param item_id: flow_id number
:type report_format: str
:param report_format: Return format
Expand All @@ -136,92 +143,107 @@ def report(self, item_id, report_format="json"):
if report_format == "html":
return "Report Unavailable"

headers = {
'apikey': self.api_token,
}
filters = [
"filter=general",
"filter=finalVerdict",
"filter=allTags",
"filter=overallState",
"filter=taskReference",
"filter=subtaskReferences",
"filter=allSignalGroups",
"filter=iocs"
]

# else we try JSON
response = self._request(
"/sandbox/{sandbox_id}".format(sandbox_id=item_id), headers=headers)
postfix = "&".join(filters)
url_suffix = "/api/scan/{flow_id}/report?{postfix}".format(
flow_id=item_id, postfix=postfix
)

response = self._request(url_suffix, headers=self.headers)

# if response is JSON, return it as an object
try:
return response.json()
except ValueError:
pass

# otherwise, return the raw content.
return response.content
return response.content.decode("utf-8")

def score(self, report):
"""Pass in the report from self.report(), get back an int."""
score = 0
if report['analysis']['infection_score']:
score = report['analysis']['infection_score']

report_scores = [0]
reports = report.get("reports", {})
for report_value in reports.values():
score = 0
threat_level = report_value.get("finalVerdict", {}).get("threatLevel", 0)
report_scores.append(max(0, threat_level) * 100)

score = max(report_scores)
return score


def opswat_loop(opswat, filename):
# test run
with open(arg, "rb") as handle:
sandbox_id = opswat.analyze(handle, filename)
print("file {f} submitted for analysis, id {i}".format(
f=filename, i=sandbox_id))
flow_id = opswat.analyze(handle, filename)
print("file {f} submitted for analysis, id {i}".format(f=filename, i=flow_id))

while not opswat.check(sandbox_id):
while not opswat.check(flow_id):
print("not done yet, sleeping 10 seconds...")
time.sleep(10)

print("analysis complete. fetching report...")
print(opswat.report(sandbox_id))
print("Analysis complete. fetching report...")
print(opswat.report(flow_id))


if __name__ == "__main__":

def usage():
msg = "%s: apikey <submit <fh> | available | report <id> | analyze <fh>"
msg = "%s: <filescan_url> <api_key> <submit <file_path> | available | report <flow_id> | score <report> | analyze <file_path>"
print(msg % sys.argv[0])
sys.exit(1)

if len(sys.argv) == 2:
cmd = None
api_key = None
url = None

if len(sys.argv) == 4:
cmd = sys.argv.pop().lower()
apikey = sys.argv.pop()
api_key = sys.argv.pop()
url = sys.argv.pop()
arg = None

elif len(sys.argv) >= 3:
elif len(sys.argv) == 5:
arg = sys.argv.pop()
cmd = sys.argv.pop().lower()
apikey = sys.argv.pop()

api_key = sys.argv.pop()
url = sys.argv.pop()

else:
usage()

# instantiate Opswat Sandbox API interface.
opswat = OpswatAPI(apikey, 'windows7')
opswat = OPSWATSandboxAPI(api_key, url)

if arg is None and "available" not in cmd:
usage()

# process command line arguments.
if "submit" in cmd:
if arg is None:
usage()
else:
with open(arg, "rb") as handle:
print(opswat.analyze(handle, arg))
with open(arg, "rb") as handle:
print(opswat.analyze(handle, arg))

elif "available" in cmd:
print(opswat.is_available())

elif "report" in cmd:
if arg is None:
usage()
else:
print(opswat.report(arg))
print(opswat.report(arg))

elif "analyze" in cmd:
if arg is None:
usage()
else:
opswat_loop(opswat, arg)
opswat_loop(opswat, arg)

elif "score" in cmd:
score = opswat.score(arg)
print(score)

else:
usage()
Loading

0 comments on commit af7500b

Please sign in to comment.