# COM6012 Scalable Machine Learning 2019 - Haiping Lu
# Lab 5: PCA for dimensionality reduction

## Objectives

* Task 1: To finish in the lab session. **Essential**
* Task 2: To finish in the lab session. **Essential**
* Task 3: To explore by yourself. **Optional but recommended**
* Task 4: To explore by yourself. **Optional but recommended**


**Suggested reading**: 
* [Extracting, transforming and selecting features](https://spark.apache.org/docs/2.3.2/ml-features.html)
* [PCA in Spark](https://spark.apache.org/docs/2.3.2/ml-features.html#pca)
* [SVD in Spark: RDD-based only `pyspark.mllib`](https://spark.apache.org/docs/2.3.2/mllib-dimensionality-reduction.html#singular-value-decomposition-svd), **not available** yet in DataFrame-based `pyspark.ml`
* [StandardScaler in Spark](https://spark.apache.org/docs/2.3.2/ml-features.html#standardscaler) to standardise/normalise data to unit standard deviation and/or zero mean.
* [Data Types - RDD-based API](https://spark.apache.org/docs/2.3.2/mllib-data-types.html)
* [PCA on Wiki](https://en.wikipedia.org/wiki/Principal_component_analysis)
* [Understanding Dimension Reduction with Principal Component Analysis (PCA)](https://blog.paperspace.com/dimension-reduction-with-principal-component-analysis/)
* [Principal Component Analysis explained on Kaggle](https://www.kaggle.com/nirajvermafcb/principal-component-analysis-explained) with data available [here](https://www.kaggle.com/liujiaqi/hr-comma-sepcsv), and background info [here](https://rstudio-pubs-static.s3.amazonaws.com/345463_37f54d1c948b4cdfa181541841e0db8a.html)

[**A new cheat sheet for PySpark (2 page version)**](https://github.com/runawayhorse001/CheatSheet/blob/master/cheatSheet_pyspark.pdf): **Highly recommended. Very handy.** 

https://github.com/haipinglu/ScalableML/

## A. Ways to deal with out of space/memory errors

### A1. Configure spark properly to solve the problem

Let us take a look at the configuration of the Spark application properties [here (the table)](https://spark.apache.org/docs/2.3.2/configuration.html). There are also several good references: [set spark context;](https://stackoverflow.com/questions/30560241/is-it-possible-to-get-the-current-spark-context-settings-in-pyspark); [set **driver memory**](https://stackoverflow.com/questions/53606756/how-to-set-spark-driver-memory-in-client-mode-pyspark-version-2-3-1); [set **local dir**](https://stackoverflow.com/questions/40372655/how-to-set-spark-local-dir-property-from-spark-shell); [a discussion at databricks](https://forums.databricks.com/questions/11856/spark-job-fails-storagediskblockobjectwriter-uncau.html)
 
In shell, we can check (customised) config via sc: `sc._conf.getAll() (defaults are not shown)`

#### No Space Left problem. 

To solve it, we can set the `spark.local.dir` in notebook or `xxx.py` by, e.g.,
~~~~~
spark = SparkSession.builder \
    .master("local[2]") \
    .config("spark.local.dir","/fastdata/username") \ #replace username with your own username
    .appName("COM6012 Collaborative Filtering RecSys") \
    .getOrCreate()
~~~~~
**Note: you need to replace username with your own username**. We may also do it via `spark-submit` (see below), which is more certain to work (if the above doesn't). Again, notebook is for **small scale** development.

#### Out of memory problem
Note the default driver memory (in the configuration table above), i.e., `spark.driver.memory`, is 1G so even if you requested more memory, there can be out of memory problem due to this setting (read the setting description for `spark.driver.memory`). Similar for other memory-related settings.

This setting can be changed by setting the option, e.g., `spark-submit --driver-memory 8g AS1Q2.py` or increasing to 16G (you need to request more memory, e.g., starting a session via `qrshx -l rmem=32G`. We can also set this in shell (see references above) but some said that it is too late to change the memory after JVM starts and **for big data** (even if you see it shows new settings), it is safer to use **spark-submit** to specify such options (like the first sentence of this paragraph).

#### To change more configurations
You may search for example usage, an example that we used before **for very big data** is here for your reference only: `spark-submit --driver-memory 40g --executor-memory 2g --master local[10] --conf spark.driver.maxResultSize=4g test.py`. We can specify all information like `local.dir`, etc. similarly. 

### A2: work on data rather than home directory

When starting to work on bigger data, we recommend you to use `/home/username` for setting up environment (i.e., system files/data etc), and use `/data/username` for COM6012 works. After logging in, do `cd /data/username` to work under this directory. Of course, you also need to move all files to there.

### A3: conda clean

Do a cleaning via `conda clean --all` or `conda clean --yes --all` after loading conda. You may need to do this regularly. A related discussion is [here](https://stackoverflow.com/questions/36308531/how-to-uninstall-all-unused-packages-in-a-conda-virtual-environment). 

## Start

If running this notebook on HPC via [Jupyter Hub](https://jupyter-sharc.shef.ac.uk/), we need to run the following cell. If we are running this notebook on our local machine, skip the following cell.

In [None]:
import os
import subprocess
def module(*args):        
    if isinstance(args[0], list):        
        args = args[0]        
    else:        
        args = list(args)        
    (output, error) = subprocess.Popen(['/usr/bin/modulecmd', 'python'] + args, stdout=subprocess.PIPE).communicate()
    exec(output)    
module('load', 'apps/java/jdk1.8.0_102/binary')    
os.environ['PYSPARK_PYTHON'] = os.environ['HOME'] + '/.conda/envs/jupyter-spark/bin/python'

In [1]:
#import findspark
#findspark.init()
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("COM6012 PCA") \
    .getOrCreate()

sc = spark.sparkContext

## 1. Data Types: Local vs Distributed, Dense vs Sparse, Labelled vs Unlabelled

To deal with data efficiently, Spark considers different [data types](https://spark.apache.org/docs/2.3.2/mllib-data-types.html).

### Local vector:  Dense vs Sparse
> A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.

Please check out the [Vector in RDD API](https://spark.apache.org/docs/2.3.2/api/python/pyspark.mllib.html#pyspark.mllib.linalg.Vectors) or [Vector in DataFrame API](https://spark.apache.org/docs/2.3.2/api/python/pyspark.ml.html#pyspark.ml.linalg.Vector)(see method `.Sparse()`) and [SparseVector in RDD API ](https://spark.apache.org/docs/2.3.2/api/python/pyspark.mllib.html#pyspark.mllib.linalg.SparseVector) or [SparseVector in DataFrame API ](https://spark.apache.org/docs/2.3.2/api/python/pyspark.ml.html#pyspark.ml.linalg.SparseVector). The official example is below

In [2]:
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])

Note the vector created by `Vectors.sparse()` is of type `SparseVector()`

In [3]:
sv1

SparseVector(3, {0: 1.0, 2: 3.0})

View the sparse vector in a dense format

In [4]:
sv1.toArray()

array([1., 0., 3.])

### Labeled point
> A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

[LabeledPoint API](https://spark.apache.org/docs/2.3.2/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint)

In [5]:
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))

In [6]:
print(neg)

(0.0,(3,[0,2],[1.0,3.0]))


In [7]:
neg.label

0.0

In [8]:
neg.features

SparseVector(3, {0: 1.0, 2: 3.0})

View the features as dense vector (rather than sparse vector)

In [9]:
neg.features.toArray()

array([1., 0., 3.])

### Local matrix
> A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order, and sparse matrices, whose non-zero entry values are stored in the Compressed Sparse Column (CSC) format in column-major order. For example, the following dense matrix

In [10]:
from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6]) #Note the official example is wrong

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])

In [11]:
print(dm2)

DenseMatrix([[1., 2.],
             [3., 4.],
             [5., 6.]])


In [12]:
print(sm)

3 X 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0


See the [Compressed sparse column (CSC or CCS) format](https://en.wikipedia.org/wiki/Sparse_matrix#Compressed_sparse_column_(CSC_or_CCS))
> values are read first by column, a row index is stored for each value, and column pointers are stored. For example, CSC is (val, row_ind, col_ptr), where val is an array of the (top-to-bottom, then left-to-right) non-zero values of the matrix; row_ind is the row indices corresponding to the values; and, col_ptr is the list of val indexes where each column starts. 

In [13]:
dsm=sm.toDense()
print(dsm)

DenseMatrix([[9., 0.],
             [0., 8.],
             [0., 6.]])


### Distributed matrix

> A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.

#### RowMatrix
> The basic type is called RowMatrix. A RowMatrix is a row-oriented distributed matrix without meaningful row indices, e.g., a collection of feature vectors. It is backed by an RDD of its rows, where each row is a local vector. We assume that the number of columns is not huge for a RowMatrix so that a single local vector can be reasonably communicated to the driver and can also be stored / operated on using a single node.
> Since each row is represented by a local vector, the number of columns is limited by the integer range but it should be much smaller in practice.

In [14]:
from pyspark.mllib.linalg.distributed import RowMatrix

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows

View the RowMatrix in a dense matrix format

In [15]:
rowsRDD.collect()

[DenseVector([1.0, 2.0, 3.0]),
 DenseVector([4.0, 5.0, 6.0]),
 DenseVector([7.0, 8.0, 9.0]),
 DenseVector([10.0, 11.0, 12.0])]

## 2. PCA

[Principal component analysis](http://en.wikipedia.org/wiki/Principal_component_analysis) (PCA) is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables (entities each of which takes on various numerical values) into a set of values of linearly uncorrelated variables called **principal components (PCs)**. A PCA class trains a model to project vectors to a low-dimensional space using PCA and this is probably the most commonly used **dimensionality reduction** method.



### PCA in DataFrame-based API `pyspark.ml`  

Check out the [API](https://spark.apache.org/docs/2.3.2/api/python/pyspark.ml.html#pyspark.ml.feature.PCA). Check `pyspark.ml.feature.PCAModel` too to see what is available for the fitted model. The official example of projecting 5-dimensional feature vectors into 3-dimensional principal components.

In [16]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

  return f(*args, **kwds)


+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+



In [17]:
df.show()

+--------------------+
|            features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+



Check the explained variance in percentage

In [18]:
model.explainedVariance

DenseVector([0.7944, 0.2056, 0.0])

The variance should sum to one.

In [19]:
sum(model.explainedVariance)

0.9999999999999999

Take a look at the principal components Matrix. Each column is one principal component.

The **PC** obtained by this method.

In [20]:
print(model.pc)

DenseMatrix([[-0.44859172, -0.28423808,  0.08344545],
             [ 0.13301986, -0.05621156,  0.04423979],
             [-0.12523156,  0.76362648, -0.57807123],
             [ 0.21650757, -0.56529588, -0.79554051],
             [-0.84765129, -0.11560341, -0.15501179]])


### PCA in RDD-based API `pyspark.mllib` 

#### Eigendecomposition for PCA

`pyspark.mllib` supports PCA for **tall-and-skinny** (big $n$, small $d$) matrices stored in row-oriented format and any Vectors. We demonstrate how to compute principal components on a [<tt>RowMatrix</tt>](http://spark.apache.org/docs/latest/mllib-data-types.html#rowmatrix) and use them to project the vectors into a low-dimensional space in the cell below.

In [21]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

rows = sc.parallelize([
    Vectors.sparse(5, {1: 1.0, 3: 7.0}),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])

mat = RowMatrix(rows)
# Compute the top 3 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(3)

# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)

In [22]:
rows.collect()

[SparseVector(5, {1: 1.0, 3: 7.0}),
 DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]),
 DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])]

Now we convert to dense rows to see the matrix

In [23]:
from pyspark.mllib.linalg import DenseVector

denseRows = rows.map(lambda vector: DenseVector(vector.toArray()))
denseRows.collect()

[DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]),
 DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]),
 DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])]

The **PC** obtained by this method (the same as the above).

In [24]:
print(pc)

DenseMatrix([[-0.44859172, -0.28423808,  0.08344545],
             [ 0.13301986, -0.05621156,  0.04423979],
             [-0.12523156,  0.76362648, -0.57807123],
             [ 0.21650757, -0.56529588, -0.79554051],
             [-0.84765129, -0.11560341, -0.15501179]])


In [25]:
projected.rows.collect()

[DenseVector([1.6486, -4.0133, -5.5245]),
 DenseVector([-4.6451, -1.1168, -5.5245]),
 DenseVector([-6.4289, -5.338, -5.5245])]

The same as above (see `pcaFeatures` output).

#### SVD for PCA  - more **scalable** way to do PCA

Please read [SVD in RDD-based API `pyspark.mllib`](https://spark.apache.org/docs/2.3.2/mllib-dimensionality-reduction.html#singular-value-decomposition-svd). As covered in the lecture, we will need SVD for PCA on large-scale data. Here, we use it on the same small toy example to examine the relationship with eigenvalue decomposition based PCA methods above.

In [26]:
# Compute the top 5 singular values and corresponding singular vectors.
svd = mat.computeSVD(3, computeU=True)
U = svd.U       # The U factor is a RowMatrix.
s = svd.s       # The singular values are stored in a local dense vector.
V = svd.V       # The V factor is a local dense matrix.

If we are doing it right, the right singular vectors should be the same as the eigenvectors.

In [27]:
print(V)

DenseMatrix([[-0.31278534,  0.31167136,  0.30366911],
             [-0.02980145, -0.17133211, -0.02226069],
             [-0.12207248,  0.15256471, -0.95070998],
             [-0.71847899, -0.68096285, -0.0172245 ],
             [-0.60841059,  0.62170723,  0.05606596]])


But it is **not the same**! Why? We need to do **centering**! We can do so use the [StandardScaler (check out the API](https://spark.apache.org/docs/2.3.2/mllib-feature-extraction.html#standardscaler))

In [28]:
from pyspark.mllib.feature import StandardScaler

#We center the data to remove the mean. 
standardizer = StandardScaler(True, False)

In [29]:
model = standardizer.fit(rows)
centeredRows = model.transform(rows)
centeredRows.collect()

[DenseVector([-2.0, 0.6667, -1.0, 1.3333, -4.0]),
 DenseVector([0.0, -0.3333, 2.0, -1.6667, 1.0]),
 DenseVector([2.0, -0.3333, -1.0, 0.3333, 3.0])]

In [30]:
centeredmat = RowMatrix(centeredRows)

# Compute the top 3 singular values and corresponding singular vectors.
svd = centeredmat.computeSVD(3, computeU=True)
U = svd.U       # The U factor is a RowMatrix.
s = svd.s       # The singular values are stored in a local dense vector.
V = svd.V       # The V factor is a local dense matrix.

Check the **PC** obtained this time (it is the same as the above PCA methods now)

In [31]:
print(V)

DenseMatrix([[-0.44859172, -0.28423808, -0.81664677],
             [ 0.13301986, -0.05621156, -0.03622012],
             [-0.12523156,  0.76362648, -0.34267361],
             [ 0.21650757, -0.56529588, -0.13898906],
             [-0.84765129, -0.11560341,  0.44162541]])


In [32]:
print(s)

[6.001041088520536,3.0530049438580336,5.017337590814169e-08]


We get the eigenvalues by taking squares of the singular values

In [33]:
evs=s*s
evs

DenseVector([36.0125, 9.3208, 0.0])

Now we compute the percentage of variance captures and compare with the above to verify (see/search `model.explainedVariance`).

In [34]:
evs/sum(evs)

DenseVector([0.7944, 0.2056, 0.0])

## 3. Exercise (Optional but recommended, completing any of the following is considered as completion of this task.)

* The [HR Analytics problem:](https://rstudio-pubs-static.s3.amazonaws.com/345463_37f54d1c948b4cdfa181541841e0db8a.html)  A company is trying to figure out why their best and experienced employees are leaving prematurely from a [dataset](https://www.kaggle.com/liujiaqi/hr-comma-sepcsv). Follow the example [Principal Component Analysis explained on Kaggle](https://www.kaggle.com/nirajvermafcb/principal-component-analysis-explained) to perform such analysis in PySpark, using as many PySpark APIs as possible.
* Follow [Understanding Dimension Reduction with Principal Component Analysis (PCA)](https://blog.paperspace.com/dimension-reduction-with-principal-component-analysis/) to use PCA in inspecting the Iris data in Lab 4.
* Use `pca.fit()`, `computePrincipalComponents()` and `computeSVD` to separately compute the PCs of the same data and compare/verify whether you obtain the same results. Try on your own, and beyond what is covered already above in this notebook.
* Choose a bigger dataset and use the three ways above to separately compute the PCs and compare the time taken. 
* Choose datasets of increasing sizes and use the three ways above to compute the PCs and plot the time cost against the size to study the scalability
* Increase the number of cores used, e.g., setting `local[4]`, `local[6]`, `local[8]`, and study the time cost against the number of cores used.

You can find datasets of your own interest, e.g., face/digit datasets explored in MLAI. You may also use consider using [Bag of Words Data Sets](https://archive.ics.uci.edu/ml/datasets/Bag+of+Words) below in Task 3, which have data sets of various sizes.

## 4. PCA on bigger data (Optional but recommended, completing any of the following is considered as completion of this task.)

### Word meaning extraction

Use PySpark to perform the steps in IBM's notebook on [Spark-based machine learning for word meanings](https://dataplatform.cloud.ibm.com/analytics/notebooks/cd12e0c6-3560-4ad2-9850-82055daada23/view?access_token=4b8b2a7ac1f15b80e93aa12b93b60344368bea0014f2c48a7c6bc45a06a90ef3) that makes use of PCA, kmeans, and Word2Vec to learn work meanings.

### Bag of words analysis

Choose a [Bag of Words Data Set](https://archive.ics.uci.edu/ml/datasets/Bag+of+Words). Let us take  the **NIPS full papers** data as an example. 

The format of this data is

    Number of documents
    Number of words in the vocabulary
    Total number of words in the collection
    docID wordID count
    docID wordID count
    ...
    docID wordID count
    
Our data matrix will be $doc \times wordcount$ Initially, we need to read this data in: the steps in this would be roughly:

1. extract the number of documents, size of the vocabulary and strip off the first 3 lines
2. combine the words per document
3. create sparse vectors (for better space efficiency)

Create this as a standalone program. Keep everything as parallel as possible. You will benefit from creating a very small example to test your work, and then checking **whether** your work scales up to the **NYTIMES** bagofwords data.

### Large image datasets

Find some large-scale image datasets to examine the principal components and explore low-dimensional representations. 