### Start SparkSession

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


spark = (
    SparkSession
    .builder
    .appName("erohin_pyspark_local")
    .getOrCreate()
)

print("Активные Spark сессии:", spark.sparkContext.uiWebUrl)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/06 10:25:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Активные Spark сессии: http://4dab6efce3c1:4040


### Read

In [2]:
PATH = 'work/data/customs_data.csv'

In [3]:
spark.read.csv(PATH).show()

+--------------------+
|                 _c0|
+--------------------+
|month;country;cod...|
|01/2016;IT;620469...|
|01/2016;CN;900190...|
|01/2016;BY;841430...|
|01/2016;US;901850...|
|01/2016;EE;902110...|
|01/2016;FR;381600...|
|01/2016;MX;852351...|
|01/2016;JP;620452...|
|01/2016;KR;611020...|
|01/2016;KG;852713...|
|01/2016;ZA;842123...|
|01/2016;CN;851810...|
|01/2016;TR;841790...|
|01/2016;IT;390610...|
|01/2016;CZ;870840...|
|01/2016;ES;640419...|
|01/2016;IT;940490...|
|01/2016;UA;820780...|
|01/2016;CN;330410...|
+--------------------+
only showing top 20 rows


In [4]:
spark.read.csv(PATH, sep=';', header=True).show(5)

+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|  month|country|      code| value|netto|quantity|region|district|direction_eng|measure_eng|           load_date|
+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|01/2016|     IT|6204695000|   131|    1|       7| 46000|      01|           IM|        ShT|2024-07-01T00:00:...|
|01/2016|     CN|9001900009|112750|   18|       0| 46000|      01|           IM|          1|2024-01-01T00:00:...|
|01/2016|     BY|8414302004|   392|   57|       8| 50000|      06|           IM|        ShT|2024-06-01T00:00:...|
|01/2016|     US|9018509000| 54349|  179|       0| 40000|      02|           IM|          1|2024-04-01T00:00:...|
|01/2016|     EE|9021101000| 17304|  372|       0| 46000|      01|           IM|          1|2024-02-01T00:00:...|
+-------+-------+----------+------+-----+--------+------+--------+-------------+--------

In [6]:
df = spark.read.csv(PATH, sep=';', header=True)

In [7]:
df.show(truncate=False) # Выведет колонки с длинными значениями полностью, выше колонка load_date обрезана

+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|month  |country|code      |value |netto |quantity|region|district|direction_eng|measure_eng|load_date                    |
+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|01/2016|IT     |6204695000|131   |1     |7       |46000 |01      |IM           |ShT        |2024-07-01T00:00:00.000+03:00|
|01/2016|CN     |9001900009|112750|18    |0       |46000 |01      |IM           |1          |2024-01-01T00:00:00.000+03:00|
|01/2016|BY     |8414302004|392   |57    |8       |50000 |06      |IM           |ShT        |2024-06-01T00:00:00.000+03:00|
|01/2016|US     |9018509000|54349 |179   |0       |40000 |02      |IM           |1          |2024-04-01T00:00:00.000+03:00|
|01/2016|EE     |9021101000|17304 |372   |0       |46000 |01      |IM           |1          |2024-02-01T00:00:00.000+03:00|
|01/2016

In [8]:
df.show(2, False, True) #(количество строк, не обрезать длинные строки, выводить каждую строку вертикально)

-RECORD 0--------------------------------------
 month         | 01/2016                       
 country       | IT                            
 code          | 6204695000                    
 value         | 131                           
 netto         | 1                             
 quantity      | 7                             
 region        | 46000                         
 district      | 01                            
 direction_eng | IM                            
 measure_eng   | ShT                           
 load_date     | 2024-07-01T00:00:00.000+03:00 
-RECORD 1--------------------------------------
 month         | 01/2016                       
 country       | CN                            
 code          | 9001900009                    
 value         | 112750                        
 netto         | 18                            
 quantity      | 0                             
 region        | 46000                         
 district      | 01                     

### PrintSchema

In [9]:
df.printSchema()

