In [0]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session that the cells below run against.
spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

In [0]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

In [0]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [0]:
# Toy corpus: two space-delimited sentences and one comma-delimited sentence.
sen_df = spark.createDataFrame(
    data=[
        (0, 'Hi I heard about spark'),
        (1, 'I wish java could use case classes'),
        (2, 'logistic,regression,models,are,neat'),
    ],
    schema=['id', 'sentence'],
)

In [0]:
# Print the toy sentences.
sen_df.show()

##### Word tokenization

In [0]:
# Plain tokenizer: reads 'sentence' and writes the token array to 'words'.
tokenizer = Tokenizer(
    inputCol='sentence',
    outputCol='words',
)

In [0]:
# Regex tokenizer that splits on '\W', i.e. on any non-word character
# (equivalent to [^a-zA-Z0-9_]), so commas act as delimiters too.
regex_tokenizer = RegexTokenizer(
    inputCol='sentence',
    outputCol='words',
    pattern='\\W',
)

In [0]:
def _token_count(words):
    """Return the number of tokens in ``words``, or None when the array is null.

    Spark passes SQL NULL to a Python UDF as ``None``; calling ``len`` on it
    would raise and fail the whole job, so nulls are propagated instead.
    """
    return None if words is None else len(words)


# UDF returning the length of an array column as an integer (null-safe).
count_tokens = udf(_token_count, IntegerType())

In [0]:
# Apply the plain tokenizer to the toy sentences; adds the 'words' column.
tokenized = tokenizer.transform(sen_df)

In [0]:
# Print the tokenized sentences.
tokenized.show()

In [0]:
# Show each sentence alongside the number of tokens the plain tokenizer produced.
(
    tokenized
    .withColumn('tokens', count_tokens(col('words')))
    .show()
)

In [0]:
# Apply the regex tokenizer to the same sentences for comparison.
rg_tokenized = regex_tokenizer.transform(sen_df)

In [0]:
# Show each sentence alongside the number of tokens the regex tokenizer produced.
(
    rg_tokenized
    .withColumn('tokens', count_tokens(col('words')))
    .show()
)

##### remove stop words

In [0]:
from pyspark.ml.feature import StopWordsRemover

In [0]:
# Keep only the id and the regex-tokenized words for the next examples.
sentenceDF = rg_tokenized.select('id','words')

In [0]:
# Stop-word filter: reads 'words' and writes the filtered tokens to 'filtered'.
remover = StopWordsRemover(
    inputCol='words',
    outputCol='filtered',
)

In [0]:
# Apply the stop-word filter and print the result.
remover.transform(sentenceDF).show()

#### n-grams

In [0]:
from pyspark.ml.feature import NGram

In [0]:
# Bigram generator (n = 2): reads 'words' and writes the n-grams to 'grams'.
ngram = NGram(
    n=2,
    inputCol='words',
    outputCol='grams',
)

In [0]:
# Generate the bigrams and print them without truncating the column contents.
ngram.transform(sentenceDF).show(truncate = False)

#### TF-IDF

In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [0]:
# Small labelled corpus for the TF-IDF example.
sentence = spark.createDataFrame(
    data=[
        (0.0, 'Hi I heard about spark'),
        (0.0, 'I wish Java could use case classes'),
        (1.0, 'Logistic regression models are neat'),
    ],
    schema=['label', 'sentence'],
)

In [0]:
# Print the labelled sentences.
sentence.show()

In [0]:
# Tokenizer for the TF-IDF example: 'sentence' -> 'words'.
tokenizer = Tokenizer(
    inputCol='sentence',
    outputCol='words',
)

In [0]:
# Tokenize the labelled sentences; adds the 'words' column.
words_data = tokenizer.transform(sentence)

In [0]:
# Print the tokenized, labelled sentences.
words_data.show()

In [0]:
# Term-frequency stage: hashes the tokens in 'words' into 'rawFeatures'.
hashing_tf = HashingTF(
    inputCol='words',
    outputCol='rawFeatures',
)

In [0]:
# Compute the hashed term-frequency vectors; adds the 'rawFeatures' column.
featurized_data = hashing_tf.transform(words_data)

In [0]:
# Inverse-document-frequency estimator: 'rawFeatures' -> 'features'.
idf = IDF(
    inputCol='rawFeatures',
    outputCol='features',
)

In [0]:
# Fit the IDF estimator on the term-frequency vectors.
idf_model = idf.fit(featurized_data)

In [0]:
# Apply the fitted IDF model; adds the 'features' column.
rescaled_data = idf_model.transform(featurized_data)

In [0]:
# Print the TF-IDF result.
rescaled_data.show()

#### count vectorizer

In [0]:
from pyspark.ml.feature import CountVectorizer

