In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DecisionTree").getOrCreate()

# Load data from CSV file
df = spark.read.format("csv").option("header", "true").load("clean_2.csv")

# Convert categorical variables to numerical using StringIndexer
from pyspark.ml.feature import StringIndexer

source_indexer = StringIndexer(inputCol="SourceName", outputCol="SourceNameIndex")
state_indexer = StringIndexer(inputCol="StateCode", outputCol="StateCodeIndex")
tobacco_indexer = StringIndexer(inputCol="Tobacco", outputCol="TobaccoIndex")
plan_indexer = StringIndexer(inputCol="PlanId", outputCol="PlanIdIndex")
rating_indexer = StringIndexer(inputCol="RatingAreaId", outputCol="RatingAreaIdIndex")

# Apply StringIndexer to data
df = source_indexer.fit(df).transform(df)
df = state_indexer.fit(df).transform(df)
df = tobacco_indexer.fit(df).transform(df)
df = plan_indexer.fit(df).transform(df)
df = rating_indexer.fit(df).transform(df)

# Select features and target variable
features = ['BusinessYear', 'StateCodeIndex', 'SourceNameIndex', 'RatingAreaIdIndex', 'TobaccoIndex', 'Age', 'PrimarySubscriberAndThreeOrMoreDependents', 'tobacco_rate']
target = 'PlanIdIndex'

# Split data into training and testing sets
(training_data, testing_data) = df.select(features + [target]).randomSplit([0.8, 0.2], seed=42)


                                                                                

IllegalArgumentException: features does not exist. Available: BusinessYear, StateCodeIndex, SourceNameIndex, RatingAreaIdIndex, TobaccoIndex, Age, PrimarySubscriberAndThreeOrMoreDependents, tobacco_rate, PlanIdIndex

In [8]:
from pyspark.sql.functions import col

# Convert string columns to numeric data types
training_data = training_data.withColumn("BusinessYear", col("BusinessYear").cast("double"))
testing_data = testing_data.withColumn("BusinessYear", col("BusinessYear").cast("double"))
training_data = training_data.withColumn("Age", col("Age").cast("double"))
testing_data = testing_data.withColumn("Age", col("Age").cast("double"))
training_data = training_data.withColumn("PrimarySubscriberAndThreeOrMoreDependents", col("PrimarySubscriberAndThreeOrMoreDependents").cast("double"))
testing_data = testing_data.withColumn("PrimarySubscriberAndThreeOrMoreDependents", col("PrimarySubscriberAndThreeOrMoreDependents").cast("double"))
training_data = training_data.withColumn("tobacco_rate", col("tobacco_rate").cast("double"))
testing_data = testing_data.withColumn("tobacco_rate", col("tobacco_rate").cast("double"))


from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['BusinessYear', 'StateCodeIndex', 'SourceNameIndex', 'RatingAreaIdIndex', 'TobaccoIndex', 'Age', 'PrimarySubscriberAndThreeOrMoreDependents', 'tobacco_rate'], outputCol='features')

training_data = assembler.transform(training_data)
testing_data = assembler.transform(testing_data)


In [4]:
df

DataFrame[_c0: string, BusinessYear: string, StateCode: string, SourceName: string, PlanId: string, RatingAreaId: string, Tobacco: string, Age: string, IndividualRate: string, PrimarySubscriberAndThreeOrMoreDependents: string, tobacco_rate: string, SourceNameIndex: double, StateCodeIndex: double, TobaccoIndex: double, PlanIdIndex: double, RatingAreaIdIndex: double]

In [10]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create a decision tree classifier
dt = DecisionTreeClassifier(labelCol=target, featuresCol="features", maxDepth=24,maxBins=70)

# Fit the model to the training data
model = dt.fit(training_data)

# Make predictions on the testing data
predictions = model.transform(testing_data)

# Evaluate the performance of the model using F1 score
evaluator = MulticlassClassificationEvaluator(labelCol=target, predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions, {evaluator.weightCol: "weightCol"})
print("F1 Score:", f1_score)

[Stage 22:>                                                       (0 + 12) / 12]

23/03/23 17:09:23 WARN MemoryStore: Not enough space to cache rdd_83_7 in memory! (computed 1044.8 KiB so far)
23/03/23 17:09:23 WARN MemoryStore: Not enough space to cache rdd_83_5 in memory! (computed 12.3 MiB so far)
23/03/23 17:09:23 WARN MemoryStore: Not enough space to cache rdd_83_9 in memory! (computed 27.7 MiB so far)
23/03/23 17:09:23 WARN BlockManager: Persisting block rdd_83_7 to disk instead.
23/03/23 17:09:23 WARN BlockManager: Persisting block rdd_83_5 to disk instead.
23/03/23 17:09:23 WARN BlockManager: Persisting block rdd_83_9 to disk instead.
23/03/23 17:09:23 WARN MemoryStore: Not enough space to cache rdd_83_4 in memory! (computed 18.4 MiB so far)
23/03/23 17:09:23 WARN BlockManager: Persisting block rdd_83_4 to disk instead.
23/03/23 17:09:23 WARN MemoryStore: Not enough space to cache rdd_83_8 in memory! (computed 27.7 MiB so far)
23/03/23 17:09:23 WARN MemoryStore: Not enough space to cache rdd_83_6 in memory! (computed 18.4 MiB so far)
23/03/23 17:09:23 WARN B

