Skip to content
Permalink
Browse files

Some linter fixes (#277)

* Some pylint fixes
  • Loading branch information
aarontp authored and Onager committed Oct 25, 2018
1 parent b65bc0d commit 4593d9351c7d4f1a41768188be38f0a6b5e9cfbf
@@ -19,5 +19,5 @@

if __name__ == '__main__':
subprocess.check_call([
'nosetests', '-vv', '--with-coverage', '--cover-package=turbinia', '--exe'
'nosetests', '-vv', '--with-coverage', '--cover-package=turbinia', '--exe'
])
@@ -37,38 +37,38 @@
import turbinia # pylint: disable=wrong-import-position

turbinia_description = (
'Turbinia is an open-source framework for deploying, managing, and running'
'forensic workloads on cloud platforms. It is intended to automate running of'
'common forensic processing tools (i.e. Plaso, TSK, strings, etc) to help'
'with processing evidence in the Cloud, scaling the processing of large'
'amounts of evidence, and decreasing response time by parallelizing'
'processing where possible.')
'Turbinia is an open-source framework for deploying, managing, and running'
'forensic workloads on cloud platforms. It is intended to automate running '
'of common forensic processing tools (i.e. Plaso, TSK, strings, etc) to '
'help with processing evidence in the Cloud, scaling the processing of '
'large amounts of evidence, and decreasing response time by parallelizing'
'processing where possible.')

setup(
name='turbinia',
version=turbinia.__version__,
description='Automation and Scaling of Digital Forensics Tools',
long_description=turbinia_description,
license='Apache License, Version 2.0',
url='http://turbinia.plumbing/',
maintainer='Turbinia development team',
maintainer_email='turbinia-dev@googlegroups.com',
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Console',
'Operating System :: OS Independent',
'Programming Language :: Python',
],
packages=find_packages(),
include_package_data=True,
zip_safe=False,
entry_points={"console_scripts": ["turbiniactl=turbinia.turbiniactl:main"]},
install_requires=[str(req.req) for req in parse_requirements(
'requirements.txt', session=PipSession())
],
extras_require={
'dev': ['mock', 'nose'],
'local': ['celery>=4.1.0', 'kombu>=4.1.0', 'redis>=2.10.6'],
'worker': ['plaso>=20171118']
}
name='turbinia',
version=turbinia.__version__,
description='Automation and Scaling of Digital Forensics Tools',
long_description=turbinia_description,
license='Apache License, Version 2.0',
url='http://turbinia.plumbing/',
maintainer='Turbinia development team',
maintainer_email='turbinia-dev@googlegroups.com',
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Console',
'Operating System :: OS Independent',
'Programming Language :: Python',
],
packages=find_packages(),
include_package_data=True,
zip_safe=False,
entry_points={'console_scripts': ['turbiniactl=turbinia.turbiniactl:main']},
install_requires=[str(req.req) for req in parse_requirements(
'requirements.txt', session=PipSession())
],
extras_require={
'dev': ['mock', 'nose'],
'local': ['celery>=4.1.0', 'kombu>=4.1.0', 'redis>=2.10.6'],
'worker': ['plaso>=20171118']
}
)
@@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Script for deploying cloud functions."""


from __future__ import print_function
@@ -20,10 +21,11 @@

for cloud_function in function_names:
print('Deploying function {0:s}'.format(cloud_function))
cmd = ('gcloud --project {0:s} beta functions deploy {1:s} --stage-bucket '
'{2:s} --region {3:s} --trigger-http'.format(config.PROJECT, cloud_function,
config.BUCKET_NAME,
config.TURBINIA_REGION))
cmd = (
'gcloud --project {0:s} beta functions deploy {1:s} --stage-bucket {2:s} '
'--region {3:s} --trigger-http'.format(
config.PROJECT, cloud_function, config.BUCKET_NAME,
config.TURBINIA_REGION))
print(subprocess.check_call(cmd, shell=True))

print('\nCreating Datastore index from {0:s}'.format(index_file))
@@ -36,6 +36,7 @@


def main():
"""Main function for config parser"""
if len(sys.argv) < 2:
print('%s <key name>' % sys.argv[0])
sys.exit(100)
@@ -42,7 +42,8 @@ def create_graph():


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Create Turbinia evidence graph.')
parser = argparse.ArgumentParser(
description='Create Turbinia evidence graph.')
parser.add_argument('filename', type=unicode, help='where to save the file')
args = parser.parse_args()

@@ -307,7 +307,7 @@ class TurbiniaCeleryClient(TurbiniaClient):
redis (RedisStateManager): Redis datastore object
"""