root
 |-- month: string (nullable = true)
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)
 |-- netto: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- region: string (nullable = true)
 |-- district: string (nullable = true)
 |-- direction_eng: string (nullable = true)
 |-- measure_eng: string (nullable = true)
 |-- load_date: string (nullable = true)



In [10]:
# withColumnRenamed("direction_eng", "direction") возьми колонку и поменяй название
result = (
    df
    .withColumnRenamed("direction_eng", "direction")
    .withColumnRenamed("measure_eng", "measure")
)

result.columns

['month',
 'country',
 'code',
 'value',
 'netto',
 'quantity',
 'region',
 'district',
 'direction',
 'measure',
 'load_date']

In [11]:
result.printSchema()

root
 |-- month: string (nullable = true)
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)
 |-- netto: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- region: string (nullable = true)
 |-- district: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- load_date: string (nullable = true)



### Select

In [12]:
result \
      .select('country')\
      .distinct()\
      .show(10, truncate=False)



+-------+
|country|
+-------+
|LT     |
|MM     |
|DZ     |
|CI     |
|TC     |
|FI     |
|SC     |
|AZ     |
|UA     |
|RO     |
+-------+
only showing top 10 rows


                                                                                

### GroupBy

In [13]:
(
    result
    .groupBy('country')
    .agg(F.count('*').alias('total_rows'))
    .orderBy(F.col('total_rows').desc())
    .show()
)



+-------+----------+
|country|total_rows|
+-------+----------+
|     BY|   3509568|
|     KZ|   2519896|
|     CN|   2454792|
|     DE|   1542311|
|     UA|   1158498|
|     IT|   1102837|
|     US|    835936|
|     PL|    666690|
|     FR|    593040|
|     JP|    571756|
|     TR|    463432|
|     KR|    446907|
|     GB|    443091|
|     AM|    438705|
|     CZ|    407360|
|     KG|    403565|
|     ES|    401644|
|     IN|    374151|
|     NL|    365193|
|     UZ|    329707|
+-------+----------+
only showing top 20 rows


                                                                                

### Filter

In [14]:
# Два варианта написании фильтрации
df_de = (
    result
    .where(F.col('country') == 'DE')
    .where(F.col('value').isNotNull())
)

In [15]:
df_de.show(2)

+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+--------------------+
|  month|country|      code|value|netto|quantity|region|district|direction|measure|           load_date|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+--------------------+
|01/2016|     DE|4016995709| 5901|  172|       0| 46000|      01|       IM|      1|2024-01-01T00:00:...|
|01/2016|     DE|8708809109| 1213|   94|       0| 45000|      01|       IM|      1|2024-01-01T00:00:...|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+--------------------+
only showing top 2 rows


In [16]:
df_de2 = (
    result
    .where(''' country == "DE" ''')
    .where(''' value IS NOT NULL ''')
)

In [17]:
df_de2.show(2)

+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+--------------------+
|  month|country|      code|value|netto|quantity|region|district|direction|measure|           load_date|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+--------------------+
|01/2016|     DE|4016995709| 5901|  172|       0| 46000|      01|       IM|      1|2024-01-01T00:00:...|
|01/2016|     DE|8708809109| 1213|   94|       0| 45000|      01|       IM|      1|2024-01-01T00:00:...|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+--------------------+
only showing top 2 rows


In [18]:
#Проверка что результат фильтрации при первом и втором варианте один и тот же
print(df_de.count() == df_de2.count())



True


                                                                                

### Save to CSV

In [18]:
df_de.show(2, truncate=False)

+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+-----------------------------+
|month  |country|code      |value|netto|quantity|region|district|direction|measure|load_date                    |
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+-----------------------------+
|01/2016|DE     |4016995709|5901 |172  |0       |46000 |01      |IM       |1      |2024-01-01T00:00:00.000+03:00|
|01/2016|DE     |8708809109|1213 |94   |0       |45000 |01      |IM       |1      |2024-01-01T00:00:00.000+03:00|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+-----------------------------+
only showing top 2 rows


In [19]:
df_de.columns

['month',
 'country',
 'code',
 'value',
 'netto',
 'quantity',
 'region',
 'district',
 'direction',
 'measure',
 'load_date']