[Stage 22:====>                                                   (1 + 11) / 12]

23/03/23 17:09:36 WARN MemoryStore: Not enough space to cache rdd_83_4 in memory! (computed 43.8 MiB so far)




23/03/23 17:09:36 WARN MemoryStore: Not enough space to cache rdd_83_5 in memory! (computed 2.4 MiB so far)
23/03/23 17:09:37 WARN MemoryStore: Not enough space to cache rdd_83_0 in memory! (computed 1654.1 KiB so far)
23/03/23 17:09:37 WARN MemoryStore: Not enough space to cache rdd_83_7 in memory! (computed 5.5 MiB so far)
23/03/23 17:09:37 WARN MemoryStore: Not enough space to cache rdd_83_2 in memory! (computed 2.4 MiB so far)
23/03/23 17:09:37 WARN MemoryStore: Not enough space to cache rdd_83_6 in memory! (computed 65.7 MiB so far)
23/03/23 17:09:37 WARN MemoryStore: Not enough space to cache rdd_83_1 in memory! (computed 65.7 MiB so far)
23/03/23 17:09:38 WARN MemoryStore: Not enough space to cache rdd_83_10 in memory! (computed 99.2 MiB so far)


                                                                                

23/03/23 17:09:45 WARN DAGScheduler: Broadcasting large task binary with size 1117.0 KiB


[Stage 24:>                                                       (0 + 12) / 12]

23/03/23 17:09:46 WARN MemoryStore: Not enough space to cache rdd_83_9 in memory! (computed 19.5 MiB so far)
23/03/23 17:09:46 WARN MemoryStore: Not enough space to cache rdd_83_11 in memory! (computed 19.5 MiB so far)
23/03/23 17:09:46 WARN MemoryStore: Not enough space to cache rdd_83_1 in memory! (computed 19.5 MiB so far)
23/03/23 17:09:46 WARN MemoryStore: Not enough space to cache rdd_83_7 in memory! (computed 12.3 MiB so far)
23/03/23 17:09:46 WARN MemoryStore: Not enough space to cache rdd_83_10 in memory! (computed 19.5 MiB so far)
23/03/23 17:09:46 WARN MemoryStore: Not enough space to cache rdd_83_4 in memory! (computed 19.5 MiB so far)
23/03/23 17:09:47 WARN MemoryStore: Not enough space to cache rdd_83_5 in memory! (computed 19.5 MiB so far)
23/03/23 17:09:48 WARN MemoryStore: Not enough space to cache rdd_83_8 in memory! (computed 19.5 MiB so far)
23/03/23 17:09:51 ERROR Executor: Exception in task 8.0 in stage 24.0 (TID 140)
java.lang.OutOfMemoryError: Java heap space
	a

[Stage 24:>                                                        (0 + 3) / 12]

Py4JJavaError: An error occurred while calling o524.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 24.0 failed 1 times, most recent failure: Lost task 8.0 in stage 24.0 (TID 140) (10.0.0.105 executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22(RandomForest.scala:651)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22$adapted(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$4477/1649625408.apply(Unknown Source)
	at scala.Array$.tabulate(Array.scala:418)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$21(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$4418/832564158.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.RDD$$Lambda$4135/1420784972.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2537/1082294972.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:663)
	at org.apache.spark.ml.tree.impl.RandomForest$.runBagged(RandomForest.scala:208)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:302)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.$anonfun$train$1(DecisionTreeClassifier.scala:135)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:114)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:46)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22(RandomForest.scala:651)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22$adapted(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$4477/1649625408.apply(Unknown Source)
	at scala.Array$.tabulate(Array.scala:418)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$21(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$4418/832564158.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.RDD$$Lambda$4135/1420784972.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2537/1082294972.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


[Stage 24:>                                                        (0 + 2) / 12]

23/03/23 17:09:55 WARN MemoryStore: Not enough space to cache rdd_83_2 in memory! (computed 65.7 MiB so far)
23/03/23 17:09:55 WARN TaskSetManager: Lost task 2.0 in stage 24.0 (TID 134) (10.0.0.105 executor driver): TaskKilled (Stage cancelled)
