In [30]:
import re
import pyspark
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql import Row
from pyspark.sql.functions import col, split
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, PCA ,StopWordsRemover,StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier,LogisticRegression,NaiveBayes,LinearSVC
from pyspark.sql import functions as F
from pyspark.ml.clustering import KMeans
from pyspark.sql.types import DoubleType

In [65]:
def loading(path):
    """Load a headered CSV file into a Spark DataFrame.

    Gets (or creates) a SparkSession named ``spooky`` that runs locally on all
    cores (``local[*]``). The session is configured with 6G of executor and
    driver memory and a 7G cap on results collected to the driver. It then
    reads ``path`` as CSV, treating the first row as a header and inferring
    column types.

    NOTE(review): per the Spark configuration docs, ``spark.driver.memory``
    only takes effect if set before the driver JVM starts; setting it here may
    have no effect in an already-running local/notebook session — confirm.

    Parameters
    ----------
    path : str
        Location of the CSV data handed to the Spark reader.

    Returns
    -------
    pyspark.sql.DataFrame
        The loaded data.
    """
    builder = (
        SparkSession.builder
        .appName("spooky")
        .master('local[*]')
        .config("spark.executor.memory", '6G')
        .config("spark.driver.memory", '6G')
        .config("spark.driver.maxResultSize", '7G')
    )
    session = builder.getOrCreate()
    csv_reader = session.read.format('com.databricks.spark.csv').options(header='true', inferschema='true')
    return csv_reader.load(path)

In [66]:
def preprocessing(df):
    """Reshape the raw CSV frame into ``class`` / ``text`` / ``String_Label`` columns.

    Steps:
    - Drop the unused ``_c2``, ``_c3`` and ``_c4`` columns. Dropping a column
      that is absent is a no-op.
    - Keep only ``v1`` (renamed to ``class``) and ``v2`` (renamed to ``text``).
    - Add ``String_Label``: the ``class`` value with every non-word character
      removed.
    - Keep only rows whose ``text`` is not the empty string. Rows with a null
      ``text`` are dropped as well, because ``NULL != ''`` evaluates to NULL
      and ``filter`` discards it.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Raw frame as returned by ``loading``; must contain ``v1`` and ``v2``.

    Returns
    -------
    pyspark.sql.DataFrame
        The cleaned frame.
    """
    renamed = df.drop('_c2', '_c3', '_c4').selectExpr("v1 as class", "v2 as text")
    labelled = renamed.withColumn('String_Label', F.regexp_replace('class', '\\W', ''))
    return labelled.filter(labelled.text != '')

In [67]:
def pyspark_lib(df, weights=(0.7, 0.3), seed=100):
    """Featurise the cleaned frame and split it into training and test sets.

    Pipeline:
    1. Split ``text`` into tokens on non-word characters (``RegexTokenizer``).
    2. Remove stop words using ``StopWordsRemover``'s default list.
    3. Index ``String_Label`` into a numeric ``label`` column.
    4. Randomly split the rows using ``weights`` and ``seed``.
    5. Fit the ``CountVectorizer`` vocabulary on the training split only, then
       build the ``features`` column for both splits with it.

    Fitting the vocabulary before splitting (the previous behaviour) let
    test-set words leak into the feature space. Fitting on the training split
    keeps the test data unseen; words that only occur in the test split get no
    feature index.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Output of ``preprocessing``; needs ``text``, ``class`` and
        ``String_Label`` columns.
    weights : sequence of float, optional
        Relative sizes of the (training, test) splits. Defaults to 70/30.
    seed : int, optional
        Seed passed to ``randomSplit`` so the split is reproducible.

    Returns
    -------
    tuple of pyspark.sql.DataFrame
        ``(trainingData, testData)``. Each has ``features`` and ``label``
        columns plus the remaining input columns (e.g. ``text``). The
        ``tokenized``, ``filtered``, ``class`` and ``String_Label`` columns are
        dropped.
    """
    tokenizer = RegexTokenizer(inputCol="text", outputCol="tokenized", pattern="\\W")
    remover = StopWordsRemover(inputCol="tokenized", outputCol="filtered")
    tokens = remover.transform(tokenizer.transform(df))

    # Label indexing only looks at the target column, so fitting it on the full
    # frame leaks no feature information, and both splits share one mapping.
    indexer = StringIndexer(inputCol="String_Label", outputCol="label")
    labelled = indexer.fit(tokens).transform(tokens).drop('String_Label').drop('class')

    trainingData, testData = labelled.randomSplit(list(weights), seed=seed)

    # Vocabulary is learned from the training split only (see docstring).
    vectorizer = CountVectorizer(inputCol="filtered", outputCol="features").fit(trainingData)
    trainingData = vectorizer.transform(trainingData).drop('tokenized').drop('filtered')
    testData = vectorizer.transform(testData).drop('tokenized').drop('filtered')
    return trainingData, testData

