Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static com.dotcms.content.index.IndexConfigHelper.isMigrationNotStarted;
import static com.dotcms.content.index.IndexConfigHelper.isMigrationStarted;
import static com.dotcms.content.index.IndexConfigHelper.isReadEnabled;
import static com.dotcms.content.index.IndexConfigHelper.logShadowWriteFailure;
import static com.dotmarketing.util.StringUtils.builder;

import com.dotcms.api.system.event.message.MessageSeverity;
Expand Down Expand Up @@ -75,6 +76,7 @@
import com.liferay.portal.util.PortalUtil;
import com.liferay.util.StringPool;
import com.rainerhahnekamp.sneakythrow.Sneaky;
import javax.annotation.Nullable;
import io.vavr.control.Try;
import java.io.IOException;
import java.sql.Connection;
Expand Down Expand Up @@ -152,6 +154,7 @@ public class ContentletIndexAPIImpl implements ContentletIndexAPI {

private static final String SELECT_CONTENTLET_VERSION_INFO =
"select working_inode,live_inode from contentlet_version_info where identifier IN (%s)";
@Nullable
private final ReindexQueueAPI queueApi;
private final IndexAPI indexAPI;
private final IndiciesAPI legacyIndiciesAPI;
Expand All @@ -175,7 +178,8 @@ public ContentletIndexAPIImpl() {
CDIUtils.getBeanThrows(ContentletIndexOperationsOS.class));
}

