In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [3]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

In [4]:
from pyspark.sql.functions import col, udf

In [5]:
from pyspark.sql.types import IntegerType

In [6]:
sen_df = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

In [7]:
sen_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic,regressi...|
+---+--------------------+



In [9]:
tokenizer = Tokenizer(inputCol='sentence', outputCol='words')

In [28]:
regex_tokenizer = RegexTokenizer(inputCol='sentence', outputCol='words', pattern='\\W')

In [14]:
count_tokens = udf(lambda words:len(words), IntegerType())

In [15]:
tokenized = tokenizer.transform(sen_df)

In [16]:
type(tokenized)

pyspark.sql.dataframe.DataFrame

In [21]:
tokenized.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



In [22]:
tokenized.withColumn('tokens', count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic,regress...|     1|
+---+--------------------+--------------------+------+



In [29]:
rg_tokenized = regex_tokenizer.transform(sen_df)

In [31]:
rg_tokenized.withColumn('tokens', count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic, regres...|     5|
+---+--------------------+--------------------+------+



In [32]:
from pyspark.ml.feature import StopWordsRemover

In [40]:
sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "tokens"])

In [41]:
sentenceData.show()

+---+--------------------+
| id|              tokens|
+---+--------------------+
|  0|[I, saw, the, red...|
|  1|[Mary, had, a, li...|
+---+--------------------+



In [42]:
remover = StopWordsRemover(inputCol='tokens', outputCol='filtered')

In [44]:
remover.transform(sentenceData).show()

+---+--------------------+--------------------+
| id|              tokens|            filtered|
+---+--------------------+--------------------+
|  0|[I, saw, the, red...| [saw, red, balloon]|
|  1|[Mary, had, a, li...|[Mary, little, lamb]|
+---+--------------------+--------------------+



In [45]:
from pyspark.ml.feature import NGram

In [47]:
wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

In [49]:
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

In [50]:
ngramDataFrame = ngram.transform(wordDataFrame)

In [51]:
ngramDataFrame.show()

+---+--------------------+--------------------+
| id|               words|              ngrams|
+---+--------------------+--------------------+
|  0|[Hi, I, heard, ab...|[Hi I, I heard, h...|
|  1|[I, wish, Java, c...|[I wish, wish Jav...|
|  2|[Logistic, regres...|[Logistic regress...|
+---+--------------------+--------------------+



In [52]:
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



In [53]:
ngram = NGram(n=3, inputCol="words", outputCol="ngrams")
ngramDataFrame3 = ngram.transform(wordDataFrame)
ngramDataFrame3.select("ngrams").show(truncate=False)

+--------------------------------------------------------------------------------+
|ngrams                                                                          |
+--------------------------------------------------------------------------------+
|[Hi I heard, I heard about, heard about Spark]                                  |
|[I wish Java, wish Java could, Java could use, could use case, use case classes]|
|[Logistic regression models, regression models are, models are neat]            |
+--------------------------------------------------------------------------------+



In [54]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [56]:
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

sentenceData.show(truncate=False)

+-----+-----------------------------------+
|label|sentence                           |
+-----+-----------------------------------+
|0.0  |Hi I heard about Spark             |
|0.0  |I wish Java could use case classes |
|1.0  |Logistic regression models are neat|
+-----+-----------------------------------+



In [57]:
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|
|  0.0|I wish Java could...|[i, wish, java, c...|
|  1.0|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+



In [58]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")

In [59]:
featurizedData = hashingTF.transform(wordsData)

In [62]:
featurizedData.select('rawFeatures').show(truncate=False)

+------------------------------------------------------------------------------------+
|rawFeatures                                                                         |
+------------------------------------------------------------------------------------+
|(262144,[18700,19036,33808,66273,173558],[1.0,1.0,1.0,1.0,1.0])                     |
|(262144,[19036,20719,55551,58672,98717,109547,192310],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(262144,[46243,58267,91006,160975,190884],[1.0,1.0,1.0,1.0,1.0])                    |
+------------------------------------------------------------------------------------+



In [64]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [68]:
rescaledData.select("label", "features").show(truncate =False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                      |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0.0  |(262144,[18700,19036,33808,66273,173558],[0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453,0.6931471805599453])                                                   |
|0.0  |(262144,[19036,20719,55551,58672,98717,109547,192310],[0.28768207245178085,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453])|
|1.0 

In [75]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a c ".split(" ")),
    (1, "a b b c a d".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=4)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+------------------+-------------------------------+
|id |words             |features                       |
+---+------------------+-------------------------------+
|0  |[a, c, ]          |(4,[0,2],[1.0,1.0])            |
|1  |[a, b, b, c, a, d]|(4,[0,1,2,3],[2.0,2.0,1.0,1.0])|
+---+------------------+-------------------------------+



In [70]:
df.show()

+---+---------------+
| id|          words|
+---+---------------+
|  0|      [a, b, c]|
|  1|[a, b, b, c, a]|
+---+---------------+



## project - spam detection filter

In [76]:
data = spark.read.csv('smsspamcollection/SMSSpamCollection', inferSchema=True, sep='\t')

In [79]:
data = data.withColumnRenamed('_c0', 'class').withColumnRenamed('_c1', 'text')
data.show(truncate = False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                                                                |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                                                     |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                                

In [80]:
from pyspark.sql.functions import length

In [82]:
data = data.withColumn('length', length(data['text']))

In [83]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [84]:
data.groupBy('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [85]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF, StringIndexer

In [86]:
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')

In [88]:
stop_remover = StopWordsRemover(inputCol='token_text', outputCol='stop_token')

In [89]:
count_vec = CountVectorizer(inputCol='stop_token', outputCol='c_vec')

In [90]:
idf = IDF(inputCol='c_vec', outputCol='tf_idf')

In [91]:
ham_spam_to_numeric = StringIndexer(inputCol='class', outputCol='label')

In [92]:
from pyspark.ml.feature import VectorAssembler

In [93]:
clean_up = VectorAssembler(inputCols=['tf_idf', 'length'], outputCol='features')

In [94]:
from pyspark.ml.classification import NaiveBayes

In [95]:
nb = NaiveBayes()

In [97]:
from pyspark.ml import Pipeline

In [99]:
data_prep_pipe = Pipeline(stages=[
    ham_spam_to_numeric, tokenizer, stop_remover, count_vec, idf, clean_up 
])

In [100]:
cleaner = data_prep_pipe.fit(data)

In [101]:
clean_data = cleaner.transform(data)

In [106]:
clean_data = clean_data.select('label','features')

In [107]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



In [108]:
training , test = clean_data.randomSplit([0.7,0.3])

In [110]:
spam_detector = nb.fit(training)

In [111]:
test_results = spam_detector.transform(test)

In [112]:
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,5,20,...|[-801.29819108471...|[1.0,1.9955123539...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-1198.7110574189...|[1.0,1.8615501580...|       0.0|
|  0.0|(13424,[0,1,7,15,...|[-657.43250298019...|[1.0,8.8486654854...|       0.0|
|  0.0|(13424,[0,1,9,14,...|[-537.95986815131...|[1.0,3.1173622349...|       0.0|
|  0.0|(13424,[0,1,14,18...|[-1381.1413317736...|[1.0,1.6434447572...|       0.0|
|  0.0|(13424,[0,1,14,31...|[-216.88086902036...|[1.0,1.6247462170...|       0.0|
|  0.0|(13424,[0,1,14,78...|[-688.00798987124...|[1.0,3.3049204686...|       0.0|
|  0.0|(13424,[0,1,18,20...|[-830.81250981613...|[1.0,3.1502745779...|       0.0|
|  0.0|(13424,[0,1,20,27...|[-988.31767598204...|[0.99999999998086...|       0.0|
|  0.0|(13424,[0

In [113]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [114]:
acc_eval = MulticlassClassificationEvaluator()

In [115]:
acc = acc_eval.evaluate(test_results)

In [116]:
acc

0.9179650917618305