# Apache Spark SQL Streaming connector for Google PubSub Lite (Beta)

The connector is a custom implementation of [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
that supports reading messages from [Google PubSub Lite](https://cloud.google.com/pubsub/lite/docs) subscriptions into Spark.

## Beta Disclaimer

This connector is in Beta and is subject to change.

## Requirements

### Enable the PubSub Lite API

Follow [these instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#before-you-begin).

### Create a new subscription or use an existing subscription

Follow [these instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#create_a_lite_subscription) to create a new
subscription or use an existing one. If you use an existing subscription, the connector will start reading from the
oldest unacknowledged message.

### Create a Google Cloud Dataproc cluster (Optional)

If you do not have an Apache Spark environment, you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use `spark-submit` on any cluster.

```
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
```

## Downloading and Using the Connector

<!--- TODO(jiangmichael): Add jar link for spark-pubsublite-latest.jar -->
The latest version of the connector (Scala 2.11) is publicly available at
gs://spark-lib/pubsublite/spark-pubsublite-latest.jar.
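
As a sketch, a job using the connector could be submitted to the Dataproc cluster created above with the jar attached; the script name `my_streaming_job.py` is a placeholder, not part of this project:

```shell
# Submit a PySpark job to Dataproc with the connector jar attached.
# my_streaming_job.py is a hypothetical script that uses the connector.
gcloud dataproc jobs submit pyspark my_streaming_job.py \
  --cluster="$MY_CLUSTER" \
  --jars=gs://spark-lib/pubsublite/spark-pubsublite-latest.jar
```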

<!--- TODO(jiangmichael): Release on Maven Central and add Maven Central link -->
The connector is also available from the Maven Central
repository. It can be used with the `--packages` option or the
`spark.jars.packages` configuration property. Use the following value:

| Scala version | Connector Artifact |
| --- | --- |
| Scala 2.11 | `com.google.cloud.pubsublite.spark:pubsublite-spark-sql-streaming-with-dependencies_2.11:0.1.0` |
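
For example, the artifact can be fetched at submission time via `--packages`; the script name `streaming_job.py` is a placeholder:

```shell
# Pull the connector from Maven Central when submitting the job.
# streaming_job.py is a hypothetical script that uses the connector.
spark-submit \
  --packages com.google.cloud.pubsublite.spark:pubsublite-spark-sql-streaming-with-dependencies_2.11:0.1.0 \
  streaming_job.py
```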

<!--- TODO(jiangmichael): Add example code and brief description here -->

## Usage

### Reading data from PubSub Lite

```python
df = spark.readStream \
  .format("pubsublite") \
  .option("pubsublite.subscription", "projects/123456789/locations/us-central1-a/subscriptions/test-spark-subscription") \
  .load()
```

Note that the connector supports both MicroBatch Processing and [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing).
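
A continuous-processing query is started by passing a `continuous` trigger to the stream writer. A minimal sketch, assuming `df` is the streaming DataFrame from above; the console sink and checkpoint path are illustrative assumptions:

```python
# Start the query in continuous mode with a 1-second checkpoint interval.
# The console sink and checkpoint location below are illustrative only.
query = df.writeStream \
  .format("console") \
  .option("checkpointLocation", "/tmp/pubsublite-checkpoint") \
  .trigger(continuous="1 second") \
  .start()
```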

### Properties

The connector supports a number of options to configure the read:

| Option | Type | Required | Meaning |
| ------ | ---- | -------- | ------- |
| pubsublite.subscription | String | Y | Full subscription path that the connector will read from. |
| pubsublite.flowcontrol.byteoutstandingperpartition | Long | N | Max number of bytes per partition that will be cached in workers before Spark processes the messages. Defaults to 50000000 bytes. |
| pubsublite.flowcontrol.messageoutstandingperpartition | Long | N | Max number of messages per partition that will be cached in workers before Spark processes the messages. Defaults to Long.MAX_VALUE. |
| gcp.credentials.key | String | N | Service account JSON in base64. Defaults to [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically). |
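
The flow-control options from the table are set on the reader like any other option. A sketch; the byte and message limits below are illustrative values, not recommendations:

```python
# Read with explicit flow control; the limits here are illustrative only.
df = spark.readStream \
  .format("pubsublite") \
  .option("pubsublite.subscription", "projects/123456789/locations/us-central1-a/subscriptions/test-spark-subscription") \
  .option("pubsublite.flowcontrol.byteoutstandingperpartition", 10000000) \
  .option("pubsublite.flowcontrol.messageoutstandingperpartition", 1000) \
  .load()
```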

### Data Schema

The connector has a fixed data schema as follows:

| Data Field | Spark Data Type | Notes |
| ---------- | --------------- | ----- |
| subscription | StringType | Full subscription path |
| partition | LongType | |
| offset | LongType | |
| key | BinaryType | |
| data | BinaryType | |
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | |
| publish_timestamp | TimestampType | |
| event_timestamp | TimestampType | Nullable |
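
Since `key` and `data` are `BinaryType`, UTF-8 payloads need an explicit cast before they can be read as text. A sketch, assuming `df` is the streaming DataFrame from the reading example above:

```python
from pyspark.sql.functions import col

# Cast the binary key and data fields to strings, assuming UTF-8 payloads.
decoded = df.select(
    col("key").cast("string").alias("key"),
    col("data").cast("string").alias("data"),
    col("publish_timestamp"),
)
```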

## Compiling with the connector

To include the connector in your project:

### Maven

```xml
<dependency>
  <groupId>com.google.cloud.pubsublite.spark</groupId>
  <artifactId>pubsublite-spark-sql-streaming-with-dependencies_2.11</artifactId>
  <version>0.1.0</version>
</dependency>
```

### SBT

```sbt
libraryDependencies += "com.google.cloud.pubsublite.spark" % "pubsublite-spark-sql-streaming-with-dependencies_2.11" % "0.1.0"
```

## Building the Connector

The connector is built using Maven. The following command creates a jar with shaded dependencies:

```
mvn package
```

## FAQ

### What is the pricing for PubSub Lite?

See the [PubSub Lite pricing documentation](https://cloud.google.com/pubsub/lite/pricing).

### Can I configure the number of Spark partitions?

No, the number of Spark partitions is set to the number of PubSub Lite partitions of the topic that the supplied subscription is attached to.

### How do I authenticate outside GCE / Dataproc?

Use a service account JSON key and `GOOGLE_APPLICATION_CREDENTIALS` as described [here](https://cloud.google.com/docs/authentication/getting-started).

Credentials can be provided with the `gcp.credentials.key` option; the service account JSON must be passed in directly as a base64-encoded string.

Example:
```python
spark.readStream.format("pubsublite").option("gcp.credentials.key", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
```
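
The base64 string can be produced from a key file with the standard library; the file path below is a placeholder:

```python
import base64

# Read a service account JSON key file and base64-encode it for use with
# the gcp.credentials.key option.
def encode_key(path):
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")

# key_b64 = encode_key("/path/to/service-account-key.json")  # placeholder path
```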