<a href="https://colab.research.google.com/github/bdmlworkshop/Examples/blob/main/Features_for_NLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tools for NLP

Text input needs a number of feature transformations before machine learning algorithms can start processing it. Spark packages the most important ones as convenient feature transformer calls.

Let's look over them before jumping into the spam example.



Spark ML is the newest of Spark's machine learning libraries. It is based on the DataFrame, which is basically an RDD[Row] (a Row is just a sequence of untyped objects) with a schema (i.e. an object that contains information about the column names, types, metadata...). The `fit` method is the method that all estimators need to implement.

Explanation: The ML library uses the notion of a Pipeline. A pipeline instance is basically an array of stages (of type PipelineStage), each one of them being either an **Estimator** or a **Transformer** (there are some other component types, e.g. Evaluator, but I won't get into them here as they are used less often).

A Transformer is simply an algorithm that transforms your data, so its main method is `transform(DataFrame)` and it outputs another DataFrame. 

An Estimator is an algorithm that produces a Model (a subtype of Transformer). It's basically any block that needs to be fit (trained) on data, so it has a method `fit(DataFrame)` that outputs a Transformer.

For instance, if you want to multiply all your data by $2$, you only need a transformer that implements a transform method that takes your input and multiplies it by $2$.
If you need to compute the mean and subtract it, you need an estimator that fits on the data to compute the mean and outputs a transformer that subtracts the learned mean. So any time you use ML, you will be using the fit and transform methods.
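
The two cases above can be sketched in plain Python, without Spark. This is only an illustration of the fit/transform contract; the class names `TimesTwo`, `MeanCenterer` and `MeanCentererModel` are hypothetical and are not part of Spark ML, and the data is a plain list rather than a DataFrame.

```python
from statistics import mean


class TimesTwo:
    """Transformer: there is nothing to learn, so it only offers transform()."""

    def transform(self, values):
        return [2 * v for v in values]


class MeanCentererModel:
    """Transformer produced by fitting: it remembers the learned mean."""

    def __init__(self, learned_mean):
        self.learned_mean = learned_mean

    def transform(self, values):
        return [v - self.learned_mean for v in values]


class MeanCenterer:
    """Estimator: fit() learns the mean and returns a model (a Transformer)."""

    def fit(self, values):
        return MeanCentererModel(mean(values))


data = [1.0, 2.0, 3.0, 6.0]

doubled = TimesTwo().transform(data)   # no fitting step needed
model = MeanCenterer().fit(data)       # learns the mean of the data
centered = model.transform(data)       # applies the learned mean

print(doubled)
print(model.learned_mean)
print(centered)
```

The estimator itself never transforms anything; only the model returned by `fit` does, which mirrors the Estimator/Model split described above.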

While Spark ML pipelines have a wide variety of algorithms, you may find yourself wanting additional functionality without having to leave the pipeline model. In Spark MLlib (the RDD-based implementation), this isn't much of a problem: you can manually implement your algorithm with RDD transformations and keep going from there. For Spark ML pipelines (the DataFrame-based implementation), the same approach can work, but we lose some of the nicely integrated properties of the pipeline, including the ability to automatically run meta-algorithms, such as cross-validation parameter search.

To add your own algorithm to a Spark pipeline, you need to implement either Estimator or Transformer, both of which live in org.apache.spark.ml and implement the base PipelineStage interface. For algorithms that don't require training, you can implement the Transformer interface; for algorithms with training, you implement the Estimator interface. Note that training is not limited to complicated machine learning models; even the MinMaxScaler requires training to determine the range. Any algorithm that needs training must be constructed as an Estimator rather than a Transformer.

## Let's start with the introductory examples
### First, as usual with Google Colab, install and initialize Spark

In [None]:
!wget -q https://mirrors.netix.net/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar -xzf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark
# define the required environment variables directly from Python using the os module
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/default-java"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('BDML_nlp_tools').getOrCreate()

## Tokenizer
<p><a href="http://en.wikipedia.org/wiki/Lexical_analysis#Tokenization">Tokenization</a> is the process of taking text (such as a sentence) and breaking it into individual terms (usually words).  A simple <a href="api/scala/index.html#org.apache.spark.ml.feature.Tokenizer">Tokenizer</a> class provides this functionality.  The example below shows how to split sentences into sequences of words.</p>

<p><a href="api/scala/index.html#org.apache.spark.ml.feature.RegexTokenizer">RegexTokenizer</a> allows more
 advanced tokenization based on regular expression (regex) matching.
 By default, the parameter &#8220;pattern&#8221; (regex, default: <code>"\\s+"</code>, i.e. one or more whitespace characters) is used as the delimiter to split the input text.
 Alternatively, users can set parameter &#8220;gaps&#8221; to false indicating the regex &#8220;pattern&#8221; denotes
 &#8220;tokens&#8221; rather than splitting gaps, and find all matching occurrences as the tokenization result.</p>
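
The difference between the two modes can be mimicked with Python's standard `re` module. This is a rough analogy, not Spark's implementation: `re.split` plays the role of `gaps=True` (the pattern describes the separators) and `re.findall` plays the role of `gaps=False` (the pattern describes the tokens). The explicit `lower()` call is there because `RegexTokenizer` lowercases its input by default.

```python
import re

sentence = "Logistic,regression,models,are,neat"

# gaps=True analogue: the pattern describes the separators between tokens.
# Empty strings are dropped, as they are not useful tokens.
split_on_gaps = [t for t in re.split(r"\W+", sentence.lower()) if t]

# gaps=False analogue: the pattern describes the tokens themselves.
match_tokens = re.findall(r"\w+", sentence.lower())

# Splitting on whitespace only leaves the comma-separated sentence as one token.
whitespace_only = sentence.lower().split()

print(split_on_gaps)
print(match_tokens)
print(whitespace_only)
```

Both regex variants recover the five words, while whitespace splitting returns a single token, which is why the comma-separated sentence is a useful test case for the tokenizers.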

In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

#help(RegexTokenizer)

In [None]:
help(col)

In [None]:
help(udf)

In [None]:
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi do you heard about Spark?"),
    (1, "I wish I knew Python and pyspark before"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

In [None]:
sentenceDataFrame.show(truncate=False)

In [None]:
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, describe the tokens instead of the gaps: pattern="\\w+", gaps=False

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)



In [None]:
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

In [None]:
regexTokenizer.getGaps()


## Stop Words Removal

<p><a href="https://en.wikipedia.org/wiki/Stop_words">Stop words</a> are words which
should be excluded from the input, typically because the words appear
frequently and don&#8217;t carry as much meaning.</p>

<p><code>StopWordsRemover</code> takes as input a sequence of strings (e.g. the output
of a <a href="ml-features.html#tokenizer">Tokenizer</a>) and drops all the stop
words from the input sequences. The list of stopwords is specified by
the <code>stopWords</code> parameter. Default stop words for some languages are accessible 
by calling <code>StopWordsRemover.loadDefaultStopWords(language)</code>, for which available 
options are &#8220;danish&#8221;, &#8220;dutch&#8221;, &#8220;english&#8221;, &#8220;finnish&#8221;, &#8220;french&#8221;, &#8220;german&#8221;, &#8220;hungarian&#8221;, 
&#8220;italian&#8221;, &#8220;norwegian&#8221;, &#8220;portuguese&#8221;, &#8220;russian&#8221;, &#8220;spanish&#8221;, &#8220;swedish&#8221; and &#8220;turkish&#8221;. 
A boolean parameter <code>caseSensitive</code> indicates if the matches should be case sensitive 
(false by default).</p>
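
As a plain-Python sketch of what the filtering step does, the function below drops tokens found in a stop word list, with an optional case-sensitive mode. The function name `remove_stop_words` and the four-word list are hypothetical and chosen only for illustration; Spark's default English list is much longer.

```python
def remove_stop_words(tokens, stop_words, case_sensitive=False):
    """Return the tokens that do not appear in stop_words."""
    if case_sensitive:
        blocked = set(stop_words)
        return [t for t in tokens if t not in blocked]
    # Case-insensitive matching: compare lowercased forms on both sides.
    blocked = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in blocked]


# Tiny hand-picked list, for illustration only.
stop_words = ["i", "the", "a", "had"]
tokens = ["I", "saw", "the", "red", "balloon"]

insensitive = remove_stop_words(tokens, stop_words)
sensitive = remove_stop_words(tokens, stop_words, case_sensitive=True)

print(insensitive)
print(sensitive)
```

With case-insensitive matching the capitalised "I" is removed; with case-sensitive matching it survives because only the lowercase "i" is in the list.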

In [None]:
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)


## n-grams

An $n$-gram is a sequence of $n$ tokens (typically words) for some integer $n$. The NGram class can be used to transform input features into $n$-grams.

<p><code>NGram</code> takes as input a sequence of strings (e.g. the output of a <a href="ml-features.html#tokenizer">Tokenizer</a>).  The parameter <code>n</code> is used to determine the number of terms in each $n$-gram. The output will consist of a sequence of $n$-grams where each $n$-gram is represented by a space-delimited string of $n$ consecutive words.  If the input sequence contains fewer than <code>n</code> strings, no output is produced.</p>
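
The sliding-window behaviour described above can be sketched in a few lines of plain Python. The helper name `ngrams` is hypothetical; it is meant to illustrate the output shape, not to reproduce Spark's code.

```python
def ngrams(tokens, n):
    """Return every run of n consecutive tokens, joined by single spaces."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


words = ["logistic", "regression", "models", "are", "neat"]

bigrams = ngrams(words, 2)
trigrams = ngrams(words, 3)
too_short = ngrams(["hi"], 2)   # fewer than n tokens: nothing is produced

print(bigrams)
print(trigrams)
print(too_short)
```

A sequence of $k$ tokens yields $k - n + 1$ $n$-grams, so five words give four bigrams and three trigrams.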


In [None]:
from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["hi", "do", "you", "heard", "about", "spark"]),
    (1, ["i", "wish", "i", "knew", "python", "and", "pyspark", "before"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

_______
# Feature Extractors
_______

<h2 id="tf-idf">TF-IDF</h2>

<p><a href="http://en.wikipedia.org/wiki/Tf%E2%80%93idf">Term frequency-inverse document frequency (TF-IDF)</a> 
is a feature vectorization method widely used in text mining to reflect the importance of a term 
to a document in the corpus. Denote a term by $t$, a document by $d$, and the corpus by $D$.
Term frequency $TF(t, d)$ is the number of times that term $t$ appears in document $d$, while 
document frequency $DF(t, D)$ is the number of documents that contain term $t$. If we only use 
term frequency to measure the importance, it is very easy to over-emphasize terms that appear very 
often but carry little information about the document, e.g. &#8220;a&#8221;, &#8220;the&#8221;, and &#8220;of&#8221;. If a term appears 
very often across the corpus, it means it doesn&#8217;t carry special information about a particular document.
Inverse document frequency is a numerical measure of how much information a term provides:

$$ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1} $$

where $|D|$ is the total number of documents in the corpus. Since logarithm is used, if a term 
appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid 
dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF:
$$ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). $$
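
To make the formulas concrete, here is a small worked computation in plain Python. It is a sketch under a few assumptions: the three-document corpus is made up, the natural logarithm is used, and terms are counted exactly rather than hashed into buckets as `HashingTF` does. The helper names are hypothetical.

```python
import math
from collections import Counter

# Made-up corpus: each document is a list of tokens.
corpus = [
    ["spark", "is", "fast"],
    ["spark", "is", "neat"],
    ["python", "is", "neat"],
]


def document_frequency(term, docs):
    """DF(t, D): number of documents that contain the term."""
    return sum(1 for doc in docs if term in doc)


def idf(term, docs):
    """IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1))."""
    return math.log((len(docs) + 1) / (document_frequency(term, docs) + 1))


def tf_idf(term, doc, docs):
    """TFIDF(t, d, D) = TF(t, d) * IDF(t, D)."""
    return Counter(doc)[term] * idf(term, docs)


idf_is = idf("is", corpus)        # appears in all 3 documents: log(4/4)
idf_spark = idf("spark", corpus)  # appears in 2 documents: log(4/3)
idf_fast = idf("fast", corpus)    # appears in 1 document: log(4/2)
score_fast = tf_idf("fast", corpus[0], corpus)

print(idf_is, idf_spark, idf_fast, score_fast)
```

The term that occurs in every document gets an IDF of exactly zero, so it contributes nothing to the TF-IDF features, while the rarest term gets the largest weight.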


In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi do you heard about Spark?"),
    (0.0, "I wish I knew Python and pyspark before"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

sentenceData.show(truncate=False)

In [None]:
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show(truncate=False)

In [None]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

featurizedData.head()

In [None]:
featurizedData.select("label", "rawFeatures").show(truncate=False)

In [None]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show(truncate=False)

## CountVectorizer
`CountVectorizer` and `CountVectorizerModel` help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, `CountVectorizer` can be used as an Estimator to extract the vocabulary and generate a `CountVectorizerModel`. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.

During the fitting process, `CountVectorizer` selects the top `vocabSize` words ordered by term frequency across the corpus. An optional parameter `minDF` also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary. Another optional toggle parameter, `binary`, controls the output vector: if it is set to true, all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.
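
The fitting and counting steps can be sketched in plain Python as follows. This is an illustration under stated assumptions, not Spark's implementation: `min_df` is treated only as an absolute document count (the fractional form is not handled), ties in frequency are broken alphabetically for determinism, and the output is a dense list rather than a sparse vector. The function names `fit_vocabulary` and `count_vector` are hypothetical.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size, min_df=1):
    """Pick the vocab_size most frequent terms that occur in at least min_df documents."""
    term_counts = Counter(t for doc in docs for t in doc)
    doc_counts = Counter(t for doc in docs for t in set(doc))
    eligible = [t for t in term_counts if doc_counts[t] >= min_df]
    # Descending corpus frequency; alphabetical tie-break is an assumption of this sketch.
    eligible.sort(key=lambda t: (-term_counts[t], t))
    return eligible[:vocab_size]


def count_vector(doc, vocabulary, binary=False):
    """Count each vocabulary term in doc; with binary=True, clip nonzero counts to 1."""
    counts = Counter(doc)
    vector = [counts[t] for t in vocabulary]
    return [min(c, 1) for c in vector] if binary else vector


docs = ["a b c".split(), "a b b c a".split(), "a d".split()]

vocab = fit_vocabulary(docs, vocab_size=3, min_df=2)  # "d" is in one document only
counts_doc1 = count_vector(docs[1], vocab)
binary_doc1 = count_vector(docs[1], vocab, binary=True)
counts_doc2 = count_vector(docs[2], vocab)

print(vocab)
print(counts_doc1, binary_doc1, counts_doc2)
```

Terms outside the fitted vocabulary, such as "d" here, are simply ignored when a document is turned into a count vector.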

In [None]:
from pyspark.ml.feature import CountVectorizer

# Input data: each row is a bag of words with an ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)