# <span style = 'color:#960574;font-family:helvetica'> NLP basiscs with pySpark

In [1]:
import findspark

In [2]:
findspark.init('/home/chandan/spark-3.2.4-bin-hadoop3.2')

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('nlp').getOrCreate()

23/05/21 15:57:54 WARN Utils: Your hostname, chandan-VivoBook-ASUSLaptop-X515MA-X515MA resolves to a loopback address: 127.0.1.1; using 192.168.0.169 instead (on interface wlo1)
23/05/21 15:57:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/21 15:57:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## pySpark tools for Text data

### Spark comes with Tokenziser class and also Regular Expression class

In [5]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

In [6]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [7]:
sen_df = spark.createDataFrame([
    (0,'Hi I heard about Spark'),
    (1,"I wish Java could use case classes"),
    (2,"Logistic,regression,models,are,neat")
    
],['id','sentence'])

In [8]:
sen_df.show()

                                                                                

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic,regressi...|
+---+--------------------+



### We will used tokernizer

In [9]:
tokenizer = Tokenizer(inputCol='sentence', outputCol='words')

In [12]:
regex_tokenziser = RegexTokenizer(inputCol='sentence', outputCol='words', pattern='\\W')

In [13]:
count_tokens = udf(lambda words: len(words), IntegerType())

In [14]:
tokenized = tokenizer.transform(sen_df)

In [15]:
tokenized.show()

[Stage 2:>                                                          (0 + 1) / 1]                                                                                

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



In [16]:
tokenized.withColumn('tokens', count_tokens(col('words'))).show()

                                                                                

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic,regress...|     1|
+---+--------------------+--------------------+------+



### We can see tokenizer splits on white spaces

In [17]:
rg_tokenized = regex_tokenziser.transform(sen_df)