In [20]:
# В селекте можно CASTовать, приводить к определенным типам
final = (
    df_de
    .select(
        'month',
        'country',
        'code',
        'value',
        'netto',
        'quantity',
        'region',
        'district',
        'direction',
        'measure',
        F.col('load_date').cast('date'),
    )
)

final.show(2, truncate=False)

+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
|month  |country|code      |value|netto|quantity|region|district|direction|measure|load_date |
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
|01/2016|DE     |4016995709|5901 |172  |0       |46000 |01      |IM       |1      |2024-01-01|
|01/2016|DE     |8708809109|1213 |94   |0       |45000 |01      |IM       |1      |2024-01-01|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
only showing top 2 rows


In [21]:
# Сохранение неконтроллируемое по кол-ву файлов
(
    final
    .write
    .format('csv')
    .options(header='True', sep=';')
    .csv('data/final_no_control')
)




                                                                                

In [22]:
# Узнать количество разделов
partition_num = final.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

Кол-во партиций 16


In [23]:
# Сохранение контроллируемое по кол-ву файлов - ОДИН ФАЙЛ
(
    final
    .coalesce(1)
    .write
    .format('csv')
    .options(header='True', sep=';')
    .csv('data/final_one_file')
)

                                                                                

In [24]:
partition_num = final.coalesce(1).rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

Кол-во партиций 1


In [26]:
# Сохранения с партицированием
(
    final
    .write
    .partitionBy('load_date')
    .format('csv')
    .options(header='True', sep=';')
    .csv('data/final_partitioned')
)

                                                                                

In [28]:
final.select('load_date').distinct().show()



+----------+
| load_date|
+----------+
|2024-08-01|
|2024-09-01|
|2024-10-01|
|2024-04-01|
|2024-05-01|
|2024-07-01|
|2024-02-01|
|2024-03-01|
|2024-01-01|
|2024-06-01|
+----------+



                                                                                

In [27]:
print_df = final.select('load_date').distinct()
print(f'Load_date distinct: {print_df.count()}')



Load_date distinct: 10


                                                                                

In [29]:
# Сохранения с партицированием и repartition внутри самой партиции
(
    final
    .repartition(1, 'load_date')
    .write
    .partitionBy('load_date')
    .format('csv')
    .options(header='True', sep=';')
    .csv('data/final_partitioned_repart')
)


                                                                                

In [30]:
partition_num = final.repartition(1, 'load_date').rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')



Кол-во партиций 1


### Read Transformed

In [19]:
reader_no_control = (
    spark
    .read
    .csv('data/final_no_control/', header=True, sep=';')
    .where(''' load_date = "2024-01-01" ''')
)

reader_no_control.count() # number of files read: 16 | size of files read: 88.4 MiB | 4 s (132 ms, 289 ms, 362 ms)

350998

In [20]:
reader_final_one_file = (
    spark
    .read
    .csv('data/final_one_file/', header=True, sep=';')
    .where(''' load_date = "2024-01-01" ''')
)

reader_final_one_file.count() # number of files read: 1 | size of files read: 88.4 MiB | 4.2 s (202 ms, 361 ms, 380 ms )

350998

In [21]:
reader_partitioned = (
    spark
    .read
    .csv('data/final_partitioned', header=True, sep=';')
    .where(''' load_date = "2024-01-01" ''')
)

reader_partitioned.count() # number of files read: 16 | size of files read: 16.4 MiB | 1.5 s (65 ms, 175 ms, 200 ms )

350998

In [23]:
reader_partitioned_repart = (
    spark
    .read
    .csv('data/final_partitioned_repart', header=True, sep=';')
    .where(''' load_date = "2024-01-01" ''')
)

reader_partitioned_repart.count() # number of files read: 1 | size of files read: 16.4 MiB | 199 ms (8 ms, 38 ms, 77 ms )

350998

### JOIN

In [24]:
data = [
    (14000, "Северный"),
    (11000, "Южный"),
    (10000, "Восточный"),
    (26000, "Западный"),
    (56000, "Центральный"),
]

region_df = spark.createDataFrame(data, schema='region_id long, name string')

region_df.show()

                                                                                