def __init__(self, *args, **kwargs):
def __init__(self, *_, **__):
super(TurbiniaCeleryClient, self).__init__()
self.redis = RedisStateManager()

@@ -375,7 +375,7 @@ class TurbiniaCeleryWorker(TurbiniaClient):
worker (celery.app): Celery worker app
"""

def __init__(self, *args, **kwargs):
def __init__(self, *_, **__):
"""Initialization for Celery worker."""
super(TurbiniaCeleryWorker, self).__init__()
check_directory(config.MOUNT_DIR_PREFIX)
@@ -19,7 +19,6 @@
import unittest
import os
import shutil
import stat
import tempfile

import mock
@@ -46,4 +46,4 @@ def create_tasks(self, evidence):
tasks = [HadoopAnalysisTask() for _ in evidence]
return tasks

manager.JobsManager.RegisterJob(HadoopAnalysisJob)
manager.JobsManager.RegisterJob(HadoopAnalysisJob)
@@ -78,4 +78,4 @@ def create_tasks(self, evidence):
return [wordpress.WordpressAccessLogAnalysisTask() for _ in evidence]

manager.JobsManager.RegisterJobs(
[HTTPAccessLogExtractionJob, HTTPAccessLogAnalysisJob])
[HTTPAccessLogExtractionJob, HTTPAccessLogAnalysisJob])
@@ -29,7 +29,7 @@ class JenkinsAnalysisJob(interface.TurbiniaJob):
"""Jenkins analysis job."""

evidence_input = [
Directory, RawDisk, GoogleCloudDisk, GoogleCloudDiskRawEmbedded]
Directory, RawDisk, GoogleCloudDisk, GoogleCloudDiskRawEmbedded]
evidence_output = [ReportText]

NAME = 'JenkinsAnalysisJob'
@@ -235,6 +235,7 @@ class LocalOutputWriter(OutputWriter):

NAME = 'LocalWriter'

# pylint: disable=keyword-arg-before-vararg
def __init__(self, base_output_dir=None, *args, **kwargs):
super(LocalOutputWriter, self).__init__(base_output_dir=base_output_dir,
*args, **kwargs)
@@ -94,6 +94,7 @@ def setUp(self):
request = getTurbiniaRequest()
self.pubsub = pubsub.TurbiniaPubSub('fake_topic')
pub_sub_message = MockPubSubMessage(request.to_json(), 'msg id')
# pylint: disable=protected-access
self.pubsub._queue.put(pub_sub_message)
self.pubsub.topic_path = 'faketopicpath'

@@ -114,6 +115,7 @@ def testBadCheckMessages(self):
"""Test check_messages returns empty list for an invalid message."""
pub_sub_message = MockPubSubMessage('non-json-data', 'msg id2')
# Clear the queue so we can add an invalid message
# pylint: disable=protected-access
self.pubsub._queue.get()
self.pubsub._queue.put(pub_sub_message)

@@ -131,6 +133,7 @@ class TestTurbiniaKombu(unittest.TestCase):
"""Test turbinia.pubsub Kombu module."""

def setUp(self):
"""Sets up test class."""
request = getTurbiniaRequest()
self.kombu = celery.TurbiniaKombu('fake_topic')
result = mock.MagicMock()
@@ -141,6 +144,7 @@ def setUp(self):
result, queue.Empty('Empty Queue')]

def testCheckMessages(self):
"""Test check_messages method."""
results = self.kombu.check_messages()
self.assertTrue(len(results) == 1)
request_new = results[0]
@@ -153,6 +157,7 @@ def testCheckMessages(self):
self.assertEqual(request_new.evidence[0].name, 'My Evidence')

def testBadCheckMessages(self):
"""Test check_messages method with non-json data."""
result = mock.MagicMock()
result.payload = 'non-json-data'
self.kombu.queue.get.side_effect = [result, queue.Empty('Empty Queue')]
@@ -52,6 +52,7 @@ def get_state_manager():
Initialized StateManager object.
"""
config.LoadConfig()
# pylint: disable=no-else-return
if config.STATE_MANAGER == 'Datastore':
return DatastoreStateManager()
elif config.STATE_MANAGER == 'redis':
@@ -226,6 +227,7 @@ def get_task_data(self, instance, days=0, task_id=None, request_id=None):
if json.loads(self.client.get(task)).get('instance') == instance or
not instance
]
# pylint: disable=no-else-return
if days:
start_time = datetime.now() - timedelta(days=days)
# Redis only supports strings; we convert to/from datetime here and below
@@ -94,6 +94,7 @@ def testStateManagerGetTaskDict(self, _):
def testStateManagerValidateDataValidDict(self, _):
"""Test State Manger _validate_data() base case."""
self.state_manager = self._get_state_manager()
# pylint: disable=protected-access
test_data = self.state_manager._validate_data(self.test_data)
self.assertDictEqual(test_data, self.test_data)

