
# Tutoriel Pyspark - Intro I
<p style = "font-size:100%"> Ce tutoriel d'introduction à Pyspark, introduit et présente les concepts suivants: spark context, spark sessions, spark catalog et global/local temporal view
</p>

### 1. Importation des librairies nécessaires

In [1]:
import os, time, psutil

In [2]:
# To run pypsark in notebooks: 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
import json

In [4]:
import socket

In [5]:
spark = SparkSession.builder \
    .appName("JupyterSparkCluster") \
    .master("spark://hadoop-master:7077") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "512m") \
    .config("spark.cores.max", "4") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .getOrCreate()

#### 2. Création de fonctions utiles

In [6]:
def get_advanced_spark_info(spark: SparkSession):
    sc = spark.sparkContext
    conf = sc.getConf()
    
    print("Spark Configuration:")
    print(f"  Spark Version: {spark.version}")
    print(f"  Master: {sc.master}")
    print(f"  App Name: {sc.appName}")
    print(f"  App ID: {sc.applicationId}")
    print(f"  Executor Memory: {conf.get('spark.executor.memory')}")
    print(f"  Number of Executors set: {conf.get('spark.executor.instances', 'Not explicitly set')}")
    print(f"  Executor Cores: {conf.get('spark.executor.cores', 'Not explicitly set')}")
    print(f"  Driver Memory: {conf.get('spark.driver.memory')}")
    print(f"  Driver Cores: {conf.get('spark.driver.cores', 'Not explicitly set')}")
    print(f"  Dynamic Allocation: {conf.get('spark.dynamicAllocation.enabled', 'Not set')}")
    
    print("\nExecutor Information:")
    executors = spark._jsparkSession.sparkContext().getExecutorMemoryStatus().size()
    print(f"  Number of executors (including driver): {executors}")
    print(f"  Number of executors (excluding driver): {executors - 1}")
    
    print("\nCluster Resources:")
    print(f"  Total cores: {sc.defaultParallelism}")
    print(f"  Default min partitions: {sc.defaultMinPartitions}")
    
    print("\nUI Info:")
    print(f"  Spark UI URL: {sc.uiWebUrl}")
    
    print("\nActive Executors:")
    active_executors = sc._jsc.sc().statusTracker().getExecutorInfos()
    print(f"  Number of active executors: {len(active_executors)}")
    for i, executor in enumerate(active_executors):
        print(f"  Executor {i}: {executor}")

In [7]:
def get_detailed_executor_info(spark: SparkSession):
    sc = spark.sparkContext
    
    print("Executor Information:")
    executor_mem_status = sc._jsc.sc().getExecutorMemoryStatus()
    print(f"  Number of executors (including driver): {executor_mem_status.size()}")
    
    active_executors = sc._jsc.sc().statusTracker().getExecutorInfos()
    print(f"  Number of active executors: {len(active_executors)}")
    
    print("\nDetailed Executor List:")
    for i, executor in enumerate(active_executors):
        print(f"  Executor {i}:")
        print(f"    ID: {executor.id()}")
        print(f"    Host: {executor.host()}")
        print(f"    Is Active: {executor.isActive()}")
        print(f"    Total Cores: {executor.totalCores()}")
        print(f"    Max Tasks: {executor.maxTasks()}")

    print("\nSpark Configuration:")
    print(f"  spark.executor.instances: {sc.getConf().get('spark.executor.instances', 'Not set')}")
    print(f"  spark.dynamicAllocation.enabled: {sc.getConf().get('spark.dynamicAllocation.enabled', 'Not set')}")
    print(f"  spark.dynamicAllocation.maxExecutors: {sc.getConf().get('spark.dynamicAllocation.maxExecutors', 'Not set')}")


