diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..1230149 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" diff --git a/.github/workflows/check-dependencies-updates.yml b/.github/workflows/check-dependencies-updates.yml new file mode 100644 index 0000000..f43f8a4 --- /dev/null +++ b/.github/workflows/check-dependencies-updates.yml @@ -0,0 +1,16 @@ +on: + schedule: + - cron: '0 6 * * 1-5' + +name: 🍄 Check dependencies updates + +permissions: + contents: write + pull-requests: write + +jobs: + scala-steward: + runs-on: ubuntu-22.04 + name: Check Scala project dependencies updates with Scala Steward + steps: + - uses: scala-steward-org/scala-steward-action@v2 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..f68e0e6 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,25 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + +permissions: + contents: read + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-java@v4 + with: + distribution: 'zulu' + java-version: '21' + cache: 'sbt' + - name: 👌 Run "pre-push" tasks (compile and style-check) + run: sbt prep + - name: ✅ Run test + run: sbt test diff --git a/.github/workflows/update-github-dependency-graph.yml b/.github/workflows/update-github-dependency-graph.yml new file mode 100644 index 0000000..bb88b08 --- /dev/null +++ b/.github/workflows/update-github-dependency-graph.yml @@ -0,0 +1,16 @@ +name: Update GitHub Dependency Graph + +on: + push: + branches: + - main + +permissions: + contents: write + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: scalacenter/sbt-dependency-submission@v3 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8081027 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.bsp/ +target/ +boot/ +lib_managed/ +src_managed/ +project/plugins/project/ + +/docker/spark/data/ +/docker/volume/ +/docker/spark/apps/ diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..58971b2 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,11 @@ +version = 3.8.2 +runner.dialect = scala213 +style = default +maxColumn = 120 +continuationIndent.callSite = 2 +align.preset = more +runner.optimizer.forceConfigStyleMinArgCount = 1 +rewrite.rules = [SortImports] +importSelectors = singleLine +project.excludeFilters = ["target/"] +project.git = true # Only format files tracked by git \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..5a3eaef --- /dev/null +++ b/build.sbt @@ -0,0 +1,17 @@ +Settings.settings + +libraryDependencies := Dependencies.all + +SbtAliases.aliases.flatMap { case (alias, command) => + addCommandAlias(alias, command) +} + +assembly / mainClass := Some( + "com.codely.lesson_04_how_to_deploy_spark.video_01__deploy_application.DeploySparkApp" +) + +import sbtassembly.MergeStrategy +assembly / assemblyMergeStrategy := { + case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first + case x => (assembly / assemblyMergeStrategy).value(x) +} diff --git a/conf/.gitkeep b/conf/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/doc/hooks/install-hooks.sh b/doc/hooks/install-hooks.sh new file mode 100755 index 0000000..c50abc3 --- /dev/null +++ b/doc/hooks/install-hooks.sh @@ 
-0,0 +1,8 @@ +#!/bin/sh + +cd "$(dirname "$0")/../.." + +rm -rf .git/hooks + +ln -s ../doc/hooks .git/hooks +sudo chmod -R 777 doc/hooks/* diff --git a/doc/hooks/pre-push b/doc/hooks/pre-push new file mode 100755 index 0000000..3d6953e --- /dev/null +++ b/doc/hooks/pre-push @@ -0,0 +1,50 @@ +#!/bin/bash + +# Checks if locally staged changes are formatted properly ignoring non-staged changes. +# Install it with the `install-hooks.sh` script +# Based on: https://gist.github.com/cvogt/2676ed6c6d1abafa3d6a + +PATH=$PATH:/usr/local/bin:/usr/local/sbin + +echo "" +echo "Running pre-push hook… (you can omit this with --no-verify, but don't)" + +echo "* Moving to the project directory…" +_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +DIR=$( echo $_DIR | sed 's/\/.git\/hooks$//' ) + +echo "* Stashing non-staged changes so we avoid checking them…" +git diff --quiet +hadNoNonStagedChanges=$? + +if ! [ $hadNoNonStagedChanges -eq 0 ] +then + git stash --keep-index -u > /dev/null +fi + +echo "* Checking pre push conditions ('prep' SBT task)…" +sbt prep > /dev/null +canPush=$? + +if [ $canPush -ne 0 ] +then + echo " [KO] Error :(" +fi + +echo "* Applying the stash with the non-staged changes…" +if ! [ $hadNoNonStagedChanges -eq 0 ] +then + sleep 1 && git stash pop --index > /dev/null & # sleep because otherwise commit fails when this leads to a merge conflict +fi + +# Final result +echo "" + +if [ $canPush -eq 0 ] +then + echo "[OK] Your code will be pushed young Padawan" + exit 0 +else + echo "[KO] Cancelling push due to test code style error (run 'sbt prep' for more information)" + exit 1 +fi diff --git a/docker/.env b/docker/.env new file mode 100644 index 0000000..4753132 --- /dev/null +++ b/docker/.env @@ -0,0 +1,8 @@ +AWS_ACCESS_KEY_ID=test +AWS_SECRET_ACCESS_KEY=test +LOCALSTACK_PORT=4566 +S3_BUCKET=my-bucket +S3_PREFIX=data +POSTGRES_USER=admin +POSTGRES_PASSWORD=secret +POSTGRES_DB=metastore \ No newline at end of file diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..8e4de2f --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,61 @@ +# Use a lightweight JDK base image +FROM openjdk:11.0.11-jre-slim-buster as base-stage + +# Install only the necessary dependencies +RUN apt-get update && \ + apt-get install -y curl wget ca-certificates software-properties-common ssh net-tools && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Set Spark and Hadoop versions as environment variables +ENV SPARK_VERSION=3.5.0 \ + HADOOP_VERSION=3 \ + SPARK_HOME=/opt/spark + +# Download and install Apache Spark +RUN wget --no-verbose -O apache-spark.tgz "https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION.tgz" \ + && mkdir -p /opt/spark \ + && tar -xf apache-spark.tgz -C /opt/spark --strip-components=1 \ + && rm apache-spark.tgz + +# Download additional JARs needed +RUN wget -P /opt/spark/jars https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.1/hadoop-common-3.3.1.jar \ + && wget -P /opt/spark/jars https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar \ + && wget -P /opt/spark/jars https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.2/hadoop-aws-3.2.2.jar \ + && wget -P /opt/spark/jars https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.1.0/delta-spark_2.12-3.1.0.jar \ + && wget -P /opt/spark/jars https://repo1.maven.org/maven2/org/apache/spark/spark-hive_2.12/3.5.0/spark-hive_2.12-3.5.0.jar \ + && wget -P /opt/spark/jars 
https://repo1.maven.org/maven2/io/delta/delta-storage/3.1.0/delta-storage-3.1.0.jar + + +# Setup the next stage for the actual Spark master and worker setup +FROM base-stage as spark-cluster-setup + +# Define the working directory +WORKDIR /opt/spark + +# Set environment variables for Spark master and worker configuration +ENV SPARK_MASTER_PORT=7077 \ + SPARK_MASTER_WEBUI_PORT=8080 \ + SPARK_LOG_DIR=/opt/spark/logs \ + SPARK_MASTER_LOG=/opt/spark/logs/spark-master.out \ + SPARK_WORKER_LOG=/opt/spark/logs/spark-worker.out \ + SPARK_WORKER_WEBUI_PORT=8080 \ + SPARK_WORKER_PORT=7000 \ + SPARK_MASTER="spark://spark-master:7077" \ + SPARK_WORKLOAD="master" + +# Expose the ports used by Spark master and worker +EXPOSE 8080 7077 7000 + +# Setup log directories and link logs to stdout for easier container log management +RUN mkdir -p $SPARK_LOG_DIR && \ + touch $SPARK_MASTER_LOG && \ + touch $SPARK_WORKER_LOG && \ + ln -sf /dev/stdout $SPARK_MASTER_LOG && \ + ln -sf /dev/stdout $SPARK_WORKER_LOG + +# Copy the start script to the container +COPY spark/run.sh / + +# Set the command to start the Spark cluster +CMD ["/bin/bash", "/run.sh"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..c16b8f8 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,245 @@ +name: "spark-ecosystem-cluster" + +networks: + spark-network: + name: spark-network + driver: bridge + ipam: + driver: default + config: + - subnet: "172.19.0.0/16" + +services: + spark-master: + image: my-spark-cluster:3.5.0 + ports: + - "9090:8080" + - "7077:7077" + - "4040:4040" + volumes: + - ./spark/apps:/opt/spark-apps + - ./spark/data:/opt/spark-data + - ./spark/metrics.properties:/opt/spark/conf/metrics.properties + - ./spark/config/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf + environment: + - SPARK_LOCAL_IP=172.19.0.10 + - SPARK_WORKLOAD=master + networks: + spark-network: + ipv4_address: 172.19.0.10 + + spark-worker-a: + image: my-spark-cluster:3.5.0 + ports: + - "9091:8080" + - "7000:7000" + depends_on: + - spark-master + environment: + - SPARK_MASTER=spark://spark-master:7077 + - SPARK_WORKER_CORES=2 + - SPARK_WORKER_MEMORY=2G + - SPARK_DRIVER_MEMORY=2G + - SPARK_EXECUTOR_MEMORY=1G + - SPARK_WORKLOAD=worker + - SPARK_LOCAL_IP=172.19.0.2 + volumes: + - ./spark/apps:/opt/spark-apps + - ./spark/data:/opt/spark-data + - ./spark/metrics.properties:/opt/spark/conf/metrics.properties + - ./spark/config/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf + + networks: + spark-network: + ipv4_address: 172.19.0.2 + + spark-worker-b: + image: my-spark-cluster:3.5.0 + ports: + - "9093:8080" + - "7001:7000" + depends_on: + - spark-master + environment: + - SPARK_MASTER=spark://spark-master:7077 + - SPARK_WORKER_CORES=2 + - SPARK_WORKER_MEMORY=2G + - SPARK_DRIVER_MEMORY=2G + - SPARK_EXECUTOR_MEMORY=1G + - SPARK_WORKLOAD=worker + - SPARK_LOCAL_IP=172.19.0.3 + volumes: + - ./spark/apps:/opt/spark-apps + - ./spark/data:/opt/spark-data + - ./spark/metrics.properties:/opt/spark/conf/metrics.properties + - ./spark/config/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf + + networks: + spark-network: + ipv4_address: 172.19.0.3 + + kafka: + image: bitnami/kafka:3.7.0 + container_name: kafka + ports: + - "9092:9092" + environment: + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_LISTENERS: "PLAINTEXT://172.19.0.4:9092,CONTROLLER://0.0.0.0:9093" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" + 
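A minimal sketch (not part of the diff): a smoke test that an application attached to the spark-network can reach the standalone master and the Kafka broker declared in this compose file. The master URL, broker IP and topic name are taken from the compose services; the spark-sql-kafka connector is assumed to be on the classpath (it is a compile-scope dependency in project/Dependencies.scala).

import org.apache.spark.sql.SparkSession

object ClusterSmokeTest extends App {
  val spark = SparkSession
    .builder()
    .appName("cluster-smoke-test")
    .master("spark://spark-master:7077") // standalone master exposed on 7077 above
    .getOrCreate()

  val events = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "172.19.0.4:9092") // fixed IP of the kafka service
    .option("subscribe", "topic-events")                  // topic created by init-kafka later in this file
    .option("startingOffsets", "earliest")
    .load()

  // Print raw records to the driver console just to confirm end-to-end connectivity.
  events
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("console")
    .outputMode("append")
    .start()
    .awaitTermination()
}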
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@172.19.0.4:9093 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + networks: + spark-network: + ipv4_address: 172.19.0.4 + + init-kafka: + image: bitnami/kafka:3.7.0 + container_name: init-kafka + depends_on: + kafka: + condition: service_started + entrypoint: [ '/usr/bin/bash', '-c' ] + command: | + " + set -ex + + # blocks until kafka is reachable + /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server 172.19.0.4:9092 --list + + echo -e 'Creating kafka topics' + /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server 172.19.0.4:9092 --create --if-not-exists --topic topic-events --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server 172.19.0.4:9092 --list + " + networks: + spark-network: + ipv4_address: 172.19.0.21 + + localstack: + container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}" + image: localstack/localstack:3.4.0 + ports: + - "127.0.0.1:4566:4566" + - "127.0.0.1:4510-4559:4510-4559" + environment: + - SERVICES=s3 + - DEBUG=true + - DEFAULT_REGION=eu-west-1 + - AWS_ACCESS_KEY_ID=test + - AWS_SECRET_ACCESS_KEY=test + volumes: + - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" + - "/var/run/docker.sock:/var/run/docker.sock" + networks: + spark-network: + ipv4_address: 172.19.0.6 + + init-s3-storage: + container_name: init-s3-storage + image: localstack/localstack:3.4.0 + entrypoint: [ "bash", "-c", "awslocal --endpoint-url http://172.19.0.6:4566 s3 mb s3://my-bucket" ] + depends_on: + localstack: + condition: service_healthy + networks: + spark-network: + ipv4_address: 172.19.0.7 + + postgres: + image: postgres:14-alpine + container_name: postgres + environment: + - POSTGRES_USER + - POSTGRES_PASSWORD + - POSTGRES_DB + healthcheck: + test: [ "CMD", "psql", "-U", "${POSTGRES_USER}", "${POSTGRES_DB}" ] + ports: + - '5432:5432' + networks: + spark-network: + ipv4_address: 172.19.0.8 + + hive-metastore: + image: apache/hive:4.0.0-alpha-2 + container_name: hive-metastore + environment: + - SERVICE_NAME=metastore + - DB_DRIVER=postgres + - HIVE_CUSTOM_CONF_DIR=/hive_custom_conf + ports: + - "9083:9083" + volumes: + - ./data/delta/osdp/spark-warehouse:/opt/spark/work-dir/data/delta/osdp/spark-warehouse + - ./hive/conf/hive-site.xml:/opt/hive/conf/hive-site.xml + - ./hive/conf/jars/hadoop-aws-3.2.2.jar:/opt/hive/lib/hadoop-aws-3.2.2.jar + - ./hive/conf/jars/aws-java-sdk-bundle-1.11.375.jar:/opt/hive/lib/aws-java-sdk-bundle-1.11.375.jar + - ./hive/conf/.hiverc:/opt/hive/conf/.hiverc + + depends_on: + postgres: + condition: service_healthy + networks: + spark-network: + ipv4_address: 172.19.0.9 + + spark-thrift-server: + image: my-spark-cluster:3.5.0 + container_name: spark-thrift-server + depends_on: + spark-master: + condition: service_started + hive-metastore: + condition: service_started + environment: + - SPARK_WORKLOAD=thrift-server + - HIVE_METASTORE_URI=thrift://hive-metastore:9083 + - SPARK_MASTER=spark://spark-master:7077 + - SPARK_SQL_HIVE_METASTORE_URIS=thrift://hive-metastore:9083 + ports: + - "10000:10000" + entrypoint: > + /opt/spark/bin/spark-submit + --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 + --master spark://spark-master:7077 + --deploy-mode client + --executor-memory 1G + --driver-memory 1G + --total-executor-cores 1 + --conf spark.sql.hive.metastore.version=2.3.9 + --conf spark.sql.uris=thrift://hive-metastore:9083 + --conf spark.hadoop.hive.metastore.uris=thrift://hive-metastore:9083 + 
--conf spark.hadoop.fs.s3a.endpoint=http://localstack:4566 + --conf spark.hadoop.fs.s3a.access.key=test + --conf spark.hadoop.fs.s3a.secret.key=test + --conf spark.hadoop.fs.s3a.path.style.access=true + --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension + --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog + local://opt/spark/jars/spark-hive-thriftserver_2.12-3.5.0.jar + networks: + spark-network: + ipv4_address: 172.19.0.11 + + grafana: + image: grafana/grafana:latest + container_name: grafana + ports: + - "3000:3000" + networks: + spark-network: + ipv4_address: 172.19.0.13 + + prometheus: + image: prom/prometheus:latest + container_name: prometheus + ports: + - "19090:9090" + networks: + spark-network: + ipv4_address: 172.19.0.12 + volumes: + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml diff --git a/docker/hive/conf/.hiverc b/docker/hive/conf/.hiverc new file mode 100644 index 0000000..842345a --- /dev/null +++ b/docker/hive/conf/.hiverc @@ -0,0 +1,3 @@ +ADD JAR /opt/hive/lib/hadoop-aws-3.3.1.jar; +ADD JAR /opt/hive/lib/aws-java-sdk-bundle-1.11.901.jar; +ADD JAR /opt/hive/lib/hadoop-common-3.3.1.jar; diff --git a/docker/hive/conf/hive-site.xml b/docker/hive/conf/hive-site.xml new file mode 100644 index 0000000..3a78449 --- /dev/null +++ b/docker/hive/conf/hive-site.xml @@ -0,0 +1,42 @@ + + + javax.jdo.option.ConnectionDriverName + org.postgresql.Driver + + + javax.jdo.option.ConnectionURL + jdbc:postgresql://172.18.0.8:5432/metastore + + + javax.jdo.option.ConnectionUserName + admin + + + javax.jdo.option.ConnectionPassword + secret + + + fs.s3a.endpoint + http://172.18.0.6:4566 + + + fs.s3a.access.key + test + + + fs.s3a.secret.key + test + + + fs.s3a.path.style.access + true + + + hive.metastore.warehouse.dir + s3a://my-bucket/warehouse + + + fs.s3a.aws.credentials.provider + org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + + diff --git a/docker/hive/conf/jars/aws-java-sdk-bundle-1.11.375.jar b/docker/hive/conf/jars/aws-java-sdk-bundle-1.11.375.jar new file mode 100644 index 0000000..8061bc2 Binary files /dev/null and b/docker/hive/conf/jars/aws-java-sdk-bundle-1.11.375.jar differ diff --git a/docker/hive/conf/jars/hadoop-aws-3.2.2.jar b/docker/hive/conf/jars/hadoop-aws-3.2.2.jar new file mode 100644 index 0000000..221557b Binary files /dev/null and b/docker/hive/conf/jars/hadoop-aws-3.2.2.jar differ diff --git a/docker/prometheus/prometheus.yml b/docker/prometheus/prometheus.yml new file mode 100644 index 0000000..8083e5c --- /dev/null +++ b/docker/prometheus/prometheus.yml @@ -0,0 +1,24 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: 'spark_master' + metrics_path: '/metrics/master/prometheus' + static_configs: + - targets: ['172.19.0.10:8080'] + + - job_name: 'spark_driver' + metrics_path: '/metrics/prometheus' + static_configs: + - targets: ['172.19.0.10:4040', 'localhost:4041'] + + - job_name: 'spark_workers' + metrics_path: '/metrics/prometheus' + static_configs: + - targets: ['172.19.0.2:8080', '172.19.0.3:8080'] + + - job_name: 'spark_executors' + metrics_path: '/metrics/executors/prometheus' + static_configs: + - targets: ['172.19.0.10:4040', 'localhost:4041'] diff --git a/docker/spark/config/spark-defaults.conf b/docker/spark/config/spark-defaults.conf new file mode 100644 index 0000000..429420b --- /dev/null +++ 
b/docker/spark/config/spark-defaults.conf @@ -0,0 +1,9 @@ +spark.sql.uris thrift://hive-metastore:9083 +spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension +spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog +spark.hadoop.fs.s3a.access.key test +spark.hadoop.fs.s3a.secret.key test +spark.hadoop.fs.s3a.endpoint http://localstack:4566 +spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider +spark.hadoop.fs.s3a.path.style.access true +spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem \ No newline at end of file diff --git a/docker/spark/metrics.properties b/docker/spark/metrics.properties new file mode 100644 index 0000000..ad8bc1d --- /dev/null +++ b/docker/spark/metrics.properties @@ -0,0 +1,4 @@ +*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet +*.sink.prometheusServlet.path=/metrics/prometheus +master.sink.prometheusServlet.path=/metrics/master/prometheus +applications.sink.prometheusServlet.path=/metrics/applications/prometheus \ No newline at end of file diff --git a/docker/spark/run.sh b/docker/spark/run.sh new file mode 100644 index 0000000..cd3c0fb --- /dev/null +++ b/docker/spark/run.sh @@ -0,0 +1,43 @@ +#!/bin/bash + +# Load Spark environment configurations +source "/opt/spark/bin/load-spark-env.sh" + +# Function to start the Spark Master +start_master() { + export SPARK_MASTER_HOST=$(hostname) + cd /opt/spark/bin + ./spark-class org.apache.spark.deploy.master.Master \ + --ip $SPARK_MASTER_HOST \ + --port $SPARK_MASTER_PORT \ + --webui-port $SPARK_MASTER_WEBUI_PORT \ + >> $SPARK_MASTER_LOG +} + +# Function to start the Spark Worker +start_worker() { + cd /opt/spark/bin + ./spark-class org.apache.spark.deploy.worker.Worker \ + --webui-port $SPARK_WORKER_WEBUI_PORT \ + $SPARK_MASTER \ + >> $SPARK_WORKER_LOG +} + +# Evaluate workload type and act accordingly +case "$SPARK_WORKLOAD" in + master) + echo "Starting Spark Master..." + start_master + ;; + worker) + echo "Starting Spark Worker..." + start_worker + ;; + submit) + echo "Executing Spark Submit" + # Add specific commands for submit task if necessary + ;; + *) + echo "Undefined workload type: $SPARK_WORKLOAD. 
Must specify: master, worker, submit" + ;; +esac diff --git a/project/Dependencies.scala b/project/Dependencies.scala new file mode 100644 index 0000000..7589ffa --- /dev/null +++ b/project/Dependencies.scala @@ -0,0 +1,26 @@ +import sbt._ + +object Dependencies { + private val prod = Seq( + "com.github.nscala-time" %% "nscala-time" % "2.32.0", + "com.lihaoyi" %% "pprint" % "0.9.0", + "org.apache.spark" %% "spark-core" % "3.5.0" % Provided, + "org.apache.spark" %% "spark-sql" % "3.5.0" % Provided, + "org.apache.spark" %% "spark-streaming" % "3.5.0" % Provided, + "org.apache.spark" %% "spark-hive" % "3.5.0" % Provided, + "org.apache.hadoop" % "hadoop-aws" % "3.2.2" % Provided, + "io.delta" %% "delta-spark" % "3.1.0" % Provided, + "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0", + "com.typesafe" % "config" % "1.4.3", + "com.github.scopt" %% "scopt" % "4.1.0" + ) + private val test = Seq( + "org.scalatest" %% "scalatest" % "3.2.19", + "org.mockito" %% "mockito-scala" % "1.17.37", + "com.dimafeng" %% "testcontainers-scala" % "0.41.4", + "com.dimafeng" %% "testcontainers-scala-postgresql" % "0.41.4", + "org.postgresql" % "postgresql" % "42.7.3" + ).map(_ % Test) + + val all: Seq[ModuleID] = prod ++ test +} diff --git a/project/SbtAliases.scala b/project/SbtAliases.scala new file mode 100644 index 0000000..8e0a9f7 --- /dev/null +++ b/project/SbtAliases.scala @@ -0,0 +1,15 @@ +object SbtAliases { + val aliases: Seq[(String, String)] = Seq( + "t" -> "test", + "to" -> "testOnly", + "tq" -> "testQuick", + "tsf" -> "testShowFailed", + "c" -> "compile", + "tc" -> "Test / compile", + "f" -> "scalafmt", // Format production files according to ScalaFmt + "fc" -> "scalafmtCheck", // Check if production files are formatted according to ScalaFmt + "tf" -> "Test / scalafmt", // Format test files according to ScalaFmt + "tfc" -> "Test / scalafmtCheck", // Check if test files are formatted according to ScalaFmt + "prep" -> ";c;tc;fc;tfc" // All the needed tasks before pushing to the repository (compile, compile test, format check in prod and test) + ) +} diff --git a/project/Settings.scala b/project/Settings.scala new file mode 100644 index 0000000..b91edf1 --- /dev/null +++ b/project/Settings.scala @@ -0,0 +1,39 @@ +import sbt.{Compile, Configuration => _, Test, TestFrameworks, Tests} +import sbt.Keys._ +import sbt.io.syntax._ + +object Settings { + val settings = Seq( + name := "spark-best_practises_and_deploy-course", + version := "0.1.0-SNAPSHOT", + scalaVersion := "2.12.12", + organization := "com.codely", + organizationName := "com.codely, Inc.", + organizationHomepage := Some(url("https://com.codely")), + + // Custom folders path (remove the `/scala` default subdirectory) + Compile / scalaSource := file((baseDirectory.value / "src" / "main").toString), + Test / scalaSource := file((baseDirectory.value / "src" / "test").toString), + + // Compiler options + scalacOptions ++= Seq( + "-deprecation", // Warnings deprecation + "-feature", // Advise features + "-unchecked", // More warnings. 
Strict + "-Xlint", // More warnings when compiling + "-Ywarn-dead-code", + "-Ywarn-unused" + ), + Test / scalacOptions += "-Xcheckinit", // Check against early initialization only in tests because it's expensive + javaOptions += "-Duser.timezone=UTC", + + // Test options + Test / parallelExecution := false, + Test / testForkedParallel := false, + Test / fork := true, + Test / testOptions ++= Seq( + Tests.Argument(TestFrameworks.ScalaTest, "-u", "target/test-reports"), // Save test reports + Tests.Argument("-oDF") // Show full stack traces and time spent in each test + ) + ) +} diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..ee4c672 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.10.1 diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..c2e2bf4 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,2 @@ +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0") diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/AvgSpendingApp.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/AvgSpendingApp.scala new file mode 100644 index 0000000..926d3ec --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/AvgSpendingApp.scala @@ -0,0 +1,27 @@ +package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.app + +import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config.AppConfig +import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.job.AvgSpendingJob +import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.service.{Reader, Writer} +import org.apache.spark.sql.SparkSession + +object AvgSpendingApp extends App { + + private val appName = "avg-spending-app" + + private val context = AppConfig.load(args, appName) + + implicit val spark: SparkSession = SparkSession + .builder() + .appName(context.spark.appName) + .enableHiveSupport() + .getOrCreate() + + private val reader = Reader() + private val deltaWriter = Writer() + + val job = AvgSpendingJob(context, reader, deltaWriter) + + job.run() + +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/config/AppConfig.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/config/AppConfig.scala new file mode 100644 index 0000000..c3d5cb0 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/config/AppConfig.scala @@ -0,0 +1,47 @@ +package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config + +import com.typesafe.config.ConfigFactory + +import java.io.File + +case class AppConfig( + spark: SparkConfig, + source: SourceConfig, + sink: SinkConfig +) + +case class SparkConfig(appName: String) +case class SourceConfig(format: String, options: Map[String, String]) +case class SinkConfig(format: String, mode: String, path: String) + +object AppConfig { + def load(args: Array[String], appName: String): AppConfig = { + + val cmdArgs = ArgumentsParser.parse(args).getOrElse(CmdArgs()) + val configFile = new File(cmdArgs.configFile.get) + val config = ConfigFactory.parseFile(configFile) + + val sourceOptions = Map( + "url" -> config.getString("source.options.url"), + "user" -> config.getString("source.options.user"), + "password" -> config.getString("source.options.password"), + "dbtable" -> 
config.getString("source.options.dbtable"), + "driver" -> config.getString("source.options.driver") + ) + + AppConfig( + spark = SparkConfig( + appName = config.getString("spark.appName") + ), + source = SourceConfig( + format = config.getString("source.format"), + options = sourceOptions + ), + sink = SinkConfig( + format = config.getString("sink.format"), + mode = config.getString("sink.mode"), + path = config.getString("sink.path") + ) + ) + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/config/ArgumentsParser.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/config/ArgumentsParser.scala new file mode 100644 index 0000000..7bad1d0 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/config/ArgumentsParser.scala @@ -0,0 +1,24 @@ +package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config + +import scopt.OParser + +object ArgumentsParser { + val builder = OParser.builder[CmdArgs] + val argsParser = { + import builder._ + OParser.sequence( + programName("Scala Application"), + head("Scala Application", "1.0"), + opt[String]('c', "configFile") + .required() + .valueName("") + .action((x, c) => c.copy(configFile = Some(x))) + .text("Path to the configuration file."), + help("help").text("Prints this usage text") + ) + } + + def parse(args: Array[String]): Option[CmdArgs] = { + OParser.parse(argsParser, args, CmdArgs()) + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/config/CmdArgs.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/config/CmdArgs.scala new file mode 100644 index 0000000..12bfb35 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/config/CmdArgs.scala @@ -0,0 +1,3 @@ +package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config + +case class CmdArgs(configFile: Option[String] = None) \ No newline at end of file diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/extensions/DataFrameExtensions.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/extensions/DataFrameExtensions.scala new file mode 100644 index 0000000..5a5ea3c --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/extensions/DataFrameExtensions.scala @@ -0,0 +1,13 @@ +package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.extensions + +import org.apache.spark.sql.DataFrame + +object DataFrameExtensions { + implicit class DataFrameOps(df: DataFrame) { + def calculateSumByName: DataFrame = { + df.groupBy("name") + .sum("value") + .withColumnRenamed("sum(value)", "total_spending") + } + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/job/AvgSpendingJob.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/job/AvgSpendingJob.scala new file mode 100644 index 0000000..fb24273 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/job/AvgSpendingJob.scala @@ -0,0 +1,23 @@ +package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.job + +import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config.AppConfig +import org.apache.spark.sql.SparkSession +import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.extensions.DataFrameExtensions._ +import 
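A sketch (not part of the diff) of the configuration shape AppConfig.load expects, inferred from the config.getString calls above. All concrete values are placeholders; in practice the same keys would live in the HOCON file passed with --configFile.

import com.typesafe.config.ConfigFactory

object ExampleAvgSpendingConfig {
  val hocon: String =
    """
      |spark.appName = "avg-spending-app"
      |
      |source {
      |  format = "jdbc"
      |  options {
      |    url      = "jdbc:postgresql://postgres:5432/metastore"  // placeholder
      |    user     = "admin"                                      // placeholder
      |    password = "secret"                                     // placeholder
      |    dbtable  = "purchases"                                  // placeholder
      |    driver   = "org.postgresql.Driver"
      |  }
      |}
      |
      |sink {
      |  format = "delta"
      |  mode   = "overwrite"
      |  path   = "s3a://my-bucket/avg_spending"                   // placeholder
      |}
      |""".stripMargin

  // Parsing the same keys AppConfig.load reads, just to show the expected structure.
  val config = ConfigFactory.parseString(hocon)
  assert(config.getString("source.options.driver") == "org.postgresql.Driver")
}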
com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.service.{Reader, Writer} + +case class AvgSpendingJob( + config: AppConfig, + reader: Reader, + writer: Writer +)(implicit spark: SparkSession) { + + def run(): Unit = { + + val data = reader.read(config.source.format, config.source.options) + + val sumByNameDataFrame = data.calculateSumByName + + writer.write(sumByNameDataFrame, config.sink.mode, config.sink.format, config.sink.path) + } + +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/service/Reader.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/service/Reader.scala new file mode 100644 index 0000000..2f49226 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/service/Reader.scala @@ -0,0 +1,12 @@ +package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.service + +import org.apache.spark.sql.{DataFrame, SparkSession} + +case class Reader()(implicit spark: SparkSession) { + def read(format: String, options: Map[String, String]): DataFrame = { + spark.read + .format(format) + .options(options) + .load() + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/service/Writer.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/service/Writer.scala new file mode 100644 index 0000000..f5d03cd --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_01__end_to_end_testing/service/Writer.scala @@ -0,0 +1,14 @@ +package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.service + +import org.apache.spark.sql.DataFrame + +case class Writer() { + def write( + df: DataFrame, + mode: String, + format: String, + path: String + ): Unit = { + df.write.mode(mode).format(format).save(path) + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/app/AvgSpendingApp.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/app/AvgSpendingApp.scala new file mode 100644 index 0000000..4239b33 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/app/AvgSpendingApp.scala @@ -0,0 +1,25 @@ +package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.app + +import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config.AppConfig +import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.job.AvgSpendingJob +import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service.{Reader, StreamWriter} +import org.apache.spark.sql.SparkSession + +object AvgSpendingApp extends App { + + private val context = AppConfig.load(args) + + implicit val spark: SparkSession = SparkSession + .builder() + .appName(context.spark.appName) + .enableHiveSupport() + .getOrCreate() + + private val reader = Reader() + private val deltaWriter = StreamWriter() + + val job = AvgSpendingJob(context, reader, deltaWriter) + + job.run() + +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/commons/Schemas.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/commons/Schemas.scala new file mode 100644 index 0000000..033a3a3 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/commons/Schemas.scala @@ -0,0 +1,22 @@ +package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.commons + +object Schemas { + + import org.apache.spark.sql.types._ + + private val productType = new StructType() + 
.add("productId", StringType) + .add("quantity", IntegerType) + .add("description", StringType) + .add("category", StringType) + .add("price", DoubleType) + + val purchasedSchema: StructType = new StructType() + .add("eventType", StringType) + .add("timestamp", StringType) + .add("userId", StringType) + .add("transactionId", StringType) + .add("products", ArrayType(productType)) + .add("eventId", StringType) + +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/config/AppConfig.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/config/AppConfig.scala new file mode 100644 index 0000000..f3b45c0 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/config/AppConfig.scala @@ -0,0 +1,50 @@ +package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config + +import com.typesafe.config.ConfigFactory + +import java.io.File + +case class AppConfig( + spark: SparkConfig, + source: SourceConfig, + sink: SinkConfig +) + +case class SparkConfig(appName: String) +case class SourceConfig(format: String, options: Map[String, String]) +case class SinkConfig(format: String, mode: String, options: Map[String, String]) + +object AppConfig { + def load(args: Array[String]): AppConfig = { + + val cmdArgs = ArgumentsParser.parse(args).getOrElse(CmdArgs()) + val configFile = new File(cmdArgs.configFile.get) + val config = ConfigFactory.parseFile(configFile) + + val sourceOptions = Map( + "kafka.bootstrap.servers" -> config.getString("source.options.server"), + "startingOffsets" -> config.getString("source.options.startingOffsets"), + "subscribe" -> config.getString("source.options.subscribe") + ) + + val sinkOptions = Map( + "path" -> config.getString("sink.options.path"), + "checkpoint" -> config.getString("sink.options.checkpoint") + ) + + AppConfig( + spark = SparkConfig( + appName = config.getString("spark.appName") + ), + source = SourceConfig( + format = config.getString("source.format"), + options = sourceOptions + ), + sink = SinkConfig( + format = config.getString("sink.format"), + mode = config.getString("sink.mode"), + options = sinkOptions + ) + ) + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/config/ArgumentsParser.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/config/ArgumentsParser.scala new file mode 100644 index 0000000..94b892b --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/config/ArgumentsParser.scala @@ -0,0 +1,24 @@ +package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config + +import scopt.OParser + +object ArgumentsParser { + val builder = OParser.builder[CmdArgs] + val argsParser = { + import builder._ + OParser.sequence( + programName("Scala Application"), + head("Scala Application", "1.0"), + opt[String]('c', "configFile") + .required() + .valueName("") + .action((x, c) => c.copy(configFile = Some(x))) + .text("Path to the configuration file."), + help("help").text("Prints this usage text") + ) + } + + def parse(args: Array[String]): Option[CmdArgs] = { + OParser.parse(argsParser, args, CmdArgs()) + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/config/CmdArgs.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/config/CmdArgs.scala new file mode 100644 index 0000000..cecbf7e --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/config/CmdArgs.scala @@ -0,0 +1,3 @@ 
+package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config + +case class CmdArgs(configFile: Option[String] = None) \ No newline at end of file diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/extensions/DataFrameExtensions.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/extensions/DataFrameExtensions.scala new file mode 100644 index 0000000..67932e0 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/extensions/DataFrameExtensions.scala @@ -0,0 +1,45 @@ +package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.extensions + +import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.commons.Schemas +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.{avg, col, explode, from_json, month, to_date} + +object DataFrameExtensions { + implicit class DataFrameOps(df: DataFrame) { + + def parseJson: DataFrame = { + df.select(from_json(col("value").cast("string"), Schemas.purchasedSchema).as("value")) + .select("value.*") + } + + def addDateColum: DataFrame = { + df.withColumn("date", to_date(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss'Z'")) + } + + def explodeProducts: DataFrame = { + df.select( + col("userId"), + explode(col("products")).as("product"), + col("date") + ) + } + + def transformForAggregation: DataFrame = { + df.select( + col("userId"), + col("product.category"), + month(col("date")).alias("month"), + (col("product.price") * col("product.quantity")).alias("totalSpent") + ) + } + + def calculateAvgSpending: DataFrame = { + df.groupBy(col("userId"), col("category"), col("month")) + .agg(avg("totalSpent").alias("AvgSpending")) + } + + def calculateCompleteAvgSpending: DataFrame = { + df.parseJson.addDateColum.explodeProducts.transformForAggregation.calculateAvgSpending + } + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/job/AvgSpendingJob.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/job/AvgSpendingJob.scala new file mode 100644 index 0000000..bb1f11f --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/job/AvgSpendingJob.scala @@ -0,0 +1,26 @@ +package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.job + +import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config.AppConfig +import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.extensions.DataFrameExtensions._ +import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service.{Reader, StreamWriter} +import org.apache.spark.sql.SparkSession + +case class AvgSpendingJob( + context: AppConfig, + reader: Reader, + writer: StreamWriter +)(implicit spark: SparkSession) { + + def run(): Unit = { + + val data = reader.read(context.source.format, context.source.options) + + val avgSpendingPerUserDF = + data.parseJson.addDateColum.explodeProducts.transformForAggregation.calculateAvgSpending + + val query = writer.write(avgSpendingPerUserDF, context.sink.mode, context.sink.format, context.sink.options) + + query.awaitTermination() + } + +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/service/AvgSpending.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/service/AvgSpending.scala new file mode 100644 index 0000000..6f70cd9 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/service/AvgSpending.scala @@ -0,0 +1,32 @@ +package 
com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service + +import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.commons.Schemas +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.{avg, col, explode, from_json, month, to_date} + +object AvgSpending { + + def calculate(dataFrame: DataFrame): DataFrame = { + + dataFrame + .select( + from_json(col("value").cast("string"), Schemas.purchasedSchema) + .as("value") + ) + .select("value.*") + .withColumn("date", to_date(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss'Z'")) + .select( + col("userId"), + explode(col("products")).as("product"), + col("date") + ) + .select( + col("userId"), + col("product.category"), + month(col("date")).alias("month"), + (col("product.price") * col("product.quantity")).alias("totalSpent") + ) + .groupBy(col("userId"), col("category"), col("month")) + .agg(avg("totalSpent").alias("AvgSpending")) + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/service/Reader.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/service/Reader.scala new file mode 100644 index 0000000..1eedc9e --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/service/Reader.scala @@ -0,0 +1,12 @@ +package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service + +import org.apache.spark.sql.{DataFrame, SparkSession} + +case class Reader()(implicit spark: SparkSession) { + def read(format: String, options: Map[String, String]): DataFrame = { + spark.read + .format(format) + .options(options) + .load() + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/service/StreamWriter.scala b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/service/StreamWriter.scala new file mode 100644 index 0000000..5399563 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/video_02__unit_testing/service/StreamWriter.scala @@ -0,0 +1,15 @@ +package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service + +import org.apache.spark.sql.streaming.StreamingQuery +import org.apache.spark.sql.DataFrame + +case class StreamWriter() { + def write( + df: DataFrame, + mode: String, + format: String, + options: Map[String, String] + ): StreamingQuery = { + df.writeStream.outputMode(mode).format(format).options(options).start() + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/app/AvgSpendingApp.scala b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/app/AvgSpendingApp.scala new file mode 100644 index 0000000..2c2e1f6 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/app/AvgSpendingApp.scala @@ -0,0 +1,26 @@ +package com.codely.lesson_02_tests_in_spark.z_practical_exercise.app + +import com.codely.lesson_02_tests_in_spark.z_practical_exercise.config.AppConfig +import com.codely.lesson_02_tests_in_spark.z_practical_exercise.job.AvgSpendingJob +import com.codely.lesson_02_tests_in_spark.z_practical_exercise.service.{Reader, StreamWriter} +import org.apache.spark.sql.SparkSession + + +object AvgSpendingApp extends App { + + private val context = AppConfig.load(args) + + implicit val spark: SparkSession = SparkSession + .builder() + .appName(context.spark.appName) + .enableHiveSupport() + .getOrCreate() + + private val reader = Reader() + private val deltaWriter = StreamWriter() + + val job = AvgSpendingJob(context, reader, deltaWriter) + + job.run() + +} diff --git 
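A minimal sketch (not part of the diff) of a unit test for AvgSpending.calculate with a local SparkSession, in the spirit of this lesson. The event values are invented; the JSON shape follows Schemas.purchasedSchema, and ScalaTest is already a test dependency in project/Dependencies.scala.

import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service.AvgSpending
import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class AvgSpendingTest extends AnyFunSuite {

  private val spark = SparkSession.builder().master("local[*]").appName("avg-spending-test").getOrCreate()
  import spark.implicits._

  test("averages the spending per user, category and month") {
    val event =
      """{"eventType":"purchased","timestamp":"2024-06-01T10:15:30Z","userId":"user-1","transactionId":"tx-1","products":[{"productId":"p-1","quantity":2,"description":"Keyboard","category":"Electronics","price":10.0}],"eventId":"evt-1"}"""

    // The job reads Kafka records as a "value" column, so the test input mimics that shape.
    val input  = Seq(event).toDF("value")
    val result = AvgSpending.calculate(input).collect()

    assert(result.length == 1)
    assert(result.head.getAs[Double]("AvgSpending") == 20.0) // 2 units * 10.0
  }
}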
a/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/commons/Schemas.scala b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/commons/Schemas.scala new file mode 100644 index 0000000..d935c94 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/commons/Schemas.scala @@ -0,0 +1,22 @@ +package com.codely.lesson_02_tests_in_spark.z_practical_exercise.commons + +object Schemas { + + import org.apache.spark.sql.types._ + + private val productType = new StructType() + .add("productId", StringType) + .add("quantity", IntegerType) + .add("description", StringType) + .add("category", StringType) + .add("price", DoubleType) + + val purchasedSchema: StructType = new StructType() + .add("eventType", StringType) + .add("timestamp", StringType) + .add("userId", StringType) + .add("transactionId", StringType) + .add("products", ArrayType(productType)) + .add("eventId", StringType) + +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/config/AppConfig.scala b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/config/AppConfig.scala new file mode 100644 index 0000000..435cd71 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/config/AppConfig.scala @@ -0,0 +1,50 @@ +package com.codely.lesson_02_tests_in_spark.z_practical_exercise.config + +import com.typesafe.config.ConfigFactory + +import java.io.File + +case class AppConfig( + spark: SparkConfig, + source: SourceConfig, + sink: SinkConfig +) + +case class SparkConfig(appName: String) +case class SourceConfig(format: String, options: Map[String, String]) +case class SinkConfig(format: String, mode: String, options: Map[String, String]) + +object AppConfig { + def load(args: Array[String]): AppConfig = { + + val cmdArgs = ArgumentsParser.parse(args).getOrElse(CmdArgs()) + val configFile = new File(cmdArgs.configFile.get) + val config = ConfigFactory.parseFile(configFile) + + val sourceOptions = Map( + "kafka.bootstrap.servers" -> config.getString("source.options.server"), + "startingOffsets" -> config.getString("source.options.startingOffsets"), + "subscribe" -> config.getString("source.options.subscribe") + ) + + val sinkOptions = Map( + "path" -> config.getString("sink.options.path"), + "checkpoint" -> config.getString("sink.options.checkpoint") + ) + + AppConfig( + spark = SparkConfig( + appName = config.getString("spark.appName") + ), + source = SourceConfig( + format = config.getString("source.format"), + options = sourceOptions + ), + sink = SinkConfig( + format = config.getString("sink.format"), + mode = config.getString("sink.mode"), + options = sinkOptions + ) + ) + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/config/ArgumentsParser.scala b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/config/ArgumentsParser.scala new file mode 100644 index 0000000..1ee6f6c --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/config/ArgumentsParser.scala @@ -0,0 +1,24 @@ +package com.codely.lesson_02_tests_in_spark.z_practical_exercise.config + +import scopt.OParser + +object ArgumentsParser { + val builder = OParser.builder[CmdArgs] + val argsParser = { + import builder._ + OParser.sequence( + programName("Scala Application"), + head("Scala Application", "1.0"), + opt[String]('c', "configFile") + .required() + .valueName("") + .action((x, c) => c.copy(configFile = Some(x))) + .text("Path to the configuration file."), + 
help("help").text("Prints this usage text") + ) + } + + def parse(args: Array[String]): Option[CmdArgs] = { + OParser.parse(argsParser, args, CmdArgs()) + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/config/CmdArgs.scala b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/config/CmdArgs.scala new file mode 100644 index 0000000..6bfbbac --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/config/CmdArgs.scala @@ -0,0 +1,3 @@ +package com.codely.lesson_02_tests_in_spark.z_practical_exercise.config + +case class CmdArgs(configFile: Option[String] = None) \ No newline at end of file diff --git a/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/extensions/DataFrameExtensions.scala b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/extensions/DataFrameExtensions.scala new file mode 100644 index 0000000..cdb09ba --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/extensions/DataFrameExtensions.scala @@ -0,0 +1,41 @@ +package com.codely.lesson_02_tests_in_spark.z_practical_exercise.extensions + +import com.codely.lesson_02_tests_in_spark.z_practical_exercise.commons.Schemas +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.{avg, col, explode, from_json, month, to_date} + +object DataFrameExtensions { + implicit class DataFrameOps(df: DataFrame) { + + def parseJson: DataFrame = { + df.select(from_json(col("value").cast("string"), Schemas.purchasedSchema).as("value")) + .select("value.*") + } + + def addDateColum: DataFrame = { + df.withColumn("date", to_date(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss'Z'")) + } + + def explodeProducts: DataFrame = { + df.select( + col("userId"), + explode(col("products")).as("product"), + col("date") + ) + } + + def transformForAggregation: DataFrame = { + df.select( + col("userId"), + col("product.category"), + month(col("date")).alias("month"), + (col("product.price") * col("product.quantity")).alias("totalSpent") + ) + } + + def calculateAvgSpending: DataFrame = { + df.groupBy(col("userId"), col("category"), col("month")) + .agg(avg("totalSpent").alias("AvgSpending")) + } + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/job/AvgSpendingJob.scala b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/job/AvgSpendingJob.scala new file mode 100644 index 0000000..14d3ae2 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/job/AvgSpendingJob.scala @@ -0,0 +1,26 @@ +package com.codely.lesson_02_tests_in_spark.z_practical_exercise.job + +import com.codely.lesson_02_tests_in_spark.z_practical_exercise.extensions.DataFrameExtensions._ +import com.codely.lesson_02_tests_in_spark.z_practical_exercise.config.AppConfig +import com.codely.lesson_02_tests_in_spark.z_practical_exercise.service.{Reader, StreamWriter} +import org.apache.spark.sql.SparkSession + +case class AvgSpendingJob( + context: AppConfig, + reader: Reader, + writer: StreamWriter +)(implicit spark: SparkSession) { + + def run(): Unit = { + + val data = reader.read(context.source.format, context.source.options) + + val avgSpendingPerUserDF = + data.parseJson.addDateColum.explodeProducts.transformForAggregation.calculateAvgSpending + + val query = writer.write(avgSpendingPerUserDF, context.sink.mode, context.sink.format, context.sink.options) + + query.awaitTermination() + } + +} diff --git 
a/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/service/Reader.scala b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/service/Reader.scala new file mode 100644 index 0000000..4b6e1c6 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/service/Reader.scala @@ -0,0 +1,12 @@ +package com.codely.lesson_02_tests_in_spark.z_practical_exercise.service + +import org.apache.spark.sql.{DataFrame, SparkSession} + +case class Reader()(implicit spark: SparkSession) { + def read(format: String, options: Map[String, String]): DataFrame = { + spark.read + .format(format) + .options(options) + .load() + } +} diff --git a/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/service/StreamWriter.scala b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/service/StreamWriter.scala new file mode 100644 index 0000000..f877f16 --- /dev/null +++ b/src/main/com/codely/lesson_02_tests_in_spark/z_practical_exercise/service/StreamWriter.scala @@ -0,0 +1,15 @@ +package com.codely.lesson_02_tests_in_spark.z_practical_exercise.service + +import org.apache.spark.sql.streaming.StreamingQuery +import org.apache.spark.sql.DataFrame + +case class StreamWriter() { + def write( + df: DataFrame, + mode: String, + format: String, + options: Map[String, String] + ): StreamingQuery = { + df.writeStream.outputMode(mode).format(format).options(options).start() + } +} diff --git a/src/main/com/codely/lesson_03_basics_spark_execution_model/video_01__how_spark_works/HowSparkWorks.scala b/src/main/com/codely/lesson_03_basics_spark_execution_model/video_01__how_spark_works/HowSparkWorks.scala new file mode 100644 index 0000000..e7639fd --- /dev/null +++ b/src/main/com/codely/lesson_03_basics_spark_execution_model/video_01__how_spark_works/HowSparkWorks.scala @@ -0,0 +1,25 @@ +package com.codely.lesson_03_basics_spark_execution_model.video_01__how_spark_works + +object HowSparkWorks extends App { + + // 1. docker exec -it spark-ecosystem-cluster-spark-master-1 bash + // 2. 
./bin/spark-shell --master spark://spark-master:7077 --total-executor-cores 2 --executor-memory 1024m + + val spark = org.apache.spark.sql.SparkSession.builder + .master("local") + .appName("Spark Example") + .getOrCreate() + + val sc = spark.sparkContext + val numbers = sc.parallelize(1 to 1000) + numbers.count() + + // localhost:4040 + + val doubledNumbers = numbers.map(_ * 2) + doubledNumbers.count() + + val groupedNumbers = doubledNumbers.groupBy(_ % 2) + groupedNumbers.count() + +} diff --git a/src/main/com/codely/lesson_03_basics_spark_execution_model/video_02__reading_query_plans/QueryPlans.scala b/src/main/com/codely/lesson_03_basics_spark_execution_model/video_02__reading_query_plans/QueryPlans.scala new file mode 100644 index 0000000..83fc2ba --- /dev/null +++ b/src/main/com/codely/lesson_03_basics_spark_execution_model/video_02__reading_query_plans/QueryPlans.scala @@ -0,0 +1,46 @@ +package com.codely.lesson_03_basics_spark_execution_model.video_02__reading_query_plans + +object QueryPlans extends App { + + val spark = org.apache.spark.sql.SparkSession.builder + .master("local[*]") + .appName("Spark Query Plans") + .getOrCreate() + + spark.sparkContext.setLogLevel("WARN") + + val rangeDs = spark.range(1000) + + rangeDs.explain() + + val mappedDs = rangeDs.selectExpr("id * 2 as id") + + mappedDs.explain(extended = true) + + val mappedAnfFilteredDs = rangeDs.selectExpr("id * 2 as id").filter("id = 2") + + mappedAnfFilteredDs.explain(extended = true) + + import spark.implicits._ + + val anotherDs = Seq( + (0, "zero"), + (2, "two"), + (4, "four"), + (6, "six"), + (8, "eight") + ).toDF("id", "name") + + val joinedDs = mappedDs.join(anotherDs, "id") + + joinedDs.explain(extended = true) + + val bigRangeDs = spark.range(2000000000) + + val anotherBigDs = spark.range(2000000000) + + val joinedBigDs = bigRangeDs.join(anotherBigDs, "id") + + joinedBigDs.explain(extended = true) + +} diff --git a/src/main/com/codely/lesson_03_basics_spark_execution_model/z_practical_exercise/QueryPlanExercise.scala b/src/main/com/codely/lesson_03_basics_spark_execution_model/z_practical_exercise/QueryPlanExercise.scala new file mode 100644 index 0000000..e6093cc --- /dev/null +++ b/src/main/com/codely/lesson_03_basics_spark_execution_model/z_practical_exercise/QueryPlanExercise.scala @@ -0,0 +1,26 @@ +package com.codely.lesson_03_basics_spark_execution_model.z_practical_exercise + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.col + +object QueryPlanExercise extends App { + + val spark = SparkSession + .builder() + .appName("AvgSpendingCalculation") + .master("local[*]") + .getOrCreate() + + val filePath = + "src/main/com/codely/lesson_03_basics_spark_execution_model/z_practical_exercise/data/some_csv.csv" + + val rawData = spark.read.option("header", "true").csv(filePath) + + val filteredData = + rawData.filter(col("colA") === 1).selectExpr("upper(colB) as colB") + + filteredData.explain() + + filteredData.show() + +} diff --git a/src/main/com/codely/lesson_03_basics_spark_execution_model/z_practical_exercise/data/some_csv.csv b/src/main/com/codely/lesson_03_basics_spark_execution_model/z_practical_exercise/data/some_csv.csv new file mode 100644 index 0000000..1f87289 --- /dev/null +++ b/src/main/com/codely/lesson_03_basics_spark_execution_model/z_practical_exercise/data/some_csv.csv @@ -0,0 +1,4 @@ +colA,colB +1,"good" +1,"job" +2,"bad" \ No newline at end of file diff --git 
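A sketch (not part of the diff): the same narrow-vs-wide distinction explored in HowSparkWorks, made visible without the Spark UI by printing the RDD lineage. toDebugString starts a new indented block wherever a shuffle (stage boundary) is introduced.

import org.apache.spark.sql.SparkSession

object LineageDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("lineage-demo").getOrCreate()
  val sc    = spark.sparkContext

  val numbers        = sc.parallelize(1 to 1000)
  val doubledNumbers = numbers.map(_ * 2)            // narrow: stays in the same stage
  val groupedNumbers = doubledNumbers.groupBy(_ % 2) // wide: forces a shuffle

  println(doubledNumbers.toDebugString) // single stage
  println(groupedNumbers.toDebugString) // a ShuffledRDD appears, marking a new stage

  spark.stop()
}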
a/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/DeploySparkApp.scala b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/DeploySparkApp.scala new file mode 100644 index 0000000..e8b9f12 --- /dev/null +++ b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/DeploySparkApp.scala @@ -0,0 +1,50 @@ +package com.codely.lesson_04_how_to_deploy_spark.video_01__deploy_application + +import com.codely.lesson_04_how_to_deploy_spark.video_01__deploy_application.commons.Schemas +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.streaming.OutputMode + +object DeploySparkApp { + def main(args: Array[String]): Unit = { + val spark = SparkSession + .builder() + .appName("DeploySparkApp") + .enableHiveSupport() + .getOrCreate() + + val kafkaDF = spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", "kafka:9092") + .option("startingOffsets", "earliest") + .option("subscribe", "topic-events") + .load() + .select( + from_json(col("value").cast("string"), Schemas.purchasedSchema) + .as("value") + ) + .select("value.*") + + import spark.implicits._ + + val avgSpendingPerUserDF = kafkaDF + .withColumn("date", to_date($"timestamp", "yyyy-MM-dd'T'HH:mm:ss'Z'")) + .select($"userId", explode($"products").as("product"), $"date") + .select( + $"userId", + $"product.category", + month($"date").alias("month"), + ($"product.price" * $"product.quantity").alias("totalSpent") + ) + .groupBy($"userId", $"category", $"month") + .agg(avg("totalSpent").alias("AvgSpending")) + + avgSpendingPerUserDF.writeStream + .format("delta") + .option("checkpointLocation", "s3a://my-bucket/checkpoint") + .option("path", "s3a://my-bucket/avg_spending") + .outputMode(OutputMode.Complete()) + .start() + .awaitTermination() + } +} diff --git a/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/commons/Schemas.scala b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/commons/Schemas.scala new file mode 100644 index 0000000..4ecbe27 --- /dev/null +++ b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/commons/Schemas.scala @@ -0,0 +1,29 @@ +package com.codely.lesson_04_how_to_deploy_spark.video_01__deploy_application.commons + +object Schemas { + + import org.apache.spark.sql.types._ + + private val productType = new StructType() + .add("productId", StringType) + .add("quantity", IntegerType) + .add("description", StringType) + .add("category", StringType) + .add("price", DoubleType) + + val purchasedSchema: StructType = new StructType() + .add("eventType", StringType) + .add("timestamp", StringType) + .add("userId", StringType) + .add("transactionId", StringType) + .add("products", ArrayType(productType)) + .add("eventId", StringType) + + val viewedSchema: StructType = new StructType() + .add("eventType", StringType) + .add("timestamp", StringType) + .add("userId", StringType) + .add("productId", StringType) + .add("eventId", StringType) + +} diff --git a/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/1-prepare.sh b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/1-prepare.sh new file mode 100644 index 0000000..5f9f698 --- /dev/null +++ b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/1-prepare.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +sbt assembly + +mv 
target/scala-2.12/spark-best_practises_and_deploy-course-assembly-0.1.0-SNAPSHOT.jar docker/spark/apps \ No newline at end of file diff --git a/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/2-config.sh b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/2-config.sh new file mode 100644 index 0000000..4107cd3 --- /dev/null +++ b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/2-config.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +docker compose up -d + +docker exec -it spark-ecosystem-cluster-spark-master-1 bash \ No newline at end of file diff --git a/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/3a-spark-submit.sh b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/3a-spark-submit.sh new file mode 100644 index 0000000..e10e59a --- /dev/null +++ b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/3a-spark-submit.sh @@ -0,0 +1,20 @@ + spark/bin/spark-submit \ + --class com.codely.lesson_04_how_to_deploy_spark.video_01__deploy_application.DeploySparkApp \ + --deploy-mode client \ + --master spark://spark-master:7077 \ + --conf spark.hadoop.hive.metastore.uris=thrift://hive-metastore:9083 \ + --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \ + --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ + --conf spark.hadoop.fs.s3a.access.key=test \ + --conf spark.hadoop.fs.s3a.secret.key=test \ + --conf spark.hadoop.fs.s3a.endpoint=http://localstack:4566 \ + --conf spark.hadoop.fs.s3a.path.style.access=true \ + --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ + --conf spark.driver.memory=1g \ + --conf spark.executor.memory=1g \ + --conf spark.executor.cores=1 \ + --verbose \ + --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk-bundle:1.11.375 \ + spark-apps/spark-best_practises_and_deploy-course-assembly-0.1.0-SNAPSHOT.jar + + diff --git a/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/3b-spark-submit-min.sh b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/3b-spark-submit-min.sh new file mode 100644 index 0000000..2aecffb --- /dev/null +++ b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/3b-spark-submit-min.sh @@ -0,0 +1,10 @@ + spark/bin/spark-submit \ + --class com.codely.lesson_04_how_to_deploy_spark.video_01__deploy_application.DeploySparkApp \ + --deploy-mode client \ + --master spark://spark-master:7077 \ + --executor-memory 1G \ + --driver-memory 1G \ + --total-executor-cores 2 \ + --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \ + --verbose \ + spark-apps/spark-best_practises_and_deploy-course-assembly-0.1.0-SNAPSHOT.jar \ No newline at end of file diff --git a/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/4-publish.sh b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/4-publish.sh new file mode 100644 index 0000000..8a8a7a5 --- /dev/null +++ b/src/main/com/codely/lesson_04_how_to_deploy_spark/video_01__deploy_application/scripts/4-publish.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +sbt "run --eventsConfigPath src/main/resources/samples/events.json" \ No newline at end of file diff --git
a/src/main/com/codely/lesson_04_how_to_deploy_spark/z_practical_exercise/KafkaSparkStreamingApp.scala b/src/main/com/codely/lesson_04_how_to_deploy_spark/z_practical_exercise/KafkaSparkStreamingApp.scala new file mode 100644 index 0000000..7a83b6e --- /dev/null +++ b/src/main/com/codely/lesson_04_how_to_deploy_spark/z_practical_exercise/KafkaSparkStreamingApp.scala @@ -0,0 +1,36 @@ +package com.codely.lesson_04_how_to_deploy_spark.z_practical_exercise + +import org.apache.spark.sql.SparkSession + +object KafkaSparkStreamingApp extends App { + + val spark = SparkSession + .builder() + .appName("KafkaSparkStreamingApp") + .master("local[*]") + .getOrCreate() + + spark.sparkContext.setLogLevel("WARN") + + val kafkaDF = spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", "172.18.0.4:9092") + .option("subscribe", "topic-events") + .option("startingOffsets", "earliest") + .load() + + import spark.implicits._ + val messagesDF = kafkaDF.selectExpr("CAST(value AS STRING)").as[String] + + val wordsDF = messagesDF + .flatMap(_.split(" ")) + .groupBy("value") + .count() + + val query = wordsDF.writeStream + .outputMode("update") + .format("console") + .start() + + query.awaitTermination() +} diff --git a/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/1-export-datasource.sh b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/1-export-datasource.sh new file mode 100644 index 0000000..0d39153 --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/1-export-datasource.sh @@ -0,0 +1,5 @@ +curl -X POST http://localhost:3000/api/datasources \ +-H "Content-Type: application/json" \ +-H "Authorization: Bearer glsa_OvsNen9bJllcjVPQ1VSfCouJcukdcVcL_4a16a12d" \ +--data-binary "@my_datasource.json" + diff --git a/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/1.import.sh b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/1.import.sh new file mode 100644 index 0000000..9416e65 --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/1.import.sh @@ -0,0 +1,16 @@ +curl -X GET \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer glsa_OvsNen9bJllcjVPQ1VSfCouJcukdcVcL_4a16a12d" \ + http://localhost:3000/api/datasources + + + curl -X GET \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer glsa_OvsNen9bJllcjVPQ1VSfCouJcukdcVcL_4a16a12d" \ + http://localhost:3000/api/datasources/1 \ + -o my_datasource.json + + curl -X GET "http://localhost:3000/api/dashboards/uid/bdqiuxl7bh98gc" \ +-H "Authorization: Bearer glsa_OvsNen9bJllcjVPQ1VSfCouJcukdcVcL_4a16a12d" \ + -H "Content-Type: application/json" \ + -o dashboard.json diff --git a/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/2-export-dashboard.sh b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/2-export-dashboard.sh new file mode 100644 index 0000000..b76b5f1 --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/2-export-dashboard.sh @@ -0,0 +1,5 @@ +curl -X POST http://localhost:3000/api/dashboards/db \ +-H "Content-Type: application/json" \ +-H 
"Authorization: Bearer glsa_OvsNen9bJllcjVPQ1VSfCouJcukdcVcL_4a16a12d" \ +--data-binary "@dashboard.json" + diff --git a/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/dashboard.json b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/dashboard.json new file mode 100644 index 0000000..dd50c81 --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/dashboard.json @@ -0,0 +1 @@ +{"meta":{"type":"db","canSave":true,"canEdit":true,"canAdmin":true,"canStar":true,"canDelete":true,"slug":"spark-cluster-metrics-demo","url":"/d/bdqiuxl7bh98gc/spark-cluster-metrics-demo","expires":"0001-01-01T00:00:00Z","created":"2024-07-28T17:16:17Z","updated":"2024-07-28T17:16:17Z","updatedBy":"admin","createdBy":"admin","version":1,"hasAcl":false,"isFolder":false,"folderId":0,"folderUid":"","folderTitle":"General","folderUrl":"","provisioned":false,"provisionedExternalId":"","annotationsPermissions":{"dashboard":{"canAdd":true,"canEdit":true,"canDelete":true},"organization":{"canAdd":true,"canEdit":true,"canDelete":true}}},"dashboard":{"annotations":{"list":[{"builtIn":1,"datasource":{"type":"grafana","uid":"-- Grafana --"},"enable":true,"hide":true,"iconColor":"rgba(0, 211, 255, 1)","name":"Annotations \u0026 Alerts","type":"dashboard"}]},"editable":true,"fiscalYearStartMonth":0,"graphTooltip":0,"id":1,"links":[],"panels":[{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"fieldConfig":{"defaults":{"color":{"mode":"thresholds"},"mappings":[],"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"red","value":80}]}},"overrides":[]},"gridPos":{"h":7,"w":4,"x":0,"y":0},"id":4,"options":{"colorMode":"value","graphMode":"area","justifyMode":"auto","orientation":"auto","percentChangeColorMode":"standard","reduceOptions":{"calcs":["lastNotNull"],"fields":"","values":false},"showPercentChange":false,"textMode":"auto","wideLayout":true},"pluginVersion":"11.1.0","targets":[{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"metrics_master_workers_Value","fullMetaSearch":false,"includeNullMetadata":true,"instant":false,"legendFormat":"__auto","range":true,"refId":"A","useBackend":false}],"title":"Total Workers","type":"stat"},{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"fieldConfig":{"defaults":{"color":{"mode":"thresholds"},"mappings":[],"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"red","value":80}]}},"overrides":[]},"gridPos":{"h":7,"w":4,"x":4,"y":0},"id":6,"options":{"colorMode":"value","graphMode":"area","justifyMode":"auto","orientation":"auto","percentChangeColorMode":"standard","reduceOptions":{"calcs":["lastNotNull"],"fields":"","values":false},"showPercentChange":false,"textMode":"auto","wideLayout":true},"pluginVersion":"11.1.0","targets":[{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"metrics_master_aliveWorkers_Value","fullMetaSearch":false,"includeNullMetadata":true,"instant":false,"legendFormat":"__auto","range":true,"refId":"A","useBackend":false}],"title":"Workers 
alive","type":"stat"},{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"fieldConfig":{"defaults":{"color":{"mode":"thresholds"},"mappings":[],"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"red","value":80}]}},"overrides":[]},"gridPos":{"h":7,"w":7,"x":8,"y":0},"id":1,"options":{"minVizHeight":75,"minVizWidth":75,"orientation":"auto","reduceOptions":{"calcs":["lastNotNull"],"fields":"","values":false},"showThresholdLabels":false,"showThresholdMarkers":true,"sizing":"auto"},"pluginVersion":"11.1.0","targets":[{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"sum by(instance) (metrics_worker_memUsed_MB_Number)","fullMetaSearch":false,"includeNullMetadata":true,"instant":false,"legendFormat":"__auto","range":true,"refId":"A","useBackend":false}],"title":"Workers memory used","type":"gauge"},{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"fieldConfig":{"defaults":{"color":{"mode":"thresholds"},"mappings":[],"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"red","value":80}]}},"overrides":[]},"gridPos":{"h":7,"w":8,"x":15,"y":0},"id":2,"options":{"minVizHeight":75,"minVizWidth":75,"orientation":"auto","reduceOptions":{"calcs":["lastNotNull"],"fields":"","values":false},"showThresholdLabels":false,"showThresholdMarkers":true,"sizing":"auto"},"pluginVersion":"11.1.0","targets":[{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"sum by(instance) (metrics_worker_coresUsed_Value)","fullMetaSearch":false,"includeNullMetadata":true,"instant":false,"legendFormat":"__auto","range":true,"refId":"A","useBackend":false}],"title":"Workers cores used","type":"gauge"},{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"fieldConfig":{"defaults":{"color":{"mode":"thresholds"},"mappings":[],"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"red","value":80}]}},"overrides":[]},"gridPos":{"h":7,"w":4,"x":0,"y":7},"id":5,"options":{"colorMode":"value","graphMode":"area","justifyMode":"auto","orientation":"auto","percentChangeColorMode":"standard","reduceOptions":{"calcs":["lastNotNull"],"fields":"","values":false},"showPercentChange":false,"textMode":"auto","wideLayout":true},"pluginVersion":"11.1.0","targets":[{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"metrics_master_waitingApps_Value","fullMetaSearch":false,"includeNullMetadata":true,"instant":false,"legendFormat":"__auto","range":true,"refId":"A","useBackend":false}],"title":"Waiting 
apps","type":"stat"},{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"fieldConfig":{"defaults":{"color":{"mode":"thresholds"},"mappings":[],"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"red","value":80}]}},"overrides":[]},"gridPos":{"h":7,"w":4,"x":4,"y":7},"id":8,"options":{"colorMode":"value","graphMode":"area","justifyMode":"auto","orientation":"auto","percentChangeColorMode":"standard","reduceOptions":{"calcs":["lastNotNull"],"fields":"","values":false},"showPercentChange":false,"textMode":"auto","wideLayout":true},"pluginVersion":"11.1.0","targets":[{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"metrics_master_apps_Number","fullMetaSearch":false,"includeNullMetadata":true,"instant":false,"legendFormat":"__auto","range":true,"refId":"A","useBackend":false}],"title":"Total apps","type":"stat"},{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"fieldConfig":{"defaults":{"color":{"mode":"palette-classic"},"custom":{"axisBorderShow":false,"axisCenteredZero":false,"axisColorMode":"text","axisLabel":"","axisPlacement":"auto","barAlignment":0,"drawStyle":"line","fillOpacity":0,"gradientMode":"none","hideFrom":{"legend":false,"tooltip":false,"viz":false},"insertNulls":false,"lineInterpolation":"linear","lineWidth":1,"pointSize":5,"scaleDistribution":{"type":"linear"},"showPoints":"auto","spanNulls":false,"stacking":{"group":"A","mode":"none"},"thresholdsStyle":{"mode":"off"}},"mappings":[],"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"red","value":80}]}},"overrides":[{"matcher":{"id":"byName","options":"app-20240702165007-0020"},"properties":[]}]},"gridPos":{"h":7,"w":15,"x":8,"y":7},"id":7,"options":{"legend":{"calcs":[],"displayMode":"list","placement":"bottom","showLegend":true},"tooltip":{"mode":"single","sort":"none"}},"targets":[{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"sum by(application_id) (metrics_executor_failedTasks_total)","format":"time_series","fullMetaSearch":false,"includeNullMetadata":true,"instant":false,"legendFormat":"failed","range":true,"refId":"A","useBackend":false},{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"sum by(application_id) (metrics_executor_activeTasks)","fullMetaSearch":false,"hide":false,"includeNullMetadata":true,"instant":false,"legendFormat":"active","range":true,"refId":"B","useBackend":false},{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"sum by(application_id) (metrics_executor_completedTasks_total)","fullMetaSearch":false,"hide":false,"includeNullMetadata":true,"instant":false,"legendFormat":"completed","range":true,"refId":"C","useBackend":false}],"title":"Tasks completed vs 
failed","type":"timeseries"},{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"fieldConfig":{"defaults":{"color":{"mode":"palette-classic"},"custom":{"axisBorderShow":false,"axisCenteredZero":false,"axisColorMode":"text","axisLabel":"","axisPlacement":"auto","barAlignment":0,"drawStyle":"line","fillOpacity":0,"gradientMode":"none","hideFrom":{"legend":false,"tooltip":false,"viz":false},"insertNulls":false,"lineInterpolation":"linear","lineWidth":1,"pointSize":5,"scaleDistribution":{"type":"linear"},"showPoints":"auto","spanNulls":false,"stacking":{"group":"A","mode":"none"},"thresholdsStyle":{"mode":"off"}},"mappings":[],"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"red","value":80}]}},"overrides":[]},"gridPos":{"h":10,"w":23,"x":0,"y":14},"id":9,"options":{"legend":{"calcs":[],"displayMode":"list","placement":"bottom","showLegend":true},"tooltip":{"mode":"single","sort":"none"}},"targets":[{"datasource":{"type":"prometheus","uid":"edt4uxwwo5kowa"},"disableTextWrap":false,"editorMode":"builder","expr":"sum(metrics_worker_memFree_MB_Value)","fullMetaSearch":false,"includeNullMetadata":true,"instant":false,"legendFormat":"__auto","range":true,"refId":"A","useBackend":false}],"title":"Panel Title","type":"timeseries"}],"schemaVersion":39,"tags":[],"templating":{"list":[]},"time":{"from":"now-15m","to":"now"},"timepicker":{},"timezone":"browser","title":"Spark Cluster metrics Demo","uid":"bdqiuxl7bh98gc","version":1,"weekStart":""}} \ No newline at end of file diff --git a/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/my_dashboard.json b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/my_dashboard.json new file mode 100644 index 0000000..844deb7 --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/scripts/my_dashboard.json @@ -0,0 +1,1182 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 2, + "links": [], + "panels": [ + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 14, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 4, + "x": 0, + "y": 9 + }, + "id": 10, + "options": { + "minVizHeight": 75, + "minVizWidth": 75, + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "sizing": "auto" + }, + "pluginVersion": "11.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum(metrics_worker_coresFree_Value)", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + 
} + ], + "title": "Free cores", + "type": "gauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 4, + "x": 4, + "y": 9 + }, + "id": 11, + "options": { + "minVizHeight": 75, + "minVizWidth": 75, + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "sizing": "auto" + }, + "pluginVersion": "11.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum(metrics_worker_memFree_MB_Value)", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Free memory", + "type": "gauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 16, + "x": 8, + "y": 9 + }, + "id": 9, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum(metrics_worker_memFree_MB_Value)", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Free memory in cluster", + "type": "timeseries" + } + ], + "title": "Cluster general", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 12, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 4, + "x": 0, + "y": 10 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + 
"reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "metrics_master_workers_Value", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Total Workers", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 4, + "x": 4, + "y": 10 + }, + "id": 6, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "metrics_master_aliveWorkers_Value", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Workers alive", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 16, + "x": 8, + "y": 10 + }, + "id": 18, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum(metrics_executor_activeTasks)", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum(metrics_executor_failedTasks_total)", + 
"fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B", + "useBackend": false + } + ], + "title": "Tasks", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 8, + "x": 0, + "y": 17 + }, + "id": 2, + "options": { + "minVizHeight": 75, + "minVizWidth": 75, + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "sizing": "auto" + }, + "pluginVersion": "11.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum by(instance) (metrics_worker_coresUsed_Value)", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Workers cores used", + "type": "gauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 7, + "x": 8, + "y": 17 + }, + "id": 1, + "options": { + "minVizHeight": 75, + "minVizWidth": 75, + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "sizing": "auto" + }, + "pluginVersion": "11.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum by(instance) (metrics_worker_memUsed_MB_Number)", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Workers memory used", + "type": "gauge" + } + ], + "title": "Workers", + "type": "row" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 2 + }, + "id": 13, + "panels": [], + "title": "Applications", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 4, + "x": 0, + "y": 3 + }, + "id": 8, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.1.0", + "targets": [ + { + "datasource": { + "type": 
"prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "metrics_master_apps_Number", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Total apps", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 4, + "x": 4, + "y": 3 + }, + "id": 5, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "metrics_master_waitingApps_Value", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Waiting apps", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 16, + "x": 8, + "y": 3 + }, + "id": 19, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum by(application_name) (metrics_executor_totalShuffleWrite_bytes_total)", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Shuffle write", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 10 + }, + "id": 16, + "panels": [], + "title": "Hive Metastore", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": 
false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 11 + }, + "id": 15, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "metrics_app_20240728171930_0004_driver_HiveExternalCatalog_partitionsFetched_Count", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Partitions fetched Hive Metastore", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 11 + }, + "id": 17, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "edt4uxwwo5kowa" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "metrics_HiveExternalCatalog_hiveClientCalls_Count", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Hive Metastore calls", + "type": "timeseries" + } + ], + "schemaVersion": 39, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Spark Cluster metrics Demo", + "uid": "bdqiuxl7bh98gc", + "version": 3, + "weekStart": "" +} \ No newline at end of file diff --git 
a/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/spark-submit.sh b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/spark-submit.sh new file mode 100644 index 0000000..acdf7d0 --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03__monitor_your_app_with_grafana/spark-submit.sh @@ -0,0 +1,13 @@ + spark/bin/spark-submit \ + --class com.codely.lesson_04_how_to_deploy_spark.video_01__deploy_application.DeploySparkApp \ + --deploy-mode client \ + --master spark://spark-master:7077 \ + --executor-memory 1G \ + --driver-memory 1G \ + --total-executor-cores 2 \ + --conf spark.ui.prometheus.enabled=true \ + --conf spark.executor.processTreeMetrics.enabled=true \ + --conf spark.eventLog.logStageExecutorMetrics=true \ + --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \ + --verbose \ + spark-apps/spark-best_practises_and_deploy-course-assembly-0.1.0-SNAPSHOT.jar \ No newline at end of file diff --git a/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_04_join_optimization/JoinOptimizationApp.scala b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_04_join_optimization/JoinOptimizationApp.scala new file mode 100644 index 0000000..64c727f --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_04_join_optimization/JoinOptimizationApp.scala @@ -0,0 +1,30 @@ +package com.codely.lesson_05_monitoring_and_optimizations.video_04_join_optimization + +import org.apache.spark.sql.functions.broadcast + +object JoinOptimizationApp extends SparkApp { + + spark.sparkContext.setLogLevel("WARN") + + import spark.implicits._ + + val dataFrame1 = + Seq((1, "Alice", 50), (2, "Bob", 80), (3, "Javi", 99)) + .toDF("id", "name", "score") + + val largeDataFrame = spark + .range(1, 100000000L) + .map(i => (i, s"Name$i")) + .toDF("id", "other") + + // val result = largeDataFrame.join(dataFrame1, "id") + // result.explain() + // result.show() + + val result = largeDataFrame.join(broadcast(dataFrame1), "id") + result.explain() + result.show() + + Thread.sleep(1000000) + +} diff --git a/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_04_join_optimization/SparkApp.scala b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_04_join_optimization/SparkApp.scala new file mode 100644 index 0000000..a7a0796 --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/video_04_join_optimization/SparkApp.scala @@ -0,0 +1,10 @@ +package com.codely.lesson_05_monitoring_and_optimizations.video_04_join_optimization + +trait SparkApp extends App { + val spark = org.apache.spark.sql.SparkSession.builder + .master("local[*]") + .appName("Spark Broadcast Join") + //.config("spark.sql.autoBroadcastJoinThreshold", -1) // uncomment on the first run to disable automatic broadcast joins + //.config("spark.sql.adaptive.enabled", "false") // uncomment on the first run so AQE does not switch to a broadcast join at runtime + .getOrCreate() +} diff --git a/src/main/com/codely/lesson_05_monitoring_and_optimizations/z_practical_exercise/JoinOptimizationApp.scala b/src/main/com/codely/lesson_05_monitoring_and_optimizations/z_practical_exercise/JoinOptimizationApp.scala new file mode 100644 index 0000000..c7c1a03 --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/z_practical_exercise/JoinOptimizationApp.scala @@ -0,0 +1,30 @@ +package com.codely.lesson_05_monitoring_and_optimizations.z_practical_exercise + +import org.apache.spark.sql.functions.broadcast + +object JoinOptimizationApp
extends SparkApp { + + spark.sparkContext.setLogLevel("WARN") + + import spark.implicits._ + + val dataFrame1 = + Seq((1, "Alice", 50), (2, "Bob", 80), (3, "Javi", 99)) + .toDF("id", "name", "score") + + val largeDataFrame = spark + .range(1, 100000000L) + .map(i => (i, s"Name$i")) + .toDF("id", "other") + + // val result = largeDataFrame.join(dataFrame1, "id") + // result.explain() + // result.show() + + val result = largeDataFrame.join(broadcast(dataFrame1), "id") + result.explain() + result.show() + + Thread.sleep(1000000) + +} diff --git a/src/main/com/codely/lesson_05_monitoring_and_optimizations/z_practical_exercise/SparkApp.scala b/src/main/com/codely/lesson_05_monitoring_and_optimizations/z_practical_exercise/SparkApp.scala new file mode 100644 index 0000000..df75ab3 --- /dev/null +++ b/src/main/com/codely/lesson_05_monitoring_and_optimizations/z_practical_exercise/SparkApp.scala @@ -0,0 +1,10 @@ +package com.codely.lesson_05_monitoring_and_optimizations.z_practical_exercise + +trait SparkApp extends App { + val spark = org.apache.spark.sql.SparkSession.builder + .master("local[*]") + .appName("Spark Broadcast Join") + //.config("spark.sql.autoBroadcastJoinThreshold", -1) // uncomment on the first run to disable automatic broadcast joins + //.config("spark.sql.adaptive.enabled", "false") // uncomment on the first run so AQE does not switch to a broadcast join at runtime + .getOrCreate() +} diff --git a/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/0-delete-all.sh b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/0-delete-all.sh new file mode 100644 index 0000000..2fc4dfe --- /dev/null +++ b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/0-delete-all.sh @@ -0,0 +1,82 @@ +#!/bin/bash + +# Detach policies from roles if they exist +detach_role_policy() { + local role_name=$1 + local policy_arn=$2 + if aws iam get-role --role-name "$role_name" --profile emr-user > /dev/null 2>&1; then + echo "Detaching policy $policy_arn from role $role_name" + aws iam detach-role-policy --role-name "$role_name" --policy-arn "$policy_arn" --profile emr-user + fi +} + +detach_user_policy() { + local user_name=$1 + local policy_arn=$2 + if aws iam get-user --user-name "$user_name" --profile emr-user > /dev/null 2>&1; then + echo "Detaching policy $policy_arn from user $user_name" + aws iam detach-user-policy --user-name "$user_name" --policy-arn "$policy_arn" --profile emr-user + fi +} + +# Detach policies from roles +detach_role_policy EMR_DefaultRole arn:aws:iam::010928190667:policy/EMR_S3_FullAccessPolicy +detach_role_policy EMR_EC2_DefaultRole arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role +detach_role_policy EMR_EC2_DefaultRole arn:aws:iam::aws:policy/AmazonS3FullAccess + +# Detach policies from user +detach_user_policy emr-user arn:aws:iam::010928190667:policy/EMR_RunJobFlow_Policy +detach_user_policy emr-user arn:aws:iam::010928190667:policy/EMR_FullAccess_Policy + +# Detach policies from Step Functions role +detach_role_policy EMR_StepFunctions_Role arn:aws:iam::010928190667:policy/EMR_StepFunctions_Policy + +# Delete policies if they exist +delete_policy() { + local policy_arn=$1 + if aws iam get-policy --policy-arn "$policy_arn" --profile emr-user > /dev/null 2>&1; then + echo "Deleting policy $policy_arn" + aws iam delete-policy --policy-arn "$policy_arn" --profile emr-user + fi +} + +delete_policy arn:aws:iam::010928190667:policy/EMR_RunJobFlow_Policy +delete_policy arn:aws:iam::010928190667:policy/EMR_FullAccess_Policy +delete_policy arn:aws:iam::010928190667:policy/EMR_StepFunctions_Policy + +# Delete roles if
they exist +delete_role() { + local role_name=$1 + if aws iam get-role --role-name "$role_name" --profile emr-user > /dev/null 2>&1; then + echo "Deleting role $role_name" + aws iam delete-role --role-name "$role_name" --profile emr-user + fi +} + +delete_role EMR_DefaultRole +delete_role EMR_EC2_DefaultRole +delete_role EMR_StepFunctions_Role + +# Delete instance profile if it exists +delete_instance_profile() { + local profile_name=$1 + if aws iam get-instance-profile --instance-profile-name "$profile_name" --profile emr-user > /dev/null 2>&1; then + echo "Deleting instance profile $profile_name" + aws iam remove-role-from-instance-profile --instance-profile-name "$profile_name" --role-name EMR_EC2_DefaultRole --profile emr-user + aws iam delete-instance-profile --instance-profile-name "$profile_name" --profile emr-user + fi +} + +delete_instance_profile EMR_EC2_DefaultRole + +# Delete service-linked role for EMR if it exists +if aws iam get-role --role-name AWSServiceRoleForEMRCleanup --profile emr-user > /dev/null 2>&1; then + echo "Deleting service-linked role AWSServiceRoleForEMRCleanup" + aws iam delete-service-linked-role --role-name AWSServiceRoleForEMRCleanup --profile emr-user +fi + +# Delete S3 bucket if it exists +if aws s3 ls s3://my-bucket-codely --profile emr-user > /dev/null 2>&1; then + echo "Deleting S3 bucket my-bucket-codely" + aws s3 rb s3://my-bucket-codely --force --profile emr-user +fi diff --git a/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/1-setup-resources.sh b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/1-setup-resources.sh new file mode 100644 index 0000000..c2a920d --- /dev/null +++ b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/1-setup-resources.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +# Create S3 bucket +aws s3 mb s3://my-bucket-codely --profile emr-user + +# Create EMR-S3-FullAccessPolicy +aws iam create-policy --policy-name EMR_FullAccessPolicy --policy-document file://EMR_FullAccessPolicy.json --profile emr-user + +# Create EMR_DefaultRole +aws iam create-role --role-name EMR_DefaultRole --assume-role-policy-document file://EMR_DefaultRoleTrustPolicy.json --profile emr-user + +# Attach EMR_FullAccessPolicy to EMR_DefaultRole +aws iam attach-role-policy --role-name EMR_DefaultRole --policy-arn arn:aws:iam::010928190667:policy/EMR_FullAccessPolicy --profile emr-user + +# Create EMR_EC2_DefaultRole +aws iam create-role --role-name EMR_EC2_DefaultRole --assume-role-policy-document file://EMR_EC2_DefaultRoleTrustPolicy.json --profile emr-user + +# Create instance profile and add role +aws iam create-instance-profile --instance-profile-name EMR_EC2_DefaultRole --profile emr-user +aws iam add-role-to-instance-profile --instance-profile-name EMR_EC2_DefaultRole --role-name EMR_EC2_DefaultRole --profile emr-user + +# Attach policies to EMR_EC2_DefaultRole +aws iam attach-role-policy --role-name EMR_EC2_DefaultRole --policy-arn arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role --profile emr-user +aws iam attach-role-policy --role-name EMR_EC2_DefaultRole --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess --profile emr-user + +# Create Step Functions policy +aws iam create-policy --policy-name EMR_StepFunctions_Policy --policy-document file://EMR_StepFunctions_Policy.json --profile emr-user + +# Create Step Functions role and attach policy +aws iam create-role --role-name EMR_StepFunctions_Role --assume-role-policy-document file://StepFunctionsTrustPolicy.json --profile emr-user 
+aws iam attach-role-policy --role-name EMR_StepFunctions_Role --policy-arn arn:aws:iam::010928190667:policy/EMR_StepFunctions_Policy --profile emr-user + +# Create service-linked role for EMR +aws iam create-service-linked-role --aws-service-name elasticmapreduce.amazonaws.com --description "Role for EMR cleanup tasks" --profile emr-user + +# Create EMR cluster using Step Functions +aws stepfunctions create-state-machine --name "EMR_StepFunctions_Machine" --definition file://state_machine_definition.json --role-arn arn:aws:iam::010928190667:role/EMR_StepFunctions_Role --profile emr-user + +# Start execution of Step Functions +# aws stepfunctions start-execution --state-machine-arn arn:aws:states:eu-west-1:010928190667:stateMachine:EMR_StepFunctions_Machine --profile emr-user diff --git a/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_DefaultRoleTrustPolicy.json b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_DefaultRoleTrustPolicy.json new file mode 100644 index 0000000..34e5d82 --- /dev/null +++ b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_DefaultRoleTrustPolicy.json @@ -0,0 +1,12 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "elasticmapreduce.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +} diff --git a/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_EC2_DefaultRoleTrustPolicy.json b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_EC2_DefaultRoleTrustPolicy.json new file mode 100644 index 0000000..87c7d7c --- /dev/null +++ b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_EC2_DefaultRoleTrustPolicy.json @@ -0,0 +1,12 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "ec2.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +} diff --git a/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_FullAccessPolicy.json b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_FullAccessPolicy.json new file mode 100644 index 0000000..6e5d32a --- /dev/null +++ b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_FullAccessPolicy.json @@ -0,0 +1,18 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "elasticmapreduce:*", + "ec2:Describe*", + "ec2:*", + "s3:*", + "logs:*", + "iam:GetRole", + "iam:PassRole" + ], + "Resource": "*" + } + ] +} diff --git a/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_StepFunctions_Policy.json b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_StepFunctions_Policy.json new file mode 100644 index 0000000..b9be94d --- /dev/null +++ b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/EMR_StepFunctions_Policy.json @@ -0,0 +1,19 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "elasticmapreduce:RunJobFlow", + "elasticmapreduce:DescribeCluster", + "elasticmapreduce:TerminateJobFlows", + "elasticmapreduce:AddJobFlowSteps", + "elasticmapreduce:DescribeStep", + "elasticmapreduce:ListSteps", + "s3:*", + "iam:PassRole" + ], + "Resource": "*" + } + ] +} diff --git a/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/StepFunctionsTrustPolicy.json b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/StepFunctionsTrustPolicy.json new file mode 100644 index 0000000..9217421 --- /dev/null 
+++ b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/StepFunctionsTrustPolicy.json @@ -0,0 +1,12 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "states.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +} diff --git a/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/state_machine_definition.json b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/state_machine_definition.json new file mode 100644 index 0000000..de2fcd0 --- /dev/null +++ b/src/main/com/codely/lesson_06__deploy_to_cloud/video_01__infra_aws/state_machine_definition.json @@ -0,0 +1,67 @@ +{ + "Comment": "A description of my state machine", + "StartAt": "CreateCluster", + "States": { + "CreateCluster": { + "Type": "Task", + "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", + "Parameters": { + "Name": "WordCountStreamingCluster", + "ReleaseLabel": "emr-7.1.0", + "Applications": [ + { + "Name": "Spark" + } + ], + "Instances": { + "InstanceGroups": [ + { + "Name": "Master instance group", + "Market": "ON_DEMAND", + "InstanceRole": "MASTER", + "InstanceType": "m5.xlarge", + "InstanceCount": 1 + }, + { + "Name": "Core instance group", + "Market": "ON_DEMAND", + "InstanceRole": "CORE", + "InstanceType": "m5.xlarge", + "InstanceCount": 1 + } + ], + "KeepJobFlowAliveWhenNoSteps": true + }, + "JobFlowRole": "EMR_EC2_DefaultRole", + "ServiceRole": "EMR_DefaultRole", + "VisibleToAllUsers": true, + "LogUri": "s3://my-bucket-codely/logs/" + }, + "Next": "AddStep" + }, + "AddStep": { + "Type": "Task", + "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", + "Parameters": { + "ClusterId.$": "$.ClusterId", + "Step": { + "Name": "RunSparkJob", + "ActionOnFailure": "CONTINUE", + "HadoopJarStep": { + "Jar": "command-runner.jar", + "Args": [ + "spark-submit", + "--deploy-mode", + "cluster", + "--class", + "org.apache.spark.examples.SparkPi", + "/usr/lib/spark/examples/jars/spark-examples.jar", + "10" + ] + } + } + }, + "End": true + } + } +} diff --git a/src/main/com/codely/lesson_06__deploy_to_cloud/video_02__deploy/1-start-step-emr.sh b/src/main/com/codely/lesson_06__deploy_to_cloud/video_02__deploy/1-start-step-emr.sh new file mode 100644 index 0000000..5c78c75 --- /dev/null +++ b/src/main/com/codely/lesson_06__deploy_to_cloud/video_02__deploy/1-start-step-emr.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# Start execution of Step Functions +aws stepfunctions start-execution --state-machine-arn arn:aws:states:eu-west-1:010928190667:stateMachine:EMR_StepFunctions_Machine --profile emr-user diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf new file mode 100644 index 0000000..e23856c --- /dev/null +++ b/src/main/resources/application.conf @@ -0,0 +1,23 @@ +avg-spending-app { + spark { + appName = "AvgSpendingApp" + } + + source { + format = "jdbc" + options { + dbtable = "transactions" + driver = "org.postgresql.Driver" + url = "jdbc:postgresql://localhost:5432/test_database" + user = "admin" + password = "secret" + } + } + + sink { + format = "delta" + mode = "overwrite" + path = "tmp/delta" + } +} + diff --git a/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/AvgSpendingAppTest.scala b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/AvgSpendingAppTest.scala new file mode 100644 index 0000000..680e2b7 --- /dev/null +++ 
b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/AvgSpendingAppTest.scala @@ -0,0 +1,44 @@ +package com.codely.spark_best_practices_and_deploy.lesson_02_tests_in_spark.video_01__end_to_end_testing.app + +import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.app.AvgSpendingApp +import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.service.{Reader, Writer} +import com.codely.spark_best_practices_and_deploy.lesson_02_tests_in_spark.video_01__end_to_end_testing.app.utils.TestUtils +import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer} + +class AvgSpendingAppTest extends SparkTestHelper with ForAllTestContainer { + + val reader = new Reader + val writer = new Writer + + override val container: PostgreSQLContainer = { + PostgreSQLContainer().configure { c => + c.withInitScript("init_scripts.sql") + c.withDatabaseName("test-database") + c.withUsername("admin") + c.withPassword("secret") + } + } + + "AvgSpendingApp" should "process messages from Kafka and write results to Delta Lake" in { + + val configFile = + TestUtils.createTempConfFile(replacements = Map(":URL:" -> container.jdbcUrl, ":PATH:" -> tempDir)) + + AvgSpendingApp.main(Array("--configFile", configFile)) + + val result = + spark.read + .format("delta") + .load(s"$tempDir/delta") + + import testSQLImplicits._ + + val expected = Seq(("Charlie", 50), ("Bob", 20), ("Alice", 30)).toDF( + "name", + "total_spending" + ) + + assert(result.collect() sameElements expected.collect()) + } + +} diff --git a/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/SparkTestHelper.scala b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/SparkTestHelper.scala new file mode 100644 index 0000000..5daae61 --- /dev/null +++ b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/SparkTestHelper.scala @@ -0,0 +1,69 @@ +package com.codely.spark_best_practices_and_deploy.lesson_02_tests_in_spark.video_01__end_to_end_testing.app + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.{SQLContext, SQLImplicits, SparkSession} +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import java.io.File +import java.nio.file.Files +import scala.reflect.io.Directory + +trait SparkTestHelper extends AnyFlatSpec with BeforeAndAfterEach with BeforeAndAfterAll { + + private val sparkSession = SparkSession + .builder() + .master("local[*]") + .appName("test-spark-session") + .config(sparkConfiguration) + .enableHiveSupport() + .getOrCreate() + + protected var tempDir: String = _ + + protected implicit def spark: SparkSession = sparkSession + + protected def sc: SparkContext = sparkSession.sparkContext + + protected def sparkConfiguration: SparkConf = + new SparkConf() + .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .set( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.delta.catalog.DeltaCatalog" + ) + + override protected def beforeAll(): Unit = { + super.beforeAll() + clearTemporaryDirectories() + } + + override protected def beforeEach(): Unit = { + super.beforeEach() + tempDir = Files.createTempDirectory(this.getClass.toString).toString.replaceAll("\\s", "") + } + + override protected def afterAll(): Unit = { + 
+    super.afterAll()
+    sparkSession.stop()
+    clearTemporaryDirectories()
+  }
+
+  override protected def afterEach(): Unit = {
+    super.afterEach()
+    new Directory(new File(tempDir)).deleteRecursively()
+  }
+
+  protected object testSQLImplicits extends SQLImplicits {
+    protected override def _sqlContext: SQLContext = sparkSession.sqlContext
+  }
+
+  private def clearTemporaryDirectories(): Unit = {
+    val warehousePath = new File("spark-warehouse").getAbsolutePath
+    FileUtils.deleteDirectory(new File(warehousePath))
+
+    val metastoreDbPath = new File("metastore_db").getAbsolutePath
+    FileUtils.deleteDirectory(new File(metastoreDbPath))
+  }
+}
diff --git a/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/utils/TestUtils.scala b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/utils/TestUtils.scala
new file mode 100644
index 0000000..54250a7
--- /dev/null
+++ b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_01__end_to_end_testing/app/utils/TestUtils.scala
@@ -0,0 +1,50 @@
+package com.codely.spark_best_practices_and_deploy.lesson_02_tests_in_spark.video_01__end_to_end_testing.app.utils
+
+import java.io.{File, FileWriter}
+import scala.collection.mutable
+import scala.io.Source
+
+object TestUtils {
+
+  def createTempConfFile(
+      configFile: String = "application.conf",
+      replacements: Map[String, String]
+  ): String = {
+    val content = readApplicationConfFile(configFile)
+    val updatedContent = replacePlaceholders(content, replacements)
+
+    val tempFile = File.createTempFile("temp-application", ".conf")
+    val writer = new FileWriter(tempFile)
+    try {
+      writer.write(updatedContent)
+    } finally {
+      writer.close()
+    }
+
+    tempFile.getAbsolutePath
+  }
+
+  private def readApplicationConfFile(fileName: String): String = {
+    readFileAsString(s"/$fileName")
+  }
+
+  def readFileAsString(file: String): String = {
+    val fileURI = getClass.getResource(file).toURI
+
+    val bufferedSource = Source.fromFile(fileURI)
+    val lines = new mutable.StringBuilder
+    try {
+      bufferedSource.getLines().foreach(line => lines.append(line).append("\n"))
+    } finally {
+      bufferedSource.close()
+    }
+
+    lines.toString
+  }
+
+  private def replacePlaceholders(content: String, replacements: Map[String, String]): String = {
+    replacements.foldLeft(content) { case (acc, (placeholder, replacement)) =>
+      acc.replace(placeholder, replacement)
+    }
+  }
+}
diff --git a/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_02__unit_testing/scala/job/DataFrameExtensionsTest.scala b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_02__unit_testing/scala/job/DataFrameExtensionsTest.scala
new file mode 100644
index 0000000..113c11c
--- /dev/null
+++ b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_02__unit_testing/scala/job/DataFrameExtensionsTest.scala
@@ -0,0 +1,61 @@
+package com.codely.spark_best_practices_and_deploy.lesson_02_tests_in_spark.video_02__unit_testing.scala.job
+
+import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.extensions.DataFrameExtensions._
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+
+class DataFrameExtensionsTest extends SparkTestHelper {
+
+  "DataFrameExtensionsTest" should "calculate average spending correctly" in {
+
+    import testSQLImplicits._
+    implicit val sqlCtx: SQLContext = spark.sqlContext
+
+    val events = MemoryStream[String]
+    val sessions = events.toDS
+    assert(sessions.isStreaming, "sessions must be a streaming Dataset")
+
+    val transformedSessions =
+      sessions.toDF().calculateCompleteAvgSpending
+
+    val streamingQuery = transformedSessions.writeStream
+      .format("memory")
+      .queryName("queryName")
+      .outputMode("complete")
+      .start
+
+    val offset = events.addData(DataFrameExtensionsTest.testPurchase)
+
+    streamingQuery.processAllAvailable()
+    events.commit(offset)
+
+    val result = spark.sql("select * from queryName")
+    result.show()
+    assert(
+      result.collect().head === Row("user456", "Electronics", 6, 599.98)
+    )
+  }
+}
+
+object DataFrameExtensionsTest {
+
+  val testPurchase: String =
+    """
+      |{
+      |  "eventType": "purchase",
+      |  "timestamp": "2024-06-28T14:35:00Z",
+      |  "userId": "user456",
+      |  "transactionId": "trans789",
+      |  "products": [
+      |    {
+      |      "productId": "prod123",
+      |      "quantity": 2,
+      |      "description": "Sample product description",
+      |      "category": "Electronics",
+      |      "price": 299.99
+      |    }
+      |  ],
+      |  "eventId": "event012"
+      |}
+      |""".stripMargin
+}
diff --git a/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_02__unit_testing/scala/job/SparkTestHelper.scala b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_02__unit_testing/scala/job/SparkTestHelper.scala
new file mode 100644
index 0000000..ae356f5
--- /dev/null
+++ b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/video_02__unit_testing/scala/job/SparkTestHelper.scala
@@ -0,0 +1,77 @@
+package com.codely.spark_best_practices_and_deploy.lesson_02_tests_in_spark.video_02__unit_testing.scala.job
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.{SQLContext, SQLImplicits, SparkSession}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import java.io.File
+import java.nio.file.Files
+import scala.reflect.io.Directory
+
+trait SparkTestHelper
+    extends AnyFlatSpec
+    with BeforeAndAfterEach
+    with BeforeAndAfterAll {
+
+  private val sparkSession = SparkSession
+    .builder()
+    .master("local[*]")
+    .appName("test-spark-session")
+    .config(sparkConfiguration)
+    .enableHiveSupport()
+    .getOrCreate()
+
+  protected var tempDir: String = _
+
+  protected implicit def spark: SparkSession = sparkSession
+
+  protected def sc: SparkContext = sparkSession.sparkContext
+
+  protected def sparkConfiguration: SparkConf =
+    new SparkConf()
+      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+      .set(
+        "spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.delta.catalog.DeltaCatalog"
+      )
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    clearTemporaryDirectories()
+  }
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    tempDir = Files.createTempDirectory(this.getClass.toString).toString
+  }
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    sparkSession.stop()
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+    clearTemporaryDirectories()
+  }
+
+  override protected def afterEach(): Unit = {
+    super.afterEach()
+    new Directory(new File(tempDir)).deleteRecursively()
+    spark.sharedState.cacheManager.clearCache()
+    spark.sessionState.catalog.reset()
+  }
+
+  protected object testSQLImplicits extends SQLImplicits {
+    protected override def _sqlContext: SQLContext = sparkSession.sqlContext
+  }
+
+  private def clearTemporaryDirectories(): Unit = {
+    val warehousePath = new File("spark-warehouse").getAbsolutePath
+    FileUtils.deleteDirectory(new File(warehousePath))
+
+    val metastoreDbPath = new File("metastore_db").getAbsolutePath
+    FileUtils.deleteDirectory(new File(metastoreDbPath))
+  }
+
+}
diff --git a/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/z_practical_exercise/job/DataFrameExtensionsTest.scala b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/z_practical_exercise/job/DataFrameExtensionsTest.scala
new file mode 100644
index 0000000..1c70b8e
--- /dev/null
+++ b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/z_practical_exercise/job/DataFrameExtensionsTest.scala
@@ -0,0 +1,161 @@
+package com.codely.spark_best_practices_and_deploy.lesson_02_tests_in_spark.z_practical_exercise.job
+
+import com.codely.lesson_02_tests_in_spark.z_practical_exercise.extensions.DataFrameExtensions._
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.{Row, SQLContext}
+import org.scalatest.matchers.should.Matchers
+
+class DataFrameExtensionsTest extends SparkTestHelper with Matchers {
+
+  implicit val sqlCtx: SQLContext = spark.sqlContext
+
+  "DataFrameExtensions" should "parse JSON correctly" in {
+    import testSQLImplicits._
+
+    val events = MemoryStream[String]
+    val sessions = events.toDS
+
+    val transformedSessions = sessions.toDF().parseJson
+
+    val streamingQuery = transformedSessions.writeStream
+      .format("memory")
+      .queryName("queryName")
+      .outputMode("append")
+      .start()
+
+    val offset = events.addData(DataFrameExtensionsTest.testPurchase)
+    streamingQuery.processAllAvailable()
+    events.commit(offset)
+
+    val result = spark.sql("select * from queryName")
+    result.show()
+    result.collect().head shouldEqual Row(
+      "purchase",
+      "2024-06-28T14:35:00Z",
+      "user456",
+      "trans789",
+      Array(Row("prod123", 2, "Sample product description", "Electronics", 299.99)),
+      "event012"
+    )
+  }
+
+  it should "add date column correctly" in {
+    import testSQLImplicits._
+
+    val events = MemoryStream[String]
+    val sessions = events.toDS
+
+    val transformedSessions = sessions.toDF().parseJson.addDateColum
+
+    val streamingQuery = transformedSessions.writeStream
+      .format("memory")
+      .queryName("queryName")
+      .outputMode("append")
+      .start()
+
+    val offset = events.addData(DataFrameExtensionsTest.testPurchase)
+    streamingQuery.processAllAvailable()
+    events.commit(offset)
+
+    val result = spark.sql("select * from queryName")
+    result.show()
+    result.collect().head.getAs[String]("date") shouldEqual "2024-06-28"
+  }
+
+  it should "explode products correctly" in {
+    import testSQLImplicits._
+
+    val events = MemoryStream[String]
+    val sessions = events.toDS
+
+    val transformedSessions = sessions.toDF().parseJson.addDateColum.explodeProducts
+
+    val streamingQuery = transformedSessions.writeStream
+      .format("memory")
+      .queryName("queryName")
+      .outputMode("append")
+      .start()
+
+    val offset = events.addData(DataFrameExtensionsTest.testPurchase)
+    streamingQuery.processAllAvailable()
+    events.commit(offset)
+
+    val result = spark.sql("select * from queryName")
+    result.show()
+    result.collect().head shouldEqual Row(
+      "user456",
+      Row("prod123", 2, "Sample product description", "Electronics", 299.99),
+      "2024-06-28"
+    )
+  }
+
+  it should "transform for aggregation correctly" in {
+    import testSQLImplicits._
+
+    val events = MemoryStream[String]
+    val sessions = events.toDS
+
+    val transformedSessions = sessions.toDF().parseJson.addDateColum.explodeProducts.transformForAggregation
+
+    val streamingQuery = transformedSessions.writeStream
+      .format("memory")
+      .queryName("queryName")
+      .outputMode("append")
+      .start()
+
+    val offset = events.addData(DataFrameExtensionsTest.testPurchase)
+    streamingQuery.processAllAvailable()
+    events.commit(offset)
+
+    val result = spark.sql("select * from queryName")
+    result.show()
+    result.collect().head shouldEqual Row("user456", "Electronics", 6, 599.98)
+  }
+
+  it should "calculate average spending correctly" in {
+    import testSQLImplicits._
+
+    val events = MemoryStream[String]
+    val sessions = events.toDS
+
+    val transformedSessions =
+      sessions.toDF().parseJson.addDateColum.explodeProducts.transformForAggregation.calculateAvgSpending
+
+    val streamingQuery = transformedSessions.writeStream
+      .format("memory")
+      .queryName("queryName")
+      .outputMode("complete")
+      .start()
+
+    val offset = events.addData(DataFrameExtensionsTest.testPurchase)
+    streamingQuery.processAllAvailable()
+    events.commit(offset)
+
+    val result = spark.sql("select * from queryName")
+    result.show()
+    result.collect().head shouldEqual Row("user456", "Electronics", 6, 599.98)
+  }
+}
+
+object DataFrameExtensionsTest {
+
+  val testPurchase: String =
+    """
+      |{
+      |  "eventType": "purchase",
+      |  "timestamp": "2024-06-28T14:35:00Z",
+      |  "userId": "user456",
+      |  "transactionId": "trans789",
+      |  "products": [
+      |    {
+      |      "productId": "prod123",
+      |      "quantity": 2,
+      |      "description": "Sample product description",
+      |      "category": "Electronics",
+      |      "price": 299.99
+      |    }
+      |  ],
+      |  "eventId": "event012"
+      |}
+      |""".stripMargin
+}
diff --git a/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/z_practical_exercise/job/SparkTestHelper.scala b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/z_practical_exercise/job/SparkTestHelper.scala
new file mode 100644
index 0000000..c0a44ca
--- /dev/null
+++ b/src/test/com/codely/spark_best_practices_and_deploy/lesson_02_tests_in_spark/z_practical_exercise/job/SparkTestHelper.scala
@@ -0,0 +1,77 @@
+package com.codely.spark_best_practices_and_deploy.lesson_02_tests_in_spark.z_practical_exercise.job
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.{SQLContext, SQLImplicits, SparkSession}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import java.io.File
+import java.nio.file.Files
+import scala.reflect.io.Directory
+
+trait SparkTestHelper
+    extends AnyFlatSpec
+    with BeforeAndAfterEach
+    with BeforeAndAfterAll {
+
+  private val sparkSession = SparkSession
+    .builder()
+    .master("local[*]")
+    .appName("test-spark-session")
+    .config(sparkConfiguration)
+    .enableHiveSupport()
+    .getOrCreate()
+
+  protected var tempDir: String = _
+
+  protected implicit def spark: SparkSession = sparkSession
+
+  protected def sc: SparkContext = sparkSession.sparkContext
+
+  protected def sparkConfiguration: SparkConf =
+    new SparkConf()
+      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+      .set(
+        "spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.delta.catalog.DeltaCatalog"
+      )
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    clearTemporaryDirectories()
+  }
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    tempDir = Files.createTempDirectory(this.getClass.toString).toString
+  }
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    sparkSession.stop()
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+    clearTemporaryDirectories()
+  }
+
+  override protected def afterEach(): Unit = {
+    super.afterEach()
+    new Directory(new File(tempDir)).deleteRecursively()
+    spark.sharedState.cacheManager.clearCache()
+    spark.sessionState.catalog.reset()
+  }
+
+  protected object testSQLImplicits extends SQLImplicits {
+    protected override def _sqlContext: SQLContext = sparkSession.sqlContext
+  }
+
+  private def clearTemporaryDirectories(): Unit = {
+    val warehousePath = new File("spark-warehouse").getAbsolutePath
+    FileUtils.deleteDirectory(new File(warehousePath))
+
+    val metastoreDbPath = new File("metastore_db").getAbsolutePath
+    FileUtils.deleteDirectory(new File(metastoreDbPath))
+  }
+
+}
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
new file mode 100644
index 0000000..7237568
--- /dev/null
+++ b/src/test/resources/application.conf
@@ -0,0 +1,22 @@
+
+  spark {
+    appName = "AvgSpendingApp"
+  }
+
+  source {
+    format = "jdbc"
+    options {
+      dbtable = "example_table"
+      driver = "org.postgresql.Driver"
+      url = ":URL:"
+      user = "admin"
+      password = "secret"
+    }
+  }
+
+  sink {
+    format = "delta"
+    mode = "overwrite"
+    path = ":PATH:/delta"
+  }
+
diff --git a/src/test/resources/application_kafka.conf b/src/test/resources/application_kafka.conf
new file mode 100644
index 0000000..da41294
--- /dev/null
+++ b/src/test/resources/application_kafka.conf
@@ -0,0 +1,23 @@
+
+  spark {
+    appName = "AvgSpendingApp"
+  }
+
+  source {
+    format = "kafka"
+    options {
+      server = "localhost:9092"
+      startingOffsets = "earliest"
+      subscribe = "topic-events"
+    }
+  }
+
+  sink {
+    format = "delta"
+    mode = "complete"
+    options {
+      path = "/tmp"
+      checkpoint = "/tmp"
+    }
+  }
+
diff --git a/src/test/resources/init_scripts.sql b/src/test/resources/init_scripts.sql
new file mode 100644
index 0000000..7b49ab0
--- /dev/null
+++ b/src/test/resources/init_scripts.sql
@@ -0,0 +1,13 @@
+
+CREATE TABLE example_table (
+    name VARCHAR(10),
+    value INT
+);
+
+INSERT INTO example_table (name, value)
+VALUES
+    ('Alice', 10),
+    ('Bob', 20),
+    ('Alice', 20),
+    ('Charlie', 20),
+    ('Charlie', 30);
\ No newline at end of file