Skip to content

Commit

Permalink
Merge branch 'develop' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
bsummers-tc committed Sep 4, 2020
2 parents ccc2379 + a329fe6 commit b5e32fb
Show file tree
Hide file tree
Showing 425 changed files with 13,179 additions and 5,219 deletions.
4 changes: 3 additions & 1 deletion .pre-commit-config.yaml
Expand Up @@ -16,7 +16,6 @@ repos:
- id: debug-statements
- id: double-quote-string-fixer
- id: end-of-file-fixer
- id: fix-encoding-pragma
- id: mixed-line-ending
- id: pretty-format-json
args:
Expand Down Expand Up @@ -66,6 +65,9 @@ repos:
rev: v2.7.1
hooks:
- id: pyupgrade
args:
- --py3-plus
- --py36-plus
- repo: https://github.com/yunojuno/pre-commit-xenon
rev: v0.1
hooks:
Expand Down
1 change: 0 additions & 1 deletion app_init/__main__.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Legacy "main" file for running App"""
# first-party
from run import run
Expand Down
7 changes: 3 additions & 4 deletions app_init/app_lib.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Class to set App lib directory for current version of Python"""
# standard library
import os
Expand All @@ -17,7 +16,7 @@ def __init__(self):
self.lib_minor_version = f'{self.lib_major_version}.{sys.version_info.minor}'
self.lib_micro_version = f'{self.lib_minor_version}.{sys.version_info.micro}'

def find_lib_directory(self):
def find_lib_directory(self) -> str:
"""Find the optimal lib directory."""
lib_directory = None
if self.lib_micro_version in self.lib_directories:
Expand All @@ -38,7 +37,7 @@ def find_lib_directory(self):
return lib_directory

@property
def lib_directories(self):
def lib_directories(self) -> list:
"""Return all "lib_" directories."""
if self._lib_directories is None:
self._lib_directories = []
Expand All @@ -50,7 +49,7 @@ def lib_directories(self):
self._lib_directories.append(c)
return sorted(self._lib_directories, reverse=True)

def update_path(self):
def update_path(self) -> None:
"""Update sys path to ensure all required modules can be found.
All Apps must be able to access included modules, this method will ensure that the system
Expand Down
3 changes: 1 addition & 2 deletions app_init/external/app.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""ThreatConnect External App"""

# first-party
Expand All @@ -8,7 +7,7 @@
class App(ExternalApp):
"""External App"""

def run(self):
def run(self) -> None:
"""Run the App main logic.
This method should contain the core logic of the App.
Expand Down
16 changes: 9 additions & 7 deletions app_init/external/external_app.py
@@ -1,30 +1,32 @@
# -*- coding: utf-8 -*-
"""External App Template."""
from tcex import TcEx


class ExternalApp:
"""Get the owners and indicators in the given owner."""

def __init__(self, _tcex):
def __init__(self, _tcex: TcEx):
"""Initialize class properties."""
self.tcex = _tcex
self.tcex: TcEx = _tcex

# properties
self.args = None
self.exit_message = 'Success'
self.args = self.tcex.args
self.args: object = self.tcex.args

def run(self):
def run(self) -> None:
"""Run the App main logic."""
self.tcex.log.info('No run logic provided.')

def setup(self):
def setup(self) -> None:
"""Perform prep/setup logic."""
# run legacy method
if hasattr(self, 'start'):
self.tcex.log.warning('calling legacy start method')
self.start() # pylint: disable=no-member
self.tcex.log.trace('setup')

def teardown(self):
def teardown(self) -> None:
"""Perform cleanup/teardown logic."""
# run legacy method
if hasattr(self, 'done'):
Expand Down
5 changes: 2 additions & 3 deletions app_init/external/run.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Job App"""
# standard library
import os
Expand All @@ -8,7 +7,7 @@
from app_lib import AppLib


def run():
def run() -> None:
"""Update path and run the App."""

