
Commit 8396329

Merge remote-tracking branch 'origin/main' into transport_version_for_recovery_start_file
Tim-Brooks committed Jun 23, 2023
2 parents 89b5004 + 227b6c2 commit 8396329
Showing 29 changed files with 522 additions and 223 deletions.
12 changes: 6 additions & 6 deletions README.asciidoc
@@ -1,14 +1,14 @@
= Elasticsearch

Elasticsearch is the distributed, RESTful search and analytics engine at the
heart of the https://www.elastic.co/products[Elastic Stack]. You can use
Elasticsearch to store, search, and manage data for:
Elasticsearch is a distributed, RESTful search engine optimized for speed and relevance on production-scale workloads. You can use Elasticsearch to perform real-time search over massive datasets for applications including:

* Vector search
* Full-text search
* Logs
* Metrics
* A search backend
* Application monitoring
* Endpoint security
* Application performance monitoring (APM)
* Security logs
\... and more!

5 changes: 5 additions & 0 deletions docs/changelog/92574.yaml
@@ -0,0 +1,5 @@
pr: 92574
summary: Clean up some code in NoriTokenizerFactory and KuromojiTokenizerFactory
area: Search
type: enhancement
issues: []
@@ -199,12 +199,11 @@ public void testGetTimeSeriesDataStream() {
);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/96672")
public void testGetTimeSeriesMixedDataStream() {
Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
Instant instant = Instant.parse("2023-06-06T14:00:00.000Z").truncatedTo(ChronoUnit.SECONDS);
String dataStream1 = "ds-1";
Instant twoHoursAgo = now.minus(2, ChronoUnit.HOURS);
Instant twoHoursAhead = now.plus(2, ChronoUnit.HOURS);
Instant twoHoursAgo = instant.minus(2, ChronoUnit.HOURS);
Instant twoHoursAhead = instant.plus(2, ChronoUnit.HOURS);

ClusterState state;
{
@@ -213,7 +212,7 @@ public void testGetTimeSeriesMixedDataStream() {
mBuilder,
List.of(Tuple.tuple(dataStream1, 2)),
List.of(),
now.toEpochMilli(),
instant.toEpochMilli(),
Settings.EMPTY,
0,
false
@@ -231,9 +230,9 @@ public void testGetTimeSeriesMixedDataStream() {
ClusterSettings.createBuiltInClusterSettings()
);

var name1 = DataStream.getDefaultBackingIndexName("ds-1", 1, now.toEpochMilli());
var name2 = DataStream.getDefaultBackingIndexName("ds-1", 2, now.toEpochMilli());
var name3 = DataStream.getDefaultBackingIndexName("ds-1", 3, now.toEpochMilli());
var name1 = DataStream.getDefaultBackingIndexName("ds-1", 1, instant.toEpochMilli());
var name2 = DataStream.getDefaultBackingIndexName("ds-1", 2, instant.toEpochMilli());
var name3 = DataStream.getDefaultBackingIndexName("ds-1", 3, twoHoursAgo.toEpochMilli());
assertThat(
response.getDataStreams(),
contains(
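As an aside on the test change above: deriving every timestamp from one parsed instant, rather than `Instant.now()`, makes the generated backing-index names and time bounds identical on every run. A minimal standalone sketch of that pattern, with illustrative values:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class FixedInstantSketch {
    public static void main(String[] args) {
        // Pin the clock to a known point instead of reading the wall clock,
        // so repeated runs produce the same epoch millis and index names.
        Instant instant = Instant.parse("2023-06-06T14:00:00.000Z").truncatedTo(ChronoUnit.SECONDS);
        Instant twoHoursAgo = instant.minus(2, ChronoUnit.HOURS);
        Instant twoHoursAhead = instant.plus(2, ChronoUnit.HOURS);

        System.out.println(instant.toEpochMilli()); // 1686060000000 on every run
        System.out.println(twoHoursAgo);            // 2023-06-06T12:00:00Z
        System.out.println(twoHoursAhead);          // 2023-06-06T16:00:00Z
    }
}
```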
@@ -21,9 +21,11 @@
import org.elasticsearch.index.analysis.Analysis;

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

public class KuromojiTokenizerFactory extends AbstractTokenizerFactory {
@@ -58,48 +60,42 @@ public static UserDictionary getUserDictionary(Environment env, Settings setting
"It is not allowed to use [" + USER_DICT_PATH_OPTION + "] in conjunction" + " with [" + USER_DICT_RULES_OPTION + "]"
);
}
try {
List<String> ruleList = Analysis.getWordList(env, settings, USER_DICT_PATH_OPTION, USER_DICT_RULES_OPTION, false);
if (ruleList == null || ruleList.isEmpty()) {
return null;
}
Set<String> dup = new HashSet<>();
int lineNum = 0;
for (String line : ruleList) {
// ignore comments
if (line.startsWith("#") == false) {
String[] values = CSVUtil.parse(line);
if (dup.add(values[0]) == false) {
throw new IllegalArgumentException(
"Found duplicate term [" + values[0] + "] in user dictionary " + "at line [" + lineNum + "]"
);
}
}
++lineNum;
}
StringBuilder sb = new StringBuilder();
for (String line : ruleList) {
sb.append(line).append(System.lineSeparator());
}
return UserDictionary.open(new StringReader(sb.toString()));
List<String> ruleList = Analysis.getWordList(env, settings, USER_DICT_PATH_OPTION, USER_DICT_RULES_OPTION, false);
if (ruleList == null || ruleList.isEmpty()) {
return null;
}
validateDuplicatedWords(ruleList);
StringBuilder sb = new StringBuilder();
for (String line : ruleList) {
sb.append(line).append(System.lineSeparator());
}
try (Reader rulesReader = new StringReader(sb.toString())) {
return UserDictionary.open(rulesReader);
} catch (IOException e) {
throw new ElasticsearchException("failed to load kuromoji user dictionary", e);
}
}

public static JapaneseTokenizer.Mode getMode(Settings settings) {
JapaneseTokenizer.Mode mode = JapaneseTokenizer.DEFAULT_MODE;
String modeSetting = settings.get("mode", null);
if (modeSetting != null) {
if ("search".equalsIgnoreCase(modeSetting)) {
mode = JapaneseTokenizer.Mode.SEARCH;
} else if ("normal".equalsIgnoreCase(modeSetting)) {
mode = JapaneseTokenizer.Mode.NORMAL;
} else if ("extended".equalsIgnoreCase(modeSetting)) {
mode = JapaneseTokenizer.Mode.EXTENDED;
private static void validateDuplicatedWords(List<String> ruleList) {
Set<String> dup = new HashSet<>();
int lineNum = 0;
for (String line : ruleList) {
// ignore comments
if (line.startsWith("#") == false) {
String[] values = CSVUtil.parse(line);
if (dup.add(values[0]) == false) {
throw new IllegalArgumentException(
"Found duplicate term [" + values[0] + "] in user dictionary " + "at line [" + lineNum + "]"
);
}
}
++lineNum;
}
return mode;
}

public static JapaneseTokenizer.Mode getMode(Settings settings) {
String modeSetting = settings.get("mode", JapaneseTokenizer.DEFAULT_MODE.name());
return JapaneseTokenizer.Mode.valueOf(modeSetting.toUpperCase(Locale.ENGLISH));
}

@Override
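The Kuromoji refactoring above mostly extracts the duplicate-term check into `validateDuplicatedWords` and switches to try-with-resources around the rules `Reader`, instead of one try/catch around the whole method. A rough, self-contained sketch of that shape (the CSV handling and the final parsing step are simplified stand-ins, not the Lucene `UserDictionary` API):

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class UserDictionaryRulesSketch {

    // Reject rule lists in which the same leading term appears on two non-comment lines.
    static void validateDuplicatedWords(List<String> ruleList) {
        Set<String> seen = new HashSet<>();
        int lineNum = 0;
        for (String line : ruleList) {
            if (line.startsWith("#") == false) {       // comment lines are ignored
                String term = line.split(",")[0];       // simplified CSV split
                if (seen.add(term) == false) {
                    throw new IllegalArgumentException(
                        "Found duplicate term [" + term + "] at line [" + lineNum + "]");
                }
            }
            ++lineNum;
        }
    }

    // Validate first, then hand the concatenated rules to a parser through a reader
    // that try-with-resources closes even if parsing fails.
    static String loadRules(List<String> ruleList) {
        if (ruleList == null || ruleList.isEmpty()) {
            return null;
        }
        validateDuplicatedWords(ruleList);
        StringBuilder sb = new StringBuilder();
        for (String line : ruleList) {
            sb.append(line).append(System.lineSeparator());
        }
        try (Reader rulesReader = new StringReader(sb.toString())) {
            StringWriter parsed = new StringWriter();   // stand-in for UserDictionary.open(rulesReader)
            rulesReader.transferTo(parsed);
            return parsed.toString();
        } catch (IOException e) {
            throw new RuntimeException("failed to load user dictionary", e);
        }
    }
}
```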
@@ -46,10 +46,10 @@ public static UserDictionary getUserDictionary(Environment env, Settings setting
);
}
List<String> ruleList = Analysis.getWordList(env, settings, USER_DICT_PATH_OPTION, USER_DICT_RULES_OPTION, true);
StringBuilder sb = new StringBuilder();
if (ruleList == null || ruleList.isEmpty()) {
return null;
}
StringBuilder sb = new StringBuilder();
for (String line : ruleList) {
sb.append(line).append(System.lineSeparator());
}
@@ -61,12 +61,8 @@ public static UserDictionary getUserDictionary(Environment env, Settings setting
}

public static KoreanTokenizer.DecompoundMode getMode(Settings settings) {
KoreanTokenizer.DecompoundMode mode = KoreanTokenizer.DEFAULT_DECOMPOUND;
String modeSetting = settings.get("decompound_mode", null);
if (modeSetting != null) {
mode = KoreanTokenizer.DecompoundMode.valueOf(modeSetting.toUpperCase(Locale.ENGLISH));
}
return mode;
String modeSetting = settings.get("decompound_mode", KoreanTokenizer.DEFAULT_DECOMPOUND.name());
return KoreanTokenizer.DecompoundMode.valueOf(modeSetting.toUpperCase(Locale.ENGLISH));
}

@Override
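Both tokenizer factories now parse their mode setting the same way: read the setting with the default mode's name as the fallback, upper-case it, and let `Enum.valueOf` do the matching. A minimal sketch of the idiom with a made-up enum and a plain map standing in for `Settings` (note that `valueOf` rejects unrecognized values with an `IllegalArgumentException`, whereas an if/else chain with no final else silently keeps the default):

```java
import java.util.Locale;
import java.util.Map;

class ModeParsingSketch {
    enum Mode { NORMAL, SEARCH, EXTENDED }

    static final Mode DEFAULT_MODE = Mode.SEARCH;

    // Same shape as the new getMode(Settings): default the raw value to the
    // default mode's name, then parse it as an enum constant.
    static Mode getMode(Map<String, String> settings) {
        String modeSetting = settings.getOrDefault("mode", DEFAULT_MODE.name());
        return Mode.valueOf(modeSetting.toUpperCase(Locale.ENGLISH));
    }

    public static void main(String[] args) {
        System.out.println(getMode(Map.of("mode", "extended"))); // EXTENDED
        System.out.println(getMode(Map.of()));                   // SEARCH (the default)
        // getMode(Map.of("mode", "bogus")) throws IllegalArgumentException
    }
}
```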
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/TransportVersion.java
@@ -143,8 +143,8 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
public static final TransportVersion V_8_500_016 = registerTransportVersion(8_500_016, "492C94FB-AAEA-4C9E-8375-BDB67A398584");
public static final TransportVersion V_8_500_017 = registerTransportVersion(8_500_017, "0EDCB5BA-049C-443C-8AB1-5FA58FB996FB");
public static final TransportVersion V_8_500_018 = registerTransportVersion(8_500_018, "827C32CE-33D9-4AC3-A773-8FB768F59EAF");
// 8.10.0
public static final TransportVersion V_8_500_019 = registerTransportVersion(8_500_019, "09bae57f-cab8-423c-aab3-c9778509ffe3");
// 8.9.0
public static final TransportVersion V_8_500_020 = registerTransportVersion(8_500_020, "ECB42C26-B258-42E5-A835-E31AF84A76DE");
public static final TransportVersion V_8_500_021 = registerTransportVersion(8_500_021, "102e0d84-0c08-402c-a696-935f3a3da873");
public static final TransportVersion V_8_500_022 = registerTransportVersion(8_500_022, "4993c724-7a81-4955-84e7-403484610091");
@@ -174,7 +174,7 @@ private static TransportVersion findCurrent(TransportVersion fallback) {
* Reference to the minimum transport version that can be used with CCS.
* This should be the transport version used by the previous minor release.
*/
public static final TransportVersion MINIMUM_CCS_VERSION = V_8_500_019;
public static final TransportVersion MINIMUM_CCS_VERSION = V_8_500_020;

static {
// see comment on IDS field
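For context on the `TransportVersion` hunk: each constant pairs a monotonically increasing integer id with a unique string, and `MINIMUM_CCS_VERSION` is the cross-cluster-search floor that this commit moves from `V_8_500_019` to `V_8_500_020`. A toy model of id registration with a duplicate guard (the guard and names are illustrative assumptions, not the Elasticsearch implementation):

```java
import java.util.HashMap;
import java.util.Map;

class TransportVersionModel {
    record Version(int id, String uniqueId) {}

    private static final Map<Integer, String> IDS = new HashMap<>();

    // Registering the same numeric id twice fails fast instead of silently colliding.
    static Version register(int id, String uniqueId) {
        if (IDS.putIfAbsent(id, uniqueId) != null) {
            throw new IllegalStateException("transport version id [" + id + "] already registered");
        }
        return new Version(id, uniqueId);
    }

    public static void main(String[] args) {
        Version v19 = register(8_500_019, "09bae57f-cab8-423c-aab3-c9778509ffe3");
        Version v20 = register(8_500_020, "ECB42C26-B258-42E5-A835-E31AF84A76DE");
        Version minimumCcsVersion = v20;                         // the new floor after this commit
        System.out.println(v19.id() >= minimumCcsVersion.id());  // false: v19 is below the new floor
    }
}
```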
@@ -68,10 +68,7 @@ protected void masterOperation(
ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) {
repositoriesService.unregisterRepository(
request,
listener.map(unregisterRepositoryResponse -> AcknowledgedResponse.of(unregisterRepositoryResponse.isAcknowledged()))
);
repositoriesService.unregisterRepository(request, listener);
}

@Override
@@ -68,7 +68,7 @@ protected void masterOperation(
ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) {
repositoriesService.registerRepository(request, listener.map(response -> AcknowledgedResponse.of(response.isAcknowledged())));
repositoriesService.registerRepository(request, listener);
}

@Override
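Both repository transport actions previously adapted the caller's `ActionListener<AcknowledgedResponse>` with `listener.map(...)` because the service reported a different response type; after this change the service is handed that listener directly, so the adapter layer goes away. A self-contained sketch of what such a map adapter does (the `Listener` interface here is a simplified stand-in, not the Elasticsearch `ActionListener`):

```java
import java.util.function.Function;

class ListenerMapSketch {
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);

        // Wrap this listener so a producer of S can complete it after converting S to T.
        default <S> Listener<S> map(Function<S, T> fn) {
            Listener<T> outer = this;
            return new Listener<S>() {
                @Override
                public void onResponse(S response) {
                    outer.onResponse(fn.apply(response));
                }

                @Override
                public void onFailure(Exception e) {
                    outer.onFailure(e);
                }
            };
        }
    }

    record Acknowledged(boolean acknowledged) {}

    public static void main(String[] args) {
        Listener<Acknowledged> caller = new Listener<>() {
            @Override
            public void onResponse(Acknowledged r) { System.out.println("acked=" + r.acknowledged()); }

            @Override
            public void onFailure(Exception e) { System.out.println("failed: " + e); }
        };

        // Old shape: the producer emitted a different type, so the caller's listener was mapped.
        Listener<Boolean> adapted = caller.map(Acknowledged::new);
        adapted.onResponse(true);                  // prints acked=true

        // New shape: the producer completes the caller's listener type directly.
        caller.onResponse(new Acknowledged(true)); // prints acked=true
    }
}
```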
46 changes: 25 additions & 21 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -3747,28 +3747,32 @@ ReplicationTracker getReplicationTracker() {
* Executes a scheduled refresh if necessary. Completes the listener with true if a refresh was performed, otherwise false.
*/
public void scheduledRefresh(ActionListener<Boolean> listener) {
verifyNotClosed();
boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
final Engine engine = getEngine();
if (isReadAllowed() && (listenerNeedsRefresh || engine.refreshNeeded())) {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& engine.allowSearchIdleOptimization()
&& isSearchIdle()
&& indexSettings.isExplicitRefresh() == false
&& active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
// cause the next schedule to refresh.
engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
setRefreshPending(engine);
ActionListener.completeWith(listener, () -> false);
} else {
logger.trace("refresh with source [schedule]");
engine.maybeRefresh("schedule", listener.map(Engine.RefreshResult::refreshed));
ActionListener.run(listener, l -> {
verifyNotClosed();
boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
final Engine engine = getEngine();
if (isReadAllowed() && (listenerNeedsRefresh || engine.refreshNeeded())) {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& engine.allowSearchIdleOptimization()
&& isSearchIdle()
&& indexSettings.isExplicitRefresh() == false
&& active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
// cause the next schedule to refresh.
engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
setRefreshPending(engine);
l.onResponse(false);
return;
} else {
logger.trace("refresh with source [schedule]");
engine.maybeRefresh("schedule", l.map(Engine.RefreshResult::refreshed));
return;
}
}
}
engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
ActionListener.completeWith(listener, () -> false);
engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
l.onResponse(false);
});
}

/**
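The `scheduledRefresh` rewrite wraps the whole body in `ActionListener.run(listener, l -> ...)`, so an exception thrown anywhere in the body (for example from `verifyNotClosed()`) is delivered through the listener's `onFailure` instead of propagating to the caller. A minimal sketch of that wrapping pattern over a simplified listener type (not the real Elasticsearch `ActionListener` API):

```java
class ListenerRunSketch {
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    interface Body<T> {
        void accept(Listener<T> listener) throws Exception;
    }

    // Run the body and route anything it throws into the listener's failure path.
    static <T> void run(Listener<T> listener, Body<T> body) {
        try {
            body.accept(listener);
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    public static void main(String[] args) {
        Listener<Boolean> listener = new Listener<>() {
            @Override
            public void onResponse(Boolean refreshed) { System.out.println("refreshed=" + refreshed); }

            @Override
            public void onFailure(Exception e) { System.out.println("failed: " + e.getMessage()); }
        };

        // Happy path: the body decides no refresh is needed and answers false.
        run(listener, l -> l.onResponse(false));

        // Failure path: the exception no longer escapes the caller of scheduledRefresh.
        run(listener, l -> { throw new IllegalStateException("shard is closed"); });
    }
}
```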
