Commit 2921585

feat: Pub/Sub Lite Spark Connector (#3)
Spark SQL Structured Streaming
1 parent 23a378f commit 2921585

32 files changed

Lines changed: 2952 additions & 1 deletion

.gitignore

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
1+
# Compiled class file
2+
*.class
3+
4+
# Log file
5+
*.log
6+
7+
# BlueJ files
8+
*.ctxt
9+
10+
# Mobile Tools for Java (J2ME)
11+
.mtj.tmp/
12+
13+
# Package Files #
14+
*.jar
15+
*.war
16+
*.nar
17+
*.ear
18+
*.zip
19+
*.tar.gz
20+
*.rar
21+
22+
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
23+
hs_err_pid*
24+
25+
# intellij project folder
26+
.idea/
27+
*.iml
28+
29+
# maven build directory
30+
target/
31+
32+
.flattened-pom.xml

.readme-partials.yaml

Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
1+
custom_content: |
2+
## Requirements
3+
4+
### Enable the PubSub Lite API
5+
6+
Follow [these instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#before-you-begin).
7+
8+
### Create a new subscription or use an existing one
9+
10+
Follow [these instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#create_a_lite_subscription) to create a new
11+
subscription, or use an existing one. With an existing subscription, the connector reads messages starting from the
12+
oldest unacknowledged message.
13+
14+
### Create a Google Cloud Dataproc cluster (Optional)
15+
16+
If you do not have an Apache Spark environment, you can create a Cloud Dataproc cluster with preconfigured authentication. The following examples assume you are using Cloud Dataproc, but you can use `spark-submit` on any cluster.
17+
18+
```
19+
MY_CLUSTER=...
20+
gcloud dataproc clusters create "$MY_CLUSTER"
21+
```
22+
23+
## Downloading and Using the Connector
24+
25+
<!--- TODO(jiangmichael): Add jar link for spark-pubsublite-latest.jar -->
26+
The latest version of the connector (Scala 2.11) is publicly available at
27+
gs://spark-lib/pubsublite/spark-pubsublite-latest.jar.
28+
29+
<!--- TODO(jiangmichael): Release on Maven Central and add Maven Central link -->
30+
The connector is also available from the Maven Central
31+
repository. Pass it with the `--packages` option or the
32+
`spark.jars.packages` configuration property, using the following value:
33+
34+
| Scala version | Connector Artifact |
35+
| --- | --- |
36+
| Scala 2.11 | `com.google.cloud.pubsublite.spark:pubsublite-spark-sql-streaming-with-dependencies_2.11:0.1.0` |
37+
38+
<!--- TODO(jiangmichael): Add example code and brief description here -->
39+
40+
## Usage
41+
42+
### Reading data from PubSub Lite
43+
44+
```
45+
df = spark.readStream \
46+
.option("pubsublite.subscription", "projects/123456789/locations/us-central1-a/subscriptions/test-spark-subscription") \
47+
.format("pubsublite") \
48+
.load()
49+
```
50+
51+
Note that the connector supports both MicroBatch Processing and [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing).
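
As a rough illustration of the two modes, the sketch below picks a trigger for a streaming write. `trigger_kwargs` and `start_console_query` are hypothetical helper names, not part of the connector, and the console sink and one-second interval are assumptions chosen for demonstration. PySpark's `DataStreamWriter.trigger` takes `processingTime` for micro-batch execution and `continuous` for continuous processing.

```python
def trigger_kwargs(mode, interval="1 second"):
    """Map a processing mode to keyword arguments for DataStreamWriter.trigger().

    In micro-batch mode the interval is the batch cadence; in continuous
    mode it is the checkpoint interval.
    """
    if mode == "microbatch":
        return {"processingTime": interval}
    if mode == "continuous":
        return {"continuous": interval}
    raise ValueError(f"unknown mode: {mode!r}")


def start_console_query(df, mode="microbatch"):
    """Start a query that prints rows to the console (illustrative sink only).

    `df` is assumed to be the streaming DataFrame returned by
    spark.readStream.format("pubsublite")...load().
    """
    return (
        df.writeStream
        .format("console")
        .trigger(**trigger_kwargs(mode))
        .start()
    )
```

Calling `start_console_query(df, "continuous")` would request continuous processing; the default requests micro-batches.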
52+
53+
### Properties
54+
55+
The connector supports a number of options to configure the read:
56+
57+
| Option | Type | Required | Meaning |
58+
| ------ | ---- | -------- | ------- |
59+
| pubsublite.subscription | String | Y | Full subscription path that the connector will read from. |
60+
| pubsublite.flowcontrol.byteoutstandingperpartition | Long | N | Max number of bytes per partition that will be cached in workers before Spark processes the messages. Defaults to 50000000 bytes. |
61+
| pubsublite.flowcontrol.messageoutstandingperpartition | Long | N | Max number of messages per partition that will be cached in workers before Spark processes the messages. Defaults to Long.MAX_VALUE. |
62+
| gcp.credentials.key | String | N | Service account JSON in base64. Defaults to [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically). |
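
One way to keep these options in a single place is to build them as a dictionary and pass them to the reader with `options(**...)`. The sketch below is only an illustration: `subscription_path` and `build_read_options` are hypothetical helper names, and the path layout is inferred from the example subscription shown earlier rather than from a formal specification.

```python
def subscription_path(project_number, zone, subscription_id):
    """Assemble a full subscription path in the layout used by the example above."""
    return (
        f"projects/{project_number}/locations/{zone}"
        f"/subscriptions/{subscription_id}"
    )


def build_read_options(subscription, max_bytes=None, max_messages=None):
    """Return connector read options; unset flow-control limits are left out
    so the connector's defaults apply."""
    options = {"pubsublite.subscription": subscription}
    if max_bytes is not None:
        options["pubsublite.flowcontrol.byteoutstandingperpartition"] = str(max_bytes)
    if max_messages is not None:
        options["pubsublite.flowcontrol.messageoutstandingperpartition"] = str(max_messages)
    return options


# Usage with an existing SparkSession named `spark` (not created here):
#   opts = build_read_options(
#       subscription_path(123456789, "us-central1-a", "test-spark-subscription"),
#       max_bytes=10_000_000,
#   )
#   df = spark.readStream.format("pubsublite").options(**opts).load()
```

Option values are converted to strings here because Spark stores reader options as strings.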
63+
64+
### Data Schema
65+
66+
The connector uses the following fixed data schema:
67+
68+
| Data Field | Spark Data Type | Notes |
69+
| ---------- | --------------- | ----- |
70+
| subscription | StringType | Full subscription path |
71+
| partition | LongType | |
72+
| offset | LongType | |
73+
| key | BinaryType | |
74+
| data | BinaryType | |
75+
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | |
76+
| publish_timestamp | TimestampType | |
77+
| event_timestamp | TimestampType | Nullable |
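
Because `data` arrives as `BinaryType`, a common first step is to cast it to a string. The sketch below builds Spark SQL expressions for `DataFrame.selectExpr`. It assumes the message payloads are UTF-8 text, which the connector does not guarantee. `payload_select_exprs` and the `data_text` alias are hypothetical names used only for illustration.

```python
def payload_select_exprs():
    """Return selectExpr() expressions that keep the position columns and
    expose the binary payload as text (assumes UTF-8 encoded payloads)."""
    return [
        "`partition`",
        "`offset`",
        "CAST(`data` AS STRING) AS data_text",
        "`publish_timestamp`",
    ]


# Usage with the streaming DataFrame `df` returned by the connector:
#   decoded = df.selectExpr(*payload_select_exprs())
```

The column names are backtick-quoted so that they are always parsed as identifiers.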
78+
79+
## Compiling with the connector
80+
81+
To include the connector in your project:
82+
83+
### Maven
84+
85+
```xml
86+
<dependency>
87+
<groupId>com.google.cloud.pubsublite.spark</groupId>
88+
<artifactId>pubsublite-spark-sql-streaming-with-dependencies_2.11</artifactId>
89+
<version>0.1.0</version>
90+
</dependency>
91+
```
92+
93+
### SBT
94+
95+
```sbt
96+
libraryDependencies += "com.google.cloud.pubsublite.spark" % "pubsublite-spark-sql-streaming-with-dependencies_2.11" % "0.1.0"
97+
```
98+
99+
## Building the Connector
100+
101+
The connector is built using Maven. The following command creates a jar with shaded dependencies:
102+
103+
```
104+
mvn package
105+
```
106+
107+
## FAQ
108+
109+
### What is the pricing for PubSub Lite?
110+
111+
See the [PubSub Lite pricing documentation](https://cloud.google.com/pubsub/lite/pricing).
112+
113+
### Can I configure the number of spark partitions?
114+
115+
No. The number of Spark partitions is set to the number of PubSub Lite partitions in the topic that the supplied subscription is attached to.
116+
117+
### How do I authenticate outside GCE / Dataproc?
118+
119+
Use a service account JSON key and `GOOGLE_APPLICATION_CREDENTIALS` as described [here](https://cloud.google.com/docs/authentication/getting-started).
120+
121+
Alternatively, provide credentials with the `gcp.credentials.key` option; the service account JSON must be passed directly as a base64-encoded string.
122+
123+
Example:
124+
```
125+
spark.readStream.format("pubsublite").option("gcp.credentials.key", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
126+
```
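
The base64 value can be produced with the Python standard library. This is a minimal sketch: `encode_service_account_key` is a hypothetical helper name, and the key file path in the usage comment is a placeholder.

```python
import base64


def encode_service_account_key(key_json_bytes):
    """Base64-encode the raw bytes of a service account JSON key file."""
    return base64.b64encode(key_json_bytes).decode("ascii")


# Usage (the path is a placeholder for your own key file):
#   with open("/path/to/service-account.json", "rb") as f:
#       encoded = encode_service_account_key(f.read())
#   reader = spark.readStream.format("pubsublite") \
#       .option("gcp.credentials.key", encoded)
```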

README.md

Lines changed: 0 additions & 1 deletion
This file was deleted.