In [0]:
# Two pre-tokenized documents for the CountVectorizer example.
df = spark.createDataFrame(
    data=[
        (0, ['a', 'b', 'c']),
        (1, ['a', 'b', 'b', 'c', 'a']),
    ],
    schema=['id', 'words'],
)

In [0]:
# Print the two example documents.
df.show()

In [0]:
# Count vectorizer limited to a 3-term vocabulary; minDF = 2.0 keeps only
# terms that appear in at least two documents.
cv = CountVectorizer(
    inputCol='words',
    outputCol='features',
    vocabSize=3,
    minDF=2.0,
)

In [0]:
# Fit the count vectorizer on the example documents.
model = cv.fit(df)

In [0]:
# Vectorize the documents and print the result without truncating columns.
model.transform(df).show(truncate = False)

#### NLP code-along project

##### create spark session

In [0]:
from pyspark.sql import SparkSession
# Obtain the Spark session for the project section of the notebook.
spark = (
    SparkSession.builder
    .appName('nlpproj')
    .getOrCreate()
)

##### load data

In [0]:
# Read the tab-separated SMS spam collection and let Spark infer column types.
path = '/FileStore/tables/SMSSpamCollection'
data = spark.read.csv(
    path,
    sep='\t',
    inferSchema=True,
)

In [0]:
# Print the first rows of the raw data (columns still named _c0 / _c1).
data.show()

##### rename data columns

In [0]:
# Give the two auto-named columns meaningful names.
data = (
    data
    .withColumnRenamed('_c0', 'class')
    .withColumnRenamed('_c1', 'text')
)

In [0]:
# Print the data again to confirm the new column names.
data.show()

##### check text length

In [0]:
from pyspark.sql.functions import length

In [0]:
# Add a 'length' column holding the character length of each message.
data = data.withColumn(
    'length',
    length(data['text']),
)

In [0]:
# Print the per-class mean of the numeric columns.
data.groupBy('class').mean().show()

##### feature engineering

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

In [0]:
# Feature-engineering stages, defined individually so they can be chained
# into a single Pipeline.

# Split each message in 'text' into tokens.
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
# Drop stop words from the token list.
stop_remover = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
# Turn the remaining tokens into term-count vectors.
cv = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')
# Re-weight the term counts by inverse document frequency.
idf = IDF(inputCol='c_vec', outputCol='tf_idf')
# Encode the string 'class' column as a numeric index. The output column is
# spelled 'label' (lower case) so it exactly matches the column selected from
# the pipeline output and the default labelCol of Spark ML estimators and
# evaluators; the earlier 'Label' spelling only resolved while
# spark.sql.caseSensitive was false.
ham_spam_to_numeric = StringIndexer(inputCol='class', outputCol='label')

In [0]:
from pyspark.ml.feature import VectorAssembler

In [0]:
# Combine the TF-IDF vector and the message length into one 'features' vector.
clean_up = VectorAssembler(
    inputCols=['tf_idf', 'length'],
    outputCol='features',
)

##### create data preprocessing pipeline

In [0]:
from pyspark.ml.classification import NaiveBayes

In [0]:
# Naive Bayes classifier with default parameters.
nb = NaiveBayes()

In [0]:
from pyspark.ml import Pipeline

In [0]:
# Chain the preprocessing stages in execution order: label indexing,
# tokenizing, stop-word removal, counting, IDF weighting, vector assembly.
data_prep_pipe = Pipeline(
    stages=[
        ham_spam_to_numeric,
        tokenizer,
        stop_remover,
        cv,
        idf,
        clean_up,
    ],
)

In [0]:
# Fit every stage of the preprocessing pipeline on the full data set.
cleaner = data_prep_pipe.fit(data)

In [0]:
# Run the fitted pipeline to produce the engineered columns.
clean_data = cleaner.transform(data)

In [0]:
# Keep only the two columns the classifier needs.
clean_data = clean_data.select('features','label')

##### split data into training set and test set

In [0]:
# 70/30 train/test split. A fixed seed makes the random split, and hence the
# metric computed from it, repeatable across reruns of the notebook.
training, test = clean_data.randomSplit([0.7, 0.3], seed=42)

##### modeling

In [0]:
# Train the Naive Bayes classifier on the training split.
spam_detector = nb.fit(training)

In [0]:
# Score the held-out test split with the trained model.
test_results = spam_detector.transform(test)

In [0]:
# Print the first scored test rows.
test_results.show()

##### evaluation

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# MulticlassClassificationEvaluator defaults to metricName='f1'. Request
# accuracy explicitly so the computed value is the metric this notebook names
# and reports ('acc' / 'ACC').
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')

In [0]:
# Compute the evaluator's metric on the scored test rows.
acc = acc_eval.evaluate(test_results)

In [0]:
# Report the score: heading on the first line, value on the second.
print('ACC of NB model', acc, sep='\n')