In [1]:
from pyspark.sql import SparkSession
# Get or create the Spark session (application name 'nlp') used by every cell below
spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

In [2]:
# A standard way of converting a body of text into
# something an ML model can understand is the TF-IDF method:
# Term Frequency - Inverse Document Frequency

In [3]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

In [4]:
from pyspark.sql.functions import col,udf
from pyspark.sql.types import IntegerType

In [5]:
# Three sample sentences; the third is separated by commas instead of spaces
sentences_df = spark.createDataFrame(
    [
        (0, 'Hi I heard about Spark'),
        (1, 'I wish Java could use case classes'),
        (2, 'Logistic,regression,models,are,neat'),
    ],
    schema=['id', 'sentence'],
)

In [6]:
# If this cell raises an error, try the following:
#
# 1. In a terminal, run: nano ~/.bashrc
# 2. Append these 3 lines to the end of the file:
#    export PYSPARK_PYTHON=/usr/bin/python3
#    export PYSPARK_DRIVER_PYTHON=python3
#    export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
# 3. Restart the Jupyter Notebook
sentences_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic,regressi...|
+---+--------------------+



In [7]:
# Plain tokenizer: breaks the 'sentence' column apart on spacing
# and writes the pieces to a new 'words' column
tokenizer = Tokenizer(
    inputCol='sentence',
    outputCol='words',
)

In [9]:
# Regex-based tokenizer: every non-word character (the \W pattern)
# is treated as a separator, so commas split tokens as well as spaces
regexTokenizer = RegexTokenizer(
    pattern='\\W',
    inputCol='sentence',
    outputCol='words',
)

In [10]:
def _token_count(words):
    """Return the number of tokens in ``words``, or None when it is null.

    A null array column reaches the Python function as None; calling
    len() on it would raise TypeError inside the worker, so null input
    is passed through as a null count instead.
    """
    if words is None:
        return None
    return len(words)


# User-defined function that counts the tokens in an array column
count_tokens = udf(_token_count, IntegerType())

In [11]:
# Apply the plain tokenizer; the result gains the 'words' column
tokenized = tokenizer.transform(sentences_df)

In [12]:
tokenized.show()
# The truncated display does not reveal whether the 3rd sentence
# (comma-separated) was split into words or kept as one single token

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



In [13]:
# Count the tokens per row with the UDF to see how each sentence was split
(tokenized
 .withColumn('tokens', count_tokens(col('words')))
 .show())

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic,regress...|     1|
+---+--------------------+--------------------+------+



In [16]:
# Tokenize the same sentences with the regex tokenizer, which splits
# on non-word characters (commas as well as spaces)
rg_tokenized = regexTokenizer.transform(sentences_df)

In [17]:
# Token counts per row for the regex-tokenized sentences
(rg_tokenized
 .withColumn('tokens', count_tokens(col('words')))
 .show())

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic, regres...|     5|
+---+--------------------+--------------------+------+



In [18]:
# Identify common words ('a', 'the', ...) and remove them
from pyspark.ml.feature import StopWordsRemover

In [19]:
# Two already-tokenized sentences containing common words such as 'the' and 'a'
sentenceDataFrame = spark.createDataFrame(
    [
        (0, ['I', 'saw', 'the', 'green', 'horse']),
        (1, ['Mary', 'had', 'a', 'little', 'lamb']),
    ],
    schema=['id', 'tokens'],
)

In [20]:
# Preview the token lists before any stop words are removed
sentenceDataFrame.show()

