# NLP with PySpark

This post includes code from the [Spark and Python for Big Data Udemy course](https://udemy.com/course/spark-and-python-for-big-data-with-pyspark) and the [Spark and Python for Big Data notebooks](https://github.com/SuperJohn/spark-and-python-for-big-data-with-pyspark).

In [2]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [6]:
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

In [7]:
sentenceDataFrame.show()

In [8]:
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

In [9]:
## Stop Words Removal

In [10]:
from pyspark.ml.feature import StopWordsRemover

# Each row holds an already-tokenized sentence as an array of strings.
sentenceData = spark.createDataFrame(
    [
        (0, ["I", "saw", "the", "red", "balloon"]),
        (1, ["Mary", "had", "a", "little", "lamb"]),
    ],
    ["id", "raw"],
)

# No custom stopWords list is passed, so the remover's built-in list is used;
# the surviving tokens land in the "filtered" column.
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
filteredSentences = remover.transform(sentenceData)
filteredSentences.show(truncate=False)


In [11]:
## n-grams

In [12]:
from pyspark.ml.feature import NGram

# Pre-tokenized sentences: one array of words per row.
wordDataFrame = spark.createDataFrame(
    [
        (0, ["Hi", "I", "heard", "about", "Spark"]),
        (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
        (2, ["Logistic", "regression", "models", "are", "neat"]),
    ],
    ["id", "words"],
)

# n=2: build bigrams (pairs of adjacent words) from each "words" array.
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
bigramColumn = ngramDataFrame.select("ngrams")
bigramColumn.show(truncate=False)

In [13]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

sentenceData.show()

In [14]:
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show()

In [15]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

In [16]:
## CountVectorizer

In [17]:
from pyspark.ml.feature import CountVectorizer

# Input data: each row is a bag of words with an ID.
df = spark.createDataFrame(
    [
        (0, "a b c".split(" ")),
        (1, "a b b c a".split(" ")),
    ],
    ["id", "words"],
)

# Learn a vocabulary from the corpus: vocabSize caps it at 3 terms and minDF
# sets the minimum document frequency a term needs to be kept.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)

# Turn each bag of words into a vector of term counts over that vocabulary.
result = model.transform(df)
result.show(truncate=False)

In [18]:
df = spark.read.load("/FileStore/tables/SMSSpamCollection",
                     format="csv", sep="\t", inferSchema="true", header="false")


In [19]:
df.printSchema()

In [20]:
data = df.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [21]:
data.printSchema()

In [22]:
## Clean and Prepare the Data

In [23]:
from pyspark.sql.functions import length

In [24]:
data = data.withColumn('length',length(data['text']))

In [25]:
data.printSchema()

In [26]:
# Pretty Clear Difference
data.groupby('class').mean().show()

In [27]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')

In [28]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [29]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

In [30]:
### Naive Bayes

In [31]:
from pyspark.ml.classification import NaiveBayes

In [32]:
# Use defaults
nb = NaiveBayes()

In [33]:
### Pipeline

In [34]:
from pyspark.ml import Pipeline

In [35]:
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])

In [36]:
cleaner = data_prep_pipe.fit(data)

In [37]:
clean_data = cleaner.transform(data)

In [38]:
### Training and Evaluation

In [39]:
clean_data = clean_data.select(['label','features'])

In [40]:
clean_data.show()

In [41]:
(training,testing) = clean_data.randomSplit([0.7,0.3])

In [42]:
spam_predictor = nb.fit(training)


In [43]:
data.printSchema()


In [44]:
test_results = spam_predictor.transform(testing)


In [45]:
test_results.show()


In [46]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [47]:
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))
