<img src="uva_seal.png">  

## Machine Learning with MLlib
## *Introduction and Feature Extraction*

### University of Virginia
### DS 5110: Big Data Systems
### Last Updated: February 15, 2022

---  

### SOURCES 

1. Learning Spark
2. Spark Documentation  
	https://spark.apache.org/docs/latest/mllib-data-types.html  
	http://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html

### OBJECTIVES
1. Introduction to the machine learning library
2. Introduction to MLlib data types
3. Discuss feature extraction tools in MLlib


### CONCEPTS AND FUNCTIONS
- pipeline  
- supervised and unsupervised learning  
- learning tasks: classification, regression, clustering, dimensionality reduction  
- training set, testing set  
- feature extraction  

- MLlib data types:  
  - LabeledPoint  
  - sparse vector, dense vector  
  - sparse matrix, dense matrix  
  - Rating  

- Feature Extraction  
- TF-IDF  
- Word2Vec  
- Cosine Similarity  


---  

**Supervised Learning vs Unsupervised Learning**  

In *supervised learning* tasks, each observation has a label or ground truth indicating the correct answer.  
Unsupervised learning tasks do NOT have this label. Most data in the wild is unlabeled.

---  

**Machine Learning in Spark**  
Spark MLlib is the library for machine learning.  There are two interfaces:

1) A newer DataFrame-based API which is being actively built out

2) An older RDD-based API, which is still maintained but is no longer growing  
  For supervised learning tasks, the RDD API uses a `LabeledPoint` object to bundle labels with predictors.
  
  For unsupervised learning tasks, since there is no label, the `LabeledPoint` object is not used.  
  Examples of unsupervised learning tasks include clustering methods like k-means.

Some functionality is only available in the RDD-based API.  
We will discuss both APIs in this course.

---  

**Pipelines**

MLlib includes a pipeline API useful for building ML pipelines, similar to scikit-learn in Python.  It is HIGHLY recommended that you use pipelines.  They encapsulate the process, reducing the chance of errors, and making the scoring process simple.  More on pipelines later.

Next, we jump right in, building a classifier and making predictions. You might not yet know about objects like `LabeledPoint`, but this should be fun and motivating!

**Game Plan**

We will begin with the RDD interface, and then transition to the DataFrame interface.  
The remainder of this notebook uses the RDD interface.

### Build LogReg Classifier to Predict Spam vs Not

In [1]:
# IMPORT MODULES
import os
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
        .master("local") \
        .appName("mllib_classifier") \
        .getOrCreate()

In [3]:
spark

In [5]:
sc = spark.sparkContext

In [6]:
# read in spam and ham (not spam) data
spam = sc.textFile("spam.txt")
ham = sc.textFile("ham.txt")

In [7]:
spam.collect()[0]

'Dear sir, I am a Prince in a far kingdom you have not heard of.  I want to send you money via wire transfer so please ...'

In [8]:
# note you wouldn't collect to driver if RDD was massive
spam.collect()

['Dear sir, I am a Prince in a far kingdom you have not heard of.  I want to send you money via wire transfer so please ...',
 'Get Viagra real cheap!  Send money right away to ...',
 'Oh my gosh you can be really strong too with these drugs found in the rainforest. Get them cheap right now ...',
 'YOUR COMPUTER HAS BEEN INFECTED!  YOU MUST RESET YOUR PASSWORD.  Reply to this email with your password and SSN ...',
 'THIS IS NOT A SCAM!  Send money and get access to awesome stuff really cheap and never have to ...']

In [9]:
ham.collect()

['Dear Spark Learner, Thanks so much for attending the Spark Summit 2014!  Check out videos of talks from the summit at ...',
 'Hi Mom, Apologies for being late about emailing and forgetting to send you the package.  I hope you and bro have been ...',
 'Wow, hey Fred, just heard about the Spark petabyte sort.  I think we need to take time to try it out immediately ...',
 'Hi Spark user list, This is my first question to this list, so thanks in advance for your help!  I tried running ...',
 "Thanks Tom for your email.  I need to refer you to Alice for this one.  I haven't yet figured out that part either ...",
 'Good job yesterday!  I was attending your talk, and really enjoyed it.  I want to try out GraphX ...',
 'Summit demo got whoops from audience!  Had to let you know. --Joe']

In [3]:
# set up a Term Frequency object using the hashing trick
tf = HashingTF(numFeatures = 10000)

In [11]:
# tokenize the datasets, parsing on spaces
spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
normalFeatures = ham.map(lambda email: tf.transform(email.split(" ")))

In [12]:
spamFeatures.take(2)

[SparseVector(10000, {0: 1.0, 365: 1.0, 455: 1.0, 509: 1.0, 1320: 1.0, 1363: 2.0, 1583: 1.0, 2321: 2.0, 2403: 1.0, 3289: 2.0, 3342: 1.0, 4995: 1.0, 5336: 1.0, 5706: 1.0, 5831: 1.0, 6052: 1.0, 6300: 1.0, 6582: 1.0, 6744: 1.0, 8971: 1.0, 8977: 1.0, 9232: 1.0, 9604: 1.0, 9646: 1.0, 9878: 1.0}),
 SparseVector(10000, {0: 1.0, 365: 1.0, 940: 1.0, 2220: 1.0, 3122: 1.0, 4460: 1.0, 4671: 1.0, 5336: 1.0, 5849: 1.0, 8479: 1.0, 9604: 1.0})]

In [None]:
normalFeatures.take(2)

---
**Examining the *HashingTF* example**

Recall that we set `numFeatures = 10000`, so every sparse vector has length 10,000.

In [5]:
doc1 = 'cat the tabby cat'
doc2 = 'the siamese cat'

# tokenize the docs and hash them to term frequencies
print(tf.transform(doc1.split(" ")))
print(tf.transform(doc2.split(" ")))    

(10000,[4511,4946,9026],[1.0,1.0,2.0])
(10000,[4946,9026,9421],[1.0,1.0,1.0])


Each document is represented as a vector.  
Only words appearing in the doc are reflected in the sparse vector.  
Notice for the common words "the" and "cat", there are location matches between the document representations.  
Because 'cat' appears twice in the first doc, the index holding the count 2.0 (index 9026) must be the location of the word 'cat'.   
You can work out which locations store the remaining words.  
The analyst won't need to apply this mapping in practice, but it is instructive.
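
To make the hashing trick concrete, here is a minimal pure-Python sketch of the idea behind `HashingTF`. The helper name `hashing_tf` and the use of `zlib.crc32` are assumptions made for reproducibility; MLlib uses a different hash function, so the indices will not match the output above.

```python
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=10000):
    """Map tokens to a sparse {index: count} dict using the hashing trick."""
    counts = Counter()
    for token in tokens:
        # crc32 is a stand-in hash (assumption); MLlib's hash function differs
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] += 1.0
    return dict(counts)

doc1_tf = hashing_tf("cat the tabby cat".split(" "))
doc2_tf = hashing_tf("the siamese cat".split(" "))

# indices present in both documents correspond to shared words
# (or, rarely, to a hash collision between different words)
shared = sorted(set(doc1_tf) & set(doc2_tf))
print(doc1_tf)
print(doc2_tf)
print("shared indices:", shared)
```

No vocabulary is stored anywhere: the index of a word is recomputed from the word itself, which is why the mapping from index back to word has to be inferred.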

---

Build LabeledPoint datasets (1=spam, 0=ham)  
LabeledPoints package (label, features) for each record

In [14]:
positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
negativeExamples = normalFeatures.map(lambda features: LabeledPoint(0, features))

In [15]:
pos = positiveExamples.collect()

In [16]:
pos[0]

LabeledPoint(1.0, (10000,[0,365,455,509,1320,1363,1583,2321,2403,3289,3342,4995,5336,5706,5831,6052,6300,6582,6744,8971,8977,9232,9604,9646,9878],[1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))

In [17]:
neg = negativeExamples.collect()

In [18]:
neg[0]

LabeledPoint(0.0, (10000,[0,1162,2403,2809,3080,3317,4161,4770,5423,5651,5743,5831,6006,6827,6971,7069,7872,9150,9370,9521,9604],[1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))

In [19]:
# build training set; this stacks positive and negative records
trainData = positiveExamples.union(negativeExamples)

# cache since model training is iterative; o.w. the RDD would be recomputed on each pass
trainData.cache()

UnionRDD[7] at union at NativeMethodAccessorImpl.java:0

In [20]:
# train LogReg model with 1000 iterations; all other params are defaults
model = LogisticRegressionWithSGD.train(trainData, iterations=1000)

In [21]:
# push "not spam" example through classifier. this is label=0
Test = tf.transform("I love learning Spark programming".split(" "))

In [22]:
model.predict(Test)

0

In [None]:
# Prediction
print("Prediction for example: {}".format(model.predict(Test)))
if model.predict(Test)==0:
    print("CORRECT!")
else:
    print("INCORRECT!")

### Definitions

Next we define the `MLlib` objects.

**LabeledPoint**  
Stores feature vector together with label  

**Rating**  
Rating of product by a user. Used in recommendation, for instance.  

**Vector**  
Handles dense and sparse vectors. For sparse vectors, only the nonzero values and their indices are stored, along with the vector length.  
Sparse saves on memory and runtime.  
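
As a rough illustration of the memory savings, the sketch below converts between the two representations in plain Python. The helper names are hypothetical, and the numbers are taken from the `doc1` sparse vector printed earlier.

```python
def sparse_to_dense(size, indices, values):
    """Expand (size, indices, values) into a full list of floats."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

def dense_to_sparse(dense):
    """Keep only the nonzero entries, plus the overall length."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values

size, indices, values = 10000, [4511, 4946, 9026], [1.0, 1.0, 2.0]
dense = sparse_to_dense(size, indices, values)

print(len(dense))                   # 10000 numbers stored in dense form
print(len(indices) + len(values))   # 6 numbers (plus the length) in sparse form
```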

**Matrix**  
A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine.  
MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order, and sparse matrices, whose non-zero entry values are stored in the Compressed Sparse Column (CSC) format in column-major order.  
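
The CSC layout can be hard to picture, so here is a small pure-Python sketch that expands a CSC triple into a dense matrix. The function name `csc_to_dense` is hypothetical. Column pointers mark where each column's entries start and stop in the `row_indices` and `values` arrays.

```python
def csc_to_dense(num_rows, num_cols, col_ptrs, row_indices, values):
    """Expand a Compressed Sparse Column matrix into a list of rows."""
    dense = [[0.0] * num_cols for _ in range(num_rows)]
    for j in range(num_cols):
        # entries of column j live in positions col_ptrs[j] .. col_ptrs[j + 1] - 1
        for k in range(col_ptrs[j], col_ptrs[j + 1]):
            dense[row_indices[k]][j] = values[k]
    return dense

# a 3 x 2 matrix with three nonzero entries
csc_matrix = csc_to_dense(3, 2, [0, 1, 3], [0, 2, 1], [9.0, 6.0, 8.0])
for row in csc_matrix:
    print(row)
# [9.0, 0.0]
# [0.0, 8.0]
# [0.0, 6.0]
```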

**Distributed matrix**  
A distributed matrix has long-typed row and column indices and double-typed values  

**Row matrix**  
A RowMatrix is a row-oriented distributed matrix without meaningful row indices  

**CoordinateMatrix**  
CoordinateMatrix is a distributed matrix backed by an RDD of its entries  
A CoordinateMatrix should be used only when both dimensions of the matrix are huge and the matrix is very sparse.
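
To see why the coordinate layout suits huge, very sparse matrices, consider this plain-Python sketch. The entries are made up for illustration and no Spark objects are involved.

```python
# each entry is (row index, column index, value); the values are hypothetical
entries = [
    (0, 0, 1.2),
    (1_000_000, 2_000_000, 3.4),
    (5_000_000, 7, 5.6),
]

# infer the matrix dimensions from the largest indices seen
num_rows = max(i for i, _, _ in entries) + 1
num_cols = max(j for _, j, _ in entries) + 1

print("stored entries:", len(entries))
print("cells in the equivalent dense matrix:", num_rows * num_cols)
```

Three tuples describe a matrix whose dense form would hold roughly ten trillion cells.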

Take a look at this wiki to learn about row- versus column-major order.  It is super important to know how the data is saved.  Could you imagine what would happen to results if this were mixed up?

https://en.wikipedia.org/wiki/Row-_and_column-major_order
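
As a quick illustration of what goes wrong when the orders are mixed up, the sketch below reads the same flat array both ways. The helper names are hypothetical.

```python
def from_column_major(num_rows, num_cols, values):
    """Entry (i, j) is stored at position j * num_rows + i."""
    return [[values[j * num_rows + i] for j in range(num_cols)]
            for i in range(num_rows)]

def from_row_major(num_rows, num_cols, values):
    """Entry (i, j) is stored at position i * num_cols + j."""
    return [[values[i * num_cols + j] for j in range(num_cols)]
            for i in range(num_rows)]

flat = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

print(from_column_major(3, 2, flat))  # [[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]]
print(from_row_major(3, 2, flat))     # [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
```

The same six numbers produce two different matrices, so any downstream computation would silently use the wrong values.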

### Feature Extraction

The `mllib.feature` module contains classes for common feature transformations.

**Term Frequency-Inverse Document Frequency (TF-IDF)**  
Produces feature vectors from text documents.

Two classes work together to compute TF-IDF:  

**1. HashingTF**  
	Computes term frequency vector from document  
	Can process one document or an RDD of documents  
	Each document needs to be an iterable sequence (a list in Python)  

To reduce the chance of collision, we can increase the target feature dimension, i.e., the  
	 number of buckets of the hash table. The default feature dimension is 1,048,576  

**2. IDF**  
	Computes inverse document frequency  
	Terms that appear in high fraction of the docs are not as valuable  
	IDF will downweight such terms  
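
The downweighting can be sketched in plain Python using the two cat documents from earlier. This sketch works on raw terms rather than hashed indices and uses the smoothed formula given in the Spark feature extraction documentation, IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1)). The variable names are hypothetical.

```python
import math
from collections import Counter

docs = [
    "cat the tabby cat".split(" "),
    "the siamese cat".split(" "),
]

num_docs = len(docs)
# document frequency: in how many documents does each term appear?
doc_freq = Counter(term for doc in docs for term in set(doc))
idf = {term: math.log((num_docs + 1) / (df + 1)) for term, df in doc_freq.items()}

# TF-IDF: term count in the document times the term's IDF weight
tfidf = [
    {term: count * idf[term] for term, count in Counter(doc).items()}
    for doc in docs
]

for weights in tfidf:
    print(weights)
```

The terms "cat" and "the" appear in every document, so their weight drops to zero, while "tabby" and "siamese" keep a positive weight.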

Here is a good example of Feature Extraction:  
http://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html
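
Returning to the note on hash collisions under HashingTF, the following sketch estimates how the number of buckets affects the chance of a collision. It assumes the hash spreads terms uniformly over the buckets, which is an idealization, and the function name is hypothetical.

```python
def collision_probability(num_terms, num_buckets):
    """P(at least two of num_terms distinct terms share a bucket),
    assuming each term lands in a uniformly random bucket."""
    p_no_collision = 1.0
    for i in range(num_terms):
        # the (i+1)-th term must avoid the i buckets already taken
        p_no_collision *= 1.0 - i / num_buckets
    return 1.0 - p_no_collision

# compare the 10,000 buckets used earlier with the default of 1,048,576
for buckets in (10_000, 1_048_576):
    print(buckets, collision_probability(500, buckets))
```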

**Word2Vec**  
Computes distributed vector representation of words.  
Similar words are close in the vector space  
Useful in many NLP applications:  
named entity recognition, disambiguation, parsing, tagging and machine translation.  

The algorithm uses a neural network and some interesting concepts like the *hierarchical softmax*.  I encourage you to learn more if you have the time and interest.
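
Closeness in the vector space is commonly measured with cosine similarity, the cosine of the angle between two vectors. Below is a minimal pure-Python sketch; the three-dimensional word vectors are made up for illustration and are far smaller than real Word2Vec vectors.

```python
import math

def cosine_similarity(u, v):
    """Dot product of u and v divided by the product of their lengths."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

# hypothetical word vectors
vec_rate = [1.0, 2.0, 0.0]
vec_rates = [2.0, 4.0, 0.0]    # same direction as vec_rate
vec_banana = [0.0, 0.0, 3.0]   # orthogonal to vec_rate

print(cosine_similarity(vec_rate, vec_rates))   # close to 1: very similar
print(cosine_similarity(vec_rate, vec_banana))  # 0: unrelated
```

Values near 1 indicate vectors pointing in nearly the same direction, regardless of their lengths.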

### Fit Word2VecModel to some text data

In [None]:
from pyspark.mllib.feature import Word2Vec

inp = sc.textFile("fed_rates_article.txt").map(lambda row: row.split(" "))
topk = 5
print('First {} records:'.format(topk))
first_five = inp.take(topk)
for i in range(topk):
    print(first_five[i])
print("-----------------")
                        
word2vec = Word2Vec() # construct Word2Vec object
model = word2vec.fit(inp) # train Word2Vec on the dasta

# apply Word2Vec to find synonyms by representing words as vectors
synonyms = model.findSynonyms('rate', 20)

# findSynonyms returns (word, cosine similarity) pairs; higher means more similar
for word, cosine_similarity in synonyms:
    print("{}: {}".format(word, cosine_similarity))

**StandardScaler**   

Standardization can improve the convergence rate during the optimization process, and it also prevents features with very large variances from exerting an overly large influence during model training.  

For each feature, the scaler can:  
1. Scale to unit variance (`withStd=True`, the default)  
2. Center to mean zero (`withMean=True`; off by default)  

Standardization is useful or even essential for some models.  

`K-means` works in Euclidean space, so all features should be on the same scale.  

Tree models do not need this

Use this in a *Pipeline* so the statistics can be applied to datasets for scoring later. You would NOT compute means and standard deviations on the scoring set to standardize.
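
The point about reusing training statistics can be sketched without Spark. In the plain-Python example below, the function names and the tiny datasets are hypothetical, and the `n - 1` denominator for the standard deviation is an assumption of this sketch.

```python
import math

def fit_scaler(rows):
    """Compute per-feature mean and sample standard deviation."""
    n = len(rows)
    num_features = len(rows[0])
    means = [sum(r[j] for r in rows) / n for j in range(num_features)]
    stds = [
        math.sqrt(sum((r[j] - means[j]) ** 2 for r in rows) / (n - 1))
        for j in range(num_features)
    ]
    return means, stds

def apply_scaler(rows, means, stds):
    """Standardize rows using statistics that were fit elsewhere."""
    return [
        [(x - m) / s if s > 0 else 0.0 for x, m, s in zip(r, means, stds)]
        for r in rows
    ]

train_rows = [[1.0, 100.0], [2.0, 200.0], [3.0, 300.0]]
score_rows = [[2.0, 400.0]]

# fit on the training set only, then reuse the same statistics for scoring
means, stds = fit_scaler(train_rows)
print(apply_scaler(train_rows, means, stds))  # [[-1.0, -1.0], [0.0, 0.0], [1.0, 1.0]]
print(apply_scaler(score_rows, means, stds))  # [[0.0, 2.0]]
```

Fitting a second scaler on `score_rows` would give different means and standard deviations, so the model would see inputs on a different scale than it was trained on.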

### Standard Scaler  
Load dataset in libsvm format, standardize the features so that the new features have unit variance and/or zero mean

In [None]:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

In [None]:
data = MLUtils.loadLibSVMFile(sc, "sample_libsvm_data.txt")

In [None]:
data.take(1)

In [None]:
type(data)

In [None]:
# extract labels and features; stored as RDDs
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

In [None]:
scaler1 = StandardScaler().fit(features)

In [None]:
data.take(2)

In [None]:
scaler1.transform(features).take(2)

In [None]:
# data1 will be unit variance.
data1 = label.zip(scaler1.transform(features))

In [None]:
data1.take(2)

**TRY FOR YOURSELF (UNGRADED EXERCISES)**

1) Print the label and features (before scaling) from the first record in *data*.

2) Compute the first 20 synonyms of the word "economy". Then extract and print the cosine similarities.  Do the results make sense?

3) Copy the Ham/Spam classifier code into the cell below.  Then try a different model, leaving the rest of the code unchanged.  Run the code.  Does it get the "not spam" example right?