semaphore for async indexing and sync index in transaction after publish #10388

Merged Mar 27, 2024 (14 commits)

Changes from 4 commits
3 changes: 3 additions & 0 deletions doc/release-notes/10381-index-after-publish.md
@@ -0,0 +1,3 @@
This release adds a new MicroProfile setting for the maximum number of simultaneously running asynchronous dataset index operations. It defaults to ``4``:

dataverse.solr.concurrency.max-async-indexes
9 changes: 9 additions & 0 deletions doc/sphinx-guides/source/installation/config.rst
@@ -2352,6 +2352,15 @@ when using it to configure your core name!

Can also be set via *MicroProfile Config API* sources, e.g. the environment variable ``DATAVERSE_SOLR_PATH``.

dataverse.solr.concurrency.max-async-indexes
++++++++++++++++++++++++++++++++++++++++++++

Maximum number of simultaneously running asynchronous dataset index operations.

Defaults to ``4``.

Can also be set via *MicroProfile Config API* sources, e.g. the environment variable ``DATAVERSE_SOLR_CONCURRENCY_MAX_ASYNC_INDEXES``.
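For illustration (not part of this diff), a MicroProfile setting like this can also be read programmatically; the PR itself resolves it through the JvmSettings enum shown later in the diff. A minimal hedged sketch using the standard MicroProfile Config API:

import org.eclipse.microprofile.config.ConfigProvider;

// Sketch: resolve the setting, falling back to 4 when it is not configured.
int maxAsyncIndexes = ConfigProvider.getConfig()
        .getOptionalValue("dataverse.solr.concurrency.max-async-indexes", Integer.class)
        .orElse(4);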

dataverse.rserve.host
+++++++++++++++++++++

@@ -247,6 +247,12 @@ public Dataset execute(CommandContext ctxt) throws CommandException {

logger.info("Successfully published the dataset "+readyDataset.getGlobalId().asString());
readyDataset = ctxt.em().merge(readyDataset);

try {
ctxt.index().indexDataset(readyDataset, true);
} catch (SolrServerException | IOException e) {
throw new CommandException("Indexing failed: " + e.getMessage(), this);
}
Contributor:
I am trying to understand what's going on here. I understand why we may want to change indexing from asynchronous to synchronous. But why are we moving the indexing from onSuccess() back into the execute() method?
Is this just to address the original issue with timestamps (#10381)?
Are we positive we want to do this? If I understand correctly, this would bring us back to a failure to index resulting in a failure to publish, which I thought we specifically wanted to avoid.
I may easily be missing something, of course.

Collaborator (Author):
@landreev @scolapasta I do not fully understand the issue myself. Moving the indexing back to execute() makes the indexing happen earlier, not later, and making it synchronous instead of asynchronous also makes it happen earlier. If the issue is that the index did happen but its timestamp is before the publish moment, this PR would make the problem worse, not better. I just do not see yet why the publish date would be greater than the index moment, unless the index was interrupted (e.g., by a circuit breaker) or the wrong version of the dataset was indexed (e.g., the draft and not the published version). I will go through the code again to see if I am missing something, and whether maybe the wrong version gets indexed, etc. As it turns out, we just migrated to version 6.1 (from 5.14), and a recently published dataset has the same problem. I think the root cause is still not addressed. I will keep digging.

Collaborator (Author):
@landreev @scolapasta I did not see any errors on Solr, no failure log in Payara, etc. Also, the index looks fine, with the right version, etc. I assume that indexing went fine after publish; only the "indextime" was not updated in the DB. I reverted to the async update in finalize after publish, and added em.flush() after the indextime update, which should fix the problem.
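A minimal sketch of that fix, assuming the helper lives in IndexServiceBean with an injected EntityManager; the method and field names here are illustrative, not copied from the PR:

// Illustrative fragment (names assumed): update the dataset's index
// timestamp, then flush, so the UPDATE reaches the database before any
// later merge() can overwrite it with stale state.
private void updateLastIndexedTime(Long datasetId) {
    DvObject dvObject = em.find(DvObject.class, datasetId);
    dvObject.setIndexTime(new java.sql.Timestamp(new java.util.Date().getTime()));
    dvObject = em.merge(dvObject);
    em.flush(); // push the pending UPDATE out of the persistence context now
}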

Collaborator (Author):
I'm not sure if em.flush() changes anything; from what I understand, the change should have been written anyway. I think I am still missing something. In the example on our server, the publishing took 20+ hours due to the checksum checks on 20+ TiB. I got the e-mail that the dataset was published the next day after publishing started. From the code, the index should happen directly after sending the notifications. I would assume that it either succeeded but the indextime did not update, or it failed, but then I should see it in the process-failures log, which is empty.

Collaborator (Author):
Now I think that the index was done and the correct indextime was written, but then it got overwritten by the merge in:

try {
    ExportService instance = ExportService.getInstance();
    instance.exportAllFormats(dataset);
    dataset = ctxt.datasets().merge(dataset);
}

I moved the indexing to the bottom; I am more confident now that it is fixed.
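A sketch of that reordering, under the assumption that the surrounding code looks roughly like the fragment quoted above (the catch clause is elided in that quote, so a generic one is used here):

// Sketch, not the literal PR diff: exports and the final merge run first,
// indexing runs last, so the merge can no longer overwrite the freshly
// written indextime.
try {
    ExportService instance = ExportService.getInstance();
    instance.exportAllFormats(dataset);
    dataset = ctxt.datasets().merge(dataset);
} catch (Exception ex) {
    // assumed handling: a failed export is repairable later
    // and should not block publication
}
ctxt.index().asyncIndexDataset(dataset, true); // now the last writer of indextime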

Collaborator (Author):
It looks to me like it is safe to merge the current code of this PR and leave it like this until we see how it behaves in production. I saved the link to this issue, as it contains some interesting discussion, especially on running batches and metrics. We can revisit these ideas at some point. As for adding the sleep calls, I am against that, since it would make the queue holding indexing jobs longer than necessary. I like the idea of adding the index logic to the timer for fixing exports, but I agree it is out of scope for this PR. We will pick it up later, if needed.
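For reference, a minimal standalone sketch of the throttling pattern this PR introduces: a fair Semaphore capping the number of concurrent asynchronous index jobs (simplified names, not the literal PR code):

import java.util.concurrent.Semaphore;

public class AsyncIndexThrottle {
    // fair = true hands out permits in FIFO order, so queued index jobs
    // start in the order they arrived instead of racing for a free slot
    private static final Semaphore PERMITS = new Semaphore(4, true);

    public static void runThrottled(Runnable indexJob) throws InterruptedException {
        PERMITS.acquire();       // block here until one of the 4 slots is free
        try {
            indexJob.run();
        } finally {
            PERMITS.release();   // released only if acquire() succeeded
        }
    }
}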

Contributor:
I just asked a question in dv-tech Slack about simplifying the whole scheme somewhat radically (but probably not in the context of this PR either).
I'm generally in favor of merging this PR as is (I just want to run a couple more experiments with multiple indexing attempts in parallel).
I would, however, consider including one small extra change: adding a few extra hours to the index timestamp in the test in the isIndexedVersion() method in DatasetPage.java:

return (workingVersion.getDataset().getIndexTime() != null)
        && workingVersion.getDataset().getIndexTime().after(workingVersion.getReleaseTime());

i.e., something like

(dataset.getIndexTime().getTime() + (a few hours worth of milliseconds)) > version.getReleaseTime().getTime()

instead?

Contributor:
I have looked at the differences, but nothing struck me as suspicious.

In solrconfig.xml some request handlers are disabled, but I do not think those were used anyway: the /browse, /spell, /update/extract, /tvhr and /elevate endpoints.

In schema.xml I did find it strange that the schema version in 6.1 is lower than that of 5.14, but I do not know what the impact of that version number is; it could be just informative. The other differences replace the now obsolete solr.TrieXXXField types with solr.XXXPointField, as one would expect for Solr 9. There are also some changes to the dynamic field definitions, but AFAIK Dataverse does not use dynamic field types in Solr.

Contributor:
Could it be the move from Solr 8 to 9 itself that changed something dramatically?
From a brief look, I'm not seeing any information online suggesting 9 is dramatically slower than 8 in general... but it could be something Dataverse specifically relies on when indexing that behaves differently.

Collaborator (Author):
I moved the index time in the isIndexedVersion() test by 3 hours, as suggested. This should fix the problem for already published datasets with an indextime before the releasetime.
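A hedged sketch of what that adjusted check could look like; the constant name, method signature, and exact arithmetic are illustrative, and only the 3-hour window comes from the comment above:

// Illustrative: treat the version as indexed when the recorded index time
// plus a 3-hour grace window lands after the release time.
private static final long INDEX_GRACE_MS = 3L * 60 * 60 * 1000; // 3 hours

private boolean isIndexedVersion(DatasetVersion workingVersion) {
    java.sql.Timestamp indexTime = workingVersion.getDataset().getIndexTime();
    return indexTime != null
            && indexTime.getTime() + INDEX_GRACE_MS
               > workingVersion.getReleaseTime().getTime();
}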


return readyDataset;
}
@@ -267,7 +273,6 @@ public boolean onSuccess(CommandContext ctxt, Object r) {
} catch (Exception e) {
logger.warning("Failure to send dataset published messages for : " + dataset.getId() + " : " + e.getMessage());
}
ctxt.index().asyncIndexDataset(dataset, true);

//re-indexing dataverses that have additional subjects
if (!dataversesToIndex.isEmpty()){
@@ -35,6 +35,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -341,6 +342,8 @@ public void indexDatasetInNewTransaction(Long datasetId) { //Dataset dataset) {
private static final Map<Long, Dataset> NEXT_TO_INDEX = new ConcurrentHashMap<>();
// indexingNow is a set of dataset ids of datasets being indexed asynchronously right now
private static final Map<Long, Boolean> INDEXING_NOW = new ConcurrentHashMap<>();
// semaphore for async indexing
private static final Semaphore ASYNC_INDEX_SEMAPHORE = new Semaphore(JvmSettings.MAX_ASYNC_INDEXES.lookupOptional(Integer.class).orElse(4), true);

// When you pass null as Dataset parameter to this method, it indicates that the indexing of the dataset with "id" has finished
// Pass non-null Dataset to schedule it for indexing
@@ -385,6 +388,19 @@ synchronized private static Dataset getNextToIndex(Long id, Dataset d) {
*/
@Asynchronous
public void asyncIndexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) {
try {
ASYNC_INDEX_SEMAPHORE.acquire();
doAyncIndexDataset(dataset, doNormalSolrDocCleanUp);
} catch (InterruptedException e) {
String failureLogText = "Indexing failed: interrupted. You can kickoff a re-index of this dataset with: \r\n curl http://localhost:8080/api/admin/index/datasets/" + dataset.getId().toString();
failureLogText += "\r\n" + e.getLocalizedMessage();
LoggingUtil.writeOnSuccessFailureLog(null, failureLogText, dataset);
} finally {
ASYNC_INDEX_SEMAPHORE.release();
}
}

private void doAyncIndexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) {
Long id = dataset.getId();
Dataset next = getNextToIndex(id, dataset); // if there is an ongoing index job for this dataset, next is null (ongoing index job will reindex the newest version after current indexing finishes)
while (next != null) {
@@ -402,7 +418,16 @@ public void asyncIndexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) {
@Asynchronous
public void asyncIndexDatasetList(List<Dataset> datasets, boolean doNormalSolrDocCleanUp) {
for(Dataset dataset : datasets) {
asyncIndexDataset(dataset, true);
try {
ASYNC_INDEX_SEMAPHORE.acquire();
doAyncIndexDataset(dataset, true);
} catch (InterruptedException e) {
String failureLogText = "Indexing failed: interrupted. You can kickoff a re-index of this dataset with: \r\n curl http://localhost:8080/api/admin/index/datasets/" + dataset.getId().toString();
failureLogText += "\r\n" + e.getLocalizedMessage();
LoggingUtil.writeOnSuccessFailureLog(null, failureLogText, dataset);
} finally {
ASYNC_INDEX_SEMAPHORE.release();
}
}
}

@@ -414,7 +439,7 @@ public void indexDvObject(DvObject objectIn) throws SolrServerException, IOException {
}
}

private void indexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) throws SolrServerException, IOException {
public void indexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) throws SolrServerException, IOException {
doIndexDataset(dataset, doNormalSolrDocCleanUp);
updateLastIndexedTime(dataset.getId());
}
@@ -60,6 +60,10 @@ public enum JvmSettings {
SOLR_CORE(SCOPE_SOLR, "core"),
SOLR_PATH(SCOPE_SOLR, "path"),

// INDEX CONCURRENCY
SCOPE_SOLR_CONCURENCY(SCOPE_SOLR, "concurrency"),
MAX_ASYNC_INDEXES(SCOPE_SOLR_CONCURENCY, "max-async-indexes"),

// RSERVE CONNECTION
SCOPE_RSERVE(PREFIX, "rserve"),
RSERVE_HOST(SCOPE_RSERVE, "host"),