Skip to content

Commit

Permalink
ioc match service
Browse files Browse the repository at this point in the history
  • Loading branch information
eirsep committed Jun 1, 2024
1 parent d956c12 commit e4c3cbd
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,7 @@ public void getAlertsByMonitorIds(
) {

org.opensearch.commons.alerting.action.GetAlertsRequest req =
new org.opensearch.commons.alerting.action.GetAlertsRequest(
table,
severityLevel,
alertState,
null,
alertIndex,
monitorIds,
null,
null
);
null;

AlertingPluginInterface.INSTANCE.getAlerts((NodeClient) client, req, new ActionListener<>() {
@Override
Expand Down Expand Up @@ -247,15 +238,7 @@ public void getAlerts(List<String> alertIds,
Detector detector,
Table table,
ActionListener<org.opensearch.commons.alerting.action.GetAlertsResponse> actionListener) {
GetAlertsRequest request = new GetAlertsRequest(
table,
"ALL",
"ALL",
null,
DetectorMonitorConfig.getAllAlertsIndicesPattern(detector.getDetectorType()),
null,
null,
alertIds);
GetAlertsRequest request = null;
AlertingPluginInterface.INSTANCE.getAlerts(
(NodeClient) client,
request, actionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -69,8 +70,11 @@ public void ensureLogTypesLoaded() {
private List<LogType> loadBuiltinLogTypes() throws URISyntaxException, IOException {
List<LogType> logTypes = new ArrayList<>();

final String url = Objects.requireNonNull(BuiltinLogTypeLoader.class.getClassLoader().getResource(BASE_PATH)).toURI().toString();
String pathurl = Paths.get(BuiltinLogTypeLoader.class.getClassLoader().getResource(BASE_PATH).toURI()).toString();

final String url = Objects.requireNonNull(BuiltinLogTypeLoader.class.getClassLoader().getResource(BASE_PATH)).toURI().toString();
logger.error("SASHANK Path url is {}", pathurl);
logger.error("SASHANK currently used url is {}", url);
Path dirPath = null;
if (url.contains("!")) {
final String[] paths = url.split("!");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package org.opensearch.securityanalytics.threatIntel.iocscan.dao;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.securityanalytics.SecurityAnalyticsPlugin;
import org.opensearch.securityanalytics.model.threatintel.IocMatch;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.threadpool.ThreadPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Data layer to perform CRUD operations for threat intel ioc match : store in system index.
*/
public class IocMatchService {
//TODO manage index rollover
public static final String INDEX_NAME = ".opensearch-sap-iocmatch";
private static final Logger log = LogManager.getLogger(IocMatchService.class);
private final Client client;
private final ClusterService clusterService;

public IocMatchService(final Client client, final ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
}

public void indexIocMatches(List<IocMatch> iocMatches,
final ActionListener<Void> actionListener) {
try {
Integer batchSize = this.clusterService.getClusterSettings().get(SecurityAnalyticsSettings.BATCH_SIZE);
createIndexIfNotExists(ActionListener.wrap(
r -> {
List<BulkRequest> bulkRequestList = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest(INDEX_NAME);
for (int i = 0; i < iocMatches.size(); i++) {
IocMatch iocMatch = iocMatches.get(i);
try {
IndexRequest indexRequest = new IndexRequest(INDEX_NAME)
.source(iocMatch.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.opType(DocWriteRequest.OpType.CREATE);
bulkRequest.add(indexRequest);
if (
bulkRequest.requests().size() == batchSize
&& i != iocMatches.size() - 1 // final bulk request will be added outside for loop with refresh policy none
) {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
bulkRequestList.add(bulkRequest);
bulkRequest = new BulkRequest();
}
} catch (IOException e) {
log.error(String.format("Failed to create index request for ioc match %s moving on to next"), e);
}
}
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
bulkRequestList.add(bulkRequest);
GroupedActionListener<BulkResponse> groupedListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> {
int idx = 0;
for (BulkResponse response : bulkResponses) {
BulkRequest request = bulkRequestList.get(idx);
if (response.hasFailures()) {
log.error("Failed to bulk index {} Ioc Matches. Failure: {}", request.batchSize(), response.buildFailureMessage());
}
}
actionListener.onResponse(null);
}, actionListener::onFailure), bulkRequestList.size());
for (BulkRequest req : bulkRequestList) {
try {
client.bulk(req, groupedListener); //todo why stash context here?
} catch (Exception e) {
log.error("Failed to save ioc matches.", e);
}
}
}, e -> {
log.error("Failed to create System Index");
actionListener.onFailure(e);
}));


} catch (Exception e) {
log.error("Exception saving the threat intel source config in index", e);
actionListener.onFailure(e);
}
}

private String getIndexMapping() {
try {
try (InputStream is = IocMatchService.class.getResourceAsStream("/mappings/ioc_match_mapping.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
}
} catch (IOException e) {
log.error("Failed to get the threat intel ioc match index mapping", e);
throw new SecurityAnalyticsException("Failed to get the threat intel ioc match index mapping", RestStatus.INTERNAL_SERVER_ERROR, e);
}
}

/**
* Index name: .opensearch-sap-iocmatch
* Mapping: /mappings/ioc_match_mapping.json
*
* @param listener setup listener
*/
public void createIndexIfNotExists(final ActionListener<Void> listener) {
// check if job index exists
try {
if (clusterService.state().metadata().hasIndex(INDEX_NAME) == true) {
listener.onResponse(null);
return;
}
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX_NAME).mapping(getIndexMapping())
.settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING);
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, ActionListener.wrap(
r -> {
log.debug("Ioc match index created");
listener.onResponse(null);
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
log.debug("index {} already exist", INDEX_NAME);
listener.onResponse(null);
return;
}
log.error("Failed to create security analytics threat intel job index", e);
listener.onFailure(e);
}
)));
} catch (Exception e) {
log.error("Failure in creating ioc_match index", e);
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import java.util.List;

public class IocScanContext<Data> {
IocScanMonitor iocScanMonitor;
IocScanMonitor monitor;
boolean dryRun;
List<Data> data;

public IocScanMonitor getIocScanMonitor() {
return iocScanMonitor;
public IocScanMonitor getMonitor() {
return monitor;
}

public boolean isDryRun() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.commons.alerting.model.Finding;
import org.opensearch.securityanalytics.model.threatintel.IocMatch;
import org.opensearch.securityanalytics.threatIntel.iocscan.dto.IocScanContext;
Expand Down Expand Up @@ -29,15 +30,22 @@ public void scanIoCs(IocScanContext<Data> iocScanContext,
BiConsumer<Object, Exception> scanCallback
) {
List<Data> data = iocScanContext.getData();
IocScanMonitor iocScanMonitor = iocScanContext.getIocScanMonitor();
IocScanMonitor iocScanMonitor = iocScanContext.getMonitor();

long start = System.currentTimeMillis();
// log.debug("beginning to scan IoC's")
IocLookupDtos iocLookupDtos = extractIocPerTypeSet(data, iocScanMonitor.getIocTypeToIndexFieldMappings());
BiConsumer<List<Ioc>, Exception> iocScanResultConsumer = (List<Ioc> maliciousIocs, Exception e) -> {
if (e == null) {
createIoCMatches(maliciousIocs, iocLookupDtos.iocValueToDocIdMap, iocScanContext);
createFindings(maliciousIocs, iocLookupDtos.docIdToIocsMap, iocScanMonitor);
createIoCMatches(maliciousIocs, iocLookupDtos.iocValueToDocIdMap, iocScanContext,
new BiConsumer<List<Ioc>, Exception>() {
@Override
public void accept(List<Ioc> iocs, Exception e) {
createFindings(maliciousIocs, iocLookupDtos.docIdToIocsMap, iocScanMonitor);
}
}
);

} else {
// onIocMatchFailure(e, iocScanMonitor);

Expand Down Expand Up @@ -93,10 +101,10 @@ private IocLookupDtos extractIocPerTypeSet(List<Data> data, List<PerIocTypeField

public abstract String getId(Data datum);

public void createIoCMatches(List<Ioc> iocs, Map<String, Set<String>> iocValueToDocIdMap, IocScanContext iocScanContext) {
public void createIoCMatches(List<Ioc> iocs, Map<String, Set<String>> iocValueToDocIdMap, IocScanContext iocScanContext, BiConsumer<List<Ioc>, Exception> callback) {
try {
Instant timestamp = Instant.now();
IocScanMonitor iocScanMonitor = iocScanContext.getIocScanMonitor();
IocScanMonitor iocScanMonitor = iocScanContext.getMonitor();
// Map to collect unique IocValue with their respective FeedIds
Map<String, Set<String>> iocValueToFeedIds = new HashMap<>();

Expand All @@ -115,27 +123,32 @@ public void createIoCMatches(List<Ioc> iocs, Map<String, Set<String>> iocValueTo

List<String> relatedDocIds = new ArrayList<>(iocValueToDocIdMap.getOrDefault(iocValue, new HashSet<>()));
List<String> feedIdsList = new ArrayList<>(feedIds);

IocMatch iocMatch = new IocMatch(
UUID.randomUUID().toString(), // Generating a unique ID
relatedDocIds,
feedIdsList,
iocScanMonitor.getId(),
iocScanMonitor.getName(),
iocValue,
iocs.stream().filter(i -> i.getIocValue().equals(iocValue)).findFirst().orElseThrow().getIocType(),
timestamp,
UUID.randomUUID().toString() // TODO execution ID
);

iocMatches.add(iocMatch);
try {
IocMatch iocMatch = new IocMatch(
UUID.randomUUID().toString(), // Generating a unique ID
relatedDocIds,
feedIdsList,
iocScanMonitor.getId(),
iocScanMonitor.getName(),
iocValue,
iocs.stream().filter(i -> i.getIocValue().equals(iocValue)).findFirst().orElseThrow().getIocType(),
timestamp,
UUID.randomUUID().toString() // TODO execution ID
);
iocMatches.add(iocMatch);
} catch (Exception e) {
log.error(String.format("skipping creating ioc match for %s due to unexpected failure.", entry.getKey()), e);
}
}
//TODO save iocs
saveIocs(iocs, callback);
} catch (Exception e) {

log.error(() -> new ParameterizedMessage("Failed to create ioc matches due to unexpected error {}", iocScanContext.getMonitor().getId()), e);
callback.accept(null, e);
}
}

abstract void saveIocs(List<Ioc> iocs, BiConsumer<List<Ioc>, Exception> callback);

public List<Finding> createFindings(List<Ioc> iocs, Map<String, Set<String>> docIdToIocsMap, IocScanMonitor iocScanMonitor) {
List<Finding> findings = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@

public interface IoCScanServiceInterface<Data> {

void createIoCFindings(List<Finding> findings);

void createIoCMatch(List<IocMatch> iocMatches);

void scanIoCs(
IocScanContext<Data> iocScanContext,
BiConsumer<Object, Exception> scanCallback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List<Pair<String, Rule>
}

Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, detector.getName(), false, detector.getSchedule(), detector.getLastUpdateTime(), null,
Monitor.MonitorType.DOC_LEVEL_MONITOR, detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(),
Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(), detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(),
new DataSources(detector.getRuleIndex(),
detector.getFindingsIndex(),
detector.getFindingsIndexPattern(),
Expand Down Expand Up @@ -886,7 +886,7 @@ private IndexMonitorRequest createDocLevelMonitorMatchAllRequest(
}

Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, monitorName, false, detector.getSchedule(), detector.getLastUpdateTime(), null,
Monitor.MonitorType.DOC_LEVEL_MONITOR, detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(),
Monitor.MonitorType.DOC_LEVEL_MONITOR.getValue(), detector.getUser(), 1, docLevelMonitorInputs, triggers, Map.of(),
new DataSources(detector.getRuleIndex(),
detector.getFindingsIndex(),
detector.getFindingsIndexPattern(),
Expand Down Expand Up @@ -1060,7 +1060,7 @@ public void onResponse(GetIndexMappingsResponse getIndexMappingsResponse) {
} **/

Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, detector.getName(), false, detector.getSchedule(), detector.getLastUpdateTime(), null,
MonitorType.BUCKET_LEVEL_MONITOR, detector.getUser(), 1, bucketLevelMonitorInputs, triggers, Map.of(),
MonitorType.BUCKET_LEVEL_MONITOR.getValue(), detector.getUser(), 1, bucketLevelMonitorInputs, triggers, Map.of(),
new DataSources(detector.getRuleIndex(),
detector.getFindingsIndex(),
detector.getFindingsIndexPattern(),
Expand Down Expand Up @@ -1782,7 +1782,7 @@ private Map<String, String> mapMonitorIds(List<IndexMonitorResponse> monitorResp
Collectors.toMap(
// In the case of bucket level monitors rule id is trigger id
it -> {
if (MonitorType.BUCKET_LEVEL_MONITOR == it.getMonitor().getMonitorType()) {
if (MonitorType.BUCKET_LEVEL_MONITOR.getValue() == it.getMonitor().getMonitorType()) {
return it.getMonitor().getTriggers().get(0).getId();
} else {
if (it.getMonitor().getName().contains("_chained_findings")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static List<String> getBucketLevelMonitorIds(
) {
return monitorResponses.stream().filter(
// In the case of bucket level monitors rule id is trigger id
it -> Monitor.MonitorType.BUCKET_LEVEL_MONITOR == it.getMonitor().getMonitorType()
it -> Monitor.MonitorType.BUCKET_LEVEL_MONITOR.getValue() == it.getMonitor().getMonitorType()
).map(IndexMonitorResponse::getId).collect(Collectors.toList());
}
public static List<String> getAggRuleIdsConfiguredToTrigger(Detector detector, List<Pair<String, Rule>> rulesById) {
Expand Down
Loading

0 comments on commit e4c3cbd

Please sign in to comment.