In [None]:
!pip install pyspark

In [None]:
import numpy as np
import pandas as pd

# Load the raw CSV with pandas under its own name. The name `df` is bound to
# the Spark DataFrame a few cells later, so reusing it here would only create
# a pandas object that is silently shadowed before anything reads it.
emails_pd = pd.read_csv('/content/emails.csv')

In [None]:
from pyspark.sql import SparkSession

# Start a Spark session named 'emails', or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('emails')
    .getOrCreate()
)

# Read the CSV treating the first row as the header and letting Spark infer
# each column's type, then print the resulting schema.
df = spark.read.csv(
    'emails.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()

In [None]:
# Optional preview (disabled): first five rows as a transposed pandas frame.
# pd.DataFrame(df.take(5), columns=df.columns).transpose()

In [None]:
# Display the DataFrame's column names.
df.columns

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Columns packed into the single 'features' vector (28 entries).
# NOTE(review): these look like per-word count columns -- confirm against the CSV schema.
numericCols = [
    'the', 'to', 'for', 'a', 'you', 'hou', 'is',
    'this', 'i', 'your', 'we', 'are', 'com', 'please',
    'price', 'attached', 'th', 'forward', 'u', 'click', 'unsubscribe',
    'pro', 'therefore', 'cc', 'prize', 'hi', 'deadline', 'ur',
]

# The assembler is the only pipeline stage for this model.
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
stages = [assembler]

In [None]:
from pyspark.ml import Pipeline

# Make the cell re-runnable: `df` is overwritten with the transformed frame
# below, so on a second run it already carries a 'features' column and the
# assembler would fail because its output column exists. DataFrame.drop is a
# no-op when the column is absent, so the first run is unaffected.
df_without_features = df.drop("features")

# Fit the pipeline stages and append the assembled 'features' column.
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df_without_features)
df = pipelineModel.transform(df_without_features)
df.show()

In [None]:
# 70/30 random split with a fixed seed, followed by the row count of each part.
train, test = df.randomSplit([0.7, 0.3], seed=2018)
print(f"Training Dataset Count: {train.count()}")
print(f"Test Dataset Count: {test.count()}")

Training Dataset Count: 3663
Test Dataset Count: 1509


Reference — PySpark `MultilayerPerceptronClassifier` API documentation: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.classification.MultilayerPerceptronClassifier.html

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Layer sizes: the input width (28) must equal the length of the 'features'
# vector, i.e. the 28 columns given to the VectorAssembler; 14 and 4 are the
# hidden layers; the output width (2) is the number of classes.
# NOTE(review): assumes 'Spam' holds exactly two label values (0/1) -- confirm.
mlp = MultilayerPerceptronClassifier(
    featuresCol = 'features',
    labelCol = 'Spam',
    layers = [ 28, 14, 4, 2 ],
    # Fix the seed so weight initialisation, and therefore the reported
    # accuracy, is reproducible across kernel restarts (same value as the
    # train/test split seed).
    seed = 2018,
)
mlpModel = mlp.fit(train)

# Score the held-out split; the transformed frame gains the model's output columns.
predictions = mlpModel.transform(test)
predictions

Related tutorial — Spark multilayer perceptron classifier: https://towardsdatascience.com/spark-multilayer-perceptron-classifier-for-poi-classification-99e5c68b4a77

In [None]:
# Apply the trained model to the test split and print the first ten rows.
result = mlpModel.transform(test)
result.show(n=10)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the 'prediction' column against the 'Spam' label on the scored test rows.
evaluator = MulticlassClassificationEvaluator(
    labelCol='Spam',
    predictionCol='prediction',
    metricName='accuracy',
)
mlpacc = evaluator.evaluate(result)
mlpacc

0.8502319416832339

In [None]:
# OneHotEncoderEstimator only exists in Spark 2.3-2.4; Spark 3.0 removed it and
# gave its multi-column API to OneHotEncoder. Import whichever is available
# under the old name so the rest of the cell works on both.
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator
from pyspark.ml.feature import StringIndexer, VectorAssembler

# NOTE(review): none of these column names appear in the email feature list
# used earlier in this notebook -- this cell looks like it targets a different
# dataset; confirm before fitting these stages on `df`.
# NOTE(review): this cell rebinds `stages`, `numericCols` and `assembler`,
# which the email-feature cell also defines; re-run that cell before
# re-fitting the email pipeline.
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
stages = []

# For every categorical column: index its string values, then one-hot encode the index.
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

# Index the 'deposit' column into the 'label' column.
label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')
stages += [label_stringIdx]

# Assemble the one-hot vectors and the numeric columns into one 'features' vector.
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]