/** Package-private constructor for testing. */
/** Package-private constructor for testing: injects only the two provider operations.
* Still calls APILocator for the remaining dependencies. */
ContentletIndexAPIImpl(final ContentletIndexOperations operationsES,
final ContentletIndexOperations operationsOS) {
this.operationsES = operationsES;
Expand All @@ -191,6 +195,31 @@ public ContentletIndexAPIImpl() {
// Use getMappingAPI() for lazy initialization at first use.
}

/**
* Full constructor for unit testing — injects all dependencies without calling
* {@link com.dotmarketing.business.APILocator}, allowing fully isolated tests.
*
* @param operationsES ES write operations provider
* @param operationsOS OS write operations provider
* @param indexAPI phase-aware index management API (controls list/cluster operations)
* @param legacyIndiciesAPI ES index pointer store (working/live slots)
* @param versionedIndicesAPI OS index pointer store (working/live slots)
*/
ContentletIndexAPIImpl(
final ContentletIndexOperations operationsES,
final ContentletIndexOperations operationsOS,
final IndexAPI indexAPI,
final IndiciesAPI legacyIndiciesAPI,
final VersionedIndicesAPI versionedIndicesAPI) {
this.operationsES = operationsES;
this.operationsOS = operationsOS;
this.router = new PhaseRouter<>(operationsES, operationsOS);
this.queueApi = null; // not needed for the methods under test
this.indexAPI = indexAPI;
this.legacyIndiciesAPI = legacyIndiciesAPI;
this.versionedIndicesAPI = versionedIndicesAPI;
}

/**
* Lazy initializer avoids circular reference Stackoverflow error.
* Thread-safe: uses {@link AtomicReference#updateAndGet} to ensure
Expand Down Expand Up @@ -371,7 +400,7 @@ public void close() throws Exception {
} catch (final Exception e) {
if (entry.shadow) {
// OS shadow write — fire-and-forget: log divergence, do not propagate.
Logger.warnAndDebug(CompositeBulkProcessor.class,
logShadowWriteFailure(CompositeBulkProcessor.class,
"OS shadow processor failed to flush on close — ES flush succeeded; "
+ "OS index may diverge until next reindex. Cause: "
+ e.getMessage(), e);
Expand Down Expand Up @@ -557,18 +586,30 @@ public synchronized boolean createContentIndex(String indexName)
@Override
public synchronized boolean createContentIndex(final String indexName, final int shards)
throws IOException {
boolean result = true;
// Track the primary provider's result independently so a shadow failure in dual-write
// phases (ES primary ok, OS shadow fails) does NOT prevent addCustomMapping from running
// against the successfully-created primary index.
final ContentletIndexOperations primary = router.readProvider();
boolean primaryResult = false;
for (final ContentletIndexOperations ops : router.writeProviders()) {
final String physicalName = ops.toPhysicalName(indexName);
try {
result &= ops.createContentIndex(physicalName, shards);
final boolean r = ops.createContentIndex(physicalName, shards);
if (ops == primary) {
primaryResult = r;
}
} catch (Exception e) {
Logger.error(this.getClass(), "Error while creating content index " + physicalName, e);
result = false;
if (ops == primary) {
primaryResult = false;
}
// shadow failures are fire-and-forget in dual-write phases
}
}
MappingHelper.getInstance().addCustomMapping(indexName);
return result;
if (primaryResult) {
MappingHelper.getInstance().addCustomMapping(indexName);
}
return primaryResult;
}


Expand All @@ -590,7 +631,9 @@ private boolean createContentIndex(final String indexName, final int shards, Ind
}
final String physicalName = ops.toPhysicalName(indexName);
final boolean contentIndex = ops.createContentIndex(physicalName, shards);
helper.addCustomMapping(List.of(indexName),tag);
if (contentIndex) {
helper.addCustomMapping(List.of(indexName), tag);
}
return contentIndex;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.dotcms.content.index.opensearch.OSIndexProperty;
import com.dotcms.featureflag.FeatureFlagName;
import com.dotmarketing.util.Config;
import com.dotmarketing.util.Logger;

/**
* Central helper for reading index-layer configuration properties.
Expand Down Expand Up @@ -35,6 +36,35 @@
*/
public interface IndexConfigHelper {

/**
* Config key controlling the log level for OS shadow write failures in dual-write phases.
*
* <p>Valid values: {@code DEBUG}, {@code INFO}, {@code WARN}, {@code ERROR} (default: {@code WARN}).
* Set to {@code ERROR} or {@code DEBUG} to increase/decrease visibility during migration QA.</p>
*/
String SHADOW_WRITE_LOG_LEVEL_KEY = "DOTCMS_SHADOW_WRITE_LOG_LEVEL";

/**
* Logs an OS shadow write failure at the level configured by
* {@value #SHADOW_WRITE_LOG_LEVEL_KEY} (default: {@code WARN}).
*
* @param clazz the class to attribute the log entry to
* @param message the log message
* @param t the throwable, or {@code null} if none
*/
static void logShadowWriteFailure(final Class<?> clazz,
final String message,
final Throwable t) {
final String level = Config.getStringProperty(SHADOW_WRITE_LOG_LEVEL_KEY, "WARN")
.toUpperCase();
switch (level) {
case "DEBUG": Logger.debug(clazz, message, t); break;
case "INFO": Logger.info(clazz, message, t); break;
case "ERROR": Logger.error(clazz, message, t); break;
default: Logger.warn(clazz, message, t); break;
}
}

// -------------------------------------------------------------------------
// Migration phase
// -------------------------------------------------------------------------
Expand Down
10 changes: 6 additions & 4 deletions dotCMS/src/main/java/com/dotcms/content/index/PhaseRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static com.dotcms.content.index.IndexConfigHelper.isMigrationNotStarted;
import static com.dotcms.content.index.IndexConfigHelper.isReadEnabled;

import static com.dotcms.content.index.IndexConfigHelper.logShadowWriteFailure;

import com.dotmarketing.util.Logger;
import java.util.List;
import java.util.function.Consumer;
Expand Down Expand Up @@ -233,7 +235,7 @@ public boolean writeBoolean(final Function<T, Boolean> fn) {
}
// Dual-write: call every provider; only primary result is returned
final T primary = readProvider();
boolean primaryResult = true;
boolean primaryResult = false; // safe default: assume failure until primary confirms success
RuntimeException primaryEx = null;
for (final T impl : providers) {
try {
Expand All @@ -246,7 +248,7 @@ public boolean writeBoolean(final Function<T, Boolean> fn) {
if (impl == primary) {
primaryEx = e;
} else {
Logger.warn(PhaseRouter.class,
logShadowWriteFailure(PhaseRouter.class,
"Shadow write failed (fire-and-forget in dual-write phase): "
+ e.getMessage(), e);
}
Expand Down Expand Up @@ -332,7 +334,7 @@ public void writeChecked(final ThrowingConsumer<T> action) throws Exception {
if (impl == primary) {
primaryEx = e; // record — shadow must still be called
} else {
Logger.warn(PhaseRouter.class,
logShadowWriteFailure(PhaseRouter.class,
"Shadow write failed (fire-and-forget in dual-write phase): "
+ e.getMessage(), e);
}
Expand Down Expand Up @@ -372,7 +374,7 @@ public <R> R writeReturningChecked(final ThrowingFunction<T, R> fn) throws Excep
try {
fn.apply(shadow);
} catch (Exception e) {
Logger.warn(PhaseRouter.class,
logShadowWriteFailure(PhaseRouter.class,
"Shadow write failed (fire-and-forget in dual-write phase): "
+ e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.dotmarketing.common.reindex;

import static com.dotcms.content.index.IndexConfigHelper.logShadowWriteFailure;

import com.dotcms.content.index.IndexConfigHelper;
import com.dotcms.content.index.IndexTag;
import com.dotcms.content.index.domain.IndexBulkItemResult;
Expand Down Expand Up @@ -111,8 +113,8 @@ public void afterBulk(final long executionId, final List<IndexBulkItemResult> re
// OS shadow — fire-and-forget; log individual failures for observability only
results.stream()
.filter(IndexBulkItemResult::failed)
.forEach(r -> Logger.warn(this.getClass(),
"[OS] Index failure (fire-and-forget): " + r.failureMessage()));
.forEach(r -> logShadowWriteFailure(this.getClass(),
"[OS] Index failure (fire-and-forget): " + r.failureMessage(), null));
return;
}
Logger.debug(this.getClass(), "Bulk process completed");
Expand Down Expand Up @@ -155,7 +157,7 @@ public void afterBulk(final long executionId, final List<IndexBulkItemResult> re
public void afterBulk(final long executionId, final Throwable failure) {
final String msg = failure != null ? failure.getMessage() : "(no message)";
if (shadow) {
Logger.warnAndDebug(this.getClass(),
logShadowWriteFailure(this.getClass(),
"[OS] Bulk process failed entirely (fire-and-forget): " + msg, failure);
return;
}
Expand Down
8 changes: 8 additions & 0 deletions dotCMS/src/main/java/com/dotmarketing/util/Logger.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ public static void info(Class cl, String message) {
loadLogger(cl).info(message);
}

public static void info(Class cl, String message, Throwable ex) {
if (isVelocityMessage(cl)) {
velocityInfo(cl, message);
return;
}
loadLogger(cl).info(message, ex);
}

public static void info(String cl, String message) {
loadLogger(cl).info(message);
}
Expand Down
11 changes: 11 additions & 0 deletions dotCMS/src/main/resources/dotmarketing-config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -922,3 +922,14 @@ telemetry.collection.timeout.seconds=30
# Metrics taking longer than this will be logged as warnings for optimization
# Default: 500 milliseconds
telemetry.metric.slow.threshold.ms=500

## OpenSearch migration — shadow write observability
# Log level for OS shadow write failures during dual-write phases (1 and 2).
# Shadow failures are fire-and-forget: the ES (primary) result is returned to the
# caller regardless of OS outcome. This setting controls how loudly those failures
# are reported.
# Valid values: DEBUG, INFO, WARN, ERROR (default: WARN)
# Set to ERROR to surface shadow failures in dashboards/alerts.
# Set to DEBUG to suppress them during steady-state migration.
#DOTCMS_SHADOW_WRITE_LOG_LEVEL=WARN

Loading
Loading