A custom sink provider for Apache Spark that sends the contents of a dataframe to AWS SQS.
It takes the content of each row's value column and sends it as a message to an AWS SQS queue. The writer needs the following parameters:
- region of the queue (us-east-2, for instance)
- name of the queue
- batch size, so that N messages can be grouped in a single call
- queueOwnerAWSAccountId: you might have an architecture where the Spark job and the SQS queue live in different AWS accounts. In that case, you can set this extra option so the writer knows which account owns the queue. This argument is optional.
df.write()
    .format("sqs")
    .mode(SaveMode.Append)
    .option("region", "us-east-2")
    .option("queueName", "my-test-queue")
    .option("batchSize", "10")
    .option("queueOwnerAWSAccountId", "123456789012") // optional
    .save();
The dataframe:
- must have a column called value (string); this column is used as the body of each message.
- may have a column called msg_attributes (map of [string, string]); in that case, the library adds each key/value pair as a message attribute of the SQS message.
- may have a column called group_id (string); in that case, the library uses it as the message group ID required by FIFO queues (see the example below).
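Here is a minimal Java sketch that builds a dataframe with the mandatory value column plus the two optional ones and writes it through the sink. The queue name, region and attribute values are placeholders for illustration, not something prescribed by the library.

import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.map;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class SqsColumnsExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SQS Columns Example")
                .getOrCreate();

        // value (mandatory) becomes the message body; group_id only matters for FIFO queues
        Dataset<Row> df = spark.createDataFrame(
                Arrays.asList(
                        RowFactory.create("first message body", "group-1"),
                        RowFactory.create("second message body", "group-1")),
                new StructType()
                        .add("value", DataTypes.StringType)
                        .add("group_id", DataTypes.StringType))
                // msg_attributes (map<string, string>) becomes the SQS message attributes
                .withColumn("msg_attributes", map(lit("origin"), lit("spark-job")));

        df.write()
                .format("sqs")
                .mode(SaveMode.Append)
                .option("region", "us-east-2")
                .option("queueName", "my-test-queue.fifo") // hypothetical FIFO queue name
                .option("batchSize", "10")
                .save();

        spark.stop();
    }
}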
The folder /spark-aws-messaging/src/test/resources contains some simple PySpark examples used in the integration tests (the endpoint option used there is not required in normal use).
Don't forget you'll need to configure the default credentials on your machine before running the example. See Configuration and credential file settings for more information.
It also needs the com.amazonaws:aws-java-sdk-sqs package to run, so you can provide it through the --packages parameter of spark-submit. The following command runs the sample that shows how to use this library.
spark-submit \
--packages com.fabiogouw:spark-aws-messaging:1.1.0,com.amazonaws:aws-java-sdk-sqs:1.12.13 \
test.py sample.txt
And this is the test.py file content.
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    print("File: " + sys.argv[1])

    spark = SparkSession \
        .builder \
        .appName("SQS Write") \
        .getOrCreate()

    df = spark.read.text(sys.argv[1])
    df.show()
    df.printSchema()

    df.write.format("sqs").mode("append") \
        .option("queueName", "test") \
        .option("batchSize", "10") \
        .option("region", "us-east-2") \
        .save()

    spark.stop()
This library is available in the Maven Central repository, so you can reference it in your project with the following snippet.
<dependency>
    <groupId>com.fabiogouw</groupId>
    <artifactId>spark-aws-messaging</artifactId>
    <version>1.1.0</version>
</dependency>
The IAM permissions needed for this library to write to an SQS queue are sqs:GetQueueUrl and sqs:SendMessage.
The sink provides at-least-once delivery, so some messages might be duplicated: if something goes wrong while the data is being written by a worker node, Spark will retry the task on another node, and messages that have already been sent could be sent again.
It's easy to get lost trying to understand all the classes needed to create a custom sink for Spark, so here's a class diagram to make it a little easier to find your way around. Start at SQSSinkProvider: it's the class we configure in Spark code as the value of the format method.
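For orientation, the sketch below (illustrative only, not the library's actual code) shows the mechanism behind that mapping: Spark discovers DataSourceRegister implementations through Java's ServiceLoader (a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file in the jar) and matches the string passed to format() against each provider's shortName().

import org.apache.spark.sql.sources.DataSourceRegister;

// Illustrative skeleton only: the real SQSSinkProvider also implements the
// provider/sink interfaces that perform the actual write to SQS.
public class ExampleSqsSinkProvider implements DataSourceRegister {
    @Override
    public String shortName() {
        return "sqs"; // the value passed to df.write().format("sqs")
    }
}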