+---+--------------------+
| id|              tokens|
+---+--------------------+
|  0|[I, saw, the, gre...|
|  1|[Mary, had, a, li...|
+---+--------------------+



In [21]:
# No custom word list is passed, so the remover's default stop words apply;
# it reads the 'tokens' column and writes the result to 'filtered'
remover = StopWordsRemover(outputCol='filtered', inputCol='tokens')

In [22]:
# 'filtered' holds the tokens that remain once the stop words are dropped
remover.transform(sentenceDataFrame).show()

+---+--------------------+--------------------+
| id|              tokens|            filtered|
+---+--------------------+--------------------+
|  0|[I, saw, the, gre...| [saw, green, horse]|
|  1|[Mary, had, a, li...|[Mary, little, lamb]|
+---+--------------------+--------------------+



In [23]:
# n-gram takes input of tokens
# and creates strings of consecutive words
from pyspark.ml.feature import NGram

In [26]:
# Pre-tokenized sentences used as input for the n-gram transformer
wordDataFrame = spark.createDataFrame(
    [
        (0, ['Hi', 'I', 'heard', 'about', 'Spark']),
        (1, ['I', 'wish', 'Java', 'could', 'use', 'case', 'classes']),
        (2, ['Logistic', 'regression', 'models', 'are', 'neat']),
    ],
    schema=['id', 'words'],
)

In [27]:
# Preview the word lists that the n-gram transformer will consume
wordDataFrame.show()

+---+--------------------+
| id|               words|
+---+--------------------+
|  0|[Hi, I, heard, ab...|
|  1|[I, wish, Java, c...|
|  2|[Logistic, regres...|
+---+--------------------+



In [28]:
# n=2 -> bigrams: each output string joins two consecutive words
ngram = NGram(
    n=2,
    inputCol='words',
    outputCol='grams',
)

In [30]:
# Show only the 'grams' column, untruncated so every bigram is readable
ngram.transform(wordDataFrame).select('grams').show(truncate=False)

+------------------------------------------------------------------+
|grams                                                             |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



In [31]:
# Now let's check for term frequency
from pyspark.ml.feature import HashingTF, IDF

In [36]:
# Labelled sentences: a float label paired with the raw sentence text
sentenceData = spark.createDataFrame(
    [
        (0.0, 'Hi I heard about Spark'),
        (0.0, 'I wish Java could use case classes'),
        (1.0, 'Logistic regression models are neat'),
    ],
    schema=['label', 'sentence'],
)

In [37]:
# Preview the labelled sentences before tokenizing
sentenceData.show()

+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|  0.0|Hi I heard about ...|
|  0.0|I wish Java could...|
|  1.0|Logistic regressi...|
+-----+--------------------+



In [38]:
# Reuses the `tokenizer` created earlier in the notebook
# (inputCol='sentence', outputCol='words'), so it is not redefined here
words_data = tokenizer.transform(sentenceData)

In [39]:
# Check the tokenized result: a 'words' column next to each sentence
words_data.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|
|  0.0|I wish Java could...|[i, wish, java, c...|
|  1.0|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+



In [40]:
# Term-frequency stage: reads the 'words' column and writes
# the term-frequency vectors to 'rawFeatures'
hashing_tf = HashingTF(
    inputCol='words',
    outputCol='rawFeatures',
)

In [41]:
# Add the 'rawFeatures' term-frequency column to the tokenized data
featurizedData = hashing_tf.transform(words_data)

In [42]:
# Inverse-document-frequency stage: turns 'rawFeatures' into 'features'
idf = IDF(
    inputCol='rawFeatures',
    outputCol='features',
)

In [43]:
# Fit the IDF stage on the term-frequency vectors to obtain a fitted model
idfModel = idf.fit(featurizedData)

In [44]:
# Apply the fitted IDF model; its output goes to the 'features' column
rescaledData = idfModel.transform(featurizedData)

In [46]:
# The label/features pair is now ready for any supervised ML algorithm
(rescaledData
 .select('label', 'features')
 .show())

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262144,[18700,19...|
|  0.0|(262144,[19036,20...|
|  1.0|(262144,[46243,58...|
+-----+--------------------+



In [47]:
# Identify terms across bodies of text
# i.e., convert a collection of text documents 
# into vectors of word counts
from pyspark.ml.feature import CountVectorizer

In [48]:
# Two small documents built from the same three terms (a, b, c)
df = spark.createDataFrame(
    [
        (0, "a b c".split(" ")),
        (1, "a b b c a".split(" ")),
    ],
    schema=["id", "words"],
)

In [49]:
# Preview the two word lists that will be turned into count vectors
df.show()

+---+---------------+
| id|          words|
+---+---------------+
|  0|      [a, b, c]|
|  1|[a, b, b, c, a]|
+---+---------------+



In [50]:
# Count vectorizer reading 'words' and writing 'features'.
# vocabSize=3 caps the vocabulary at 3 terms; minDF=2.0 means a term
# must appear in at least 2 documents to be included in the vocabulary.
cv = CountVectorizer(
    inputCol='words',
    outputCol='features',
    vocabSize=3,
    minDF=2.0,
)

In [51]:
# Fit the count vectorizer on the documents to obtain a fitted model
model = cv.fit(df)

In [52]:
# Apply the fitted model; the count vectors go to the 'features' column
result = model.transform(df)

In [53]:
# truncate=False prints the full word lists and feature vectors
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+

