Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
psmagin committed Jun 19, 2024
1 parent acb43ac commit 18a582d
Show file tree
Hide file tree
Showing 20 changed files with 214 additions and 163 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.folio.search.exception;

public class ScopeExecutionException extends RuntimeException {

public ScopeExecutionException(Exception e) {
super(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.folio.search.domain.dto.ResourceEventType.CREATE;
import static org.folio.search.domain.dto.ResourceEventType.DELETE;
import static org.folio.search.domain.dto.ResourceEventType.REINDEX;
import static org.folio.search.model.types.ResourceType.CLASSIFICATION_TYPE;
import static org.folio.search.utils.SearchConverterUtils.getEventPayload;
import static org.folio.search.utils.SearchConverterUtils.getResourceEventId;
import static org.folio.search.utils.SearchConverterUtils.getResourceSource;
Expand All @@ -17,6 +18,7 @@
import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -27,11 +29,10 @@
import org.apache.logging.log4j.message.FormattedMessage;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.model.event.ConsortiumInstanceEvent;
import org.folio.search.model.types.ResourceType;
import org.folio.search.service.ResourceService;
import org.folio.search.service.TenantScopedExecutionService;
import org.folio.search.service.config.ConfigSynchronizationService;
import org.folio.search.utils.KafkaConstants;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
Expand All @@ -46,7 +47,7 @@ public class KafkaMessageListener {

private final ResourceService resourceService;
private final FolioMessageBatchProcessor folioMessageBatchProcessor;
private final SystemUserScopedExecutionService executionService;
private final TenantScopedExecutionService executionService;
private final ConfigSynchronizationService configSynchronizationService;

/**
Expand All @@ -64,11 +65,11 @@ public void handleInstanceEvents(List<ConsumerRecord<String, ResourceEvent>> con
log.info("Processing instance ids from kafka events [number of events: {}]", consumerRecords.size());
var batch = getInstanceResourceEvents(consumerRecords);
var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));
batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> {
folioMessageBatchProcessor.consumeBatchWithFallback(resourceEvents, KAFKA_RETRY_TEMPLATE_NAME,
resourceService::indexInstancesById, KafkaMessageListener::logFailedEvent);
return null;
}));
batchByTenant.forEach((tenant, resourceEvents) -> folioMessageBatchProcessor
.consumeBatchWithFallback(resourceEvents, KAFKA_RETRY_TEMPLATE_NAME,
resourceIdEvents -> executionService.executeSystemUserScoped(tenant,
() -> resourceService.indexInstancesById(resourceIdEvents)),
KafkaMessageListener::logFailedEvent));

}

Expand Down Expand Up @@ -150,11 +151,13 @@ public void handleConsortiumInstanceEvents(List<ConsumerRecord<String, Consortiu

var batchByTenant = batch.stream().collect(Collectors.groupingBy(ConsortiumInstanceEvent::getTenant));

batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> {
for (Map.Entry<String, List<ConsortiumInstanceEvent>> entry : batchByTenant.entrySet()) {
log.info("Consortium instance tenant [{}]", entry.getKey());
folioMessageBatchProcessor.consumeBatchWithFallback(batch, KAFKA_RETRY_TEMPLATE_NAME,
resourceService::indexConsortiumInstances, KafkaMessageListener::logFailedConsortiumEvent);
return null;
}));
consortiumInstances -> executionService.executeTenantScoped(entry.getKey(),
() -> resourceService.indexConsortiumInstances(consortiumInstances)),
KafkaMessageListener::logFailedConsortiumEvent);
}
}

@KafkaListener(
Expand All @@ -168,16 +171,10 @@ public void handleClassificationTypeEvents(List<ConsumerRecord<String, ResourceE
log.info("Processing classification-type events from Kafka [number of events: {}]", consumerRecords.size());
var batch = consumerRecords.stream()
.map(ConsumerRecord::value)
.filter(resourceEvent -> resourceEvent.getType() == DELETE).toList();

var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));
.filter(resourceEvent -> resourceEvent.getType() == DELETE)
.toList();

batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> {
folioMessageBatchProcessor.consumeBatchWithFallback(batch, KAFKA_RETRY_TEMPLATE_NAME,
resourceEvent -> configSynchronizationService.sync(resourceEvent, ResourceType.CLASSIFICATION_TYPE),
KafkaMessageListener::logFailedEvent);
return null;
}));
indexResources(batch, resourceEvent -> configSynchronizationService.sync(resourceEvent, CLASSIFICATION_TYPE));
}

@KafkaListener(
Expand Down Expand Up @@ -215,11 +212,11 @@ public void handleBibframeEvents(List<ConsumerRecord<String, ResourceEvent>> con
private void indexResources(List<ResourceEvent> batch, Consumer<List<ResourceEvent>> indexConsumer) {
var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));

batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> {
folioMessageBatchProcessor.consumeBatchWithFallback(resourceEvents, KAFKA_RETRY_TEMPLATE_NAME,
indexConsumer, KafkaMessageListener::logFailedEvent);
return null;
}));
for (var entry : batchByTenant.entrySet()) {
folioMessageBatchProcessor.consumeBatchWithFallback(entry.getValue(), KAFKA_RETRY_TEMPLATE_NAME,
executionService.executeTenantScoped(entry.getKey(), () -> indexConsumer),
KafkaMessageListener::logFailedEvent);
}
}

private static List<ResourceEvent> getInstanceResourceEvents(List<ConsumerRecord<String, ResourceEvent>> events) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ private Map<String, Object> prepareInstance(String instanceId, String tenantId,
}

private Map<String, Object> prepareDocumentBody(Map<String, Object> payload, Set<Map<String, Object>> instances) {
payload.put("contributorNameTypeId", payload.remove("nameTypeId"));
payload.put("instances", instances);
payload.remove(INSTANCE_ID);
payload.remove(TYPE_ID);
return payload;
var newPayload = new HashMap<>(payload);
newPayload.put("contributorNameTypeId", newPayload.remove("nameTypeId"));
newPayload.put("instances", instances);
newPayload.remove(INSTANCE_ID);
newPayload.remove(TYPE_ID);
return newPayload;
}

private Map<String, Object> getPayload(SearchDocumentBody doc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.folio.search.utils.SearchResponseHelper.getErrorIndexOperationResponse;
import static org.folio.search.utils.SearchResponseHelper.getSuccessIndexOperationResponse;
import static org.folio.search.utils.SearchUtils.INSTANCE_SUBJECT_UPSERT_SCRIPT_ID;
import static org.folio.search.utils.SearchUtils.SHARED_FIELD_NAME;
import static org.opensearch.script.ScriptType.STORED;

import java.util.EnumMap;
Expand Down Expand Up @@ -41,7 +42,6 @@ public class InstanceSubjectRepository extends AbstractResourceRepository {
@Override
public FolioIndexOperationResponse indexResources(List<SearchDocumentBody> documentBodies) {
var bulkRequest = new BulkRequest();

var docsById = documentBodies.stream().collect(groupingBy(SearchDocumentBody::getId));
for (var entry : docsById.entrySet()) {
var documents = entry.getValue();
Expand Down Expand Up @@ -103,9 +103,11 @@ private Map<String, Object> prepareScriptParams(EnumMap<IndexActionType, Set<Map

private Map<String, Object> prepareDocumentBody(Map<String, Object> payload,
Map<IndexActionType, Set<Map<String, Object>>> instances) {
payload.put("instances", subtract(instances.get(INDEX), instances.get(DELETE)));
payload.remove(INSTANCE_ID);
return payload;
var newPayload = new HashMap<>(payload);
newPayload.put("instances", subtract(instances.get(INDEX), instances.get(DELETE)));
newPayload.remove(INSTANCE_ID);
newPayload.remove(SHARED_FIELD_NAME);
return newPayload;
}

private Map<String, Object> getPayload(SearchDocumentBody doc) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/folio/search/service/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private FolioCreateIndexResponse doCreateIndex(String resourceName, String tenan
var index = indexNameProvider.getIndexName(resourceName, tenantId);
var mappings = mappingHelper.getMappings(resourceName);

log.info("Attempts to create index by [indexName: {}, mappings: {}, settings: {}]",
log.info("Attempts to create index [indexName: {}, mappings: {}, settings: {}]",
index, mappings, indexSettings);
return indexRepository.createIndex(index, indexSettings, mappings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import org.folio.search.domain.dto.LanguageConfig;
import org.folio.search.domain.dto.LanguageConfigs;
import org.folio.search.exception.ValidationException;
import org.folio.search.model.config.LanguageConfigEntity;
import org.folio.search.repository.LanguageConfigRepository;
import org.folio.search.service.metadata.LocalSearchFieldProvider;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
Expand All @@ -31,7 +29,7 @@ public class LanguageConfigService {

private final LanguageConfigRepository configRepository;
private final LocalSearchFieldProvider searchFieldProvider;
private final SystemUserScopedExecutionService executionService;
private final TenantScopedExecutionService executionService;
private final SearchConfigurationProperties searchConfiguration;

/**
Expand Down Expand Up @@ -127,16 +125,4 @@ public Set<String> getAllLanguageCodes() {
.collect(Collectors.toSet());
}

/**
* Returns all supported language codes for tenant.
*
* @param tenant tenant id as {@link String} object
* @return {@link Set} with language configuration codes.
*/
public Set<String> getAllLanguagesForTenant(String tenant) {
return executionService.executeSystemUserScoped(tenant,
() -> configRepository.findAll().stream()
.map(LanguageConfigEntity::getCode)
.collect(Collectors.toSet()));
}
}
29 changes: 16 additions & 13 deletions src/main/java/org/folio/search/service/ResourceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,12 @@ public FolioIndexOperationResponse indexConsortiumInstances(List<ConsortiumInsta

var instanceIds = validConsortiumInstances.stream().map(ConsortiumInstanceEvent::getInstanceId).collect(toSet());

return consortiumTenantExecutor.execute(centralTenant, () -> {
var resourceEvents = consortiumInstanceService.fetchInstances(instanceIds);
var indexDocuments = multiTenantSearchDocumentConverter.convert(resourceEvents);
var bulkIndexResponse = indexSearchDocuments(indexDocuments);
log.info("Records indexed to central index [requests: {}{}]",
getNumberOfRequests(indexDocuments), getErrorMessage(bulkIndexResponse));
return bulkIndexResponse;
});
var resourceEvents = consortiumInstanceService.fetchInstances(instanceIds);
var indexDocuments = multiTenantSearchDocumentConverter.convert(resourceEvents);
var bulkIndexResponse = indexSearchDocuments(indexDocuments);
log.info("Records indexed to central index [requests: {}{}]",
getNumberOfRequests(indexDocuments), getErrorMessage(bulkIndexResponse));
return bulkIndexResponse;
}

private List<ResourceEvent> getEventsToIndex(List<ResourceEvent> events) {
Expand All @@ -158,7 +156,8 @@ private Map<String, List<SearchDocumentBody>> processIndexInstanceEvents(List<Re
messageProducer.prepareAndSendContributorEvents(fetchedInstances);
messageProducer.prepareAndSendSubjectEvents(fetchedInstances);

var list = preProcessEvents(fetchedInstances, consortiumInstanceService::saveInstances);
var list = consortiumTenantExecutor
.execute(() -> preProcessEvents(fetchedInstances, consortiumInstanceService::saveInstances));
return multiTenantSearchDocumentConverter.convert(list);
}

Expand All @@ -168,7 +167,7 @@ private List<ResourceEvent> preProcessEvents(List<ResourceEvent> instanceEvents,
instanceEvents = Collections.emptyList();
}
var list = instanceEvents.stream()
.map(event -> consortiumTenantExecutor.execute(() -> instanceEventPreProcessor.preProcess(event)))
.map(instanceEventPreProcessor::preProcess)
.filter(Objects::nonNull)
.flatMap(List::stream)
.collect(toList());
Expand All @@ -183,7 +182,8 @@ private List<ResourceEvent> preProcessEvents(List<ResourceEvent> instanceEvents,
private Map<String, List<SearchDocumentBody>> processDeleteInstanceEvents(List<ResourceEvent> deleteEvents) {
messageProducer.prepareAndSendContributorEvents(deleteEvents);
messageProducer.prepareAndSendSubjectEvents(deleteEvents);
var list = preProcessEvents(deleteEvents, consortiumInstanceService::deleteInstances);
var list = consortiumTenantExecutor
.execute(() -> preProcessEvents(deleteEvents, consortiumInstanceService::deleteInstances));
return multiTenantSearchDocumentConverter.convert(list);
}

Expand All @@ -196,8 +196,11 @@ private FolioIndexOperationResponse indexSearchDocuments(Map<String, List<Search
responses.add(primaryResourceRepository.indexResources(primaryResources));
eventsByRepository.remove(PRIMARY_INDEXING_REPOSITORY_NAME);

eventsByRepository.forEach((repository, events) ->
responses.add(resourceRepositoryBeans.get(repository).indexResources(events)));
eventsByRepository.forEach((repository, events) -> {
var resourceRepository = resourceRepositoryBeans.get(repository);
var e = resourceRepository.indexResources(events);
responses.add(e);
});

var errorMessage = responses.stream()
.map(FolioIndexOperationResponse::getErrorMessage)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.folio.search.service;

import static java.util.Collections.singleton;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import lombok.extern.log4j.Log4j2;
import org.folio.search.exception.ScopeExecutionException;
import org.folio.spring.DefaultFolioExecutionContext;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.FolioModuleMetadata;
import org.folio.spring.config.properties.FolioEnvironment;
import org.folio.spring.context.ExecutionContextBuilder;
import org.folio.spring.integration.XOkapiHeaders;
import org.folio.spring.scope.FolioExecutionContextSetter;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Log4j2
@Service
@Primary
public class TenantScopedExecutionService extends SystemUserScopedExecutionService {

private final ExecutionContextBuilder contextBuilder;

public TenantScopedExecutionService(FolioExecutionContext executionContext,
ExecutionContextBuilder contextBuilder) {
super(executionContext, contextBuilder);
this.contextBuilder = contextBuilder;
}

public <T> T executeTenantScoped(String tenantId, Callable<T> action) {
try (var fex = new FolioExecutionContextSetter(contextBuilder.buildContext(tenantId))) {
log.info("Executing tenant scoped action [tenant={}]", tenantId);
new ScopeExecutionException(new RuntimeException()).printStackTrace();
return action.call();
} catch (Exception e) {
log.error("Failed to execute tenant scoped action", e);
throw new ScopeExecutionException(e);
}
}

@Primary
@Component
protected static class TenantScopedExecutionContextBuilder extends ExecutionContextBuilder {

private final FolioExecutionContext executionContext;

TenantScopedExecutionContextBuilder(FolioEnvironment folioEnvironment,
FolioModuleMetadata moduleMetadata,
FolioExecutionContext executionContext) {
super(folioEnvironment, moduleMetadata);
this.executionContext = executionContext;
}

@Override
public FolioExecutionContext buildContext(String tenantId) {
return buildContextWithTenant(tenantId);
}

private FolioExecutionContext buildContextWithTenant(String tenantId) {
Map<String, Collection<String>> headers = new HashMap<>();
if (isNotBlank(tenantId)) {
headers.put(XOkapiHeaders.TENANT, singleton(tenantId));
}
var okapiUrl = executionContext.getOkapiUrl();
if (isNotBlank(okapiUrl)) {
headers.put(XOkapiHeaders.URL, singleton(okapiUrl));
}
var token = executionContext.getToken();
if (isNotBlank(token)) {
headers.put(XOkapiHeaders.TOKEN, singleton(token));
}
var userId = executionContext.getUserId() == null ? "" : executionContext.getUserId().toString();
if (isNotBlank(userId)) {
headers.put(XOkapiHeaders.USER_ID, singleton(userId));
}
var requestId = executionContext.getRequestId();
if (isNotBlank(requestId)) {
headers.put(XOkapiHeaders.REQUEST_ID, singleton(requestId));
}
return new DefaultFolioExecutionContext(executionContext.getFolioModuleMetadata(), headers);
}
}
}
Loading

0 comments on commit 18a582d

Please sign in to comment.