From https://www.kaggle.com/code/danielokeeffe/simple-text-classification-with-apache-spark/notebook

In [15]:
!pip install pyspark

[0m

In [16]:
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

In [17]:
#Spark context
hc = (SparkSession.builder
                  .appName("Toxic Comment Classification")
                  .enableHiveSupport()
                  .config("spark.executor.memory", "4G")
                  .config("spark.driver.memory", "18G")
                  .config("spark.executor.cores", "7")
                  .config("spark.python.worker.memory","4G")
                  .config("spark.driver.maxResultSize", "0")
                  .config("spark.sql.crossJoin.enables", "true")
                  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                  .config("spark.default.parallelism", "2")
                  .getOrCreate())

In [18]:
hc.sparkContext.setLogLevel('Info')

In [19]:
#Read data using Pandas DataFrame and converting to Spark DataFrame
def to_spark_df(fin):
    df = pd.read_csv(fin)
    df.fillna("", inplace = True)
    df = hc.createDataFrame(df)
    return (df)


In [20]:
#Load train and test data sets
train = to_spark_df("/kaggle/input/jigsaw-toxic-comment-classification-challenge/train.csv.zip")
test = to_spark_df("/kaggle/input/jigsaw-toxic-comment-classification-challenge/test.csv.zip")

In [21]:
out_cols = [i for i in train.columns if i not in ["id", "comment_text"]]
train.show(5)


22/11/28 20:50:25 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
22/11/28 20:50:25 INFO DAGScheduler: Got job 84 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/11/28 20:50:25 INFO DAGScheduler: Final stage: ResultStage 156 (showString at NativeMethodAccessorImpl.java:0)
22/11/28 20:50:25 INFO DAGScheduler: Parents of final stage: List()
22/11/28 20:50:25 INFO DAGScheduler: Missing parents: List()
22/11/28 20:50:25 INFO DAGScheduler: Submitting ResultStage 156 (MapPartitionsRDD[150] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
22/11/28 20:50:25 INFO MemoryStore: Block broadcast_157 stored as values in memory (estimated size 14.8 KiB, free 10.6 GiB)
22/11/28 20:50:25 INFO MemoryStore: Block broadcast_157_piece0 stored as bytes in memory (estimated size 7.1 KiB, free 10.6 GiB)
22/11/28 20:50:25 INFO BlockManagerInfo: Added broadcast_157_piece0 in memory on c0dd063d47a7:38291 (size: 7.1 KiB, fre

[Stage 156:>                                                        (0 + 1) / 1]

22/11/28 20:50:29 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 156 (TID 89): Attempting to kill Python Worker
22/11/28 20:50:29 INFO Executor: Finished task 0.0 in stage 156.0 (TID 89). 3084 bytes result sent to driver
22/11/28 20:50:29 INFO TaskSetManager: Finished task 0.0 in stage 156.0 (TID 89) in 4091 ms on c0dd063d47a7 (executor driver) (1/1)
22/11/28 20:50:29 INFO TaskSchedulerImpl: Removed TaskSet 156.0, whose tasks have all completed, from pool 
22/11/28 20:50:29 INFO DAGScheduler: ResultStage 156 (showString at NativeMethodAccessorImpl.java:0) finished in 4.123 s
22/11/28 20:50:29 INFO DAGScheduler: Job 84 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:50:29 INFO TaskSchedulerImpl: Killing all running tasks in stage 156: Stage finished
22/11/28 20:50:29 INFO DAGScheduler: Job 84 finished: showString at NativeMethodAccessorImpl.java:0, took 4.125847 s
+----------------+--------------------+-----+------------+--

                                                                                

In [22]:
# View some toxic comments
train.filter(F.col('toxic') == 1).show(2)


22/11/28 20:50:29 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
22/11/28 20:50:29 INFO DAGScheduler: Got job 85 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/11/28 20:50:29 INFO DAGScheduler: Final stage: ResultStage 157 (showString at NativeMethodAccessorImpl.java:0)
22/11/28 20:50:29 INFO DAGScheduler: Parents of final stage: List()
22/11/28 20:50:29 INFO DAGScheduler: Missing parents: List()
22/11/28 20:50:29 INFO DAGScheduler: Submitting ResultStage 157 (MapPartitionsRDD[152] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
22/11/28 20:50:29 INFO MemoryStore: Block broadcast_158 stored as values in memory (estimated size 15.2 KiB, free 10.6 GiB)
22/11/28 20:50:29 INFO MemoryStore: Block broadcast_158_piece0 stored as bytes in memory (estimated size 7.2 KiB, free 10.6 GiB)
22/11/28 20:50:29 INFO BlockManagerInfo: Added broadcast_158_piece0 in memory on c0dd063d47a7:38291 (size: 7.2 KiB, fre

[Stage 157:>                                                        (0 + 1) / 1]

22/11/28 20:50:33 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 157 (TID 90): Attempting to kill Python Worker
22/11/28 20:50:33 INFO Executor: Finished task 0.0 in stage 157.0 (TID 90). 2220 bytes result sent to driver
22/11/28 20:50:33 INFO TaskSetManager: Finished task 0.0 in stage 157.0 (TID 90) in 4087 ms on c0dd063d47a7 (executor driver) (1/1)
22/11/28 20:50:33 INFO TaskSchedulerImpl: Removed TaskSet 157.0, whose tasks have all completed, from pool 
22/11/28 20:50:33 INFO DAGScheduler: ResultStage 157 (showString at NativeMethodAccessorImpl.java:0) finished in 4.094 s
22/11/28 20:50:33 INFO DAGScheduler: Job 85 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:50:33 INFO TaskSchedulerImpl: Killing all running tasks in stage 157: Stage finished
22/11/28 20:50:33 INFO DAGScheduler: Job 85 finished: showString at NativeMethodAccessorImpl.java:0, took 4.096800 s
+----------------+--------------------+-----+------------+--

                                                                                

In [23]:
# Tokenize comment into individual words
tokenizer = Tokenizer(inputCol = "comment_text", outputCol= "words")
wordsData = tokenizer.transform (train)
# Count words
hashingTF = HashingTF (inputCol = "words", outputCol = "rawFeatures")
tf = hashingTF.transform(wordsData)
tf.select('rawFeatures').take(2)

22/11/28 20:50:33 INFO SparkContext: Starting job: take at /tmp/ipykernel_28/2576465728.py:7
22/11/28 20:50:33 INFO DAGScheduler: Got job 86 (take at /tmp/ipykernel_28/2576465728.py:7) with 1 output partitions
22/11/28 20:50:33 INFO DAGScheduler: Final stage: ResultStage 158 (take at /tmp/ipykernel_28/2576465728.py:7)
22/11/28 20:50:33 INFO DAGScheduler: Parents of final stage: List()
22/11/28 20:50:33 INFO DAGScheduler: Missing parents: List()
22/11/28 20:50:33 INFO DAGScheduler: Submitting ResultStage 158 (MapPartitionsRDD[154] at take at /tmp/ipykernel_28/2576465728.py:7), which has no missing parents
22/11/28 20:50:33 INFO MemoryStore: Block broadcast_159 stored as values in memory (estimated size 36.4 KiB, free 10.6 GiB)
22/11/28 20:50:33 INFO MemoryStore: Block broadcast_159_piece0 stored as bytes in memory (estimated size 17.0 KiB, free 10.6 GiB)
22/11/28 20:50:33 INFO BlockManagerInfo: Added broadcast_159_piece0 in memory on c0dd063d47a7:38291 (size: 17.0 KiB, free: 10.6 GiB)
2

[Stage 158:>                                                        (0 + 1) / 1]

22/11/28 20:50:38 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 158 (TID 91): Attempting to kill Python Worker
22/11/28 20:50:38 INFO Executor: Finished task 0.0 in stage 158.0 (TID 91). 1899 bytes result sent to driver
22/11/28 20:50:38 INFO TaskSetManager: Finished task 0.0 in stage 158.0 (TID 91) in 4088 ms on c0dd063d47a7 (executor driver) (1/1)
22/11/28 20:50:38 INFO TaskSchedulerImpl: Removed TaskSet 158.0, whose tasks have all completed, from pool 
22/11/28 20:50:38 INFO DAGScheduler: ResultStage 158 (take at /tmp/ipykernel_28/2576465728.py:7) finished in 4.096 s
22/11/28 20:50:38 INFO DAGScheduler: Job 86 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:50:38 INFO TaskSchedulerImpl: Killing all running tasks in stage 158: Stage finished
22/11/28 20:50:38 INFO DAGScheduler: Job 86 finished: take at /tmp/ipykernel_28/2576465728.py:7, took 4.098898 s


                                                                                

[Row(rawFeatures=SparseVector(262144, {6240: 1.0, 7221: 1.0, 9420: 1.0, 10214: 1.0, 11680: 1.0, 15494: 1.0, 19036: 1.0, 19208: 1.0, 23032: 1.0, 25000: 1.0, 26144: 1.0, 66299: 1.0, 67416: 1.0, 72125: 1.0, 74944: 1.0, 77971: 1.0, 79300: 1.0, 79968: 1.0, 89833: 1.0, 94488: 1.0, 95889: 3.0, 97171: 1.0, 101169: 1.0, 103863: 1.0, 110427: 1.0, 110510: 1.0, 116767: 1.0, 140784: 1.0, 141086: 1.0, 145284: 1.0, 151536: 1.0, 151751: 1.0, 166368: 1.0, 187114: 1.0, 219915: 1.0, 223402: 1.0, 229137: 1.0, 231630: 1.0, 233967: 1.0, 240944: 1.0, 253170: 1.0})),
 Row(rawFeatures=SparseVector(262144, {2195: 1.0, 4714: 1.0, 13283: 1.0, 48234: 1.0, 85939: 1.0, 108541: 1.0, 119702: 1.0, 121320: 1.0, 137179: 1.0, 141086: 1.0, 159767: 1.0, 165258: 1.0, 169800: 1.0, 212492: 1.0, 218233: 1.0, 224255: 1.0, 224850: 1.0, 249180: 1.0}))]

In [24]:
#Transform token frequencies into tf-idf
idf = IDF (inputCol="rawFeatures", outputCol = "features")
idfModel = idf.fit(tf)
tfidf = idfModel.transform(tf)
tfidf.select("features").take(3)

22/11/28 20:50:38 INFO SparkContext: Starting job: treeAggregate at IDF.scala:55
22/11/28 20:50:38 INFO DAGScheduler: Got job 87 (treeAggregate at IDF.scala:55) with 2 output partitions
22/11/28 20:50:38 INFO DAGScheduler: Final stage: ResultStage 159 (treeAggregate at IDF.scala:55)
22/11/28 20:50:38 INFO DAGScheduler: Parents of final stage: List()
22/11/28 20:50:38 INFO DAGScheduler: Missing parents: List()
22/11/28 20:50:38 INFO DAGScheduler: Submitting ResultStage 159 (MapPartitionsRDD[160] at treeAggregate at IDF.scala:55), which has no missing parents
22/11/28 20:50:38 INFO MemoryStore: Block broadcast_160 stored as values in memory (estimated size 45.1 KiB, free 10.6 GiB)
22/11/28 20:50:38 INFO MemoryStore: Block broadcast_160_piece0 stored as bytes in memory (estimated size 20.5 KiB, free 10.6 GiB)
22/11/28 20:50:38 INFO BlockManagerInfo: Added broadcast_160_piece0 in memory on c0dd063d47a7:38291 (size: 20.5 KiB, free: 10.6 GiB)
22/11/28 20:50:38 INFO SparkContext: Created broa

[Stage 159:>                                                        (0 + 2) / 2]

22/11/28 20:50:47 INFO PythonRunner: Times: total = 7002, boot = 7, init = 7, finish = 6988
22/11/28 20:50:47 INFO PythonRunner: Times: total = 6843, boot = 11, init = 13, finish = 6819
22/11/28 20:50:47 INFO Executor: Finished task 1.0 in stage 159.0 (TID 93). 270353 bytes result sent to driver
22/11/28 20:50:47 INFO Executor: Finished task 0.0 in stage 159.0 (TID 92). 270422 bytes result sent to driver
22/11/28 20:50:47 INFO TaskSetManager: Finished task 1.0 in stage 159.0 (TID 93) in 9306 ms on c0dd063d47a7 (executor driver) (1/2)
22/11/28 20:50:47 INFO TaskSetManager: Finished task 0.0 in stage 159.0 (TID 92) in 9341 ms on c0dd063d47a7 (executor driver) (2/2)
22/11/28 20:50:47 INFO TaskSchedulerImpl: Removed TaskSet 159.0, whose tasks have all completed, from pool 
22/11/28 20:50:47 INFO DAGScheduler: ResultStage 159 (treeAggregate at IDF.scala:55) finished in 9.354 s
22/11/28 20:50:47 INFO DAGScheduler: Job 87 is finished. Cancelling potential speculative or zombie tasks for this 

[Stage 160:>                                                        (0 + 1) / 1]

22/11/28 20:50:51 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 160 (TID 94): Attempting to kill Python Worker
22/11/28 20:50:51 INFO Executor: Finished task 0.0 in stage 160.0 (TID 94). 2784 bytes result sent to driver
22/11/28 20:50:51 INFO TaskSetManager: Finished task 0.0 in stage 160.0 (TID 94) in 4065 ms on c0dd063d47a7 (executor driver) (1/1)
22/11/28 20:50:51 INFO TaskSchedulerImpl: Removed TaskSet 160.0, whose tasks have all completed, from pool 
22/11/28 20:50:51 INFO DAGScheduler: ResultStage 160 (take at /tmp/ipykernel_28/493068121.py:5) finished in 4.094 s
22/11/28 20:50:51 INFO DAGScheduler: Job 88 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:50:51 INFO TaskSchedulerImpl: Killing all running tasks in stage 160: Stage finished
22/11/28 20:50:51 INFO DAGScheduler: Job 88 finished: take at /tmp/ipykernel_28/493068121.py:5, took 4.096542 s


                                                                                

[Row(features=SparseVector(262144, {6240: 8.7614, 7221: 2.2023, 9420: 3.1522, 10214: 6.4668, 11680: 5.0275, 15494: 3.4215, 19036: 0.7385, 19208: 2.2441, 23032: 5.0114, 25000: 5.6868, 26144: 3.5877, 66299: 7.7906, 67416: 1.1947, 72125: 2.2731, 74944: 2.5138, 77971: 7.6235, 79300: 6.672, 79968: 9.9008, 89833: 3.0516, 94488: 8.4249, 95889: 1.2127, 97171: 2.0161, 101169: 1.734, 103863: 6.8445, 110427: 2.1174, 110510: 5.6685, 116767: 6.0244, 140784: 3.0482, 141086: 2.4778, 145284: 8.0682, 151536: 2.2414, 151751: 9.0358, 166368: 2.0431, 187114: 1.7657, 219915: 0.6965, 223402: 3.3517, 229137: 4.5705, 231630: 9.4953, 233967: 3.102, 240944: 1.7538, 253170: 2.6999})),
 Row(features=SparseVector(262144, {2195: 7.2353, 4714: 5.8755, 13283: 6.1872, 48234: 6.4831, 85939: 6.9304, 108541: 1.133, 119702: 5.179, 121320: 5.8085, 137179: 8.9845, 141086: 2.4778, 159767: 6.7818, 165258: 4.1344, 169800: 8.3167, 212492: 10.8816, 218233: 3.3932, 224255: 2.5245, 224850: 3.7739, 249180: 0.4627})),
 Row(features=

In [25]:
#Get started with the regression model 
Reg_para = 0.1
LogReg= LogisticRegression (featuresCol = "features", labelCol= "toxic", regParam =Reg_para)
LogReg_Model = LogReg.fit(tfidf.limit(5000))
res_train = LogReg_Model.transform(tfidf)
res_train.show(5)

22/11/28 20:50:51 INFO Instrumentation: [a606aa0e] Stage class: LogisticRegression
22/11/28 20:50:51 INFO Instrumentation: [a606aa0e] Stage uid: LogisticRegression_4660efcfb5b5
22/11/28 20:50:51 INFO CodeGenerator: Code generated in 31.558833 ms
22/11/28 20:50:51 INFO DAGScheduler: Registering RDD 164 (rdd at Instrumentation.scala:62) as input to shuffle 4
22/11/28 20:50:51 INFO DAGScheduler: Got map stage job 89 (rdd at Instrumentation.scala:62) with 2 output partitions
22/11/28 20:50:51 INFO DAGScheduler: Final stage: ShuffleMapStage 161 (rdd at Instrumentation.scala:62)
22/11/28 20:50:51 INFO DAGScheduler: Parents of final stage: List()
22/11/28 20:50:51 INFO DAGScheduler: Missing parents: List()
22/11/28 20:50:51 INFO DAGScheduler: Submitting ShuffleMapStage 161 (MapPartitionsRDD[164] at rdd at Instrumentation.scala:62), which has no missing parents
22/11/28 20:50:51 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/11/28 20:50:51 INFO MemoryStore: Block broadc

[Stage 161:>                                                        (0 + 2) / 2]

22/11/28 20:50:56 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 161 (TID 95): Attempting to kill Python Worker
22/11/28 20:50:56 INFO Executor: Finished task 0.0 in stage 161.0 (TID 95). 1970 bytes result sent to driver
22/11/28 20:50:56 INFO TaskSetManager: Finished task 0.0 in stage 161.0 (TID 95) in 4132 ms on c0dd063d47a7 (executor driver) (1/2)
22/11/28 20:50:56 WARN PythonRunner: Detected deadlock while completing task 1.0 in stage 161 (TID 96): Attempting to kill Python Worker
22/11/28 20:50:56 INFO Executor: Finished task 1.0 in stage 161.0 (TID 96). 1970 bytes result sent to driver
22/11/28 20:50:56 INFO TaskSetManager: Finished task 1.0 in stage 161.0 (TID 96) in 4153 ms on c0dd063d47a7 (executor driver) (2/2)
22/11/28 20:50:56 INFO TaskSchedulerImpl: Removed TaskSet 161.0, whose tasks have all completed, from pool 
22/11/28 20:50:56 INFO DAGScheduler: ShuffleMapStage 161 (rdd at Instrumentation.scala:62) finished in 4.229 s
22/11/28 20:50:56 INFO DA

[Stage 162:>                                                        (0 + 2) / 2]

22/11/28 20:51:00 WARN PythonRunner: Detected deadlock while completing task 1.0 in stage 162 (TID 98): Attempting to kill Python Worker
22/11/28 20:51:00 INFO Executor: Finished task 1.0 in stage 162.0 (TID 98). 1970 bytes result sent to driver
22/11/28 20:51:00 INFO TaskSetManager: Finished task 1.0 in stage 162.0 (TID 98) in 4155 ms on c0dd063d47a7 (executor driver) (1/2)
22/11/28 20:51:00 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 162 (TID 97): Attempting to kill Python Worker
22/11/28 20:51:00 INFO Executor: Finished task 0.0 in stage 162.0 (TID 97). 1970 bytes result sent to driver
22/11/28 20:51:00 INFO TaskSetManager: Finished task 0.0 in stage 162.0 (TID 97) in 4207 ms on c0dd063d47a7 (executor driver) (2/2)
22/11/28 20:51:00 INFO TaskSchedulerImpl: Removed TaskSet 162.0, whose tasks have all completed, from pool 
22/11/28 20:51:00 INFO DAGScheduler: ShuffleMapStage 162 (rdd at Predictor.scala:81) finished in 4.241 s
22/11/28 20:51:00 INFO DAGSched

                                                                                

22/11/28 20:51:00 INFO MemoryStore: Block rdd_180_0 stored as values in memory (estimated size 2.8 MiB, free 10.6 GiB)
22/11/28 20:51:00 INFO BlockManagerInfo: Added rdd_180_0 in memory on c0dd063d47a7:38291 (size: 2.8 MiB, free: 10.6 GiB)
22/11/28 20:51:01 INFO MemoryStore: Block taskresult_100 stored as bytes in memory (estimated size 2.0 MiB, free 10.6 GiB)
22/11/28 20:51:01 INFO BlockManagerInfo: Added taskresult_100 in memory on c0dd063d47a7:38291 (size: 2.0 MiB, free: 10.6 GiB)
22/11/28 20:51:01 INFO Executor: Finished task 0.0 in stage 166.0 (TID 100). 2110259 bytes result sent via BlockManager)
22/11/28 20:51:01 INFO TaskSetManager: Finished task 0.0 in stage 166.0 (TID 100) in 200 ms on c0dd063d47a7 (executor driver) (1/1)
22/11/28 20:51:01 INFO TaskSchedulerImpl: Removed TaskSet 166.0, whose tasks have all completed, from pool 
22/11/28 20:51:01 INFO BlockManagerInfo: Removed broadcast_163_piece0 on c0dd063d47a7:38291 in memory (size: 1440.8 KiB, free: 10.6 GiB)
22/11/28 20:5

[Stage 307:>                                                        (0 + 2) / 2]

22/11/28 20:51:20 INFO BlockManagerInfo: Removed broadcast_308_piece0 on c0dd063d47a7:38291 in memory (size: 1445.2 KiB, free: 10.6 GiB)
22/11/28 20:51:23 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 307 (TID 171): Attempting to kill Python Worker
22/11/28 20:51:23 INFO Executor: Finished task 0.0 in stage 307.0 (TID 171). 2013 bytes result sent to driver
22/11/28 20:51:23 INFO TaskSetManager: Finished task 0.0 in stage 307.0 (TID 171) in 4094 ms on c0dd063d47a7 (executor driver) (1/2)
22/11/28 20:51:23 WARN PythonRunner: Detected deadlock while completing task 1.0 in stage 307 (TID 172): Attempting to kill Python Worker
22/11/28 20:51:23 INFO Executor: Finished task 1.0 in stage 307.0 (TID 172). 2013 bytes result sent to driver
22/11/28 20:51:23 INFO TaskSetManager: Finished task 1.0 in stage 307.0 (TID 172) in 4071 ms on c0dd063d47a7 (executor driver) (2/2)
22/11/28 20:51:23 INFO TaskSchedulerImpl: Removed TaskSet 307.0, whose tasks have all completed, from

[Stage 308:>                                                        (0 + 2) / 2]

22/11/28 20:51:24 INFO BlockManagerInfo: Removed broadcast_309_piece0 on c0dd063d47a7:38291 in memory (size: 1440.8 KiB, free: 10.6 GiB)
22/11/28 20:51:28 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 308 (TID 173): Attempting to kill Python Worker
22/11/28 20:51:28 INFO Executor: Finished task 0.0 in stage 308.0 (TID 173). 2013 bytes result sent to driver
22/11/28 20:51:28 INFO TaskSetManager: Finished task 0.0 in stage 308.0 (TID 173) in 4138 ms on c0dd063d47a7 (executor driver) (1/2)
22/11/28 20:51:28 WARN PythonRunner: Detected deadlock while completing task 1.0 in stage 308 (TID 174): Attempting to kill Python Worker
22/11/28 20:51:28 INFO Executor: Finished task 1.0 in stage 308.0 (TID 174). 2013 bytes result sent to driver
22/11/28 20:51:28 INFO TaskSetManager: Finished task 1.0 in stage 308.0 (TID 174) in 4134 ms on c0dd063d47a7 (executor driver) (2/2)
22/11/28 20:51:28 INFO TaskSchedulerImpl: Removed TaskSet 308.0, whose tasks have all completed, from



22/11/28 20:51:28 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
22/11/28 20:51:28 INFO DAGScheduler: Got job 165 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/11/28 20:51:28 INFO DAGScheduler: Final stage: ResultStage 309 (showString at NativeMethodAccessorImpl.java:0)
22/11/28 20:51:28 INFO DAGScheduler: Parents of final stage: List()
22/11/28 20:51:28 INFO DAGScheduler: Missing parents: List()
22/11/28 20:51:28 INFO DAGScheduler: Submitting ResultStage 309 (MapPartitionsRDD[270] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
22/11/28 20:51:28 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
22/11/28 20:51:28 INFO MemoryStore: Block broadcast_311 stored as values in memory (estimated size 4.5 MiB, free 10.6 GiB)
22/11/28 20:51:28 INFO MemoryStore: Block broadcast_311_piece0 stored as bytes in memory (estimated size 1907.4 KiB, free 10.6 GiB)
22/11/28 20:51:28 INFO BlockMa

[Stage 309:>                                                        (0 + 1) / 1]

22/11/28 20:51:32 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 309 (TID 175): Attempting to kill Python Worker
22/11/28 20:51:32 INFO Executor: Finished task 0.0 in stage 309.0 (TID 175). 9155 bytes result sent to driver
22/11/28 20:51:32 INFO TaskSetManager: Finished task 0.0 in stage 309.0 (TID 175) in 4077 ms on c0dd063d47a7 (executor driver) (1/1)
22/11/28 20:51:32 INFO TaskSchedulerImpl: Removed TaskSet 309.0, whose tasks have all completed, from pool 
22/11/28 20:51:32 INFO DAGScheduler: ResultStage 309 (showString at NativeMethodAccessorImpl.java:0) finished in 4.115 s
22/11/28 20:51:32 INFO DAGScheduler: Job 165 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:51:32 INFO TaskSchedulerImpl: Killing all running tasks in stage 309: Stage finished
22/11/28 20:51:32 INFO DAGScheduler: Job 165 finished: showString at NativeMethodAccessorImpl.java:0, took 4.119075 s
+----------------+--------------------+-----+----------

                                                                                

In [26]:
res_train.select("id", "toxic", "probability", "prediction").show(20)


22/11/28 20:51:32 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
22/11/28 20:51:32 INFO DAGScheduler: Got job 166 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/11/28 20:51:32 INFO DAGScheduler: Final stage: ResultStage 310 (showString at NativeMethodAccessorImpl.java:0)
22/11/28 20:51:32 INFO DAGScheduler: Parents of final stage: List()
22/11/28 20:51:32 INFO DAGScheduler: Missing parents: List()
22/11/28 20:51:32 INFO DAGScheduler: Submitting ResultStage 310 (MapPartitionsRDD[272] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
22/11/28 20:51:32 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
22/11/28 20:51:32 INFO MemoryStore: Block broadcast_312 stored as values in memory (estimated size 4.5 MiB, free 10.6 GiB)
22/11/28 20:51:32 INFO MemoryStore: Block broadcast_312_piece0 stored as bytes in memory (estimated size 1902.3 KiB, free 10.6 GiB)
22/11/28 20:51:32 INFO BlockMa

[Stage 310:>                                                        (0 + 1) / 1]

22/11/28 20:51:36 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 310 (TID 176): Attempting to kill Python Worker
22/11/28 20:51:36 INFO Executor: Finished task 0.0 in stage 310.0 (TID 176). 2823 bytes result sent to driver
22/11/28 20:51:36 INFO TaskSetManager: Finished task 0.0 in stage 310.0 (TID 176) in 4092 ms on c0dd063d47a7 (executor driver) (1/1)
22/11/28 20:51:36 INFO TaskSchedulerImpl: Removed TaskSet 310.0, whose tasks have all completed, from pool 
22/11/28 20:51:36 INFO DAGScheduler: ResultStage 310 (showString at NativeMethodAccessorImpl.java:0) finished in 4.118 s
22/11/28 20:51:36 INFO DAGScheduler: Job 166 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:51:36 INFO TaskSchedulerImpl: Killing all running tasks in stage 310: Stage finished
22/11/28 20:51:36 INFO DAGScheduler: Job 166 finished: showString at NativeMethodAccessorImpl.java:0, took 4.121188 s
+----------------+-----+--------------------+----------

                                                                                

In [27]:
#define a user-defined function udf 
extrac_proba = F.udf (lambda x : float(x[1]), T.FloatType())
(res_train.withColumn("proba", extrac_proba("probability"))
 .select("id", "proba", "prediction")
 .show())

22/11/28 20:51:36 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
22/11/28 20:51:36 INFO DAGScheduler: Got job 167 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/11/28 20:51:36 INFO DAGScheduler: Final stage: ResultStage 311 (showString at NativeMethodAccessorImpl.java:0)
22/11/28 20:51:36 INFO DAGScheduler: Parents of final stage: List()
22/11/28 20:51:36 INFO DAGScheduler: Missing parents: List()
22/11/28 20:51:36 INFO DAGScheduler: Submitting ResultStage 311 (MapPartitionsRDD[277] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
22/11/28 20:51:37 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
22/11/28 20:51:37 INFO MemoryStore: Block broadcast_313 stored as values in memory (estimated size 4.5 MiB, free 10.6 GiB)
22/11/28 20:51:37 INFO MemoryStore: Block broadcast_313_piece0 stored as bytes in memory (estimated size 1907.6 KiB, free 10.6 GiB)
22/11/28 20:51:37 INFO BlockMa

[Stage 311:>                                                        (0 + 1) / 1]

22/11/28 20:51:41 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 311 (TID 177): Attempting to kill Python Worker
22/11/28 20:51:41 INFO Executor: Finished task 0.0 in stage 311.0 (TID 177). 2332 bytes result sent to driver
22/11/28 20:51:41 INFO TaskSetManager: Finished task 0.0 in stage 311.0 (TID 177) in 4104 ms on c0dd063d47a7 (executor driver) (1/1)
22/11/28 20:51:41 INFO TaskSchedulerImpl: Removed TaskSet 311.0, whose tasks have all completed, from pool 
22/11/28 20:51:41 INFO DAGScheduler: ResultStage 311 (showString at NativeMethodAccessorImpl.java:0) finished in 4.130 s
22/11/28 20:51:41 INFO DAGScheduler: Job 167 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:51:41 INFO TaskSchedulerImpl: Killing all running tasks in stage 311: Stage finished
22/11/28 20:51:41 INFO DAGScheduler: Job 167 finished: showString at NativeMethodAccessorImpl.java:0, took 4.132423 s
+----------------+------------+----------+
|           

                                                                                

In [29]:
# Apply model on test 
test_tokens = tokenizer.transform (test)
test_tf = hashingTF.transform(test_tokens)
test_tfidf = idfModel.transform(test_tf)
test_results = test.select('id')

test_probs = []
for col in out_cols:
    LogReg= LogisticRegression (featuresCol = "features", labelCol= col, regParam =Reg_para)
    LogReg_Model = LogReg.fit(tfidf)
    res_test = LogReg_Model.transform(test_tfidf)
    test_results = test_results.join (res_test.select('id', 'probability'), on ="id")
    test_results = test_results.withColumn(col, extrac_proba("probability"))
    test_results.show(5)

22/11/28 20:54:05 INFO Instrumentation: [0db012a0] Stage class: LogisticRegression
22/11/28 20:54:05 INFO Instrumentation: [0db012a0] Stage uid: LogisticRegression_d8c38930217d
22/11/28 20:54:05 INFO Instrumentation: [0db012a0] training: numPartitions=2 storageLevel=StorageLevel(1 replicas)
22/11/28 20:54:05 INFO Instrumentation: [0db012a0] {"labelCol":"toxic","featuresCol":"features","regParam":0.1}
22/11/28 20:54:05 INFO SparkContext: Starting job: treeAggregate at Summarizer.scala:233
22/11/28 20:54:05 INFO DAGScheduler: Got job 239 (treeAggregate at Summarizer.scala:233) with 2 output partitions
22/11/28 20:54:05 INFO DAGScheduler: Final stage: ResultStage 383 (treeAggregate at Summarizer.scala:233)
22/11/28 20:54:05 INFO DAGScheduler: Parents of final stage: List()
22/11/28 20:54:05 INFO DAGScheduler: Missing parents: List()
22/11/28 20:54:05 INFO DAGScheduler: Submitting ResultStage 383 (MapPartitionsRDD[380] at treeAggregate at Summarizer.scala:233), which has no missing parents

[Stage 383:>                                                        (0 + 2) / 2]

22/11/28 20:54:16 INFO PythonRunner: Times: total = 7926, boot = 6, init = 6, finish = 7914
22/11/28 20:54:16 INFO MemoryStore: Block taskresult_321 stored as bytes in memory (estimated size 6.0 MiB, free 10.6 GiB)
22/11/28 20:54:16 INFO BlockManagerInfo: Added taskresult_321 in memory on c0dd063d47a7:38291 (size: 6.0 MiB, free: 10.6 GiB)
22/11/28 20:54:16 INFO Executor: Finished task 1.0 in stage 383.0 (TID 321). 6324364 bytes result sent via BlockManager)
22/11/28 20:54:16 INFO TaskSetManager: Finished task 1.0 in stage 383.0 (TID 321) in 10549 ms on c0dd063d47a7 (executor driver) (1/2)
22/11/28 20:54:16 INFO BlockManagerInfo: Removed taskresult_321 on c0dd063d47a7:38291 in memory (size: 6.0 MiB, free: 10.6 GiB)


                                                                                

22/11/28 20:54:16 INFO PythonRunner: Times: total = 8262, boot = 8, init = 7, finish = 8247
22/11/28 20:54:16 INFO MemoryStore: Block taskresult_320 stored as bytes in memory (estimated size 6.0 MiB, free 10.6 GiB)
22/11/28 20:54:16 INFO BlockManagerInfo: Added taskresult_320 in memory on c0dd063d47a7:38291 (size: 6.0 MiB, free: 10.6 GiB)
22/11/28 20:54:16 INFO Executor: Finished task 0.0 in stage 383.0 (TID 320). 6324364 bytes result sent via BlockManager)
22/11/28 20:54:16 INFO TaskSetManager: Finished task 0.0 in stage 383.0 (TID 320) in 10950 ms on c0dd063d47a7 (executor driver) (2/2)
22/11/28 20:54:16 INFO DAGScheduler: ResultStage 383 (treeAggregate at Summarizer.scala:233) finished in 10.978 s
22/11/28 20:54:16 INFO BlockManagerInfo: Removed taskresult_320 on c0dd063d47a7:38291 in memory (size: 6.0 MiB, free: 10.6 GiB)
22/11/28 20:54:16 INFO DAGScheduler: Job 239 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:54:16 INFO TaskSchedulerImpl: 

[Stage 384:>                                                        (0 + 2) / 2]

22/11/28 20:54:18 INFO BlockManagerInfo: Removed broadcast_457_piece0 on c0dd063d47a7:38291 in memory (size: 1442.7 KiB, free: 10.6 GiB)
22/11/28 20:54:26 INFO PythonRunner: Times: total = 7007, boot = -3050, init = 3052, finish = 7005
22/11/28 20:54:26 INFO MemoryStore: Block rdd_382_1 stored as values in memory (estimated size 44.1 MiB, free 10.6 GiB)
22/11/28 20:54:26 INFO BlockManagerInfo: Added rdd_382_1 in memory on c0dd063d47a7:38291 (size: 44.1 MiB, free: 10.6 GiB)
22/11/28 20:54:26 INFO MemoryStore: Block taskresult_323 stored as bytes in memory (estimated size 2.0 MiB, free 10.6 GiB)
22/11/28 20:54:26 INFO BlockManagerInfo: Added taskresult_323 in memory on c0dd063d47a7:38291 (size: 2.0 MiB, free: 10.6 GiB)
22/11/28 20:54:26 INFO Executor: Finished task 1.0 in stage 384.0 (TID 323). 2109350 bytes result sent via BlockManager)
22/11/28 20:54:26 INFO PythonRunner: Times: total = 7232, boot = -2705, init = 2707, finish = 7230
22/11/28 20:54:26 INFO TaskSetManager: Finished task 

                                                                                

22/11/28 20:54:26 INFO MemoryStore: Block taskresult_322 stored as bytes in memory (estimated size 2.0 MiB, free 10.5 GiB)
22/11/28 20:54:26 INFO BlockManagerInfo: Added taskresult_322 in memory on c0dd063d47a7:38291 (size: 2.0 MiB, free: 10.5 GiB)
22/11/28 20:54:26 INFO Executor: Finished task 0.0 in stage 384.0 (TID 322). 2109350 bytes result sent via BlockManager)
22/11/28 20:54:26 INFO TaskSetManager: Finished task 0.0 in stage 384.0 (TID 322) in 9919 ms on c0dd063d47a7 (executor driver) (2/2)
22/11/28 20:54:26 INFO TaskSchedulerImpl: Removed TaskSet 384.0, whose tasks have all completed, from pool 
22/11/28 20:54:26 INFO BlockManagerInfo: Removed taskresult_322 on c0dd063d47a7:38291 in memory (size: 2.0 MiB, free: 10.5 GiB)
22/11/28 20:54:26 INFO DAGScheduler: ResultStage 384 (treeAggregate at RDDLossFunction.scala:61) finished in 9.949 s
22/11/28 20:54:26 INFO DAGScheduler: Job 240 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:54:26 INFO T

[Stage 454:>                (0 + 2) / 2][Stage 455:>                (0 + 2) / 2]

22/11/28 20:54:55 INFO BlockManager: Removing RDD 382
22/11/28 20:54:55 INFO PythonRunner: Times: total = 704, boot = -30782, init = 30785, finish = 701
22/11/28 20:54:55 INFO Executor: Finished task 1.0 in stage 454.0 (TID 463). 2355 bytes result sent to driver
22/11/28 20:54:55 INFO TaskSetManager: Finished task 1.0 in stage 454.0 (TID 463) in 938 ms on c0dd063d47a7 (executor driver) (1/2)
22/11/28 20:54:55 INFO PythonRunner: Times: total = 786, boot = -30578, init = 30589, finish = 775
22/11/28 20:54:55 INFO Executor: Finished task 0.0 in stage 454.0 (TID 462). 2355 bytes result sent to driver
22/11/28 20:54:55 INFO TaskSetManager: Finished task 0.0 in stage 454.0 (TID 462) in 1039 ms on c0dd063d47a7 (executor driver) (2/2)
22/11/28 20:54:55 INFO TaskSchedulerImpl: Removed TaskSet 454.0, whose tasks have all completed, from pool 
22/11/28 20:54:55 INFO DAGScheduler: ShuffleMapStage 454 (showString at NativeMethodAccessorImpl.java:0) finished in 1.049 s
22/11/28 20:54:55 INFO DAGSche

[Stage 455:>                                                        (0 + 2) / 2]

22/11/28 20:54:55 INFO SparkContext: Starting job: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264
22/11/28 20:54:55 INFO DAGScheduler: Got job 312 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264) with 2 output partitions
22/11/28 20:54:55 INFO DAGScheduler: Final stage: ResultStage 457 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264)
22/11/28 20:54:55 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 456)
22/11/28 20:54:55 INFO DAGScheduler: Missing parents: List()
22/11/28 20:54:55 INFO DAGScheduler: Submitting ResultStage 457 (MapPartitionsRDD[469] at $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264), which has no missing parents
22/11/28 20:54:55 INFO MemoryStore: Block broadcast_602 stored as values in memory (estimated size 7.2 KiB, free 10.6 GiB)
22/11/28 20:54:55 INFO MemoryStore: Block broadcast_602_piece0 stored as bytes in memory (estimated size 3.8 KiB, free 10.6 GiB)
22/11/28 20:54:55 INFO BlockManagerInfo: Added broad

                                                                                

22/11/28 20:54:56 INFO CodeGenerator: Code generated in 18.124356 ms


[Stage 455:>                                                        (0 + 2) / 2]

22/11/28 20:54:56 INFO MemoryStore: Block broadcast_603 stored as values in memory (estimated size 72.0 MiB, free 10.5 GiB)
22/11/28 20:54:56 INFO MemoryStore: Block broadcast_603_piece0 stored as bytes in memory (estimated size 3.6 MiB, free 10.5 GiB)
22/11/28 20:54:56 INFO BlockManagerInfo: Added broadcast_603_piece0 in memory on c0dd063d47a7:38291 (size: 3.6 MiB, free: 10.6 GiB)
22/11/28 20:54:56 INFO SparkContext: Created broadcast 603 from $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264
22/11/28 20:54:56 INFO BlockManagerInfo: Removed broadcast_602_piece0 on c0dd063d47a7:38291 in memory (size: 3.8 KiB, free: 10.6 GiB)
22/11/28 20:55:04 INFO PythonRunner: Times: total = 7526, boot = 61, init = 18, finish = 7447
22/11/28 20:55:04 INFO PythonRunner: Times: total = 7310, boot = 7, init = 9, finish = 7294
22/11/28 20:55:04 INFO Executor: Finished task 0.0 in stage 455.0 (TID 464). 2355 bytes result sent to driver
22/11/28 20:55:04 INFO TaskSetManager: Finished task 0.0 in sta



22/11/28 20:55:04 INFO MemoryStore: Block broadcast_604_piece0 stored as bytes in memory (estimated size 3.2 MiB, free 10.5 GiB)
22/11/28 20:55:04 INFO BlockManagerInfo: Added broadcast_604_piece0 in memory on c0dd063d47a7:38291 (size: 3.2 MiB, free: 10.6 GiB)
22/11/28 20:55:04 INFO SparkContext: Created broadcast 604 from broadcast at DAGScheduler.scala:1513
22/11/28 20:55:04 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 459 (MapPartitionsRDD[475] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/11/28 20:55:04 INFO TaskSchedulerImpl: Adding task set 459.0 with 1 tasks resource profile 0
22/11/28 20:55:04 INFO TaskSetManager: Starting task 0.0 in stage 459.0 (TID 468) (c0dd063d47a7, executor driver, partition 0, NODE_LOCAL, 4472 bytes) taskResourceAssignments Map()
22/11/28 20:55:04 INFO Executor: Running task 0.0 in stage 459.0 (TID 468)
22/11/28 20:55:04 INFO ShuffleBlockFetcherIterator: Getting 1 (2.9 MiB) non-empty 

[Stage 459:>                                                        (0 + 1) / 1]

22/11/28 20:55:05 INFO Executor: Finished task 0.0 in stage 459.0 (TID 468). 4398 bytes result sent to driver
22/11/28 20:55:05 INFO TaskSetManager: Finished task 0.0 in stage 459.0 (TID 468) in 671 ms on c0dd063d47a7 (executor driver) (1/1)
22/11/28 20:55:05 INFO TaskSchedulerImpl: Removed TaskSet 459.0, whose tasks have all completed, from pool 
22/11/28 20:55:05 INFO DAGScheduler: ResultStage 459 (showString at NativeMethodAccessorImpl.java:0) finished in 0.707 s
22/11/28 20:55:05 INFO DAGScheduler: Job 313 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:55:05 INFO TaskSchedulerImpl: Killing all running tasks in stage 459: Stage finished
22/11/28 20:55:05 INFO DAGScheduler: Job 313 finished: showString at NativeMethodAccessorImpl.java:0, took 0.708852 s
22/11/28 20:55:05 INFO CodeGenerator: Code generated in 8.231205 ms
+----------------+--------------------+------------+
|              id|         probability|       toxic|
+----------------+--

                                                                                

22/11/28 20:55:05 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/11/28 20:55:05 INFO MemoryStore: Block broadcast_605 stored as values in memory (estimated size 4.1 MiB, free 10.5 GiB)
22/11/28 20:55:05 INFO MemoryStore: Block broadcast_605_piece0 stored as bytes in memory (estimated size 1442.7 KiB, free 10.5 GiB)
22/11/28 20:55:05 INFO BlockManagerInfo: Added broadcast_605_piece0 in memory on c0dd063d47a7:38291 (size: 1442.7 KiB, free: 10.6 GiB)
22/11/28 20:55:05 INFO SparkContext: Created broadcast 605 from broadcast at DAGScheduler.scala:1513
22/11/28 20:55:05 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 460 (MapPartitionsRDD[485] at treeAggregate at Summarizer.scala:233) (first 15 tasks are for partitions Vector(0, 1))
22/11/28 20:55:05 INFO TaskSchedulerImpl: Adding task set 460.0 with 2 tasks resource profile 0
22/11/28 20:55:05 WARN TaskSetManager: Stage 460 contains a task of very large size (34160 KiB). The maximum recommended task si

[Stage 460:>                                                        (0 + 2) / 2]

22/11/28 20:55:06 INFO BlockManagerInfo: Removed broadcast_604_piece0 on c0dd063d47a7:38291 in memory (size: 3.2 MiB, free: 10.6 GiB)
22/11/28 20:55:16 INFO PythonRunner: Times: total = 7836, boot = -10453, init = 10461, finish = 7828
22/11/28 20:55:16 INFO PythonRunner: Times: total = 7457, boot = -10366, init = 10394, finish = 7429
22/11/28 20:55:16 INFO MemoryStore: Block taskresult_469 stored as bytes in memory (estimated size 6.0 MiB, free 10.5 GiB)
22/11/28 20:55:16 INFO BlockManagerInfo: Added taskresult_469 in memory on c0dd063d47a7:38291 (size: 6.0 MiB, free: 10.6 GiB)
22/11/28 20:55:16 INFO Executor: Finished task 0.0 in stage 460.0 (TID 469). 6324364 bytes result sent via BlockManager)
22/11/28 20:55:16 INFO MemoryStore: Block taskresult_470 stored as bytes in memory (estimated size 6.0 MiB, free 10.5 GiB)
22/11/28 20:55:16 INFO BlockManagerInfo: Added taskresult_470 in memory on c0dd063d47a7:38291 (size: 6.0 MiB, free: 10.6 GiB)
22/11/28 20:55:16 INFO Executor: Finished tas

                                                                                

22/11/28 20:55:16 INFO BlockManagerInfo: Removed broadcast_605_piece0 on c0dd063d47a7:38291 in memory (size: 1442.7 KiB, free: 10.6 GiB)
22/11/28 20:55:16 WARN TaskSetManager: Stage 461 contains a task of very large size (34160 KiB). The maximum recommended task size is 1000 KiB.
22/11/28 20:55:16 INFO TaskSetManager: Starting task 0.0 in stage 461.0 (TID 471) (c0dd063d47a7, executor driver, partition 0, PROCESS_LOCAL, 34980828 bytes) taskResourceAssignments Map()
22/11/28 20:55:16 INFO TaskSetManager: Starting task 1.0 in stage 461.0 (TID 472) (c0dd063d47a7, executor driver, partition 1, PROCESS_LOCAL, 34469651 bytes) taskResourceAssignments Map()
22/11/28 20:55:16 INFO Executor: Running task 1.0 in stage 461.0 (TID 472)
22/11/28 20:55:16 INFO Executor: Running task 0.0 in stage 461.0 (TID 471)
22/11/28 20:55:16 INFO BlockManagerInfo: Removed broadcast_601_piece0 on c0dd063d47a7:38291 in memory (size: 3.2 MiB, free: 10.6 GiB)
22/11/28 20:55:16 INFO BlockManagerInfo: Removed broadcast_

[Stage 461:>                                                        (0 + 2) / 2]

22/11/28 20:55:26 INFO PythonRunner: Times: total = 7690, boot = -13937, init = 13940, finish = 7687
22/11/28 20:55:26 INFO MemoryStore: Block rdd_487_1 stored as values in memory (estimated size 44.1 MiB, free 10.5 GiB)
22/11/28 20:55:26 INFO BlockManagerInfo: Added rdd_487_1 in memory on c0dd063d47a7:38291 (size: 44.1 MiB, free: 10.6 GiB)
22/11/28 20:55:26 INFO MemoryStore: Block taskresult_472 stored as bytes in memory (estimated size 2.0 MiB, free 10.5 GiB)
22/11/28 20:55:26 INFO BlockManagerInfo: Added taskresult_472 in memory on c0dd063d47a7:38291 (size: 2.0 MiB, free: 10.6 GiB)
22/11/28 20:55:26 INFO Executor: Finished task 1.0 in stage 461.0 (TID 472). 2109350 bytes result sent via BlockManager)
22/11/28 20:55:26 INFO PythonRunner: Times: total = 7154, boot = -14046, init = 14053, finish = 7147
22/11/28 20:55:26 INFO MemoryStore: Block rdd_487_0 stored as values in memory (estimated size 44.5 MiB, free 10.4 GiB)
22/11/28 20:55:26 INFO TaskSetManager: Finished task 1.0 in stage 

                                                                                

22/11/28 20:55:26 INFO MemoryStore: Block taskresult_471 stored as bytes in memory (estimated size 2.0 MiB, free 10.4 GiB)
22/11/28 20:55:26 INFO BlockManagerInfo: Added taskresult_471 in memory on c0dd063d47a7:38291 (size: 2.0 MiB, free: 10.5 GiB)
22/11/28 20:55:26 INFO Executor: Finished task 0.0 in stage 461.0 (TID 471). 2109350 bytes result sent via BlockManager)
22/11/28 20:55:26 INFO TaskSetManager: Finished task 0.0 in stage 461.0 (TID 471) in 10082 ms on c0dd063d47a7 (executor driver) (2/2)
22/11/28 20:55:26 INFO TaskSchedulerImpl: Removed TaskSet 461.0, whose tasks have all completed, from pool 
22/11/28 20:55:26 INFO BlockManagerInfo: Removed taskresult_471 on c0dd063d47a7:38291 in memory (size: 2.0 MiB, free: 10.5 GiB)
22/11/28 20:55:26 INFO DAGScheduler: ResultStage 461 (treeAggregate at RDDLossFunction.scala:61) finished in 10.107 s
22/11/28 20:55:26 INFO DAGScheduler: Job 315 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/28 20:55:26 INFO

AnalysisException: Reference 'probability' is ambiguous, could be: probability, probability.