# update the path to ensure the App has access to required modules
Expand Down Expand Up @@ -36,7 +35,7 @@ def run():
app.setup()

# run the App logic
app.run()
app.run() # pylint: disable=no-member

# perform cleanup/teardown operations
app.teardown()
Expand Down
19 changes: 9 additions & 10 deletions app_init/external_ingress/app.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""ThreatConnect Job App"""
# standard library
import csv
Expand All @@ -10,24 +9,24 @@
class App(ExternalApp):
"""External App"""

def __init__(self, _tcex):
def __init__(self, _tcex: object):
"""Initialize class properties."""
super().__init__(_tcex)
self.batch = None
self.url = 'https://feodotracker.abuse.ch/downloads/malware_hashes.csv'

def run(self):
def run(self) -> None:
"""Run main App logic."""
self.batch = self.tcex.batch(self.args.tc_owner)
self.batch: object = self.tcex.batch(self.args.tc_owner)

# using tcex requests to get built-in features (e.g., proxy, logging, retries)
with self.tcex.session_external as s:
r = s.get(self.url)
r: object = s.get(self.url)

if r.ok:
decoded_content = r.content.decode('utf-8').splitlines()
decoded_content: str = r.content.decode('utf-8').splitlines()

reader = csv.reader(decoded_content, delimiter=',', quotechar='"')
reader: object = csv.reader(decoded_content, delimiter=',', quotechar='"')
for row in reader:
# CSV headers
# Firstseen,MD5hash,Malware
Expand All @@ -37,16 +36,16 @@ def run(self):
continue

# create batch entry
file_hash = self.batch.file(row[1], rating='4.0', confidence='100')
file_hash: object = self.batch.file(row[1], rating='4.0', confidence='100')
file_hash.tag(row[2])
occurrence = file_hash.occurrence()
occurrence: object = file_hash.occurrence()
occurrence.date = row[0]
self.batch.save(file_hash) # optionally save object to disk
else:
self.tcex.exit(1, 'Failed to download CSV data.')

# submit batch job(s)
batch_status = self.batch.submit_all()
batch_status: list = self.batch.submit_all()
self.tcex.log.debug(batch_status)

# self.exit_message = f'Downloaded and created {self.batch.indicator_len} file hashes.'
3 changes: 2 additions & 1 deletion app_init/ftl_templates/args.py.ftl
@@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
"""Auto-generated App Args"""
from argparse import ArgumentParser

class Args:
"""App Args"""

def __init__(self, parser):
def __init__(self, parser: ArgumentParser):
"""Initialize class properties."""
<#list install.params as param>
<#if param.type == "Boolean">
Expand Down
1 change: 1 addition & 0 deletions app_init/gitignore
Expand Up @@ -56,6 +56,7 @@ junit.xml
py36.xml
py37.xml
py38.xml
dump.rdb

#-------------------------------------------------
# Other Nonsense
Expand Down
3 changes: 1 addition & 2 deletions app_init/job/app.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""ThreatConnect Job App"""

# first-party
Expand All @@ -8,7 +7,7 @@
class App(JobApp):
"""Job App"""

