In [1]:
print("hello")

hello


In [2]:
import time
from pyspark.sql import SparkSession
import psutil
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType, MapType
import pandas as pd
from functools import reduce
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import col
from pyspark import StorageLevel

In [3]:
# Cluster configuration for PySpark - 2 nodes (16 cores, 64GB RAM total)
import psutil
import os

# Define storage paths (modify as needed for your cluster)
external_ssd_path = "/scratch/ans9868/spark_temp"
os.makedirs(external_ssd_path, exist_ok=True)

# Calculate cluster resources
total_cores = 16
total_memory_gb = 64
nodes = 2

# Reserve memory for OS and system processes (15% per node)
reserve_memory_per_node = total_memory_gb / nodes * 0.15
available_memory_gb = total_memory_gb - (reserve_memory_per_node * nodes)

# Allocate resources
cores_per_executor = 2  # Recommended size for good parallelism without excessive overhead
num_executors = nodes * (total_cores // cores_per_executor) // nodes - 1  # Reserve 1 core per node for overhead

# Memory settings (account for overhead)
executor_memory_gb = int((available_memory_gb * 0.8) / num_executors)  # 80% for executors
driver_memory_gb = int(available_memory_gb * 0.2)  # 20% for driver

# Calculate executor overhead (roughly 10% of executor memory)
executor_overhead_mb = int(executor_memory_gb * 1024 * 0.1)

In [6]:


print(f"Cluster configuration:")
print(f"  - Total nodes: {nodes}")
print(f"  - Total cores: {total_cores}")
print(f"  - Total memory: {total_memory_gb}GB")
print(f"  - Number of executors: {num_executors}")
print(f"  - Cores per executor: {cores_per_executor}")
print(f"  - Driver memory: {driver_memory_gb}GB")
print(f"  - Executor memory: {executor_memory_gb}GB")
print(f"  - Executor overhead: {executor_overhead_mb}MB")

# Create SparkSession with distributed cluster configuration
master = "spark://cm045:14089"
spark = (SparkSession.builder
    .appName("EEG-Analysis-Cluster")
    # .master(master)  # Set to your cluster's master URL
    
    # Resource allocation
    .config("spark.executor.instances", str(num_executors))
    .config("spark.executor.cores", str(cores_per_executor))
    .config("spark.executor.memory", f"{executor_memory_gb}g")
    .config("spark.driver.memory", f"{driver_memory_gb}g")
    .config("spark.executor.memoryOverhead", f"{executor_overhead_mb}m")
    
    # Performance tuning
    .config("spark.default.parallelism", str(total_cores * 2))  # 2x total cores
    .config("spark.sql.shuffle.partitions", str(total_cores * 4))  # 4x total cores
    .config("spark.memory.fraction", "0.75")  # Higher for data processing workloads
    .config("spark.memory.storageFraction", "0.4")  # Balanced for compute/storage
    .config("spark.driver.maxResultSize", f"{driver_memory_gb // 2}g")
    
    # Network and shuffle optimizations
    .config("spark.reducer.maxSizeInFlight", "96m")  # Larger for faster network transfers
    .config("spark.shuffle.file.buffer", "2m")  # Increased for better I/O
    .config("spark.shuffle.io.maxRetries", "10")  # More resilient in distributed environment
    .config("spark.shuffle.io.retryWait", "30s")  # Wait longer between retries
    .config("spark.executor.heartbeatInterval", "10s")  # Set a much shorter heartbeat interval
    .config("spark.network.timeout", "800s")  # Prevent timeouts on larger operations
    
    # Storage locations
    .config("spark.local.dir", external_ssd_path)
    .config("spark.worker.dir", external_ssd_path)
    .config("spark.sql.warehouse.dir", f"{external_ssd_path}/warehouse")
    
    # Memory management and GC
    .config("spark.driver.extraJavaOptions", 
            f"-Djava.io.tmpdir={external_ssd_path}/tmp -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -XX:+HeapDumpOnOutOfMemoryError")
    .config("spark.executor.extraJavaOptions", 
            f"-Djava.io.tmpdir={external_ssd_path}/tmp -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark")
    
    # Data compression - snappy is balanced for speed/compression ratio
    .config("spark.io.compression.codec", "snappy")
    .config("spark.shuffle.compress", "true")
    .config("spark.shuffle.spill.compress", "true")
    .config("spark.broadcast.compress", "true")
    
    # Dynamic allocation (optional, remove if you want fixed allocation)
    #.config("spark.dynamicAllocation.enabled", "true")
    #.config("spark.dynamicAllocation.minExecutors", str(num_executors // 2))
    #.config("spark.dynamicAllocation.maxExecutors", str(num_executors + 4))
    
    .getOrCreate()
)

# Optional: Set log level to reduce console output
spark.sparkContext.setLogLevel("WARN")

# Display actual configuration for verification
print("\nActive Spark Configuration:")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")
print(f"Shuffle Partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")

Cluster configuration:
  - Total nodes: 2
  - Total cores: 16
  - Total memory: 64GB
  - Number of executors: 7
  - Cores per executor: 2
  - Driver memory: 10GB
  - Executor memory: 6GB
  - Executor overhead: 614MB


25/05/08 21:42:06 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.Con


Active Spark Configuration:
Default Parallelism: 32
Shuffle Partitions: 64


In [7]:
print("New Spark session created successfully")
print("=== Spark Configuration & Runtime info ===")
print(f"App ID: {spark.sparkContext.applicationId}")
print(f"Master: {spark.sparkContext.master}")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")
print(f"Total Executors: {spark.sparkContext._jsc.sc().getExecutorMemoryStatus().size()}")

for item in spark.sparkContext.getConf().getAll():
    print(f"{item[0]} = {item[1]}")
    

New Spark session created successfully
=== Spark Configuration & Runtime info ===
App ID: app-20250508214207-0000
Master: spark://cm045:50174
Default Parallelism: 32
Total Executors: 9
spark.shuffle.io.connectionTimeout = 3000s
spark.ui.killEnabled = false
spark.ui.enabled = true
spark.driver.userClassPathFirst = true
spark.app.submitTime = 1746754810195
spark.ndb.parallel_import = true
spark.ndb.access_key_id = 
spark.executor.cores = 2
spark.shuffle.io.maxRetries = 10
spark.executor.userClassPathFirst = true
spark.driver.memory = 10g
spark.shuffle.compress = true
spark.driver.extraJavaOptions = -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-o

In [None]:
# spark.stop()

In [None]:
external_ssd_path = "/scratch/ans9868/spark_temp"


spark = (
    SparkSession.builder
    .appName("EEG-Analysis")

    # Master should be set if you're not using spark-submit with --master
    # Memory and cores per executor
    .config("spark.executor.instances", "8")             # Total executors (across both nodes)
    .config("spark.executor.cores", "4")                 # Cores per executor
    .config("spark.executor.memory", "8g")               # Memory per executor
    .config("spark.driver.memory", "8g")                 # Driver memory

    # Memory tuning
    .config("spark.memory.fraction", "0.7")
    .config("spark.memory.storageFraction", "0.5")
    .config("spark.driver.maxResultSize", "4g")          # Result limit to prevent OOM

    # Shuffle & parallelism tuning
    .config("spark.sql.shuffle.partitions", "64")        # = total cores (2 nodes × 16)
    .config("spark.default.parallelism", "64")
    .config("spark.shuffle.file.buffer", "1m")
    .config("spark.shuffle.spill.compress", "true")
    .config("spark.shuffle.compress", "true")
    .config("spark.shuffle.spill.numElementsForceSpillThreshold", "5000")

    # External SSD paths
    # .config("spark.local.dir", "/mnt/data/spark-local")  # Temp local storage
    # .config("spark.worker.dir", "/mnt/data/spark-worker")
    # .config("spark.sql.warehouse.dir", "/mnt/data/warehouse")

    # Temp and GC tuning
    # .config("spark.driver.extraJavaOptions", "-Djava.io.tmpdir=/mnt/data/tmp -XX:+UseG1GC")
    # .config("spark.executor.extraJavaOptions", "-Djava.io.tmpdir=/mnt/data/tmp -XX:+UseG1GC")

    .getOrCreate()
)
