Skip to content

Commit

Permalink
fix #2827 Add CPU load calibration during search log download to prev…
Browse files Browse the repository at this point in the history
…ent high CPU usage.
  • Loading branch information
marevol committed Jul 5, 2024
1 parent 5d74bd3 commit 4a010e7
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,18 @@
import org.codelibs.fess.es.log.exbhv.FavoriteLogBhv;
import org.codelibs.fess.es.log.exbhv.SearchLogBhv;
import org.codelibs.fess.es.log.exbhv.UserInfoBhv;
import org.codelibs.fess.es.log.exentity.ClickLog;
import org.codelibs.fess.es.log.exentity.FavoriteLog;
import org.codelibs.fess.es.log.exentity.SearchLog;
import org.codelibs.fess.es.log.exentity.UserInfo;
import org.codelibs.fess.helper.SystemHelper;
import org.codelibs.fess.mylasta.direction.FessConfig;
import org.codelibs.fess.util.ComponentUtil;
import org.codelibs.fess.util.GsaConfigParser;
import org.codelibs.fess.util.RenderDataUtil;
import org.codelibs.fess.util.ResourceUtil;
import org.codelibs.fess.util.SearchEngineUtil;
import org.dbflute.bhv.readable.EntityRowHandler;
import org.lastaflute.core.magic.async.AsyncManager;
import org.lastaflute.web.Execute;
import org.lastaflute.web.response.ActionResponse;
Expand Down Expand Up @@ -431,121 +437,157 @@ private static StringBuilder appendJson(final String field, final Object value,
}

