From f16b56ac05c66e06d2a9f9e3939479e946374e8d Mon Sep 17 00:00:00 2001
From: michaelraney
Date: Thu, 16 Jun 2022 19:53:20 -0400
Subject: [PATCH] update example for common spark config parameters

update example for common spark config parameters
---
 .../aws-glue/export-to-s3/README.md           | 74 +++++++++++++++----
 ...cation.conf => cassandra-application.conf} | 10 ++-
 .../aws-glue/export-to-s3/export-sample.scala | 19 ++++-
 3 files changed, 82 insertions(+), 21 deletions(-)
 rename scala/datastax-v4/aws-glue/export-to-s3/{application.conf => cassandra-application.conf} (72%)

diff --git a/scala/datastax-v4/aws-glue/export-to-s3/README.md b/scala/datastax-v4/aws-glue/export-to-s3/README.md
index 6c3fc50..7e9eb57 100644
--- a/scala/datastax-v4/aws-glue/export-to-s3/README.md
+++ b/scala/datastax-v4/aws-glue/export-to-s3/README.md
@@ -11,11 +11,45 @@ The following example exports data to S3 using the spark-cassandra-connector. Th
 
 ```
+import com.amazonaws.services.glue.GlueContext
+import com.amazonaws.services.glue.util.GlueArgParser
+import com.amazonaws.services.glue.util.Job
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.from_json
+import org.apache.spark.sql.streaming.Trigger
+import scala.collection.JavaConverters._
+import com.datastax.spark.connector._
+import org.apache.spark.sql.cassandra._
+import org.apache.spark.sql.SaveMode._
+
+
 object GlueApp {
 
   def main(sysArgs: Array[String]) {
 
-    val spark: SparkContext = new SparkContext()
+    val conf = new SparkConf()
+        .setAll(
+         Seq(
+            ("spark.cassandra.connection.config.profile.path", "cassandra-application.conf"),
+            ("spark.cassandra.query.retry.count", "100"),
+
+            ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
+            ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
+            ("spark.cassandra.concurrent.reads", "512"),
+
+            ("spark.cassandra.output.concurrent.writes", "5"),
+            ("spark.cassandra.output.batch.grouping.key", "none"),
+            ("spark.cassandra.output.batch.size.rows", "1")
+        ))
+
+
+    val spark: SparkContext = new SparkContext(conf)
     val glueContext: GlueContext = new GlueContext(spark)
     val sparkSession: SparkSession = glueContext.getSparkSession
 
@@ -57,32 +91,40 @@ The following configuration for connecting to Amazon Keyspaces with the spark-ca
 
 Using the RateLimitingRequestThrottler we can ensure that requests do not exceed the configured Amazon Keyspaces capacity. The G.1X DPU creates one executor per worker. The RateLimitingRequestThrottler in this example is set to 1000 requests per second. With this configuration and the G.1X DPU you will achieve at most 1000 requests per second per Glue worker. Adjust max-requests-per-second to fit your workload, and increase the number of workers to scale throughput to a table.
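 As a rough sizing sketch (an illustration, not a measured figure): aggregate throughput is roughly max-requests-per-second x number of workers, so a job with 4 G.1X workers throttled at 1000 requests per second each can drive at most 4 x 1000 = 4000 requests per second against the table.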
 ```
+
 datastax-java-driver {
-   basic.request.consistency = "LOCAL_QUORUM"
+   basic.request.consistency = "LOCAL_ONE"
+   basic.request.default-idempotence = true
    basic.contact-points = [ "cassandra.us-east-1.amazonaws.com:9142"]
-
-   advanced.reconnect-on-init = true
+   advanced.reconnect-on-init = true
 
    basic.load-balancing-policy {
        local-datacenter = "us-east-1"
-   }
-  advanced.auth-provider = {
+   }
+
+  advanced.auth-provider = {
       class = PlainTextAuthProvider
       username = "user-at-sample"
-      password = "S@MPLE=PASSWORD="
+      password = "SAMPLE#PASSWORD"
   }
+
   advanced.throttler = {
-      class = RateLimitingRequestThrottler
-      max-requests-per-second = 1000
-      max-queue-size = 50000
-      drain-interval = 1 millisecond
+      class = RateLimitingRequestThrottler
+      max-requests-per-second = 1000
+      max-queue-size = 50000
+      drain-interval = 1 millisecond
   }
-  advanced.ssl-engine-factory {
+
+  advanced.ssl-engine-factory {
       class = DefaultSslEngineFactory
       hostname-validation = false
   }
-  advanced.connection.pool.local.size = 1
+
+  advanced.connection.pool.local.size = 2
+  advanced.resolve-contact-points = false
+
 }
+
 ```
 
 ## Create S3 bucket to store job artifacts
@@ -93,7 +135,7 @@ aws s3 mb s3://amazon-keyspaces-backups
 
 ## Upload job artifacts to S3
 The job will require
-* The spark-cassandra-connector to allow reads from Amazon Keyspaces. Amazon Keyspaces recommends version 2.5.2 of the spark-cassandra-connector or above. 
+* The spark-cassandra-connector to allow reads from Amazon Keyspaces. Amazon Keyspaces recommends version 2.5.2 of the spark-cassandra-connector or above.
-* application.conf containing the cassandra driver configuration for Keyspaces access
+* cassandra-application.conf containing the Cassandra driver configuration for Amazon Keyspaces access
 * export-sample.scala script containing the export code.
 
@@ -102,7 +144,7 @@ curl -L -O https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-con
 
 aws s3api put-object --bucket amazon-keyspaces-backups --key jars/spark-cassandra-connector-assembly_2.11-2.5.2.jar --body spark-cassandra-connector-assembly_2.11-2.5.2.jar
 
-aws s3api put-object --bucket amazon-keyspaces-backups --key conf/application.conf --body application.conf
+aws s3api put-object --bucket amazon-keyspaces-backups --key conf/cassandra-application.conf --body cassandra-application.conf
 
 aws s3api put-object --bucket amazon-keyspaces-backups --key scripts/export-sample.scala --body export-sample.scala
 
@@ -124,9 +166,9 @@ aws glue create-job \
         "--KEYSPACE_NAME":"my_keyspace",
         "--TABLE_NAME":"my_table",
         "--S3_URI":"s3://amazon-keyspaces-backups/snap-shots/",
+        "--DRIVER_CONF":"cassandra-application.conf",
         "--extra-jars":"s3://amazon-keyspaces-backups/jars/spark-cassandra-connector-assembly_2.11-2.5.2.jar",
-        "--extra-files":"s3://amazon-keyspaces-backups/conf/application.conf",
-        "--conf":"spark.cassandra.connection.config.profile.path=application.conf",
+        "--extra-files":"s3://amazon-keyspaces-backups/conf/cassandra-application.conf",
         "--class":"GlueApp"
     }'
 ```
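Once the job is created it can be started from the CLI with aws glue start-job-run. A minimal sketch; the job name and the argument values below are placeholders reusing the sample values above, substitute your own:

```
# Job name and argument values are placeholders; arguments passed here
# override the default arguments set at job creation.
aws glue start-job-run \
    --job-name "AmazonKeyspacesExportToS3" \
    --arguments '{
        "--KEYSPACE_NAME":"my_keyspace",
        "--TABLE_NAME":"my_table",
        "--S3_URI":"s3://amazon-keyspaces-backups/snap-shots/"
    }'
```

Because run arguments override the job defaults, a single job definition can export different keyspaces and tables.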
diff --git a/scala/datastax-v4/aws-glue/export-to-s3/application.conf b/scala/datastax-v4/aws-glue/export-to-s3/cassandra-application.conf
similarity index 72%
rename from scala/datastax-v4/aws-glue/export-to-s3/application.conf
rename to scala/datastax-v4/aws-glue/export-to-s3/cassandra-application.conf
index 1bd146f..cb67eb7 100644
--- a/scala/datastax-v4/aws-glue/export-to-s3/application.conf
+++ b/scala/datastax-v4/aws-glue/export-to-s3/cassandra-application.conf
@@ -12,13 +12,14 @@ datastax-java-driver {
   advanced.auth-provider = {
       class = PlainTextAuthProvider
       username = "user-at-sample"
-      password = "S@MPLE=PASSWORD="
+      password = "SAMPLE#PASSWORD"
   }
 
   advanced.throttler = {
-      class = RateLimitingRequestThrottler
-      max-requests-per-second = 1000
-      drain-interval = 10 milliseconds
+      class = RateLimitingRequestThrottler
+      max-requests-per-second = 1000
+      max-queue-size = 50000
+      drain-interval = 1 millisecond
   }
 
   advanced.ssl-engine-factory {
@@ -27,5 +28,6 @@ datastax-java-driver {
   }
 
   advanced.connection.pool.local.size = 2
+  advanced.resolve-contact-points = false
 
 }
diff --git a/scala/datastax-v4/aws-glue/export-to-s3/export-sample.scala b/scala/datastax-v4/aws-glue/export-to-s3/export-sample.scala
index 589c500..7020ac5 100644
--- a/scala/datastax-v4/aws-glue/export-to-s3/export-sample.scala
+++ b/scala/datastax-v4/aws-glue/export-to-s3/export-sample.scala
@@ -2,6 +2,7 @@ import com.amazonaws.services.glue.GlueContext
 import com.amazonaws.services.glue.util.GlueArgParser
 import com.amazonaws.services.glue.util.Job
 import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SaveMode
@@ -19,7 +20,23 @@ object GlueApp {
 
   def main(sysArgs: Array[String]) {
 
-    val spark: SparkContext = new SparkContext()
+    val conf = new SparkConf()
+        .setAll(
+         Seq(
+            ("spark.cassandra.connection.config.profile.path", "cassandra-application.conf"),
+            ("spark.cassandra.query.retry.count", "100"),
+
+            ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
+            ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
+            ("spark.cassandra.concurrent.reads", "512"),
+
+            ("spark.cassandra.output.concurrent.writes", "5"),
+            ("spark.cassandra.output.batch.grouping.key", "none"),
+            ("spark.cassandra.output.batch.size.rows", "1")
+        ))
+
+
+    val spark: SparkContext = new SparkContext(conf)
     val glueContext: GlueContext = new GlueContext(spark)
     val sparkSession: SparkSession = glueContext.getSparkSession
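The tail of export-sample.scala is not shown above. As a hedged sketch of the read-and-export step that typically follows with the spark-cassandra-connector (variable names here are illustrative, not necessarily those in the file):

```
    // Resolve the job parameters passed via --arguments (AWS Glue Scala API).
    val args = GlueArgParser.getResolvedOptions(sysArgs,
      Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "S3_URI").toArray)

    // Full-table read from Amazon Keyspaces through the Cassandra data source.
    val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> args("KEYSPACE_NAME"), "table" -> args("TABLE_NAME")))
      .load()

    // Write the snapshot to S3 as Parquet; fail rather than overwrite an existing export.
    tableDf.write.mode(SaveMode.ErrorIfExists).parquet(args("S3_URI"))
```

The LOCAL_ONE consistency and RateLimitingRequestThrottler settings in cassandra-application.conf govern how fast such a full-table read is allowed to run.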