Skip to content
71 changes: 71 additions & 0 deletions demo.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env bash

# Demo of the experimental demoVectorToText streaming expression against the films example.
#
# Prerequisite: build the dev distribution first:
#   ./gradlew dev
#
# All paths below are relative to the current directory.
#
# The steps follow https://github.com/apache/solr/blob/main/solr/example/films/README.md
# with two differences:
#   * Solr is started with the `-cloud -noprompt` arguments to get cloud mode
#   * the collection is created with the `-s 2` argument to get two shards

# Location of the dev distribution and base URL of the films collection.
SOLR_DEV_DIR="solr/packaging/build/dev"
FILMS_URL="http://localhost:8983/solr/films"

# Start Solr in cloud mode and create the two-shard films collection.
"${SOLR_DEV_DIR}/bin/solr" start -cloud -noprompt

"${SOLR_DEV_DIR}/bin/solr" create -c films -s 2

# Register the dense vector field type and the fields used by the demo.
curl --silent "${FILMS_URL}/schema" -X POST -H 'Content-type:application/json' --data-binary '{
"add-field-type" : {
"name":"knn_vector_10",
"class":"solr.DenseVectorField",
"vectorDimension":10,
"similarityFunction":"cosine",
"knnAlgorithm":"hnsw"
},
"add-field" : [
{
"name":"name",
"type":"text_general",
"multiValued":false,
"stored":true
},
{
"name":"initial_release_date",
"type":"pdate",
"stored":true
},
{
"name":"film_vector",
"type":"knn_vector_10",
"indexed":true,
"stored":true
}
]
}'

# Index the example films documents.
"${SOLR_DEV_DIR}/bin/post" -c films "${SOLR_DEV_DIR}/example/films/films.json"

# 1) Plain select query.
curl --silent "${FILMS_URL}/select?q=name:batman&rows=10"

# 2) The same query as a streaming expression.
curl --silent --data-urlencode 'expr=
search(films,
q="name:batman",
fl="id,name,genre,film_vector",
sort="id asc",
rows="10")
' "${FILMS_URL}/stream"

# 3) The streaming expression wrapped in demoVectorToText.
#    --silent added for consistency with the other curl calls (no progress meter in the output).
curl --silent --data-urlencode 'expr=
demoVectorToText(
search(films,
q="name:batman",
fl="id,name,genre,film_vector",
sort="id asc",
rows="10"),
vectorField="film_vector",
outputKey="answer",
demoParam="externalComputation"
)
' "${FILMS_URL}/stream"

