Unified data processing with Apache Pulsar and Apache Spark.
- Java 8 or later
- Spark 2.4.0 or later
- Pulsar 2.4.0 or later
For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:
groupId = io.streamnative.connectors
artifactId = pulsar-spark-connector_{{SCALA_BINARY_VERSION}}
version = {{PULSAR_SPARK_VERSION}}
Currently, the artifact is available in StreamNative's Bintray Maven repository.
For a Maven project, you can add the repository to your `pom.xml` as follows:
<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>
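For an SBT project, the same repository and artifact could be declared roughly as follows. This is a sketch rather than an official build definition: it assumes standard sbt syntax, reuses the repository URL and placeholders shown above, and relies on `%%` to append the Scala binary version to the artifact name.

```scala
// build.sbt (sketch): resolver name and layout are illustrative
resolvers += "bintray-streamnative-maven" at "https://dl.bintray.com/streamnative/maven"

// %% expands to pulsar-spark-connector_<scala binary version>
libraryDependencies += "io.streamnative.connectors" %% "pulsar-spark-connector" % "{{PULSAR_SPARK_VERSION}}"
```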
As with any Spark application, `spark-submit` is used to launch your application. `pulsar-spark-connector_{{SCALA_BINARY_VERSION}}` and its dependencies can be added directly to `spark-submit` using `--packages`.
Example
$ ./bin/spark-submit \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  --repositories https://dl.bintray.com/streamnative/maven \
  ...
For experimenting in `spark-shell` (or `pyspark` for Python), you can also use `--packages` to add `pulsar-spark-connector_{{SCALA_BINARY_VERSION}}` and its dependencies directly.
Example
$ ./bin/spark-shell \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  --repositories https://dl.bintray.com/streamnative/maven \
  ...
When locating an artifact or library, the `--packages` option checks the following repositories in order:
- Local Maven repository
- Maven Central repository
- Other repositories specified by `--repositories`

The format for the coordinates should be `groupId:artifactId:version`.
For more information about submitting applications with external dependencies, see Application Submission Guide.
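To make the coordinate format concrete, here is a minimal sketch of a helper that assembles the string passed to `--packages`. The `Coordinates` object and its method names are hypothetical and not part of the connector; only the group ID and the artifact name prefix come from this document.

```scala
// Hypothetical helper (not part of the connector) that assembles the
// groupId:artifactId:version coordinate expected by --packages.
object Coordinates {
  final case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
    def render: String = s"${groupId}:${artifactId}:${version}"
  }

  // The artifact name carries the Scala binary version as a suffix.
  def connector(scalaBinaryVersion: String, connectorVersion: String): MavenCoordinate =
    MavenCoordinate(
      "io.streamnative.connectors",
      s"pulsar-spark-connector_${scalaBinaryVersion}",
      connectorVersion)
}
```

For example, `Coordinates.connector("2.11", "2.4.0").render` yields `io.streamnative.connectors:pulsar-spark-connector_2.11:2.4.0`. Both version values here are placeholders chosen for illustration; substitute the ones that match your build.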
The following examples are in Scala.
// Subscribe to 1 topic
val df = spark
.readStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topic", "topic1")
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics
val df = spark
.readStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topics", "topic1,topic2")
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern
val df = spark
.readStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topicsPattern", "topic.*")
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
For more information on how to use other language bindings for Spark Structured Streaming, see Structured Streaming Programming Guide.
If you have a use case that is better suited to batch processing, you can create a Dataset/DataFrame for a defined range of offsets.
The following examples are in Scala.
// Subscribe to 1 topic; defaults to the earliest and latest offsets
val df = spark
.read
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topic", "topic1")
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Pulsar offsets
import org.apache.spark.sql.pulsar.JsonUtils._
val startingOffsets = topicOffsets(Map("topic1" -> messageId1, "topic2" -> messageId2))
val endingOffsets = topicOffsets(...)
val df = spark
.read
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("endingOffsets", endingOffsets)
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topicsPattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
The following options must be set for the Pulsar source for both batch and streaming queries.
Option | Value | Description |
---|---|---|
`topic` | A topic name string | The topic to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
`topics` | A comma-separated list of topics | The topic list to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
`topicsPattern` | A Java regex string | The pattern used to subscribe to topic(s). Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
`service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration. |
`admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration. |
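The rules in the table above can be checked before a query is built. The following is only a sketch: the connector performs its own validation, and the `SourceOptions` object and its error messages are hypothetical.

```scala
// A sketch of the required-option rules for the Pulsar source.
// This helper is hypothetical and is not the connector's own validation.
object SourceOptions {
  private val topicKeys = Seq("topic", "topics", "topicsPattern")
  private val urlKeys = Seq("service.url", "admin.url")

  // Returns Right(options) when the required options are consistent,
  // otherwise Left with a human-readable reason.
  def validate(options: Map[String, String]): Either[String, Map[String, String]] = {
    val topicsGiven = topicKeys.filter(options.contains)
    val urlsMissing = urlKeys.filterNot(options.contains)
    if (urlsMissing.nonEmpty) {
      val missing = urlsMissing.mkString(", ")
      Left(s"missing required option(s): ${missing}")
    } else if (topicsGiven.size != 1) {
      val allowed = topicKeys.mkString(", ")
      Left(s"exactly one of ${allowed} must be set, found ${topicsGiven.size}")
    } else {
      Right(options)
    }
  }
}
```

A map that sets both `topic` and `topics`, or that omits `admin.url`, produces a `Left`; a map with both URLs and exactly one topic option produces a `Right`.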
The following configurations are optional.
Option | Value | Default | Query Type | Description |
---|---|---|---|---|
`startingOffsets` | `"earliest"`, `"latest"`, or a JSON string of per-topic offsets | `"earliest"` for batch queries, `"latest"` for streaming queries | Streaming and batch queries | Where the query starts reading. |
`endingOffsets` | `"latest"` or a JSON string of per-topic offsets. Example: `{"topic-1":[8,12,16,102,24,2,32,2],"topic-5":[8,16,16,106,24,6,32,6]}` | `"latest"` | Batch query | Where a batch query stops reading. |
`failOnDataLoss` | `true` or `false` | `true` | Streaming query | Whether to fail the query when data might be lost. This may cause a false alarm. You can set it to `false`. A batch query always fails if it fails to read any data from the provided offsets due to data loss. |
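The JSON form of the offsets maps each topic name to an array of byte values. The sketch below renders that shape from raw bytes, assuming each array is the byte serialization of a Pulsar message ID. In practice the `topicOffsets` helper from `JsonUtils` shown in the batch examples produces this string; the `OffsetsJson` object here is hypothetical and only illustrates the layout.

```scala
// Hypothetical renderer for the per-topic offsets JSON shape.
object OffsetsJson {
  // Renders topic -> bytes as {"topic":[b0,b1,...]}.
  // Bytes are written as signed JVM byte values.
  // Topic names are assumed not to need JSON escaping.
  def render(offsets: Seq[(String, Array[Byte])]): String =
    offsets
      .map { case (topic, bytes) => "\"" + topic + "\":" + bytes.mkString("[", ",", "]") }
      .mkString("{", ",", "}")
}
```

Calling `OffsetsJson.render(Seq("topic-1" -> Array[Byte](8, 12, 16, 102, 24, 2, 32, 2)))` returns `{"topic-1":[8,12,16,102,24,2,32,2]}`, which matches the first entry of the example in the table.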
Should the Pulsar cluster require authentication, credentials can be set in the following way.
The following examples are in Scala.
// Secure connection with authentication, using the same credentials on the
// Pulsar client and admin interface (if not given explicitly, the client configuration
// is used for admin as well).
val df = spark
.readStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
.option("pulsar.client.authParams","token:<valid client JWT token>")
.option("topicsPattern", "sensitiveTopic")
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Secure connection with authentication, using different credentials for
// Pulsar client and admin interfaces.
val df = spark
.readStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("pulsar.admin.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
.option("pulsar.admin.authParams","token:<valid admin JWT token>")
.option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
.option("pulsar.client.authParams","token:<valid client JWT token>")
.option("topicsPattern", "sensitiveTopic")
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Secure connection with client TLS enabled.
// Note that the certificate file has to be present at the specified
// path on every machine of the cluster!
val df = spark
.readStream
.format("pulsar")
.option("service.url", "pulsar+ssl://localhost:6651")
.option("admin.url", "http://localhost:8080")
.option("pulsar.admin.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
.option("pulsar.admin.authParams","token:<valid admin JWT token>")
.option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
.option("pulsar.client.authParams","token:<valid client JWT token>")
.option("pulsar.client.tlsTrustCertsFilePath","/path/to/tls/cert/cert.pem")
.option("pulsar.client.tlsAllowInsecureConnection","false")
.option("pulsar.client.tlsHostnameVerificationenable","true")
.option("topicsPattern", "sensitiveTopic")
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
- For topics without a schema or with a primitive schema in Pulsar, the message payload is loaded into a `value` column whose type corresponds to the Pulsar schema.
- For topics with an Avro or JSON schema, their field names and field types are kept in the result rows.

In addition, each row in the source has the following metadata fields.
Column | Type |
---|---|
`__key` | Binary |
`__topic` | String |
`__messageId` | Binary |
`__publishTime` | Timestamp |
`__eventTime` | Timestamp |
**Example**
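A Pulsar topic with the Avro schema `s` defined below:
import org.apache.pulsar.client.api.Schema
case class Foo(i: Int, f: Float, bar: Bar)
case class Bar(b: Boolean, s: String)
// classOf[Foo] is the case class itself; Foo.getClass would be the companion object's class
val s = Schema.AVRO(classOf[Foo])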
has the following schema as a DataFrame/DataSet in Spark:
root
|-- i: integer (nullable = false)
|-- f: float (nullable = false)
|-- bar: struct (nullable = true)
| |-- b: boolean (nullable = false)
| |-- s: string (nullable = true)
|-- __key: binary (nullable = true)
|-- __topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|-- __publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)
For a Pulsar topic with `Schema.DOUBLE`, its schema as a DataFrame is:
root
|-- value: double (nullable = false)
|-- __key: binary (nullable = true)
|-- __topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|-- __publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)
The DataFrame written to Pulsar can have an arbitrary schema. Each record in the DataFrame is transformed into one message sent to Pulsar, and the fields of the DataFrame are divided into two groups: the `__key` and `__eventTime` fields are encoded as metadata of the Pulsar message; the other fields are grouped, encoded using Avro, and put in `value()`:
producer.newMessage().key(__key).value(avro_encoded_fields).eventTime(__eventTime)
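The grouping of fields can be pictured as a partition over column names. This is a sketch only, not the connector's implementation. The `SinkFields` object is hypothetical, and treating `__topic` as reserved is an assumption based on the write examples, which use that column to choose the destination topic.

```scala
// Hypothetical illustration of how DataFrame columns split on write.
object SinkFields {
  // __key and __eventTime become message metadata. __topic is listed here
  // as an assumption: the write examples use it to pick the target topic.
  val reservedColumns: Set[String] = Set("__key", "__eventTime", "__topic")

  // Returns (reserved columns, columns that are Avro-encoded into the value),
  // each in its original order.
  def split(columns: Seq[String]): (Seq[String], Seq[String]) =
    columns.partition(reservedColumns.contains)
}
```

For a DataFrame with columns `__key`, `i`, `f`, and `__eventTime`, `SinkFields.split` returns `__key` and `__eventTime` in the first group and `i` and `f` in the second.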
The following examples are in Scala.
// Write key-value data from a DataFrame to a specific Pulsar topic specified in an option
val ds = df
.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topic", "topic1")
.start()
// Write key-value data from a DataFrame to Pulsar using a topic specified in the data
val ds = df
.selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.start()
The following examples are in Scala.
// Write key-value data from a DataFrame to a specific Pulsar topic specified in an option
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.write
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topic", "topic1")
.save()
// Write key-value data from a DataFrame to Pulsar using a topic specified in the data
df.selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
.write
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.save()
Currently, we provide at-least-once semantics. Consequently, when writing either streaming or batch queries to Pulsar, some records may be duplicated. One way to remove duplicates is to introduce a primary (unique) key and use it to de-duplicate when reading the written data.
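As a rough illustration of key-based de-duplication, the sketch below keeps the first record seen for each unique key. It works on plain Scala collections so that it runs without a cluster; the `Record` shape and the `id` field are hypothetical.

```scala
// Hypothetical example of de-duplicating records by a unique key.
object Dedup {
  // `id` stands in for the unique key added by the writer.
  final case class Record(id: String, payload: String)

  // Keeps the first occurrence of every id and preserves input order.
  def byId(records: Seq[Record]): Seq[Record] = {
    val (_, kept) = records.foldLeft((Set.empty[String], Vector.empty[Record])) {
      case ((seen, acc), r) =>
        if (seen.contains(r.id)) (seen, acc) else (seen + r.id, acc :+ r)
    }
    kept
  }
}
```

On a DataFrame, Spark's `dropDuplicates` applied to the key column plays the same role.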
Client, producer, and reader configurations of Pulsar can be set via `DataStreamReader.option` with the `pulsar.client.`, `pulsar.producer.`, or `pulsar.reader.` prefix, e.g., `stream.option("pulsar.reader.receiverQueueSize", "1000000")`.

Since the connector needs to access the Pulsar Admin interface as well, separate configuration of the admin client can be set via the same method with the `pulsar.admin` prefix. For example: `stream.option("pulsar.admin.authParams","token:<token>")`. This can be useful if a different authentication plugin or token needs to be used. If this is not given explicitly, the client parameters (with the `pulsar.client` prefix) will be used for accessing the admin interface as well.

For possible Pulsar parameters, see Pulsar client libraries.
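The prefix convention and the admin fallback can be sketched as follows. This is not the connector's code: the `PulsarConf` object is hypothetical, and the all-or-nothing fallback (client settings are used only when no `pulsar.admin.` option is present) is one reading of the description above.

```scala
// Hypothetical sketch of prefix-based option routing.
object PulsarConf {
  // Returns the entries under `prefix` with the prefix stripped.
  def withPrefix(options: Map[String, String], prefix: String): Map[String, String] =
    options.collect {
      case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
    }

  // Assumption: admin settings fall back to the client settings only when
  // no pulsar.admin.* option is given at all.
  def adminConf(options: Map[String, String]): Map[String, String] = {
    val admin = withPrefix(options, "pulsar.admin.")
    if (admin.nonEmpty) admin else withPrefix(options, "pulsar.client.")
  }
}
```

With only `pulsar.client.authParams` set, `adminConf` returns the client token; once `pulsar.admin.authParams` is added, it returns the admin token instead.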
If you want to build a Spark-Pulsar connector reading data from Pulsar and writing results to Pulsar, follow the steps below.
- Check out the source code.
$ git clone https://github.com/streamnative/pulsar-spark.git
$ cd pulsar-spark
- Install Docker.
The Pulsar Spark connector uses Testcontainers for integration tests. To run the integration tests, make sure you have installed Docker.
- Set a Scala version.
Change `scala.version` and `scala.binary.version` in `pom.xml`. The Scala version should be consistent with the Scala version of the Spark you use.
- Build the project.
$ mvn clean install -DskipTests
If you get the following error during compilation, try running Maven with Java 8:
[ERROR] [Error] : Source option 6 is no longer supported. Use 7 or later.
[ERROR] [Error] : Target option 6 is no longer supported. Use 7 or later.
- Run the tests.
$ mvn clean install
Note: by configuring `scalatest-maven-plugin` in the usual ways, individual tests can be executed if needed:
mvn -Dsuites=org.apache.spark.sql.pulsar.CachedPulsarClientSuite clean install
This might be handy if test execution is slow or you get a `java.io.IOException: Too many open files` exception during a full suite run.
Once the installation is finished, a fat jar is generated under both the local Maven repository and the `target` directory.
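For the Scala version step, it can help to confirm which Scala version your Spark distribution runs on before editing `pom.xml`. The sketch below derives the binary version from a full version string and compares it with the running JVM's Scala version. The `ScalaVersions` object is hypothetical, and it assumes the usual `major.minor.patch` layout of Scala 2 releases.

```scala
// Hypothetical helper for checking Scala version consistency.
object ScalaVersions {
  // "2.11.12" -> "2.11"; assumes a major.minor.patch version string.
  def binaryVersion(fullVersion: String): String =
    fullVersion.split('.').take(2).mkString(".")

  // True when the given scala.version matches the Scala version of the
  // running JVM, for example when evaluated inside spark-shell.
  def matchesRuntime(pomScalaVersion: String): Boolean =
    binaryVersion(pomScalaVersion) == binaryVersion(scala.util.Properties.versionNumberString)
}
```

Evaluating `ScalaVersions.matchesRuntime` in `spark-shell` with the value you plan to set as `scala.version` indicates whether the two agree.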