+---------+-----------+
|region_id|       name|
+---------+-----------+
|    14000|   Северный|
|    11000|      Южный|
|    10000|  Восточный|
|    26000|   Западный|
|    56000|Центральный|
+---------+-----------+



In [26]:
customs_data = (
    spark
    .read
    .csv(PATH, header=True, sep=';')
)

customs_data.show(2)

+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|  month|country|      code| value|netto|quantity|region|district|direction_eng|measure_eng|           load_date|
+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|01/2016|     IT|6204695000|   131|    1|       7| 46000|      01|           IM|        ShT|2024-07-01T00:00:...|
|01/2016|     CN|9001900009|112750|   18|       0| 46000|      01|           IM|          1|2024-01-01T00:00:...|
+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
only showing top 2 rows


In [27]:
# Отключим автоматический Broadcast JOIN
import time
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [28]:
# Замерим выполнение запроса без broadcast join

start_time = time.time()

customs_data.join(region_df, customs_data.region==region_df.region_id, "left").count()

end_time = time.time()

print(f"Elapsed time for join operation: {end_time - start_time:.2f} seconds")



Elapsed time for join operation: 20.02 seconds


                                                                                

In [29]:
# Замерим выполнение запроса c broadcast join

start_time = time.time()

customs_data.join(F.broadcast(region_df), customs_data.region == region_df.region_id, "left").count()

end_time = time.time()

print(f"Elapsed time for broadcast join operation: {end_time - start_time:.2f} seconds")



Elapsed time for broadcast join operation: 12.00 seconds


                                                                                

### Cache | Persist

In [30]:
customs_data.cache()

DataFrame[month: string, country: string, code: string, value: string, netto: string, quantity: string, region: string, district: string, direction_eng: string, measure_eng: string, load_date: string]

In [31]:
customs_data.cache().count()

25/12/06 11:42:07 WARN CacheManager: Asked to cache already cached data.
25/12/06 11:42:25 WARN MemoryStore: Not enough space to cache rdd_228_10 in memory! (computed 34.3 MiB so far)
25/12/06 11:42:25 WARN MemoryStore: Not enough space to cache rdd_228_6 in memory! (computed 35.2 MiB so far)
25/12/06 11:42:25 WARN MemoryStore: Not enough space to cache rdd_228_9 in memory! (computed 34.2 MiB so far)
25/12/06 11:42:25 WARN BlockManager: Persisting block rdd_228_9 to disk instead.
25/12/06 11:42:25 WARN BlockManager: Persisting block rdd_228_10 to disk instead.
25/12/06 11:42:25 WARN BlockManager: Persisting block rdd_228_6 to disk instead.
25/12/06 11:42:25 WARN MemoryStore: Not enough space to cache rdd_228_5 in memory! (computed 33.6 MiB so far)
25/12/06 11:42:25 WARN BlockManager: Persisting block rdd_228_5 to disk instead.
25/12/06 11:42:32 WARN MemoryStore: Not enough space to cache rdd_228_3 in memory! (computed 45.3 MiB so far)
25/12/06 11:42:32 WARN BlockManager: Persisting blo

26392290

In [32]:
customs_data.unpersist()

DataFrame[month: string, country: string, code: string, value: string, netto: string, quantity: string, region: string, district: string, direction_eng: string, measure_eng: string, load_date: string]

In [33]:
from pyspark.storagelevel import StorageLevel

customs_data.persist(StorageLevel.DISK_ONLY).count()

                                                                                

26392290

### Repartition & Coalesce

In [46]:
data = [
    (1,'one'),
    (2,'two'),
    (3,'three'),
    (4,'four'),
    (5,'five'),
    (6,'six'),
    (7, 'seven'),
    (8, 'eight'),
    (9, 'nine'),
]

df = spark.createDataFrame(data, ['id', 'number'])

df.show()

+---+------+
| id|number|
+---+------+
|  1|   one|
|  2|   two|
|  3| three|
|  4|  four|
|  5|  five|
|  6|   six|
|  7| seven|
|  8| eight|
|  9|  nine|
+---+------+



In [47]:
# Когда только создали датафрейм у нас 12 разделов
df.rdd.getNumPartitions()

12

In [48]:
# Покажи эти разделы, в каждом разделе список с элементами, но так же есть и пустые
df.rdd.glom().collect()

