From 6fa0ca6fb449ccd3842dda6405b7cb0d06388665 Mon Sep 17 00:00:00 2001 From: Lucas Duarte <30901918+lusoal@users.noreply.github.com> Date: Thu, 16 May 2024 12:57:54 -0500 Subject: [PATCH] feat: Spark Streaming using Spark Operator (#516) --- .../examples/consumer/Dockerfile | 49 ++ .../spark-streaming/examples/consumer/app.py | 81 +++ .../manifests/01_spark_application.yaml | 109 ++++ .../examples/consumer/requirements.txt | 2 + .../examples/docker-compose.yaml | 28 + .../examples/producer/00_deployment.yaml | 52 ++ .../examples/producer/01_delete_topic.yaml | 50 ++ .../examples/producer/Dockerfile | 14 + .../spark-streaming/examples/producer/app.py | 129 ++++ .../examples/producer/requirements.txt | 2 + .../examples/s3_automation/app.py | 67 +++ streaming/spark-streaming/terraform/README.md | 94 +++ streaming/spark-streaming/terraform/addons.tf | 566 ++++++++++++++++++ streaming/spark-streaming/terraform/amp.tf | 137 +++++ streaming/spark-streaming/terraform/apps.tf | 76 +++ .../spark-streaming/terraform/cleanup.sh | 33 + streaming/spark-streaming/terraform/eks.tf | 203 +++++++ .../aws-cloudwatch-metrics-values.yaml | 11 + .../helm-values/aws-for-fluentbit-values.yaml | 102 ++++ .../cluster-autoscaler-values.yaml | 25 + .../coredns-autoscaler-values.yaml | 40 ++ .../kube-prometheus-amp-enable.yaml | 76 +++ .../helm-values/kube-prometheus.yaml | 47 ++ .../helm-values/kubecost-values.yaml | 62 ++ .../helm-values/metrics-server-values.yaml | 52 ++ .../terraform/helm-values/nginx-values.yaml | 37 ++ .../helm-values/spark-operator-values.yaml | 42 ++ .../helm-values/yunikorn-values.yaml | 148 +++++ .../spark-streaming/terraform/install.sh | 36 ++ streaming/spark-streaming/terraform/main.tf | 22 + streaming/spark-streaming/terraform/msk.tf | 98 +++ .../spark-streaming/terraform/outputs.tf | 70 +++ .../spark-streaming/terraform/providers.tf | 50 ++ .../spark-streaming/terraform/spark-team.tf | 142 +++++ .../spark-streaming/terraform/variables.tf | 73 +++ .../spark-streaming/terraform/versions.tf | 33 + streaming/spark-streaming/terraform/vpc.tf | 102 ++++ .../streaming-platforms/spark-streaming.md | 276 +++++++++ 38 files changed, 3236 insertions(+) create mode 100644 streaming/spark-streaming/examples/consumer/Dockerfile create mode 100755 streaming/spark-streaming/examples/consumer/app.py create mode 100644 streaming/spark-streaming/examples/consumer/manifests/01_spark_application.yaml create mode 100644 streaming/spark-streaming/examples/consumer/requirements.txt create mode 100644 streaming/spark-streaming/examples/docker-compose.yaml create mode 100644 streaming/spark-streaming/examples/producer/00_deployment.yaml create mode 100644 streaming/spark-streaming/examples/producer/01_delete_topic.yaml create mode 100644 streaming/spark-streaming/examples/producer/Dockerfile create mode 100755 streaming/spark-streaming/examples/producer/app.py create mode 100644 streaming/spark-streaming/examples/producer/requirements.txt create mode 100755 streaming/spark-streaming/examples/s3_automation/app.py create mode 100644 streaming/spark-streaming/terraform/README.md create mode 100644 streaming/spark-streaming/terraform/addons.tf create mode 100644 streaming/spark-streaming/terraform/amp.tf create mode 100644 streaming/spark-streaming/terraform/apps.tf create mode 100755 streaming/spark-streaming/terraform/cleanup.sh create mode 100644 streaming/spark-streaming/terraform/eks.tf create mode 100755 streaming/spark-streaming/terraform/helm-values/aws-cloudwatch-metrics-values.yaml create mode 100644 
streaming/spark-streaming/terraform/helm-values/aws-for-fluentbit-values.yaml create mode 100644 streaming/spark-streaming/terraform/helm-values/cluster-autoscaler-values.yaml create mode 100644 streaming/spark-streaming/terraform/helm-values/coredns-autoscaler-values.yaml create mode 100644 streaming/spark-streaming/terraform/helm-values/kube-prometheus-amp-enable.yaml create mode 100644 streaming/spark-streaming/terraform/helm-values/kube-prometheus.yaml create mode 100644 streaming/spark-streaming/terraform/helm-values/kubecost-values.yaml create mode 100644 streaming/spark-streaming/terraform/helm-values/metrics-server-values.yaml create mode 100644 streaming/spark-streaming/terraform/helm-values/nginx-values.yaml create mode 100644 streaming/spark-streaming/terraform/helm-values/spark-operator-values.yaml create mode 100644 streaming/spark-streaming/terraform/helm-values/yunikorn-values.yaml create mode 100755 streaming/spark-streaming/terraform/install.sh create mode 100755 streaming/spark-streaming/terraform/main.tf create mode 100644 streaming/spark-streaming/terraform/msk.tf create mode 100644 streaming/spark-streaming/terraform/outputs.tf create mode 100644 streaming/spark-streaming/terraform/providers.tf create mode 100644 streaming/spark-streaming/terraform/spark-team.tf create mode 100644 streaming/spark-streaming/terraform/variables.tf create mode 100644 streaming/spark-streaming/terraform/versions.tf create mode 100644 streaming/spark-streaming/terraform/vpc.tf create mode 100644 website/docs/blueprints/streaming-platforms/spark-streaming.md diff --git a/streaming/spark-streaming/examples/consumer/Dockerfile b/streaming/spark-streaming/examples/consumer/Dockerfile new file mode 100644 index 000000000..a87633dfd --- /dev/null +++ b/streaming/spark-streaming/examples/consumer/Dockerfile @@ -0,0 +1,49 @@ +# Dockerfile for Apache Spark with additional JARs downloaded at build time +FROM apache/spark-py:v3.3.2 +WORKDIR /app + +# Use root to create a new user and configure permissions +USER root + +# Install wget to download JAR files +RUN apt-get update && apt-get install -y wget && \ + rm -rf /var/lib/apt/lists/* + +# Create a new user 'spark-user' with UID 1001 +RUN groupadd -r spark-group && useradd -r -u 1001 -g spark-group spark-user + +RUN mkdir -p /home/spark-user/.ivy2/cache +RUN mkdir -p /app/jars + +RUN cd /app/jars && \ + wget -q "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar" && \ + wget -q "https://repo1.maven.org/maven2/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.2/hadoop-client-api-3.3.2.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.2/hadoop-client-runtime-3.3.2.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.0.0/iceberg-spark-runtime-3.3_2.12-1.0.0.jar" && \ + wget -q "https://repo1.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.1/kafka-clients-2.8.1.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/lz4/lz4-java/1.7.1/lz4-java-1.7.1.jar" && \ + wget -q 
"https://repo1.maven.org/maven2/org/apache/parquet/parquet-avro/1.12.3/parquet-avro-1.12.3.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.8.1/snappy-java-1.1.8.1.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.2/spark-sql-kafka-0-10_2.12-3.3.2.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.3.2/spark-tags_2.12-3.3.2.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.2/spark-token-provider-kafka-0-10_2.12-3.3.2.jar" && \ + wget -q "https://repo1.maven.org/maven2/org/wildfly/openssl/wildfly-openssl/1.0.7.Final/wildfly-openssl-1.0.7.Final.jar" + +# Set the owner of the Ivy cache directory and /app to the new user +RUN chown -R spark-user:spark-group /home/spark-user/ +RUN chown -R spark-user:spark-group /app/ + +# Switch to the new user for running the application +USER spark-user + +# Add the Spark application script to the container +ADD app.py /app + +# Set the entry point for the container +ENTRYPOINT ["/opt/entrypoint.sh"] diff --git a/streaming/spark-streaming/examples/consumer/app.py b/streaming/spark-streaming/examples/consumer/app.py new file mode 100755 index 000000000..939956216 --- /dev/null +++ b/streaming/spark-streaming/examples/consumer/app.py @@ -0,0 +1,81 @@ +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, StringType +from pyspark.sql.functions import from_json +import os + +# Variables +s3_bucket_name = os.getenv("S3_BUCKET_NAME", "my-iceberg-data-bucket") +kafka_address = os.getenv("KAFKA_ADDRESS", 'b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092') + +def create_spark_session(): + spark = SparkSession.builder \ + .appName("KafkaToIceberg") \ + .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.3.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2") \ + .config("spark.jars.repositories", "https://repo1.maven.org/maven2/") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.local.type", "hadoop") \ + .config("spark.sql.catalog.local.warehouse", f"s3a://{s3_bucket_name}/iceberg/warehouse/") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \ + .config("spark.sql.warehouse.dir", f"s3a://{s3_bucket_name}/iceberg/warehouse/") \ + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ + .config("spark.kryo.registrationRequired", "false") \ + .getOrCreate() + return spark + +def consume_and_write(): + spark = create_spark_session() + # Debug spark DEBUG + spark.sparkContext.setLogLevel("ERROR") + # Create the table + spark.sql(f""" + CREATE TABLE IF NOT EXISTS local.my_table ( + id STRING, + timestamp STRING, + alert_type STRING, + severity STRING, + description STRING + ) + USING iceberg + LOCATION 's3a://{s3_bucket_name}/iceberg/warehouse/my_table' + TBLPROPERTIES ( + 
'write.format.default'='parquet' -- Explicitly specifying Parquet format + ) + """) + + # Read from Kafka + df = spark.readStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", kafka_address) \ + .option("subscribe", "security-topic") \ + .option("startingOffsets", "earliest") \ + .option("failOnDataLoss", "false") \ + .load() + + # Define the schema for the JSON data + json_schema = StructType([ + StructField("id", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("alert_type", StringType(), True), + StructField("severity", StringType(), True), + StructField("description", StringType(), True) + ]) + + # Parse JSON and select the required columns + parsed_df = df.selectExpr("CAST(value AS STRING) as json") \ + .select(from_json("json", json_schema).alias("data")) \ + .select("data.id", "data.timestamp", "data.alert_type", "data.severity", "data.description") + + # Write the stream to Iceberg using table name + query = parsed_df.writeStream \ + .format("iceberg") \ + .option("checkpointLocation", f"s3a://{s3_bucket_name}/iceberg/checkpoints/") \ + .option("path", f"s3a://{s3_bucket_name}/iceberg/warehouse/my_table") \ + .outputMode("append") \ + .start() + + query.awaitTermination() # Wait for the stream to finish + +if __name__ == "__main__": + consume_and_write() diff --git a/streaming/spark-streaming/examples/consumer/manifests/01_spark_application.yaml b/streaming/spark-streaming/examples/consumer/manifests/01_spark_application.yaml new file mode 100644 index 000000000..9ae360ed6 --- /dev/null +++ b/streaming/spark-streaming/examples/consumer/manifests/01_spark_application.yaml @@ -0,0 +1,109 @@ +# This script is used to run the spark-consumer application on EKS, +# users need to replace MY_BUCKET_NAME and MY_KAFKA_BROKERS_ADRESS to match your environment. 
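+# A minimal sketch of one way to fill in these placeholders (not the only approach): assuming the
+# Terraform stack in this blueprint is deployed, the values can be pulled from its outputs
+# (s3_bucket_id_iceberg_bucket and bootstrap_brokers) and substituted with sed before applying;
+# run from the directory that holds this manifest and the Terraform state, adjusting paths as needed:
+#   S3_BUCKET=$(terraform output -raw s3_bucket_id_iceberg_bucket)
+#   KAFKA_BROKERS=$(terraform output -raw bootstrap_brokers)
+#   sed -e "s|__MY_BUCKET_NAME__|$S3_BUCKET|g" \
+#       -e "s|__MY_KAFKA_BROKERS_ADRESS__|$KAFKA_BROKERS|g" \
+#       01_spark_application.yaml | kubectl apply -f -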
+--- +apiVersion: "sparkoperator.k8s.io/v1beta2" +kind: SparkApplication +metadata: + name: spark-consumer + namespace: spark-team-a +spec: + type: Python + pythonVersion: "3" + mode: cluster + image: "public.ecr.aws/data-on-eks/consumer-spark-streaming-3.3.2-kafka:1" # You can build your own image using the Dockerfile in this folder + mainApplicationFile: "local:///app/app.py" + sparkVersion: "3.3.2" + deps: + jars: + - "local:///app/jars/commons-logging-1.1.3.jar" + - "local:///app/jars/commons-pool2-2.11.1.jar" + - "local:///app/jars/hadoop-client-api-3.3.2.jar" + - "local:///app/jars/hadoop-client-runtime-3.3.2.jar" + - "local:///app/jars/jsr305-3.0.0.jar" + - "local:///app/jars/kafka-clients-2.8.1.jar" + - "local:///app/jars/lz4-java-1.7.1.jar" + - "local:///app/jars/scala-library-2.12.15.jar" + - "local:///app/jars/slf4j-api-1.7.30.jar" + - "local:///app/jars/snappy-java-1.1.8.1.jar" + - "local:///app/jars/spark-sql-kafka-0-10_2.12-3.3.2.jar" + - "local:///app/jars/spark-tags_2.12-3.3.2.jar" + - "local:///app/jars/spark-token-provider-kafka-0-10_2.12-3.3.2.jar" + - "local:///app/jars/iceberg-spark-runtime-3.3_2.12-1.0.0.jar" + - "local:///app/jars/hadoop-aws-3.3.2.jar" + - "local:///app/jars/aws-java-sdk-bundle-1.11.1026.jar" + - "local:///app/jars/wildfly-openssl-1.0.7.Final.jar" + - "local:///app/jars/parquet-avro-1.12.3.jar" + sparkConf: + "spark.app.name": "KafkaToIceberg" + "spark.jars.repositories": "https://repo1.maven.org/maven2/" + "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog" + "spark.sql.catalog.local.type": "hadoop" + "spark.sql.catalog.local.warehouse": "s3a://__MY_BUCKET_NAME__/iceberg/warehouse/" # Replace bucket name with your S3 bucket name: s3_bucket_id_iceberg_bucket + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem" + "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain" + "spark.sql.warehouse.dir": "s3a://__MY_BUCKET_NAME__/iceberg/warehouse/" # Replace bucket name with your S3 bucket name: s3_bucket_id_iceberg_bucket + "spark.metrics.conf.*.sink.prometheusServlet.class": "org.apache.spark.metrics.sink.PrometheusServlet" + "spark.metrics.conf.*.sink.prometheusServlet.path": "/metrics" + "spark.metrics.conf.master.sink.prometheusServlet.path": "/metrics/master" + "spark.metrics.conf.applications.sink.prometheusServlet.path": "/metrics/applications" + "spark.ui.prometheus.enabled": "true" + "spark.ui.prometheus.port": "4040" + restartPolicy: + type: OnFailure + onFailureRetries: 2 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 3 + onSubmissionFailureRetryInterval: 20 + dynamicAllocation: + enabled: true + initialExecutors: 3 + minExecutors: 3 + maxExecutors: 10 + driver: + cores: 1 + coreLimit: "1200m" + memory: "1024m" + labels: + version: "3.3.2" + app: spark + annotations: + prometheus.io/scrape: 'true' + prometheus.io/path: /metrics + prometheus.io/port: '4040' + serviceAccount: spark-team-a + nodeSelector: + NodeGroupType: "SparkComputeOptimized" + tolerations: + - key: "spark-compute-optimized" + operator: "Exists" + effect: "NoSchedule" + env: + - name: S3_BUCKET_NAME + value: "__MY_BUCKET_NAME__" # Replace with your S3 bucket name: s3_bucket_id_iceberg_bucket + - name: KAFKA_ADDRESS + value: "__MY_KAFKA_BROKERS_ADRESS__" # Replace with your Kafka brokers address: bootstrap_brokers + # value: 
"b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092" + executor: + cores: 2 + memory: "1024m" + labels: + version: "3.3.2" + app: spark + annotations: + prometheus.io/scrape: 'true' + prometheus.io/path: /metrics + prometheus.io/port: '4040' + serviceAccount: spark-team-a + nodeSelector: + NodeGroupType: "SparkComputeOptimized" + tolerations: + - key: "spark-compute-optimized" + operator: "Exists" + effect: "NoSchedule" + env: + - name: S3_BUCKET_NAME + value: "__MY_BUCKET_NAME__" # Replace with your S3 bucket name: s3_bucket_id_iceberg_bucket + - name: KAFKA_ADDRESS + value: "__MY_KAFKA_BROKERS_ADRESS__" # Replace with your Kafka brokers address: bootstrap_brokers + # value: "b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092" diff --git a/streaming/spark-streaming/examples/consumer/requirements.txt b/streaming/spark-streaming/examples/consumer/requirements.txt new file mode 100644 index 000000000..94715d491 --- /dev/null +++ b/streaming/spark-streaming/examples/consumer/requirements.txt @@ -0,0 +1,2 @@ +py4j==0.10.9.5 +pyspark==3.3.2 diff --git a/streaming/spark-streaming/examples/docker-compose.yaml b/streaming/spark-streaming/examples/docker-compose.yaml new file mode 100644 index 000000000..e33479fb0 --- /dev/null +++ b/streaming/spark-streaming/examples/docker-compose.yaml @@ -0,0 +1,28 @@ +version: '3' +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + container_name: zookeeper + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:latest + container_name: kafka + depends_on: + - zookeeper + ports: + - "9092:9092" + - "9093:9093" # Added a new port for the external listener + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093 # Changed the PLAINTEXT_HOST to a different port + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093 # Changed the PLAINTEXT_HOST to advertise the new port + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 diff --git a/streaming/spark-streaming/examples/producer/00_deployment.yaml b/streaming/spark-streaming/examples/producer/00_deployment.yaml new file mode 100644 index 000000000..875f5b3e5 --- /dev/null +++ b/streaming/spark-streaming/examples/producer/00_deployment.yaml @@ -0,0 +1,52 @@ +--- +# This is the producer deployment file, you can adjust the number of replicas to produce more data. +# You will need to change __MY_AWS_REGION__, __MY_KAFKA_BROKERS__, and __MY_PRODUCER_ROLE_ARN__ to match your environment. 
+apiVersion: v1 +kind: ServiceAccount +metadata: + name: producer-sa + annotations: + eks.amazonaws.com/role-arn: __MY_PRODUCER_ROLE_ARN__ # Replace with your producer role ARN: producer_iam_role_arn +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: producer-deployment +spec: + replicas: 100 # Adjusted to match the required number of replicas + selector: + matchLabels: + app: producer + template: + metadata: + labels: + app: producer + spec: + serviceAccountName: producer-sa + containers: + - name: producer + image: public.ecr.aws/data-on-eks/producer-kafka:1 + #image: public.ecr.aws/data-on-eks/producer-kafka:1 + command: ["python", "app.py"] + env: + - name: RATE_PER_SECOND + value: "100000" + - name: NUM_OF_MESSAGES + value: "10000000" + - name: AWS_REGION + value: "__MY_AWS_REGION__" # Replace with your AWS region + - name: BOOTSTRAP_BROKERS + value: "__MY_KAFKA_BROKERS__" # Replace with your bootstrap brokers: bootstrap_brokers + resources: + limits: + cpu: "2" # Increased CPU limit + memory: "4Gi" # Increased memory limit + requests: + cpu: "1" # Increased CPU request + memory: "2Gi" # Increased memory request + volumeMounts: + - name: shared-volume + mountPath: /mnt + volumes: + - name: shared-volume + emptyDir: {} diff --git a/streaming/spark-streaming/examples/producer/01_delete_topic.yaml b/streaming/spark-streaming/examples/producer/01_delete_topic.yaml new file mode 100644 index 000000000..a1545d218 --- /dev/null +++ b/streaming/spark-streaming/examples/producer/01_delete_topic.yaml @@ -0,0 +1,50 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: kafka-delete-topic-script + namespace: default +data: + delete_topic.py: | + from kafka.admin import KafkaAdminClient + + def delete_topic(bootstrap_servers, topic_name): + """Delete a Kafka topic.""" + client = KafkaAdminClient(bootstrap_servers=bootstrap_servers) + try: + client.delete_topics([topic_name]) + print(f"Topic {topic_name} deleted successfully.") + except Exception as e: + print(f"Failed to delete topic {topic_name}: {e}") + + # Configuration + import os + bootstrap_servers = os.getenv('BOOTSTRAP_BROKERS', 'localhost:9092') # Replace with your Kafka broker address + topic_name = os.getenv('TOPIC_NAME', 'security-topic') # Replace with your topic name + + # Delete Kafka topic + delete_topic(bootstrap_servers, topic_name) + +--- +apiVersion: v1 +kind: Pod +metadata: + name: kafka-delete-topic-pod + namespace: default +spec: + containers: + - name: delete-topic + image: public.ecr.aws/data-on-eks/producer-kafka:1 # Use an appropriate Python image + command: ["python", "/scripts/delete_topic.py"] + env: + - name: BOOTSTRAP_BROKERS + value: "__MY_KAFKA_BROKERS__" # Replace with your Kafka broker address + - name: TOPIC_NAME + value: "security-topic" # Replace with your topic name + volumeMounts: + - name: script-volume + mountPath: /scripts + restartPolicy: Never + volumes: + - name: script-volume + configMap: + name: kafka-delete-topic-script \ No newline at end of file diff --git a/streaming/spark-streaming/examples/producer/Dockerfile b/streaming/spark-streaming/examples/producer/Dockerfile new file mode 100644 index 000000000..0768d12f4 --- /dev/null +++ b/streaming/spark-streaming/examples/producer/Dockerfile @@ -0,0 +1,14 @@ +# Use an official Python runtime as a parent image +FROM python:3.9-slim + +# Set the working directory in the container +WORKDIR /usr/src/app + +# Copy the local code to the container +COPY . . 
+ +# Install any needed packages specified in requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +# Run app.py when the container launches +CMD ["python", "app.py"] diff --git a/streaming/spark-streaming/examples/producer/app.py b/streaming/spark-streaming/examples/producer/app.py new file mode 100755 index 000000000..b30609439 --- /dev/null +++ b/streaming/spark-streaming/examples/producer/app.py @@ -0,0 +1,129 @@ +import boto3 +import json +from kafka import KafkaProducer +from kafka.admin import KafkaAdminClient, NewTopic +from kafka.errors import KafkaError, TopicAlreadyExistsError +from concurrent.futures import ThreadPoolExecutor, as_completed +import time +import random +import os +import threading + +def create_topic(bootstrap_servers, topic_name, num_partitions, replication_factor): + """Create a Kafka topic.""" + client = KafkaAdminClient(bootstrap_servers=bootstrap_servers) + topic_list = [NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)] + try: + client.create_topics(new_topics=topic_list, validate_only=False) + print(f"Topic {topic_name} created successfully.") + except TopicAlreadyExistsError: + print(f"Topic {topic_name} already exists.") + except Exception as e: + print(f"Failed to create topic {topic_name}: {e}") + +def create_producer(bootstrap_brokers): + """Create a Kafka producer.""" + return KafkaProducer( + bootstrap_servers=bootstrap_brokers, + linger_ms=10, # Add some delay to batch messages + batch_size=32768, # Adjust batch size + buffer_memory=33554432 # Increase buffer memory + ) + +def generate_random_alert(data_id, alert_types, severities, descriptions): + """Generate a random alert message.""" + alert_type = random.choice(alert_types) + severity = random.choice(severities) + description = random.choice(descriptions[alert_type]) + + return { + 'id': data_id, + 'timestamp': time.time(), + 'alert_type': alert_type, + 'severity': severity, + 'description': description + } + +def produce_data(producer, topic_name, rate_per_second, num_messages=1000, num_threads=4): + """Produce data at a specified rate per second.""" + alert_types = ['intrusion', 'data leak', 'malware', 'phishing', 'ransomware'] + severities = ['low', 'medium', 'high', 'critical'] + + descriptions = { + 'intrusion': [ + 'Unauthorized access detected.', + 'Suspicious login attempt blocked.', + 'Possible brute force attack detected.' + ], + 'data leak': [ + 'Sensitive data exposed to public.', + 'Unauthorized data access from multiple locations.', + 'Data exfiltration attempt detected.' + ], + 'malware': [ + 'Malware detected on endpoint.', + 'Ransomware infection attempt blocked.', + 'Suspicious file download intercepted.' + ], + 'phishing': [ + 'Phishing email detected in user inbox.', + 'Credential phishing attempt identified.', + 'Suspicious domain communication intercepted.' + ], + 'ransomware': [ + 'Ransomware encryption behavior detected.', + 'Host isolated due to ransomware threat.', + 'Suspicious encryption of files noticed.' 
+ ] + } + + def produce_batch(batch_id, num_batches, rate_limiter): + for i in range(batch_id * num_batches, (batch_id + 1) * num_batches): + rate_limiter.acquire() # Wait for permission to send + message = generate_random_alert(i, alert_types, severities, descriptions) + try: + producer.send(topic_name, json.dumps(message).encode('utf-8')) + print(f"Sent: {message}") + except KafkaError as e: + print(f"Failed to send message: {e}") + producer.flush() + + num_batches_per_thread = num_messages // num_threads + rate_limiter = threading.Semaphore(rate_per_second) + + def refill_semaphore(): + while True: + time.sleep(1) + for _ in range(rate_per_second): + rate_limiter.release() + + # Start a background thread to refill the rate limiter semaphore + threading.Thread(target=refill_semaphore, daemon=True).start() + + with ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [executor.submit(produce_batch, thread_id, num_batches_per_thread, rate_limiter) for thread_id in range(num_threads)] + for future in as_completed(futures): + future.result() + +if __name__ == '__main__': + # Configuration + topic_name = "security-topic" + aws_region = os.getenv("AWS_REGION", "us-west-2") # Replace with your AWS region + num_partitions = 3 + replication_factor = 2 + bootstrap_brokers = os.getenv("BOOTSTRAP_BROKERS", 'b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092') + rate_per_second = int(os.getenv("RATE_PER_SECOND", 100000)) + num_messages = int(os.getenv("NUM_OF_MESSAGES", 10000000)) + num_threads = 8 + + # Create Kafka topic if it doesn't exist + create_topic(bootstrap_brokers, topic_name, num_partitions, replication_factor) + + # Create Kafka producer + producer = create_producer(bootstrap_brokers) + + # Produce data with rate limiting + try: + produce_data(producer, topic_name, rate_per_second=rate_per_second, num_messages=num_messages, num_threads=num_threads) # Adjust `rate_per_second` as needed + finally: + producer.close() \ No newline at end of file diff --git a/streaming/spark-streaming/examples/producer/requirements.txt b/streaming/spark-streaming/examples/producer/requirements.txt new file mode 100644 index 000000000..efd5e74c6 --- /dev/null +++ b/streaming/spark-streaming/examples/producer/requirements.txt @@ -0,0 +1,2 @@ +boto3 +kafka-python diff --git a/streaming/spark-streaming/examples/s3_automation/app.py b/streaming/spark-streaming/examples/s3_automation/app.py new file mode 100755 index 000000000..89727d1d2 --- /dev/null +++ b/streaming/spark-streaming/examples/s3_automation/app.py @@ -0,0 +1,67 @@ +import boto3 +import time +from botocore.exceptions import NoCredentialsError, PartialCredentialsError + +def get_bucket_size(bucket_name): + s3 = boto3.client('s3') + total_size = 0 + try: + paginator = s3.get_paginator('list_objects_v2') + for page in paginator.paginate(Bucket=bucket_name): + for obj in page.get('Contents', []): + total_size += obj['Size'] + except NoCredentialsError: + print("Credentials not available.") + return None + except PartialCredentialsError: + print("Incomplete credentials provided.") + return None + except Exception as e: + print(f"An error occurred: {e}") + return None + + return total_size + +def format_size_mb(size): + # Convert size to MB + return size / (1024 * 1024) + +def delete_directory(bucket_name, prefix): + s3 = boto3.client('s3') + try: + paginator = s3.get_paginator('list_objects_v2') + for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix): 
+ delete_objects = [{'Key': obj['Key']} for obj in page.get('Contents', [])] + if delete_objects: + s3.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_objects}) + print(f"Successfully deleted directory '{prefix}' in bucket '{bucket_name}'.") + except NoCredentialsError: + print("Credentials not available.") + except PartialCredentialsError: + print("Incomplete credentials provided.") + except Exception as e: + print(f"An error occurred while deleting the directory: {e}") + +def main(): + bucket_name = input("Enter the S3 bucket name: ") + refresh_interval = 10 # Refresh interval in seconds + + while True: + action = input("Enter 'size' to get bucket size or 'delete' to delete a directory: ").strip().lower() + if action == 'size': + size = get_bucket_size(bucket_name) + if size is not None: + size_mb = format_size_mb(size) + print(f"Total size of bucket '{bucket_name}': {size_mb:.2f} MB") + else: + print(f"Failed to get the size of bucket '{bucket_name}'.") + elif action == 'delete': + prefix = input("Enter the directory prefix to delete (e.g., 'myfolder/'): ").strip() + delete_directory(bucket_name, prefix) + else: + print("Invalid action. Please enter 'size' or 'delete'.") + + time.sleep(refresh_interval) + +if __name__ == "__main__": + main() diff --git a/streaming/spark-streaming/terraform/README.md b/streaming/spark-streaming/terraform/README.md new file mode 100644 index 000000000..7635abd59 --- /dev/null +++ b/streaming/spark-streaming/terraform/README.md @@ -0,0 +1,94 @@ +# Spark on K8s Operator with EKS +Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/blueprints/streaming-platforms/spark-streaming) to deploy this pattern and run sample tests. + + +## Requirements + +| Name | Version | +|------|---------| +| [terraform](#requirement\_terraform) | >= 1.0.0 | +| [aws](#requirement\_aws) | >= 3.72 | +| [helm](#requirement\_helm) | >= 2.4.1 | +| [kubectl](#requirement\_kubectl) | >= 1.14 | +| [kubernetes](#requirement\_kubernetes) | >= 2.10 | +| [random](#requirement\_random) | 3.3.2 | + +## Providers + +| Name | Version | +|------|---------| +| [aws](#provider\_aws) | >= 3.72 | +| [aws.ecr](#provider\_aws.ecr) | >= 3.72 | +| [random](#provider\_random) | 3.3.2 | + +## Modules + +| Name | Source | Version | +|------|--------|---------| +| [amp\_ingest\_irsa](#module\_amp\_ingest\_irsa) | aws-ia/eks-blueprints-addon/aws | ~> 1.0 | +| [consumer\_iam\_role](#module\_consumer\_iam\_role) | terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks | n/a | +| [ebs\_csi\_driver\_irsa](#module\_ebs\_csi\_driver\_irsa) | terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks | ~> 5.34 | +| [eks](#module\_eks) | terraform-aws-modules/eks/aws | ~> 19.15 | +| [eks\_blueprints\_addons](#module\_eks\_blueprints\_addons) | aws-ia/eks-blueprints-addons/aws | ~> 1.2 | +| [eks\_data\_addons](#module\_eks\_data\_addons) | aws-ia/eks-data-addons/aws | ~> 1.30 | +| [producer\_iam\_role](#module\_producer\_iam\_role) | terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks | n/a | +| [s3\_bucket](#module\_s3\_bucket) | terraform-aws-modules/s3-bucket/aws | ~> 3.0 | +| [vpc](#module\_vpc) | terraform-aws-modules/vpc/aws | ~> 5.0 | +| [vpc\_endpoints](#module\_vpc\_endpoints) | terraform-aws-modules/vpc/aws//modules/vpc-endpoints | ~> 5.0 | +| [vpc\_endpoints\_sg](#module\_vpc\_endpoints\_sg) | terraform-aws-modules/security-group/aws | ~> 5.0 | + +## Resources + +| Name | Type | +|------|------| +| 
[aws_iam_policy.consumer_s3_kafka](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource | +| [aws_iam_policy.grafana](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource | +| [aws_iam_policy.producer_s3_kafka](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource | +| [aws_msk_cluster.kafka_test_demo](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/msk_cluster) | resource | +| [aws_prometheus_workspace.amp](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/prometheus_workspace) | resource | +| [aws_s3_bucket.iceberg_data](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket) | resource | +| [aws_s3_object.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource | +| [aws_secretsmanager_secret.grafana](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/secretsmanager_secret) | resource | +| [aws_secretsmanager_secret_version.grafana](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/secretsmanager_secret_version) | resource | +| [aws_security_group.msk_security_group](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/security_group) | resource | +| [random_password.grafana](https://registry.terraform.io/providers/hashicorp/random/3.3.2/docs/resources/password) | resource | +| [aws_availability_zones.available](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/availability_zones) | data source | +| [aws_caller_identity.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source | +| [aws_ecrpublic_authorization_token.token](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecrpublic_authorization_token) | data source | +| [aws_iam_policy_document.grafana](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source | +| [aws_partition.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/partition) | data source | +| [aws_region.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/region) | data source | +| [aws_secretsmanager_secret_version.admin_password_version](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/secretsmanager_secret_version) | data source | + +## Inputs + +| Name | Description | Type | Default | Required | +|------|-------------|------|---------|:--------:| +| [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS Cluster version | `string` | `"1.28"` | no | +| [eks\_data\_plane\_subnet\_secondary\_cidr](#input\_eks\_data\_plane\_subnet\_secondary\_cidr) | Secondary CIDR blocks. 32766 IPs per Subnet per Subnet/AZ for EKS Node and Pods | `list(string)` |
<pre>[<br>  "100.64.0.0/17",<br>  "100.64.128.0/17"<br>]</pre> | no | +| [enable\_amazon\_prometheus](#input\_enable\_amazon\_prometheus) | Enable AWS Managed Prometheus service | `bool` | `true` | no | +| [enable\_vpc\_endpoints](#input\_enable\_vpc\_endpoints) | Enable VPC Endpoints | `bool` | `false` | no | +| [enable\_yunikorn](#input\_enable\_yunikorn) | Enable Apache YuniKorn Scheduler | `bool` | `false` | no | +| [name](#input\_name) | Name of the VPC and EKS Cluster | `string` | `"spark-operator-doeks"` | no | +| [private\_subnets](#input\_private\_subnets) | Private Subnets CIDRs. 254 IPs per Subnet/AZ for Private NAT + NLB + Airflow + EC2 Jumphost etc. | `list(string)` | <pre>[<br>  "10.1.1.0/24",<br>  "10.1.2.0/24"<br>]</pre> | no | +| [public\_subnets](#input\_public\_subnets) | Public Subnets CIDRs. 62 IPs per Subnet/AZ | `list(string)` | <pre>[<br>  "10.1.0.0/26",<br>  "10.1.0.64/26"<br>]</pre> | no | +| [region](#input\_region) | Region | `string` | `"us-west-2"` | no | +| [secondary\_cidr\_blocks](#input\_secondary\_cidr\_blocks) | Secondary CIDR blocks to be attached to VPC | `list(string)` | <pre>[<br>  "100.64.0.0/16"<br>]</pre>
| no | +| [vpc\_cidr](#input\_vpc\_cidr) | VPC CIDR. This should be a valid private (RFC 1918) CIDR range | `string` | `"10.1.0.0/16"` | no | + +## Outputs + +| Name | Description | +|------|-------------| +| [bootstrap\_brokers](#output\_bootstrap\_brokers) | Bootstrap brokers for the MSK cluster | +| [cluster\_arn](#output\_cluster\_arn) | The Amazon Resource Name (ARN) of the cluster | +| [cluster\_name](#output\_cluster\_name) | The Amazon Resource Name (ARN) of the cluster | +| [configure\_kubectl](#output\_configure\_kubectl) | Configure kubectl: make sure you're logged in with the correct AWS profile and run the following command to update your kubeconfig | +| [consumer\_iam\_role\_arn](#output\_consumer\_iam\_role\_arn) | IAM role ARN for the consumer | +| [grafana\_secret\_name](#output\_grafana\_secret\_name) | Grafana password secret name | +| [producer\_iam\_role\_arn](#output\_producer\_iam\_role\_arn) | IAM role ARN for the producer | +| [s3\_bucket\_id\_iceberg\_bucket](#output\_s3\_bucket\_id\_iceberg\_bucket) | Spark History server logs S3 bucket ID | +| [s3\_bucket\_id\_spark\_history\_server](#output\_s3\_bucket\_id\_spark\_history\_server) | Spark History server logs S3 bucket ID | +| [s3\_bucket\_region\_spark\_history\_server](#output\_s3\_bucket\_region\_spark\_history\_server) | Spark History server logs S3 bucket ID | +| [subnet\_ids\_starting\_with\_100](#output\_subnet\_ids\_starting\_with\_100) | Secondary CIDR Private Subnet IDs for EKS Data Plane | + diff --git a/streaming/spark-streaming/terraform/addons.tf b/streaming/spark-streaming/terraform/addons.tf new file mode 100644 index 000000000..75a4e894f --- /dev/null +++ b/streaming/spark-streaming/terraform/addons.tf @@ -0,0 +1,566 @@ +#--------------------------------------------------------------- +# IRSA for EBS CSI Driver +#--------------------------------------------------------------- +module "ebs_csi_driver_irsa" { + source = "terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks" + version = "~> 5.34" + role_name_prefix = format("%s-%s-", local.name, "ebs-csi-driver") + attach_ebs_csi_policy = true + oidc_providers = { + main = { + provider_arn = module.eks.oidc_provider_arn + namespace_service_accounts = ["kube-system:ebs-csi-controller-sa"] + } + } + tags = local.tags +} + +#--------------------------------------------------------------- +# EKS Blueprints Addons +#--------------------------------------------------------------- +module "eks_blueprints_addons" { + source = "aws-ia/eks-blueprints-addons/aws" + version = "~> 1.2" + + cluster_name = module.eks.cluster_name + cluster_endpoint = module.eks.cluster_endpoint + cluster_version = module.eks.cluster_version + oidc_provider_arn = module.eks.oidc_provider_arn + + #--------------------------------------- + # Amazon EKS Managed Add-ons + #--------------------------------------- + eks_addons = { + aws-ebs-csi-driver = { + service_account_role_arn = module.ebs_csi_driver_irsa.iam_role_arn + } + coredns = { + preserve = true + } + vpc-cni = { + preserve = true + } + kube-proxy = { + preserve = true + } + } + + #--------------------------------------- + # Kubernetes Add-ons + #--------------------------------------- + #--------------------------------------------------------------- + # CoreDNS Autoscaler helps to scale for large EKS Clusters + # Further tuning for CoreDNS is to leverage NodeLocal DNSCache -> https://kubernetes.io/docs/tasks/administer-cluster/nodelocaldns/ + 
#--------------------------------------------------------------- + enable_cluster_proportional_autoscaler = true + cluster_proportional_autoscaler = { + values = [templatefile("${path.module}/helm-values/coredns-autoscaler-values.yaml", { + target = "deployment/coredns" + })] + description = "Cluster Proportional Autoscaler for CoreDNS Service" + } + + #--------------------------------------- + # Metrics Server + #--------------------------------------- + enable_metrics_server = true + metrics_server = { + values = [templatefile("${path.module}/helm-values/metrics-server-values.yaml", {})] + } + + #--------------------------------------- + # Cluster Autoscaler + #--------------------------------------- + enable_cluster_autoscaler = true + cluster_autoscaler = { + values = [templatefile("${path.module}/helm-values/cluster-autoscaler-values.yaml", { + aws_region = var.region, + eks_cluster_id = module.eks.cluster_name + })] + } + + #--------------------------------------- + # Karpenter Autoscaler for EKS Cluster + #--------------------------------------- + enable_karpenter = true + karpenter_enable_spot_termination = true + karpenter_node = { + iam_role_additional_policies = { + AmazonSSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore" + } + } + karpenter = { + chart_version = "v0.34.0" + repository_username = data.aws_ecrpublic_authorization_token.token.user_name + repository_password = data.aws_ecrpublic_authorization_token.token.password + } + + #--------------------------------------- + # CloudWatch metrics for EKS + #--------------------------------------- + enable_aws_cloudwatch_metrics = true + aws_cloudwatch_metrics = { + values = [templatefile("${path.module}/helm-values/aws-cloudwatch-metrics-values.yaml", {})] + } + + #--------------------------------------- + # AWS for FluentBit - DaemonSet + #--------------------------------------- + enable_aws_for_fluentbit = true + aws_for_fluentbit_cw_log_group = { + use_name_prefix = false + name = "/${local.name}/aws-fluentbit-logs" # Add-on creates this log group + retention_in_days = 30 + } + aws_for_fluentbit = { + s3_bucket_arns = [ + module.s3_bucket.s3_bucket_arn, + "${module.s3_bucket.s3_bucket_arn}/*}" + ] + values = [templatefile("${path.module}/helm-values/aws-for-fluentbit-values.yaml", { + region = local.region, + cloudwatch_log_group = "/${local.name}/aws-fluentbit-logs" + s3_bucket_name = module.s3_bucket.s3_bucket_id + cluster_name = module.eks.cluster_name + })] + } + + enable_aws_load_balancer_controller = true + aws_load_balancer_controller = { + chart_version = "1.5.4" + } + + enable_ingress_nginx = true + ingress_nginx = { + version = "4.5.2" + values = [templatefile("${path.module}/helm-values/nginx-values.yaml", {})] + } + + #--------------------------------------- + # Prommetheus and Grafana stack + #--------------------------------------- + #--------------------------------------------------------------- + # Install Kafka Montoring Stack with Prometheus and Grafana + # 1- Grafana port-forward `kubectl port-forward svc/kube-prometheus-stack-grafana 8080:80 -n kube-prometheus-stack` + # 2- Grafana Admin user: admin + # 3- Get admin user password: `aws secretsmanager get-secret-value --secret-id --region $AWS_REGION --query "SecretString" --output text` + #--------------------------------------------------------------- + enable_kube_prometheus_stack = true + kube_prometheus_stack = { + values = [ + var.enable_amazon_prometheus ? 
templatefile("${path.module}/helm-values/kube-prometheus-amp-enable.yaml", { + region = local.region + amp_sa = local.amp_ingest_service_account + amp_irsa = module.amp_ingest_irsa[0].iam_role_arn + amp_remotewrite_url = "https://aps-workspaces.${local.region}.amazonaws.com/workspaces/${aws_prometheus_workspace.amp[0].id}/api/v1/remote_write" + amp_url = "https://aps-workspaces.${local.region}.amazonaws.com/workspaces/${aws_prometheus_workspace.amp[0].id}" + }) : templatefile("${path.module}/helm-values/kube-prometheus.yaml", {}) + ] + chart_version = "48.1.1" + set_sensitive = [ + { + name = "grafana.adminPassword" + value = data.aws_secretsmanager_secret_version.admin_password_version.secret_string + } + ], + } + + tags = local.tags +} + +#--------------------------------------------------------------- +# Data on EKS Kubernetes Addons +#--------------------------------------------------------------- +module "eks_data_addons" { + source = "aws-ia/eks-data-addons/aws" + version = "~> 1.30" # ensure to update this to the latest/desired version + + oidc_provider_arn = module.eks.oidc_provider_arn + + enable_karpenter_resources = true + + karpenter_resources_helm_config = { + spark-compute-optimized = { + values = [ + <<-EOT + name: spark-compute-optimized + clusterName: ${module.eks.cluster_name} + ec2NodeClass: + karpenterRole: ${split("/", module.eks_blueprints_addons.karpenter.node_iam_role_arn)[1]} + subnetSelectorTerms: + tags: + Name: "${module.eks.cluster_name}-private*" + securityGroupSelectorTerms: + tags: + Name: ${module.eks.cluster_name}-node + instanceStorePolicy: RAID0 + + nodePool: + labels: + - type: karpenter + - NodeGroupType: SparkComputeOptimized + - multiArch: Spark + requirements: + - key: "karpenter.sh/capacity-type" + operator: In + values: ["spot", "on-demand"] + - key: "kubernetes.io/arch" + operator: In + values: ["amd64"] + - key: "karpenter.k8s.aws/instance-category" + operator: In + values: ["c"] + - key: "karpenter.k8s.aws/instance-family" + operator: In + values: ["c5d"] + - key: "karpenter.k8s.aws/instance-cpu" + operator: In + values: ["4", "8", "16", "36"] + - key: "karpenter.k8s.aws/instance-hypervisor" + operator: In + values: ["nitro"] + - key: "karpenter.k8s.aws/instance-generation" + operator: Gt + values: ["2"] + limits: + cpu: 1000 + disruption: + consolidationPolicy: WhenEmpty + consolidateAfter: 30s + expireAfter: 720h + weight: 100 + EOT + ] + } + spark-graviton-memory-optimized = { + values = [ + <<-EOT + name: spark-graviton-memory-optimized + clusterName: ${module.eks.cluster_name} + ec2NodeClass: + karpenterRole: ${split("/", module.eks_blueprints_addons.karpenter.node_iam_role_arn)[1]} + subnetSelectorTerms: + tags: + Name: "${module.eks.cluster_name}-private*" + securityGroupSelectorTerms: + tags: + Name: ${module.eks.cluster_name}-node + instanceStorePolicy: RAID0 + nodePool: + labels: + - type: karpenter + - NodeGroupType: SparkGravitonMemoryOptimized + - multiArch: Spark + requirements: + - key: "karpenter.sh/capacity-type" + operator: In + values: ["spot", "on-demand"] + - key: "kubernetes.io/arch" + operator: In + values: ["arm64"] + - key: "karpenter.k8s.aws/instance-category" + operator: In + values: ["r"] + - key: "karpenter.k8s.aws/instance-family" + operator: In + values: ["r6gd"] + - key: "karpenter.k8s.aws/instance-cpu" + operator: In + values: ["4", "8", "16", "32"] + - key: "karpenter.k8s.aws/instance-hypervisor" + operator: In + values: ["nitro"] + - key: "karpenter.k8s.aws/instance-generation" + operator: Gt + values: ["2"] + 
limits: + cpu: 1000 + disruption: + consolidationPolicy: WhenEmpty + consolidateAfter: 30s + expireAfter: 720h + weight: 50 + EOT + ] + } + spark-memory-optimized = { + values = [ + <<-EOT + name: spark-memory-optimized + clusterName: ${module.eks.cluster_name} + ec2NodeClass: + karpenterRole: ${split("/", module.eks_blueprints_addons.karpenter.node_iam_role_arn)[1]} + subnetSelectorTerms: + tags: + Name: "${module.eks.cluster_name}-private*" + securityGroupSelectorTerms: + tags: + Name: ${module.eks.cluster_name}-node + instanceStorePolicy: RAID0 + + nodePool: + labels: + - type: karpenter + - NodeGroupType: SparkMemoryOptimized + requirements: + - key: "karpenter.sh/capacity-type" + operator: In + values: ["spot", "on-demand"] + - key: "kubernetes.io/arch" + operator: In + values: ["amd64"] + - key: "karpenter.k8s.aws/instance-category" + operator: In + values: ["r"] + - key: "karpenter.k8s.aws/instance-family" + operator: In + values: ["r5d"] + - key: "karpenter.k8s.aws/instance-cpu" + operator: In + values: ["4", "8", "16", "32"] + - key: "karpenter.k8s.aws/instance-hypervisor" + operator: In + values: ["nitro"] + - key: "karpenter.k8s.aws/instance-generation" + operator: Gt + values: ["2"] + limits: + cpu: 1000 + disruption: + consolidationPolicy: WhenEmpty + consolidateAfter: 30s + expireAfter: 720h + weight: 100 + EOT + ] + } + spark-vertical-ebs-scale = { + values = [ + <<-EOT + name: spark-vertical-ebs-scale + clusterName: ${module.eks.cluster_name} + ec2NodeClass: + karpenterRole: ${split("/", module.eks_blueprints_addons.karpenter.node_iam_role_arn)[1]} + subnetSelectorTerms: + tags: + Name: "${module.eks.cluster_name}-private*" + securityGroupSelectorTerms: + tags: + Name: ${module.eks.cluster_name}-node + userData: | + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="BOUNDARY" + + --BOUNDARY + Content-Type: text/x-shellscript; charset="us-ascii" + + #!/bin/bash + echo "Running a custom user data script" + set -ex + yum install mdadm -y + + IDX=1 + DEVICES=$(lsblk -o NAME,TYPE -dsn | awk '/disk/ {print $1}') + + DISK_ARRAY=() + + for DEV in $DEVICES + do + DISK_ARRAY+=("/dev/$${DEV}") + done + + DISK_COUNT=$${#DISK_ARRAY[@]} + + if [ $${DISK_COUNT} -eq 0 ]; then + echo "No SSD disks available. Creating new EBS volume according to number of cores available in the node." + yum install -y jq awscli + TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 3600") + + # Get instance info + INSTANCE_ID=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/instance-id) + AVAILABILITY_ZONE=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/placement/availability-zone) + REGION=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/[a-z]$//') + + # Get the number of cores available + CORES=$(nproc --all) + + # Define volume size based on the number of cores and EBS volume size per core + VOLUME_SIZE=$(expr $CORES \* 10) # 10GB per core. 
Change as desired + + # Create a volume + VOLUME_ID=$(aws ec2 create-volume --availability-zone $AVAILABILITY_ZONE --size $VOLUME_SIZE --volume-type gp3 --region $REGION --output text --query 'VolumeId') + + # Check whether the volume is available + while [ "$(aws ec2 describe-volumes --volume-ids $VOLUME_ID --region $REGION --query "Volumes[*].State" --output text)" != "available" ]; do + echo "Waiting for volume to become available" + sleep 5 + done + + # Attach the volume to the instance + aws ec2 attach-volume --volume-id $VOLUME_ID --instance-id $INSTANCE_ID --device /dev/xvdb --region $REGION + + # Update the state to delete the volume when the node is terminated + aws ec2 modify-instance-attribute --instance-id $INSTANCE_ID --block-device-mappings "[{\"DeviceName\": \"/dev/xvdb\",\"Ebs\":{\"DeleteOnTermination\":true}}]" --region $REGION + + # Wait for the volume to be attached + while [ "$(aws ec2 describe-volumes --volume-ids $VOLUME_ID --region $REGION --query "Volumes[*].Attachments[*].State" --output text)" != "attached" ]; do + echo "Waiting for volume to be attached" + sleep 5 + done + + # Format the volume + sudo mkfs -t ext4 /dev/xvdb # Improve this to get this value dynamically + # Create a mount point + sudo mkdir /mnt/k8s-disks # Change directory as you like + # Mount the volume + sudo mount /dev/xvdb /mnt/k8s-disks + # To mount this EBS volume on every system reboot, you need to add an entry in /etc/fstab + echo "/dev/xvdb /mnt/k8s-disks ext4 defaults,nofail 0 2" | sudo tee -a /etc/fstab + + # Adding permissions to the mount + /usr/bin/chown -hR +999:+1000 /mnt/k8s-disks + else + if [ $${DISK_COUNT} -eq 1 ]; then + TARGET_DEV=$${DISK_ARRAY[0]} + mkfs.xfs $${TARGET_DEV} + else + mdadm --create --verbose /dev/md0 --level=0 --raid-devices=$${DISK_COUNT} $${DISK_ARRAY[@]} + mkfs.xfs /dev/md0 + TARGET_DEV=/dev/md0 + fi + + mkdir -p /mnt/k8s-disks + echo $${TARGET_DEV} /mnt/k8s-disks xfs defaults,noatime 1 2 >> /etc/fstab + mount -a + /usr/bin/chown -hR +999:+1000 /mnt/k8s-disks + fi + + --BOUNDARY-- + + nodePool: + labels: + - type: karpenter + - provisioner: spark-vertical-ebs-scale + requirements: + - key: "karpenter.sh/capacity-type" + operator: In + values: ["spot", "on-demand"] + - key: "karpenter.k8s.aws/instance-family" + operator: In + values: ["r4", "r4", "r5", "r5d", "r5n", "r5dn", "r5b", "m4", "m5", "m5n", "m5zn", "m5dn", "m5d", "c4", "c5", "c5n", "c5d"] + - key: "kubernetes.io/arch" + operator: In + values: ["amd64"] + - key: "karpenter.k8s.aws/instance-generation" + operator: Gt + values: ["2"] + limits: + cpu: 1000 + disruption: + consolidationPolicy: WhenEmpty + consolidateAfter: 30s + expireAfter: 720h + weight: 100 + EOT + ] + } + } + + #--------------------------------------------------------------- + # Spark Operator Add-on + #--------------------------------------------------------------- + enable_spark_operator = true + spark_operator_helm_config = { + values = [templatefile("${path.module}/helm-values/spark-operator-values.yaml", {})] + } + + #--------------------------------------------------------------- + # Apache YuniKorn Add-on + #--------------------------------------------------------------- + enable_yunikorn = var.enable_yunikorn + yunikorn_helm_config = { + values = [templatefile("${path.module}/helm-values/yunikorn-values.yaml", { + image_version = "1.2.0" + })] + } + + #--------------------------------------------------------------- + # Spark History Server Add-on + #--------------------------------------------------------------- + # Spark 
history server is required only when EMR Spark Operator is enabled + enable_spark_history_server = true + spark_history_server_helm_config = { + values = [ + <<-EOT + sparkHistoryOpts: "-Dspark.history.fs.logDirectory=s3a://${module.s3_bucket.s3_bucket_id}/${aws_s3_object.this.key}" + EOT + ] + } + + #--------------------------------------------------------------- + # Kubecost Add-on + #--------------------------------------------------------------- + enable_kubecost = true + kubecost_helm_config = { + values = [templatefile("${path.module}/helm-values/kubecost-values.yaml", {})] + repository_username = data.aws_ecrpublic_authorization_token.token.user_name + repository_password = data.aws_ecrpublic_authorization_token.token.password + } + +} + +#--------------------------------------------------------------- +# S3 bucket for Spark Event Logs and Example Data +#--------------------------------------------------------------- +#tfsec:ignore:* +module "s3_bucket" { + source = "terraform-aws-modules/s3-bucket/aws" + version = "~> 3.0" + + bucket_prefix = "${local.name}-spark-logs-" + + # For example only - please evaluate for your environment + force_destroy = true + + server_side_encryption_configuration = { + rule = { + apply_server_side_encryption_by_default = { + sse_algorithm = "AES256" + } + } + } + + tags = local.tags +} + +# Creating an s3 bucket prefix. Ensure you copy Spark History event logs under this path to visualize the dags +resource "aws_s3_object" "this" { + bucket = module.s3_bucket.s3_bucket_id + key = "spark-event-logs/" + content_type = "application/x-directory" +} + +#--------------------------------------------------------------- +# Grafana Admin credentials resources +#--------------------------------------------------------------- +data "aws_secretsmanager_secret_version" "admin_password_version" { + secret_id = aws_secretsmanager_secret.grafana.id + depends_on = [aws_secretsmanager_secret_version.grafana] +} + +resource "random_password" "grafana" { + length = 16 + special = true + override_special = "@_" +} + +#tfsec:ignore:aws-ssm-secret-use-customer-key +resource "aws_secretsmanager_secret" "grafana" { + name = "${local.name}-grafana" + recovery_window_in_days = 0 # Set to zero for this example to force delete during Terraform destroy +} + +resource "aws_secretsmanager_secret_version" "grafana" { + secret_id = aws_secretsmanager_secret.grafana.id + secret_string = random_password.grafana.result +} diff --git a/streaming/spark-streaming/terraform/amp.tf b/streaming/spark-streaming/terraform/amp.tf new file mode 100644 index 000000000..96df2a495 --- /dev/null +++ b/streaming/spark-streaming/terraform/amp.tf @@ -0,0 +1,137 @@ +#IAM Policy for Amazon Prometheus & Grafana +resource "aws_iam_policy" "grafana" { + count = var.enable_amazon_prometheus ? 1 : 0 + + description = "IAM policy for Grafana Pod" + name_prefix = format("%s-%s-", local.name, "grafana") + path = "/" + policy = data.aws_iam_policy_document.grafana[0].json +} + +data "aws_iam_policy_document" "grafana" { + count = var.enable_amazon_prometheus ? 
1 : 0 + + statement { + sid = "AllowReadingMetricsFromCloudWatch" + effect = "Allow" + resources = ["*"] + + actions = [ + "cloudwatch:DescribeAlarmsForMetric", + "cloudwatch:ListMetrics", + "cloudwatch:GetMetricData", + "cloudwatch:GetMetricStatistics" + ] + } + + statement { + sid = "AllowGetInsightsCloudWatch" + effect = "Allow" + resources = ["arn:${local.partition}:cloudwatch:${local.region}:${local.account_id}:insight-rule/*"] + + actions = [ + "cloudwatch:GetInsightRuleReport", + ] + } + + statement { + sid = "AllowReadingAlarmHistoryFromCloudWatch" + effect = "Allow" + resources = ["arn:${local.partition}:cloudwatch:${local.region}:${local.account_id}:alarm:*"] + + actions = [ + "cloudwatch:DescribeAlarmHistory", + "cloudwatch:DescribeAlarms", + ] + } + + statement { + sid = "AllowReadingLogsFromCloudWatch" + effect = "Allow" + resources = ["arn:${local.partition}:logs:${local.region}:${local.account_id}:log-group:*:log-stream:*"] + + actions = [ + "logs:DescribeLogGroups", + "logs:GetLogGroupFields", + "logs:StartQuery", + "logs:StopQuery", + "logs:GetQueryResults", + "logs:GetLogEvents", + ] + } + + statement { + sid = "AllowReadingTagsInstancesRegionsFromEC2" + effect = "Allow" + resources = ["*"] + + actions = [ + "ec2:DescribeTags", + "ec2:DescribeInstances", + "ec2:DescribeRegions", + ] + } + + statement { + sid = "AllowReadingResourcesForTags" + effect = "Allow" + resources = ["*"] + actions = ["tag:GetResources"] + } + + statement { + sid = "AllowListApsWorkspaces" + effect = "Allow" + resources = [ + "arn:${local.partition}:aps:${local.region}:${local.account_id}:/*", + "arn:${local.partition}:aps:${local.region}:${local.account_id}:workspace/*", + "arn:${local.partition}:aps:${local.region}:${local.account_id}:workspace/*/*", + ] + actions = [ + "aps:ListWorkspaces", + "aps:DescribeWorkspace", + "aps:GetMetricMetadata", + "aps:GetSeries", + "aps:QueryMetrics", + "aps:RemoteWrite", + "aps:GetLabels" + ] + } +} + +#------------------------------------------ +# Amazon Prometheus +#------------------------------------------ +locals { + amp_ingest_service_account = "amp-iamproxy-ingest-service-account" + amp_namespace = "kube-prometheus-stack" +} + +resource "aws_prometheus_workspace" "amp" { + count = var.enable_amazon_prometheus ? 1 : 0 + + alias = format("%s-%s", "amp-ws", local.name) + tags = local.tags +} + +module "amp_ingest_irsa" { + count = var.enable_amazon_prometheus ? 
1 : 0 + + source = "aws-ia/eks-blueprints-addon/aws" + version = "~> 1.0" + create_release = false + create_role = true + create_policy = false + role_name = format("%s-%s", local.name, "amp-ingest") + role_policies = { amp_policy = aws_iam_policy.grafana[0].arn } + + oidc_providers = { + this = { + provider_arn = module.eks.oidc_provider_arn + namespace = local.amp_namespace + service_account = local.amp_ingest_service_account + } + } + + tags = local.tags +} diff --git a/streaming/spark-streaming/terraform/apps.tf b/streaming/spark-streaming/terraform/apps.tf new file mode 100644 index 000000000..dc437fa41 --- /dev/null +++ b/streaming/spark-streaming/terraform/apps.tf @@ -0,0 +1,76 @@ +# Resources needed to test producer and consumer applications +resource "aws_s3_bucket" "iceberg_data" { + bucket_prefix = "my-iceberg-data-bucket-" + + tags = { + Purpose = "Iceberg Data Storage" + } +} + + +resource "aws_iam_policy" "producer_s3_kafka" { + name = "producer_s3_kafka_policy" + description = "Policy for Producer access to S3 and Kafka" + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Action = [ + "s3:*", + "kafka:Describe*", + "kafka:List*", + "kafka:Get*", + "kafka:Create*", + "kafka:Delete*", + "kafka:Update*" + ] + Resource = "*" + } + ] + }) +} + +resource "aws_iam_policy" "consumer_s3_kafka" { + name = "consumer_s3_kafka_policy" + description = "Policy for Consumer access to S3 and Kafka" + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Action = [ + "s3:*", + "kafka:Describe*", + "kafka:List*", + "kafka:Get*", + "kafka:Create*", + "kafka:Delete*", + "kafka:Update*" + ] + Resource = "*" + } + ] + }) +} + +module "producer_iam_role" { + source = "terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks" + + role_name = "producer-irsa" + + role_policy_arns = { + arn = aws_iam_policy.producer_s3_kafka.arn + } + + oidc_providers = { + main = { + provider_arn = module.eks.oidc_provider_arn + namespace_service_accounts = ["default:producer-sa"] + } + } +} + +# Consumer IAM role and Spark additional components are being managed by spark-team.tf diff --git a/streaming/spark-streaming/terraform/cleanup.sh b/streaming/spark-streaming/terraform/cleanup.sh new file mode 100755 index 000000000..1f357fac9 --- /dev/null +++ b/streaming/spark-streaming/terraform/cleanup.sh @@ -0,0 +1,33 @@ +#!/bin/bash +set -o errexit +set -o pipefail + +read -p "Enter the region: " region +export AWS_DEFAULT_REGION=$region + +targets=( + "module.eks_data_addons" + "module.eks_blueprints_addons" + "module.eks" +) + +for target in "${targets[@]}" +do + terraform destroy -target="$target" -auto-approve + destroy_output=$(terraform destroy -target="$target" -auto-approve 2>&1) + if [[ $? -eq 0 && $destroy_output == *"Destroy complete!"* ]]; then + echo "SUCCESS: Terraform destroy of $target completed successfully" + else + echo "FAILED: Terraform destroy of $target failed" + exit 1 + fi +done + +terraform destroy -auto-approve +destroy_output=$(terraform destroy -auto-approve 2>&1) +if [[ $? 
-eq 0 && $destroy_output == *"Destroy complete!"* ]]; then + echo "SUCCESS: Terraform destroy of all targets completed successfully" +else + echo "FAILED: Terraform destroy of all targets failed" + exit 1 +fi diff --git a/streaming/spark-streaming/terraform/eks.tf b/streaming/spark-streaming/terraform/eks.tf new file mode 100644 index 000000000..60149640e --- /dev/null +++ b/streaming/spark-streaming/terraform/eks.tf @@ -0,0 +1,203 @@ +#--------------------------------------------------------------- +# EKS Cluster +#--------------------------------------------------------------- +module "eks" { + source = "terraform-aws-modules/eks/aws" + version = "~> 19.15" + + cluster_name = local.name + cluster_version = var.eks_cluster_version + + #WARNING: Avoid using this option (cluster_endpoint_public_access = true) in preprod or prod accounts. This feature is designed for sandbox accounts, simplifying cluster deployment and testing. + cluster_endpoint_public_access = true + + vpc_id = module.vpc.vpc_id + # Filtering only Secondary CIDR private subnets starting with "100.". Subnet IDs where the EKS Control Plane ENIs will be created + subnet_ids = compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) : + substr(cidr_block, 0, 4) == "100." ? subnet_id : null] + ) + + manage_aws_auth_configmap = true + aws_auth_roles = [ + # We need to add in the Karpenter node IAM role for nodes launched by Karpenter + { + rolearn = module.eks_blueprints_addons.karpenter.node_iam_role_arn + username = "system:node:{{EC2PrivateDNSName}}" + groups = [ + "system:bootstrappers", + "system:nodes", + ] + } + ] + + #--------------------------------------- + # Note: This can further restricted to specific required for each Add-on and your application + #--------------------------------------- + # Extend cluster security group rules + cluster_security_group_additional_rules = { + ingress_nodes_ephemeral_ports_tcp = { + description = "Nodes on ephemeral ports" + protocol = "tcp" + from_port = 1025 + to_port = 65535 + type = "ingress" + source_node_security_group = true + } + } + + # Extend node-to-node security group rules + node_security_group_additional_rules = { + ingress_self_all = { + description = "Node to node all ports/protocols" + protocol = "-1" + from_port = 0 + to_port = 0 + type = "ingress" + self = true + } + # Allows Control Plane Nodes to talk to Worker nodes on all ports. Added this to simplify the example and further avoid issues with Add-ons communication with Control plane. + # This can be restricted further to specific port based on the requirement for each Add-on e.g., metrics-server 4443, spark-operator 8080, karpenter 8443 etc. + # Change this according to your security requirements if needed + ingress_cluster_to_node_all_traffic = { + description = "Cluster API to Nodegroup all traffic" + protocol = "-1" + from_port = 0 + to_port = 0 + type = "ingress" + source_cluster_security_group = true + } + } + + eks_managed_node_group_defaults = { + iam_role_additional_policies = { + # Not required, but used in the example to access the nodes to inspect mounted volumes + AmazonSSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore" + } + + # NVMe instance store volumes are automatically enumerated and assigned a device + pre_bootstrap_user_data = <<-EOT + cat <<-EOF > /etc/profile.d/bootstrap.sh + #!/bin/sh + + # Configure the NVMe volumes in RAID0 configuration in the bootstrap.sh call. 
+ # https://github.com/awslabs/amazon-eks-ami/blob/master/files/bootstrap.sh#L35 + # This will create a RAID volume and mount it at /mnt/k8s-disks/0 + # then mount that volume to /var/lib/kubelet, /var/lib/containerd, and /var/log/pods + # this allows the container daemons and pods to write to the RAID0 by default without needing PersistentVolumes + export LOCAL_DISKS='raid0' + EOF + + # Source extra environment variables in bootstrap script + sed -i '/^set -o errexit/a\\nsource /etc/profile.d/bootstrap.sh' /etc/eks/bootstrap.sh + EOT + + ebs_optimized = true + # This block device is used only for root volume. Adjust volume according to your size. + # NOTE: Don't use this volume for Spark workloads + block_device_mappings = { + xvda = { + device_name = "/dev/xvda" + ebs = { + volume_size = 100 + volume_type = "gp3" + } + } + } + } + + eks_managed_node_groups = { + # We recommend to have a MNG to place your critical workloads and add-ons + # Then rely on Karpenter to scale your workloads + # You can also make uses on nodeSelector and Taints/tolerations to spread workloads on MNG or Karpenter provisioners + core_node_group = { + name = "core-node-group" + description = "EKS managed node group example launch template" + # Filtering only Secondary CIDR private subnets starting with "100.". Subnet IDs where the nodes/node groups will be provisioned + subnet_ids = compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) : + substr(cidr_block, 0, 4) == "100." ? subnet_id : null] + ) + + min_size = 3 + max_size = 9 + desired_size = 3 + + instance_types = ["m5.xlarge"] + + labels = { + WorkerType = "ON_DEMAND" + NodeGroupType = "core" + } + + tags = { + Name = "core-node-grp", + "karpenter.sh/discovery" = local.name + } + } + + spark_ondemand_r5d = { + name = "spark-ondemand-r5d" + description = "Spark managed node group for Driver pods" + # Filtering only Secondary CIDR private subnets starting with "100.". Subnet IDs where the nodes/node groups will be provisioned + subnet_ids = [element(compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) : + substr(cidr_block, 0, 4) == "100." ? subnet_id : null]), 0) + ] + + min_size = 0 + max_size = 20 + desired_size = 0 + + instance_types = ["r5d.xlarge"] # r5d.xlarge 4vCPU - 32GB - 1 x 150 NVMe SSD - Up to 10Gbps - Up to 4,750 Mbps EBS Bandwidth + + labels = { + WorkerType = "ON_DEMAND" + NodeGroupType = "spark-on-demand-ca" + } + + taints = [{ + key = "spark-on-demand-ca", + value = true + effect = "NO_SCHEDULE" + }] + + tags = { + Name = "spark-ondemand-r5d" + WorkerType = "ON_DEMAND" + NodeGroupType = "spark-on-demand-ca" + } + } + + # ec2-instance-selector --vcpus=48 --gpus 0 -a arm64 --allow-list '.*d.*' + # This command will give you the list of the instances with similar vcpus for arm64 dense instances + spark_spot_x86_48cpu = { + name = "spark-spot-48cpu" + description = "Spark Spot node group for executor workloads" + # Filtering only Secondary CIDR private subnets starting with "100.". Subnet IDs where the nodes/node groups will be provisioned + subnet_ids = [element(compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) : + substr(cidr_block, 0, 4) == "100." ? 
subnet_id : null]), 0) + ] + + min_size = 0 + max_size = 12 + desired_size = 0 + + instance_types = ["r5d.12xlarge", "r6id.12xlarge", "c5ad.12xlarge", "c5d.12xlarge", "c6id.12xlarge", "m5ad.12xlarge", "m5d.12xlarge", "m6id.12xlarge"] # 48cpu - 2 x 1425 NVMe SSD + + labels = { + WorkerType = "SPOT" + NodeGroupType = "spark-spot-ca" + } + + taints = [{ + key = "spark-spot-ca" + value = true + effect = "NO_SCHEDULE" + }] + + tags = { + Name = "spark-node-grp" + WorkerType = "SPOT" + NodeGroupType = "spark" + } + } + } +} diff --git a/streaming/spark-streaming/terraform/helm-values/aws-cloudwatch-metrics-values.yaml b/streaming/spark-streaming/terraform/helm-values/aws-cloudwatch-metrics-values.yaml new file mode 100755 index 000000000..ae3c41d44 --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/aws-cloudwatch-metrics-values.yaml @@ -0,0 +1,11 @@ +resources: + limits: + cpu: 500m + memory: 2Gi + requests: + cpu: 200m + memory: 1Gi + +# This toleration allows Daemonset pod to be scheduled on any node, regardless of their Taints. +tolerations: + - operator: Exists diff --git a/streaming/spark-streaming/terraform/helm-values/aws-for-fluentbit-values.yaml b/streaming/spark-streaming/terraform/helm-values/aws-for-fluentbit-values.yaml new file mode 100644 index 000000000..0bea5188d --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/aws-for-fluentbit-values.yaml @@ -0,0 +1,102 @@ +global: + +#hostNetwork and dnsPolicy are critical for enabling large clusters to avoid making calls to API server +# see this link https://docs.fluentbit.io/manual/pipeline/filters/kubernetes#optional-feature-using-kubelet-to-get-metadata +hostNetwork: true +dnsPolicy: ClusterFirstWithHostNet + +service: + parsersFiles: + - /fluent-bit/parsers/parsers.conf + extraParsers: | + [PARSER] + Name kubernetes + Format regex + Regex ^(?[^_]+)\.(?.+)\.(?[a-z0-9](?:[-a-z0-9]*[a-z0-9])?(?:\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)\.(?[a-z0-9]{64})-$ + +input: + name: "tail" + enabled: true + tag: "systempods....-" + path: "/var/log/containers/*.log" + db: "/var/log/flb_kube.db" + memBufLimit: 5MB + skipLongLines: "On" + refreshInterval: 10 + extraInputs: | + multiline.parser docker, cri + Tag_Regex (?[a-z0-9](?:[-a-z0-9]*[a-z0-9])?(?:\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$ + + +# NOTE: extraFilters config for using Kubelet to get the Metadata instead of talking to API server for large clusters +filter: + name: "kubernetes" + match: "systempods.*" + kubeURL: "https://kubernetes.default.svc.cluster.local:443" + mergeLog: "On" + mergeLogKey: "log_processed" + keepLog: "On" + k8sLoggingParser: "On" + k8sLoggingExclude: "Off" + bufferSize: "0" + extraFilters: | + Kube_Tag_Prefix systempods. + Regex_Parser kubernetes + Labels On + Annotations Off + Use_Kubelet true + Kubelet_Port 10250 + Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token + +# CATION: Do not use `cloudwatch` plugin. This Golang Plugin is not recommended by AWS anymore instead use C plugin(`cloudWatchLogs`) for better performance. +# cloudWatch: +# enabled: false + +# This is a new high performance C Plugin for CloudWatchLogs. 
See docs here https://docs.fluentbit.io/manual/pipeline/outputs/cloudwatch +cloudWatchLogs: + enabled: true + match: "systempods.*" + region: ${region} + logGroupName: ${cloudwatch_log_group} + autoCreateGroup: false + extraOutputs: | + log_key log + +#----------------------------------------------------------# +# OUTPUT logs to S3 +#----------------------------------------------------------# + +# This is an example for writing logs to S3 bucket. +# This example writes system pod logs and spark logs into dedicated prefix. +# This second output is using the rewrite_tag filter commented above + +additionalOutputs: | + [OUTPUT] + Name s3 + Match systempods.* + region ${region} + bucket ${s3_bucket_name} + total_file_size 100M + s3_key_format /${cluster_name}/system-pod-logs/$TAG[1]/$TAG[2]/$TAG[3]/$TAG[3]_%H%M%S_$UUID.log + s3_key_format_tag_delimiters .. + store_dir /home/ec2-user/buffer + upload_timeout 10m + log_key log + + +# Resource config for large clusters +resources: + limits: + cpu: 1000m + memory: 1500Mi + requests: + cpu: 500m + memory: 500Mi + +## Assign a PriorityClassName to pods if set +priorityClassName: system-node-critical + +# This toleration allows Daemonset pod to be scheduled on any node, regardless of their Taints. +tolerations: + - operator: Exists diff --git a/streaming/spark-streaming/terraform/helm-values/cluster-autoscaler-values.yaml b/streaming/spark-streaming/terraform/helm-values/cluster-autoscaler-values.yaml new file mode 100644 index 000000000..5a42794f2 --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/cluster-autoscaler-values.yaml @@ -0,0 +1,25 @@ +autoDiscovery: + clusterName: ${eks_cluster_id} + +awsRegion: ${aws_region} + +cloudProvider: aws + +extraArgs: + aws-use-static-instance-list: true + +# Best practice to update the resource requests and limits for each add-on +resources: + limits: + cpu: 1000m + memory: 1G + requests: + cpu: 200m + memory: 512Mi + +# Best practice to updateStrategy for each add-on +updateStrategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 0 + maxUnavailable: 1 diff --git a/streaming/spark-streaming/terraform/helm-values/coredns-autoscaler-values.yaml b/streaming/spark-streaming/terraform/helm-values/coredns-autoscaler-values.yaml new file mode 100644 index 000000000..64cb540bf --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/coredns-autoscaler-values.yaml @@ -0,0 +1,40 @@ +nameOverride: kube-dns-autoscaler + +# Formula for controlling the replicas. Adjust according to your needs +# replicas = max( ceil( cores * 1/coresPerReplica ) , ceil( nodes * 1/nodesPerReplica ) ) +# replicas = min(replicas, max) +# replicas = max(replicas, min) +config: + linear: + coresPerReplica: 256 + nodesPerReplica: 16 + min: 1 + max: 100 + preventSinglePointFailure: true + includeUnschedulableNodes: true + +# Target to scale. In format: deployment/*, replicationcontroller/* or replicaset/* (not case sensitive). 
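+#
+# Worked example (illustrative, not taken from this blueprint): with the linear config above
+# (coresPerReplica=256, nodesPerReplica=16), a cluster of 32 nodes and 128 total cores gives
+#   replicas = max( ceil(128 * 1/256), ceil(32 * 1/16) ) = max(1, 2) = 2
+# which is then clamped to the configured bounds [min, max] = [1, 100].
+#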
+options: + target: ${target} + +serviceAccount: + create: true + name: kube-dns-autoscaler + +podSecurityContext: + seccompProfile: + type: RuntimeDefault + supplementalGroups: [ 65534 ] + fsGroup: 65534 + +resources: + limits: + cpu: 100m + memory: 128Mi + requests: + cpu: 100m + memory: 128Mi + +tolerations: + - key: "CriticalAddonsOnly" + operator: "Exists" diff --git a/streaming/spark-streaming/terraform/helm-values/kube-prometheus-amp-enable.yaml b/streaming/spark-streaming/terraform/helm-values/kube-prometheus-amp-enable.yaml new file mode 100644 index 000000000..522065c6c --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/kube-prometheus-amp-enable.yaml @@ -0,0 +1,76 @@ +prometheus: + serviceAccount: + create: true + name: ${amp_sa} + annotations: + eks.amazonaws.com/role-arn: ${amp_irsa} + prometheusSpec: + remoteWrite: + - url: ${amp_remotewrite_url} + sigv4: + region: ${region} + queueConfig: + maxSamplesPerSend: 1000 + maxShards: 200 + capacity: 2500 + retention: 5h + scrapeInterval: 30s + evaluationInterval: 30s + scrapeTimeout: 10s + storageSpec: + volumeClaimTemplate: + metadata: + name: data + spec: + storageClassName: gp2 + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 50Gi + # Scrape Cost metrics for Karpenter and Yunikorn add-ons + additionalScrapeConfigs: + - job_name: yunikorn + honor_labels: true + scrape_interval: 15s + scrape_timeout: 10s + metrics_path: /ws/v1//metrics + scheme: http + dns_sd_configs: + - names: + - yunikorn-service.yunikorn.svc + type: 'A' + port: 9080 + - job_name: karpenter + kubernetes_sd_configs: + - role: endpoints + namespaces: + names: + - karpenter + relabel_configs: + - source_labels: [__meta_kubernetes_endpoint_port_name] + regex: http-metrics + action: keep + +alertmanager: + enabled: false + +grafana: + enabled: true + defaultDashboardsEnabled: true +# Adding AMP datasource to Grafana config + serviceAccount: + create: false + name: ${amp_sa} + grafana.ini: + auth: + sigv4_auth_enabled: true + additionalDataSources: + - name: AMP + editable: true + jsonData: + sigV4Auth: true + sigV4Region: ${region} + type: prometheus + isDefault: false + url: ${amp_url} diff --git a/streaming/spark-streaming/terraform/helm-values/kube-prometheus.yaml b/streaming/spark-streaming/terraform/helm-values/kube-prometheus.yaml new file mode 100644 index 000000000..ccb286bbb --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/kube-prometheus.yaml @@ -0,0 +1,47 @@ +prometheus: + prometheusSpec: + retention: 5h + scrapeInterval: 30s + evaluationInterval: 30s + scrapeTimeout: 10s + storageSpec: + volumeClaimTemplate: + metadata: + name: data + spec: + storageClassName: gp2 + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 50Gi + # Scrape Cost metrics for Karpenter and Yunikorn add-ons + additionalScrapeConfigs: + - job_name: yunikorn + honor_labels: true + scrape_interval: 15s + scrape_timeout: 10s + metrics_path: /ws/v1//metrics + scheme: http + dns_sd_configs: + - names: + - yunikorn-service.yunikorn.svc + type: 'A' + port: 9080 + - job_name: karpenter + kubernetes_sd_configs: + - role: endpoints + namespaces: + names: + - karpenter + relabel_configs: + - source_labels: [__meta_kubernetes_endpoint_port_name] + regex: http-metrics + action: keep + +alertmanager: + enabled: false + +grafana: + enabled: true + defaultDashboardsEnabled: true diff --git a/streaming/spark-streaming/terraform/helm-values/kubecost-values.yaml b/streaming/spark-streaming/terraform/helm-values/kubecost-values.yaml 
new file mode 100644 index 000000000..f781ec5ce --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/kubecost-values.yaml @@ -0,0 +1,62 @@ + +# KubeCost WebUI -> kubectl port-forward --namespace kubecost deployment/kubecost-cost-analyzer 9090 + +global: + # pricingCsv: + # enabled: false + # location: + # provider: "AWS" + # region: "us-east-1" + # URI: s3://kc-csv-test/pricing_schema.csv # a valid file URI + # csvAccessCredentials: pricing-schema-access-secret + + # This Prometheus setup is reusing the existing Prometheus deployment + # Check for more docs under https://guide.kubecost.com/hc/en-us/articles/4407595941015 + prometheus: + fqdn: http://kube-prometheus-stack-prometheus.kube-prometheus-stack.svc:9090 + enabled: false + +# If you have node-exporter and/or KSM running on your cluster, follow this step to disable the Kubecost included versions. +prometheus: + nodeExporter: + enabled: false + serviceAccounts: + nodeExporter: + create: false + kubeStateMetrics: + enabled: false + +#imageVersion: prod-1.96.0 # commented to use the latest + +kubecostFrontend: + image: public.ecr.aws/kubecost/frontend + resources: + requests: + cpu: "200m" + memory: "512Mi" + +kubecostMetrics: + emitPodAnnotations: true + emitNamespaceAnnotations: true + +kubecostModel: + image: public.ecr.aws/kubecost/cost-model + resources: + requests: + cpu: "500m" + memory: "512Mi" + +# Set this to false if you're bringing your own service account. +#serviceAccount: +# create: false +# name: kubecost-cost-analyzer +# annotations: +# eks.amazonaws.com/role-arn: + +# Define persistence volume for cost-analyzer +persistentVolume: + size: 32Gi + dbSize: 32.0Gi + enabled: true # Note that setting this to false means configurations will be wiped out on pod restart. 
+ storageClass: gp2 + # existingClaim: kubecost-cost-analyzer # a claim in the same namespace as kubecost diff --git a/streaming/spark-streaming/terraform/helm-values/metrics-server-values.yaml b/streaming/spark-streaming/terraform/helm-values/metrics-server-values.yaml new file mode 100644 index 000000000..bc806ced6 --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/metrics-server-values.yaml @@ -0,0 +1,52 @@ +# HA config for metrics-server +image: + repository: registry.k8s.io/metrics-server/metrics-server + pullPolicy: IfNotPresent + +serviceAccount: + create: true + name: metrics-server + +rbac: + create: true + pspEnabled: false + +apiService: + create: true + +podLabels: + k8s-app: metrics-server + +# HA enabled by enabling replicas to 2, updateStrategy and podDisruptionBudget to true +replicas: 2 + +updateStrategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 0 + maxUnavailable: 1 + +podDisruptionBudget: + enabled: true + minAvailable: 1 + +defaultArgs: + - --cert-dir=/tmp + - --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname + - --kubelet-use-node-status-port + - --metric-resolution=15s + +resources: + requests: + cpu: 200m + memory: 512Mi + +affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchLabels: + k8s-app: metrics-server + namespaces: + - kube-system + topologyKey: kubernetes.io/hostname diff --git a/streaming/spark-streaming/terraform/helm-values/nginx-values.yaml b/streaming/spark-streaming/terraform/helm-values/nginx-values.yaml new file mode 100644 index 000000000..c129611fa --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/nginx-values.yaml @@ -0,0 +1,37 @@ +controller: + service: + # For more annotations https://kubernetes-sigs.github.io/aws-load-balancer-controller/v2.4/guide/service/annotations/ + annotations: + service.beta.kubernetes.io/aws-load-balancer-ip-address-type: ipv4 +# service.beta.kubernetes.io/aws-load-balancer-scheme: internal # PRIVATE NLB + service.beta.kubernetes.io/aws-load-balancer-scheme: internal # Private Load Balancer can only be accessed within the VPC # PUBLIC NLB + service.beta.kubernetes.io/aws-load-balancer-type: external + service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip +# service.beta.kubernetes.io/aws-load-balancer-proxy-protocol: "*" # DONT USE THIS EVER WHEN YOU USE IP based routing + service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp + service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled: 'true' + + +#------------------------------------ +# FUTURE WORK TO ENABLE ROUTE53, ACM +#------------------------------------ +# external-dns.alpha.kubernetes.io/hostname: kubernetes-example.com. 
+# AWS route53-mapper +#controller: +# service: +# labels: +# dns: "route53" +# annotations: +# domainName: "kubernetes-example.com" + +# AWS L7 ELB with SSL Termination +#controller: +# service: +# targetPorts: +# http: http +# https: http +# annotations: +# service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:XX-XXXX-X:XXXXXXXXX:certificate/XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXX +# service.beta.kubernetes.io/aws-load-balancer-backend-protocol: "http" +# service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "https" +# service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout: '3600' diff --git a/streaming/spark-streaming/terraform/helm-values/spark-operator-values.yaml b/streaming/spark-streaming/terraform/helm-values/spark-operator-values.yaml new file mode 100644 index 000000000..be0aa86c1 --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/spark-operator-values.yaml @@ -0,0 +1,42 @@ +replicaCount: 1 + +webhook: + # -- Enable webhook server + enable: true + # -- Webhook service port + port: 8080 + +# -- Set this if running spark jobs in a different namespace than the operator +#sparkJobNamespace: "spark-team-a" + +# -- Operator concurrency, higher values might increase memory usage +controllerThreads: 10 + +# resources -- Pod resource requests and limits +# Note, that each job submission will spawn a JVM within the Spark Operator Pod using "/usr/local/openjdk-11/bin/java -Xmx128m". +# Kubernetes may kill these Java processes at will to enforce resource limits. When that happens, you will see the following error: +# 'failed to run spark-submit for SparkApplication [...]: signal: killed' - when this happens, you may want to increase memory limits. +resources: + limits: + cpu: 200m + memory: 1Gi + requests: + cpu: 100m + memory: 512Mi + +batchScheduler: + # -- Enable batch scheduler for spark jobs scheduling. If enabled, users can specify batch scheduler name in spark application + enable: true + +#------------------------------------ +# THIS WILL CREATE SERVICE AND INGRESS OBJECT FOR EACH SPARK APPLICATION +#------------------------------------ +#uiService: +## # -- Enable UI service creation for Spark application +# enable: true +### -- Ingress URL format. +### Requires the UI service to be enabled by setting `uiService.enable` to true. +### 1/ Enable ingressUrlFormat to create an Ingress object for each Spark Job submitted using Spark Operator +### 2/ This setup also requires ingres-nginx to be deployed with NLB as LB with IP based routing. +### 3. 
Enter the NLB DNS name or enter Custom Domain name from route53 below which points to the NLB +#ingressUrlFormat: '/{{$appName}}' diff --git a/streaming/spark-streaming/terraform/helm-values/yunikorn-values.yaml b/streaming/spark-streaming/terraform/helm-values/yunikorn-values.yaml new file mode 100644 index 000000000..079653a85 --- /dev/null +++ b/streaming/spark-streaming/terraform/helm-values/yunikorn-values.yaml @@ -0,0 +1,148 @@ + + +imagePullSecrets: +serviceAccount: yunikorn-admin + +image: + repository: apache/yunikorn + tag: scheduler-${image_version} + pullPolicy: Always + +pluginImage: + repository: apache/yunikorn + tag: scheduler-plugin-${image_version} + pullPolicy: Always + +nodeSelector: {} +tolerations: [] +affinity: {} + +configuration: null # deprecated; use queues.yaml in yunikornDefaults +operatorPlugins: null # deprecated; use service.operatorPlugins in yunikornDefaults +placeHolderImage: null # deprecated; use service.placeholderImage in yunikornDefaults + +admissionController: + image: + repository: apache/yunikorn + tag: admission-${image_version} + pullPolicy: Always + replicaCount: 1 + serviceAccount: yunikorn-admission-controller + hostNetwork: true + resources: + requests: + cpu: 200m + memory: 500Mi + limits: + cpu: 500m + memory: 500Mi + nodeSelector: {} + tolerations: [] + affinity: {} + service: + type: ClusterIP + processNamespaces: null # deprecated; use admissionController.processNamespaces in yunikornDefaults + bypassNamespaces: null # deprecated; use admissionController.bypassNamespaces in yunikornDefaults + labelNamespaces: null # deprecated; use admissionController.labelNamespaces in yunikornDefaults + noLabelNamespaces: null # deprecated; use admissionController.noLabelNamespaces in yunikornDefaults + +web: + image: + repository: apache/yunikorn + tag: web-${image_version} + pullPolicy: Always + resources: + requests: + memory: 500Mi + cpu: 500m + limits: + memory: 500Mi + cpu: 500m + +service: + type: ClusterIP + port: 9080 + portWeb: 9889 + +ingress: + enabled: false + ingressClassName: "" + annotations: {} + hosts: + - host: chart-example.local + paths: [] + pathType: Prefix + tls: [] + +resources: + requests: + cpu: 400m + memory: 2Gi + limits: + cpu: 4 + memory: 2Gi + + + +# When this flag is true, the admission controller will be installed along with the scheduler. +# When this flag is false, the admission controller will not be installed. +# Once the admission controller is installed, all traffic will be routing to yunikorn. +embedAdmissionController: true + +# When this flag is true, the scheduler will be deployed as Kubernetes scheduler plugin. +# When this flag is false, the scheduler will be deployed as a standalone scheduler. +enableSchedulerPlugin: false + + +# Bootstrap configuration for YuniKorn - will be rendered into yunikorn-defaults ConfigMap. +# Any valid options for YuniKorn may be specified here. +# Use this link for more values -> https://yunikorn.apache.org/docs/user_guide/service_config/#yunikorn-configuration +yunikornDefaults: + # The default volume bind timeout value of 10 seconds may be too short for EBS. 
+ service.volumeBindTimeout: "60s" + service.placeholderImage: registry.k8s.io/pause:3.7 + service.operatorPlugins: "general,spark-k8s-operator" + admissionController.filtering.bypassNamespaces: "^kube-system$" + # Use this configuration to configure absolute capacities for yunikorn queues + # The Default partition uses BinPacking on Nodes by default https://yunikorn.apache.org/docs/next/user_guide/sorting_policies/#node-sorting + queues.yaml: | + partitions: + - name: default + nodesortpolicy: + type: binpacking + queues: + - name: root + submitacl: '*' + queues: + - name: default + resources: + guaranteed: + memory: 100G + vcore: 10 + max: + memory: 100G + vcore: 10 + - name: prod + resources: + guaranteed: + memory: 500G + vcore: 50 + max: + memory: 800G + vcore: 300 + - name: test + resources: + guaranteed: + memory: 100G + vcore: 10 + max: + memory: 800G + vcore: 50 + - name: dev + resources: + guaranteed: + memory: 100G + vcore: 10 + max: + memory: 100G + vcore: 10 diff --git a/streaming/spark-streaming/terraform/install.sh b/streaming/spark-streaming/terraform/install.sh new file mode 100755 index 000000000..18f2a94d3 --- /dev/null +++ b/streaming/spark-streaming/terraform/install.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +read -p "Enter the region: " region +export AWS_DEFAULT_REGION=$region + +# List of Terraform modules to apply in sequence +targets=( + "module.vpc" + "module.eks" +) + +# Initialize Terraform +terraform init -upgrade + +# Apply modules in sequence +for target in "${targets[@]}" +do + echo "Applying module $target..." + apply_output=$(terraform apply -target="$target" -var="region=$region" -auto-approve 2>&1 | tee /dev/tty) + if [[ ${PIPESTATUS[0]} -eq 0 && $apply_output == *"Apply complete"* ]]; then + echo "SUCCESS: Terraform apply of $target completed successfully" + else + echo "FAILED: Terraform apply of $target failed" + exit 1 + fi +done + +# Final apply to catch any remaining resources +echo "Applying remaining resources..." 
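+# Note: the apply output is piped through `tee`, so terraform's own exit status is read from
+# PIPESTATUS[0] below; `$?` alone would only report the exit status of `tee`.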
+apply_output=$(terraform apply -var="region=$region" -auto-approve 2>&1 | tee /dev/tty) +if [[ ${PIPESTATUS[0]} -eq 0 && $apply_output == *"Apply complete"* ]]; then + echo "SUCCESS: Terraform apply of all modules completed successfully" +else + echo "FAILED: Terraform apply of all modules failed" + exit 1 +fi diff --git a/streaming/spark-streaming/terraform/main.tf b/streaming/spark-streaming/terraform/main.tf new file mode 100755 index 000000000..fe87743f6 --- /dev/null +++ b/streaming/spark-streaming/terraform/main.tf @@ -0,0 +1,22 @@ +locals { + name = var.name + region = var.region + azs = slice(data.aws_availability_zones.available.names, 0, 2) + + account_id = data.aws_caller_identity.current.account_id + partition = data.aws_partition.current.partition + + tags = { + Blueprint = local.name + GithubRepo = "github.com/awslabs/data-on-eks" + } +} + +data "aws_ecrpublic_authorization_token" "token" { + provider = aws.ecr +} + +data "aws_availability_zones" "available" {} +data "aws_region" "current" {} +data "aws_caller_identity" "current" {} +data "aws_partition" "current" {} diff --git a/streaming/spark-streaming/terraform/msk.tf b/streaming/spark-streaming/terraform/msk.tf new file mode 100644 index 000000000..fb9a46ab2 --- /dev/null +++ b/streaming/spark-streaming/terraform/msk.tf @@ -0,0 +1,98 @@ +resource "aws_security_group" "msk_security_group" { + name = "msk-security-group" + description = "Security group for MSK cluster" + vpc_id = module.vpc.vpc_id + + ingress { + from_port = 9092 + to_port = 9092 + protocol = "tcp" + cidr_blocks = [var.vpc_cidr, element(var.secondary_cidr_blocks, 0)] + } + ingress { + from_port = 9198 + to_port = 9198 + protocol = "tcp" + cidr_blocks = [var.vpc_cidr, element(var.secondary_cidr_blocks, 0)] + } + ingress { + from_port = 9094 + to_port = 9094 + protocol = "tcp" + cidr_blocks = [var.vpc_cidr, element(var.secondary_cidr_blocks, 0)] + } + ingress { + from_port = 9096 + to_port = 9096 + protocol = "tcp" + cidr_blocks = [var.vpc_cidr, element(var.secondary_cidr_blocks, 0)] + } + ingress { + from_port = 9098 + to_port = 9098 + protocol = "tcp" + cidr_blocks = [var.vpc_cidr, element(var.secondary_cidr_blocks, 0)] + } + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } +} + +# Security is not enable here only for demo purposes, you should enable auth mechanisms +resource "aws_msk_cluster" "kafka_test_demo" { + cluster_name = "kafka-demo-spark" + kafka_version = "2.7.1" + number_of_broker_nodes = 2 + + broker_node_group_info { + instance_type = "kafka.m5.large" + client_subnets = [module.vpc.private_subnets[0], module.vpc.private_subnets[1]] + storage_info { + ebs_storage_info { + volume_size = 1000 + } + } + connectivity_info { + public_access { + type = "DISABLED" + } + } + security_groups = [aws_security_group.msk_security_group.id] + } + + encryption_info { + encryption_in_transit { + client_broker = "PLAINTEXT" + } + } + + client_authentication { + sasl { + iam = false + } + + unauthenticated = true + } + + open_monitoring { + prometheus { + jmx_exporter { + enabled_in_broker = true + } + + node_exporter { + enabled_in_broker = true + } + } + } + + lifecycle { + ignore_changes = [ + client_authentication[0].tls + ] + } +} + diff --git a/streaming/spark-streaming/terraform/outputs.tf b/streaming/spark-streaming/terraform/outputs.tf new file mode 100644 index 000000000..49dbcc132 --- /dev/null +++ b/streaming/spark-streaming/terraform/outputs.tf @@ -0,0 +1,70 @@ 
+################################################################################
+# Cluster
+################################################################################
+
+output "cluster_arn" {
+  description = "The Amazon Resource Name (ARN) of the cluster"
+  value       = module.eks.cluster_arn
+}
+
+output "cluster_name" {
+  description = "The name of the EKS cluster"
+  value       = var.name
+}
+
+output "configure_kubectl" {
+  description = "Configure kubectl: make sure you're logged in with the correct AWS profile and run the following command to update your kubeconfig"
+  value       = "aws eks --region ${local.region} update-kubeconfig --name ${module.eks.cluster_name}"
+}
+
+################################################################################
+# Private Subnets
+################################################################################
+
+output "subnet_ids_starting_with_100" {
+  description = "Secondary CIDR Private Subnet IDs for EKS Data Plane"
+  value       = compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) : substr(cidr_block, 0, 4) == "100." ? subnet_id : null])
+}
+
+output "s3_bucket_id_spark_history_server" {
+  description = "Spark History server logs S3 bucket ID"
+  value       = module.s3_bucket.s3_bucket_id
+}
+
+output "s3_bucket_region_spark_history_server" {
+  description = "Spark History server logs S3 bucket region"
+  value       = module.s3_bucket.s3_bucket_region
+}
+
+output "grafana_secret_name" {
+  description = "Grafana password secret name"
+  value       = aws_secretsmanager_secret.grafana.name
+}
+
+################################################################################
+# MSK Outputs
+################################################################################
+
+output "bootstrap_brokers" {
+  description = "Bootstrap brokers for the MSK cluster"
+  value       = aws_msk_cluster.kafka_test_demo.bootstrap_brokers
+}
+
+################################################################################
+# Application Related Outputs
+################################################################################
+
+output "s3_bucket_id_iceberg_bucket" {
+  description = "Iceberg data S3 bucket ID"
+  value       = aws_s3_bucket.iceberg_data.id
+}
+
+output "producer_iam_role_arn" {
+  description = "IAM role ARN for the producer"
+  value       = module.producer_iam_role.iam_role_arn
+}
+
+output "consumer_iam_role_arn" {
+  description = "IAM role ARN for the consumer"
+  value       = module.spark_team_a_irsa.iam_role_arn
+}
diff --git a/streaming/spark-streaming/terraform/providers.tf b/streaming/spark-streaming/terraform/providers.tf
new file mode 100644
index 000000000..c70906f02
--- /dev/null
+++ b/streaming/spark-streaming/terraform/providers.tf
@@ -0,0 +1,50 @@
+provider "aws" {
+  region = local.region
+}
+
+# ECR always authenticates with `us-east-1` region
+# Docs -> https://docs.aws.amazon.com/AmazonECR/latest/public/public-registries.html
+provider "aws" {
+  alias  = "ecr"
+  region = "us-east-1"
+}
+
+provider "kubernetes" {
+  host                   = module.eks.cluster_endpoint
+  cluster_ca_certificate = base64decode(module.eks.cluster_certificate_authority_data)
+
+  exec {
+    api_version = "client.authentication.k8s.io/v1beta1"
+    command     = "aws"
+    # This requires the awscli to be installed locally where Terraform is executed
+    args = ["eks", "get-token", "--cluster-name", module.eks.cluster_name]
+  }
+}
+
+provider "helm" {
+  kubernetes {
+    host                   = module.eks.cluster_endpoint
+    cluster_ca_certificate = 
base64decode(module.eks.cluster_certificate_authority_data) + + exec { + api_version = "client.authentication.k8s.io/v1beta1" + command = "aws" + # This requires the awscli to be installed locally where Terraform is executed + args = ["eks", "get-token", "--cluster-name", module.eks.cluster_name] + } + } +} + +provider "kubectl" { + apply_retry_count = 5 + host = module.eks.cluster_endpoint + cluster_ca_certificate = base64decode(module.eks.cluster_certificate_authority_data) + load_config_file = false + + exec { + api_version = "client.authentication.k8s.io/v1beta1" + command = "aws" + # This requires the awscli to be installed locally where Terraform is executed + args = ["eks", "get-token", "--cluster-name", module.eks.cluster_name] + } +} diff --git a/streaming/spark-streaming/terraform/spark-team.tf b/streaming/spark-streaming/terraform/spark-team.tf new file mode 100644 index 000000000..b77e35aab --- /dev/null +++ b/streaming/spark-streaming/terraform/spark-team.tf @@ -0,0 +1,142 @@ +locals { + spark_team = "spark-team-a" +} + +resource "kubernetes_namespace_v1" "spark_team_a" { + metadata { + name = local.spark_team + } + timeouts { + delete = "15m" + } +} + +resource "kubernetes_service_account_v1" "spark_team_a" { + metadata { + name = local.spark_team + namespace = kubernetes_namespace_v1.spark_team_a.metadata[0].name + annotations = { "eks.amazonaws.com/role-arn" : module.spark_team_a_irsa.iam_role_arn } + } + + automount_service_account_token = true +} + +resource "kubernetes_secret_v1" "spark_team_a" { + metadata { + name = "${local.spark_team}-secret" + namespace = kubernetes_namespace_v1.spark_team_a.metadata[0].name + annotations = { + "kubernetes.io/service-account.name" = kubernetes_service_account_v1.spark_team_a.metadata[0].name + "kubernetes.io/service-account.namespace" = kubernetes_namespace_v1.spark_team_a.metadata[0].name + } + } + + type = "kubernetes.io/service-account-token" +} + +#--------------------------------------------------------------- +# IRSA for Spark driver/executor pods for "spark-team-a" +#--------------------------------------------------------------- +module "spark_team_a_irsa" { + source = "aws-ia/eks-blueprints-addon/aws" + version = "~> 1.0" + + # Disable helm release + create_release = false + + # IAM role for service account (IRSA) + create_role = true + role_name = "${local.name}-${local.spark_team}" + create_policy = false + role_policies = { + consumer_s3_kafka_policy = aws_iam_policy.consumer_s3_kafka.arn + } + + oidc_providers = { + this = { + provider_arn = module.eks.oidc_provider_arn + namespace = local.spark_team + service_account = local.spark_team + } + } +} + +#--------------------------------------------------------------- +# Kubernetes Cluster role for service Account analytics-k8s-data-team-a +#--------------------------------------------------------------- +resource "kubernetes_cluster_role" "spark_role" { + metadata { + name = "spark-cluster-role" + } + + rule { + verbs = ["get", "list", "watch"] + api_groups = [""] + resources = ["namespaces", "nodes", "persistentvolumes"] + } + + rule { + verbs = ["list", "watch"] + api_groups = ["storage.k8s.io"] + resources = ["storageclasses"] + } + rule { + verbs = ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"] + api_groups = [""] + resources = ["serviceaccounts", "services", "configmaps", "events", "pods", "pods/log", "persistentvolumeclaims"] + } + + rule { + verbs = ["create", "patch", "delete", "watch"] + 
api_groups = [""] + resources = ["secrets"] + } + + rule { + verbs = ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"] + api_groups = ["apps"] + resources = ["statefulsets", "deployments"] + } + + rule { + verbs = ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"] + api_groups = ["batch", "extensions"] + resources = ["jobs"] + } + + rule { + verbs = ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"] + api_groups = ["extensions"] + resources = ["ingresses"] + } + + rule { + verbs = ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"] + api_groups = ["rbac.authorization.k8s.io"] + resources = ["roles", "rolebindings"] + } + + depends_on = [module.spark_team_a_irsa] +} +#--------------------------------------------------------------- +# Kubernetes Cluster Role binding role for service Account analytics-k8s-data-team-a +#--------------------------------------------------------------- +resource "kubernetes_cluster_role_binding" "spark_role_binding" { + metadata { + name = "spark-cluster-role-bind" + } + + subject { + kind = "ServiceAccount" + name = local.spark_team + namespace = local.spark_team + } + + role_ref { + api_group = "rbac.authorization.k8s.io" + kind = "ClusterRole" + name = kubernetes_cluster_role.spark_role.id + } + + depends_on = [module.spark_team_a_irsa] +} diff --git a/streaming/spark-streaming/terraform/variables.tf b/streaming/spark-streaming/terraform/variables.tf new file mode 100644 index 000000000..23d4c3b6f --- /dev/null +++ b/streaming/spark-streaming/terraform/variables.tf @@ -0,0 +1,73 @@ +variable "name" { + description = "Name of the VPC and EKS Cluster" + default = "spark-streaming-doeks" + type = string +} + +variable "region" { + description = "Region" + type = string + default = "us-west-2" +} + +variable "eks_cluster_version" { + description = "EKS Cluster version" + default = "1.29" + type = string +} + +# VPC +variable "vpc_cidr" { + description = "VPC CIDR. This should be a valid private (RFC 1918) CIDR range" + default = "10.1.0.0/16" + type = string +} + +# Routable Public subnets with NAT Gateway and Internet Gateway. Not required for fully private clusters +variable "public_subnets" { + description = "Public Subnets CIDRs. 62 IPs per Subnet/AZ" + default = ["10.1.0.0/26", "10.1.0.64/26"] + type = list(string) +} + +# Routable Private subnets only for Private NAT Gateway -> Transit Gateway -> Second VPC for overlapping overlapping CIDRs +variable "private_subnets" { + description = "Private Subnets CIDRs. 254 IPs per Subnet/AZ for Private NAT + NLB + Airflow + EC2 Jumphost etc." + default = ["10.1.1.0/24", "10.1.2.0/24"] + type = list(string) +} + +# RFC6598 range 100.64.0.0/10 +# Note you can only /16 range to VPC. You can add multiples of /16 if required +variable "secondary_cidr_blocks" { + description = "Secondary CIDR blocks to be attached to VPC" + default = ["100.64.0.0/16"] + type = list(string) +} + +# EKS Worker nodes and pods will be placed on these subnets. Each Private subnet can get 32766 IPs. +# RFC6598 range 100.64.0.0/10 +variable "eks_data_plane_subnet_secondary_cidr" { + description = "Secondary CIDR blocks. 
32766 IPs per Subnet per Subnet/AZ for EKS Node and Pods" + default = ["100.64.0.0/17", "100.64.128.0/17"] + type = list(string) +} + +# Enable this for fully private clusters +variable "enable_vpc_endpoints" { + description = "Enable VPC Endpoints" + default = false + type = bool +} + +variable "enable_amazon_prometheus" { + description = "Enable AWS Managed Prometheus service" + type = bool + default = true +} + +variable "enable_yunikorn" { + default = false + description = "Enable Apache YuniKorn Scheduler" # There is no need for Yunikorn in this implementation + type = bool +} diff --git a/streaming/spark-streaming/terraform/versions.tf b/streaming/spark-streaming/terraform/versions.tf new file mode 100644 index 000000000..639ff3cbb --- /dev/null +++ b/streaming/spark-streaming/terraform/versions.tf @@ -0,0 +1,33 @@ +terraform { + required_version = ">= 1.0.0" + + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 3.72" + } + kubernetes = { + source = "hashicorp/kubernetes" + version = ">= 2.10" + } + helm = { + source = "hashicorp/helm" + version = ">= 2.4.1" + } + kubectl = { + source = "gavinbunney/kubectl" + version = ">= 1.14" + } + random = { + source = "hashicorp/random" + version = "3.3.2" + } + } + + # ## Used for end-to-end testing on project; update to suit your needs + # backend "s3" { + # bucket = "doeks-github-actions-e2e-test-state" + # region = "us-west-2" + # key = "e2e/spark-streaming-doeks/terraform.tfstate" + # } +} diff --git a/streaming/spark-streaming/terraform/vpc.tf b/streaming/spark-streaming/terraform/vpc.tf new file mode 100644 index 000000000..21762f37c --- /dev/null +++ b/streaming/spark-streaming/terraform/vpc.tf @@ -0,0 +1,102 @@ +#--------------------------------------------------------------- +# Supporting Network Resources +#--------------------------------------------------------------- +# WARNING: This VPC module includes the creation of an Internet Gateway and NAT Gateway, which simplifies cluster deployment and testing, primarily intended for sandbox accounts. +# IMPORTANT: For preprod and prod use cases, it is crucial to consult with your security team and AWS architects to design a private infrastructure solution that aligns with your security requirements + +module "vpc" { + source = "terraform-aws-modules/vpc/aws" + version = "~> 5.0" + + name = local.name + cidr = var.vpc_cidr + azs = local.azs + + # Secondary CIDR block attached to VPC for EKS Control Plane ENI + Nodes + Pods + secondary_cidr_blocks = var.secondary_cidr_blocks + + # 1/ EKS Data Plane secondary CIDR blocks for two subnets across two AZs for EKS Control Plane ENI + Nodes + Pods + # 2/ Two private Subnets with RFC1918 private IPv4 address range for Private NAT + NLB + Airflow + EC2 Jumphost etc. 
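+  # With the default variable values this resolves to
+  # ["10.1.1.0/24", "10.1.2.0/24", "100.64.0.0/17", "100.64.128.0/17"]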
+ private_subnets = concat(var.private_subnets, var.eks_data_plane_subnet_secondary_cidr) + + # ------------------------------ + # Optional Public Subnets for NAT and IGW for PoC/Dev/Test environments + # Public Subnets can be disabled while deploying to Production and use Private NAT + TGW + public_subnets = var.public_subnets + enable_nat_gateway = true + single_nat_gateway = true + #------------------------------- + + public_subnet_tags = { + "kubernetes.io/role/elb" = 1 + } + + private_subnet_tags = { + "kubernetes.io/role/internal-elb" = 1 + # Tags subnets for Karpenter auto-discovery + "karpenter.sh/discovery" = local.name + } + + tags = local.tags +} + +module "vpc_endpoints_sg" { + source = "terraform-aws-modules/security-group/aws" + version = "~> 5.0" + + create = var.enable_vpc_endpoints + + name = "${local.name}-vpc-endpoints" + description = "Security group for VPC endpoint access" + vpc_id = module.vpc.vpc_id + + ingress_with_cidr_blocks = [ + { + rule = "https-443-tcp" + description = "VPC CIDR HTTPS" + cidr_blocks = join(",", module.vpc.private_subnets_cidr_blocks) + }, + ] + + egress_with_cidr_blocks = [ + { + rule = "https-443-tcp" + description = "All egress HTTPS" + cidr_blocks = "0.0.0.0/0" + }, + ] + + tags = local.tags +} + +module "vpc_endpoints" { + source = "terraform-aws-modules/vpc/aws//modules/vpc-endpoints" + version = "~> 5.0" + + create = var.enable_vpc_endpoints + + vpc_id = module.vpc.vpc_id + security_group_ids = [module.vpc_endpoints_sg.security_group_id] + + endpoints = merge({ + s3 = { + service = "s3" + service_type = "Gateway" + route_table_ids = module.vpc.private_route_table_ids + tags = { + Name = "${local.name}-s3" + } + } + }, + { for service in toset(["autoscaling", "ecr.api", "ecr.dkr", "ec2", "ec2messages", "elasticloadbalancing", "sts", "kms", "logs", "ssm", "ssmmessages"]) : + replace(service, ".", "_") => + { + service = service + subnet_ids = module.vpc.private_subnets + private_dns_enabled = true + tags = { Name = "${local.name}-${service}" } + } + }) + + tags = local.tags +} diff --git a/website/docs/blueprints/streaming-platforms/spark-streaming.md b/website/docs/blueprints/streaming-platforms/spark-streaming.md new file mode 100644 index 000000000..19218ff4c --- /dev/null +++ b/website/docs/blueprints/streaming-platforms/spark-streaming.md @@ -0,0 +1,276 @@ +--- +title: Spark Streaming from Kafka in EKS +sidebar_position: 6 +--- + +This example showcases the usage of Spark Operator to create a producer and consumer stack using Kafka (Amazon MSK). The main idea is to show Spark Streaming working with Kafka, persisting data in Parquet format using Apache Iceberg. + +## Deploy the EKS Cluster with all the add-ons and infrastructure needed to test this example + +### Clone the repository + +```bash +git clone https://github.com/awslabs/data-on-eks.git +``` + +### Initialize Terraform + +Navigate into the example directory and run the initialization script `install.sh`. + +```bash +cd data-on-eks/streaming/spark-streaming/terraform/ +./install.sh +``` + +### Export Terraform Outputs + +After the Terraform script finishes, export the necessary variables to use them in the `sed` commands. 
+ +```bash +export CLUSTER_NAME=$(terraform output -raw cluster_name) +export PRODUCER_ROLE_ARN=$(terraform output -raw producer_iam_role_arn) +export CONSUMER_ROLE_ARN=$(terraform output -raw consumer_iam_role_arn) +export MSK_BROKERS=$(terraform output -raw bootstrap_brokers) +export REGION=$(terraform output -raw s3_bucket_region_spark_history_server) +export ICEBERG_BUCKET=$(terraform output -raw s3_bucket_id_iceberg_bucket) +``` + +### Update kubeconfig + +Update the kubeconfig to verify the deployment. + +```bash +aws eks --region $REGION update-kubeconfig --name $CLUSTER_NAME +kubectl get nodes +``` + +### Configuring Producer + +In order to deploy the producer, update the `examples/producer/00_deployment.yaml` manifest with the variables exported from Terraform. + +```bash +# Apply `sed` commands to replace placeholders in the producer manifest +sed -i.bak -e "s|__MY_PRODUCER_ROLE_ARN__|$PRODUCER_ROLE_ARN|g" \ + -e "s|__MY_AWS_REGION__|$REGION|g" \ + -e "s|__MY_KAFKA_BROKERS__|$MSK_BROKERS|g" \ + ../examples/producer/00_deployment.yaml + +# Apply sed to delete topic manifest, this can be used to delete kafka topic and start the stack once again +sed -i.bak -e "s|__MY_KAFKA_BROKERS__|$MSK_BROKERS|g" \ + ../examples/producer/01_delete_topic.yaml +``` + +### Configuring Consumer + +In order to deploy the Spark consumer, update the `examples/consumer/manifests/01_spark_application.yaml` manifests with the variables exported from Terraform. + +```bash +# Apply `sed` commands to replace placeholders in the consumer Spark application manifest +sed -i.bak -e "s|__MY_BUCKET_NAME__|$ICEBERG_BUCKET|g" \ + -e "s|__MY_KAFKA_BROKERS_ADRESS__|$MSK_BROKERS|g" \ + ../examples/consumer/manifests/01_spark_application.yaml +``` + +### Deploy Producer and Consumer + +After configuring the producer and consumer manifests, deploy them using kubectl. + +```bash +# Deploy Producer +kubectl apply -f ../examples/producer/00_deployment.yaml + +# Deploy Consumer +kubectl apply -f ../examples/consumer/manifests/ +``` + +#### Checking Producer to MSK + +First, let's see the producer logs to verify data is being created and flowing into MSK: + +```bash +kubectl logs $(kubectl get pods -l app=producer -oname) -f +``` + +#### Checking Spark Streaming application with Spark Operator + +For the consumer, we first need to get the `SparkApplication` that generates the `spark-submit` command to Spark Operator to create driver and executor pods based on the YAML configuration: + +```bash +kubectl get SparkApplication -n spark-operator +``` + +You should see the `STATUS` equals `RUNNING`, now let's verify the driver and executors pods: + +```bash +kubectl get pods -n spark-operator +``` + +You should see an output like below: + +```bash +NAME READY STATUS RESTARTS AGE +kafkatoiceberg-1e9a438f4eeedfbb-exec-1 1/1 Running 0 7m15s +kafkatoiceberg-1e9a438f4eeedfbb-exec-2 1/1 Running 0 7m14s +kafkatoiceberg-1e9a438f4eeedfbb-exec-3 1/1 Running 0 7m14s +spark-consumer-driver 1/1 Running 0 9m +spark-operator-9448b5c6d-d2ksp 1/1 Running 0 117m +spark-operator-webhook-init-psm4x 0/1 Completed 0 117m +``` + +We have `1 driver` and `3 executors` pods. Now, let's check the driver logs: + +```bash +kubectl logs pod/spark-consumer-driver -n spark-operator +``` + +You should see only `INFO` logs indicating that the job is running. + +### Verify Data Flow + +After deploying both the producer and consumer, verify the data flow by checking the consumer application's output in the S3 bucket. 
+
+### Verify Data Flow
+
+After deploying both the producer and consumer, verify the data flow by checking the consumer application's output in the S3 bucket. You can run the `s3_automation` script to get a live view of the data size in your S3 bucket.
+
+Follow these steps:
+
+1. **Navigate to the `s3_automation` directory**:
+
+   ```bash
+   cd ../examples/s3_automation/
+   ```
+
+2. **Run the `s3_automation` script**:
+
+   ```bash
+   python app.py
+   ```
+
+   The script gives you a real-time view of the data being ingested: it continuously monitors and displays the total size of your S3 bucket, and it can also delete specific directories when needed.
+
+#### Using the `s3_automation` Script
+
+The `s3_automation` script offers two primary functions:
+
+- **Check Bucket Size**: Continuously monitor and display the total size of your S3 bucket.
+- **Delete Directory**: Delete specific directories within your S3 bucket.
+
+Here's how to use these functions:
+
+1. **Check Bucket Size**:
+   - When prompted, enter `size` to get the current size of your bucket in megabytes (MB).
+
+2. **Delete Directory**:
+   - When prompted, enter `delete` and then provide the directory prefix you wish to delete (e.g., `myfolder/`).
+
+## Tuning the Producer and Consumer for Better Performance
+
+After deploying the producer and consumer, you can further optimize data ingestion and processing by adjusting the number of producer replicas and the executor configuration of the Spark application. Here are some suggestions to get you started:
+
+### Adjusting the Number of Producer Replicas
+
+You can increase the number of replicas of the producer deployment to handle a higher rate of message production. By default, the producer deployment runs a single replica; increasing this number runs more producer instances concurrently and raises the overall throughput.
+
+To change the number of replicas, update the `replicas` field in `examples/producer/00_deployment.yaml`:
+
+```yaml
+spec:
+  replicas: 200 # Increase this number to scale up the producer
+```
+
+You can also adjust the environment variables to control the rate and volume of messages produced:
+
+```yaml
+env:
+  - name: RATE_PER_SECOND
+    value: "200000" # Increase this value to produce more messages per second
+  - name: NUM_OF_MESSAGES
+    value: "20000000" # Increase this value to produce more messages in total
+```
+
+Apply the updated deployment:
+
+```bash
+kubectl apply -f ../examples/producer/00_deployment.yaml
+```
+
+### Tuning Spark Executors for Better Ingestion Performance
+
+To handle the increased data volume efficiently, you can add more executors to the Spark application or increase the resources allocated to each executor. This allows the consumer to process data faster and reduces ingestion time.
+
+To adjust the Spark executor configuration, update `examples/consumer/manifests/01_spark_application.yaml`:
+
+```yaml
+spec:
+  dynamicAllocation:
+    enabled: true
+    initialExecutors: 5
+    minExecutors: 5
+    maxExecutors: 50 # Increase this number to allow more executors
+  executor:
+    cores: 4 # Increase CPU allocation
+    memory: "8g" # Increase memory allocation
+```
+
+Apply the updated Spark application:
+
+```bash
+kubectl apply -f ../examples/consumer/manifests/01_spark_application.yaml
+```
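+
+As a rough sanity check before applying larger values, you can estimate whether the producer and consumer settings are balanced. The snippet below is purely illustrative: the numbers mirror the manifest fields shown above, the topic partition count is an assumption (check your MSK topic), and nothing here talks to the live cluster.
+
+```python
+# Back-of-envelope check for the tuning values above (illustrative only).
+
+def producer_throughput(replicas: int, rate_per_second: int) -> int:
+    """Aggregate messages/second across all producer replicas."""
+    return replicas * rate_per_second
+
+def consumer_read_parallelism(max_executors: int, executor_cores: int, topic_partitions: int) -> int:
+    """By default the Spark Kafka source creates one input partition per Kafka
+    topic partition (the `minPartitions` option can split them further), so
+    cores beyond the partition count add no extra read parallelism."""
+    return min(max_executors * executor_cores, topic_partitions)
+
+if __name__ == "__main__":
+    produced = producer_throughput(replicas=200, rate_per_second=200_000)
+    parallelism = consumer_read_parallelism(max_executors=50, executor_cores=4, topic_partitions=64)  # 64 partitions is an assumption
+    print(f"Target producer throughput: {produced:,} messages/sec")
+    print(f"Effective Kafka read parallelism: {parallelism} tasks per micro-batch")
+```
+
+If the producer side grows much faster than the consumer can drain (typically visible as increasing micro-batch durations in the driver logs), raise `maxExecutors`, give each executor more resources, or increase the topic's partition count.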
+
+### Verify and Monitor
+
+After making these changes, monitor the logs and metrics to ensure the system is performing as expected. Check the producer logs to verify data production and the consumer logs to verify data ingestion and processing.
+
+To check producer logs:
+
+```bash
+kubectl logs $(kubectl get pods -l app=producer -oname) -f
+```
+
+To check consumer logs:
+
+```bash
+kubectl logs pod/spark-consumer-driver -n spark-operator
+```
+
+> You can also re-run the `s3_automation` script from the Verify Data Flow section to confirm that data keeps landing in the S3 bucket.
+
+### Summary
+
+By adjusting the number of producer replicas and tuning the Spark executor settings, you can optimize the performance of your data pipeline. This allows you to handle higher ingestion rates and process data more efficiently, ensuring that your Spark Streaming application can keep up with the increased data volume from Kafka.
+
+Feel free to experiment with these settings to find the optimal configuration for your workload. Happy streaming!
+
+### Cleaning Up Producer and Consumer Resources
+
+To clean up only the producer and consumer resources, use the following commands:
+
+```bash
+# Clean up Producer resources
+kubectl delete -f ../examples/producer/00_deployment.yaml
+
+# Clean up Consumer resources
+kubectl delete -f ../examples/consumer/manifests/
+```
+
+### Restoring `.yaml` Files from `.bak`
+
+If you need to reset the `.yaml` files to their original state with placeholders, move the `.bak` files back to `.yaml`.
+
+```bash
+# Restore Producer manifest
+mv ../examples/producer/00_deployment.yaml.bak ../examples/producer/00_deployment.yaml
+
+# Restore Consumer Spark application manifest
+mv ../examples/consumer/manifests/01_spark_application.yaml.bak ../examples/consumer/manifests/01_spark_application.yaml
+```
+
+### Destroy the EKS Cluster and Resources
+
+To clean up the entire EKS cluster and associated resources:
+
+```bash
+cd data-on-eks/streaming/spark-streaming/terraform/
+terraform destroy
+```