# Solr is left running; print a reminder of how to stop it.
echo
echo
echo "TODO: ${SOLR_DEV_DIR}/bin/solr stop -all"
echo
echo

Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@
import org.apache.solr.client.solrj.io.stream.CsvStream;
import org.apache.solr.client.solrj.io.stream.DaemonStream;
import org.apache.solr.client.solrj.io.stream.DeleteStream;
import org.apache.solr.client.solrj.io.stream.DemoVectorToTextStream;
import org.apache.solr.client.solrj.io.stream.DrillStream;
import org.apache.solr.client.solrj.io.stream.EchoStream;
import org.apache.solr.client.solrj.io.stream.EvalStream;
Expand Down Expand Up @@ -415,6 +416,9 @@ public static void register(StreamFactory streamFactory) {
.withFunctionName("distinct", DistinctOperation.class)
.withFunctionName("having", HavingStream.class)

// experimental
.withFunctionName("demoVectorToText", DemoVectorToTextStream.class)

// Stream Evaluators
.withFunctionName("val", RawValueEvaluator.class)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

/**
 * Base class for stream decorators that drain a wrapped {@link TupleStream}, collect the values of
 * one vector field from every tuple, and turn the collected vectors into a single text value.
 *
 * <p>Expression shape, as produced by {@link #toExpression(StreamFactory)}:
 *
 * <pre>
 * functionName(&lt;stream&gt;, vectorField="...", outputKey="...")
 * </pre>
 *
 * <p>Subclasses supply the actual conversion via {@link #vectorToText(List)}.
 */
public abstract class AbstractVectorToTextStream extends TupleStream implements Expressible {

  /** Name of the expression parameter holding the tuple field to read vectors from. */
  private static final String VECTOR_FIELD_PARAM = "vectorField";

  /** Name of the expression parameter holding the key under which the text is emitted. */
  private static final String OUTPUT_KEY_PARAM = "outputKey";

  /** The single wrapped stream whose tuples supply the vectors. */
  private final TupleStream tupleStream;

  private final String vectorField;
  private final String outputKey;

  /**
   * Builds the decorator from a parsed expression.
   *
   * @param streamExpression expression containing exactly one stream operand plus the {@code
   *     vectorField} and {@code outputKey} named parameters
   * @param streamFactory factory used to construct the wrapped stream; must be a {@link
   *     DefaultStreamFactory}
   * @throws IOException if the expression does not contain exactly one stream, if a required named
   *     parameter is missing, or if the factory is not a {@link DefaultStreamFactory}
   */
  public AbstractVectorToTextStream(StreamExpression streamExpression, StreamFactory streamFactory)
      throws IOException {

    final List<StreamExpression> streamExpressions =
        streamFactory.getExpressionOperandsRepresentingTypes(
            streamExpression, Expressible.class, TupleStream.class);
    if (streamExpressions.size() == 1) {
      this.tupleStream = streamFactory.constructStream(streamExpressions.get(0));
    } else {
      throw new IOException("Expected exactly one stream in expression: " + streamExpression);
    }

    this.vectorField = getOperandValue(streamExpression, streamFactory, VECTOR_FIELD_PARAM);
    this.outputKey = getOperandValue(streamExpression, streamFactory, OUTPUT_KEY_PARAM);

    if (!(streamFactory instanceof DefaultStreamFactory)) {
      throw new IOException(
          this.getClass().getName()
              + " requires a "
              + DefaultStreamFactory.class.getName()
              + " StreamFactory");
    }
  }

  /**
   * Looks up a required named parameter whose value is a plain expression value.
   *
   * @param streamExpression expression to read the parameter from
   * @param streamFactory factory used to locate the named operand
   * @param operandName name of the parameter
   * @return the parameter's value, never {@code null}
   * @throws IOException if the parameter is absent, is not a {@link StreamExpressionValue}, or has
   *     a {@code null} value
   */
  protected static String getOperandValue(
      StreamExpression streamExpression, StreamFactory streamFactory, String operandName)
      throws IOException {
    final StreamExpressionNamedParameter namedParameter =
        streamFactory.getNamedOperand(streamExpression, operandName);
    String operandValue = null;
    if (namedParameter != null && namedParameter.getParameter() instanceof StreamExpressionValue) {
      operandValue = ((StreamExpressionValue) namedParameter.getParameter()).getValue();
    }
    if (operandValue == null) {
      throw new IOException("Expected '" + operandName + "' in expression: " + streamExpression);
    }
    return operandValue;
  }

  /** Passes the context on to the wrapped stream. */
  public void setStreamContext(StreamContext streamContext) {
    tupleStream.setStreamContext(streamContext);
  }

  /**
   * Returns the direct child of this decorator, i.e. the wrapped stream itself.
   *
   * @return a new list containing only the wrapped stream
   */
  public List<TupleStream> children() {
    // Report the wrapped stream itself rather than its children, so that it is not skipped when
    // the stream tree is walked.
    final List<TupleStream> children = new ArrayList<>();
    children.add(tupleStream);
    return children;
  }

  /** Opens the wrapped stream. */
  public void open() throws IOException {
    tupleStream.open();
  }

  /** Closes the wrapped stream. */
  public void close() throws IOException {
    tupleStream.close();
  }

  /**
   * Reads the wrapped stream up to its EOF tuple, gathers the {@code vectorField} value of every
   * tuple read, and returns one tuple that maps {@code outputKey} to the text produced by {@link
   * #vectorToText(List)}.
   *
   * <p>The returned tuple is itself flagged as EOF, so the result travels on the terminating tuple.
   *
   * @return the EOF-flagged tuple carrying the generated text
   * @throws IOException if reading from the wrapped stream fails
   */
  public Tuple read() throws IOException {
    final List<List<Double>> vectors = new ArrayList<>();
    Tuple tuple = tupleStream.read();
    while (!tuple.EOF) {
      vectors.add(tuple.getDoubles(this.vectorField));
      tuple = tupleStream.read();
    }

    final Tuple result = new Tuple();
    result.put(this.outputKey, vectorToText(vectors));
    result.EOF = true;
    return result;
  }

  /**
   * Converts the collected vectors into text.
   *
   * @param vectors one entry per tuple read from the wrapped stream, in read order
   * @return the text to emit under the configured output key
   */
  protected abstract String vectorToText(List<List<Double>> vectors);

  /** Returns the sort of the wrapped stream. */
  public StreamComparator getStreamSort() {
    return tupleStream.getStreamSort();
  }

  /**
   * Describes this decorator and its wrapped stream.
   *
   * @param streamFactory factory used to resolve the function name and nested expressions
   * @return the explanation, with the wrapped stream's explanation as its only child
   * @throws IOException if the expression or a child explanation cannot be built
   */
  public Explanation toExplanation(StreamFactory streamFactory) throws IOException {
    return new StreamExplanation(getStreamNodeId().toString())
        .withChildren(new Explanation[] {tupleStream.toExplanation(streamFactory)})
        .withExpressionType(ExpressionType.STREAM_DECORATOR)
        .withFunctionName(streamFactory.getFunctionName(this.getClass()))
        .withImplementingClass(this.getClass().getName())
        .withExpression(toExpression(streamFactory, false).toString());
  }

  /**
   * Renders this decorator, including the wrapped stream, as an expression.
   *
   * @param streamFactory factory used to resolve function names
   * @return the full expression
   * @throws IOException if the wrapped stream is not {@link Expressible}
   */
  public StreamExpressionParameter toExpression(StreamFactory streamFactory) throws IOException {
    return toExpression(streamFactory, true /* includeStreams */);
  }

  /**
   * Renders this decorator as an expression.
   *
   * @param streamFactory factory used to resolve function names
   * @param includeStreams when {@code true} the wrapped stream's expression is embedded, otherwise
   *     the literal placeholder {@code <stream>} is used in its place
   * @return the expression with the stream operand followed by the {@code vectorField} and {@code
   *     outputKey} parameters
   * @throws IOException if {@code includeStreams} is set and the wrapped stream is not {@link
   *     Expressible}
   */
  protected StreamExpression toExpression(StreamFactory streamFactory, boolean includeStreams)
      throws IOException {
    final String functionName = streamFactory.getFunctionName(this.getClass());
    final StreamExpression streamExpression = new StreamExpression(functionName);

    if (includeStreams) {
      if (this.tupleStream instanceof Expressible) {
        streamExpression.addParameter(((Expressible) this.tupleStream).toExpression(streamFactory));
      } else {
        throw new IOException(
            "This "
                + this.getClass().getName()
                + " contains a non-Expressible TupleStream "
                + this.tupleStream.getClass().getName());
      }
    } else {
      streamExpression.addParameter("<stream>");
    }

    streamExpression.addParameter(
        new StreamExpressionNamedParameter(VECTOR_FIELD_PARAM, this.vectorField));
    streamExpression.addParameter(
        new StreamExpressionNamedParameter(OUTPUT_KEY_PARAM, this.outputKey));

    return streamExpression;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

/**
 * Demonstration subclass of {@link AbstractVectorToTextStream} that renders the collected vectors
 * as placeholder text and accepts one extra named parameter, {@code demoParam}.
 *
 * <p>Illustrative expression snippet:
 *
 * <pre>
 * demoVectorToText(search(myCollection,
 *                         q="*:*",
 *                         fl="id,fieldX",
 *                         sort="id asc",
 *                         qt="/export"),
 *                  vectorField="fieldX",
 *                  outputKey="answer",
 *                  demoParam="foobar")
 * </pre>
 */
public class DemoVectorToTextStream extends AbstractVectorToTextStream {

  /** Name of the additional named parameter required by this stream. */
  private static final String DEMO_PARAM = "demoParam";

  /** Value of the {@code demoParam} parameter; used as the prefix of the generated text. */
  private final String demoParam;

  /**
   * Builds the stream from a parsed expression.
   *
   * @param streamExpression expression to build from; must also carry the {@code demoParam}
   *     parameter
   * @param streamFactory factory handed on to the superclass constructor
   * @throws IOException if the superclass rejects the expression or {@code demoParam} is missing
   */
  public DemoVectorToTextStream(StreamExpression streamExpression, StreamFactory streamFactory)
      throws IOException {
    super(streamExpression, streamFactory);
    this.demoParam = getOperandValue(streamExpression, streamFactory, DEMO_PARAM);
  }

  /**
   * Produces placeholder text: the {@code demoParam} value followed by the vectors, one per line,
   * wrapped in {@code ([ ... ])}.
   *
   * <p>TODO: replace this dummy text with something model based
   */
  protected String vectorToText(List<List<Double>> vectors) {
    final StringBuilder text = new StringBuilder().append(this.demoParam).append("([\n");
    vectors.forEach(vector -> text.append(String.format(Locale.ROOT, "%s\n", vector)));
    return text.append("])").toString();
  }

  /**
   * Extends the superclass expression with the {@code demoParam} named parameter.
   *
   * @param streamFactory factory used to resolve function names
   * @param includeStreams forwarded unchanged to the superclass
   * @return the superclass expression with {@code demoParam} appended
   * @throws IOException if the superclass fails to build its expression
   */
  protected StreamExpression toExpression(StreamFactory streamFactory, boolean includeStreams)
      throws IOException {
    final StreamExpression expression = super.toExpression(streamFactory, includeStreams);
    expression.addParameter(new StreamExpressionNamedParameter(DEMO_PARAM, this.demoParam));
    return expression;
  }
}