In [25]:
def create_custom_session(app_name, executor_instances, executor_cores, executor_memory):
    conf = SparkConf().setAppName(app_name).setMaster("spark://hadoop-master:7077")
    conf.set("spark.executor.instances", str(executor_instances))
    conf.set("spark.executor.cores", str(executor_cores))
    conf.set("spark.executor.memory", executor_memory)
    conf.set("spark.dynamicAllocation.enabled", "false")
    
    # Create a new SparkContext with the custom configuration
    sc = SparkSession.builder.config(conf=conf).getOrCreate().sparkContext
    
    # Create a new session using the custom SparkContext
    return SparkSession(sc)

In [15]:
def display_session_info(session):
    print(f"Session ID: {session.sparkContext.applicationId}")
    print(f"Master: {session.sparkContext.master}")
    print(f"Executor Cores: {session.sparkContext.getConf().get('spark.executor.cores')}")
    print(f"Executor Memory: {session.sparkContext.getConf().get('spark.executor.memory')}")
    print(f"Shuffle Partitions: {session.sparkContext.getConf().get('spark.sql.shuffle.partitions')}")
    print(f"Default Parallelism: {session.sparkContext.getConf().get('spark.default.parallelism')}")
    print(f"Dynamic Allocation: {session.sparkContext.getConf().get('spark.dynamicAllocation.enabled')}")
    print("------------------")

### 3. Inspection des ressources locales disponibles

In [8]:
df = spark.range(1000).repartition(5)
print(df.rdd.getNumPartitions())
df.count()

5


1000

In [10]:
get_advanced_spark_info(spark)

Spark Configuration:
  Spark Version: 3.5.0
  Master: spark://hadoop-master:7077
  App Name: JupyterSparkCluster
  App ID: app-20241011055637-0004
  Executor Memory: 512m
  Number of Executors set: 2
  Executor Cores: 2
  Driver Memory: None
  Driver Cores: Not explicitly set
  Dynamic Allocation: false

Executor Information:
  Number of executors (including driver): 3
  Number of executors (excluding driver): 2

Cluster Resources:
  Total cores: 4
  Default min partitions: 2

UI Info:
  Spark UI URL: http://9892a3a47b4a:4040

Active Executors:
  Number of active executors: 3
  Executor 0: org.apache.spark.SparkExecutorInfoImpl@7d1dd03b
  Executor 1: org.apache.spark.SparkExecutorInfoImpl@4ee5a6f9
  Executor 2: org.apache.spark.SparkExecutorInfoImpl@69f6238b


### 4. Création des sessions

Nb en 'local mode' il n'y a qu'un executor, puisque le master et le worker node sont la même machine, le driver et executor tourne sur la même machine 'local'.

In [39]:
spark1.stop()

In [41]:
# Create the first Spark session
session1 = create_custom_session("SparkSession1", 1, 2, "512m")

# Create the second Spark session
session2 = create_custom_session("SparkSession2", 1, 1, "1g")

In [42]:
# Test the first session
df1 = session1.range(0, 1000000)
print("Session 1 count:", df1.count())

# Test the second session
df2 = session2.range(0, 1000000)
print("Session 2 count:", df2.count())

Session 1 count: 1000000
Session 2 count: 1000000


Pourquoi il n'a pas pris en compte ma configuration ? 
<p> >> Car les configurations liées aux ressources (comme le nombre de cœurs, la mémoire de l'exécuteur) sont définies au niveau du SparkContext! </p>

### 5. Opérations

Le but est de voir que chaque session spark effectue les calculs de manière isolé de l'autre, et qu'en fonction des ressources mises à disposition, elles n'auront pas les mêmes durées d'exécution.

In [48]:
spark = SparkSession.builder \
    .appName("JupyterSparkCluster") \
    .master("local[*]") \
    .getOrCreate()

In [49]:
def perform_computation(session, num_partitions):
    """Prend une session en entrée, s'appuie sur le sparkContext (qui est unique pour l'ensemble des sessions) pour créer un RDD et effectue des opérations dessus"""
    large_range = session.sparkContext.parallelize(range(10000000), num_partitions)
    start_time = time.time()
    result = large_range.map(lambda x: x * x).sum()
    end_time = time.time()
    print(f"Sum of squares: {result}")
    print(f"Computation time: {end_time - start_time:.2f} seconds")


In [52]:
print("\nPerforming computation in Session 1:")
perform_computation(spark, 2)

print("\nPerforming computation in Session 2:")
perform_computation(spark, 4)


Performing computation in Session 1:


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 4 times, most recent failure: Lost task 1.3 in stage 9.0 (TID 34) (172.18.0.3 executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 10) than that in driver 3.11, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1046)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 10) than that in driver 3.11, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1046)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


