This repository has been archived by the owner on Mar 24, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #153 from alphagov/scan-files-on-upload
Scan files on upload
- Loading branch information
Showing
6 changed files
with
116 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import hashlib | ||
import subprocess | ||
import os | ||
|
||
|
||
class VirusSignatureError(StandardError): | ||
def __init__(self, message): | ||
self.message = message | ||
|
||
|
||
class ScannedFile(object): | ||
def __init__(self, file_object): | ||
self.file_object = file_object | ||
self._virus_signature = False | ||
self._file_path = '/tmp/{0}'.format( | ||
os.path.basename(self.file_object.filename)) | ||
|
||
@property | ||
def has_virus_signature(self): | ||
self._save_file_to_disk() | ||
self._scan_file() | ||
self._clean_up() | ||
return self._virus_signature | ||
|
||
def _save_file_to_disk(self): | ||
self.file_object.save(self._file_path) | ||
|
||
def _scan_file(self): | ||
self._virus_signature = (self._virus_signature or | ||
bool(subprocess.call(["clamscan", | ||
self._file_path]))) | ||
|
||
def _clean_up(self): | ||
# Remove temporary file | ||
os.remove(self._file_path) | ||
# Reset stream position on file_object so that it can be read again | ||
self.file_object.seek(0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import unittest | ||
from hamcrest import assert_that, is_ | ||
from mock import Mock | ||
from backdrop.write.scanned_file import ScannedFile | ||
import subprocess | ||
import os | ||
from tests.support.file_upload_test_case import FileUploadTestCase | ||
|
||
|
||
class TestScannedFile(FileUploadTestCase): | ||
|
||
def setUp(self): | ||
self.file_object = self._file_storage_wrapper("This is a test", "abc.txt") | ||
self.scanner = ScannedFile(self.file_object) | ||
|
||
def tearDown(self): | ||
try: | ||
os.remove('/tmp/abc.txt') | ||
except OSError: | ||
pass | ||
|
||
def test_has_virus_signature(self): | ||
self.scanner._virus_signature = True | ||
self.scanner._save_file_to_disk = Mock() | ||
self.scanner._scan_file = Mock() | ||
self.scanner._clean_up = Mock() | ||
assert_that(self.scanner.has_virus_signature, is_(True)) | ||
self.scanner._save_file_to_disk.assert_called_once_with() | ||
self.scanner._scan_file.assert_called_once_with() | ||
self.scanner._clean_up.assert_called_once_with() | ||
|
||
def test_saving_a_file_to_disk(self): | ||
self.scanner._save_file_to_disk() | ||
assert_that(file('/tmp/abc.txt').read(), is_("This is a test")) | ||
|
||
def test_cleaning_up_after_scanning(self): | ||
self.file_object.save('/tmp/abc.txt') | ||
self.scanner._clean_up() | ||
assert_that(os.path.exists('/tmp/abc.txt'), is_(False)) | ||
|
||
def test_scanning_a_file(self): | ||
mock_call = Mock(return_value = True) | ||
subprocess.call = mock_call | ||
self.scanner._scan_file() | ||
assert_that(self.scanner._virus_signature, is_(True)) | ||
mock_call.assert_called_once_with(["clamscan", "/tmp/abc.txt"]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters