Skip to content

Commit

Permalink
[backend] update indexed files if entity restrictions change
Browse files Browse the repository at this point in the history
  • Loading branch information
SouadHadjiat committed Oct 6, 2023
1 parent 2e867d8 commit 6a47ef0
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 27 deletions.
51 changes: 37 additions & 14 deletions opencti-platform/opencti-graphql/src/database/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -1996,13 +1996,12 @@ const buildIndexFileBody = (documentId, file, entity = null) => {
};
if (entity) {
documentBody.entity_id = entity.internal_id;
// index entity markings
// index entity markings & organization restrictions
documentBody[buildRefRelationKey(RELATION_OBJECT_MARKING)] = entity[RELATION_OBJECT_MARKING] ?? [];
// index entity organization restrictions
documentBody[buildRefRelationKey(RELATION_GRANTED_TO)] = entity[RELATION_GRANTED_TO] ?? [];
// index entity authorized_members & authorized_authorities
documentBody.authorized_members = entity.authorized_members ?? [];
documentBody.authorized_authorities = entity.authorized_authorities ?? [];
// index entity authorized_members & authorized_authorities => not yet
// documentBody.authorized_members = entity.authorized_members ?? [];
// documentBody.authorized_authorities = entity.authorized_authorities ?? [];
}
return documentBody;
};
Expand All @@ -2027,10 +2026,7 @@ export const elBulkIndexFiles = async (context, user, files, maxBulkOperations =
retry_on_conflict: ES_RETRY_ON_CONFLICT
}
};
let entity = null;
if (entity_id) {
entity = entitiesMap[entity_id];
}
const entity = entity_id ? entitiesMap[entity_id] : null;
const fileObject = {
id: file_id,
content: file_data,
Expand All @@ -2052,6 +2048,33 @@ export const elBulkIndexFiles = async (context, user, files, maxBulkOperations =
await BluePromise.map(groupsOfOperations, concurrentUpdate, { concurrency: ES_MAX_CONCURRENCY });
};

export const elUpdateFilesWithEntityRestrictions = async (entity) => {
if (!entity) {
return null;
}
const changes = {
[buildRefRelationKey(RELATION_OBJECT_MARKING)]: entity[RELATION_OBJECT_MARKING] ?? [],
[buildRefRelationKey(RELATION_GRANTED_TO)]: entity[RELATION_GRANTED_TO] ?? [],
};
logApp.info('elUpdateFilesWithEntityRestrictions', { changes, entity });
const source = 'for (change in params.changes.entrySet()) { ctx._source[change.getKey()] = change.getValue() }';
return elRawUpdateByQuery({
index: READ_INDEX_FILES,
refresh: true,
conflicts: 'proceed',
body: {
script: { source, params: { changes } },
query: {
term: {
'entity_id.keyword': entity.internal_id
}
},
},
}).catch((err) => {
throw DatabaseError('[SEARCH] Error updating elastic (files entity restrictions)', { error: err, entityId: entity.internal_id });
});
};

const buildFilesSearchResult = (data, first, searchAfter, connectionFormat = true) => {
const convertedHits = data.hits.hits.map((hit) => {
const elementData = hit._source;
Expand All @@ -2078,7 +2101,7 @@ const buildFilesSearchResult = (data, first, searchAfter, connectionFormat = tru
};
export const elSearchFiles = async (context, user, options = {}) => {
const { first = 20, after, orderBy = null } = options; // pagination options // TODO orderMode = 'asc'
const { search = null, fileIds = [] } = options; // search options
const { search = null, fileIds = [], entityIds = [] } = options; // search options
const { fields = [], excludeFields = ['attachment.content'], connectionFormat = true, highlight = true } = options; // result options
const searchAfter = after ? cursorToOffset(after) : undefined;
const { includeAuthorities = false } = options;
Expand All @@ -2096,10 +2119,10 @@ export const elSearchFiles = async (context, user, options = {}) => {
must.push(fullTextSearch);
}
if (fileIds?.length > 0) {
const fileIdsSearch = {
terms: { 'file_id.keyword': fileIds }
};
must.push(fileIdsSearch);
must.push({ terms: { 'file_id.keyword': fileIds } });
}
if (entityIds?.length > 0) {
must.push({ terms: { 'entity_id.keyword': entityIds } });
}
if (!orderBy) { // TODO handle orderby param
// order by last indexed date by default
Expand Down
37 changes: 24 additions & 13 deletions opencti-platform/opencti-graphql/src/manager/fileIndexManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@ import { EVENT_TYPE_UPDATE, isNotEmptyField } from '../database/utils';
import conf, { ENABLED_FILE_INDEX_MANAGER, logApp } from '../config/conf';
import {
createStreamProcessor,
lockResource, type StreamProcessor,
lockResource,
type StreamProcessor,
} from '../database/redis';
import { executionContext, SYSTEM_USER } from '../utils/access';
import { getEntityFromCache } from '../database/cache';
import { ENTITY_TYPE_SETTINGS } from '../schema/internalObject';
import { elBulkIndexFiles, elSearchFiles, isAttachmentProcessorEnabled } from '../database/engine';
import {
elBulkIndexFiles,
elLoadById,
elSearchFiles,
elUpdateFilesWithEntityRestrictions,
isAttachmentProcessorEnabled
} from '../database/engine';
import { getFileContent, rawFilesListing } from '../database/file-storage';
import type { AuthContext } from '../types/user';
import { generateInternalId } from '../schema/identifier';
import { TYPE_LOCK_ERROR } from '../config/errors';
import type { SseEvent, StreamDataEvent } from '../types/event';
import type { SseEvent, StreamDataEvent, UpdateEvent } from '../types/event';
import { STIX_EXT_OCTI } from '../types/stix-extensions';

const FILE_INDEX_MANAGER_KEY = conf.get('file_index_manager:lock_key');
Expand Down Expand Up @@ -91,16 +98,19 @@ const handleStreamEvents = async (streamEvents: Array<SseEvent<StreamDataEvent>>
const context = executionContext('file_index_manager');
for (let index = 0; index < streamEvents.length; index += 1) {
const event = streamEvents[index];
const stix = event.data.data;
const entityId = stix.extensions[STIX_EXT_OCTI].id;
const entityType = stix.extensions[STIX_EXT_OCTI].type;
const stixFiles = stix.extensions[STIX_EXT_OCTI].files;
const isUpdateEvent = event.data.type === EVENT_TYPE_UPDATE;
// TODO test if markings or organization sharing or authorized members or authorities have been updated
if (isUpdateEvent && stixFiles?.length > 0) {
// reindex all uploaded files for this entity
const entityFilesPath = `import/${entityType}/${entityId}/`;
await indexImportedFiles(context, null, entityFilesPath);
if (event.data.type === EVENT_TYPE_UPDATE) {
const updateEvent: UpdateEvent = event.data as UpdateEvent;
const stix = updateEvent.data;
const entityId = stix.extensions[STIX_EXT_OCTI].id;
const stixFiles = stix.extensions[STIX_EXT_OCTI].files;
// test if markings or organization sharing or authorized members or authorities have been updated
const isDataRestrictionsUpdate = updateEvent.context?.patch && updateEvent.context.patch
.map((op) => op.path && (op.path.includes('granted_refs') || op.path.includes('object_marking_refs')));
if (stixFiles?.length > 0 && isDataRestrictionsUpdate) {
// update all indexed files for this entity
const entity = await elLoadById(context, SYSTEM_USER, entityId);
await elUpdateFilesWithEntityRestrictions(entity);
}
}
}
} catch (e) {
Expand Down Expand Up @@ -132,6 +142,7 @@ const initFileIndexManager = () => {
running = true;
logApp.info('[OPENCTI-MODULE] Running file index manager');
const lastFiles = await elSearchFiles(context, SYSTEM_USER, { first: 1, connectionFormat: false });
// TODO fix last time we indexed all uploaded files
const lastIndexedDate = lastFiles?.length > 0 ? moment(lastFiles[0].indexed_at).toDate() : null;
logApp.info('[OPENCTI-MODULE] Index imported files since', { lastIndexedDate });
await indexImportedFiles(context, lastIndexedDate);
Expand Down

0 comments on commit 6a47ef0

Please sign in to comment.