def run(self):
def run(self) -> None:
"""Run the App main logic.
This method should contain the core logic of the App.
Expand Down
4 changes: 2 additions & 2 deletions app_init/job/args.py
@@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
"""Job Args"""
from argparse import ArgumentParser


class Args:
"""Job Args"""

def __init__(self, parser):
def __init__(self, parser: ArgumentParser):
"""Initialize class properties."""
17 changes: 9 additions & 8 deletions app_init/job/job_app.py
@@ -1,39 +1,40 @@
# -*- coding: utf-8 -*-
"""Job App Template."""
# first-party
from args import Args

from tcex import TcEx


class JobApp:
"""Get the owners and indicators in the given owner."""

def __init__(self, _tcex):
def __init__(self, _tcex: TcEx):
"""Initialize class properties."""
self.tcex = _tcex
self.tcex: TcEx = _tcex
self.args = None
self.exit_message = 'Success'

# automatically parse args on init
self.parse_args()

def parse_args(self):
def parse_args(self) -> None:
"""Parse CLI args."""
Args(self.tcex.parser)
self.args = self.tcex.args
self.args: object = self.tcex.args

def run(self):
def run(self) -> None:
"""Run the App main logic."""
self.tcex.log.info('No run logic provided.')

def setup(self):
def setup(self) -> None:
"""Perform prep/setup logic."""
# run legacy method
if hasattr(self, 'start'):
self.tcex.log.warning('calling legacy start method')
self.start() # pylint: disable=no-member
self.tcex.log.trace('setup')

def teardown(self):
def teardown(self) -> None:
"""Perform cleanup/teardown logic."""
# run legacy method
if hasattr(self, 'done'):
Expand Down
3 changes: 1 addition & 2 deletions app_init/job/run.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Job App"""
# standard library
import traceback
Expand All @@ -8,7 +7,7 @@


# pylint: disable=no-member
def run():
def run() -> None:
"""Update path and run the App."""

# update the path to ensure the App has access to required modules
Expand Down
22 changes: 13 additions & 9 deletions app_init/job_batch/app.py
@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-
"""ThreatConnect Job App"""
# standard library
import csv

# first-party
from job_app import JobApp # Import default Job App Class (Required)

from tcex.batch import Batch
from tcex.sessions import ExternalSession


class App(JobApp):
"""Job App"""
Expand All @@ -15,13 +17,13 @@ def __init__(self, _tcex):
super().__init__(_tcex)

# properties
self.batch = self.tcex.batch(self.args.tc_owner)
self.batch: Batch = self.tcex.batch(self.args.tc_owner)
self.session = None

def setup(self):
"""Perform prep/setup logic."""
# using tcex session_external to get built-in features (e.g., proxy, logging, retries)
self.session = self.tcex.session_external
self.session: ExternalSession = self.tcex.session_external

# setting the base url allow for subsequent API call to be made by only
# providing the API endpoint/path.
Expand All @@ -34,9 +36,9 @@ def run(self):
r = s.get('downloads/malware_hashes.csv')

if r.ok:
decoded_content = r.content.decode('utf-8').splitlines()
decoded_content: str = r.content.decode('utf-8').splitlines()

reader = csv.reader(decoded_content, delimiter=',', quotechar='"')
reader: object = csv.reader(decoded_content, delimiter=',', quotechar='"')
for row in reader:
# CSV headers
# Firstseen,MD5hash,Malware
Expand All @@ -46,19 +48,21 @@ def run(self):
continue

# create batch entry
indicator_value = row[1]
file_hash = self.batch.file(indicator_value, rating='4.0', confidence='100')
indicator_value: str = row[1]
file_hash: object = self.batch.file(
indicator_value, rating='4.0', confidence='100'
)
file_hash.tag(row[2])

# add occurrence to batch entry
occurrence = file_hash.occurrence()
occurrence: object = file_hash.occurrence()
occurrence.date = row[0]
self.batch.save(file_hash) # optionally save object to disk
else:
self.tcex.exit(1, 'Failed to download CSV data.')

# submit batch job
batch_status = self.batch.submit_all()
batch_status: list = self.batch.submit_all()
print(batch_status)

self.exit_message = ( # pylint: disable=attribute-defined-outside-init
Expand Down
4 changes: 2 additions & 2 deletions app_init/job_batch/args.py
@@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-
"""Job Args"""
from argparse import ArgumentParser


class Args:
"""Job Args"""

def __init__(self, parser):
def __init__(self, parser: ArgumentParser):
"""Initialize class properties."""
parser.add_argument('--tc_owner', required=True)

0 comments on commit b5e32fb

Please sign in to comment.