Skip to content

Commit

Permalink
ioc match service
Browse files Browse the repository at this point in the history
  • Loading branch information
eirsep committed May 31, 2024
1 parent d956c12 commit f067e6e
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,7 @@ public void getAlertsByMonitorIds(
) {

org.opensearch.commons.alerting.action.GetAlertsRequest req =
new org.opensearch.commons.alerting.action.GetAlertsRequest(
table,
severityLevel,
alertState,
null,
alertIndex,
monitorIds,
null,
null
);
null;

AlertingPluginInterface.INSTANCE.getAlerts((NodeClient) client, req, new ActionListener<>() {
@Override
Expand Down Expand Up @@ -247,15 +238,7 @@ public void getAlerts(List<String> alertIds,
Detector detector,
Table table,
ActionListener<org.opensearch.commons.alerting.action.GetAlertsResponse> actionListener) {
GetAlertsRequest request = new GetAlertsRequest(
table,
"ALL",
"ALL",
null,
DetectorMonitorConfig.getAllAlertsIndicesPattern(detector.getDetectorType()),
null,
null,
alertIds);
GetAlertsRequest request = null;
AlertingPluginInterface.INSTANCE.getAlerts(
(NodeClient) client,
request, actionListener);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package org.opensearch.securityanalytics.threatIntel.iocscan.dao;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.securityanalytics.SecurityAnalyticsPlugin;
import org.opensearch.securityanalytics.model.threatintel.IocMatch;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.threadpool.ThreadPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Data layer to perform CRUD operations for threat intel ioc match : store in system index.
*/
public class IocMatchService {
//TODO manage index rollover
public static final String INDEX_NAME = ".opensearch-sap-iocmatch";
private static final Logger log = LogManager.getLogger(IocMatchService.class);
private final Client client;
private final ClusterService clusterService;

public IocMatchService(final Client client, final ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
}

public void indexIocMatches(List<IocMatch> iocMatches,
final ActionListener<Void> actionListener) {
try {
Integer batchSize = this.clusterService.getClusterSettings().get(SecurityAnalyticsSettings.BATCH_SIZE);
createIndexIfNotExists(ActionListener.wrap(
r -> {
List<BulkRequest> bulkRequestList = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest(INDEX_NAME);
for (int i = 0; i < iocMatches.size(); i++) {
IocMatch iocMatch = iocMatches.get(i);
try {
IndexRequest indexRequest = new IndexRequest(INDEX_NAME)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(iocMatch.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.opType(DocWriteRequest.OpType.CREATE);
bulkRequest.add(indexRequest);
if (
bulkRequest.requests().size() == batchSize
&& i != iocMatches.size() - 1 // final bulk request will be added outside for loop with refresh policy none
) {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
bulkRequestList.add(bulkRequest);
bulkRequest = new BulkRequest();
}
} catch (IOException e) {
log.error(String.format("Failed to create index request for ioc match %s moving on to next"), e);
}
}
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
bulkRequestList.add(bulkRequest);
GroupedActionListener<BulkResponse> groupedListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> {
int idx = 0;
for (BulkResponse response : bulkResponses) {
BulkRequest request = bulkRequestList.get(idx);
if (response.hasFailures()) {
log.error("Failed to bulk index {} Ioc Matches. Failure: {}", request.batchSize(), response.buildFailureMessage());
}
}
actionListener.onResponse(null);
}, actionListener::onFailure), bulkRequestList.size());
for (BulkRequest req : bulkRequestList) {
try {
StashedThreadContext.run(client, () -> client.bulk(req, groupedListener));
} catch (Exception e) {
log.error("Failed to save ioc matches.", e);
}
}
}, e -> {
log.error("Failed to create System Index");
actionListener.onFailure(e);
}));


} catch (Exception e) {
log.error("Exception saving the threat intel source config in index", e);
actionListener.onFailure(e);
}
}

private String getIndexMapping() {
try {
try (InputStream is = IocMatchService.class.getResourceAsStream("/mappings/ioc_match_mapping.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
}
} catch (IOException e) {
log.error("Failed to get the threat intel ioc match index mapping", e);
throw new SecurityAnalyticsException("Failed to get the threat intel ioc match index mapping", RestStatus.INTERNAL_SERVER_ERROR, e);
}
}

/**
* Index name: .opensearch-sap-iocmatch
* Mapping: /mappings/ioc_match_mapping.json
*
* @param listener setup listener
*/
public void createIndexIfNotExists(final ActionListener<Void> listener) {
// check if job index exists
try {
if (clusterService.state().metadata().hasIndex(INDEX_NAME) == true) {
listener.onResponse(null);
return;
}
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX_NAME).mapping(getIndexMapping())
.settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING);
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, ActionListener.wrap(
r -> {
log.debug("Ioc match index created");
listener.onResponse(null);
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
log.debug("index {} already exist", INDEX_NAME);
listener.onResponse(null);
return;
}
log.error("Failed to create security analytics threat intel job index", e);
listener.onFailure(e);
}
)));
} catch (Exception e) {
log.error("Failure in creating ioc_match index", e);
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import java.util.List;

public class IocScanContext<Data> {
IocScanMonitor iocScanMonitor;
IocScanMonitor monitor;
boolean dryRun;
List<Data> data;

public IocScanMonitor getIocScanMonitor() {
return iocScanMonitor;
public IocScanMonitor getMonitor() {
return monitor;
}

public boolean isDryRun() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.commons.alerting.model.Finding;
import org.opensearch.securityanalytics.model.threatintel.IocMatch;
import org.opensearch.securityanalytics.threatIntel.iocscan.dto.IocScanContext;
Expand Down Expand Up @@ -29,15 +30,22 @@ public void scanIoCs(IocScanContext<Data> iocScanContext,
BiConsumer<Object, Exception> scanCallback
) {
List<Data> data = iocScanContext.getData();
IocScanMonitor iocScanMonitor = iocScanContext.getIocScanMonitor();
IocScanMonitor iocScanMonitor = iocScanContext.getMonitor();

long start = System.currentTimeMillis();
// log.debug("beginning to scan IoC's")
IocLookupDtos iocLookupDtos = extractIocPerTypeSet(data, iocScanMonitor.getIocTypeToIndexFieldMappings());
BiConsumer<List<Ioc>, Exception> iocScanResultConsumer = (List<Ioc> maliciousIocs, Exception e) -> {
if (e == null) {
createIoCMatches(maliciousIocs, iocLookupDtos.iocValueToDocIdMap, iocScanContext);
createFindings(maliciousIocs, iocLookupDtos.docIdToIocsMap, iocScanMonitor);
createIoCMatches(maliciousIocs, iocLookupDtos.iocValueToDocIdMap, iocScanContext,
new BiConsumer<List<Ioc>, Exception>() {
@Override
public void accept(List<Ioc> iocs, Exception e) {
createFindings(maliciousIocs, iocLookupDtos.docIdToIocsMap, iocScanMonitor);
}
}
);

} else {
// onIocMatchFailure(e, iocScanMonitor);

Expand Down Expand Up @@ -93,10 +101,10 @@ private IocLookupDtos extractIocPerTypeSet(List<Data> data, List<PerIocTypeField

public abstract String getId(Data datum);

public void createIoCMatches(List<Ioc> iocs, Map<String, Set<String>> iocValueToDocIdMap, IocScanContext iocScanContext) {
public void createIoCMatches(List<Ioc> iocs, Map<String, Set<String>> iocValueToDocIdMap, IocScanContext iocScanContext, BiConsumer<List<Ioc>, Exception> callback) {
try {
Instant timestamp = Instant.now();
IocScanMonitor iocScanMonitor = iocScanContext.getIocScanMonitor();
IocScanMonitor iocScanMonitor = iocScanContext.getMonitor();
// Map to collect unique IocValue with their respective FeedIds
Map<String, Set<String>> iocValueToFeedIds = new HashMap<>();

Expand All @@ -115,27 +123,32 @@ public void createIoCMatches(List<Ioc> iocs, Map<String, Set<String>> iocValueTo

List<String> relatedDocIds = new ArrayList<>(iocValueToDocIdMap.getOrDefault(iocValue, new HashSet<>()));
List<String> feedIdsList = new ArrayList<>(feedIds);

IocMatch iocMatch = new IocMatch(
UUID.randomUUID().toString(), // Generating a unique ID
relatedDocIds,
feedIdsList,
iocScanMonitor.getId(),
iocScanMonitor.getName(),
iocValue,
iocs.stream().filter(i -> i.getIocValue().equals(iocValue)).findFirst().orElseThrow().getIocType(),
timestamp,
UUID.randomUUID().toString() // TODO execution ID
);

iocMatches.add(iocMatch);
try {
IocMatch iocMatch = new IocMatch(
UUID.randomUUID().toString(), // Generating a unique ID
relatedDocIds,
feedIdsList,
iocScanMonitor.getId(),
iocScanMonitor.getName(),
iocValue,
iocs.stream().filter(i -> i.getIocValue().equals(iocValue)).findFirst().orElseThrow().getIocType(),
timestamp,
UUID.randomUUID().toString() // TODO execution ID
);
iocMatches.add(iocMatch);
} catch (Exception e) {
log.error(String.format("skipping creating ioc match for %s due to unexpected failure.", entry.getKey()), e);
}
}
//TODO save iocs
saveIocs(iocs, callback);
} catch (Exception e) {

log.error(() -> new ParameterizedMessage("Failed to create ioc matches due to unexpected error {}", iocScanContext.getMonitor().getId()), e);
callback.accept(null, e);
}
}

abstract void saveIocs(List<Ioc> iocs, BiConsumer<List<Ioc>, Exception> callback);

public List<Finding> createFindings(List<Ioc> iocs, Map<String, Set<String>> docIdToIocsMap, IocScanMonitor iocScanMonitor) {
List<Finding> findings = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@

public interface IoCScanServiceInterface<Data> {

void createIoCFindings(List<Finding> findings);

void createIoCMatch(List<IocMatch> iocMatches);

void scanIoCs(
IocScanContext<Data> iocScanContext,
BiConsumer<Object, Exception> scanCallback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List<Pair<String, Rule>
}

Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, detector.getName(), false, detector.getSchedule(), detector.getLastUpdateTime(), null,
Monitor.MonitorType.DOC_LEVEL_MONITOR, detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(),
Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(), detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(),
new DataSources(detector.getRuleIndex(),
detector.getFindingsIndex(),
detector.getFindingsIndexPattern(),
Expand Down Expand Up @@ -886,7 +886,7 @@ private IndexMonitorRequest createDocLevelMonitorMatchAllRequest(
}

Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, monitorName, false, detector.getSchedule(), detector.getLastUpdateTime(), null,
Monitor.MonitorType.DOC_LEVEL_MONITOR, detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(),
Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(), detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(),
new DataSources(detector.getRuleIndex(),
detector.getFindingsIndex(),
detector.getFindingsIndexPattern(),
Expand Down Expand Up @@ -1060,7 +1060,7 @@ public void onResponse(GetIndexMappingsResponse getIndexMappingsResponse) {
} **/

Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, detector.getName(), false, detector.getSchedule(), detector.getLastUpdateTime(), null,
MonitorType.BUCKET_LEVEL_MONITOR, detector.getUser(), 1, bucketLevelMonitorInputs, triggers, Map.of(),
MonitorType.BUCKET_LEVEL_MONITOR.getValue(), detector.getUser(), 1, bucketLevelMonitorInputs, triggers, Map.of(),
new DataSources(detector.getRuleIndex(),
detector.getFindingsIndex(),
detector.getFindingsIndexPattern(),
Expand Down Expand Up @@ -1782,7 +1782,7 @@ private Map<String, String> mapMonitorIds(List<IndexMonitorResponse> monitorResp
Collectors.toMap(
// In the case of bucket level monitors rule id is trigger id
it -> {
if (MonitorType.BUCKET_LEVEL_MONITOR == it.getMonitor().getMonitorType()) {
if (MonitorType.BUCKET_LEVEL_MONITOR.getValue() == it.getMonitor().getMonitorType()) {
return it.getMonitor().getTriggers().get(0).getId();
} else {
if (it.getMonitor().getName().contains("_chained_findings")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static List<String> getBucketLevelMonitorIds(
) {
return monitorResponses.stream().filter(
// In the case of bucket level monitors rule id is trigger id
it -> Monitor.MonitorType.BUCKET_LEVEL_MONITOR == it.getMonitor().getMonitorType()
it -> Monitor.MonitorType.BUCKET_LEVEL_MONITOR.getValue() == it.getMonitor().getMonitorType()
).map(IndexMonitorResponse::getId).collect(Collectors.toList());
}
public static List<String> getAggRuleIdsConfiguredToTrigger(Detector detector, List<Pair<String, Rule>> rulesById) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void testGetAlerts_success() {
new CronSchedule("31 * * * *", ZoneId.of("Asia/Kolkata"), Instant.ofEpochSecond(1538164858L)),
Instant.now(),
Instant.now(),
Monitor.MonitorType.DOC_LEVEL_MONITOR,
Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(),
null,
1,
List.of(),
Expand Down Expand Up @@ -122,7 +122,7 @@ public void testGetAlerts_success() {
new CronSchedule("31 * * * *", ZoneId.of("Asia/Kolkata"), Instant.ofEpochSecond(1538164858L)),
Instant.now(),
Instant.now(),
Monitor.MonitorType.DOC_LEVEL_MONITOR,
Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(),
null,
1,
List.of(),
Expand Down
Loading

0 comments on commit f067e6e

Please sign in to comment.