# Part One

### Initializing the SparkSession

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('nlp').getOrCreate()

### Creating our dataframe

In [6]:
sen_df = spark.createDataFrame([
  (0, 'Hi I heard about Spark'),
  (1, 'I wish java could use case classes'),
  (2, 'Logistic,regression,models,are,neat')
], ['id', 'sentence'])

In [7]:
sen_df.show()

### Creating the tokenizers (whitespace and regex)

In [9]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
tokenizer = Tokenizer(inputCol='sentence', outputCol='words')

In [10]:
regex_tokenizer = RegexTokenizer(inputCol='sentence', outputCol='words', pattern='\\W')

### Creating a user-defined function (UDF) to count tokens

In [12]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType


def _count_tokens(words):
    """Return the number of tokens in ``words``, or None when the array is null.

    Without the None guard, a null array would raise TypeError (``len(None)``)
    inside the Python worker and fail the whole Spark job.
    """
    return None if words is None else len(words)


# Column-level UDF: count_tokens(col('words')) -> IntegerType column.
count_tokens = udf(_count_tokens, IntegerType())

### Creating the tokenized DataFrame

In [14]:
tokenized = tokenizer.transform(sen_df)

In [15]:
tokenized.show()

In [16]:
tokenized.withColumn('tokens', count_tokens(col('words'))).show()

### Splitting on any non-word character (e.g. commas), not only on whitespace

In [18]:
rg_tokenized = regex_tokenizer.transform(sen_df)

In [19]:
rg_tokenized.withColumn('tokens', count_tokens(col('words'))).show()

### Stop-word removal

In [21]:
from pyspark.ml.feature import StopWordsRemover

In [22]:
sentence_df = spark.createDataFrame([
  (0, ['I', 'saw', 'the', 'green', 'horse']),
  (1, ['Mary', 'had', 'a', 'little', 'lamb'])
], ['id', 'tokens'])

In [23]:
sentence_df.show()

In [24]:
remover = StopWordsRemover(inputCol='tokens', outputCol='filtered')

In [25]:
remover.transform(sentence_df).show()

### n-grams (sequences of n consecutive words)

In [27]:
from pyspark.ml.feature import NGram
word_df = spark.createDataFrame([
  (0, ['Hi', 'I', 'heard', 'about', 'Spark']),
  (1, ['I', 'wish', 'Java', 'could', 'use', 'case', 'classes']),
  (2, ['Logistic', 'regression', 'models', 'are', 'neat'])
], ['id', 'words'])

In [28]:
ngram = NGram(n=2, inputCol='words', outputCol='grams')

In [29]:
ngram.transform(word_df).show()

In [30]:
ngram.transform(word_df).select('grams').show(truncate=False)

# Part Two

In [32]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

### Creating a dataset

In [34]:
sen_df = spark.createDataFrame([
  (0.0, 'Hi I heard about Spark'),
  (0.0, 'I wish java could use case classes'),
  (1.0, 'Logistic regression models are neat') 
], ['label', 'sentence'])

In [35]:
sen_df.show()

### Tokenizing

In [37]:
tokenizer = Tokenizer(inputCol='sentence', outputCol='words')

In [38]:
words_data = tokenizer.transform(sen_df)

In [39]:
words_data.show(truncate=False)

### Computing term frequencies (TF)

In [41]:
hashing_tf = HashingTF(inputCol='words', outputCol='rawFeatures')

In [42]:
featurized_data = hashing_tf.transform(words_data)

### Fitting the inverse document frequency (IDF)

In [44]:
idf = IDF(inputCol='rawFeatures', outputCol='features')

In [45]:
idf_model = idf.fit(featurized_data)

### Computing TF-IDF

In [47]:
rescaled_data = idf_model.transform(featurized_data)

In [48]:
rescaled_data.select('label', 'features').show(truncate=False)

### Creating a new sample DataFrame

In [50]:
# Two tiny documents, written directly as word lists.
df = spark.createDataFrame(
    [
        (0, ['a', 'b', 'c']),
        (1, ['a', 'b', 'b', 'c', 'a']),
    ],
    ['id', 'words'],
)

### Using CountVectorizer, which represents each document as a vector of word counts

In [52]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol='words', outputCol='features', vocabSize=3, minDF=2.0)

