Skip to content
Permalink
Browse files

Added validator directory support and clean up (#369)

  • Loading branch information
joachimmetz committed Dec 10, 2019
1 parent 13de2ea commit b3e0ea18a55e3d15b7fe671a6e5bd4faab7dc4a4
Showing with 57 additions and 35 deletions.
  1. +24 −24 tools/stats.py
  2. +33 −11 tools/validator.py
@@ -18,12 +18,12 @@ class ArtifactStatistics(object):
def __init__(self):
"""Initializes artifact statistics."""
super(ArtifactStatistics, self).__init__()
self.label_counts = {}
self.os_counts = {}
self.path_count = 0
self.reg_key_count = 0
self.source_type_counts = {}
self.total_count = 0
self._label_counts = {}
self._os_counts = {}
self._path_count = 0
self._reg_key_count = 0
self._source_type_counts = {}
self._total_count = 0

def _PrintDictAsTable(self, src_dict):
"""Prints a table of artifact definitions.
@@ -52,17 +52,17 @@ def _PrintDictAsTable(self, src_dict):
def PrintOSTable(self):
"""Prints a table of artifact definitions by operating system."""
print('**Artifacts by OS**\n')
self._PrintDictAsTable(self.os_counts)
self._PrintDictAsTable(self._os_counts)

def PrintLabelTable(self):
"""Prints a table of artifact definitions by label."""
print('**Artifacts by label**\n')
self._PrintDictAsTable(self.label_counts)
self._PrintDictAsTable(self._label_counts)

def PrintSourceTypeTable(self):
"""Prints a table of artifact definitions by source type."""
print('**Artifacts by type**\n')
self._PrintDictAsTable(self.source_type_counts)
self._PrintDictAsTable(self._source_type_counts)

def PrintSummaryTable(self):
"""Prints a summary table."""
@@ -75,41 +75,41 @@ def PrintSummaryTable(self):
| **Registry keys covered** | **{2:d}** |
| **Total artifacts** | **{3:d}** |
""".format(
time.strftime('%Y-%m-%d'), self.path_count, self.reg_key_count,
self.total_count))
time.strftime('%Y-%m-%d'), self._path_count, self._reg_key_count,
self._total_count))

def BuildStats(self):
"""Builds the statistics."""
artifact_reader = reader.YamlArtifactsReader()
self.label_counts = {}
self.os_counts = {}
self.path_count = 0
self.reg_key_count = 0
self.source_type_counts = {}
self.total_count = 0
self._label_counts = {}
self._os_counts = {}
self._path_count = 0
self._reg_key_count = 0
self._source_type_counts = {}
self._total_count = 0

for artifact_definition in artifact_reader.ReadDirectory('data'):
if hasattr(artifact_definition, 'labels'):
for label in artifact_definition.labels:
self.label_counts[label] = self.label_counts.get(label, 0) + 1
self._label_counts[label] = self._label_counts.get(label, 0) + 1

for source in artifact_definition.sources:
self.total_count += 1
self._total_count += 1
source_type = source.type_indicator
self.source_type_counts[source_type] = self.source_type_counts.get(
self._source_type_counts[source_type] = self._source_type_counts.get(
source_type, 0) + 1

if source_type == definitions.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY:
self.reg_key_count += len(source.keys)
self._reg_key_count += len(source.keys)
elif source_type == definitions.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE:
self.reg_key_count += len(source.key_value_pairs)
self._reg_key_count += len(source.key_value_pairs)
elif source_type in (definitions.TYPE_INDICATOR_FILE,
definitions.TYPE_INDICATOR_DIRECTORY):
self.path_count += len(source.paths)
self._path_count += len(source.paths)

os_list = source.supported_os
for os_str in os_list:
self.os_counts[os_str] = self.os_counts.get(os_str, 0) + 1
self._os_counts[os_str] = self._os_counts.get(os_str, 0) + 1

def PrintStats(self):
"""Build stats and print in MarkDown format."""
@@ -6,6 +6,7 @@
from __future__ import unicode_literals

import argparse
import glob
import logging
import os
import sys
@@ -425,6 +426,23 @@ def _HasDuplicateRegistryKeyPaths(
self._artifact_registry_key_paths.update(source.keys)
return result

def CheckDirectory(self, path):
"""Validates the artifacts definition in a specific directory.
Args:
path (str): path of the directory containing the artifacts definition
files.
Returns:
bool: True if the file contains valid artifacts definitions.
"""
for filename in glob.glob(os.path.join(path, '*.yaml')):
result = self.CheckFile(filename)
if not result:
break

return result

def CheckFile(self, filename):
"""Validates the artifacts definition in a specific file.
@@ -528,31 +546,35 @@ def Main():
description='Validates an artifact definitions file.')

args_parser.add_argument(
'filename',
nargs='?',
action='store',
metavar='artifacts.yaml',
default=None,
help=('path of the file that contains the artifact '
'definitions', nargs='?', action='store', metavar='PATH', default=None,
help=('path of the file or directory that contains the artifact '
'definitions.'))

options = args_parser.parse_args()

if not options.filename:
if not options.definitions:
print('Source value is missing.')
print('')
args_parser.print_help()
print('')
return False

if not os.path.isfile(options.filename):
print('No such file: {0:s}'.format(options.filename))
if not os.path.exists(options.definitions):
print('No such file or directory: {0:s}'.format(options.definitions))
print('')
return False

print('Validating: {0:s}'.format(options.filename))
validator = ArtifactDefinitionsValidator()
if not validator.CheckFile(options.filename):

if os.path.isdir(options.definitions):
print('Validating definitions in: {0:s}/*.yaml'.format(options.definitions))
result = validator.CheckDirectory(options.definitions)

elif os.path.isfile(options.definitions):
print('Validating definitions in: {0:s}'.format(options.definitions))
result = validator.CheckFile(options.definitions)

if not result:
print('FAILURE')
return False

0 comments on commit b3e0ea1

Please sign in to comment.
You can’t perform that action at this time.