In [92]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

spark = SparkSession.builder.appName('Spam detection').getOrCreate()
print('ok')

ok


In [93]:
# baca file
theFile = spark.read.csv('spam_email.csv', inferSchema=True, header=True)
theFile.show(10)

+--------------------+-----+
|               email|label|
+--------------------+-----+
| date wed NUMBER ...|    0|
|martin a posted t...|    0|
|man threatens exp...|    0|
|klez the virus th...|    0|
| in adding cream ...|    0|
| i just had to ju...|    0|
|the scotsman NUMB...|    0|
|martin adamson wr...|    0|
|the scotsman thu ...|    0|
|i have been tryin...|    0|
+--------------------+-----+
only showing top 10 rows



In [94]:
filterData = theFile.filter((theFile['label'] != 0))
filterData.show(10)

+--------------------+-----+
|               email|label|
+--------------------+-----+
| save up to NUMBE...|    1|
|NUMBER fight the ...|    1|
|NUMBER fight the ...|    1|
| adult club offer...|    1|
|i thought you mig...|    1|
|a powerhouse gift...|    1|
|help wanted we ar...|    1|
| hyperlink life c...|    1|
|tired of the bull...|    1|
|dear ricardoNUMBE...|    1|
+--------------------+-----+
only showing top 10 rows



In [96]:
# latih data
bagiData = theFile.randomSplit([0.7, 0.3])
Trdata = bagiData[0] # data latih
TeData = bagiData[1] # data testing

print(f'Jumlah Data Training : {Trdata.count()}\nJumlah Data Testing : {TeData.count()}')

# pecah kata
tokenizer = Tokenizer(inputCol="email", outputCol="kataPecah")
tokenizerLatih = tokenizer.transform(Trdata)
tokenizerLatih.show(n=10)

Jumlah Data Training : 2106
Jumlah Data Testing : 894
+--------------------+-----+--------------------+
|               email|label|           kataPecah|
+--------------------+-----+--------------------+
| NUMBER NUMBER NU...|    1|[, number, number...|
| NUMBER NUMBER an...|    1|[, number, number...|
| NUMBER NUMBER ho...|    1|[, number, number...|
| NUMBER hits here...|    0|[, number, hits, ...|
| NUMBER in the ma...|    0|[, number, in, th...|
| NUMBER minutes i...|    0|[, number, minute...|
|                URL |    0|             [, url]|
| URL act dtl open...|    0|[, url, act, dtl,...|
| URL additional c...|    0|[, url, additiona...|
| URL additional c...|    0|[, url, additiona...|
+--------------------+-----+--------------------+
only showing top 10 rows



In [97]:
stopWord = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="Kata Berarti")
stopWordLatih = stopWord.transform(tokenizerLatih)
stopWordLatih.show(truncate=True, n=10)

+--------------------+-----+--------------------+--------------------+
|               email|label|           kataPecah|        Kata Berarti|
+--------------------+-----+--------------------+--------------------+
| NUMBER NUMBER NU...|    1|[, number, number...|[, number, number...|
| NUMBER NUMBER an...|    1|[, number, number...|[, number, number...|
| NUMBER NUMBER ho...|    1|[, number, number...|[, number, number...|
| NUMBER hits here...|    0|[, number, hits, ...|[, number, hits, ...|
| NUMBER in the ma...|    0|[, number, in, th...|[, number, main, ...|
| NUMBER minutes i...|    0|[, number, minute...|[, number, minute...|
|                URL |    0|             [, url]|             [, url]|
| URL act dtl open...|    0|[, url, act, dtl,...|[, url, act, dtl,...|
| URL additional c...|    0|[, url, additiona...|[, url, additiona...|
| URL additional c...|    0|[, url, additiona...|[, url, additiona...|
+--------------------+-----+--------------------+--------------------+
only s

In [103]:
# menghapus kata menjadi numerik
hashingTF = HashingTF(inputCol=stopWord.getOutputCol(), outputCol='Hasil')
dataAngka = hashingTF.transform(stopWordLatih).select(
    'label', 'KataPecah', 'Kata Berarti', 'Hasil'
)
dataAngka.show(truncate=True, n=5)

+-----+--------------------+--------------------+--------------------+
|label|           KataPecah|        Kata Berarti|               Hasil|
+-----+--------------------+--------------------+--------------------+
|    1|[, number, number...|[, number, number...|(262144,[21622,13...|
|    1|[, number, number...|[, number, number...|(262144,[4475,140...|
|    1|[, number, number...|[, number, number...|(262144,[535,4214...|
|    0|[, number, hits, ...|[, number, hits, ...|(262144,[14376,21...|
|    0|[, number, in, th...|[, number, main, ...|(262144,[8618,978...|
+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [99]:
lr = LogisticRegression(labelCol="label", featuresCol="Hasil", maxIter=10, regParam=0.01)
model = lr.fit(dataAngka)
print("Training is Done")

Training is Done


In [104]:
# stemming
tokenizedTes = tokenizer.transform(TeData)
swRemovedTest = stopWord.transform(tokenizedTes)
tesAngka = hashingTF.transform(swRemovedTest).select('label', 'Kata Berarti', 'Hasil')
# tesAngka.show(5)
tesAngka.tail(num=1)

[Row(label=1, Kata Berarti=['stumbling', 'greatest', 'way', 'marketing', 'century', 'undoubtedly', 'direct', 'e', 'mail', 'similar', 'postman', 'delivering', 'letter', 'mailbox', 'ability', 'promote', 'product', 'service', 'website', 'mlm', 'network', 'marketing', 'opportunity', 'millions', 'instantly', 'advertisers', 'dreaming', 'number', 'years', 'e', 'mail', 'one', 'page', 'promotion', 'list', 'general', 'addresses', 'greatest', 'part', 'completely', 'affordable', 'e', 'mail', 'marketing', 'answer', 'know', 'know', 'exactly', 'proven', 'fact', 'attract', 'new', 'business', 'direct', 'e', 'mail', 'marketing', 'profits', 'e', 'mail', 'advertising', 'generate', 'amazing', 'living', 'proof', 'direct', 'e', 'mail', 'internet', 'advertising', 'company', 'clients', 'pay', 'us', 'thousands', 'dollars', 'week', 'e', 'mail', 'products', 'services', 'want', 'one', 'spending', 'thousands', 'direct', 'email', 'marketing', 'campane', 'testing', 'market', 'see', 'works', 'standard', 'pricing', 'pr

In [129]:
prediction = model.transform(tesAngka)
predictionFinal = prediction.select("Kata Berarti", "prediction", "label")
predictionFinal.tail(num=2)
correctPrediction = predictionFinal.filter(predictionFinal['prediction'] == predictionFinal['label']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", totalData, ", accuracy:", correctPrediction/totalData)

Py4JJavaError: An error occurred while calling o3906.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 282.0 failed 1 times, most recent failure: Lost task 0.0 in stage 282.0 (TID 355) (LAPTOP-CKIH6PDK executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (Tokenizer$$Lambda$3630/0x0000000801fab280: (string) => array<string>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.lang.NullPointerException

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (Tokenizer$$Lambda$3630/0x0000000801fab280: (string) => array<string>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.lang.NullPointerException
