Skip to content

Commit

Permalink
Adding data query paths test for DEX support (#638)
Browse files Browse the repository at this point in the history
* Adding data query paths test for DEX support

MyPy suppressions for some uses of ProviderSettings and PivotRegistration classes.

* Updating unit tests for new pkg_config attributes
  • Loading branch information
ianhelle committed Mar 16, 2023
1 parent 5688432 commit 4dce286
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 4 deletions.
2 changes: 1 addition & 1 deletion msticpy/common/provider_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def get_provider_settings(config_section="TIProviders") -> Dict[str, ProviderSet
settings = {}
for provider, item_settings in section_settings.items():
prov_args = item_settings.get("Args")
prov_settings = ProviderSettings(
prov_settings = ProviderSettings( # type: ignore[call-arg]
name=provider,
description=item_settings.get("Description"),
args=_get_setting_args(
Expand Down
4 changes: 3 additions & 1 deletion msticpy/context/geoip.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,9 @@ def _get_geoip_provider_settings(provider_name: str) -> ProviderSettings:
settings = get_provider_settings(config_section="OtherProviders")
if provider_name in settings:
return settings[provider_name]
return ProviderSettings(name=provider_name, description="Not found.")
return ProviderSettings( # type: ignore[call-arg]
name=provider_name, description="Not found."
)


@export
Expand Down
2 changes: 1 addition & 1 deletion msticpy/init/pivot_core/pivot_register_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _read_reg_file(file_path: str):

for entry_name, settings in pivot_regs.get("pivot_providers").items():
try:
yield PivotRegistration(
yield PivotRegistration( # type: ignore[call-arg]
src_config_path=file_path, src_config_entry=entry_name, **settings
)
except TypeError as err:
Expand Down
2 changes: 1 addition & 1 deletion msticpy/init/pivot_init/pivot_ti_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def _create_lookup_func(
# use IoC name if ioc_type is None
entity_cls, entity_attr = TI_ENTITY_ATTRIBS[ioc or ioc_name]

pivot_reg = PivotRegistration(
pivot_reg = PivotRegistration( # type: ignore[call-arg]
src_func_name=ti_lookup.lookup_iocs.__name__,
input_type="dataframe",
entity_map={entity_cls.__name__: entity_attr},
Expand Down
44 changes: 44 additions & 0 deletions tests/data/test_dataqueries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io
import unittest
import warnings
from copy import deepcopy
from datetime import datetime
from functools import partial
from pathlib import Path
Expand All @@ -17,6 +18,7 @@
import pytest
import pytest_check as check

from msticpy.common import pkg_config
from msticpy.common.exceptions import MsticpyException
from msticpy.data.core.data_providers import (
DriverBase,
Expand Down Expand Up @@ -538,3 +540,45 @@ def test_query_search(search, expected, sentinel_qry_prov):
print(search, len(results))
print(results[:5])
check.greater_equal(len(results), expected)


@pytest.mark.parametrize("mode", ["abs", "rel"])
def test_query_paths(mode):
"""Test correctly loading queries from custom paths."""

if mode == "abs":
query_paths = [str(get_test_data_path().joinpath("kusto"))]
else:
query_paths = ["tests/testdata/kusto"]

qry_prov = QueryProvider("Kusto", query_paths=query_paths)
check.greater_equal(len(qry_prov.list_queries()), 13)
for data_family in (
"AppAuthCluster",
"AppAuthClustera",
"IntegAuthCluster",
"IntegAuthCluster2",
):
check.is_true(hasattr(qry_prov, data_family))

# Custom paths in settings
current_settings = deepcopy(pkg_config._settings.get("QueryDefinitions"))
if not current_settings:
pkg_config._settings["QueryDefinitions"] = {"Custom": query_paths}
elif not current_settings.get("Custom"):
pkg_config._settings["QueryDefinitions"]["Custom"] = query_paths
else:
pkg_config._settings["QueryDefinitions"]["Custom"].extend(query_paths)

check.greater_equal(len(pkg_config._settings["QueryDefinitions"]["Custom"]), 1)

qry_prov = QueryProvider("Kusto")
check.greater_equal(len(qry_prov.list_queries()), 13)
for data_family in (
"AppAuthCluster",
"AppAuthClustera",
"IntegAuthCluster",
"IntegAuthCluster2",
):
check.is_true(hasattr(qry_prov, data_family))
pkg_config._settings["QueryDefinitions"] = current_settings

0 comments on commit 4dce286

Please sign in to comment.