In [1]:
from pyspark.sql import SparkSession

# SparkSession is an entry point to PySpark's functionality within a program. This entry point lets you access the features of Spark through Python. 

In [2]:
spark = (SparkSession.builder.appName("Converting articles into BoW Vectors").getOrCreate())

In [3]:
spark

In [4]:
data = spark.read.csv("all-the-news-2-1.csv")

# The older version of PySpark used to store the data in special kind of Data Structure called Resilient Distributed Dataset (RDD). The special feature of this data structure is that it is Row Major. 

# The new version of PySpark has also introduced a column major data structure called PySpark DataFrames. 

In [5]:
print(data)

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string]


In [6]:
data.show(5)

+--------------------+--------------------+-------------------+----------------+-----+----+-----------+--------------------+--------------------+--------------------+--------------------+------------+
|                 _c0|                 _c1|                _c2|             _c3|  _c4| _c5|        _c6|                 _c7|                 _c8|                 _c9|                _c10|        _c11|
+--------------------+--------------------+-------------------+----------------+-----+----+-----------+--------------------+--------------------+--------------------+--------------------+------------+
|                null|          Unnamed: 0|               date|            year|month| day|     author|               title|             article|                 url|             section| publication|
|                   0|                   0|2016-12-09 18:31:00|            2016| 12.0|   9|Lee Drutman|We should take co...|"This post is par...|             however| several critics ...| for exam

In [7]:
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



In [8]:
print(data.dtypes)

[('_c0', 'string'), ('_c1', 'string'), ('_c2', 'string'), ('_c3', 'string'), ('_c4', 'string'), ('_c5', 'string'), ('_c6', 'string'), ('_c7', 'string'), ('_c8', 'string'), ('_c9', 'string'), ('_c10', 'string'), ('_c11', 'string')]


In [9]:
data.show()

+--------------------+--------------------+-------------------+----------------+-----+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 _c0|                 _c1|                _c2|             _c3|  _c4| _c5|                 _c6|                 _c7|                 _c8|                 _c9|                _c10|                _c11|
+--------------------+--------------------+-------------------+----------------+-----+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                null|          Unnamed: 0|               date|            year|month| day|              author|               title|             article|                 url|             section|         publication|
|                   0|                   0|2016-12-09 18:31:00|            2016| 12.0|   9|         Lee Drutman|We should take c

In [10]:
from pyspark.sql.functions import col

In [11]:
text_column = data.select("_c8")

In [12]:
text_column.show()

