Skip to content

Commit

Permalink
Merge remote-tracking branch 'es/main' into synthetic_source_slow_jav…
Browse files Browse the repository at this point in the history
…a_streams
  • Loading branch information
martijnvg committed May 7, 2024
2 parents 66e6efa + 0296240 commit 71b69c2
Show file tree
Hide file tree
Showing 40 changed files with 782 additions and 226 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/108257.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 108257
summary: "ESQL: Log queries at debug level"
area: ES|QL
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/108283.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 108283
summary: Fix `BlockHash` `DirectEncoder`
area: ES|QL
type: bug
issues:
- 108268
8 changes: 4 additions & 4 deletions docs/reference/esql/metadata-fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ like other index fields:

[source.merge.styled,esql]
----
include::{esql-specs}/metadata-IT_tests_only.csv-spec[tag=multipleIndices]
include::{esql-specs}/metadata.csv-spec[tag=multipleIndices]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/metadata-IT_tests_only.csv-spec[tag=multipleIndices-result]
include::{esql-specs}/metadata.csv-spec[tag=multipleIndices-result]
|===

Similar to index fields, once an aggregation is performed, a
Expand All @@ -47,9 +47,9 @@ used as a grouping field:

[source.merge.styled,esql]
----
include::{esql-specs}/metadata-IT_tests_only.csv-spec[tag=metaIndexInAggs]
include::{esql-specs}/metadata.csv-spec[tag=metaIndexInAggs]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/metadata-IT_tests_only.csv-spec[tag=metaIndexInAggs-result]
include::{esql-specs}/metadata.csv-spec[tag=metaIndexInAggs-result]
|===

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ static TransportVersion def(int id) {
public static final TransportVersion ROLE_REMOTE_CLUSTER_PRIVS = def(8_649_00_0);
public static final TransportVersion NO_GLOBAL_RETENTION_FOR_SYSTEM_DATA_STREAMS = def(8_650_00_0);
public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX = def(8_651_00_0);
public static final TransportVersion INDEXING_PRESSURE_REQUEST_REJECTIONS_COUNT = def(8_652_00_0);

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ static class IndexPressureStats implements ToXContentFragment {
long memoryLimit = 0;

long totalCoordinatingOps = 0;
long totalCoordinatingRequests = 0;
long totalPrimaryOps = 0;
long totalReplicaOps = 0;
long currentCoordinatingOps = 0;
Expand Down Expand Up @@ -813,6 +814,7 @@ static class IndexPressureStats implements ToXContentFragment {
currentPrimaryOps += nodeStatIndexingPressureStats.getCurrentPrimaryOps();
currentReplicaOps += nodeStatIndexingPressureStats.getCurrentReplicaOps();
primaryDocumentRejections += nodeStatIndexingPressureStats.getPrimaryDocumentRejections();
totalCoordinatingRequests += nodeStatIndexingPressureStats.getTotalCoordinatingRequests();
}
}
indexingPressureStats = new IndexingPressureStats(
Expand All @@ -834,7 +836,8 @@ static class IndexPressureStats implements ToXContentFragment {
currentCoordinatingOps,
currentPrimaryOps,
currentReplicaOps,
primaryDocumentRejections
primaryDocumentRejections,
totalCoordinatingRequests
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class IndexingPressure {
private final AtomicLong totalReplicaBytes = new AtomicLong(0);

private final AtomicLong totalCoordinatingOps = new AtomicLong(0);
private final AtomicLong totalCoordinatingRequests = new AtomicLong(0);
private final AtomicLong totalPrimaryOps = new AtomicLong(0);
private final AtomicLong totalReplicaOps = new AtomicLong(0);

Expand Down Expand Up @@ -109,6 +110,7 @@ public Releasable markCoordinatingOperationStarted(int operations, long bytes, b
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalCoordinatingBytes.getAndAdd(bytes);
totalCoordinatingOps.getAndAdd(operations);
totalCoordinatingRequests.getAndIncrement();
return wrapReleasable(() -> {
logger.trace(() -> Strings.format("removing [%d] coordinating operations and [%d] bytes", operations, bytes));
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
Expand Down Expand Up @@ -221,7 +223,8 @@ public IndexingPressureStats stats() {
currentCoordinatingOps.get(),
currentPrimaryOps.get(),
currentReplicaOps.get(),
primaryDocumentRejections.get()
primaryDocumentRejections.get(),
totalCoordinatingRequests.get()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final SoftDeletesPolicy softDeletesPolicy;
private final LongSupplier globalCheckpointSupplier;
private final Map<IndexCommit, Integer> snapshottedCommits; // Number of snapshots held against each commit point.
private final Map<IndexCommit, Integer> acquiredIndexCommits; // Number of references held against each commit point.

interface CommitsListener {

Expand Down Expand Up @@ -71,7 +71,7 @@ interface CommitsListener {
this.softDeletesPolicy = softDeletesPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.commitsListener = commitsListener;
this.snapshottedCommits = new HashMap<>();
this.acquiredIndexCommits = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -120,7 +120,7 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
}
for (int i = 0; i < keptPosition; i++) {
final IndexCommit commit = commits.get(i);
if (snapshottedCommits.containsKey(commit) == false) {
if (acquiredIndexCommits.containsKey(commit) == false) {
deleteCommit(commit);
if (deletedCommits == null) {
deletedCommits = new ArrayList<>();
Expand Down Expand Up @@ -213,7 +213,7 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
assert safeCommit != null : "Safe commit is not initialized yet";
assert lastCommit != null : "Last commit is not initialized yet";
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
snapshottedCommits.merge(snapshotting, 1, Integer::sum); // increase refCount
acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount
return wrapCommit(snapshotting);
}

Expand All @@ -224,27 +224,27 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit) {
/**
* Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}.
*
* @return true if the snapshotting commit can be clean up.
* @return true if the acquired commit can be clean up.
*/
synchronized boolean releaseCommit(final IndexCommit snapshotCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).getIndexCommit();
assert snapshottedCommits.containsKey(releasingCommit)
: "Release non-snapshotted commit;"
+ "snapshotted commits ["
+ snapshottedCommits
synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) acquiredCommit).getIndexCommit();
assert acquiredIndexCommits.containsKey(releasingCommit)
: "Release non-acquired commit;"
+ "acquired commits ["
+ acquiredIndexCommits
+ "], releasing commit ["
+ releasingCommit
+ "]";
// release refCount
final Integer refCount = snapshottedCommits.compute(releasingCommit, (key, count) -> {
final Integer refCount = acquiredIndexCommits.compute(releasingCommit, (key, count) -> {
if (count == 1) {
return null;
}
return count - 1;
});

assert refCount == null || refCount > 0 : "Number of snapshots can not be negative [" + refCount + "]";
// The commit can be clean up only if no pending snapshot and it is neither the safe commit nor last commit.
assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]";
// The commit can be clean up only if no refCount and it is neither the safe commit nor last commit.
return refCount == null && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false;
}

Expand Down Expand Up @@ -296,10 +296,10 @@ private static Set<String> listOfNewFileNames(IndexCommit previous, IndexCommit
}

/**
* Checks whether the deletion policy is holding on to snapshotted commits
* Checks whether the deletion policy is holding on to acquired index commits
*/
synchronized boolean hasSnapshottedCommits() {
return snapshottedCommits.isEmpty() == false;
synchronized boolean hasAcquiredIndexCommits() {
return acquiredIndexCommits.isEmpty() == false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ Translog getTranslog() {
}

// Package private for testing purposes only
boolean hasSnapshottedCommits() {
return combinedDeletionPolicy.hasSnapshottedCommits();
boolean hasAcquiredIndexCommits() {
return combinedDeletionPolicy.hasAcquiredIndexCommits();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {

// These fields will be used for additional back-pressure and metrics in the future
private final long totalCoordinatingOps;
private final long totalCoordinatingRequests;
private final long totalPrimaryOps;
private final long totalReplicaOps;
private final long currentCoordinatingOps;
Expand Down Expand Up @@ -77,6 +78,12 @@ public IndexingPressureStats(StreamInput in) throws IOException {
} else {
primaryDocumentRejections = -1L;
}

if (in.getTransportVersion().onOrAfter(TransportVersions.INDEXING_PRESSURE_REQUEST_REJECTIONS_COUNT)) {
totalCoordinatingRequests = in.readVLong();
} else {
totalCoordinatingRequests = -1L;
}
}

public IndexingPressureStats(
Expand All @@ -98,7 +105,8 @@ public IndexingPressureStats(
long currentCoordinatingOps,
long currentPrimaryOps,
long currentReplicaOps,
long primaryDocumentRejections
long primaryDocumentRejections,
long totalCoordinatingRequests
) {
this.totalCombinedCoordinatingAndPrimaryBytes = totalCombinedCoordinatingAndPrimaryBytes;
this.totalCoordinatingBytes = totalCoordinatingBytes;
Expand All @@ -121,6 +129,7 @@ public IndexingPressureStats(
this.currentReplicaOps = currentReplicaOps;

this.primaryDocumentRejections = primaryDocumentRejections;
this.totalCoordinatingRequests = totalCoordinatingRequests;
}

@Override
Expand All @@ -146,6 +155,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEXING_PRESSURE_DOCUMENT_REJECTIONS_COUNT)) {
out.writeVLong(primaryDocumentRejections);
}

if (out.getTransportVersion().onOrAfter(TransportVersions.INDEXING_PRESSURE_REQUEST_REJECTIONS_COUNT)) {
out.writeVLong(totalCoordinatingRequests);
}
}

public long getTotalCombinedCoordinatingAndPrimaryBytes() {
Expand Down Expand Up @@ -224,6 +237,10 @@ public long getPrimaryDocumentRejections() {
return primaryDocumentRejections;
}

public long getTotalCoordinatingRequests() {
return totalCoordinatingRequests;
}

private static final String COMBINED = "combined_coordinating_and_primary";
private static final String COMBINED_IN_BYTES = "combined_coordinating_and_primary_in_bytes";
private static final String COORDINATING = "coordinating";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.monitor.jvm.GcNames;
import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;

Expand Down Expand Up @@ -529,23 +528,16 @@ private void registerAsyncMetrics(MeterRegistry registry) {
);

metrics.add(
registry.registerDoubleGauge(
"es.indexing.coordinating_operations.rejections.ratio",
"Ratio of rejected coordinating operations",
"ratio",
() -> {
var totalCoordinatingOperations = Optional.ofNullable(stats.getOrRefresh())
.map(NodeStats::getIndexingPressureStats)
.map(IndexingPressureStats::getTotalCoordinatingOps)
.orElse(0L);
var totalCoordinatingRejections = Optional.ofNullable(stats.getOrRefresh())
registry.registerLongAsyncCounter(
"es.indexing.coordinating_operations.requests.total",
"Total number of coordinating requests",
"operations",
() -> new LongWithAttributes(
Optional.ofNullable(stats.getOrRefresh())
.map(NodeStats::getIndexingPressureStats)
.map(IndexingPressureStats::getCoordinatingRejections)
.orElse(0L);
// rejections do not count towards `totalCoordinatingOperations`
var totalOps = totalCoordinatingOperations + totalCoordinatingRejections;
return new DoubleWithAttributes(totalOps != 0 ? (double) totalCoordinatingRejections / totalOps : 0.0);
}
.map(IndexingPressureStats::getTotalCoordinatingRequests)
.orElse(0L)
)
)
);

Expand Down Expand Up @@ -620,23 +612,16 @@ private void registerAsyncMetrics(MeterRegistry registry) {
);

metrics.add(
registry.registerDoubleGauge(
"es.indexing.primary_operations.document.rejections.ratio",
"Ratio of rejected primary operations",
"ratio",
() -> {
var totalPrimaryOperations = Optional.ofNullable(stats.getOrRefresh())
.map(NodeStats::getIndexingPressureStats)
.map(IndexingPressureStats::getTotalPrimaryOps)
.orElse(0L);
var totalPrimaryDocumentRejections = Optional.ofNullable(stats.getOrRefresh())
registry.registerLongAsyncCounter(
"es.indexing.primary_operations.document.rejections.total",
"Total number of rejected indexing documents",
"operations",
() -> new LongWithAttributes(
Optional.ofNullable(stats.getOrRefresh())
.map(NodeStats::getIndexingPressureStats)
.map(IndexingPressureStats::getPrimaryDocumentRejections)
.orElse(0L);
// primary document rejections do not count towards `totalPrimaryOperations`
var totalOps = totalPrimaryOperations + totalPrimaryDocumentRejections;
return new DoubleWithAttributes(totalOps != 0 ? (double) totalPrimaryDocumentRejections / totalOps : 0.0);
}
.orElse(0L)
)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,7 @@ public static NodeStats createNodeStats() {
randomLongBetween(0, maxStatValue),
randomLongBetween(0, maxStatValue),
randomLongBetween(0, maxStatValue),
randomLongBetween(0, maxStatValue),
randomLongBetween(0, maxStatValue)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) {
}

@Override
synchronized boolean releaseCommit(IndexCommit indexCommit) {
return super.releaseCommit(wrapCommit(indexCommit));
synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
return super.releaseCommit(wrapCommit(acquiredCommit));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4000,6 +4000,7 @@ static boolean hasCircularReference(Exception cause) {
return false;
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/108321")
public void testDisabledFsync() throws IOException {
var config = new TranslogConfig(
shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1444,10 +1444,10 @@ public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throw
assertBusy(() -> assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo)));
}

public static boolean hasSnapshottedCommits(Engine engine) {
public static boolean hasAcquiredIndexCommits(Engine engine) {
assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.hasSnapshottedCommits();
return internalEngine.hasAcquiredIndexCommits();
}

public static final class PrimaryTermSupplier implements LongSupplier {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ public void beforeIndexDeletion() throws Exception {
assertNoPendingIndexOperations();
assertAllPendingWriteLimitsReleased();
assertOpenTranslogReferences();
assertNoSnapshottedIndexCommit();
assertNoAcquiredIndexCommit();
}

private void assertAllPendingWriteLimitsReleased() throws Exception {
Expand Down Expand Up @@ -1357,7 +1357,7 @@ private void assertOpenTranslogReferences() throws Exception {
}, 60, TimeUnit.SECONDS);
}

private void assertNoSnapshottedIndexCommit() throws Exception {
private void assertNoAcquiredIndexCommit() throws Exception {
assertBusy(() -> {
for (NodeAndClient nodeAndClient : nodes.values()) {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
Expand All @@ -1368,7 +1368,7 @@ private void assertNoSnapshottedIndexCommit() throws Exception {
if (engine instanceof InternalEngine) {
assertFalse(
indexShard.routingEntry().toString() + " has unreleased snapshotted index commits",
EngineTestCase.hasSnapshottedCommits(engine)
EngineTestCase.hasAcquiredIndexCommits(engine)
);
}
} catch (AlreadyClosedException ignored) {
Expand Down

0 comments on commit 71b69c2

Please sign in to comment.