In [1]:
# Spark Session
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook against the standalone master.
# NOTE(review): the recorded run logged "Using an existing Spark session; only runtime
# SQL configurations will take effect", so the core/memory settings below may not have
# been applied in that run — confirm on a fresh kernel.
_session_builder = (
    SparkSession.builder
    .appName("Optimizing Shuffles")
    .master("spark://sales-spark-master:7077")
)

# Resource settings are applied in the same order as they are listed here.
for _option, _setting in {
    "spark.cores.max": 12,
    "spark.executor.cores": 4,
    "spark.executor.memory": "512M",
}.items():
    _session_builder = _session_builder.config(_option, _setting)

spark = _session_builder.getOrCreate()

# Last expression: display the session object as the cell output.
spark

25/05/02 13:56:34 WARN  SparkSession:72 Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Show the default parallelism reported by the session's SparkContext
# (the value is displayed as the cell output).

_spark_context = spark.sparkContext
_spark_context.defaultParallelism

12

In [3]:
# Disable AQE (including its partition coalescing) and automatic broadcast joins
# for the runs in this notebook. The threshold is set to -1 for the broadcast setting.

for _conf_key, _conf_value in (
    ("spark.sql.adaptive.enabled", False),
    ("spark.sql.adaptive.coalescePartitions.enabled", False),
    ("spark.sql.autoBroadcastJoinThreshold", -1),
):
    spark.conf.set(_conf_key, _conf_value)

In [4]:
# Load the employee CSV (10M records, per the original note) using an explicit
# DDL schema string and treating the first line of the file as a header.

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = (
    spark.read
    .format("csv")
    .schema(_schema)
    .option("header", True)
    .load("/opt/spark/datasets/input/employee_records.csv")
)

In [5]:
# Average salary per department: group on department_id and expose the mean as avg_sal.
from pyspark.sql.functions import avg

emp_avg = (
    emp
    .groupBy("department_id")
    .agg(avg("salary").alias("avg_sal"))
)

In [6]:
# Benchmark run: write the per-department averages using the "noop" format
# in overwrite mode (used here for performance benchmarking).

(
    emp_avg.write
    .format("noop")
    .mode("overwrite")
    .save()
)

25/05/02 13:56:52 WARN  DataSourceV2RelationDatasetExtractor:162 Couldn't find identifier for dataset in plan RelationV2[]  noop-table

25/05/02 13:56:52 WARN  DataSourceV2RelationDatasetExtractor:189 Catalog Cannot extract dataset from relation=RelationV2[]  noop-table relationClass=org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation is unsupported
25/05/02 13:56:52 ERROR EventEmitter:77 Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.hc.client5.http.HttpHostConnectException: Connect to http://localhost:5000 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:208)
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:181)
	at io.openlineage.client.OpenLineageClient.lambda$emit$0(OpenLineageClient.java:86)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(Compos

In [7]:
# Display the session's current shuffle partition setting as the cell output.

_shuffle_partitions_key = "spark.sql.shuffle.partitions"
spark.conf.get(_shuffle_partitions_key)

'200'

In [8]:
# Set the shuffle partition count to 16 for the queries that follow
# (the previous cell's recorded output shows '200').
# NOTE(review): the rationale for choosing 16 is not shown in this notebook — confirm.
spark.conf.set("spark.sql.shuffle.partitions", 16)

In [9]:
from pyspark.sql.functions import spark_partition_id

# Tag each row with spark_partition_id() as partition_id, keep only the rows
# whose partition_id is 0, and print a preview of them.
(
    emp
    .withColumn("partition_id", spark_partition_id())
    .where("partition_id = 0")
    .show()
)

25/05/02 13:57:11 ERROR EventEmitter:77 Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.hc.client5.http.HttpHostConnectException: Connect to http://localhost:5000 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:208)
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:181)
	at io.openlineage.client.OpenLineageClient.lambda$emit$0(OpenLineageClient.java:86)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
	at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:86)
	at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:66)
	at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.start(SparkSQLExecutionContext.java:112)
	at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$sparkSQLExecStart$0(OpenLineage

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|partition_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8|           0|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7|           0|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|           0|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|           0|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffany

25/05/02 13:57:12 ERROR EventEmitter:77 Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.hc.client5.http.HttpHostConnectException: Connect to http://localhost:5000 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:208)
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:181)
	at io.openlineage.client.OpenLineageClient.lambda$emit$0(OpenLineageClient.java:86)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
	at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:86)
	at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:66)
	at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.end(SparkSQLExecutionContext.java:161)
	at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$sparkSQLExecEnd$2(OpenLineageSpar

In [12]:
# Load the partitioned employee dataset with the same schema and header handling
# as the main employee read.
# NOTE(review): the recorded run of this cell raised PATH_NOT_FOUND for this path,
# and no visible cell creates it — confirm the dataset exists before running.

emp_part = (
    spark.read
    .format("csv")
    .schema(_schema)
    .option("header", True)
    .load("/opt/spark/datasets/input/emp_partitioned.csv/")
)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/opt/spark/datasets/input/emp_partitioned.csv.

In [11]:
# Average salary per department, this time computed from the partitioned dataset.
# NOTE(review): this rebinds emp_avg, which is also assigned by the earlier
# per-department aggregation on emp. The recorded run raised NameError because
# emp_part was undefined, which leaves the earlier emp_avg binding in place.
emp_avg = (
    emp_part
    .groupBy("department_id")
    .agg(avg("salary").alias("avg_sal"))
)

NameError: name 'emp_part' is not defined

In [18]:
# Benchmark run: write emp_avg using the "noop" format in overwrite mode.
# NOTE(review): emp_avg is presumably meant to be the aggregate built from emp_part;
# verify it was actually rebound before trusting this run's timing.
(
    emp_avg.write
    .format("noop")
    .mode("overwrite")
    .save()
)

In [13]:
# Stop the Spark session now that the notebook's runs are finished.
spark.stop()

25/05/02 13:58:17 ERROR EventEmitter:77 Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.hc.client5.http.HttpHostConnectException: Connect to http://localhost:5000 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:208)
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:181)
	at io.openlineage.client.OpenLineageClient.lambda$emit$0(OpenLineageClient.java:86)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
	at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:86)
	at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:66)
	at io.openlineage.spark.agent.lifecycle.SparkApplicationExecutionContext.end(SparkApplicationExecutionContext.java:129)
	at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$onApplicationEnd$