### 6. Views and catalog

In [44]:
# Create DataFrames in each session
df1 = session1.createDataFrame([(1, "Alice"), (2, "Bob"), (3, "Charlie")], ["id", "name"])
df2 = session2.createDataFrame([(1, "David"), (2, "Eve"), (3, "Frank")], ["id", "name"])

##### 6.1) Local Temp View

In [45]:
# Register temporary views
df1.createOrReplaceTempView("people_session1")
df2.createOrReplaceTempView("people_session2")

Par défaut, une vue temporaire n'existe que dans la session dans laquelle elle a été crée

In [46]:
print("\nData in Session 1:")
session1.sql("SELECT * FROM people_session1").show()


Data in Session 1:


Py4JJavaError: An error occurred while calling o2404.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 19) (172.18.0.4 executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 10) than that in driver 3.11, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 10) than that in driver 3.11, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
print("\nData in Session 2:")
session2.sql("SELECT * FROM people_session2").show()

Impossible donc d'accéder à une vue d'une autre session

In [None]:
# De la session 1 on essaye d'accéder à la vue (crée dans la session 2)
session1.sql("SELECT * FROM people_session2").show()

In [None]:
# De la session 2 on essaye d'accéder à la vue (crée dans la session 1)
session2.sql("SELECT * FROM people_session1").show()

##### 6.2) Global Temp View & catalog

Pour rendre une vue accessible à l'ensemble des sessions, on peut utiliser le Global Catalog

In [None]:
# Global Catalog Usage

# 1. Global Temporary View
print("\n--- Global Temporary View ---")
df1.createGlobalTempView("global_people") # We register df1 as a global temp view in the global catalog

Les 2 sessions peuvent accéder à global_people

In [None]:
print("Accessing global temporary view from Session 1:")
session1.sql("SELECT * FROM global_temp.global_people").show()

print("Accessing global temporary view from Session 2:")
session2.sql("SELECT * FROM global_temp.global_people").show()

##### 6.3) Spark SQL Warehouse directory

On peut aussi inscrire la donnée en "base" pour la partager entre plusieurs spark sessions

In [None]:
# 2. Permanent Table in a Database
print("\n--- Permanent Table in Database ---")
session1.sql("CREATE DATABASE IF NOT EXISTS shared_db")
df1.write.mode("overwrite").saveAsTable("shared_db.permanent_people")

In [None]:
print("Accessing permanent table from Session 1:")
session1.sql("SELECT * FROM shared_db.permanent_people").show()

In [None]:
print("Accessing permanent table from Session 2:")
session2.sql("SELECT * FROM shared_db.permanent_people").show()

### 7. Opérations disponibles sur le catalog

In [None]:
# Catalog Operations
print("\n--- Catalog Operations ---")

print("Databases in Session 1:")
session1.catalog.listDatabases()

print("\nTables in 'shared_db' from Session 1:")
session1.catalog.listTables("shared_db")

In [None]:

print("\nDatabases in Session 2:")
session2.catalog.listDatabases()

print("\nTables in 'shared_db' from Session 2:")
session2.catalog.listTables("shared_db")

# Clean up

In [None]:
session1.catalog.dropGlobalTempView("global_people")
session1.sql("DROP TABLE IF EXISTS shared_db.permanent_people")
session1.sql("DROP DATABASE IF EXISTS shared_db")

### 8. Fin de session

In [None]:
session1.stop()

In [None]:
session2.stop()