# ML TEXT PREPROCESSING USING SPARK
##### Author: Frederic TWAHIRWA

Spark is advantageous for text analytics because it provides a platform for scalable, distributed computing.

When it comes to text analytics, you have a few options for analyzing text. I like to categorize these techniques like this:

- Text Mining (e.g. text clustering, data-driven topics)
- Categorization (e.g. tagging unstructured data into categories and sub-categories, hierarchies, taxonomies)
- Entity Extraction (e.g. extracting patterns such as phrases, addresses, product codes, phone numbers)
- Sentiment Analysis (e.g. tagging text as positive, negative, or neutral, with varying levels of sentiment)
- Deep Linguistics (e.g. semantics: understanding causality, purpose, time)

Which technique you use typically depends on the business use case and the question(s) you are trying to answer. It's also common to combine these techniques.

### This post focuses on feature engineering for text

In [1]:
#from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # entry point for the DataFrame API

In [2]:
spark = SparkSession.builder.getOrCreate() # required for DataFrames; returns the existing session if one is running

## Text pre-processing
### Create a DataFrame

In [3]:
# create a dataFrame
inputDF = spark.createDataFrame([(0, "This is a ML project. A ML is a software project, not a hardware project"),
                                 (1, "Software project is best described as a collection of software programs "),
                                 (2, "Hardware project is best described as a device")],
                                  ["id", "document"])

In [4]:
inputDF.toPandas()

   id                                            document
0   0  This is a ML project. A ML is a software proje...
1   1  Software project is best described as a collec...
2   2      Hardware project is best described as a device


## Tokenizer
Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words).
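
To make the idea concrete without a cluster, here is a minimal plain-Python sketch of two tokenization strategies: lowercasing and splitting on whitespace, and splitting on a delimiter pattern that also consumes commas and periods. The helper names `simple_tokenize` and `regex_tokenize` are made up for this illustration and are not part of Spark; the sketch only approximates the behaviour of the Spark classes used in this post.

```python
import re


def simple_tokenize(text):
    """Lowercase the text and split it on runs of whitespace."""
    return text.lower().split()


def regex_tokenize(text, pattern=r"[\s,.]+"):
    """Lowercase the text, split on the delimiter pattern, drop empty tokens."""
    return [tok for tok in re.split(pattern, text.lower()) if tok]


sentence = "This is a ML project. A ML is a software project, not a hardware project"

# Whitespace splitting leaves punctuation attached ("project.", "project,").
print(simple_tokenize(sentence))

# Treating commas and periods as delimiters yields clean word tokens.
print(regex_tokenize(sentence))
```

The difference between the two outputs is the motivation for the punctuation-removal step: tokens such as `project.` and `project` would otherwise be counted as different terms.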

In [5]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

In [6]:
tokenizer = Tokenizer(inputCol = "document", outputCol = "words")

tokenizedDF = tokenizer.transform(inputDF)
tokenizedDF.select('id', 'document').show(truncate = False)
tokenizedDF.select('id', 'words').show(truncate = False)

+---+------------------------------------------------------------------------+
|id |document                                                                |
+---+------------------------------------------------------------------------+
|0  |This is a ML project. A ML is a software project, not a hardware project|
|1  |Software project is best described as a collection of software programs |
|2  |Hardware project is best described as a device                          |
+---+------------------------------------------------------------------------+

+---+----------------------------------------------------------------------------------------+
|id |words                                                                                   |
+---+----------------------------------------------------------------------------------------+
|0  |[this, is, a, ml, project., a, ml, is, a, software, project,, not, a, hardware, project]|
|1  |[software, project, is, best, described, as, a, collection, o

#### Punctuation removal (using RegexTokenizer)

In [7]:
regexTokenizer = RegexTokenizer(inputCol = "document", outputCol = "words", pattern = "\\s+|,|\\.")

tokenizedDF = regexTokenizer.transform(inputDF)
tokenizedDF.select('id', 'document').show(truncate = False)
tokenizedDF.select('id', 'words').show(truncate = False)

+---+------------------------------------------------------------------------+
|id |document                                                                |
+---+------------------------------------------------------------------------+
|0  |This is a ML project. A ML is a software project, not a hardware project|
|1  |Software project is best described as a collection of software programs |
|2  |Hardware project is best described as a device                          |
+---+------------------------------------------------------------------------+

+---+--------------------------------------------------------------------------------------+
|id |words                                                                                 |
+---+--------------------------------------------------------------------------------------+
|0  |[this, is, a, ml, project, a, ml, is, a, software, project, not, a, hardware, project]|
|1  |[software, project, is, best, described, as, a, collection, of, softw

## Removal of the StopWords
Stop words are words which should be excluded from the input, typically because the words appear frequently and don’t carry as much meaning.

We are going to use `StopWordsRemover`. It takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stop words is specified by the `stopWords` parameter.
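
The filtering itself is simple, as the following plain-Python sketch shows. The `STOP_WORDS` set is a tiny hand-picked list chosen for this illustration only (Spark's default English list is much longer), and `remove_stop_words` is a hypothetical helper, not a Spark function.

```python
# A tiny illustrative stop-word list (an assumption for this sketch only).
STOP_WORDS = {"this", "is", "a", "not", "as", "of"}


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep only the tokens that are not stop words (case-insensitive match)."""
    return [tok for tok in tokens if tok.lower() not in stop_words]


tokens = ["this", "is", "a", "ml", "project", "a", "ml", "is", "a",
          "software", "project", "not", "a", "hardware", "project"]

filtered = remove_stop_words(tokens)
print(filtered)
```

Token order and duplicates are preserved; only the stop words disappear, which is what later counting steps rely on.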

In [8]:
from pyspark.ml.feature import StopWordsRemover

In [9]:
stopwordsRemover = StopWordsRemover(inputCol = "words", outputCol = "words_filtered")

In [10]:
# loadDefaultStopWords is a static method, so it can be called on the class
print(StopWordsRemover.loadDefaultStopWords('english'))

['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers', 'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', 'should', 'no

In [11]:
removedDF = stopwordsRemover.transform(tokenizedDF)
removedDF.show(truncate = True)

+---+--------------------+--------------------+--------------------+
| id|            document|               words|      words_filtered|
+---+--------------------+--------------------+--------------------+
|  0|This is a ML proj...|[this, is, a, ml,...|[ml, project, ml,...|
|  1|Software project ...|[software, projec...|[software, projec...|
|  2|Hardware project ...|[hardware, projec...|[hardware, projec...|
+---+--------------------+--------------------+--------------------+



In [12]:
removedDF.select("document", "words_filtered").show(truncate = False)

+------------------------------------------------------------------------+--------------------------------------------------------------------+
|document                                                                |words_filtered                                                      |
+------------------------------------------------------------------------+--------------------------------------------------------------------+
|This is a ML project. A ML is a software project, not a hardware project|[ml, project, ml, software, project, hardware, project]             |
|Software project is best described as a collection of software programs |[software, project, best, described, collection, software, programs]|
|Hardware project is best described as a device                          |[hardware, project, best, described, device]                        |
+------------------------------------------------------------------------+--------------------------------------------------------------------+

## Generate n-grams
An n-gram is a sequence of n tokens (typically words) for some integer n. The NGram class transforms a column of tokens into a column of n-grams.
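
As a rough illustration of the definition, the sliding window behind n-grams can be sketched in a few lines of plain Python. The function name `ngrams` is chosen for this example and is not Spark's API.

```python
def ngrams(tokens, n):
    """Return the space-joined n-grams of a token list.

    A list with fewer than n tokens produces no n-grams.
    """
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


tokens = ["software", "project", "is", "best", "described"]

print(ngrams(tokens, 2))  # bigrams
print(ngrams(tokens, 3))  # trigrams
```

A list of k tokens yields k - n + 1 n-grams, so longer n-grams are both fewer and more specific.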

In [13]:
from pyspark.ml.feature import NGram

In [14]:
# bigrams; note that inputCol is the unfiltered "words" column, so stop words are kept
ngram = NGram(n = 2, inputCol = "words", outputCol = "ngrams")
ngramDF = ngram.transform(removedDF)

In [15]:
row = ngramDF.select("document", "ngrams").collect()[0]

print (row['document'])
print (row['ngrams'])

This is a ML project. A ML is a software project, not a hardware project
['this is', 'is a', 'a ml', 'ml project', 'project a', 'a ml', 'ml is', 'is a', 'a software', 'software project', 'project not', 'not a', 'a hardware', 'hardware project']


In [16]:
row = ngramDF.select("document", "ngrams").collect()[1]

print (row['document'])
print (row['ngrams'])

Software project is best described as a collection of software programs 
['software project', 'project is', 'is best', 'best described', 'described as', 'as a', 'a collection', 'collection of', 'of software', 'software programs']


In [17]:
ngram = NGram(n = 3, inputCol = "words", outputCol = "ngrams")  # trigram
ngramDF = ngram.transform(removedDF)

In [18]:
row = ngramDF.select("document", "ngrams").collect()[0]

print (row['document'])
print (row['ngrams'])

This is a ML project. A ML is a software project, not a hardware project
['this is a', 'is a ml', 'a ml project', 'ml project a', 'project a ml', 'a ml is', 'ml is a', 'is a software', 'a software project', 'software project not', 'project not a', 'not a hardware', 'a hardware project']


In [19]:
row = ngramDF.select("document", "ngrams").collect()[1]

print (row['document'])
print (row['ngrams'])

Software project is best described as a collection of software programs 
['software project is', 'project is best', 'is best described', 'best described as', 'described as a', 'as a collection', 'a collection of', 'collection of software', 'of software programs']


# TF-IDF
Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. Denote a term by t, a document by d, and the corpus by D. Term frequency TF(t,d) is the number of times that term t appears in document d, while document frequency DF(t,D) is the number of documents that contain term t.

If we only use term frequency to measure importance, it is very easy to over-emphasize terms that appear very often but carry little information about the document, e.g. “a”, “the”, and “of”. If a term appears very often across the corpus, it doesn’t carry special information about a particular document. Inverse document frequency is a numerical measure of how much information a term provides:

$$
IDF(t,D) = \log\frac{|D|+1}{DF(t,D)+1},
$$

where |D| is the total number of documents in the corpus. Because a logarithm is used, a term that appears in all documents has an IDF value of 0. Note that a smoothing term is applied to avoid dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF:

$$
TFIDF(t,d,D) = TF(t,d) \cdot IDF(t,D).
$$

There are several variants on the definition of term frequency and document frequency. MLlib separates TF and IDF to keep them flexible.

More info: https://spark.apache.org/docs/2.2.0/ml-features.html#tf-idf
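
To see the formulas at work, the sketch below applies them by hand to the three stop-word-filtered documents from the section above, using only the Python standard library. The helper names (`document_frequency`, `idf`, `tf_idf`) are invented for this example; this is a direct transcription of the two equations, not Spark's implementation.

```python
import math
from collections import Counter

# Stop-word-filtered tokens of the three example documents.
docs = [
    ["ml", "project", "ml", "software", "project", "hardware", "project"],
    ["software", "project", "best", "described", "collection", "software", "programs"],
    ["hardware", "project", "best", "described", "device"],
]


def document_frequency(docs):
    """DF(t, D): number of documents that contain each term."""
    df = Counter()
    for doc in docs:
        df.update(set(doc))  # count each term once per document
    return df


def idf(term, df, num_docs):
    """IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1))."""
    return math.log((num_docs + 1) / (df[term] + 1))


def tf_idf(doc, df, num_docs):
    """TFIDF(t, d, D) = TF(t, d) * IDF(t, D) for every term in the document."""
    tf = Counter(doc)
    return {term: count * idf(term, df, num_docs) for term, count in tf.items()}


df = document_frequency(docs)
scores = tf_idf(docs[0], df, len(docs))
for term, score in sorted(scores.items()):
    print(f"{term}: {score:.4f}")
```

For the first document, "project" occurs in all three documents and therefore scores 0, "software" and "hardware" each score log(4/3) ≈ 0.2877, and "ml" (two occurrences, found in one document only) scores 2 · log(2) ≈ 1.3863.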

## Term Frequency

In [20]:
from pyspark.ml.feature import CountVectorizer    # implemented as an estimator

In [21]:
#countVectorizer = CountVectorizer(inputCol = "words_filtered", outputCol = "features_tf", vocabSize = 2)
countVectorizer = CountVectorizer(inputCol = "words_filtered", outputCol = "features_tf") 
model = countVectorizer.fit(removedDF)

In [22]:
print (model.vocabulary)
print (len(model.vocabulary))

['project', 'software', 'described', 'hardware', 'ml', 'best', 'programs', 'device', 'collection']
9


In [23]:
countDF = model.transform(removedDF)
row = countDF.collect()[0]

In [24]:
print (row['document'])
print (row['words_filtered'])
print (row['features_tf'])

This is a ML project. A ML is a software project, not a hardware project
['ml', 'project', 'ml', 'software', 'project', 'hardware', 'project']
(9,[0,1,3,4],[3.0,1.0,1.0,2.0])


In [25]:
type(row['features_tf'])

pyspark.ml.linalg.SparseVector

In [26]:
v = row['features_tf']

In [27]:
v.toArray()

array([ 3.,  1.,  0.,  1.,  2.,  0.,  0.,  0.,  0.])
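
The printed form `(9,[0,1,3,4],[3.0,1.0,1.0,2.0])` reads as (vector size, non-zero indices, values at those indices). As a small plain-Python sketch of that reading (the helper `sparse_to_dense` is hypothetical and not part of pyspark), the triple can be expanded into a dense list and mapped back to the vocabulary:

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) triple into a dense list of floats."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


vocabulary = ['project', 'software', 'described', 'hardware', 'ml',
              'best', 'programs', 'device', 'collection']

dense = sparse_to_dense(9, [0, 1, 3, 4], [3.0, 1.0, 1.0, 2.0])
print(dense)

# Pair each non-zero count with the vocabulary term at the same position.
counts = {vocabulary[i]: c for i, c in enumerate(dense) if c}
print(counts)
```

Only the non-zero entries are stored, which matters once the vocabulary grows to thousands of terms and most counts per document are zero.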

## Inverse Document Frequency

In [28]:
from pyspark.ml.feature import IDF  # IDF is implemented as an estimator

In [29]:
idf = IDF(inputCol = "features_tf", outputCol = "features_tf_idf")
idfModel = idf.fit(countDF)

In [30]:
featuresDF = idfModel.transform(countDF)
featuresDF.select("id", "document").show(truncate = False)
featuresDF.select("id", "features_tf_idf").show(truncate = False)

+---+------------------------------------------------------------------------+
|id |document                                                                |
+---+------------------------------------------------------------------------+
|0  |This is a ML project. A ML is a software project, not a hardware project|
|1  |Software project is best described as a collection of software programs |
|2  |Hardware project is best described as a device                          |
+---+------------------------------------------------------------------------+

+---+------------------------------------------------------------------------------------------------------------------------+
|id |features_tf_idf                                                                                                         |
+---+------------------------------------------------------------------------------------------------------------------------+
|0  |(9,[0,1,3,4],[0.0,0.28768207245178085,0.28768207245178085,1.

In [31]:
row = featuresDF.collect()[0]

In [32]:
print (row['document'])
print (row['words_filtered'])
print (row['features_tf'])
print (row['features_tf_idf'])

This is a ML project. A ML is a software project, not a hardware project
['ml', 'project', 'ml', 'software', 'project', 'hardware', 'project']
(9,[0,1,3,4],[3.0,1.0,1.0,2.0])
(9,[0,1,3,4],[0.0,0.287682072452,0.287682072452,1.38629436112])


## Categorical Features
**StringIndexer** encodes a string column of labels to a column of label indices.

**One-hot encoding** maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.
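
Both steps can be sketched in plain Python on a small list of city labels. The helpers `fit_string_index` and `one_hot` are hypothetical names for this illustration. The sketch assigns index 0 to the most frequent label, and breaking ties alphabetically is an assumption made here to keep the result deterministic. It also keeps one slot per label, whereas dropping the last category is a common variant.

```python
from collections import Counter


def fit_string_index(labels):
    """Map each label to an index, most frequent label first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, size):
    """Return a dense vector of length `size` with a single 1.0 at `index`."""
    vec = [0.0] * size
    vec[int(index)] = 1.0
    return vec


cities = ["Milan", "London", "Brussels", "Milan", "Paris", "Paris", "Milan", "Brussels"]

mapping = fit_string_index(cities)
print(mapping)

for city in ["Milan", "London"]:
    print(city, one_hot(mapping[city], len(mapping)))
```

The indices alone would suggest an ordering between cities; the one-hot vectors remove that artificial ordering, which is why they are preferred as model inputs.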

In [35]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [36]:
catDF = spark.createDataFrame([
    (0, "Milan"),
    (1, "London"),
    (2, "Brussels"),
    (3, "Milan"),
    (4, "Paris"),
    (5, "Paris"),
    (6, "Milan"),
    (7, "Brussels")],
    ["row_id", "city"])

In [37]:
stringIndexer = StringIndexer(inputCol = "city", outputCol = "cityIndex")
model = stringIndexer.fit(catDF)
indexedDF = model.transform(catDF)

indexedDF.show()

+------+--------+---------+
|row_id|    city|cityIndex|
+------+--------+---------+
|     0|   Milan|      0.0|
|     1|  London|      3.0|
|     2|Brussels|      1.0|
|     3|   Milan|      0.0|
|     4|   Paris|      2.0|
|     5|   Paris|      2.0|
|     6|   Milan|      0.0|
|     7|Brussels|      1.0|
+------+--------+---------+



In [38]:
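# produce a vector with OneHotEncoder
# Note: this is the Spark 2.x API, where OneHotEncoder is a Transformer.
# In Spark 3.x it is an Estimator: use encoder.fit(indexedDF).transform(indexedDF).
encoder = OneHotEncoder(inputCol = "cityIndex", outputCol = "cityVec")
encoder.setDropLast(False)  # keep one slot per category instead of dropping the last
encodedDF = encoder.transform(indexedDF)
encodedDF.show()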

+------+--------+---------+-------------+
|row_id|    city|cityIndex|      cityVec|
+------+--------+---------+-------------+
|     0|   Milan|      0.0|(4,[0],[1.0])|
|     1|  London|      3.0|(4,[3],[1.0])|
|     2|Brussels|      1.0|(4,[1],[1.0])|
|     3|   Milan|      0.0|(4,[0],[1.0])|
|     4|   Paris|      2.0|(4,[2],[1.0])|
|     5|   Paris|      2.0|(4,[2],[1.0])|
|     6|   Milan|      0.0|(4,[0],[1.0])|
|     7|Brussels|      1.0|(4,[1],[1.0])|
+------+--------+---------+-------------+



In [39]:
row = encodedDF.collect()[0]
row['cityVec'].toArray()

array([ 1.,  0.,  0.,  0.])

In [40]:
row = encodedDF.collect()[1]
row['cityVec'].toArray()

array([ 0.,  0.,  0.,  1.])

In [41]:
row = encodedDF.collect()[2]
row['cityVec'].toArray()

array([ 0.,  1.,  0.,  0.])

In [42]:
row = encodedDF.collect()[4]
row['cityVec'].toArray()

array([ 0.,  0.,  1.,  0.])

In [43]:
catDF = spark.createDataFrame([
    (0, "Milan", "fashion"),
    (1, "London", "fashion"),
    (2, "Brussels", "beer"),
    (3, "Milan", "clothes"),
    (4, "Paris", "wine"),
    (5, "Paris", "books"),
    (6, "Milan", "wine"),
    (7, "Brussels", "books")],
    ["row_id", "city", "category"])

In [44]:
stringIndexer = StringIndexer(inputCol = "city", outputCol = "cityIndex")
model = stringIndexer.fit(catDF)
indexedDF = model.transform(catDF)

stringIndexer2 = StringIndexer(inputCol = "category", outputCol = "categoryIndex")
model2 = stringIndexer2.fit(indexedDF)
indexedDF2 = model2.transform(indexedDF)

In [45]:
encoder = OneHotEncoder(inputCol = "cityIndex", outputCol = "cityVec")
encoder.setDropLast(False)
encodedDF = encoder.transform(indexedDF2)

encoder2 = OneHotEncoder(inputCol = "categoryIndex", outputCol = "categoryVec")
encoder2.setDropLast(False)
encodedDF2 = encoder2.transform(encodedDF)
encodedDF2.select('city', 'category', 'cityIndex', 'categoryIndex', 'cityVec', 'categoryVec').show(truncate = False)

+--------+--------+---------+-------------+-------------+-------------+
|city    |category|cityIndex|categoryIndex|cityVec      |categoryVec  |
+--------+--------+---------+-------------+-------------+-------------+
|Milan   |fashion |0.0      |0.0          |(4,[0],[1.0])|(5,[0],[1.0])|
|London  |fashion |3.0      |0.0          |(4,[3],[1.0])|(5,[0],[1.0])|
|Brussels|beer    |1.0      |3.0          |(4,[1],[1.0])|(5,[3],[1.0])|
|Milan   |clothes |0.0      |4.0          |(4,[0],[1.0])|(5,[4],[1.0])|
|Paris   |wine    |2.0      |2.0          |(4,[2],[1.0])|(5,[2],[1.0])|
|Paris   |books   |2.0      |1.0          |(4,[2],[1.0])|(5,[1],[1.0])|
|Milan   |wine    |0.0      |2.0          |(4,[0],[1.0])|(5,[2],[1.0])|
|Brussels|books   |1.0      |1.0          |(4,[1],[1.0])|(5,[1],[1.0])|
+--------+--------+---------+-------------+-------------+-------------+



In [46]:
from pyspark.ml.feature import VectorAssembler

In [47]:
assembler = VectorAssembler(
    inputCols = ["cityVec", "categoryVec"],
    outputCol = "totalVec")

finalDF = assembler.transform(encodedDF2)

In [48]:
finalDF.select('city', 'category', 'cityVec', 'categoryVec', 'totalVec').show(truncate = True)

+--------+--------+-------------+-------------+-------------------+
|    city|category|      cityVec|  categoryVec|           totalVec|
+--------+--------+-------------+-------------+-------------------+
|   Milan| fashion|(4,[0],[1.0])|(5,[0],[1.0])|(9,[0,4],[1.0,1.0])|
|  London| fashion|(4,[3],[1.0])|(5,[0],[1.0])|(9,[3,4],[1.0,1.0])|
|Brussels|    beer|(4,[1],[1.0])|(5,[3],[1.0])|(9,[1,7],[1.0,1.0])|
|   Milan| clothes|(4,[0],[1.0])|(5,[4],[1.0])|(9,[0,8],[1.0,1.0])|
|   Paris|    wine|(4,[2],[1.0])|(5,[2],[1.0])|(9,[2,6],[1.0,1.0])|
|   Paris|   books|(4,[2],[1.0])|(5,[1],[1.0])|(9,[2,5],[1.0,1.0])|
|   Milan|    wine|(4,[0],[1.0])|(5,[2],[1.0])|(9,[0,6],[1.0,1.0])|
|Brussels|   books|(4,[1],[1.0])|(5,[1],[1.0])|(9,[1,5],[1.0,1.0])|
+--------+--------+-------------+-------------+-------------------+
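
The `totalVec` column is the two one-hot vectors laid end to end: the category indices are shifted by the length of the city vector (4). A minimal plain-Python sketch of that concatenation, using (size, indices, values) triples and a hypothetical helper `concat_sparse`, looks like this:

```python
def concat_sparse(vectors):
    """Concatenate sparse vectors given as (size, indices, values) triples."""
    offset = 0
    indices, values = [], []
    for size, idx, vals in vectors:
        # Shift each index by the total length of the vectors already placed.
        indices.extend(i + offset for i in idx)
        values.extend(vals)
        offset += size
    return offset, indices, values


city_vec = (4, [3], [1.0])      # London
category_vec = (5, [0], [1.0])  # fashion

total = concat_sparse([city_vec, category_vec])
print(total)
```

For the London/fashion row this gives a 9-dimensional vector with ones at positions 3 and 4, in line with the table above.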