In [53]:
model = cv.fit(df)

In [54]:
result = model.transform(df)

In [55]:
result.show(truncate=False)

# Part Three

## Building a spam detection filter

### Initializing the SparkSession

In [59]:
from pyspark.sql import SparkSession
# getOrCreate() returns the already-active session if one exists.
spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

### Importing the dataset

In [61]:
# Tab-separated file read without a header, so Spark names the columns
# _c0/_c1; they are renamed to 'class' and 'text'.
raw_sms_df = spark.read.csv(
    '/FileStore/tables/SMSSpamCollection',
    sep='\t',
    inferSchema=True,
)
df = (
    raw_sms_df
    .withColumnRenamed('_c0', 'class')
    .withColumnRenamed('_c1', 'text')
)
df.show()

## Formatting the data

### Creating a length column (feature engineering)

In [64]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.sql.functions import length
# Message length, used later as an extra feature next to TF-IDF.
text_length = length(df['text'])
df = df.withColumn('length', text_length)
df.show()

### Comparing mean message length per class

In [66]:
# Per-class mean of the numeric columns (presumably just 'length' here).
mean_by_class = df.groupBy('class').mean()
mean_by_class.show()

### Label encoding

In [68]:
# Encode the string values of 'class' as numeric indices in 'label'.
ham_spam_to_numeric = StringIndexer(outputCol='label', inputCol='class')

### Tokenizing (splitting each message into words)

In [70]:
# text -> token_text
tokenizer = Tokenizer(outputCol='token_text', inputCol='text')

### Removing stop words

In [72]:
# token_text -> stop_token
stop_remove = StopWordsRemover(outputCol='stop_token', inputCol='token_text')

### Counting the words

In [74]:
# stop_token -> c_vec (term-count vectors)
count_vec = CountVectorizer(outputCol='c_vec', inputCol='stop_token')

### Computing TF-IDF

In [76]:
# c_vec -> tf_idf
idf = IDF(outputCol='tf_idf', inputCol='c_vec')

### Assembling the feature vector

In [78]:
from pyspark.ml.feature import VectorAssembler
# Combine the TF-IDF vector and the message length into one 'features' column.
# NOTE: despite its name, `transformed_df` is a VectorAssembler pipeline stage,
# not a DataFrame, so it has no .show() (the original call raised
# AttributeError). The name is kept because the pipeline cell uses it; the
# assembled data is displayed after the pipeline runs.
transformed_df = VectorAssembler(inputCols=['tf_idf', 'length'], outputCol='features')

### Building a pipeline

In [80]:
from pyspark.ml import Pipeline
# Stages are linked by column names; the comments show what each one reads -> writes.
data_prep_pipe = Pipeline(stages=[
    ham_spam_to_numeric,  # class -> label
    tokenizer,            # text -> token_text
    stop_remove,          # token_text -> stop_token
    count_vec,            # stop_token -> c_vec
    idf,                  # c_vec -> tf_idf
    transformed_df,       # tf_idf + length -> features
])
cleaner = data_prep_pipe.fit(df)
clean_data = cleaner.transform(df).select('label', 'features')
clean_data.show()

## Machine learning

### Splitting the dataset

In [83]:
# 70/30 train/test split. Without a seed, the split (and every metric
# computed below) changes on each run, so a fixed seed is used.
SPLIT_SEED = 42
training, test = clean_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

### Building the model

In [85]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
spam_detector = nb.fit(training)

In [86]:
df.printSchema()

### Making predictions

In [88]:
# Score the held-out split with the fitted model.
test_results = spam_detector.transform(test)
test_results.show()

### Evaluating the model

In [90]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# MulticlassClassificationEvaluator defaults to metricName='f1'. The original
# cell therefore printed F1 under an "ACC" label; request accuracy explicitly.
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy', labelCol='label')
acc = acc_eval.evaluate(test_results)
print('ACC of NB Model')
print(acc)

### Evaluating with ROC AUC and accuracy

In [92]:
preds = spam_detector.transform(test)

In [93]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')
area_under_curve = evaluator.evaluate(preds)
accuracy = MulticlassClassificationEvaluator(metricName='accuracy', labelCol='label')
accuracy = accuracy.evaluate(preds)

In [94]:
print(area_under_curve)
print(accuracy)