@@ -519,10 +519,16 @@ private void getPersistedCheckpoints() throws IOException {
        SolrDocument doc = httpClient.getById(id);
        if (doc != null) {
          @SuppressWarnings({"unchecked"})
-         List<String> checkpoints = (List<String>) doc.getFieldValue("checkpoint_ss");
-         for (String checkpoint : checkpoints) {
-           String[] pair = checkpoint.split("~");
-           this.checkpoints.put(pair[0], Long.parseLong(pair[1]));
+         List<String> checkpoint_ss = (List<String>) doc.getFieldValue("checkpoint_ss");
+         for (String checkpoint_s : checkpoint_ss) {
+           String[] pair = checkpoint_s.split("~");
+           long checkpoint;
+           if (initialCheckpoint > -1) {
+             checkpoint = initialCheckpoint;
+           } else {
+             checkpoint = Long.parseLong(pair[1]);
+           }
+           this.checkpoints.put(pair[0], checkpoint);
          }
        }
      } catch (Exception e) {
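For context on the hunk above: persisted checkpoints are stored as multivalued strings of the form shardId~version, and with this change an explicit initialCheckpoint (anything > -1) now takes precedence over the persisted version. A minimal sketch of that parse-and-override step in isolation (the literal values are illustrative):

// Each persisted entry pairs a shard with its last-seen _version_,
// e.g. "shard1~1712345678901" (illustrative value).
String entry = "shard1~1712345678901";
String[] pair = entry.split("~");
String shardId = pair[0];                 // "shard1"
long persisted = Long.parseLong(pair[1]); // 1712345678901L

long initialCheckpoint = 0; // as passed via initialCheckpoint=0 in an expression
// An explicit initialCheckpoint (> -1) wins over the persisted value, which is
// why re-running a topic with initialCheckpoint=0 replays from the beginning.
long checkpoint = (initialCheckpoint > -1) ? initialCheckpoint : persisted;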
@@ -4336,6 +4336,17 @@ public void testClassifyStream() throws Exception {
    updateRequest.add(id, String.valueOf(3), "text_s", "a b e e f");
    updateRequest.commit(cluster.getSolrClient(), "uknownCollection");

    expr =
        "classify("
            +
            // use cacheMillis=0 to prevent cached results. It doesn't matter on the first run,
            // but we want to ensure that when we re-use this expression later after
            // training another model, we'll still get accurate results.
            "model(modelCollection, id=\"model\", cacheMillis=0),"
            + "topic(checkpointCollection, uknownCollection, q=\"*:*\", fl=\"text_s, id\", id=\"1000000\"),"
            + "field=\"text_s\","
            + "analyzerField=\"tv_text\")";
Comment on lines +4339 to +4348
Contributor

So this is the same expression as above, but without the initialCheckpoint=0. Reading the (current) docs, omitting it means "the highest version in the index", but if the highest version in the index were used, the first batch in the stream below would not include the documents just added with ids 2 and 3, would it?

Wondering if the documentation needs tweaking to account for persisted checkpoints?

https://github.com/apache/solr/blob/releases/solr/9.0.0/solr/solr-ref-guide/modules/query-guide/pages/stream-source-reference.adoc#topic-parameters
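For reference, here are the two variants being compared, as a sketch reusing the test's collection names (per the linked docs, omitting initialCheckpoint defaults it to -1, i.e. start from the highest version in the index, unless checkpoints for this topic id were already persisted):

// Earlier expression: replay everything from version 0.
String withInitial =
    "topic(checkpointCollection, uknownCollection, q=\"*:*\","
        + " fl=\"text_s, id\", id=\"1000000\", initialCheckpoint=0)";

// This expression: no initialCheckpoint, so persisted checkpoints for
// id=1000000 are used if present; only a truly fresh topic id starts
// at the highest version in the index.
String withoutInitial =
    "topic(checkpointCollection, uknownCollection, q=\"*:*\","
        + " fl=\"text_s, id\", id=\"1000000\")";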

Contributor Author

Checkpoints are persisted when the stream is closed, or, if checkpointEvery > -1, every count % checkpointEvery tuples; otherwise the checkpoints are only held in the in-memory checkpoints hashmap. So for 'just' added docs, as long as they match the underlying query and have been soft committed (see the topic stream caveat in SOLR-8709), I think they should be picked up, unless I'm completely misunderstanding?
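To make the persistence cadence above concrete, here is a minimal sketch of the logic being described; this is not the actual TopicStream source, and the count/checkpointEvery fields and the persistCheckpoints() helper are assumed names:

import java.io.IOException;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.TupleStream;

// Sketch only: assumed names, mirroring the behavior described in the comment.
abstract class CheckpointingStreamSketch {
  protected TupleStream tupleStream;
  protected long count = 0;
  protected long checkpointEvery = -1; // -1 disables periodic persistence

  public Tuple read() throws IOException {
    Tuple tuple = tupleStream.read();
    if (!tuple.EOF) {
      ++count;
      // Between persists, checkpoints advance only in the in-memory map.
      if (checkpointEvery > -1 && count % checkpointEvery == 0) {
        persistCheckpoints(); // periodic write of the checkpoint_ss document
      }
    }
    return tuple;
  }

  public void close() throws IOException {
    persistCheckpoints(); // checkpoints are also persisted when the stream is closed
    tupleStream.close();
  }

  // Writes one "shardId~version" entry per shard into the checkpoint document.
  protected abstract void persistCheckpoints() throws IOException;
}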

    paramsLoc.set("expr", expr);
    classifyStream = new SolrStream(url, paramsLoc);
    idToLabel = getIdToLabel(classifyStream, "probability_d");
    assertEquals(idToLabel.size(), 2);
@@ -2537,6 +2537,97 @@ public void testSubFacetStream() throws Exception {
    assertTrue(peri == 1.0D);
  }

  @Test
  public void testTopicStreamInitialCheckpoint() throws Exception {
    Assume.assumeTrue(!useAlias);

    new UpdateRequest()
        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamFactory factory =
        new StreamFactory()
            .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
            .withFunctionName("topic", TopicStream.class)
            .withFunctionName("search", CloudSolrStream.class)
            .withFunctionName("daemon", DaemonStream.class);

    StreamExpression expression;
    TupleStream stream = null;
    List<Tuple> tuples;

    SolrClientCache cache = new SolrClientCache();

    try {

      // Store checkpoints in the same index as the main documents. This is perfectly valid.
      expression =
          StreamExpressionParser.parse(
              "topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\", initialCheckpoint=0)");

      stream = new TopicStream(expression, factory);
      StreamContext context = new StreamContext();
      context.setSolrClientCache(cache);
      stream.setStreamContext(context);
      tuples = getTuples(stream);

      assertEquals(10, tuples.size());

      // force commit of checkpoints
      cluster.getSolrClient().commit("collection1");

      expression =
          StreamExpressionParser.parse(
              "search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")");
      stream = factory.constructStream(expression);
      context = new StreamContext();
      context.setSolrClientCache(cache);
      stream.setStreamContext(context);
      tuples = getTuples(stream);
      assertEquals(1, tuples.size());
      List<String> checkpoints = tuples.get(0).getStrings("checkpoint_ss");
      assertEquals(2, checkpoints.size()); // one checkpoint for each shard

      expression =
          StreamExpressionParser.parse(
              "topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\", initialCheckpoint=0)");

      stream = new TopicStream(expression, factory);
      context = new StreamContext();
      context.setSolrClientCache(cache);
      stream.setStreamContext(context);
      tuples = getTuples(stream);

      assertEquals(10, tuples.size());

      expression =
          StreamExpressionParser.parse(
              "topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\")");

      stream = new TopicStream(expression, factory);
      context = new StreamContext();
      context.setSolrClientCache(cache);
      stream.setStreamContext(context);
      tuples = getTuples(stream);

      // Should be zero because the checkpoints will be set to the highest version on the shards.
      assertEquals(0, tuples.size());

    } finally {
      if (stream != null) {
        stream.close();
      }
      cache.close();
    }
  }

  @Test
  public void testTopicStream() throws Exception {
    Assume.assumeTrue(!useAlias);
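As a side note on the search() verification in testTopicStreamInitialCheckpoint above: the persisted checkpoint document can also be fetched directly with SolrJ's getById. A sketch against the same test cluster (version values vary per run):

import java.util.List;
import org.apache.solr.common.SolrDocument;

// Fetch the persisted checkpoint doc by its topic id and inspect checkpoint_ss.
// One "shardId~version" entry is expected per shard of the topic collection.
SolrDocument doc = cluster.getSolrClient().getById("collection1", "1000000");
@SuppressWarnings("unchecked")
List<String> checkpoints = (List<String>) doc.getFieldValue("checkpoint_ss");
for (String entry : checkpoints) {
  String[] pair = entry.split("~");
  System.out.println("shard=" + pair[0] + " version=" + pair[1]);
}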
@@ -2843,13 +2934,14 @@ public void testParallelTopicStream() throws Exception {
.add(id, "13", "a_s", "hello", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

-    // Run the same topic again including the initialCheckpoint. It should start where it left
-    // off. initialCheckpoint should be ignored for all but the first run.
+    // Run the same topic again including the initialCheckpoint.
+    // Since initialCheckpoint=0, this should contain all ids that match the query.
     stream = factory.constructStream(expression);
     context = new StreamContext();
     context.setSolrClientCache(cache);
     stream.setStreamContext(context);
-    assertTopicRun(stream, "12", "13");
+    assertTopicRun(
+        stream, "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13");

    // Test text extraction
