In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Spark session settings used by this notebook:
#   spark.executor.memory -> 6 GB of memory per executor
#   spark.executor.cores  -> 1 core per executor
#   spark.cores.max       -> at most 2 cores for the application
#   spark.driver.memory   -> 1 GB of memory for the driver
conf = (
    SparkConf()
    .set('spark.executor.memory', '6g')
    .set('spark.executor.cores', '1')
    .set('spark.cores.max', '2')
    .set('spark.driver.memory', '1g')
)

# Build the session from the configuration above (or reuse an existing one).
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("TestSparkSessionConfiguration")
    .getOrCreate()
)




In [2]:
# Dump every (key, value) pair of the active Spark configuration.
# `configurations` is kept as a list of tuples because the checks in the
# next cell read it.
print('Current spark config:')
configurations = spark.sparkContext.getConf().getAll()
for item in configurations:
    # '\033[32m' switches the text to green; '\033[0m' resets the colour so
    # it does not bleed into whatever is printed afterwards.
    print('\033[32m' + str(item) + '\033[0m')

Current spark config:
[32m('spark.driver.extraJavaOptions', '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED')
[32m('spark.app.initial.jar.urls', 'spark://crazylab:44973/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar,spark://crazylab:44973/jars/io.delta_delta-storage-2.1.0.jar,spark://crazylab:44973/jars/io.delt

In [3]:
# Testing Spark session configuration
#
# Verifies that the (key, value) pairs collected in `configurations` contain
# the settings this notebook requires. Each check prints a progress line and
# raises AssertionError with a hint naming the Spark key to use.

# Each entry: (label used in the messages, Spark config key, expected value).
expected_settings = [
    ('memory per executor', 'spark.executor.memory', '6g'),
    ('cores per executor', 'spark.executor.cores', '1'),
    ('max cores', 'spark.cores.max', '2'),
    ('memory for driver', 'spark.driver.memory', '1g'),
]

for label, key, expected in expected_settings:
    print(f'Checking {label}...')
    # Take the first pair whose key matches. Fall back to None when the key
    # was never set, so a missing setting fails the assertion below with the
    # helpful message instead of raising a bare IndexError.
    result_value = next((value for name, value in configurations if name == key), None)
    assert result_value == expected, f'Wrong {label}. Use {key}!'

print('All tests passed.')


Checking memory per executor...
Checking cores per executor...
Checking max cores...
Checking memory for driver...
All tests passed.


In [4]:
# Stop the Spark session that was created in the first cell of this notebook.
spark.stop()