[BAHIR-213] Faster S3 file Source for Structured Streaming with SQS (#91)

Using FileStreamSource to read files from an S3 bucket has problems in terms of both cost and latency:

Latency: Listing all the files in S3 buckets every micro-batch can be both
slow and resource-intensive.

Costs: Making List API requests to S3 every micro-batch can be costly.

The solution is to use Amazon Simple Queue Service (SQS), which lets you find new files written to an S3 bucket without having to list all the files every micro-batch.

S3 buckets can be configured to send a notification to an Amazon SQS queue on Object Create / Object Delete events. For details, see the AWS documentation on Configuring S3 Event Notifications.

Spark can leverage this to find new files written to an S3 bucket by reading notifications from an SQS queue instead of listing files every micro-batch.
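For illustration, a single object-created notification delivered to the queue looks roughly like this (a trimmed sketch of the S3 event message format documented by AWS; the bucket name and key below are placeholders):

{
  "Records": [{
    "eventSource": "aws:s3",
    "eventName": "ObjectCreated:Put",
    "s3": {
      "bucket": { "name": "my-bucket" },
      "object": { "key": "data/part-0001.json", "size": 1024 }
    }
  }]
}

The source reads such messages, extracts the bucket and key, and treats each one as a newly arrived file, instead of issuing List requests.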

This PR adds a new SQSSource which uses an Amazon SQS queue to find new files every micro-batch.

Usage

val inputDf = spark
  .readStream
  .format("s3-sqs")
  .schema(schema)
  .option("fileFormat", "json")
  .option("sqsUrl", "https://QUEUE_URL")
  .option("region", "us-east-1")
  .load()
abhishekd0907 authored and lresende committed Dec 28, 2019
1 parent 1628c76 commit d036820c0efa1b2e9b8021506164b67582352dff
Showing 15 changed files with 1,348 additions and 0 deletions.
@@ -80,6 +80,7 @@
<module>sql-streaming-akka</module>
<module>sql-streaming-mqtt</module>
<module>sql-streaming-jdbc</module>
<module>sql-streaming-sqs</module>
<module>streaming-akka</module>
<module>streaming-mqtt</module>
<module>streaming-pubnub</module>
@@ -0,0 +1,59 @@
A library for reading data from Amazon S3 with optimised listing using Amazon SQS, for use with Spark SQL Streaming (or Structured Streaming).

## Linking

Using SBT:

libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-sqs" % "{{site.SPARK_VERSION}}"

Using Maven:

<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>spark-sql-streaming-sqs_{{site.SCALA_BINARY_VERSION}}</artifactId>
  <version>{{site.SPARK_VERSION}}</version>
</dependency>

This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
For example, to include it when starting the spark shell:

$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-sqs_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}}

Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.

This library is compiled for Scala 2.12 only, and is intended to support Spark 2.4.0 onwards.

## Configuration options
Configuration is supplied through the following options on the stream reader.

Name | Default | Meaning
--- |:---:| ---
sqsUrl | required, no default value | SQS queue URL, e.g. 'https://sqs.us-east-1.amazonaws.com/330183209093/TestQueue'
region | required, no default value | AWS region where the queue is created
fileFormat | required, no default value | file format of the files stored on Amazon S3
schema | required, no default value | schema of the data being read
sqsFetchIntervalSeconds | 10 | time interval (in seconds) after which to fetch messages from the Amazon SQS queue
sqsLongPollingWaitTimeSeconds | 20 | wait time (in seconds) for long polling on the Amazon SQS queue
sqsMaxConnections | 1 | number of parallel threads used to connect to the Amazon SQS queue
sqsMaxRetries | 10 | maximum number of consecutive retries on connection failure to SQS before giving up
ignoreFileDeletion | false | whether to ignore file-deleted messages in the SQS queue
fileNameOnly | false | whether to identify new files by filename only rather than by full path
shouldSortFiles | true | whether to sort files by timestamp when listing them from SQS
useInstanceProfileCredentials | false | whether to use EC2 instance profile credentials to connect to Amazon SQS
maxFilesPerTrigger | no default value | maximum number of files to process in a micro-batch
maxFileAge | 7d | maximum age of a file before it is ignored by the source

## Example

An example of creating a SQL stream which uses Amazon SQS to list files on S3:

val inputDf = sparkSession
  .readStream
  .format("s3-sqs")
  .schema(schema)
  .option("sqsUrl", queueUrl)
  .option("fileFormat", "json")
  .option("sqsFetchIntervalSeconds", "2")
  .option("sqsLongPollingWaitTimeSeconds", "5")
  .load()
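
To consume the stream, the usual Structured Streaming write path applies; a minimal continuation of the example above (the console sink and checkpoint path here are illustrative choices, not part of this module):

val query = inputDf
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/sqs-example-checkpoint")
  .start()

query.awaitTermination()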
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bahir.examples.sql.streaming.sqs

import scala.util.Random

import org.apache.spark.sql.SparkSession

/**
* Example to read files from S3 using SQS Source and write results to Memory Sink
*
* Usage: SqsSourceExample <Sample Record Path to infer schema> <SQS Queue URL> <File Format>
*/

object SqsSourceExample {

  def main(args: Array[String]): Unit = {

    val randomName = Random.alphanumeric.take(6).mkString("")
    val pathName = "path_" + randomName
    val queryName = "query_" + randomName
    val checkpointDir = s"/checkpoints/$pathName"
    val schemaPathString = args(0)

    val spark = SparkSession.builder().appName("SqsExample").getOrCreate()

    // Infer the schema from a sample record at the given path.
    val schema = spark.read.json(schemaPathString).schema

    val queueUrl = args(1)
    val fileFormat = args(2)

    val inputDf = spark
      .readStream
      .format("s3-sqs")
      .schema(schema)
      .option("sqsUrl", queueUrl)
      .option("fileFormat", fileFormat)
      .option("sqsFetchIntervalSeconds", "2")
      .option("sqsLongPollingWaitTimeSeconds", "5")
      .option("maxFilesPerTrigger", "50")
      .option("ignoreFileDeletion", "true")
      .load()

    val query = inputDf
      .writeStream
      .queryName(queryName)
      .format("memory")
      .option("checkpointLocation", checkpointDir)
      .start()

    query.awaitTermination()
  }
}




@@ -0,0 +1,117 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.bahir</groupId>
    <artifactId>bahir-parent_2.12</artifactId>
    <version>3.0.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <groupId>org.apache.bahir</groupId>
  <artifactId>spark-sql-streaming-sqs_2.12</artifactId>
  <properties>
    <sbt.project.name>sql-streaming-sqs</sbt.project.name>
  </properties>
  <packaging>jar</packaging>
  <name>Apache Bahir - Spark SQL Streaming SQS</name>
  <url>http://bahir.apache.org/</url>

  <dependencies>
    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>bahir-common_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-sqs</artifactId>
      <version>1.11.271</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tags_${scala.binary.version}</artifactId>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.1.0</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <artifactSet>
                  <includes>
                    <include>com.amazonaws:aws-java-sdk-sqs:*</include>
                    <include>com.amazonaws:aws-java-sdk-core:*</include>
                  </includes>
                </artifactSet>
                <filters>
                  <filter>
                    <artifact>*:*</artifact>
                    <excludes>
                      <exclude>META-INF/maven/**</exclude>
                      <exclude>META-INF/MANIFEST.MF</exclude>
                    </excludes>
                  </filter>
                </filters>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </pluginManagement>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  </build>
</project>
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming.sqs;

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

import org.apache.commons.lang.StringUtils;

public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
  private final String accessKey;
  private final String secretKey;

  public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
    this.accessKey = accessKey;
    this.secretKey = secretKey;
  }

  @Override
  public AWSCredentials getCredentials() {
    if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
      return new BasicAWSCredentials(accessKey, secretKey);
    }
    throw new AmazonClientException("Access key or secret key is null");
  }

  @Override
  public void refresh() {}

  @Override
  public String toString() {
    return getClass().getSimpleName();
  }
}
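
For context, a hedged usage sketch in Scala: constructing the provider from explicit keys and handing it to the AWS SDK's SQS client builder (the key values are placeholders; never hard-code real credentials in production code):

import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import org.apache.spark.sql.streaming.sqs.BasicAWSCredentialsProvider

// Placeholder keys, for illustration only.
val provider = new BasicAWSCredentialsProvider("ACCESS_KEY_ID", "SECRET_ACCESS_KEY")
val sqsClient = AmazonSQSClientBuilder.standard()
  .withCredentials(provider)
  .withRegion("us-east-1")
  .build()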
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming.sqs;


import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class InstanceProfileCredentialsProviderWithRetries
    extends InstanceProfileCredentialsProvider {

  private static final Log LOG = LogFactory.getLog(
      InstanceProfileCredentialsProviderWithRetries.class);

  @Override
  public AWSCredentials getCredentials() {
    int retries = 10;
    int sleep = 500;
    while (retries > 0) {
      try {
        return super.getCredentials();
      } catch (RuntimeException | Error e) {
        // Transient failures are retried with exponential backoff,
        // capped at 10 seconds between attempts.
        LOG.error("Got an exception while fetching credentials: " + e);
        --retries;
        try {
          Thread.sleep(sleep);
        } catch (InterruptedException ie) {
          // Ignore and retry.
        }
        if (sleep < 10000) {
          sleep *= 2;
        }
      }
    }
    throw new AmazonClientException("Unable to load credentials.");
  }
}
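
A brief usage sketch, assuming the code runs on an EC2 instance so the instance metadata service is reachable; the retry schedule is the one hard-coded above (500 ms doubling up to a 10 s cap, at most 10 attempts):

// Presumably wired in when the source option useInstanceProfileCredentials is true.
val provider = new InstanceProfileCredentialsProviderWithRetries()
val credentials = provider.getCredentials()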
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.spark.sql.streaming.sqs.SqsSourceProvider
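
This service registration (presumably the file META-INF/services/org.apache.spark.sql.sources.DataSourceRegister) is what lets Spark resolve format("s3-sqs") to SqsSourceProvider: Spark's DataSource lookup scans DataSourceRegister implementations via java.util.ServiceLoader and matches on shortName(). A minimal Scala sketch of that resolution, assuming spark-sql and this module are on the classpath:

import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

// Find the provider registered under the short name "s3-sqs".
val provider = ServiceLoader.load(classOf[DataSourceRegister]).asScala
  .find(_.shortName().equalsIgnoreCase("s3-sqs"))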