[[],
 [Row(id=1, number='one')],
 [Row(id=2, number='two')],
 [Row(id=3, number='three')],
 [],
 [Row(id=4, number='four')],
 [Row(id=5, number='five')],
 [Row(id=6, number='six')],
 [],
 [Row(id=7, number='seven')],
 [Row(id=8, number='eight')],
 [Row(id=9, number='nine')]]

In [49]:
# Намеренно перемешаем и поделим на 8 разделов
mix = df.repartition(8)
mix.rdd.glom().collect()

[[Row(id=2, number='two'), Row(id=4, number='four')],
 [],
 [Row(id=1, number='one'), Row(id=8, number='eight')],
 [],
 [Row(id=6, number='six')],
 [],
 [Row(id=3, number='three'),
  Row(id=7, number='seven'),
  Row(id=9, number='nine')],
 [Row(id=5, number='five')]]

In [50]:
# Намеренно перемешаем и поделим на 4 раздела, видим пустых списков нет
mix_2 = df.repartition(4)
mix_2.rdd.glom().collect()

[[Row(id=2, number='two'), Row(id=4, number='four'), Row(id=5, number='five')],
 [Row(id=1, number='one'), Row(id=8, number='eight')],
 [Row(id=6, number='six')],
 [Row(id=3, number='three'),
  Row(id=7, number='seven'),
  Row(id=9, number='nine')]]

In [51]:
mix.repartition(3).rdd.glom().collect()

[[Row(id=3, number='three'),
  Row(id=5, number='five'),
  Row(id=7, number='seven'),
  Row(id=8, number='eight'),
  Row(id=9, number='nine')],
 [Row(id=1, number='one')],
 [Row(id=2, number='two'), Row(id=4, number='four'), Row(id=6, number='six')]]

In [52]:
mix.coalesce(3).rdd.glom().collect()

[[Row(id=2, number='two'), Row(id=4, number='four')],
 [Row(id=1, number='one'),
  Row(id=8, number='eight'),
  Row(id=3, number='three'),
  Row(id=7, number='seven'),
  Row(id=9, number='nine')],
 [Row(id=6, number='six'), Row(id=5, number='five')]]

In [None]:
# Еще вариант как можно вывести датафрейм, можем сделать из него пандас датафрейм
# С большими данными не работает, делает соединение со всех экзекуторов в один, перекидывает все на драйвер и может вызвать OUT OF MEMORY
mix.toPandas().head()

In [57]:
# OUT OF MEMORY - Делаю падение по памяти

d = spark.read.csv(PATH, header=True, sep='\t')
d.collect()

25/12/06 12:46:27 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
25/12/06 12:46:27 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.status.LiveEntityHelpers$.createMetrics(LiveEntity.scala:789)
	at org.apache.spark.status.LiveEntityHelpers$.makeNegative(LiveEntity.scala:895)
	at org.apache.spark.status.LiveTask.doUpdate(LiveEntity.scala:207)
	at org.apache.spark.status.LiveEntity.write(LiveEntity.scala:50)
	at org.apache.spark.status.AppStatusListener.update(AppStatusListener.scala:1233)
	at org.apache.spark.status.AppStatusListener.maybeUpdate(AppStatusListener.scala:1239)
	at org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6(AppStatusListener.scala:976)
	at org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6$adapted(AppStatusListener.scala:976)
	at

Py4JJavaError: An error occurred while calling o431.collectToPython.
: org.apache.spark.SparkException: Job 87 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1301)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1299)
	at scala.collection.mutable.HashSet$Node.foreach(HashSet.scala:450)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:376)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1299)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:3234)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:85)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$stop$3(DAGScheduler.scala:3120)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1300)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:3120)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2346)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1300)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2346)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2297)
	at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2284)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:462)
	at org.apache.spark.sql.classic.Dataset.$anonfun$collectToPython$1(Dataset.scala:2057)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2234)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
	at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
	at org.apache.spark.sql.classic.Dataset.collectToPython(Dataset.scala:2054)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 38976)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/py4j/clientserver.py", line 566, in send_command
    raise Py

In [None]:
spark.stop()