@@ -103,6 +104,7 @@ def testStateManagerValidateDataInvalidDict(self, _):
self.state_manager = self._get_state_manager()
invalid_dict = copy.deepcopy(self.test_data)
invalid_dict['status'] = 'A' * state_manager.MAX_DATASTORE_STRLEN + 'BORKEN'
# pylint: disable=protected-access
test_data = self.state_manager._validate_data(invalid_dict)
self.assertListEqual(list(test_data.keys()), list(self.test_data.keys()))
self.assertNotEqual(test_data['status'], self.test_data['status'])
@@ -49,6 +49,7 @@ def get_task_manager():
Initialized TaskManager object.
"""
config.LoadConfig()
# pylint: disable=no-else-return
if config.TASK_MANAGER == 'PSQ':
return PSQTaskManager()
elif config.TASK_MANAGER == 'Celery':
@@ -347,6 +348,7 @@ def __init__(self):
config.LoadConfig()
super(PSQTaskManager, self).__init__()

# pylint: disable=keyword-arg-before-vararg
def _backend_setup(self, server=True, *args, **kwargs):
"""
Args:
@@ -41,6 +41,7 @@


def main():
"""Main function for turbiniactl"""
# TODO(aarontp): Allow for single run mode when
# by specifying evidence which will also terminate the task manager after
# evidence has been processed.
@@ -17,19 +17,17 @@
from __future__ import unicode_literals

from datetime import datetime
import errno
import filelock
import getpass
import json
import logging
import os
import pickle
import platform
import subprocess
import time
import traceback
import uuid

import filelock

from turbinia import config
from turbinia import output_manager
from turbinia import TurbiniaException
@@ -23,6 +23,7 @@


class JenkinsAnalysisTaskTest(unittest.TestCase):
"""Test class for JenkinsAnalysisTask"""

JENKINS_SYSTEM_CONFIG = """<?xml version='1.1' encoding='UTF-8'?>
<hudson>
@@ -111,8 +112,9 @@ def test_extract_jenkins_version(self):
config.LoadConfig()
task = jenkins.JenkinsAnalysisTask()

# pylint: disable=protected-access
version = task._extract_jenkins_version(
str(self.JENKINS_SYSTEM_CONFIG))
str(self.JENKINS_SYSTEM_CONFIG))

self.assertEqual(version, self.EXPECTED_VERSION)

@@ -121,8 +123,9 @@ def test_extract_jenkins_credentials(self):
config.LoadConfig()
task = jenkins.JenkinsAnalysisTask()

# pylint: disable=protected-access
credentials = task._extract_jenkins_credentials(
str(self.JENKINS_USER_CONFIG))
str(self.JENKINS_USER_CONFIG))

self.assertEqual(credentials, self.EXPECTED_CREDENTIALS)

@@ -59,5 +59,6 @@ def testAnalyzeHadoopAppRoot(self):
config.LoadConfig()
task = hadoop.HadoopAnalysisTask()
self.maxDiff = None
# pylint: disable=protected-access
report = '\n'.join(task._AnalyzeHadoopAppRoot([self.test_file]))
self.assertEqual(report.replace(self.filedir, ''), self._EXPECTED_REPORT)

0 comments on commit 4593d93

Please sign in to comment.
You can’t perform that action at this time.