In [173]:
from pyspark.sql.functions import length
from pyspark.sql import udf
from pyspark.sql.types import ArrayType, StringType
import re
sql = SQLContext(sc)

In [244]:
sms = sql.read.format('com.databricks.spark.csv').option('delimiter', '\t').load('spam')

In [245]:
sms = sms.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')
sms = sms.withColumn('length', length(sms['text']))
sms.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [246]:
sms.createOrReplaceTempView('sms')
sql.sql('select class, count(*) from sms group by class').show()

+-----+--------+
|class|count(1)|
+-----+--------+
|  ham|    4827|
| spam|     747|
+-----+--------+



In [247]:
def remove_punctuation(text):
    
    text=text.lower().strip()
    text=re.sub("[^a-zA-Z]", " ", text).split()
    
    return text

spark.udf.register("remove_punctuation", remove_punctuation, ArrayType(StringType()))

<function __main__.remove_punctuation>

In [248]:
sms = sql.sql('select *, remove_punctuation(text) as token_text from sms')
sms.createOrReplaceTempView('sms')
sms.show()

+-----+--------------------+------+--------------------+
|class|                text|length|          token_text|
+-----+--------------------+------+--------------------+
|  ham|Go until jurong p...|   111|[go, until, juron...|
|  ham|Ok lar... Joking ...|    29|[ok, lar, joking,...|
| spam|Free entry in 2 a...|   155|[free, entry, in,...|
|  ham|U dun say so earl...|    49|[u, dun, say, so,...|
|  ham|Nah I don't think...|    61|[nah, i, don, t, ...|
| spam|FreeMsg Hey there...|   147|[freemsg, hey, th...|
|  ham|Even my brother i...|    77|[even, my, brothe...|
|  ham|As per your reque...|   160|[as, per, your, r...|
| spam|WINNER!! As a val...|   157|[winner, as, a, v...|
| spam|Had your mobile 1...|   154|[had, your, mobil...|
|  ham|I'm gonna be home...|   109|[i, m, gonna, be,...|
| spam|SIX chances to wi...|   136|[six, chances, to...|
| spam|URGENT! You have ...|   155|[urgent, you, hav...|
|  ham|I've been searchi...|   196|[i, ve, been, sea...|
|  ham|I HAVE A DATE ON ...|   

In [249]:
training_ham, testing_ham = sql.sql('select * from sms where class="ham"').randomSplit([0.8, 0.2], seed = 10)
training_spam, testing_spam = sql.sql('select * from sms where class="spam"').randomSplit([0.8, 0.2], seed = 10)
training = training_ham.unionAll(training_spam)
testing = training_spam.unionAll(testing_spam)

In [250]:
from pyspark.ml.feature import StopWordsRemover, Tokenizer, StringIndexer, VectorAssembler, IDF, CountVectorizer, Word2Vec
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, LinearSVC, NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [258]:
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class', outputCol='label')
clean_up = VectorAssembler(inputCols=['tf_idf','length'], outputCol='features')


lr = LogisticRegression()
svm = LinearSVC()
dtc = DecisionTreeClassifier()
nb = NaiveBayes()

In [234]:
def get_pipeline(preprocessing_pipeline, model):
    
    return Pipeline(stages = preprocessing_pipeline + [model])


def evaluate(training, testing, preprocessing_pipeline, model):
    
    model = get_pipeline(preprocessing_pipeline, model).fit(training)
    prediction = model.transform(testing)
    acc = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy').evaluate(prediction)
    f1 = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1').evaluate(prediction)
    print(f'Accuracy : {acc} F1-score : {f1}')

In [202]:
preprocessing_pipeline = [stopremove, count_vec, idf, ham_spam_to_num, clean_up]

In [203]:
evaluate(training, testing, preprocessing_pipeline, lr)

Accuracy : 0.9585006693440429 F1-score : 0.9788106630211896


In [204]:
evaluate(training, testing, preprocessing_pipeline, dtc)

Accuracy : 0.7188755020080321 F1-score : 0.836448598130841


In [205]:
evaluate(training, testing, preprocessing_pipeline, svm)

Accuracy : 0.9571619812583668 F1-score : 0.9781121751025993


In [259]:
evaluate(training, testing, preprocessing_pipeline, nb)

Accuracy : 0.9852744310575636 F1-score : 0.9925826028320972


# Advanced

In [252]:
word2vec = Word2Vec(vectorSize = 300, seed = 10, inputCol = 'token_text', outputCol = 'model')
model = word2vec.fit(sms)
model.getVectors().show()

+---------+--------------------+
|     word|              vector|
+---------+--------------------+
|     buns|[0.00452330429106...|
|  serious|[5.42592722922563...|
|    lover|[0.01670911349356...|
|     rate|[0.01607220619916...|
|     snow|[0.00892466586083...|
|    looks|[-0.0069542448036...|
|locations|[0.01299875974655...|
|    sweet|[0.00539758428931...|
|     used|[0.00302504608407...|
|reference|[0.00619605137035...|
|        e|[0.05535818263888...|
| custcare|[0.01292549166828...|
|     hiya|[0.00621117092669...|
|beautiful|[0.01255329698324...|
|     poly|[0.02419165149331...|
|      ntt|[-0.0131739843636...|
|   sunday|[-0.0098504303023...|
|    funny|[0.01139319501817...|
| birthday|[0.02177138067781...|
|      lik|[0.00580250844359...|
+---------+--------------------+
only showing top 20 rows



In [253]:
vector_assembler = VectorAssembler(inputCols=['model'], outputCol='features')
ham_spam_to_num = StringIndexer(inputCol='class', outputCol='label')
new_pipeline = [model, vector_assembler, ham_spam_to_num]

In [254]:
evaluate(training, testing, new_pipeline, lr)

Accuracy : 0.8835341365461847 F1-score : 0.9381663113006398


In [260]:
evaluate(training, testing, new_pipeline, dtc)

Accuracy : 0.8714859437751004 F1-score : 0.9313304721030042


In [261]:
evaluate(training, testing, new_pipeline, svm)

Accuracy : 0.8406961178045516 F1-score : 0.9134545454545454


In [262]:
### Не вистачає екзамплів для word2vec

# Combined

In [266]:
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class', outputCol='label')
clean_up = VectorAssembler(inputCols=['tf_idf','length', 'model'], outputCol='features')

In [267]:
preprocessing_pipeline = [stopremove, count_vec, idf, ham_spam_to_num, model, clean_up]

In [268]:
evaluate(training, testing, preprocessing_pipeline, lr)

Accuracy : 0.9718875502008032 F1-score : 0.985743380855397


In [269]:
evaluate(training, testing, preprocessing_pipeline, dtc)

Accuracy : 0.9183400267737617 F1-score : 0.9574319609211445


In [270]:
evaluate(training, testing, preprocessing_pipeline, svm)

Accuracy : 0.9772423025435074 F1-score : 0.988490182802979


In [272]:
### Linear SVC - the best.