74 changes: 58 additions & 16 deletions scala/datastax-v4/aws-glue/export-to-s3/README.md
@@ -11,11 +11,45 @@
The following example exports data to S3 using the spark-cassandra-connector.


```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode._



object GlueApp {

  def main(sysArgs: Array[String]) {

    // Tune the connector for Amazon Keyspaces: load the driver profile from
    // cassandra-application.conf, retry throttled requests aggressively,
    // disable IN-clause rewrites, and keep writes unbatched.
    val conf = new SparkConf()
      .setAll(
        Seq(
          // connection and retry settings
          ("spark.cassandra.connection.config.profile.path", "cassandra-application.conf"),
          ("spark.cassandra.query.retry.count", "100"),

          // full-scan read settings
          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "512"),

          // write settings: Keyspaces does not benefit from batching
          ("spark.cassandra.output.concurrent.writes", "5"),
          ("spark.cassandra.output.batch.grouping.key", "none"),
          ("spark.cassandra.output.batch.size.rows", "1")
        ))

    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

```

@@ -57,32 +91,40 @@
The following configuration is used for connecting to Amazon Keyspaces with the spark-cassandra-connector.
Using the RateLimitingRequestThrottler, we can ensure that requests do not exceed the configured Keyspaces capacity. The G.1X DPU creates one executor per worker. The RateLimitingRequestThrottler in this example is set to 1000 requests per second, so with this configuration and the G.1X DPU you will achieve 1000 requests per second per Glue worker. Adjust max-requests-per-second to fit your workload, and increase the number of workers to scale throughput to a table.

```

datastax-java-driver {

  basic.request.consistency = "LOCAL_ONE"
  basic.request.default-idempotence = true
  basic.contact-points = [ "cassandra.us-east-1.amazonaws.com:9142" ]

  advanced.reconnect-on-init = true

  basic.load-balancing-policy {
    local-datacenter = "us-east-1"
  }

  advanced.auth-provider = {
    class = PlainTextAuthProvider
    username = "user-at-sample"
    password = "SAMPLE#PASSWORD"
  }

  advanced.throttler = {
    class = RateLimitingRequestThrottler
    max-requests-per-second = 1000
    max-queue-size = 50000
    drain-interval = 1 millisecond
  }

  advanced.ssl-engine-factory {
    class = DefaultSslEngineFactory
    hostname-validation = false
  }

  advanced.connection.pool.local.size = 2
  advanced.resolve-contact-points = false
}

```
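Because the throttler caps each executor at 1000 requests per second, aggregate throughput scales linearly with the number of workers. A minimal back-of-the-envelope sketch of that sizing; the worker count and row count below are hypothetical, not part of the sample:

```scala
// Rough sizing estimate for the export job. Assumes one read request per row,
// which is pessimistic when the connector fetches multi-row pages.
object CapacityEstimate {
  def main(args: Array[String]): Unit = {
    val requestsPerSecondPerWorker = 1000L // matches max-requests-per-second above
    val workers = 5L                       // hypothetical Glue NumberOfWorkers
    val rowsToExport = 100000000L          // hypothetical table size: 100M rows

    val aggregate = requestsPerSecondPerWorker * workers // 5,000 reads/s
    val seconds = rowsToExport / aggregate               // 20,000 s
    println(s"~$aggregate reads/s, roughly ${seconds / 3600.0} hours")
  }
}
```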

## Create S3 bucket to store job artifacts
@@ -93,7 +135,7 @@
```
aws s3 mb s3://amazon-keyspaces-backups
```

## Upload job artifacts to S3
The job will require:
* The spark-cassandra-connector to allow reads from Amazon Keyspaces. Amazon Keyspaces recommends version 2.5.2 of the spark-cassandra-connector or above.
* cassandra-application.conf containing the Cassandra driver configuration for Keyspaces access.
* export-sample.scala script containing the export code.

@@ -102,7 +144,7 @@
```
curl -L -O https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector-assembly_2.11/2.5.2/spark-cassandra-connector-assembly_2.11-2.5.2.jar

aws s3api put-object --bucket amazon-keyspaces-backups --key jars/spark-cassandra-connector-assembly_2.11-2.5.2.jar --body spark-cassandra-connector-assembly_2.11-2.5.2.jar

aws s3api put-object --bucket amazon-keyspaces-backups --key conf/cassandra-application.conf --body cassandra-application.conf

aws s3api put-object --bucket amazon-keyspaces-backups --key scripts/export-sample.scala --body export-sample.scala

```

@@ -124,9 +166,9 @@
```
aws glue create-job \
"--KEYSPACE_NAME":"my_keyspace",
"--TABLE_NAME":"my_table",
"--S3_URI":"s3://amazon-keyspaces-backups/snap-shots/",
"--DRIVER_CONF":"cassandra-application.conf",
"--extra-jars":"s3://amazon-keyspaces-backups/jars/spark-cassandra-connector-assembly_2.11-2.5.2.jar",
"--extra-files":"s3://amazon-keyspaces-backups/conf/application.conf",
"--conf":"spark.cassandra.connection.config.profile.path=application.conf",
"--class":"GlueApp"
}'
```
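Inside the script, GlueArgParser resolves these job parameters into a Map. A minimal sketch of that step; the exact parameter list in export-sample.scala is collapsed in this diff, so the names below simply mirror the job arguments above:

```scala
import com.amazonaws.services.glue.util.GlueArgParser

// Resolve the --default-arguments passed to the job (JOB_NAME is supplied by
// Glue itself). getResolvedOptions returns a Map[String, String].
val args = GlueArgParser.getResolvedOptions(
  sysArgs,
  Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "S3_URI", "DRIVER_CONF").toArray
)
val keyspaceName = args("KEYSPACE_NAME")
val tableName = args("TABLE_NAME")
val s3Uri = args("S3_URI")
```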
scala/datastax-v4/aws-glue/export-to-s3/cassandra-application.conf
@@ -12,13 +12,14 @@
datastax-java-driver {
advanced.auth-provider = {
class = PlainTextAuthProvider
username = "user-at-sample"
password = "S@MPLE=PASSWORD="
password = "SAMPLE#PASSWORD"
}

advanced.throttler = {
class = RateLimitingRequestThrottler
max-requests-per-second = 1000
max-queue-size = 50000
drain-interval = 1 millisecond
}

advanced.ssl-engine-factory {
    class = DefaultSslEngineFactory
    hostname-validation = false
}

advanced.connection.pool.local.size = 2
advanced.resolve-contact-points = false

}
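Before launching the Glue job, this profile can be smoke-tested locally with the DataStax Java driver. A minimal sketch, assuming the file sits in the working directory and valid service-specific credentials are in place; the object name is hypothetical:

```scala
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.config.DriverConfigLoader

// Connects using only cassandra-application.conf (contact points, auth, SSL,
// and throttling all come from the profile) and runs a trivial query.
object ConfigSmokeTest {
  def main(args: Array[String]): Unit = {
    val loader = DriverConfigLoader.fromFile(new java.io.File("cassandra-application.conf"))
    val session = CqlSession.builder().withConfigLoader(loader).build()
    try {
      val version = session.execute("SELECT release_version FROM system.local").one().getString(0)
      println(s"Connected to Keyspaces, release_version = $version")
    } finally session.close()
  }
}
```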
19 changes: 18 additions & 1 deletion scala/datastax-v4/aws-glue/export-to-s3/export-sample.scala
@@ -2,6 +2,7 @@
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
@@ -19,7 +20,23 @@
object GlueApp {

def main(sysArgs: Array[String]) {

    // Tune the connector for Amazon Keyspaces: load the driver profile from
    // cassandra-application.conf, retry throttled requests aggressively,
    // disable IN-clause rewrites, and keep writes unbatched.
    val conf = new SparkConf()
      .setAll(
        Seq(
          // connection and retry settings
          ("spark.cassandra.connection.config.profile.path", "cassandra-application.conf"),
          ("spark.cassandra.query.retry.count", "100"),

          // full-scan read settings
          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "512"),

          // write settings: Keyspaces does not benefit from batching
          ("spark.cassandra.output.concurrent.writes", "5"),
          ("spark.cassandra.output.batch.grouping.key", "none"),
          ("spark.cassandra.output.batch.size.rows", "1")
        ))

    val spark: SparkContext = new SparkContext(conf)
val glueContext: GlueContext = new GlueContext(spark)
val sparkSession: SparkSession = glueContext.getSparkSession
