# Natural Language Processing (NLP)

- NLP is a large field of machine learning that focuses on building models from text data.
- This course is a quick tour of the tools in the Spark machine learning library that help build these models.
- Applications of NLP are numerous:

    - Book recommendations
    - News article clustering
    - Sentiment analysis
    - Spam detection
    - Text generation
    - Language translation
    - Chatbots
    - etc. 

## Terminology

### Document
- A document can be an email, written feedback, an SMS, an article, a book, etc. When it is represented as a vector of word counts, that representation is called a “Bag of Words”.
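
As a minimal illustration of the bag-of-words idea, the plain-Python sketch below counts the words of a single document. The helper name `bag_of_words` and the sample sentence are assumptions made for this example; they are not part of Spark.

```python
from collections import Counter

def bag_of_words(text):
    """Count how often each lowercase, whitespace-separated word occurs."""
    return Counter(text.lower().split())

bow = bag_of_words("Java is fine and Spark is fine")
print(bow["is"], bow["fine"], bow["java"])  # 2 2 1
```

Word order is lost in this representation; only the counts remain.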

### Corpus
- A corpus (plural: "corpora") is a set of documents.

## Process
- Compile the corpus and create a DataFrame
- Featurization: preparing the features used to build the model. This may include:

    - Tokenization. Spark tools in pyspark.ml.feature: Tokenizer, RegexTokenizer
    - Stop-word removal. Spark tool in pyspark.ml.feature: StopWordsRemover
    - N-grams. Spark tool in pyspark.ml.feature: NGram
    - Numeric features. Spark tools in pyspark.ml.feature include HashingTF, IDF, CountVectorizer

In [0]:
df= spark.createDataFrame([
    (0, 'Hi Spark is good'),
    (1, 'Java is good choice for beginners'),
    (0, 'In,Spark,you can, do, logistic,Regression;NLP'),
    (0, 'Thanks for using spark. Many corporates will appreciate that!'),
    (1, 'Java is  fine'),
    (1, 'Learn about re on  https://docs.python.org/3/library/re.html'),
    (1, 'This is a quick itroduction to spark ML'),
    (1, 'Java is not the preferred language of Data Scientists') 
], ['label', 'text']
)

In [0]:
df.show(truncate=False)

In [0]:
display(df)

# Start with Tokenizer

In [0]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
tokenizer = Tokenizer(inputCol='text', outputCol='words')

In [0]:
tokenized_df = tokenizer.transform(df)

In [0]:
display(tokenized_df)

In [0]:
# Bring the rows to the driver and print every field (label, text, words) of every row
tokenized_output = tokenized_df.collect()
for document in tokenized_output:
    for field in document:
        print(field)
 

In [0]:
tokenized_df.head()

In [0]:
tokenized_df.take(1)

In [0]:
from pyspark.sql.functions import udf 
from pyspark.sql.types import IntegerType
# UDF that returns the number of elements in an array column
count_words = udf(lambda words: len(words), IntegerType())

In [0]:
tokenized_df.withColumn('counts', count_words('words')).display()

# Now let's use RegexTokenizer

In [0]:
regex_tokenizer = RegexTokenizer(inputCol='text', outputCol='words', pattern='\\W')
#regex_tokenizer.setMinTokenLength(4)

## More on regular expressions with Python
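Reference: https://docs.python.org/3/library/re.html

Note that RegexTokenizer itself uses Java regular expressions rather than Python's re module. On ASCII text such as the examples here, \W behaves the same in both. By default (gaps=True), the pattern describes the separators between tokens, not the tokens themselves.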

### \W
Matches any character which is not a word character. This is the opposite of \w. If the ASCII flag is used this becomes the equivalent of [^a-zA-Z0-9_] (but the flag affects the entire regular expression, so in such cases using an explicit [^a-zA-Z0-9_] may be a better choice).
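
To see what splitting on `\W` does, here is a small plain-Python sketch that loosely mimics RegexTokenizer with its defaults (lowercasing, pattern used as separator, tokens shorter than the minimum length dropped). The function `regex_tokenize` is a hypothetical helper written for this illustration, and it uses Python's `re` module rather than Spark.

```python
import re

def regex_tokenize(text, pattern=r"\W", min_token_length=1):
    """Lowercase the text, split on the pattern, keep tokens that are long enough."""
    pieces = re.split(pattern, text.lower())
    # Consecutive separators produce empty strings; the length filter drops them.
    return [p for p in pieces if len(p) >= min_token_length]

text = "In,Spark,you can, do, logistic,Regression;NLP"
tokens = regex_tokenize(text)
print(tokens)
# ['in', 'spark', 'you', 'can', 'do', 'logistic', 'regression', 'nlp']

long_tokens = regex_tokenize(text, min_token_length=4)
print(long_tokens)
# ['spark', 'logistic', 'regression']
```

A whitespace-only split would leave `In,Spark,you` as a single token, which is why a non-word pattern is the better choice for this sentence.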

In [0]:
regex_df = regex_tokenizer.transform(df)

In [0]:
regex_tokenized_counts = regex_df.withColumn('len', count_words('words'))

In [0]:
regex_tokenized_counts.display()

In [0]:
for item in regex_tokenized_counts.collect()[5]:
    print(item)

In [0]:
display(regex_tokenized_counts)

# With RegexTokenizer, we can remove small words

In [0]:
regex_tokenizer_min_4 = RegexTokenizer(inputCol='text', outputCol='words', pattern='\\W')
regex_tokenizer_min_4.setMinTokenLength(4)
regex_min_4_df = regex_tokenizer_min_4.transform(df)
regex_tokenized_min_4_counts = regex_min_4_df.withColumn('count', count_words('words'))
regex_tokenized_min_4_counts.display()

In [0]:

for item in regex_tokenized_min_4_counts.collect()[5]:
    print(item)

# Once we have the tokens, we can remove _stop-words_ by using _StopWordsRemover_

In [0]:
from pyspark.ml.feature import StopWordsRemover

In [0]:
remover = StopWordsRemover(inputCol='words', outputCol='tokens')

In [0]:
tokens_filtered = remover.transform(regex_tokenized_counts)

In [0]:
cleanDF= tokens_filtered.withColumn('count_tokens', count_words('tokens'))


In [0]:
display (cleanDF)

In [0]:
cleanDF.select('words', 'len', 'tokens', 'count_tokens').show()

In [0]:
cleanDF.show()

In [0]:
for item in cleanDF.collect()[5]:
    print(item)

## Defining a custom stop-word list

In [0]:
remover.getStopWords()


In [0]:
stopWords=['a', 'is', 'for', 'hi', 'in', 'on']
remover.setStopWords(stopWords)


In [0]:
remover.transform(regex_tokenized_counts).select('text', 'tokens').show(truncate=False)

In [0]:
newCleanDF=remover.transform(regex_tokenized_counts).withColumn('count_tokens', count_words('tokens'))
newCleanDF.show()


In [0]:
for item in newCleanDF.collect()[4]:
    print(item)
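
The filtering that StopWordsRemover performs can be pictured with a plain-Python sketch. The helper `remove_stop_words` is hypothetical; it assumes case-insensitive matching by default, which corresponds to the default `caseSensitive=False` of StopWordsRemover.

```python
def remove_stop_words(tokens, stop_words, case_sensitive=False):
    """Return the tokens that are not in the stop-word list."""
    if case_sensitive:
        blocked = set(stop_words)
        return [t for t in tokens if t not in blocked]
    blocked = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in blocked]

custom_stop_words = ['a', 'is', 'for', 'hi', 'in', 'on']

filtered = remove_stop_words(['java', 'is', 'fine'], custom_stop_words)
print(filtered)  # ['java', 'fine']
```

With a short custom list like this one, words such as "the" or "not" are kept, so the list should be chosen with the downstream model in mind.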

# NGRAM
A feature transformer that converts the input array of strings into an array of n-grams.
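
As a rough sketch of the transformation, the plain-Python function below builds n-grams by sliding a window of n tokens over the input and joining each window with a space. The name `ngrams` and the sample tokens are assumptions for illustration only.

```python
def ngrams(tokens, n=2):
    """Return the space-joined n-grams of a token list (empty if fewer than n tokens)."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

bigrams = ngrams(['thanks', 'using', 'spark', 'many', 'corporates'], n=2)
print(bigrams)
# ['thanks using', 'using spark', 'spark many', 'many corporates']
```

A document with k tokens yields k - n + 1 n-grams, so a document shorter than n produces an empty list.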

In [0]:
from pyspark.ml.feature import NGram

In [0]:
display(cleanDF)

In [0]:
cleanDF.collect()[3]

In [0]:
ngram = NGram(n=2, inputCol='tokens', outputCol='2grams')

In [0]:
my_2ngrams =ngram.transform(cleanDF)

In [0]:
display (my_2ngrams)

In [0]:
my_2ngrams.select('2grams').show(truncate =False)

In [0]:
for item in my_2ngrams.collect()[6]:
    print (item)

# TF: Term Frequency
Maps a sequence of terms to their term frequencies using the hashing trick.

Note: the terms must be hashable (they cannot be dictionaries or lists, for example).

HashingTF(S) takes the hash code of each word modulo the desired vector size S, and thus maps each word to a number between 0 and S-1.

This yields a fairly robust vector, even though multiple words may map to the same hash code.

Spark Machine Learning developers recommend setting S between $$2^{18}$$ and $$2^{20}$$.

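## Vector Size and tradeoff:
- S = 1000
- index(word1) = Hash(word1) % 1000 = 2002 % 1000 = 2
- index(word2) = Hash(word2) % 1000 = 120013 % 1000 = 13
- ...
- index(word1000) = Hash(word1000) % 1000 = 122002 % 1000 = 2
- word1 and word1000 collide: both map to index 2, so their counts are added together in the same vector slot. A larger S makes collisions rarer but produces longer vectors.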
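
The hashing trick and the effect of the vector size can be sketched in plain Python. This is only an illustration: the helper `hashing_tf` is hypothetical and uses `zlib.crc32` as a stand-in hash function, so the indices it produces will not match those of Spark's HashingTF.

```python
import zlib

def hashing_tf(tokens, num_features):
    """Map tokens to a sparse {index: count} vector using hash(token) % num_features."""
    vector = {}
    for token in tokens:
        # crc32 is an assumption for this sketch; it is deterministic across runs.
        index = zlib.crc32(token.encode("utf-8")) % num_features
        vector[index] = vector.get(index, 0) + 1
    return vector

tokens = ["java", "fine", "java"]

large = hashing_tf(tokens, 2 ** 18)  # large S: collisions are unlikely
tiny = hashing_tf(tokens, 1)         # S = 1: every token lands in slot 0
print(large)
print(tiny)  # {0: 3}
```

The extreme case S = 1 shows what a collision costs: the counts of different words become indistinguishable.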

In [0]:
from pyspark.ml.feature import HashingTF, IDF

In [0]:
cleanDF.show()

In [0]:
tf = HashingTF(numFeatures=1000, inputCol='tokens', outputCol='features')

In [0]:
tf.explainParams()


In [0]:
tf.explainParam('numFeatures')

In [0]:
tf.setNumFeatures(200056)

In [0]:
print (tf.getNumFeatures())

In [0]:
tf_df = tf.transform(cleanDF)

In [0]:
display(tf_df)

In [0]:
for item in tf_df.collect()[3]:
    print (item)

# TF-IDF: Term Frequency - Inverse Document Frequency

Once you have TF vectors, you can use IDF to compute the inverse document frequencies and multiply them with the TF to compute the TF-IDF.

IDF measures how infrequently a term occurs across the whole document corpus.

TF x IDF shows how relevant a term is to a specific document (i.e., whether it is common in that document but rare in the whole corpus).

TF-IDF is used to improve on Bag of Words by adjusting word counts based on their frequency in the corpus.

## How to calculate them?
Various ways for determining the exact values of both statistics exist:

- TF(t, d): number of occurrences of term t in document d. It represents the importance of the term in the document.

- IDF(t): measures how rare the term t is across the whole corpus.

$$IDF(t) = \log\frac{N}{N(t)}$$
- N: number of documents in the corpus D, N = |D|
- N(t): number of documents in which the term t appears (i.e., TF(t, d) != 0), N(t) = |{d in D : t in d}|
- TF-IDF(t, d) = TF(t, d) x IDF(t)
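
The definitions above can be worked through on a tiny corpus in plain Python. The function `tf_idf` and the three sample documents are assumptions for illustration. Note also that Spark's IDF uses a smoothed variant, log((N + 1) / (N(t) + 1)), so the values it produces differ from this sketch.

```python
import math
from collections import Counter

def tf_idf(corpus):
    """Return one {term: TF * IDF} dict per document, with IDF(t) = log(N / N(t))."""
    n_docs = len(corpus)
    doc_freq = Counter()
    for doc in corpus:
        doc_freq.update(set(doc))  # count each term once per document
    weights = []
    for doc in corpus:
        tf = Counter(doc)
        weights.append({t: tf[t] * math.log(n_docs / doc_freq[t]) for t in tf})
    return weights

corpus = [
    ["java", "fine"],
    ["java", "good", "choice"],
    ["spark", "good", "spark"],
]
scores = tf_idf(corpus)
for doc_scores in scores:
    print(doc_scores)
```

Here "spark" occurs twice in the third document and nowhere else, so it receives the highest weight, 2 x log(3), while "java" and "good" appear in two of the three documents and are weighted by the smaller log(3/2).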

## Further Information on TF-IDF
https://fr.wikipedia.org/wiki/TF-IDF

## Preparing Data for TF-IDF
In a real pipeline, you will likely need to preprocess and stem words before passing them to TF.

For example: convert words to lowercase, drop punctuation characters, or drop suffixes like ‘ing’.

You can use external single-node natural language libraries like NLTK (http://www.nltk.org).
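
A very rough sketch of such preprocessing, using only the Python standard library, might look like the following. The helper `preprocess` is hypothetical, and its suffix rule (drop a trailing "ing" from words longer than five characters) is a deliberately naive assumption; a real stemmer such as the ones in NLTK handles far more cases.

```python
import string

def preprocess(text):
    """Lowercase, strip ASCII punctuation, and drop a trailing 'ing' from longer words."""
    no_punct = text.lower().translate(str.maketrans("", "", string.punctuation))
    cleaned = []
    for word in no_punct.split():
        # Naive suffix stripping; the length check avoids mangling short words like "using".
        if word.endswith("ing") and len(word) > 5:
            word = word[:-3]
        cleaned.append(word)
    return cleaned

words = preprocess("Learning about Spark, step by step!")
print(words)
# ['learn', 'about', 'spark', 'step', 'by', 'step']
```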

In [0]:
idf = IDF(inputCol='features', outputCol='idf_features')

In [0]:
idf_model = idf.fit(tf_df)

In [0]:
data=idf_model.transform(tf_df)

In [0]:
data.show()

In [0]:
display(data)

In [0]:
for term in data.collect()[4]:
    print (term)

In [0]:
data.select('tokens').show(truncate = False)

In [0]:
from pyspark.ml.classification import LogisticRegression

In [0]:
ll = LogisticRegression(featuresCol='idf_features', labelCol='label')

In [0]:
train, test = data.randomSplit([0.7, 0.3])
train.cache()
test.cache()

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(metricName='f1', predictionCol='prediction')

In [0]:
model = ll.fit(train)
results = model.transform(test)

In [0]:
print('evaluation of logistic regression model = %g'%evaluator.evaluate(results))

# Vectorizing with CountVectorizer (another option)

In [0]:
from pyspark.ml.feature import CountVectorizer

In [0]:
count_vec = CountVectorizer(inputCol='tokens', outputCol='features',  minDF=1)

In [0]:
count_vec.explainParam('maxDF')

In [0]:
model = count_vec.fit(cleanDF)

In [0]:
data = model.transform(cleanDF)

In [0]:
data.select(['tokens', 'features']).show(truncate = False)

In [0]:
count_vec = CountVectorizer(inputCol='tokens', outputCol='features', minDF=2)

In [0]:
help(CountVectorizer)

In [0]:
count_vec.explainParam('minDF')

In [0]:
count_vec_df=count_vec.fit(cleanDF).transform(cleanDF).select('tokens', 'features')

In [0]:
count_vec_df.show(truncate = False)

In [0]:
count_vec_df.printSchema()

In [0]:
count_vec = CountVectorizer(inputCol='tokens', outputCol='features', vocabSize=15, minDF=1)

In [0]:
count_vec.fit(cleanDF).transform(cleanDF).select('tokens', 'features').show(truncate = False)
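
To make the roles of `minDF` and `vocabSize` more concrete, here is a plain-Python sketch of the idea behind CountVectorizer: build a vocabulary from the corpus, then turn each document into a vector of counts over that vocabulary. The helpers `fit_vocabulary` and `count_vector` are hypothetical. The sketch only handles an integer `min_df`, returns dense lists rather than sparse vectors, and breaks frequency ties alphabetically, which is an assumption and not necessarily what Spark does.

```python
from collections import Counter

def fit_vocabulary(corpus, min_df=1, vocab_size=None):
    """Keep terms found in at least min_df documents, most frequent first."""
    doc_freq = Counter()
    term_freq = Counter()
    for doc in corpus:
        term_freq.update(doc)
        doc_freq.update(set(doc))  # count each term once per document
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    kept.sort(key=lambda t: (-term_freq[t], t))
    return kept[:vocab_size]  # vocab_size=None keeps every term

def count_vector(doc, vocabulary):
    """Count how often each vocabulary term occurs in the document."""
    counts = Counter(doc)
    return [counts[t] for t in vocabulary]

corpus = [
    ["java", "fine"],
    ["java", "good", "choice"],
    ["spark", "good", "spark"],
]

vocab = fit_vocabulary(corpus, min_df=2)
print(vocab)                           # ['good', 'java']
print(count_vector(corpus[2], vocab))  # [1, 0]
```

With `min_df=2`, "spark" is dropped even though it occurs twice, because both occurrences are in the same document.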

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression, NaiveBayes, MultilayerPerceptronClassifier, RandomForestClassifier