In [69]:
def model_implementation(model, trainingData, testData):
    """Train the selected estimator on ``trainingData`` and score it on ``testData``.

    Parameters
    ----------
    model : int
        Which estimator to use:
        1 = RandomForestClassifier (30 trees, max depth 20),
        2 = LogisticRegression,
        3 = NaiveBayes (smoothing=1),
        4 = LinearSVC (maxIter=10, regParam=0.1),
        5 = KMeans (k=2, seed=1).
    trainingData, testData : pyspark.sql.DataFrame
        Frames with a ``features`` vector column and a numeric ``label``
        column, as produced by ``pyspark_lib``.

    Returns
    -------
    float
        The ``MulticlassClassificationEvaluator`` score (its default metric)
        of the predictions on ``testData``.

    Raises
    ------
    ValueError
        If ``model`` is not one of the ids listed above.
    """
    # Factories are lazy, so only the selected estimator is ever constructed.
    estimators = {
        1: lambda: RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=30, maxDepth=20),
        2: lambda: LogisticRegression(labelCol="label", featuresCol="features"),
        3: lambda: NaiveBayes(smoothing=1),
        4: lambda: LinearSVC(maxIter=10, regParam=0.1),
        5: lambda: KMeans().setK(2).setSeed(1),
    }
    if model not in estimators:
        raise ValueError(
            "Unknown model {!r}; expected one of {}".format(model, sorted(estimators)))

    fitted = estimators[model]().fit(trainingData)
    predictions = fitted.transform(testData)

    if model == 5:
        # KMeans emits integer cluster ids; the evaluator needs a double column.
        # NOTE(review): cluster ids are not guaranteed to line up with the
        # StringIndexer label ids, so this score may reflect a label
        # permutation — consider mapping clusters to labels before scoring.
        predictions = predictions.withColumn("prediction", predictions["prediction"].cast(DoubleType()))

    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    return evaluator.evaluate(predictions)

In [None]:
if __name__ == "__main__":
    # argparse reads sys.argv; inside a Jupyter kernel that holds the kernel's
    # own launch flags, so run this as a standalone script.
    import argparse

    # Maps each --model choice to the integer id model_implementation() expects.
    model_ids = {
        'random_forest': 1,
        'logistic_regression': 2,
        'naive_bayes': 3,
        'svm': 4,
        'kmean': 5,
    }

    parser = argparse.ArgumentParser(
        description=('Trains the model and outputs predictions.'),
        prog='pyspark_implementation.py <args>')

    # Required arguments
    parser.add_argument("--data_path", required=True,
                        help=("Provide the path to the data folder"))
    # argparse does not validate defaults against choices, so the default
    # must itself be a valid choice.
    parser.add_argument('--model', type=str, choices=list(model_ids),
                        default='random_forest', help='model to use for spam classification')

    args = vars(parser.parse_args())

    # Load, clean, featurise/split, then train and evaluate the chosen model.
    path = args['data_path']
    df = loading(path)
    df = preprocessing(df)
    trainingData, testData = pyspark_lib(df)
    score = model_implementation(model_ids[args['model']], trainingData, testData)
    if score is not None:
        print(args['model'], 'score:', score)
    
    