Skip to content

Commit

Permalink
Merge branch 'refs/heads/master' into MSEARCH-762
Browse files Browse the repository at this point in the history
  • Loading branch information
viacheslavpoliakov committed Jun 19, 2024
2 parents 25c6025 + acb43ac commit 683b2db
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 4 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Search consolidated items/holdings data in consortium ([MSEARCH-759](https://folio-org.atlassian.net/browse/MSEARCH-759))
* Create bibframe index and process bibframe events ([MSEARCH-781](https://folio-org.atlassian.net/browse/MSEARCH-781))
* Allow Unified List of Inventory Locations in a Consortium to be fetched by member tenants ([MSEARCH-660](https://folio-org.atlassian.net/browse/MSEARCH-660))
* Implement Indexing of Campuses from Kafka ([MSEARCH-770](https://issues.folio.org/browse/MSEARCH-770))
* Extend response with additional Location fields for Inventory Locations in a Consortium endpoint ([MSEARCH-775](https://folio-org.atlassian.net/browse/MSEARCH-775))

### Bug fixes
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
| KAFKA_CONTRIBUTORS_TOPIC_PARTITIONS | 50 | Amount of partitions for `search.instance-contributor` topic. |
| KAFKA_CONTRIBUTORS_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.instance-contributor` topic. |
| KAFKA_CONSORTIUM_INSTANCE_CONCURRENCY | 2 | Custom number of kafka concurrent threads for consortium.instance message consuming. |
| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location message consuming. |
| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location & inventory.campus message consuming. |
| KAFKA_BIBFRAME_CONCURRENCY | 1 | Custom number of kafka concurrent threads for bibframe message consuming. |
| KAFKA_CONSORTIUM_INSTANCE_TOPIC_PARTITIONS | 50 | Amount of partitions for `search.consortium.instance` topic. |
| KAFKA_CONSORTIUM_INSTANCE_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.consortium.instance` topic. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public class ResourceEventBatchInterceptor implements BatchInterceptor<String, R
"search.instance-contributor", SearchUtils.CONTRIBUTOR_RESOURCE,
"search.instance-subject", SearchUtils.INSTANCE_SUBJECT_RESOURCE,
"inventory.classification-type", SearchUtils.CLASSIFICATION_TYPE_RESOURCE,
"inventory.location", SearchUtils.LOCATION_RESOURCE
"inventory.location", SearchUtils.LOCATION_RESOURCE,
"inventory.campus", SearchUtils.CAMPUS_RESOURCE
);

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.folio.search.model.dto.locationunit;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
import lombok.With;
import lombok.extern.jackson.Jacksonized;

/**
* Describes Campus object that comes from external channels.
*/
@Data
@With
@Builder
@Jacksonized
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class CampusDto {

@JsonProperty("id")
private String id;
@JsonProperty("name")
private String name;
@JsonProperty("code")
private String code;
@JsonProperty("institutionId")
private String institutionId;

}
1 change: 1 addition & 0 deletions src/main/java/org/folio/search/utils/SearchUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class SearchUtils {
public static final String LOCATION_RESOURCE = "location";
public static final String CLASSIFICATION_TYPE_RESOURCE = "classification-type";
public static final String BIBFRAME_RESOURCE = "bibframe";
public static final String CAMPUS_RESOURCE = "campus";

public static final String ID_FIELD = "id";
public static final String SOURCE_FIELD = "source";
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ folio:
group-id: ${folio.environment}-mod-search-classification-type-group
location:
concurrency: ${KAFKA_LOCATION_CONCURRENCY:1}
topic-pattern: (${folio.environment}\.)(.*\.)inventory\.location
topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus)
group-id: ${folio.environment}-mod-search-location-type-group
bibframe:
concurrency: ${KAFKA_BIBFRAME_CONCURRENCY:1}
Expand Down
21 changes: 21 additions & 0 deletions src/main/resources/elasticsearch/index/campus.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"index": {
"number_of_shards": 4,
"number_of_replicas": 2,
"refresh_interval": "1s",
"codec": "best_compression",
"mapping.total_fields.limit": 1000
},
"analysis": {
"normalizer": {
"keyword_lowercase": {
"filter": [
"lowercase",
"icu_folding"
],
"type": "custom"
}
},
"tokenizers": {}
}
}
27 changes: 27 additions & 0 deletions src/main/resources/model/campus.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"name": "campus",
"eventBodyJavaClass": "org.folio.search.model.dto.locationunit.CampusDto",
"reindexSupported": false,
"fields": {
"id": {
"index": "keyword_lowercase",
"showInResponse": [ "search" ]
},
"tenantId": {
"index": "keyword_lowercase",
"showInResponse": [ "search" ],
"isTenant": true
},
"name": {
"index": "keyword_lowercase",
"showInResponse": [ "search" ]
},
"code": {
"index": "keyword_lowercase",
"showInResponse": [ "search" ]
},
"institutionId": {
"index": "keyword_lowercase"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package org.folio.search.controller;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.ONE_MINUTE;
import static org.awaitility.Durations.ONE_SECOND;
import static org.folio.search.domain.dto.ResourceEventType.CREATE;
import static org.folio.search.domain.dto.ResourceEventType.DELETE;
import static org.folio.search.domain.dto.ResourceEventType.DELETE_ALL;
import static org.folio.search.domain.dto.ResourceEventType.UPDATE;
import static org.folio.search.utils.SearchUtils.CAMPUS_RESOURCE;
import static org.folio.search.utils.SearchUtils.getIndexName;
import static org.folio.search.utils.TestConstants.CENTRAL_TENANT_ID;
import static org.folio.search.utils.TestConstants.MEMBER_TENANT_ID;
import static org.folio.search.utils.TestConstants.inventoryCampusTopic;
import static org.folio.search.utils.TestUtils.kafkaResourceEvent;
import static org.folio.search.utils.TestUtils.randomId;
import static org.folio.search.utils.TestUtils.toMap;
import static org.opensearch.index.query.QueryBuilders.boolQuery;
import static org.opensearch.search.builder.SearchSourceBuilder.searchSource;

import java.io.IOException;
import java.util.Objects;
import lombok.extern.log4j.Log4j2;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.model.dto.locationunit.CampusDto;
import org.folio.search.support.base.BaseConsortiumIntegrationTest;
import org.folio.spring.testing.type.IntegrationTest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.index.query.QueryBuilders;

@Log4j2
@IntegrationTest
class CampusesIndexingConsortiumIT extends BaseConsortiumIntegrationTest {

@BeforeAll
static void prepare() {
setUpTenant(CENTRAL_TENANT_ID);
setUpTenant(MEMBER_TENANT_ID);
}

@AfterAll
static void cleanUp() {
removeTenant(CENTRAL_TENANT_ID);
removeTenant(MEMBER_TENANT_ID);
}

@AfterEach
void tearDown() throws IOException {
cleanUpIndex(CAMPUS_RESOURCE, CENTRAL_TENANT_ID);
}

@Test
void shouldIndexAndRemoveCampus() {
var campus = campus();
var createEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(campus), null);
kafkaTemplate.send(inventoryCampusTopic(CENTRAL_TENANT_ID), createEvent);

awaitAssertCampusCount(1);

var deleteEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, DELETE, null, toMap(campus));
kafkaTemplate.send(inventoryCampusTopic(CENTRAL_TENANT_ID), deleteEvent);

awaitAssertCampusCount(0);
}

@Test
void shouldIndexAndUpdateCampus() {
var campus = campus();
var createEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(campus), null);
kafkaTemplate.send(inventoryCampusTopic(CENTRAL_TENANT_ID), createEvent);

awaitAssertCampusCount(1);

var campusUpdated = campus.withName("nameUpdated");
var updateEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, UPDATE, toMap(campusUpdated), toMap(campus));
kafkaTemplate.send(inventoryCampusTopic(CENTRAL_TENANT_ID), updateEvent);

awaitAssertCampusCountAfterUpdate(1, campusUpdated);
}

@Test
void shouldIndexSameCampusFromDifferentTenantsAsSeparateDocs() {
var campus = campus();
var createCentralEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(campus), null);
var createMemberEvent = kafkaResourceEvent(MEMBER_TENANT_ID, CREATE, toMap(campus), null);
kafkaTemplate.send(inventoryCampusTopic(CENTRAL_TENANT_ID), createCentralEvent);
kafkaTemplate.send(inventoryCampusTopic(CENTRAL_TENANT_ID), createMemberEvent);

awaitAssertCampusCount(2);
}

@Test
void shouldRemoveAllDocumentsByTenantIdOnDeleteAllEvent() {
var campus = campus();
var createCentralEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(campus), null);
var createMemberEvent = kafkaResourceEvent(MEMBER_TENANT_ID, CREATE, toMap(campus), null);
kafkaTemplate.send(inventoryCampusTopic(CENTRAL_TENANT_ID), createCentralEvent);
kafkaTemplate.send(inventoryCampusTopic(CENTRAL_TENANT_ID), createMemberEvent);

awaitAssertCampusCount(2);

var deleteAllMemberEvent = new ResourceEvent().type(DELETE_ALL).tenant(MEMBER_TENANT_ID);
kafkaTemplate.send(inventoryCampusTopic(MEMBER_TENANT_ID), deleteAllMemberEvent);

awaitAssertCampusCount(1);
}

private static CampusDto campus() {
return CampusDto.builder().id(randomId())
.name("name")
.code("code")
.build();
}

public static void awaitAssertCampusCount(int expected) {
await().atMost(ONE_MINUTE).pollInterval(ONE_SECOND).untilAsserted(() -> {
var totalHits = countIndexDocument(CAMPUS_RESOURCE, CENTRAL_TENANT_ID);

assertThat(totalHits).isEqualTo(expected);
});
}

public static void awaitAssertCampusCountAfterUpdate(int expected, CampusDto campusUpdated) {
await().atMost(ONE_MINUTE).pollInterval(ONE_SECOND).untilAsserted(() -> {
var idQuery = QueryBuilders.matchQuery("id", campusUpdated.getId());
var nameQuery = QueryBuilders.matchQuery("name", campusUpdated.getName());

var searchRequest = new SearchRequest()
.source(searchSource().query(boolQuery().must(idQuery).must(nameQuery))
.trackTotalHits(true).from(0).size(1))
.indices(getIndexName(CAMPUS_RESOURCE, CENTRAL_TENANT_ID));
var searchResponse = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
var hitCount = Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value;

assertThat(hitCount).isEqualTo(expected);
});
}
}
5 changes: 5 additions & 0 deletions src/test/java/org/folio/search/utils/TestConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class TestConstants {
public static final String INVENTORY_CLASSIFICATION_TYPE_TOPIC = "inventory.classification-type";
public static final String CONSORTIUM_INSTANCE_TOPIC = "search.consortium.instance";
public static final String BIBFRAME_TOPIC = "search.bibframe";
public static final String CAMPUS_TOPIC = "inventory.campus";

public static final String LOCAL_CN_TYPE = "6fd29f52-5c9c-44d0-b529-e9c5eb3a0aba";
public static final String FOLIO_CN_TYPE = "6e4d7565-b277-4dfa-8b7d-fbf306d9d0cd";
Expand Down Expand Up @@ -124,6 +125,10 @@ public static String bibframeTopic(String tenantId) {
return getTopicName(tenantId, BIBFRAME_TOPIC);
}

public static String inventoryCampusTopic(String tenantId) {
return getTopicName(tenantId, CAMPUS_TOPIC);
}

public static String indexName(String tenantId) {
return String.join("_", ENV, INSTANCE_RESOURCE, tenantId);
}
Expand Down
5 changes: 4 additions & 1 deletion src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ folio:
- name: search.bibframe
numPartitions: 1
replicationFactor: 1
- name: inventory.campus
numPartitions: 1
replicationFactor: 1
listener:
events:
concurrency: 2
Expand Down Expand Up @@ -140,7 +143,7 @@ folio:
group-id: ${folio.environment}-mod-search-classification-type-group
location:
concurrency: 1
topic-pattern: (${folio.environment}\.)(.*\.)inventory\.location
topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus)
group-id: ${folio.environment}-mod-search-location-type-group
bibframe:
concurrency: ${KAFKA_BIBFRAME_CONCURRENCY:1}
Expand Down

0 comments on commit 683b2db

Please sign in to comment.