In [1]:
import findspark

In [2]:
findspark.init('/home/coffey/spark-2.2.0-bin-hadoop2.7')

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [5]:
data = spark.read.csv('smsspamcollection/SMSSpamCollection', inferSchema=True, sep='\t')

In [6]:
data.describe().show()

+-------+----+--------------------+
|summary| _c0|                 _c1|
+-------+----+--------------------+
|  count|5574|                5574|
|   mean|null|               645.0|
| stddev|null|                 NaN|
|    min| ham| &lt;#&gt;  in mc...|
|    max|spam|… we r stayin her...|
+-------+----+--------------------+



In [7]:
data.show(5)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
+----+--------------------+
only showing top 5 rows



# 1. data processing

1.give a name to the header

In [8]:
data = data.withColumnRenamed('_c0', 'label').withColumnRenamed('_c1', 'text')

In [9]:
data.show(4)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
+-----+--------------------+
only showing top 4 rows



2.calcuate length of text column

In [10]:
from pyspark.sql.functions import length

In [11]:
data = data.withColumn('length', length(data['text']))

In [12]:
data.show(5)

+-----+--------------------+------+
|label|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
+-----+--------------------+------+
only showing top 5 rows



In [13]:
data.groupBy('label').mean().show()     #==> it shows that ham and spam do have different length. We could consider length of text as a features!

+-----+-----------------+
|label|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



3.create preprocessing 

In [14]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, VectorAssembler

In [15]:
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
stop_word_remover = StopWordsRemover(inputCol='token_text', outputCol='no_stop_word')
c_vec = CountVectorizer(inputCol='no_stop_word', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='idf')

In [16]:
assembler = VectorAssembler(inputCols=['idf', 'length'], outputCol='features')

4.dummy label

In [17]:
dummy = StringIndexer(inputCol='label', outputCol='d_label')

5. pipeline and processing data

In [18]:
from pyspark.ml import Pipeline

In [19]:
pipe = Pipeline(stages=[tokenizer, stop_word_remover, c_vec, idf, assembler, dummy])

In [20]:
my_data = pipe.fit(data).transform(data)

In [21]:
my_data.show(3)

+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+
|label|                text|length|          token_text|        no_stop_word|               c_vec|                 idf|            features|d_label|
+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+
|  ham|Go until jurong p...|   111|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|(13424,[7,11,31,6...|    0.0|
|  ham|Ok lar... Joking ...|    29|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|(13424,[0,24,297,...|    0.0|
| spam|Free entry in 2 a...|   155|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|(13424,[2,13,19,3...|    1.0|
+-----+--------------------+------+--------------------+--------------------+--------------------+--------

In [22]:
final_data = my_data.select('features', 'd_label')

In [23]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- d_label: double (nullable = true)



# 2.Implemented models

1.try classification ML

In [24]:
from pyspark.ml.classification import NaiveBayes, LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

In [25]:
train, test = final_data.randomSplit([0.7,0.3])

In [26]:
nb = NaiveBayes(labelCol='d_label')
lgr = LogisticRegression(labelCol='d_label')
dt = DecisionTreeClassifier(labelCol='d_label')
rfc = RandomForestClassifier(labelCol='d_label')

In [59]:
nb_result = nb.fit(train).transform(test)

In [60]:
lgr_result = lgr.fit(train).transform(test)

In [None]:
# dt_result = dt.fit(final_data).transform(final_data)   #==>not working

In [None]:
# rfc_result = rfc.fit(final_data).transform(final_data)  #==>not working

In [61]:
nb_result.show(5)

+--------------------+-------+--------------------+--------------------+----------+
|            features|d_label|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(13424,[0,1,2,7,8...|    0.0|[-807.48506073419...|[1.0,1.9668160501...|       0.0|
|(13424,[0,1,2,13,...|    0.0|[-610.11235173519...|[1.0,1.1791107993...|       0.0|
|(13424,[0,1,2,15,...|    1.0|[-1154.4341765950...|[2.77112562593244...|       1.0|
|(13424,[0,1,2,15,...|    1.0|[-1165.9539841518...|[2.24334612273477...|       1.0|
|(13424,[0,1,2,20,...|    1.0|[-1292.7272002255...|[1.83595289374146...|       1.0|
+--------------------+-------+--------------------+--------------------+----------+
only showing top 5 rows



2. try clustering ML

In [57]:
from pyspark.ml.clustering import KMeans

In [62]:
kmeans = KMeans(k=2)

In [63]:
kmeans_result = kmeans.fit(final_data).transform(final_data)

In [69]:
kmeans_result.show(3)

+--------------------+-------+----------+
|            features|d_label|prediction|
+--------------------+-------+----------+
|(13424,[7,11,31,6...|    0.0|         1|
|(13424,[0,24,297,...|    0.0|         0|
|(13424,[2,13,19,3...|    1.0|         1|
+--------------------+-------+----------+
only showing top 3 rows



In [71]:
kmeans_result.printSchema()

root
 |-- features: vector (nullable = true)
 |-- d_label: double (nullable = true)
 |-- prediction: integer (nullable = true)



In [72]:
nb_result.printSchema()

root
 |-- features: vector (nullable = true)
 |-- d_label: double (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [77]:
#change kmeans_result 'prediction' col from Integer to Double
from pyspark.sql.types import DoubleType

kmeans_r = kmeans_result.withColumn("d_prediction", kmeans_result["prediction"].cast(DoubleType()))


In [81]:
kmeans_r.printSchema()

root
 |-- features: vector (nullable = true)
 |-- d_label: double (nullable = true)
 |-- prediction: integer (nullable = true)
 |-- d_prediction: double (nullable = true)



# 3. evaluated the models

In [64]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [65]:
eval_b = BinaryClassificationEvaluator(labelCol='d_label')
eval_m = MulticlassClassificationEvaluator(labelCol='d_label', metricName='accuracy')

In [66]:
nb_b = eval_b.evaluate(nb_result)
nb_m = eval_m.evaluate(nb_result)

In [67]:
lgr_b = eval_b.evaluate(lgr_result)
lgr_m= eval_m.evaluate(lgr_result)

In [82]:
kmean_eval = MulticlassClassificationEvaluator(predictionCol='d_prediction', labelCol='d_label', metricName='accuracy')
kmean_m= kmean_eval.evaluate(kmeans_r)

In [83]:
print('nb_b: ', nb_b)
print('nb_m: ', nb_m)

print('lgr_b: ', lgr_b)
print('lgr_m: ', lgr_m)

print('kmean_m: ', kmean_m)

nb_b:  0.20049288061336235
nb_m:  0.9142357059509918
lgr_b:  0.975593282219784
lgr_m:  0.9649941656942824
kmean_m:  0.7890204520990313


# Appendix: a little explore different between 3 counts methods.

In [None]:
df = spark.createDataFrame([(1, 'hello world hello people')],['id', 'text'])

In [None]:
df.show()

In [None]:
df.withColumn('length', length(df['text'])).show()   #==>purily len(text)

In [None]:
from pyspark.ml.feature import CountVectorizer, Tokenizer

In [None]:
token = Tokenizer(inputCol='text', outputCol='token_word')

In [None]:
c_vec = CountVectorizer(inputCol='token_word', outputCol='c_vec')

In [None]:
df_token = token.transform(df)

In [None]:
df_token.show()

In [None]:
vec_m = c_vec.fit(df_token)

In [None]:
final = vec_m.transform(df_token)

In [None]:
final.show()

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [None]:
count_token = udf(lambda word: len(word), IntegerType())

In [None]:
df_token.withColumn('token', count_token(col('token_word'))).show()