Skip to content

Commit

Permalink
Merge branch 'main' of github.com:SigmaHQ/pySigma
Browse files Browse the repository at this point in the history
  • Loading branch information
frack113 committed Dec 16, 2023
2 parents de9549d + 0a56e1d commit 1f5014d
Show file tree
Hide file tree
Showing 15 changed files with 1,037 additions and 370 deletions.
859 changes: 556 additions & 303 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions print-coverage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Prints code testing coverage as percentage for badge generation.
import xml.etree.ElementTree as et
from defusedxml.ElementTree import parse

tree = et.parse("cov.xml")
tree = parse("cov.xml")
root = tree.getroot()
coverage = float(root.attrib["line-rate"]) * 100
print(f"COVERAGE={coverage:3.4}%")
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pySigma"
version = "0.10.9"
version = "0.10.10"
license = "LGPL-2.1-only"
description = "Sigma rule processing and conversion tools"
authors = ["Thomas Patzke <thomas@patzke.org>"]
Expand Down Expand Up @@ -37,6 +37,7 @@ pytest = "^6.2.2"
pytest-cov = "^2.11.1"
pytest-mypy = "^0.6.2"
Sphinx = "^4.2.0"
defusedxml = "^0.7.1"

[tool.black]
line-length = 100
Expand Down
6 changes: 6 additions & 0 deletions sigma/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ class SigmaDescriptionError(SigmaError):
pass


class SigmaReferencesError(SigmaError):
"""Error in Sigma rule references"""

pass


class SigmaFieldsError(SigmaError):
"""Error in Sigma rule fields"""

Expand Down
37 changes: 37 additions & 0 deletions sigma/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,13 @@ def is_compatible(self) -> Optional[bool]:
except importlib.metadata.PackageNotFoundError:
return None

def is_installed(self) -> bool:
try:
subprocess.check_call([sys.executable, "-m", "pip", "-q", "show", self.package])
return True
except:
return False

def install(self):
"""Install plugin with pip."""
if sys.prefix == sys.base_prefix: # not in a virtual environment
Expand Down Expand Up @@ -344,6 +351,36 @@ def install(self):
]
)

def upgrade(self):
"""Upgrade plugin with pip."""
if sys.prefix == sys.base_prefix: # not in a virtual environment
subprocess.check_call(
[
sys.executable,
"-m",
"pip",
"-q",
"--disable-pip-version-check",
"install",
"--upgrade",
self.package,
]
)
else:
subprocess.check_call(
[
sys.executable,
"-m",
"pip",
"-q",
"--disable-pip-version-check",
"install",
"--upgrade",
"--no-user",
self.package,
]
)

def uninstall(self):
"""Uninstall plugin with pip."""
subprocess.check_call([sys.executable, "-m", "pip", "-q", "uninstall", "-y", self.package])
Expand Down
26 changes: 24 additions & 2 deletions sigma/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,6 @@ def from_dict(
source=source,
)
)
raise SigmaTypeError("Sigma rule fields must be a list", source=source)

# Rule falsepositives validation
rule_falsepositives = rule.get("falsepositives")
Expand Down Expand Up @@ -838,6 +837,16 @@ def from_dict(
)
)

# Rule references validation
rule_references = rule.get("references")
if rule_references is not None and not isinstance(rule_references, list):
errors.append(
sigma_exceptions.SigmaReferencesError(
"Sigma rule references must be a list",
source=source,
)
)

# Rule title validation
rule_title = rule.get("title")
if rule_title is None:
Expand All @@ -854,6 +863,13 @@ def from_dict(
source=source,
)
)
elif len(rule_title) > 256:
errors.append(
sigma_exceptions.SigmaTitleError(
"Sigma rule title length must not exceed 256 characters",
source=source,
)
)

# parse log source
logsource = None
Expand All @@ -865,6 +881,12 @@ def from_dict(
"Sigma rule must have a log source", source=source
)
)
except AttributeError:
errors.append(
sigma_exceptions.SigmaLogsourceError(
"Sigma logsource must be a valid YAML map", source=source
)
)
except SigmaError as e:
errors.append(e)

