Skip to content

Commit

Permalink
Reduce verbosity of the bulk indexing audit log (#98470)
Browse files Browse the repository at this point in the history
This PR reduces the number of audit entries for bulk items
in bulk shard requests. Currently, each bulk item from every
bulk shard request generates exactly one audit log entry.
This change reduces that to one audit log per action type
(out of 4 possible) per bulk shard request.
  • Loading branch information
albertzaharovits committed Aug 30, 2023
1 parent 9beddb1 commit 293650c
Show file tree
Hide file tree
Showing 8 changed files with 499 additions and 94 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/98470.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 98470
summary: Reduce verbosity of the bulk indexing audit log
area: Audit
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public String index() {
return request.indices()[0];
}

BulkItemResponse getPrimaryResponse() {
// public for tests
public BulkItemResponse getPrimaryResponse() {
return primaryResponse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void explicitIndexAccessEvent(
AuditLevel eventType,
Authentication authentication,
String action,
String indices,
String[] indices,
String requestName,
InetSocketAddress remoteAddress,
AuthorizationInfo authorizationInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void explicitIndexAccessEvent(
AuditLevel eventType,
Authentication authentication,
String action,
String indices,
String[] indices,
String requestName,
InetSocketAddress remoteAddress,
AuthorizationInfo authorizationInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,13 +811,12 @@ public void explicitIndexAccessEvent(
AuditLevel eventType,
Authentication authentication,
String action,
String index,
String[] indices,
String requestName,
InetSocketAddress remoteAddress,
AuthorizationInfo authorizationInfo
) {
assert eventType == ACCESS_DENIED || eventType == AuditLevel.ACCESS_GRANTED || eventType == SYSTEM_ACCESS_GRANTED;
final String[] indices = index == null ? null : new String[] { index };
final User user = authentication.getEffectiveSubject().getUser();
if (user instanceof InternalUser && eventType == ACCESS_GRANTED) {
eventType = SYSTEM_ACCESS_GRANTED;
Expand All @@ -830,7 +829,7 @@ public void explicitIndexAccessEvent(
// can be null for API keys created before version 7.7
Optional.ofNullable(ApiKeyService.getCreatorRealmName(authentication)),
Optional.of(authorizationInfo),
Optional.ofNullable(indices),
Optional.of(indices),
Optional.of(action)
)
) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,50 +750,29 @@ private void authorizeBulkItems(
final BulkShardRequest request = (BulkShardRequest) requestInfo.getRequest();
// Maps original-index -> expanded-index-name (expands date-math, but not aliases)
final Map<String, String> resolvedIndexNames = new HashMap<>();
// Maps action -> resolved indices set
final Map<String, Set<String>> actionToIndicesMap = new HashMap<>();
// Maps action -> resolved indices set (there are 4 action types total)
final Map<String, Set<String>> actionToIndicesMap = new HashMap<>(4);
final AuditTrail auditTrail = auditTrailService.get();

resolvedIndicesAsyncSupplier.getAsync(ActionListener.wrap(overallResolvedIndices -> {
final Set<String> localIndices = new HashSet<>(overallResolvedIndices.getLocal());
for (BulkItemRequest item : request.items()) {
final String itemAction = getAction(item);
String resolvedIndex = resolvedIndexNames.computeIfAbsent(item.index(), key -> {
final ResolvedIndices resolvedIndices = IndicesAndAliasesResolver.resolveIndicesAndAliasesWithoutWildcards(
itemAction,
item.request()
);
if (resolvedIndices.getRemote().size() != 0) {
throw illegalArgument(
"Bulk item should not write to remote indices, but request writes to "
+ String.join(",", resolvedIndices.getRemote())
);
}
if (resolvedIndices.getLocal().size() != 1) {
throw illegalArgument(
"Bulk item should write to exactly 1 index, but request writes to "
+ String.join(",", resolvedIndices.getLocal())
);
}
final String resolved = resolvedIndices.getLocal().get(0);
final String resolvedIndex = resolvedIndexNames.computeIfAbsent(item.index(), key -> {
final String resolved = resolveIndexNameDateMath(item);
if (localIndices.contains(resolved) == false) {
throw illegalArgument(
"Found bulk item that writes to index " + resolved + " but the request writes to " + localIndices
);
}
return resolved;
});

actionToIndicesMap.compute(itemAction, (key, resolvedIndicesSet) -> {
final Set<String> localSet = resolvedIndicesSet != null ? resolvedIndicesSet : new HashSet<>();
localSet.add(resolvedIndex);
return localSet;
});
actionToIndicesMap.compute(itemAction, (ignore, resolvedIndicesSet) -> addToOrCreateSet(resolvedIndicesSet, resolvedIndex));
}

final ActionListener<Collection<Tuple<String, IndexAuthorizationResult>>> bulkAuthzListener = ActionListener.wrap(
collection -> {
final Map<String, IndicesAccessControl> actionToIndicesAccessControl = new HashMap<>();
final Map<String, IndicesAccessControl> actionToIndicesAccessControl = new HashMap<>(4);
collection.forEach(tuple -> {
final IndicesAccessControl existing = actionToIndicesAccessControl.putIfAbsent(
tuple.v1(),
Expand All @@ -804,23 +783,20 @@ private void authorizeBulkItems(
}
});

final Map<String, Set<String>> actionToGrantedIndicesMap = new HashMap<>(4);
final Map<String, Set<String>> actionToDeniedIndicesMap = new HashMap<>(4);
for (BulkItemRequest item : request.items()) {
final String resolvedIndex = resolvedIndexNames.get(item.index());
final String itemAction = getAction(item);
final IndicesAccessControl indicesAccessControl = actionToIndicesAccessControl.get(itemAction);
final IndicesAccessControl.IndexAccessControl indexAccessControl = indicesAccessControl.getIndexPermissions(
resolvedIndex
);
if (indexAccessControl == null) {
auditTrail.explicitIndexAccessEvent(
requestId,
AuditLevel.ACCESS_DENIED,
authentication,
if (actionToIndicesAccessControl.get(itemAction).hasIndexPermissions(resolvedIndex)) {
actionToGrantedIndicesMap.compute(
itemAction,
resolvedIndex,
item.getClass().getSimpleName(),
request.remoteAddress(),
authzInfo
(ignore, resolvedIndicesSet) -> addToOrCreateSet(resolvedIndicesSet, resolvedIndex)
);
} else {
actionToDeniedIndicesMap.compute(
itemAction,
(ignore, resolvedIndicesSet) -> addToOrCreateSet(resolvedIndicesSet, resolvedIndex)
);
item.abort(
resolvedIndex,
Expand All @@ -833,19 +809,32 @@ private void authorizeBulkItems(
null
)
);
} else {
auditTrail.explicitIndexAccessEvent(
requestId,
AuditLevel.ACCESS_GRANTED,
authentication,
itemAction,
resolvedIndex,
item.getClass().getSimpleName(),
request.remoteAddress(),
authzInfo
);
}
}
actionToDeniedIndicesMap.forEach((action, resolvedIndicesSet) -> {
auditTrail.explicitIndexAccessEvent(
requestId,
AuditLevel.ACCESS_DENIED,
authentication,
action,
resolvedIndicesSet.toArray(new String[0]),
BulkItemRequest.class.getSimpleName(),
request.remoteAddress(),
authzInfo
);
});
actionToGrantedIndicesMap.forEach((action, resolvedIndicesSet) -> {
auditTrail.explicitIndexAccessEvent(
requestId,
AuditLevel.ACCESS_GRANTED,
authentication,
action,
resolvedIndicesSet.toArray(new String[0]),
BulkItemRequest.class.getSimpleName(),
request.remoteAddress(),
authzInfo
);
});
listener.onResponse(null);
},
listener::onFailure
Expand Down Expand Up @@ -876,6 +865,30 @@ private void authorizeBulkItems(
}, listener::onFailure));
}

private static Set<String> addToOrCreateSet(Set<String> set, String item) {
final Set<String> localSet = set != null ? set : new HashSet<>(4);
localSet.add(item);
return localSet;
}

private static String resolveIndexNameDateMath(BulkItemRequest bulkItemRequest) {
final ResolvedIndices resolvedIndices = IndicesAndAliasesResolver.resolveIndicesAndAliasesWithoutWildcards(
getAction(bulkItemRequest),
bulkItemRequest.request()
);
if (resolvedIndices.getRemote().size() != 0) {
throw illegalArgument(
"Bulk item should not write to remote indices, but request writes to " + String.join(",", resolvedIndices.getRemote())
);
}
if (resolvedIndices.getLocal().size() != 1) {
throw illegalArgument(
"Bulk item should write to exactly 1 index, but request writes to " + String.join(",", resolvedIndices.getLocal())
);
}
return resolvedIndices.getLocal().get(0);
}

private static IllegalArgumentException illegalArgument(String message) {
assert false : message;
return new IllegalArgumentException(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2027,7 +2027,7 @@ public void testSystemAccessGranted() throws Exception {
randomFrom(AuditLevel.ACCESS_GRANTED, AuditLevel.SYSTEM_ACCESS_GRANTED),
authentication,
"_action",
randomFrom(randomAlphaOfLengthBetween(1, 4), null),
new String[] { randomAlphaOfLengthBetween(1, 4) },
BulkItemRequest.class.getName(),
request.remoteAddress(),
authorizationInfo
Expand Down Expand Up @@ -2059,13 +2059,13 @@ public void testSystemAccessGranted() throws Exception {
assertMsg(logger, checkedFields, checkedArrayFields);
clearLog();

String index = randomFrom(randomAlphaOfLengthBetween(1, 4), null);
String[] indices = randomArray(0, 4, String[]::new, () -> randomBoolean() ? null : randomAlphaOfLengthBetween(1, 4));
auditTrail.explicitIndexAccessEvent(
requestId,
randomFrom(AuditLevel.ACCESS_GRANTED, AuditLevel.SYSTEM_ACCESS_GRANTED),
authentication,
"_action",
index,
indices,
BulkItemRequest.class.getName(),
request.remoteAddress(),
authorizationInfo
Expand All @@ -2084,9 +2084,7 @@ public void testSystemAccessGranted() throws Exception {
opaqueId(threadContext, checkedFields);
traceId(threadContext, checkedFields);
forwardedFor(threadContext, checkedFields);
if (index != null) {
checkedArrayFields.put(LoggingAuditTrail.INDICES_FIELD_NAME, new String[] { index });
}
checkedArrayFields.put(LoggingAuditTrail.INDICES_FIELD_NAME, indices);
assertMsg(logger, checkedFields, checkedArrayFields);
}

Expand Down

0 comments on commit 293650c

Please sign in to comment.