public static Consumer<Writer> getSearchLogNdjsonWriteCall() {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
return writer -> {
final SearchLogBhv bhv = ComponentUtil.getComponent(SearchLogBhv.class);
bhv.selectCursor(cb -> {
cb.query().matchAll();
cb.query().addOrderBy_RequestedAt_Asc();
}, entity -> {
final StringBuilder buf = new StringBuilder();
buf.append('{');
appendJson("id", entity.getId(), buf).append(',');
appendJson("query-id", entity.getQueryId(), buf).append(',');
appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
appendJson("user", entity.getUser(), buf).append(',');
appendJson("search-word", entity.getSearchWord(), buf).append(',');
appendJson("hit-count", entity.getHitCount(), buf).append(',');
appendJson("query-page-size", entity.getQueryPageSize(), buf).append(',');
appendJson("query-offset", entity.getQueryOffset(), buf).append(',');
appendJson("referer", entity.getReferer(), buf).append(',');
appendJson("languages", entity.getLanguages(), buf).append(',');
appendJson("roles", entity.getRoles(), buf).append(',');
appendJson("user-agent", entity.getUserAgent(), buf).append(',');
appendJson("client-ip", entity.getClientIp(), buf).append(',');
appendJson("access-type", entity.getAccessType(), buf).append(',');
appendJson("query-time", entity.getQueryTime(), buf).append(',');
appendJson("response-time", entity.getResponseTime(), buf).append(',');
appendJson("requested-at", entity.getRequestedAt(), buf).append(',');
final Map<String, List<String>> searchFieldMap = entity.getSearchFieldLogList().stream()
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
appendJson("search-field", searchFieldMap, buf).append(',');
final Map<String, List<String>> requestHeaderMap = entity.getRequestHeaderList().stream()
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
appendJson("headers", requestHeaderMap, buf);
buf.append('}');
buf.append('\n');
try {
writer.write(buf.toString());
} catch (final IOException e) {
throw new IORuntimeException(e);
}, new LogEntityRowHandler<SearchLog>() {
@Override
public void handle(final SearchLog entity) {
final StringBuilder buf = new StringBuilder();
buf.append('{');
appendJson("id", entity.getId(), buf).append(',');
appendJson("query-id", entity.getQueryId(), buf).append(',');
appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
appendJson("user", entity.getUser(), buf).append(',');
appendJson("search-word", entity.getSearchWord(), buf).append(',');
appendJson("hit-count", entity.getHitCount(), buf).append(',');
appendJson("query-page-size", entity.getQueryPageSize(), buf).append(',');
appendJson("query-offset", entity.getQueryOffset(), buf).append(',');
appendJson("referer", entity.getReferer(), buf).append(',');
appendJson("languages", entity.getLanguages(), buf).append(',');
appendJson("roles", entity.getRoles(), buf).append(',');
appendJson("user-agent", entity.getUserAgent(), buf).append(',');
appendJson("client-ip", entity.getClientIp(), buf).append(',');
appendJson("access-type", entity.getAccessType(), buf).append(',');
appendJson("query-time", entity.getQueryTime(), buf).append(',');
appendJson("response-time", entity.getResponseTime(), buf).append(',');
appendJson("requested-at", entity.getRequestedAt(), buf).append(',');
final Map<String, List<String>> searchFieldMap = entity.getSearchFieldLogList().stream()
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
appendJson("search-field", searchFieldMap, buf).append(',');
final Map<String, List<String>> requestHeaderMap = entity.getRequestHeaderList().stream()
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
appendJson("headers", requestHeaderMap, buf);
buf.append('}');
buf.append('\n');
try {
writer.write(buf.toString());
} catch (final IOException e) {
throw new IORuntimeException(e);
}
if (!systemHelper.calibrateCpuLoad(timeout)) {
breakCursor = true;
}
}
});
};
}

public static Consumer<Writer> getUserInfoNdjsonWriteCall() {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
return writer -> {
final UserInfoBhv bhv = ComponentUtil.getComponent(UserInfoBhv.class);
bhv.selectCursor(cb -> {
cb.query().matchAll();
cb.query().addOrderBy_CreatedAt_Asc();
}, entity -> {
final StringBuilder buf = new StringBuilder();
buf.append('{');
appendJson("id", entity.getId(), buf).append(',');
appendJson("created-at", entity.getCreatedAt(), buf).append(',');
appendJson("updated-at", entity.getUpdatedAt(), buf);
buf.append('}');
buf.append('\n');
try {
writer.write(buf.toString());
} catch (final IOException e) {
throw new IORuntimeException(e);
}, new LogEntityRowHandler<UserInfo>() {
@Override
public void handle(final UserInfo entity) {
final StringBuilder buf = new StringBuilder();
buf.append('{');
appendJson("id", entity.getId(), buf).append(',');
appendJson("created-at", entity.getCreatedAt(), buf).append(',');
appendJson("updated-at", entity.getUpdatedAt(), buf);
buf.append('}');
buf.append('\n');
try {
writer.write(buf.toString());
} catch (final IOException e) {
throw new IORuntimeException(e);
}
if (!systemHelper.calibrateCpuLoad(timeout)) {
breakCursor = true;
}
}
});
};
}

public static Consumer<Writer> getFavoriteLogNdjsonWriteCall() {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
return writer -> {
final FavoriteLogBhv bhv = ComponentUtil.getComponent(FavoriteLogBhv.class);
bhv.selectCursor(cb -> {
cb.query().matchAll();
cb.query().addOrderBy_CreatedAt_Asc();
}, entity -> {
final StringBuilder buf = new StringBuilder();
buf.append('{');
appendJson("id", entity.getId(), buf).append(',');
appendJson("created-at", entity.getCreatedAt(), buf).append(',');
appendJson("query-id", entity.getQueryId(), buf).append(',');
appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
appendJson("doc-id", entity.getDocId(), buf).append(',');
appendJson("url", entity.getUrl(), buf);
buf.append('}');
buf.append('\n');
try {
writer.write(buf.toString());
} catch (final IOException e) {
throw new IORuntimeException(e);
}, new LogEntityRowHandler<FavoriteLog>() {
@Override
public void handle(final FavoriteLog entity) {
final StringBuilder buf = new StringBuilder();
buf.append('{');
appendJson("id", entity.getId(), buf).append(',');
appendJson("created-at", entity.getCreatedAt(), buf).append(',');
appendJson("query-id", entity.getQueryId(), buf).append(',');
appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
appendJson("doc-id", entity.getDocId(), buf).append(',');
appendJson("url", entity.getUrl(), buf);
buf.append('}');
buf.append('\n');
try {
writer.write(buf.toString());
} catch (final IOException e) {
throw new IORuntimeException(e);
}
if (!systemHelper.calibrateCpuLoad(timeout)) {
breakCursor = true;
}
}
});
};
}

public static Consumer<Writer> getClickLogNdjsonWriteCall() {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
return writer -> {
final ClickLogBhv bhv = ComponentUtil.getComponent(ClickLogBhv.class);
bhv.selectCursor(cb -> {
cb.query().matchAll();
cb.query().addOrderBy_RequestedAt_Asc();
}, entity -> {
final StringBuilder buf = new StringBuilder();
buf.append('{');
appendJson("id", entity.getId(), buf).append(',');
appendJson("query-id", entity.getQueryId(), buf).append(',');
appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
appendJson("doc-id", entity.getDocId(), buf).append(',');
appendJson("url", entity.getUrl(), buf).append(',');
appendJson("order", entity.getOrder(), buf).append(',');
appendJson("query-requested-at", entity.getQueryRequestedAt(), buf).append(',');
appendJson("requested-at", entity.getRequestedAt(), buf);
buf.append('}');
buf.append('\n');
try {
writer.write(buf.toString());
} catch (final IOException e) {
throw new IORuntimeException(e);
}, new LogEntityRowHandler<ClickLog>() {
@Override
public void handle(final ClickLog entity) {
final StringBuilder buf = new StringBuilder();
buf.append('{');
appendJson("id", entity.getId(), buf).append(',');
appendJson("query-id", entity.getQueryId(), buf).append(',');
appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
appendJson("doc-id", entity.getDocId(), buf).append(',');
appendJson("url", entity.getUrl(), buf).append(',');
appendJson("order", entity.getOrder(), buf).append(',');
appendJson("query-requested-at", entity.getQueryRequestedAt(), buf).append(',');
appendJson("requested-at", entity.getRequestedAt(), buf);
buf.append('}');
buf.append('\n');
try {
writer.write(buf.toString());
} catch (final IOException e) {
throw new IORuntimeException(e);
}
if (!systemHelper.calibrateCpuLoad(timeout)) {
breakCursor = true;
}
}
});
};
Expand All @@ -572,4 +614,12 @@ private void deleteTempFile(final File tempFile) {
}
}

private static abstract class LogEntityRowHandler<ENTITY> implements EntityRowHandler<ENTITY> {
protected boolean breakCursor = false;

@Override
public boolean isBreakCursor() {
return breakCursor;
}
}
}
19 changes: 16 additions & 3 deletions src/main/java/org/codelibs/fess/helper/SystemHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -601,19 +601,31 @@ public File createTempFile(final String prefix, final String suffix) {
}
}

public void calibrateCpuLoad() {
public boolean calibrateCpuLoad() {
return calibrateCpuLoad(0L);
}

public boolean calibrateCpuLoad(final long timeoutInMillis) {
final short percent = ComponentUtil.getFessConfig().getAdaptiveLoadControlAsInteger().shortValue();
if (percent <= 0) {
return;
return true;
}
short current = getSystemCpuPercent();
if (current < percent) {
return;
return true;
}
final long startTime = getCurrentTimeAsLong();
final String threadName = Thread.currentThread().getName();
try {
waitingThreadNames.add(threadName);
while (current >= percent) {
if (timeoutInMillis > 0 && getCurrentTimeAsLong() - startTime > timeoutInMillis) {
if (logger.isInfoEnabled()) {
logger.info("Cpu Load {}% is greater than {}%. {} waiting thread(s). {} thread is timed out.", current, percent,
waitingThreadNames.size(), threadName);
}
return false;
}
if (logger.isInfoEnabled()) {
logger.info("Cpu Load {}% is greater than {}%. {} waiting thread(s).", current, percent, waitingThreadNames.size());
}
Expand All @@ -626,6 +638,7 @@ public void calibrateCpuLoad() {
} finally {
waitingThreadNames.remove(threadName);
}
return true;
}

public void waitForNoWaitingThreads() {
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/org/codelibs/fess/mylasta/direction/FessConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,9 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
/** The key of the configuration. e.g. click_log.ndjson,favorite_log.ndjson,search_log.ndjson,user_info.ndjson */
String INDEX_BACKUP_LOG_TARGETS = "index.backup.log.targets";

/** The key of the configuration. e.g. 60000 */
String INDEX_BACKUP_LOG_LOAD_TIMEOUT = "index.backup.log.load.timeout";

/** The key of the configuration. e.g. true */
String LOGGING_SEARCH_DOCS_ENABLED = "logging.search.docs.enabled";

Expand Down Expand Up @@ -5505,6 +5508,21 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
*/
String getIndexBackupLogTargets();

/**
* Get the value for the key 'index.backup.log.load.timeout'. <br>
* The value is, e.g. 60000 <br>
* @return The value of found property. (NotNull: if not found, exception but basically no way)
*/
String getIndexBackupLogLoadTimeout();

/**
* Get the value for the key 'index.backup.log.load.timeout' as {@link Integer}. <br>
* The value is, e.g. 60000 <br>
* @return The value of found property. (NotNull: if not found, exception but basically no way)
* @throws NumberFormatException When the property is not integer.
*/
Integer getIndexBackupLogLoadTimeoutAsInteger();

/**
* Get the value for the key 'logging.search.docs.enabled'. <br>
* The value is, e.g. true <br>
Expand Down Expand Up @@ -9611,6 +9629,14 @@ public String getIndexBackupLogTargets() {
return get(FessConfig.INDEX_BACKUP_LOG_TARGETS);
}

public String getIndexBackupLogLoadTimeout() {
return get(FessConfig.INDEX_BACKUP_LOG_LOAD_TIMEOUT);
}

public Integer getIndexBackupLogLoadTimeoutAsInteger() {
return getAsInteger(FessConfig.INDEX_BACKUP_LOG_LOAD_TIMEOUT);
}

public String getLoggingSearchDocsEnabled() {
return get(FessConfig.LOGGING_SEARCH_DOCS_ENABLED);
}
Expand Down Expand Up @@ -11155,6 +11181,7 @@ protected java.util.Map<String, String> prepareGeneratedDefaultMap() {
defaultMap.put(FessConfig.INDEX_BACKUP_TARGETS,
"fess_basic_config.bulk,fess_config.bulk,fess_user.bulk,system.properties,fess.json,doc.json");
defaultMap.put(FessConfig.INDEX_BACKUP_LOG_TARGETS, "click_log.ndjson,favorite_log.ndjson,search_log.ndjson,user_info.ndjson");
defaultMap.put(FessConfig.INDEX_BACKUP_LOG_LOAD_TIMEOUT, "60000");
defaultMap.put(FessConfig.LOGGING_SEARCH_DOCS_ENABLED, "true");
defaultMap.put(FessConfig.LOGGING_SEARCH_DOCS_FIELDS,
"filetype,created,click_count,title,doc_id,url,score,site,filename,host,digest,boost,mimetype,favorite_count,_id,lang,last_modified,content_length,timestamp");
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/fess_config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ ftp.role.from.file=true
# backup
index.backup.targets=fess_basic_config.bulk,fess_config.bulk,fess_user.bulk,system.properties,fess.json,doc.json
index.backup.log.targets=click_log.ndjson,favorite_log.ndjson,search_log.ndjson,user_info.ndjson
index.backup.log.load.timeout=60000

# logging
logging.search.docs.enabled=true
Expand Down

0 comments on commit 4a010e7

Please sign in to comment.