Solr for Alpakka 1.0 #1491

Merged — 14 commits, Feb 14, 2019
4 changes: 4 additions & 0 deletions build.sbt
@@ -267,6 +267,10 @@ lazy val docs = project
"extref.javaee-api.base_url" -> "https://docs.oracle.com/javaee/7/api/index.html?%s.html",
"extref.paho-api.base_url" -> "https://www.eclipse.org/paho/files/javadoc/index.html?%s.html",
"extref.slick.base_url" -> s"https://slick.lightbend.com/doc/${Dependencies.SlickVersion}/%s",
// Solr
"extref.solr.base_url" -> s"http://lucene.apache.org/solr/guide/${Dependencies.SolrVersionForDocs}/%s",
"javadoc.org.apache.solr.base_url" -> s"https://lucene.apache.org/solr/${Dependencies.SolrVersionForDocs}_0/solr-solrj/",
// Java
"javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
"javadoc.javax.jms.base_url" -> "https://docs.oracle.com/javaee/7/api/",
"javadoc.com.couchbase.base_url" -> s"https://docs.couchbase.com/sdk-api/couchbase-java-client-${Dependencies.CouchbaseVersion}/",
182 changes: 92 additions & 90 deletions docs/src/main/paradox/solr.md
@@ -1,6 +1,14 @@
# Apache Solr

@@@ note { title="Apache Solr" }

Solr (pronounced "solar") is an open source enterprise search platform, written in Java, from the Apache Lucene project. Its major features include full-text search, hit highlighting, faceted search, real-time indexing, dynamic clustering, database integration, NoSQL features and rich document (e.g., Word, PDF) handling. Providing distributed search and index replication, Solr is designed for scalability and fault tolerance. Solr is widely used for enterprise search and analytics use cases and has an active development community and regular releases.

-- [Wikipedia](https://en.wikipedia.org/wiki/Apache_Solr)

@@@

Alpakka Solr provides Akka Stream sources and sinks for Apache Solr.

For more information about Solr, please visit the [Solr documentation](http://lucene.apache.org/solr/resources.html).

@@ -19,189 +27,183 @@ The table below shows direct dependencies of this module and the second tab show
@@dependencies { projectId="solr" }


## Set up a Solr client

Sources, Flows and Sinks provided by this connector need a prepared @javadoc[`SolrClient`](org.apache.solr.client.solrj.SolrClient) (e.g. @javadoc[`CloudSolrClient`](org.apache.solr.client.solrj.impl.CloudSolrClient)) to access Solr.


Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #init-client }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #init-client }
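As a concrete illustration, a client for a SolrCloud cluster could be prepared like this (a sketch assuming SolrJ 7.x; the ZooKeeper address is a placeholder):

```scala
import java.util.Optional

import scala.collection.JavaConverters._

import org.apache.solr.client.solrj.impl.CloudSolrClient

// Connect to SolrCloud through ZooKeeper; the address is an example.
implicit val solrClient: CloudSolrClient =
  new CloudSolrClient.Builder(List("localhost:2181").asJava, Optional.empty())
    .build()
```

The client is declared `implicit` because the Scala flows and sinks of this connector take it as an implicit parameter; remember to `close()` it on shutdown.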


## Reading from Solr

Create a @javadoc[Solr `TupleStream`](org.apache.solr.client.solrj.io.stream.TupleStream) (e.g. via @javadoc[`CloudSolrStream`](org.apache.solr.client.solrj.io.stream.CloudSolrStream)) and use `SolrSource.fromTupleStream` (@scala[@scaladoc[API](akka.stream.alpakka.solr.scaladsl.SolrSource$)]@java[@scaladoc[API](akka.stream.alpakka.solr.javadsl.SolrSource$)]) to create a source.

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #tuple-stream }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #tuple-stream }
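For instance, a source based on a Solr streaming expression might look like this (a sketch; the `books` collection and the ZooKeeper address are assumptions):

```scala
import akka.stream.alpakka.solr.scaladsl.SolrSource
import org.apache.solr.client.solrj.io.stream.expr.{StreamExpressionParser, StreamFactory}
import org.apache.solr.client.solrj.io.stream.{CloudSolrStream, StreamContext}

// Build a streaming expression that searches the "books" collection.
val factory = new StreamFactory().withCollectionZkHost("books", "localhost:2181")
val expression =
  StreamExpressionParser.parse("""search(books, q=*:*, fl="title", sort="title asc")""")
val tupleStream = new CloudSolrStream(expression, factory)
tupleStream.setStreamContext(new StreamContext)

// Emits one org.apache.solr.client.solrj.io.Tuple per result document.
val source = SolrSource.fromTupleStream(tupleStream)
```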


## Writing to Solr

Alpakka Solr batches updates by sending all updates of the same operation type to Solr at once. These batches are extracted from the elements of each collection sent to a Solr flow or sink; a single collection may still contain updates of different types. If a stream has no natural batches of updates, use the `groupedWithin` operator to create count- or time-based batches.
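If the upstream emits individual messages, such batching could be sketched as follows (assuming akka-stream on the classpath):

```scala
import scala.concurrent.duration._

import akka.NotUsed
import akka.stream.alpakka.solr.WriteMessage
import akka.stream.scaladsl.Flow
import org.apache.solr.common.SolrInputDocument

// Emit a batch once 50 messages have arrived, or after 10 ms,
// whichever comes first.
val batch: Flow[WriteMessage[SolrInputDocument, NotUsed],
                Seq[WriteMessage[SolrInputDocument, NotUsed]],
                NotUsed] =
  Flow[WriteMessage[SolrInputDocument, NotUsed]].groupedWithin(50, 10.millis)
```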

Alpakka Solr offers three styles for writing to Apache Solr:

1. Using @javadoc[`SolrInputDocument`](org.apache.solr.common.SolrInputDocument) (via `SolrSink.documents`, `SolrFlow.documents` and `SolrFlow.documentsWithPassThrough`)
1. Annotated *Java Bean* classes supported by @javadoc[Solr's `DocumentObjectBinder`](org.apache.solr.client.solrj.beans.DocumentObjectBinder) (via `SolrSink.beans`, `SolrFlow.beans` and `SolrFlow.beansWithPassThrough`)
1. Typed streams with document binders to translate to @javadoc[`SolrInputDocument`](org.apache.solr.common.SolrInputDocument) (via `SolrSink.typeds`, `SolrFlow.typeds` and `SolrFlow.typedsWithPassThrough`)

In all variations the data is wrapped into `WriteMessage`s.

### Committing and configuration for updates

Data sent to Solr is not searchable until it has been committed to the index. These are the major options for handling commits:

1. The Solr installation can be configured to use **auto-commit**.
2. Specify **commit-within** in `SolrUpdateSettings` to trigger commits after every write through Alpakka Solr.
3. Use explicit committing via the `SolrClient.commit` methods on stream completion, as most examples show. As `commit` is a blocking operation, choose an appropriate execution context (preferably *not* `system.dispatcher`).

Configuration of Solr committing is described in @extref[UpdateHandlers in SolrConfig](solr:updatehandlers-in-solrconfig.html#commits).
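Option 3 could be sketched like this (`commitAfter` is a hypothetical helper, not part of the connector):

```scala
import scala.concurrent.{blocking, ExecutionContext, Future}

import akka.Done
import org.apache.solr.client.solrj.SolrClient

// Commit once the stream has completed. `commit` blocks, so run it on a
// dedicated execution context, not on system.dispatcher.
def commitAfter(streamCompletion: Future[Done], collection: String)(
    implicit client: SolrClient,
    blockingEc: ExecutionContext): Future[Done] =
  streamCompletion.map { done =>
    blocking(client.commit(collection))
    done
  }(blockingEc)
```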


#### Available settings
| Parameter | Default | Description |
| ------------------- | ------- | ------------------------------------------------------------------------------------------------------ |
| commitWithin | -1 | Max time (in ms) before a commit will happen, -1 for explicit committing |


Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #solr-update-settings }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #solr-update-settings }
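In code, the setting is applied like this (10 ms is an arbitrary example value):

```scala
import akka.stream.alpakka.solr.SolrUpdateSettings

// Ask Solr to commit every update within 10 ms; the default of -1
// leaves committing to auto-commit or explicit SolrClient.commit calls.
val settings = SolrUpdateSettings().withCommitWithin(10)
```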

### Writing `SolrInputDocument`s

#### Defining mappings

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #define-class }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #define-class }


#### Writing `SolrInputDocument`s

Use `SolrSink.documents`, `SolrFlow.documents` or `SolrFlow.documentsWithPassThrough` to stream `SolrInputDocument`s to Solr.

A `SolrClient` must be provided to
@scala[@scaladoc[SolrSink](akka.stream.alpakka.solr.scaladsl.SolrSink$) implicitly.]
@java[@scaladoc[SolrSink](akka.stream.alpakka.solr.javadsl.SolrSink$).]

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #run-document }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #run-document }
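Put together, writing documents could look like this (a sketch assuming an implicit `SolrClient` and `Materializer` in scope; the `books` collection and field names are examples):

```scala
import akka.stream.alpakka.solr.scaladsl.SolrSink
import akka.stream.alpakka.solr.{SolrUpdateSettings, WriteMessage}
import akka.stream.scaladsl.Source
import org.apache.solr.common.SolrInputDocument

def toDoc(title: String): SolrInputDocument = {
  val doc = new SolrInputDocument()
  doc.addField("title", title)
  doc
}

val done = Source(List("Akka in Action", "Scala Puzzlers"))
  .map(title => WriteMessage.createUpsertMessage(toDoc(title)))
  .grouped(10) // create the batches the sink expects
  .runWith(SolrSink.documents("books", SolrUpdateSettings()))
```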

### Writing Java beans

Firstly, create a POJO.

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #define-bean }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #define-bean }

Use `SolrSink.beans`, `SolrFlow.beans` or `SolrFlow.beansWithPassThrough` to stream POJOs to Solr.

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #run-bean }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #run-bean }
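A bean-based variant might be sketched as follows (same assumptions as above; the `@(Field @field)` meta-annotation makes Solr's annotation land on the underlying field of the case class):

```scala
import scala.annotation.meta.field

import akka.stream.alpakka.solr.scaladsl.SolrSink
import akka.stream.alpakka.solr.{SolrUpdateSettings, WriteMessage}
import akka.stream.scaladsl.Source
import org.apache.solr.client.solrj.beans.Field

// Solr's DocumentObjectBinder reads the @Field annotations.
case class Book(@(Field @field) title: String)

val done = Source(List(Book("Akka in Action")))
  .map(book => WriteMessage.createUpsertMessage(book))
  .grouped(10)
  .runWith(SolrSink.beans[Book]("books", SolrUpdateSettings()))
```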

### Writing arbitrary classes via custom binding

Use `SolrSink.typeds`, `SolrFlow.typeds` or `SolrFlow.typedsWithPassThrough` to stream messages with custom binding to Solr.

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #run-typed }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #run-typed }
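With a custom binder this could be sketched as follows (assumptions as above):

```scala
import akka.stream.alpakka.solr.scaladsl.SolrSink
import akka.stream.alpakka.solr.{SolrUpdateSettings, WriteMessage}
import akka.stream.scaladsl.Source
import org.apache.solr.common.SolrInputDocument

case class Book(title: String)

// Binder translating our type into a SolrInputDocument.
val binder: Book => SolrInputDocument = { book =>
  val doc = new SolrInputDocument()
  doc.addField("title", book.title)
  doc
}

val done = Source(List(Book("Akka in Action")))
  .map(book => WriteMessage.createUpsertMessage(book))
  .grouped(10)
  .runWith(SolrSink.typeds[Book]("books", SolrUpdateSettings(), binder))
```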


#### Using a flow with custom binding
You can also build flow stages with
@scala[@scaladoc[SolrFlow](akka.stream.alpakka.solr.scaladsl.SolrFlow$).]
@java[@scaladoc[SolrFlow](akka.stream.alpakka.solr.javadsl.SolrFlow$).]

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #typeds-flow }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #typeds-flow }

#### Passing data through SolrFlow

All flow types (`documents`, `beans`, `typeds`) exist with pass-through support:
use `SolrFlow.documentsWithPassThrough`, `SolrFlow.beansWithPassThrough` or `SolrFlow.typedsWithPassThrough`.

When streaming documents from Kafka, you might want to commit to Kafka **AFTER** the document has been written to Solr. This scenario uses implicit committing via the **commit within** setting.

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #kafka-example }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #kafka-example }
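The shape of such a pass-through flow could be sketched like this (`Offset` is a stand-in for e.g. a Kafka committable offset; assumptions as above):

```scala
import akka.stream.alpakka.solr.scaladsl.SolrFlow
import akka.stream.alpakka.solr.{SolrUpdateSettings, WriteMessage}
import org.apache.solr.common.SolrInputDocument

final case class Offset(value: Long) // stand-in for a Kafka offset

// Each WriteMessage carries its Offset through the write; the emitted
// results expose it again, so the offset can be committed only after
// Solr has accepted the document.
val flow = SolrFlow.documentsWithPassThrough[Offset](
  "books",
  SolrUpdateSettings().withCommitWithin(5))

def message(doc: SolrInputDocument, offset: Offset): WriteMessage[SolrInputDocument, Offset] =
  WriteMessage.createUpsertMessage(doc).withPassThrough(offset)
```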

## Update documents
With `WriteMessage.createUpdateMessage` documents can be updated atomically. All flow and sink types (`documents`, `beans`, `typeds`) support atomic updates.

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #update-atomically-documents }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #update-atomically-documents }

If a collection contains a router field, use `WriteMessage.createUpdateMessage(...).withRoutingFieldValue(...)` to set the router field.
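Such an update message might be built as follows (field names and the exact parameter names are illustrative; check the `WriteMessage.createUpdateMessage` overloads of your version):

```scala
import akka.stream.alpakka.solr.WriteMessage
import org.apache.solr.common.SolrInputDocument

// Atomically "set" the comment field of the document whose title matches.
val update = WriteMessage.createUpdateMessage[SolrInputDocument](
  idField = "title",
  idValue = "Akka in Action",
  updates = Map("comment" -> Map("set" -> "Now in print"))
)

// For collections with a router field:
val routed = update.withRoutingFieldValue("router-value")
```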

## Delete documents by ids
With `WriteMessage.createDeleteMessage(id)` documents may be deleted by ID. All flow and sink types (`documents`, `beans`, `typeds`) support deleting.

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #delete-documents }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #delete-documents }
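A delete-by-id stream could be sketched as follows (assumptions as above; ids are examples):

```scala
import akka.stream.alpakka.solr.scaladsl.SolrSink
import akka.stream.alpakka.solr.{SolrUpdateSettings, WriteMessage}
import akka.stream.scaladsl.Source
import org.apache.solr.common.SolrInputDocument

val done = Source(List("id-1", "id-2"))
  .map(id => WriteMessage.createDeleteMessage[SolrInputDocument](id))
  .grouped(10)
  .runWith(SolrSink.documents("books", SolrUpdateSettings()))
```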


## Delete documents by query
With `WriteMessage.createDeleteByQueryMessage(query)` documents matching a query may be deleted. All flow and sink types (`documents`, `beans`, `typeds`) support deleting.

Scala
: @@snip [snip](/solr/src/test/scala/docs/scaladsl/SolrSpec.scala) { #delete-documents-query }

Java
: @@snip [snip](/solr/src/test/java/docs/javadsl/SolrTest.java) { #delete-documents-query }
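Analogously, a delete-by-query could be sketched as follows (the query is an example; assumptions as above):

```scala
import akka.stream.alpakka.solr.scaladsl.SolrSink
import akka.stream.alpakka.solr.{SolrUpdateSettings, WriteMessage}
import akka.stream.scaladsl.Source
import org.apache.solr.common.SolrInputDocument

val done = Source
  .single(WriteMessage.createDeleteByQueryMessage[SolrInputDocument]("title:Akka*"))
  .grouped(10)
  .runWith(SolrSink.documents("books", SolrUpdateSettings()))
```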
11 changes: 6 additions & 5 deletions project/Dependencies.scala
@@ -346,15 +346,16 @@ object Dependencies {
)
)

val SolrjVersion = "7.4.0"
val SolrVersionForDocs = "7_4"

val Solr = {
val slf4jVersion = "1.7.25"
Seq(
libraryDependencies ++= Seq(
"org.apache.solr" % "solr-solrj" % SolrjVersion, // ApacheV2
//Test
"org.apache.solr" % "solr-test-framework" % SolrjVersion % Test exclude("org.apache.logging.log4j", "log4j-slf4j-impl"), // ApacheV2
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test // MIT like: http://www.slf4j.org/license.html
),
resolvers += ("restlet" at "https://maven.restlet.com")
)