+--------------------+
|                 _c8|
+--------------------+
|             article|
|"This post is par...|
| The Indianapolis...|
|                null|
|DAVOS, Switzerlan...|
|PARIS (Reuters) -...|
|"Paris Hilton arr...|
|BERLIN, June 17 (...|
|                null|
|CARACAS (Reuters)...|
|"If only every da...|
|Google I/O, the c...|
|China is dismissi...|
|Elizabeth Warren ...|
|(Reuters) - The s...|
|Joakim Noah's ﻿mo...|
|"Jermaine Jackson...|
|LONDON (Reuters) ...|
|"Nancy Pelosi is ...|
|The nonpartisan d...|
+--------------------+
only showing top 20 rows



In [13]:
text_column = data.select(data["_c8"])

In [14]:
text_column.show()

+--------------------+
|                 _c8|
+--------------------+
|             article|
|"This post is par...|
| The Indianapolis...|
|                null|
|DAVOS, Switzerlan...|
|PARIS (Reuters) -...|
|"Paris Hilton arr...|
|BERLIN, June 17 (...|
|                null|
|CARACAS (Reuters)...|
|"If only every da...|
|Google I/O, the c...|
|China is dismissi...|
|Elizabeth Warren ...|
|(Reuters) - The s...|
|Joakim Noah's ﻿mo...|
|"Jermaine Jackson...|
|LONDON (Reuters) ...|
|"Nancy Pelosi is ...|
|The nonpartisan d...|
+--------------------+
only showing top 20 rows



In [15]:
text_column = data.select(data._c8)

In [16]:
text_column.show()

+--------------------+
|                 _c8|
+--------------------+
|             article|
|"This post is par...|
| The Indianapolis...|
|                null|
|DAVOS, Switzerlan...|
|PARIS (Reuters) -...|
|"Paris Hilton arr...|
|BERLIN, June 17 (...|
|                null|
|CARACAS (Reuters)...|
|"If only every da...|
|Google I/O, the c...|
|China is dismissi...|
|Elizabeth Warren ...|
|(Reuters) - The s...|
|Joakim Noah's ﻿mo...|
|"Jermaine Jackson...|
|LONDON (Reuters) ...|
|"Nancy Pelosi is ...|
|The nonpartisan d...|
+--------------------+
only showing top 20 rows



In [17]:
text_column = data.select(col("_c8"))

In [18]:
text_column.show()

+--------------------+
|                 _c8|
+--------------------+
|             article|
|"This post is par...|
| The Indianapolis...|
|                null|
|DAVOS, Switzerlan...|
|PARIS (Reuters) -...|
|"Paris Hilton arr...|
|BERLIN, June 17 (...|
|                null|
|CARACAS (Reuters)...|
|"If only every da...|
|Google I/O, the c...|
|China is dismissi...|
|Elizabeth Warren ...|
|(Reuters) - The s...|
|Joakim Noah's ﻿mo...|
|"Jermaine Jackson...|
|LONDON (Reuters) ...|
|"Nancy Pelosi is ...|
|The nonpartisan d...|
+--------------------+
only showing top 20 rows



In [19]:
from pyspark.sql.functions import split

In [20]:
text_column = data.select(split(col("_c8")," ").alias("article"))

In [21]:
text_column.show(truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                             article|
+----------------------------------------------------------------------------------------------------+
|                                                                                           [article]|
|["This, post, is, part, of, Polyarchy,, an, independent, blog, produced, by, the, political, refo...|
|[, The, Indianapolis, Colts, made, Andrew, Luck, the, highest-paid, player, in, NFL, history, thi...|
|                                                                                                null|
|[DAVOS,, Switzerland, (Reuters), -, U.S., President, Donald, Trump, denied, a, report, on, Friday...|
|[PARIS, (Reuters), -, Former, French, president, Nicolas, Sarkozy, published, a, new, memoir, on,...|
|["Paris, Hilton, arrived, at, LAX, Wednesday, dressed, to, pay, her, las

In [57]:
text_column.printSchema()

root
 |-- article: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [22]:
from pyspark.sql.functions import explode

In [23]:
tokens = text_column.select(explode(col("article")).alias("tokens"))

In [24]:
tokens.show()

+-----------+
|     tokens|
+-----------+
|    article|
|      "This|
|       post|
|         is|
|       part|
|         of|
| Polyarchy,|
|         an|
|independent|
|       blog|
|   produced|
|         by|
|        the|
|  political|
|     reform|
|    program|
|         at|
|        New|
|   America,|
|          a|
+-----------+
only showing top 20 rows



In [25]:
from pyspark.sql.functions import lower

In [26]:
normalized_tokens = tokens.select(lower(col("tokens")).alias("normalized tokens"))

In [27]:
normalized_tokens.show()

+-----------------+
|normalized tokens|
+-----------------+
|          article|
|            "this|
|             post|
|               is|
|             part|
|               of|
|       polyarchy,|
|               an|
|      independent|
|             blog|
|         produced|
|               by|
|              the|
|        political|
|           reform|
|          program|
|               at|
|              new|
|         america,|
|                a|
+-----------------+
only showing top 20 rows



In [28]:
from pyspark.sql.functions import regexp_extract

In [29]:
clean_tokens = normalized_tokens.select(regexp_extract(col("normalized tokens"),"[a-z]*",0).
                                        alias("Cleaned Tokens"))

In [30]:
clean_tokens.show()

+--------------+
|Cleaned Tokens|
+--------------+
|       article|
|              |
|          post|
|            is|
|          part|
|            of|
|     polyarchy|
|            an|
|   independent|
|          blog|
|      produced|
|            by|
|           the|
|     political|
|        reform|
|       program|
|            at|
|           new|
|       america|
|             a|
+--------------+
only showing top 20 rows



In [31]:
non_null_cleaned_tokens = clean_tokens.filter(col("Cleaned Tokens") != "")

In [32]:
non_null_cleaned_tokens.show()

+--------------+
|Cleaned Tokens|
+--------------+
|       article|
|          post|
|            is|
|          part|
|            of|
|     polyarchy|
|            an|
|   independent|
|          blog|
|      produced|
|            by|
|           the|
|     political|
|        reform|
|       program|
|            at|
|           new|
|       america|
|             a|
|    washington|
+--------------+
only showing top 20 rows



In [33]:
token_groups = non_null_cleaned_tokens.groupby(col("Cleaned Tokens"))

In [34]:
token_groups

<pyspark.sql.group.GroupedData at 0x1ef6582d850>

In [35]:
token_counts = token_groups.count()

In [36]:
token_counts.show()

+--------------+-------+
|Cleaned Tokens|  count|
+--------------+-------+
|          some|1348511|
|    likelihood|  12071|
|         still| 682599|
|         those| 741766|
|      tortured|   5220|
|        online| 200524|
|           few| 380259|
|   transaction|  31312|
|     indicator|   8984|
|     involving|  42441|
|      incoming|  12798|
|     connected|  40889|
|       jewelry|  15130|
|        bazaar|   2039|
|  safeguarding|   1675|
|        filing|  58015|
|        brands|  55095|
|       flashed|   2282|
|   interaction|   9840|
|           art| 183475|
+--------------+-------+
only showing top 20 rows



In [37]:
token_counts.orderBy("count",ascending=False).show(100)

+--------------+--------+
|Cleaned Tokens|   count|
+--------------+--------+
|           the|49614097|
|            to|25278753|
|            of|22664966|
|             a|22242784|
|           and|21699171|
|            in|18907734|
|          that|10654902|
|            on| 9087425|
|           for| 9066347|
|            is| 7811476|
|            it| 7043139|
|          with| 6416271|
|            as| 5385847|
|            by| 5363492|
|           was| 5249367|
|          said| 4944904|
|            at| 4684935|
|            he| 4379023|
|          from| 4267604|
|           are| 3800689|
|            be| 3782725|
|           has| 3751435|
|          have| 3703778|
|           but| 3560702|
|            an| 3538650|
|             i| 3406538|
|          this| 3401500|
|           his| 3371237|
|           not| 3152063|
|           you| 2872337|
|          they| 2779378|
|          more| 2692541|
|           its| 2616173|
|           who| 2578406|
|          will| 2493764|
|         th

In [38]:
data.select(*data.columns[:3]).show()

+--------------------+--------------------+-------------------+
|                 _c0|                 _c1|                _c2|
+--------------------+--------------------+-------------------+
|                null|          Unnamed: 0|               date|
|                   0|                   0|2016-12-09 18:31:00|
|                   1|                   1|2016-10-07 21:26:46|
|The highest-paid ...|https://www.busin...|               null|
|                   2|                   2|2018-01-26 00:00:00|
|                   3|                   3|2019-06-27 00:00:00|
|                   4|                   4|2016-01-27 00:00:00|
|                   5|                   5|2019-06-17 00:00:00|
|Editing by Tassil...|https://www.reute...|         Financials|
|                   6|                   6|2019-06-23 00:00:00|
|                   7|                   7|2018-05-02 17:09:00|
|                   8|                   8|2016-05-18 13:00:06|
|                   9|                  

In [39]:
data.select(*["_c0","_c1","_c2"]).show()

+--------------------+--------------------+-------------------+
|                 _c0|                 _c1|                _c2|
+--------------------+--------------------+-------------------+
|                null|          Unnamed: 0|               date|
|                   0|                   0|2016-12-09 18:31:00|
|                   1|                   1|2016-10-07 21:26:46|
|The highest-paid ...|https://www.busin...|               null|
|                   2|                   2|2018-01-26 00:00:00|
|                   3|                   3|2019-06-27 00:00:00|
|                   4|                   4|2016-01-27 00:00:00|
|                   5|                   5|2019-06-17 00:00:00|
|Editing by Tassil...|https://www.reute...|         Financials|
|                   6|                   6|2019-06-23 00:00:00|
|                   7|                   7|2018-05-02 17:09:00|
|                   8|                   8|2016-05-18 13:00:06|
|                   9|                  

In [40]:
data.select("_c0","_c1","_c2").show()

+--------------------+--------------------+-------------------+
|                 _c0|                 _c1|                _c2|
+--------------------+--------------------+-------------------+
|                null|          Unnamed: 0|               date|
|                   0|                   0|2016-12-09 18:31:00|
|                   1|                   1|2016-10-07 21:26:46|
|The highest-paid ...|https://www.busin...|               null|
|                   2|                   2|2018-01-26 00:00:00|
|                   3|                   3|2019-06-27 00:00:00|
|                   4|                   4|2016-01-27 00:00:00|
|                   5|                   5|2019-06-17 00:00:00|
|Editing by Tassil...|https://www.reute...|         Financials|
|                   6|                   6|2019-06-23 00:00:00|
|                   7|                   7|2018-05-02 17:09:00|
|                   8|                   8|2016-05-18 13:00:06|
|                   9|                  

In [41]:
data.select(col("_c0"),col("_c1"),col("_c2")).show()

+--------------------+--------------------+-------------------+
|                 _c0|                 _c1|                _c2|
+--------------------+--------------------+-------------------+
|                null|          Unnamed: 0|               date|
|                   0|                   0|2016-12-09 18:31:00|
|                   1|                   1|2016-10-07 21:26:46|
|The highest-paid ...|https://www.busin...|               null|
|                   2|                   2|2018-01-26 00:00:00|
|                   3|                   3|2019-06-27 00:00:00|
|                   4|                   4|2016-01-27 00:00:00|
|                   5|                   5|2019-06-17 00:00:00|
|Editing by Tassil...|https://www.reute...|         Financials|
|                   6|                   6|2019-06-23 00:00:00|
|                   7|                   7|2018-05-02 17:09:00|
|                   8|                   8|2016-05-18 13:00:06|
|                   9|                  

In [42]:
data.select(*[col("_c0"),col("_c1"),col("_c2")]).show()

+--------------------+--------------------+-------------------+
|                 _c0|                 _c1|                _c2|
+--------------------+--------------------+-------------------+
|                null|          Unnamed: 0|               date|
|                   0|                   0|2016-12-09 18:31:00|
|                   1|                   1|2016-10-07 21:26:46|
|The highest-paid ...|https://www.busin...|               null|
|                   2|                   2|2018-01-26 00:00:00|
|                   3|                   3|2019-06-27 00:00:00|
|                   4|                   4|2016-01-27 00:00:00|
|                   5|                   5|2019-06-17 00:00:00|
|Editing by Tassil...|https://www.reute...|         Financials|
|                   6|                   6|2019-06-23 00:00:00|
|                   7|                   7|2018-05-02 17:09:00|
|                   8|                   8|2016-05-18 13:00:06|
|                   9|                  

In [43]:
data.columns

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11']

In [44]:
import numpy as np

In [45]:
column_chunks = np.array_split(np.array(data.columns),len(data.columns)//3)

In [46]:
print(column_chunks)

[array(['_c0', '_c1', '_c2'], dtype='<U4'), array(['_c3', '_c4', '_c5'], dtype='<U4'), array(['_c6', '_c7', '_c8'], dtype='<U4'), array(['_c9', '_c10', '_c11'], dtype='<U4')]


In [47]:
for x in column_chunks:
    data.select(*x).show()

+--------------------+--------------------+-------------------+
|                 _c0|                 _c1|                _c2|
+--------------------+--------------------+-------------------+
|                null|          Unnamed: 0|               date|
|                   0|                   0|2016-12-09 18:31:00|
|                   1|                   1|2016-10-07 21:26:46|
|The highest-paid ...|https://www.busin...|               null|
|                   2|                   2|2018-01-26 00:00:00|
|                   3|                   3|2019-06-27 00:00:00|
|                   4|                   4|2016-01-27 00:00:00|
|                   5|                   5|2019-06-17 00:00:00|
|Editing by Tassil...|https://www.reute...|         Financials|
|                   6|                   6|2019-06-23 00:00:00|
|                   7|                   7|2018-05-02 17:09:00|
|                   8|                   8|2016-05-18 13:00:06|
|                   9|                  

In [48]:
dropped_data = data.drop(*data.columns[:6])

In [49]:
dropped_data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 _c6|                 _c7|                 _c8|                 _c9|                _c10|                _c11|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|              author|               title|             article|                 url|             section|         publication|
|         Lee Drutman|We should take co...|"This post is par...|             however| several critics ...|         for example|
|         Scott Davis|Colts GM Ryan Gri...| The Indianapolis...|                null|                null|                null|
|                null|                null|                null|                null|                null|                null|
|                null|Trump denies repo...|DAVOS, Switzerlan...|https://www.reute...|               Davo

In [50]:
dropped_data = dropped_data.drop(col("_c9"))

In [51]:
dropped_data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 _c6|                 _c7|                 _c8|                _c10|                _c11|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|              author|               title|             article|             section|         publication|
|         Lee Drutman|We should take co...|"This post is par...| several critics ...|         for example|
|         Scott Davis|Colts GM Ryan Gri...| The Indianapolis...|                null|                null|
|                null|                null|                null|                null|                null|
|                null|Trump denies repo...|DAVOS, Switzerlan...|               Davos|             Reuters|
|                null|France's Sarkozy ...|PARIS (Reuters) -...|          World News|             Reuters|
|                null|Paris Hilton: W

In [52]:
another_dropped_data = data.select(
                        *[x for x in data.columns if x in ["_c6","_c7","_c8","_c10","_c11"]])

In [53]:
another_dropped_data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 _c6|                 _c7|                 _c8|                _c10|                _c11|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|              author|               title|             article|             section|         publication|
|         Lee Drutman|We should take co...|"This post is par...| several critics ...|         for example|
|         Scott Davis|Colts GM Ryan Gri...| The Indianapolis...|                null|                null|
|                null|                null|                null|                null|                null|
|                null|Trump denies repo...|DAVOS, Switzerlan...|               Davos|             Reuters|
|                null|France's Sarkozy ...|PARIS (Reuters) -...|          World News|             Reuters|
|                null|Paris Hilton: W

In [54]:
another_dropped_data = another_dropped_data.withColumnRenamed("_c6","author")

In [55]:
another_dropped_data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|              author|                 _c7|                 _c8|                _c10|                _c11|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|              author|               title|             article|             section|         publication|
|         Lee Drutman|We should take co...|"This post is par...| several critics ...|         for example|
|         Scott Davis|Colts GM Ryan Gri...| The Indianapolis...|                null|                null|
|                null|                null|                null|                null|                null|
|                null|Trump denies repo...|DAVOS, Switzerlan...|               Davos|             Reuters|
|                null|France's Sarkozy ...|PARIS (Reuters) -...|          World News|             Reuters|
|                null|Paris Hilton: W

In [56]:
import pyspark.sql.types as T

In [64]:
import pyspark.sql.functions as F

In [10]:
from pyspark import SparkContext

In [11]:
import pyspark

In [3]:
some_random_collection = [1,2,3,4,5]

In [12]:
conftfos = pyspark.SparkConf().setAll([('spark.executor.cores','1')])

In [13]:
sc = SparkContext("local[*]",conf=conftfos)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Converting articles into BoW Vectors, master=local[*]) created by getOrCreate at <ipython-input-2-e2daf0041fd5>:1 

In [5]:
random_collection_rdd = sc.parallelize(some_random_collection)

In [6]:
print(random_collection_rdd)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274


In [7]:
def add_one(element):
    return element + 1

In [8]:
func_result = random_collection_rdd.map(add_one)

In [9]:
func_result.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (DESKTOP-MLVSQCI executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\aimlrl-amd ryzen\spark\spark-3.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 586, in main
  File "C:\Users\aimlrl-amd ryzen\spark\spark-3.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 69, in read_command
  File "C:\Users\aimlrl-amd ryzen\spark\spark-3.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "C:\Users\aimlrl-amd ryzen\spark\spark-3.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute '_fill_function' on <module 'pyspark.cloudpickle' from 'C:\\Users\\aimlrl-amd ryzen\\spark\\spark-3.1.1-bin-hadoop2.7\\python\\lib\\pyspark.zip\\pyspark\\cloudpickle\\__init__.py'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\aimlrl-amd ryzen\spark\spark-3.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 586, in main
  File "C:\Users\aimlrl-amd ryzen\spark\spark-3.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 69, in read_command
  File "C:\Users\aimlrl-amd ryzen\spark\spark-3.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "C:\Users\aimlrl-amd ryzen\spark\spark-3.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute '_fill_function' on <module 'pyspark.cloudpickle' from 'C:\\Users\\aimlrl-amd ryzen\\spark\\spark-3.1.1-bin-hadoop2.7\\python\\lib\\pyspark.zip\\pyspark\\cloudpickle\\__init__.py'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
