From f067e6ec1e84d0f24e73f17edc6f0d25c2a202f3 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Fri, 31 May 2024 14:42:39 -0700 Subject: [PATCH] ioc match service --- .../alerts/AlertsService.java | 21 +-- .../iocscan/dao/IocMatchService.java | 156 ++++++++++++++++++ .../iocscan/dto/IocScanContext.java | 6 +- .../iocscan/service/IoCScanService.java | 55 +++--- .../service/IoCScanServiceInterface.java | 4 - .../TransportIndexDetectorAction.java | 8 +- .../securityanalytics/util/DetectorUtils.java | 2 +- .../alerts/AlertingServiceTests.java | 4 +- .../iocscan/dao/IocMatchServiceIT.java | 51 ++++++ 9 files changed, 253 insertions(+), 54 deletions(-) create mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchService.java create mode 100644 src/test/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchServiceIT.java diff --git a/src/main/java/org/opensearch/securityanalytics/alerts/AlertsService.java b/src/main/java/org/opensearch/securityanalytics/alerts/AlertsService.java index a61fe9d35..03b12a2fd 100644 --- a/src/main/java/org/opensearch/securityanalytics/alerts/AlertsService.java +++ b/src/main/java/org/opensearch/securityanalytics/alerts/AlertsService.java @@ -133,16 +133,7 @@ public void getAlertsByMonitorIds( ) { org.opensearch.commons.alerting.action.GetAlertsRequest req = - new org.opensearch.commons.alerting.action.GetAlertsRequest( - table, - severityLevel, - alertState, - null, - alertIndex, - monitorIds, - null, - null - ); + null; AlertingPluginInterface.INSTANCE.getAlerts((NodeClient) client, req, new ActionListener<>() { @Override @@ -247,15 +238,7 @@ public void getAlerts(List alertIds, Detector detector, Table table, ActionListener actionListener) { - GetAlertsRequest request = new GetAlertsRequest( - table, - "ALL", - "ALL", - null, - DetectorMonitorConfig.getAllAlertsIndicesPattern(detector.getDetectorType()), - null, - null, - alertIds); + GetAlertsRequest request = null; AlertingPluginInterface.INSTANCE.getAlerts( (NodeClient) client, request, actionListener); diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchService.java new file mode 100644 index 000000000..a06564fa8 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchService.java @@ -0,0 +1,156 @@ +package org.opensearch.securityanalytics.threatIntel.iocscan.dao; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.securityanalytics.SecurityAnalyticsPlugin; +import org.opensearch.securityanalytics.model.threatintel.IocMatch; +import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; +import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; +import org.opensearch.securityanalytics.util.SecurityAnalyticsException; +import org.opensearch.threadpool.ThreadPool; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Data layer to perform CRUD operations for threat intel ioc match : store in system index. + */ +public class IocMatchService { + //TODO manage index rollover + public static final String INDEX_NAME = ".opensearch-sap-iocmatch"; + private static final Logger log = LogManager.getLogger(IocMatchService.class); + private final Client client; + private final ClusterService clusterService; + + public IocMatchService(final Client client, final ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + } + + public void indexIocMatches(List iocMatches, + final ActionListener actionListener) { + try { + Integer batchSize = this.clusterService.getClusterSettings().get(SecurityAnalyticsSettings.BATCH_SIZE); + createIndexIfNotExists(ActionListener.wrap( + r -> { + List bulkRequestList = new ArrayList<>(); + BulkRequest bulkRequest = new BulkRequest(INDEX_NAME); + for (int i = 0; i < iocMatches.size(); i++) { + IocMatch iocMatch = iocMatches.get(i); + try { + IndexRequest indexRequest = new IndexRequest(INDEX_NAME) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(iocMatch.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .opType(DocWriteRequest.OpType.CREATE); + bulkRequest.add(indexRequest); + if ( + bulkRequest.requests().size() == batchSize + && i != iocMatches.size() - 1 // final bulk request will be added outside for loop with refresh policy none + ) { + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + bulkRequestList.add(bulkRequest); + bulkRequest = new BulkRequest(); + } + } catch (IOException e) { + log.error(String.format("Failed to create index request for ioc match %s moving on to next"), e); + } + } + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + bulkRequestList.add(bulkRequest); + GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> { + int idx = 0; + for (BulkResponse response : bulkResponses) { + BulkRequest request = bulkRequestList.get(idx); + if (response.hasFailures()) { + log.error("Failed to bulk index {} Ioc Matches. Failure: {}", request.batchSize(), response.buildFailureMessage()); + } + } + actionListener.onResponse(null); + }, actionListener::onFailure), bulkRequestList.size()); + for (BulkRequest req : bulkRequestList) { + try { + StashedThreadContext.run(client, () -> client.bulk(req, groupedListener)); + } catch (Exception e) { + log.error("Failed to save ioc matches.", e); + } + } + }, e -> { + log.error("Failed to create System Index"); + actionListener.onFailure(e); + })); + + + } catch (Exception e) { + log.error("Exception saving the threat intel source config in index", e); + actionListener.onFailure(e); + } + } + + private String getIndexMapping() { + try { + try (InputStream is = IocMatchService.class.getResourceAsStream("/mappings/ioc_match_mapping.json")) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + return reader.lines().map(String::trim).collect(Collectors.joining()); + } + } + } catch (IOException e) { + log.error("Failed to get the threat intel ioc match index mapping", e); + throw new SecurityAnalyticsException("Failed to get the threat intel ioc match index mapping", RestStatus.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Index name: .opensearch-sap-iocmatch + * Mapping: /mappings/ioc_match_mapping.json + * + * @param listener setup listener + */ + public void createIndexIfNotExists(final ActionListener listener) { + // check if job index exists + try { + if (clusterService.state().metadata().hasIndex(INDEX_NAME) == true) { + listener.onResponse(null); + return; + } + final CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX_NAME).mapping(getIndexMapping()) + .settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING); + StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, ActionListener.wrap( + r -> { + log.debug("Ioc match index created"); + listener.onResponse(null); + }, e -> { + if (e instanceof ResourceAlreadyExistsException) { + log.debug("index {} already exist", INDEX_NAME); + listener.onResponse(null); + return; + } + log.error("Failed to create security analytics threat intel job index", e); + listener.onFailure(e); + } + ))); + } catch (Exception e) { + log.error("Failure in creating ioc_match index", e); + listener.onFailure(e); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dto/IocScanContext.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dto/IocScanContext.java index 6b673f8d3..3e77f2d28 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dto/IocScanContext.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dto/IocScanContext.java @@ -5,12 +5,12 @@ import java.util.List; public class IocScanContext { - IocScanMonitor iocScanMonitor; + IocScanMonitor monitor; boolean dryRun; List data; - public IocScanMonitor getIocScanMonitor() { - return iocScanMonitor; + public IocScanMonitor getMonitor() { + return monitor; } public boolean isDryRun() { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java index bb1542d67..eb8c14552 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java @@ -2,6 +2,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.commons.alerting.model.Finding; import org.opensearch.securityanalytics.model.threatintel.IocMatch; import org.opensearch.securityanalytics.threatIntel.iocscan.dto.IocScanContext; @@ -29,15 +30,22 @@ public void scanIoCs(IocScanContext iocScanContext, BiConsumer scanCallback ) { List data = iocScanContext.getData(); - IocScanMonitor iocScanMonitor = iocScanContext.getIocScanMonitor(); + IocScanMonitor iocScanMonitor = iocScanContext.getMonitor(); long start = System.currentTimeMillis(); // log.debug("beginning to scan IoC's") IocLookupDtos iocLookupDtos = extractIocPerTypeSet(data, iocScanMonitor.getIocTypeToIndexFieldMappings()); BiConsumer, Exception> iocScanResultConsumer = (List maliciousIocs, Exception e) -> { if (e == null) { - createIoCMatches(maliciousIocs, iocLookupDtos.iocValueToDocIdMap, iocScanContext); - createFindings(maliciousIocs, iocLookupDtos.docIdToIocsMap, iocScanMonitor); + createIoCMatches(maliciousIocs, iocLookupDtos.iocValueToDocIdMap, iocScanContext, + new BiConsumer, Exception>() { + @Override + public void accept(List iocs, Exception e) { + createFindings(maliciousIocs, iocLookupDtos.docIdToIocsMap, iocScanMonitor); + } + } + ); + } else { // onIocMatchFailure(e, iocScanMonitor); @@ -93,10 +101,10 @@ private IocLookupDtos extractIocPerTypeSet(List data, List iocs, Map> iocValueToDocIdMap, IocScanContext iocScanContext) { + public void createIoCMatches(List iocs, Map> iocValueToDocIdMap, IocScanContext iocScanContext, BiConsumer, Exception> callback) { try { Instant timestamp = Instant.now(); - IocScanMonitor iocScanMonitor = iocScanContext.getIocScanMonitor(); + IocScanMonitor iocScanMonitor = iocScanContext.getMonitor(); // Map to collect unique IocValue with their respective FeedIds Map> iocValueToFeedIds = new HashMap<>(); @@ -115,27 +123,32 @@ public void createIoCMatches(List iocs, Map> iocValueTo List relatedDocIds = new ArrayList<>(iocValueToDocIdMap.getOrDefault(iocValue, new HashSet<>())); List feedIdsList = new ArrayList<>(feedIds); - - IocMatch iocMatch = new IocMatch( - UUID.randomUUID().toString(), // Generating a unique ID - relatedDocIds, - feedIdsList, - iocScanMonitor.getId(), - iocScanMonitor.getName(), - iocValue, - iocs.stream().filter(i -> i.getIocValue().equals(iocValue)).findFirst().orElseThrow().getIocType(), - timestamp, - UUID.randomUUID().toString() // TODO execution ID - ); - - iocMatches.add(iocMatch); + try { + IocMatch iocMatch = new IocMatch( + UUID.randomUUID().toString(), // Generating a unique ID + relatedDocIds, + feedIdsList, + iocScanMonitor.getId(), + iocScanMonitor.getName(), + iocValue, + iocs.stream().filter(i -> i.getIocValue().equals(iocValue)).findFirst().orElseThrow().getIocType(), + timestamp, + UUID.randomUUID().toString() // TODO execution ID + ); + iocMatches.add(iocMatch); + } catch (Exception e) { + log.error(String.format("skipping creating ioc match for %s due to unexpected failure.", entry.getKey()), e); + } } - //TODO save iocs + saveIocs(iocs, callback); } catch (Exception e) { - + log.error(() -> new ParameterizedMessage("Failed to create ioc matches due to unexpected error {}", iocScanContext.getMonitor().getId()), e); + callback.accept(null, e); } } + abstract void saveIocs(List iocs, BiConsumer, Exception> callback); + public List createFindings(List iocs, Map> docIdToIocsMap, IocScanMonitor iocScanMonitor) { List findings = new ArrayList<>(); diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanServiceInterface.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanServiceInterface.java index 324e6641f..0746eddf4 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanServiceInterface.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanServiceInterface.java @@ -9,10 +9,6 @@ public interface IoCScanServiceInterface { - void createIoCFindings(List findings); - - void createIoCMatch(List iocMatches); - void scanIoCs( IocScanContext iocScanContext, BiConsumer scanCallback diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java index f9fb9b198..ca3b39bf2 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java @@ -785,7 +785,7 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List } Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, detector.getName(), false, detector.getSchedule(), detector.getLastUpdateTime(), null, - Monitor.MonitorType.DOC_LEVEL_MONITOR, detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(), + Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(), detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(), new DataSources(detector.getRuleIndex(), detector.getFindingsIndex(), detector.getFindingsIndexPattern(), @@ -886,7 +886,7 @@ private IndexMonitorRequest createDocLevelMonitorMatchAllRequest( } Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, monitorName, false, detector.getSchedule(), detector.getLastUpdateTime(), null, - Monitor.MonitorType.DOC_LEVEL_MONITOR, detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(), + Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(), detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(), new DataSources(detector.getRuleIndex(), detector.getFindingsIndex(), detector.getFindingsIndexPattern(), @@ -1060,7 +1060,7 @@ public void onResponse(GetIndexMappingsResponse getIndexMappingsResponse) { } **/ Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, detector.getName(), false, detector.getSchedule(), detector.getLastUpdateTime(), null, - MonitorType.BUCKET_LEVEL_MONITOR, detector.getUser(), 1, bucketLevelMonitorInputs, triggers, Map.of(), + MonitorType.BUCKET_LEVEL_MONITOR.getValue(), detector.getUser(), 1, bucketLevelMonitorInputs, triggers, Map.of(), new DataSources(detector.getRuleIndex(), detector.getFindingsIndex(), detector.getFindingsIndexPattern(), @@ -1782,7 +1782,7 @@ private Map mapMonitorIds(List monitorResp Collectors.toMap( // In the case of bucket level monitors rule id is trigger id it -> { - if (MonitorType.BUCKET_LEVEL_MONITOR == it.getMonitor().getMonitorType()) { + if (MonitorType.BUCKET_LEVEL_MONITOR.getValue() == it.getMonitor().getMonitorType()) { return it.getMonitor().getTriggers().get(0).getId(); } else { if (it.getMonitor().getName().contains("_chained_findings")) { diff --git a/src/main/java/org/opensearch/securityanalytics/util/DetectorUtils.java b/src/main/java/org/opensearch/securityanalytics/util/DetectorUtils.java index 14c241f83..bb76afd8d 100644 --- a/src/main/java/org/opensearch/securityanalytics/util/DetectorUtils.java +++ b/src/main/java/org/opensearch/securityanalytics/util/DetectorUtils.java @@ -109,7 +109,7 @@ public static List getBucketLevelMonitorIds( ) { return monitorResponses.stream().filter( // In the case of bucket level monitors rule id is trigger id - it -> Monitor.MonitorType.BUCKET_LEVEL_MONITOR == it.getMonitor().getMonitorType() + it -> Monitor.MonitorType.BUCKET_LEVEL_MONITOR.getValue() == it.getMonitor().getMonitorType() ).map(IndexMonitorResponse::getId).collect(Collectors.toList()); } public static List getAggRuleIdsConfiguredToTrigger(Detector detector, List> rulesById) { diff --git a/src/test/java/org/opensearch/securityanalytics/alerts/AlertingServiceTests.java b/src/test/java/org/opensearch/securityanalytics/alerts/AlertingServiceTests.java index 3542ee309..e5c1172cf 100644 --- a/src/test/java/org/opensearch/securityanalytics/alerts/AlertingServiceTests.java +++ b/src/test/java/org/opensearch/securityanalytics/alerts/AlertingServiceTests.java @@ -88,7 +88,7 @@ public void testGetAlerts_success() { new CronSchedule("31 * * * *", ZoneId.of("Asia/Kolkata"), Instant.ofEpochSecond(1538164858L)), Instant.now(), Instant.now(), - Monitor.MonitorType.DOC_LEVEL_MONITOR, + Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(), null, 1, List.of(), @@ -122,7 +122,7 @@ public void testGetAlerts_success() { new CronSchedule("31 * * * *", ZoneId.of("Asia/Kolkata"), Instant.ofEpochSecond(1538164858L)), Instant.now(), Instant.now(), - Monitor.MonitorType.DOC_LEVEL_MONITOR, + Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(), null, 1, List.of(), diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchServiceIT.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchServiceIT.java new file mode 100644 index 000000000..d5942ebd2 --- /dev/null +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchServiceIT.java @@ -0,0 +1,51 @@ +package org.opensearch.securityanalytics.threatIntel.iocscan.dao; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.core.action.ActionListener; +import org.opensearch.securityanalytics.model.threatintel.IocMatch; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + + +public class IocMatchServiceIT extends OpenSearchIntegTestCase { + + public void test_indexIocMatches() { + IocMatchService service = new IocMatchService(client(), clusterService()); + List iocMatches = generateIocMatches(10); + CountDownLatch latch = new CountDownLatch(1); + service.indexIocMatches(iocMatches, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(Void unused) { + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }, latch)); + } + + private List generateIocMatches(int i) { + List iocMatches = new ArrayList<>(); + String monitorId = randomAlphaOfLength(10); + String monitorName = randomAlphaOfLength(10); + for (int i1 = 0; i1 < i; i1++) { + iocMatches.add(new IocMatch( + randomAlphaOfLength(10), + randomList(10, () -> randomAlphaOfLength(10)),//docids + randomList(10, () -> randomAlphaOfLength(10)), //feedids + monitorId, + monitorName, + randomAlphaOfLength(10), + "IP", + Instant.now(), + randomAlphaOfLength(10) + )); + } + return iocMatches; + } +} \ No newline at end of file