Expand All @@ -891,7 +913,7 @@ def from_dict(
level=level,
status=status,
description=rule_description,
references=rule.get("references"),
references=rule_references,
tags=[SigmaRuleTag.from_str(tag) for tag in rule.get("tags", list())],
author=rule_author,
date=rule_date,
Expand Down
48 changes: 24 additions & 24 deletions sigma/validators/core/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ def is_uuid_v4(val: str) -> bool:

@dataclass
class IdentifierExistenceIssue(SigmaValidationIssue):
description = "Rule has no identifier (UUID)"
severity = SigmaValidationIssueSeverity.MEDIUM
description: ClassVar[str] = "Rule has no identifier (UUID)"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM


class IdentifierExistenceValidator(SigmaRuleValidator):
Expand Down Expand Up @@ -67,17 +67,17 @@ def finalize(self) -> List[SigmaValidationIssue]:


@dataclass
class TitleLengthIssue(SigmaValidationIssue):
description = "Rule has a title longer than 100 characters"
class TitleLengthSigmaHQIssue(SigmaValidationIssue):
description = "Rule has a title longer than 110 characters"
severity = SigmaValidationIssueSeverity.MEDIUM


class TitleLengthValidator(SigmaRuleValidator):
"""Checks if rule has a title length longer than 100."""
class TitleLengthSigmaHQValidator(SigmaRuleValidator):
"""Checks if rule has a title length longer than 110."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
if len(rule.title) > 100:
return [TitleLengthIssue([rule])]
def validate(self, rule: SigmaRule) -> List[TitleLengthSigmaHQIssue]:
if len(rule.title) > 110:
return [TitleLengthSigmaHQIssue([rule])]
else:
return []

Expand Down Expand Up @@ -112,8 +112,8 @@ def finalize(self) -> List[SigmaValidationIssue]:

@dataclass
class DuplicateReferencesIssue(SigmaValidationIssue):
description = "The same references appears multiple times"
severity = SigmaValidationIssueSeverity.MEDIUM
description: ClassVar[str] = "The same references appears multiple times"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM
reference: str


Expand All @@ -131,8 +131,8 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:

@dataclass
class StatusExistenceIssue(SigmaValidationIssue):
description = "Rule has no status"
severity = SigmaValidationIssueSeverity.MEDIUM
description: ClassVar[str] = "Rule has no status"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM


class StatusExistenceValidator(SigmaRuleValidator):
Expand All @@ -147,8 +147,8 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:

@dataclass
class StatusUnsupportedIssue(SigmaValidationIssue):
description = "Rule has UNSUPPORTED status"
severity = SigmaValidationIssueSeverity.MEDIUM
description: ClassVar[str] = "Rule has UNSUPPORTED status"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM


class StatusUnsupportedValidator(SigmaRuleValidator):
Expand All @@ -163,8 +163,8 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:

@dataclass
class DateExistenceIssue(SigmaValidationIssue):
description = "Rule has no date"
severity = SigmaValidationIssueSeverity.MEDIUM
description: ClassVar[str] = "Rule has no date"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM


class DateExistenceValidator(SigmaRuleValidator):
Expand Down Expand Up @@ -250,7 +250,7 @@ class CustomAttributesIssue(SigmaValidationIssue):


class CustomAttributesValidator(SigmaRuleValidator):
"""Check rule filename lengh"""
"""Check if field name is similar to legit one"""

known_custom_attributes: Set[str] = {
"realted",
Expand All @@ -270,8 +270,8 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:

@dataclass
class DescriptionExistenceIssue(SigmaValidationIssue):
description = "Rule has no description"
severity = SigmaValidationIssueSeverity.MEDIUM
description: ClassVar[str] = "Rule has no description"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM


class DescriptionExistenceValidator(SigmaRuleValidator):
Expand All @@ -286,8 +286,8 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:

@dataclass
class DescriptionLengthIssue(SigmaValidationIssue):
description = "Rule has a too short description"
severity = SigmaValidationIssueSeverity.MEDIUM
description: ClassVar[str] = "Rule has a too short description"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM


class DescriptionLengthValidator(SigmaRuleValidator):
Expand All @@ -302,8 +302,8 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:

@dataclass
class LevelExistenceIssue(SigmaValidationIssue):
description = "Rule has no level"
severity = SigmaValidationIssueSeverity.MEDIUM
description: ClassVar[str] = "Rule has no level"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM


class LevelExistenceValidator(SigmaRuleValidator):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
title: CodeIntegrity - Unsigned Kernel Module Loaded
id: 951f8d29-1234-1234-1234-0673ff105e6f
status: experimental
description: Detects the presence of a loaded unsigned kernel module on the system.
references:
- https://learn.microsoft.com/en-us/windows/security/threat-protection/windows-defender-application-control/event-id-explanations
- https://learn.microsoft.com/en-us/windows/security/threat-protection/windows-defender-application-control/event-tag-explanations
- Internal Research
author: test
date: 2023/06/06
modified: 2023/12/11
tags:
- attack.privilege_escalation
logsource:
product: windows
service: codeintegrity-operational
detection:
selection:
EventID: 1234
condition: selection
falsepositives:
- Unlikely
level: high
4 changes: 2 additions & 2 deletions tests/test_conversion_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2313,8 +2313,8 @@ def test_convert_dropped_detection_item_or():
re.compile(";"),
)
quote_always_escape_quote_config = ("'", None, True, "\\", True, None)
quote_only_config = ("'", re.compile("^.*\s"), False, None, False, None)
escape_only_config = (None, None, True, "\\", False, re.compile("[\s]"))
quote_only_config = ("'", re.compile(r"^.*\s"), False, None, False, None)
escape_only_config = (None, None, True, "\\", False, re.compile(r"[\s]"))
quoting_escaping_testcases = (
{ # test name -> quoting/escaping parameters (see test below), test field name, expected result
"escape_quote_nothing": (quote_escape_config, "foo.bar", "foo.bar"),
Expand Down
38 changes: 32 additions & 6 deletions tests/test_pipelines_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
logsource_linux_network_connection,
logsource_linux_file_create,
logsource_linux_process_creation,
logsource_windows,
logsource_windows_dns_query,
logsource_windows_file_change,
logsource_windows_file_event,
Expand All @@ -29,7 +28,18 @@
logsource_windows_driver_load,
logsource_windows_create_stream_hash,
logsource_windows_create_remote_thread,
logsource_macos_process_creation,
logsource_macos_file_create,
logsource_azure_riskdetection,
logsource_azure_pim,
logsource_azure_auditlogs,
logsource_azure_azureactivity,
logsource_azure_signinlogs,
logsource_linux,
logsource_macos,
logsource_windows,
generate_windows_logsource_items,
logsource_category,
)
from sigma.processing.conditions import (
LogsourceCondition,
Expand All @@ -45,11 +55,16 @@ def test_windows_logsource_mapping():
assert windows_logsource_mapping["security"] == "Security"


def test_logsource_windows():
assert logsource_windows("security") == LogsourceCondition(
product="windows",
service="security",
)
@pytest.mark.parametrize(
("func", "service", "product"),
[
(logsource_windows, "test", "windows"),
(logsource_linux, "test", "linux"),
(logsource_macos, "test", "macos"),
],
)
def test_generic_service_sources(func, service, product):
assert func(service) == LogsourceCondition(service=service, product=product)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -81,6 +96,13 @@ def test_logsource_windows():
(logsource_linux_process_creation, "process_creation", "linux"),
(logsource_linux_network_connection, "network_connection", "linux"),
(logsource_linux_file_create, "file_create", "linux"),
(logsource_macos_process_creation, "process_creation", "macos"),
(logsource_macos_file_create, "file_create", "macos"),
(logsource_azure_riskdetection, "riskdetection", "azure"),
(logsource_azure_pim, "pim", "azure"),
(logsource_azure_auditlogs, "auditlogs", "azure"),
(logsource_azure_azureactivity, "azureactivity", "azure"),
(logsource_azure_signinlogs, "signinlogs", "azure"),
],
)
def test_generic_log_sources(func, category, product):
Expand Down Expand Up @@ -137,3 +159,7 @@ def test_generate_windows_logsource_items():
]
}
)


def test_logsource_category():
assert logsource_category("test") == LogsourceCondition(category="test")

0 comments on commit 1f5014d

Please sign in to comment.