In [18]:
rg_tokenized.withColumn('tokens', count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic, regres...|     5|
+---+--------------------+--------------------+------+



#### With regex_tokenizer we were able to seperate even on comma

### Stop words removal

In [20]:
from pyspark.ml.feature import StopWordsRemover

In [23]:
sentenceDataFrame = spark.createDataFrame([
    (0,['I',"saw","the",'green','horse']),
    (1,["Mary", 'had', 'a', 'little', 'lamb'])
],['id','tokens'])

In [24]:
sentenceDataFrame.show()

+---+--------------------+
| id|              tokens|
+---+--------------------+
|  0|[I, saw, the, gre...|
|  1|[Mary, had, a, li...|
+---+--------------------+



In [25]:
remover = StopWordsRemover(inputCol='tokens',outputCol= 'filtered')

In [26]:
remover.transform(sentenceDataFrame).show()

+---+--------------------+--------------------+
| id|              tokens|            filtered|
+---+--------------------+--------------------+
|  0|[I, saw, the, gre...| [saw, green, horse]|
|  1|[Mary, had, a, li...|[Mary, little, lamb]|
+---+--------------------+--------------------+



## n-gram  is sequence of tokens typically words of an integer

In [27]:
from pyspark.ml.feature import NGram

In [29]:
wordDataFrame = spark.createDataFrame([
    (0,['Hi',"I","heard","about","Spark"]),
    (1,["I","wish","Java",'could', 'use','case','classes']),
    (2,['Logistic',"regression","models", "are",'neat'])
], ["id",'words'])

In [30]:
wordDataFrame.show()

+---+--------------------+
| id|               words|
+---+--------------------+
|  0|[Hi, I, heard, ab...|
|  1|[I, wish, Java, c...|
|  2|[Logistic, regres...|
+---+--------------------+



In [31]:
ngram =NGram(n=2,inputCol='words', outputCol='grams')

In [34]:
ngram.transform(wordDataFrame).select('grams').show(truncate = False)

+------------------------------------------------------------------+
|grams                                                             |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



### n-grams are creating pairs of consicutive words

It helps to build on word relationship

### TF-IDF and Count Vecotrizer

In [35]:
from pyspark.ml.feature import HashingTF,IDF, Tokenizer

In [36]:
sentenceData = spark.createDataFrame([
    (0,'Hi I heard about Spark'),
    (1,"I wish Java could use case classes"),
    (2,"Logistic,regression,models,are,neat")
    
],['id','sentence'])

In [38]:
sentenceData.show(truncate = False)

+---+-----------------------------------+
|id |sentence                           |
+---+-----------------------------------+
|0  |Hi I heard about Spark             |
|1  |I wish Java could use case classes |
|2  |Logistic,regression,models,are,neat|
+---+-----------------------------------+



In [39]:
tokenizer = Tokenizer(inputCol='sentence', outputCol='words')

In [40]:
words_data = tokenizer.transform(sentenceData)

In [41]:
words_data.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



## Term frequency

In [43]:
hashing_tf = HashingTF(inputCol='words', outputCol='rawFeatures')

In [44]:
featurized_data = hashing_tf.transform(words_data)

### Now can appy Inverse document factorizer on hashed data

In [45]:
idf = IDF(inputCol='rawFeatures', outputCol='features')

In [49]:
idf_model = idf.fit(featurized_data)

                                                                                

In [50]:
rescaled_data = idf_model.transform(featurized_data)

In [55]:
rescaled_data.show()

23/05/21 16:39:38 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/05/21 16:39:38 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


+---+--------------------+--------------------+--------------------+--------------------+
| id|            sentence|               words|         rawFeatures|            features|
+---+--------------------+--------------------+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|(262144,[18700,19...|(262144,[18700,19...|
|  1|I wish Java could...|[i, wish, java, c...|(262144,[19036,20...|(262144,[19036,20...|
|  2|Logistic,regressi...|[logistic,regress...|(262144,[11534],[...|(262144,[11534],[...|
+---+--------------------+--------------------+--------------------+--------------------+



## Countvectorizer

In [56]:
from pyspark.ml.feature import CountVectorizer

In [57]:
df = spark.createDataFrame([
    (0,'a b c'.split(" ")),
    (1,'a b b c a'.split(" "))
], ['id','words'])

In [58]:
df.show()

+---+---------------+
| id|          words|
+---+---------------+
|  0|      [a, b, c]|
|  1|[a, b, b, c, a]|
+---+---------------+



In [59]:
cv = CountVectorizer(inputCol='words', outputCol='features', vocabSize=3, minDF=2.0)

In [60]:
model = cv.fit(df)

                                                                                

In [61]:
results = model.transform(df)

In [62]:
results.show(truncate = False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



# Practical Excerise Spam Detection

In [63]:
data = spark.read.csv("SMSSpamCollection", inferSchema=True, sep='\t')

                                                                                

In [65]:
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [66]:
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [67]:
from pyspark.sql.functions import length

In [68]:
data = data.withColumn('length', length(data['text']))

In [69]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [70]:
data.groupBy('class').mean().show()

[Stage 44:>                                                         (0 + 1) / 1]

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



                                                                                

In [71]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover,CountVectorizer,IDF, StringIndexer

In [74]:
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
stop_remove = StopWordsRemover(inputCol='token_text', outputCol='stop_token')
count_vec = CountVectorizer(inputCol='stop_token', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='tf_idf')
ham_spam_to_nummeric = StringIndexer(inputCol='class', outputCol='label')

In [75]:
from pyspark.ml.feature import VectorAssembler

In [76]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'], outputCol='features')

In [77]:
from pyspark.ml.classification import NaiveBayes

In [78]:
nb = NaiveBayes()

In [79]:
from pyspark.ml import Pipeline

In [80]:
data_prep_pipe = Pipeline(stages=[
    ham_spam_to_nummeric, tokenizer, stop_remove, count_vec, idf, clean_up
])

In [82]:
cleaner = data_prep_pipe.fit(data)

                                                                                

In [83]:
cleaned_data = cleaner.transform(data)

In [85]:
cleaned_data = cleaned_data.select('label','features')

In [86]:
cleaned_data.show()

[Stage 55:>                                                         (0 + 1) / 1]                                                                                

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



In [87]:
training, test = cleaned_data.randomSplit([0.7,0.3])

In [88]:
spam_detector = nb.fit(training)

23/05/21 17:02:41 WARN DAGScheduler: Broadcasting large task binary with size 1142.7 KiB
23/05/21 17:02:47 WARN DAGScheduler: Broadcasting large task binary with size 1126.3 KiB
                                                                                

In [89]:
test_results = spam_detector.transform(test)

In [90]:
test_results.show()

23/05/21 17:03:28 WARN DAGScheduler: Broadcasting large task binary with size 1360.1 KiB
[Stage 59:>                                                         (0 + 1) / 1]

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,41,...|[-1062.2072173855...|[1.0,3.8985651638...|       0.0|
|  0.0|(13424,[0,1,5,15,...|[-998.36436082150...|[1.0,3.9496523954...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-870.06851304074...|[1.0,4.8982570410...|       0.0|
|  0.0|(13424,[0,1,9,14,...|[-549.11720978981...|[1.0,8.0444295900...|       0.0|
|  0.0|(13424,[0,1,14,78...|[-692.52358394833...|[1.0,8.1051838478...|       0.0|
|  0.0|(13424,[0,1,15,20...|[-693.10138322613...|[1.0,2.0246190266...|       0.0|
|  0.0|(13424,[0,1,17,19...|[-804.92516203141...|[1.0,6.6609501986...|       0.0|
|  0.0|(13424,[0,1,20,27...|[-966.66976540239...|[1.0,3.4240278092...|       0.0|
|  0.0|(13424,[0,1,27,35...|[-1471.3876569655...|[1.0,1.3544380420...|       0.0|
|  0.0|(13424,[0

                                                                                

In [91]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [92]:
acc_eval =MulticlassClassificationEvaluator()

In [93]:
acc = acc_eval.evaluate(test_results)

23/05/21 17:04:31 WARN DAGScheduler: Broadcasting large task binary with size 1365.2 KiB
                                                                                

In [94]:
print(acc)

0.9300343993352429


Thank you !!!