Commit b5edda6

docs: Add write support documentations (#132)
1 parent 1e20e48 commit b5edda6

1 file changed

Lines changed: 57 additions & 3 deletions

.readme-partials.yaml

@@ -28,20 +28,56 @@ custom_content: |

## Usage

### Samples

There are 3 Java samples (word count, simple write, simple read) under [samples](https://github.com/googleapis/java-pubsublite-spark/tree/master/samples) that show how to use the connector inside Dataproc.

### Reading data from Pub/Sub Lite

Here is an example in Python:
```python
# Stream messages from a Pub/Sub Lite subscription into a DataFrame.
df = spark.readStream \
    .format("pubsublite") \
    .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
    .load()
```
Here is an example in Java:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Stream messages from a Pub/Sub Lite subscription into a Dataset.
Dataset<Row> df = spark
    .readStream()
    .format("pubsublite")
    .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
    .load();
```

Note that the connector supports both MicroBatch Processing and [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing).
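
As a hedged sketch of the read path, the helper below assembles the full subscription path from its parts and wraps the reader call from the Python example above. The names `subscription_path` and `read_pubsublite` are illustrative and not part of the connector; `spark` is assumed to be an existing SparkSession with the connector JAR available.

```python
def subscription_path(project_number, location, subscription_id):
    """Build the full path expected by the `pubsublite.subscription` option."""
    return (
        f"projects/{project_number}/locations/{location}"
        f"/subscriptions/{subscription_id}"
    )


def read_pubsublite(spark, subscription):
    """Return a streaming DataFrame for `subscription`.

    `spark` is assumed to be an existing SparkSession (hypothetical caller setup).
    """
    return (
        spark.readStream
        .format("pubsublite")
        .option("pubsublite.subscription", subscription)
        .load()
    )
```

For example, `read_pubsublite(spark, subscription_path("123456", "us-central1-a", "my-sub"))` would read from a subscription with those placeholder identifiers.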

### Writing data to Pub/Sub Lite

Here is an example in Python:
```python
# Stream the contents of df to a Pub/Sub Lite topic every 2 seconds.
df.writeStream \
    .format("pubsublite") \
    .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .outputMode("complete") \
    .trigger(processingTime="2 seconds") \
    .start()
```
Here is an example in Java:
```java
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;

// Stream the contents of df to a Pub/Sub Lite topic every 2 seconds.
df.writeStream()
    .format("pubsublite")
    .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID")
    .option("checkpointLocation", "path/to/HDFS/dir")
    .outputMode(OutputMode.Complete())
    .trigger(Trigger.ProcessingTime(2, TimeUnit.SECONDS))
    .start();
```

### Properties

When reading from Pub/Sub Lite, the connector supports the following configuration options:

| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
@@ -51,9 +87,16 @@ custom_content: |
| pubsublite.flowcontrol.maxmessagesperbatch | Long | N | Long.MAX | Max number of messages in micro batch. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |

When writing to Pub/Sub Lite, the connector supports the following configuration options:

| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
| pubsublite.topic | String | Y | | Full topic path that the connector will write to. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
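
The table only says that `gcp.credentials.key` takes the service account JSON in base64. A minimal sketch of producing such a value follows, assuming standard (not URL-safe) base64 over the UTF-8 JSON text is what the option expects; `encode_service_account_key` is a hypothetical helper name, not part of the connector.

```python
import base64
import json


def encode_service_account_key(key_json: str) -> str:
    """Return the service account JSON text as a base64 string.

    Assumption: the connector accepts standard base64 of the UTF-8 JSON.
    """
    json.loads(key_json)  # fail early if the text is not valid JSON
    return base64.b64encode(key_json.encode("utf-8")).decode("ascii")
```

The result could then be passed as `.option("gcp.credentials.key", encoded_key)` on either the reader or the writer.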

### Data Schema

When reading from Pub/Sub Lite, the connector has a fixed data schema as follows:

| Data Field | Spark Data Type | Notes |
| ---------- | --------------- | ----- |
@@ -66,6 +109,17 @@ custom_content: |
| publish_timestamp | TimestampType | |
| event_timestamp | TimestampType | Nullable |

When writing to Pub/Sub Lite, the connector matches the following data fields and data types:

| Data Field | Spark Data Type | Required |
| ---------- | --------------- | -------- |
| key | BinaryType | N |
| data | BinaryType | N |
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | N |
| event_timestamp | TimestampType | N |

Note that when a data field is present in the table but its data type does not match, the connector throws an IllegalArgumentException, which terminates the query.
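
Because a type mismatch terminates the query, it can help to check a DataFrame's column types before starting the stream. The sketch below is one way to do that in plain Python, comparing Spark SQL type strings (as returned by `DataType.simpleString()`) against the write schema table above. `EXPECTED_WRITE_TYPES` and `find_type_mismatches` are illustrative names, and this pre-check is not something the connector provides.

```python
# Spark SQL type strings for the writable fields listed in the table above.
EXPECTED_WRITE_TYPES = {
    "key": "binary",
    "data": "binary",
    "attributes": "map<string,array<binary>>",
    "event_timestamp": "timestamp",
}


def find_type_mismatches(actual_types):
    """Return {field: (expected, actual)} for present fields with a wrong type.

    `actual_types` maps column name to its Spark SQL type string. Columns
    that are absent are skipped, since every field in the table is optional.
    """
    return {
        name: (expected, actual_types[name])
        for name, expected in EXPECTED_WRITE_TYPES.items()
        if name in actual_types and actual_types[name] != expected
    }
```

With a real DataFrame, the input could be built as `{f.name: f.dataType.simpleString() for f in df.schema.fields}`. A string column could then be converted before writing with, for example, `df.selectExpr("CAST(data AS BINARY) AS data")`.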

## Building the Connector

The connector is built using Maven. The following command creates a JAR file with shaded dependencies:
