# Spark NLP

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.ml.feature import (Tokenizer, RegexTokenizer, StopWordsRemover, StringIndexer, VectorAssembler,
                                NGram, HashingTF, IDF, CountVectorizer)
from pyspark.sql.functions import col, udf, length
from pyspark.sql.types import IntegerType
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = SparkSession.builder.appName('nlp').getOrCreate()

In [2]:
PATH = os.path.join(os.getcwd(),"Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Natural_Language_Processing/")

spam = spark.read.csv(os.path.join(PATH,'smsspamcollection/SMSSpamCollection'),inferSchema=True,sep='\t')

In [3]:
sen_df = spark.createDataFrame([(0,'Hi I heard about Spark'),
                               (1,'I wish Java could use case classes'),
                               (2,'Logistic, regression,models,are,neat')],
                               ['id','sentence'])

In [4]:
sen_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic, regress...|
+---+--------------------+



In [5]:
tokenizer = Tokenizer(inputCol='sentence',outputCol='words')

In [6]:
regex_tokenizer = RegexTokenizer(inputCol='sentence',outputCol='words',pattern='\\W')

In [7]:
count_tokens = udf(lambda words:len(words),IntegerType())

In [8]:
tokenized = tokenizer.transform(sen_df)

In [9]:
tokenized.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic, regress...|[logistic,, regre...|
+---+--------------------+--------------------+



In [10]:
tokenized.withColumn('tokens',count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic, regress...|[logistic,, regre...|     2|
+---+--------------------+--------------------+------+



In [11]:
rg_tokenized = regex_tokenizer.transform(sen_df)

In [12]:
rg_tokenized.withColumn('tokens',count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic, regress...|[logistic, regres...|     5|
+---+--------------------+--------------------+------+



In [13]:
sentenceDataFrame = spark.createDataFrame([(0,['I','saw','the','green','horse']),
                                          (1,['Mary','had','a','little','lamb'])],['id','tokens'])

In [14]:
remover = StopWordsRemover(inputCol='tokens',outputCol='filtered')

In [15]:
remover.transform(sentenceDataFrame).show()

+---+--------------------+--------------------+
| id|              tokens|            filtered|
+---+--------------------+--------------------+
|  0|[I, saw, the, gre...| [saw, green, horse]|
|  1|[Mary, had, a, li...|[Mary, little, lamb]|
+---+--------------------+--------------------+



In [16]:
wordDataFrame = spark.createDataFrame([(0,['Hi', 'I', 'heard', 'about', 'Spark']),
                               (1,['I', 'wish', 'Java', 'could', 'use', 'case', 'classes']),
                               (2,['Logistic', 'regression','models','are','neat'])],
                               ['id','words'])

In [17]:
ngram = NGram(n=2,inputCol='words',outputCol='grams')

In [18]:
ngram.transform(wordDataFrame).select('grams').show(truncate=False)

+------------------------------------------------------------------+
|grams                                                             |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



In [19]:
sentenceData = spark.createDataFrame([(0,'Hi I heard about Spark'),
                               (1,'I wish Java could use case classes'),
                               (2,'Logistic regression models are neat')],
                               ['label','sentence'])

In [20]:
sentenceData.show()

+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|    0|Hi I heard about ...|
|    1|I wish Java could...|
|    2|Logistic regressi...|
+-----+--------------------+



In [21]:
tokenizer = Tokenizer(inputCol='sentence',outputCol='words')

In [22]:
words_data = tokenizer.transform(sentenceData)

In [23]:
words_data.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|    0|Hi I heard about ...|[hi, i, heard, ab...|
|    1|I wish Java could...|[i, wish, java, c...|
|    2|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+



In [24]:
hashing_tf = HashingTF(inputCol='words',outputCol='rawFeatures')

In [25]:
featurized_data = hashing_tf.transform(words_data)

In [26]:
idf = IDF(inputCol='rawFeatures',outputCol='features')

In [27]:
idf_model = idf.fit(featurized_data)

In [28]:
rescaled_data = idf_model.transform(featurized_data)

In [29]:
rescaled_data.select('label','features').show(truncate=False)

+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                        |
+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |(262144,[24417,49304,73197,91137,234657],[0.28768207245178085,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453])                                                     |
|1    |(262144,[20719,24417,55551,116873,147765,162369,192310],[0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.693147180559945

In [30]:
df = spark.createDataFrame([(0,'a b c'.split(' ')),
                           (1, 'a b b c a'.split(' '))],['id','words'])

In [31]:
df.show()

+---+---------------+
| id|          words|
+---+---------------+
|  0|      [a, b, c]|
|  1|[a, b, b, c, a]|
+---+---------------+



In [32]:
cv = CountVectorizer(inputCol='words',outputCol='features',vocabSize=3,minDF=2.0)

In [33]:
model = cv.fit(df)

In [34]:
result = model.transform(df)

In [35]:
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



### Spam Detection Filter

In [36]:
spam.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [37]:
spam.head(1)

[Row(_c0='ham', _c1='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...')]

In [38]:
spam = spam.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [39]:
spam.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [40]:
spam = spam.withColumn('length',length(spam['text']))

In [41]:
spam.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [42]:
spam.groupBy('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [43]:
tokenizer = Tokenizer(inputCol='text',outputCol='token_text')
stop_remove = StopWordsRemover(inputCol='token_text',outputCol='stop_token')
count_vec = CountVectorizer(inputCol='stop_token',outputCol='c_vec')
idf = IDF(inputCol='c_vec',outputCol='tf_idf')
ham_spam_to_numeric = StringIndexer(inputCol='class',outputCol='label')

In [44]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

In [45]:
nb = NaiveBayes()

In [46]:
data_prep_pipe = Pipeline(stages=[ham_spam_to_numeric,tokenizer,
                                 stop_remove, count_vec, idf, clean_up])

In [47]:
cleaner = data_prep_pipe.fit(spam)

In [48]:
clean_data = cleaner.transform(spam)

In [49]:
clean_data = clean_data.select('label','features')

In [50]:
clean_data.columns

['label', 'features']

In [51]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



In [52]:
train, test = clean_data.randomSplit([0.7,0.3])

In [53]:
spam_detector = nb.fit(train)

In [54]:
spam.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



In [55]:
test_results = spam_detector.transform(test)

In [56]:
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,7,8...|[-811.10558356651...|[1.0,4.2767449559...|       0.0|
|  0.0|(13424,[0,1,2,13,...|[-620.17928478651...|[1.0,6.6970250979...|       0.0|
|  0.0|(13424,[0,1,3,9,1...|[-570.87052342135...|[1.0,4.6355763272...|       0.0|
|  0.0|(13424,[0,1,5,15,...|[-1003.7691946558...|[1.0,3.9084969417...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[-816.07389773219...|[1.0,1.3043065624...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-1169.1002896603...|[1.0,1.1191998369...|       0.0|
|  0.0|(13424,[0,1,15,20...|[-669.18081856553...|[1.0,1.9325067057...|       0.0|
|  0.0|(13424,[0,1,21,27...|[-749.50545525486...|[1.0,1.4288687930...|       0.0|
|  0.0|(13424,[0,1,21,27...|[-1025.4950937652...|[1.0,8.6710465536...|       0.0|
|  0.0|(13424,[0

In [57]:
acc_eval = MulticlassClassificationEvaluator()

In [58]:
acc = acc_eval.evaluate(test_results)

In [59]:
print(f"ACC of NB Model: {acc}")

ACC of NB Model: 0.9261285106601144
