# <center>Introduction to Spark MLlib with Python</center>
## <center>Data Types</center>
### <center>July 20, 2016</center>

<img src="https://ibm.box.com/shared/static/wfbduwkbx22nx3i2psbp9g27s2p9s86v.png" width="500" align="center">

## <b>Welcome to the first lab in the course, Introduction to Spark MLlib with Python.</b>
### <b>Spark includes several libraries, among them MLlib (Machine Learning Library), which makes practical machine learning quick and easy to scale!</b>

In this lab exercise, you will learn about the basic data types used in Spark MLlib. These are the building blocks you will need as you continue learning machine learning with Spark.

### Some Notebook Commands
#### In case you haven't dealt with a Jupyter Notebook before, here are some quick, useful commands that may be handy to get started.
<ul>
    <li>Run a cell: CTRL + ENTER</li>
    <li>Create a cell above the current cell: a</li>
    <li>Create a cell below the current cell: b</li>
    <li>Change a cell to Markdown: m</li>
    <li>Change a cell to code: y</li>
</ul>

The single-letter shortcuts above work in command mode (press Esc to leave the cell editor first). <b> If you are interested in more keyboard shortcuts, go to Help -> Keyboard Shortcuts </b>

## Dense and Sparse Vectors

Import the following libraries: <br>
<ul>
    <li> numpy as np </li>
    <li> scipy.sparse as sps </li>
    <li> Vectors from pyspark.mllib.linalg </li>
</ul>

In [2]:
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

First, we will be dealing with <b>Dense Vectors</b>. There are two ways to create a <b>dense vector</b> that MLlib accepts.<br>
Both dense vectors will hold the values: <b>8.0, 312.0, -9.0, 1.3</b>

The first <b>dense vector</b> we will create is simply a <b>NumPy array</b>. <br>
Using the np.array function, create a <b>dense vector</b> called <b>dense_vector1</b> <br> <br>
Note: numpy's array function takes a Python list (or other sequence) as input

In [5]:
dense_vector1 = np.array([8.0, 312.0, -9.0, 1.3])

Print <b>dense_vector1</b> and its <b>type</b>

In [6]:
print dense_vector1
print type(dense_vector1)

[   8.   312.    -9.     1.3]
<type 'numpy.ndarray'>


The second <b>dense vector</b> is even simpler: it is just a plain Python <b>list</b>. <br>
Create a <b>dense vector</b> called <b>dense_vector2</b>

In [7]:
dense_vector2 = [8.0, 312.0, -9.0, 1.3]

Print <b>dense_vector2</b> and its <b>type</b>

In [9]:
print dense_vector2
print type(dense_vector2)

[8.0, 312.0, -9.0, 1.3]
<type 'list'>


Next, we will be dealing with <b>sparse vectors</b>. There are two ways to create a <b>sparse vector</b>. <br>
The sparse vectors we will be creating will hold these values: <b> 7.0, 0.0, 0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 6.5 </b>

First, create a <b>sparse vector</b> called <b>sparse_vector1</b> using the <b>Vectors.sparse</b> function. <br>
Inputs to Vectors.sparse: <br>
<ul>
    <li>1st: Size of the sparse vector (its total length, zeros included)</li>
    <li>2nd: Indices of the nonzero entries</li>
    <li>3rd: Values placed at those indices</li>
</ul>

In [11]:
sparse_vector1 = Vectors.sparse(10, [0, 3, 5, 9], [7.0, 2.0, 1.0, 6.5])

Print <b>sparse_vector1</b> and its <b>type</b>

In [12]:
print sparse_vector1
print type(sparse_vector1)

(10,[0,3,5,9],[7.0,2.0,1.0,6.5])
<class 'pyspark.mllib.linalg.SparseVector'>
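To see what the printed (10,[0,3,5,9],[7.0,2.0,1.0,6.5]) triple encodes, the plain-Python sketch below expands it back into all ten values; every position not listed in the indices is an implicit 0.0. The helper <b>sparse_to_dense</b> is hypothetical and only illustrates the idea. It is not part of MLlib (a real SparseVector has its own toArray() method, which returns a NumPy array).

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a dense list.

    Hypothetical helper for illustration only; not part of pyspark.
    """
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    dense = [0.0] * size  # every position starts as an implicit zero
    for i, v in zip(indices, values):
        dense[i] = v  # fill in only the explicitly stored entries
    return dense


# Same triple that was passed to Vectors.sparse above
print(sparse_to_dense(10, [0, 3, 5, 9], [7.0, 2.0, 1.0, 6.5]))
```

Only four values and four indices are stored, yet they describe a vector of length ten, which is why sparse vectors save memory when most entries are zero.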


Next we will create a <b>sparse vector</b> called <b>sparse_vector2</b> using a single-column SciPy <b>csc_matrix</b>. <br> <br>
The inputs to sps.csc_matrix are: <br>
<ul>
    <li>1st: A tuple consisting of three inputs:</li>
    <ul>
        <li>1st: Data values (in a numpy array): the nonzero values</li>
        <li>2nd: Row indices (in a numpy array): where each of those values is placed</li>
        <li>3rd: Index pointer (in a numpy array): for each column, the position in the data array where that column's values start, followed by the total number of values. With one column holding 4 values, this is [0, 4]</li>
    </ul>
    <li>2nd: Shape of the matrix (#rows, #columns). Use 10 rows and 1 column</li>
    <ul>
        <li>shape = (\_,\_)</li>
    </ul>
</ul> <br>
Note: You may get a deprecation warning. You can safely ignore it.

In [13]:
sparse_vector2 = sps.csc_matrix((np.array([7.0,2.0,1.0,6.5]), np.array([0,3,5,9]), np.array([0, 4])), (10, 1))

Print <b>sparse_vector2</b> and its <b>type</b>

In [14]:
print sparse_vector2
print type(sparse_vector2)

  (0, 0)	7.0
  (3, 0)	2.0
  (5, 0)	1.0
  (9, 0)	6.5
<class 'scipy.sparse.csc.csc_matrix'>


## Labeled Points

The next data type is the <b>labeled point</b>: a dense or sparse feature vector paired with a label. Remember that labeled points are used by supervised learning algorithms, such as classification and regression.<br>

Start by importing the following libraries: <br>
<ul>
    <li>SparseVector from pyspark.mllib.linalg</li>
    <li>LabeledPoint from pyspark.mllib.regression</li>
</ul>

In [16]:
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

Remember that a labeled point can be used for binary or multiclass classification (as well as regression). In this lab, we will stick to binary classification for simplicity. <br> <br>
The <b>LabeledPoint</b> class takes 2 inputs:
<ul>
    <li>1st: Label of the point. In this case (binary classification), we will be using <font color="green">1.0</font> for <font color="green">positive</font> and <font color="red">0.0</font> for <font color="red">negative</font></li>
    <li>2nd: Vector of features for the point (we will input a dense or sparse vector created with any of the methods from the <b>Dense and Sparse Vectors</b> section of this lab)</li>
</ul>

Using the LabeledPoint class, create a labeled point called <b>pos_class</b> with a <b>positive</b> label and a <b>dense</b> feature vector holding the values: <b>5.0, 2.0, 1.0, 9.0</b>

In [17]:
pos_class = LabeledPoint(features=[5.0, 2.0, 1.0, 9.0], label=1.0)

Print <b>pos_class</b> and its <b>type</b>

In [18]:
print pos_class
print type(pos_class)

(1.0,[5.0,2.0,1.0,9.0])
<class 'pyspark.mllib.regression.LabeledPoint'>


Next, create a labeled point called <b>neg_class</b> with a <b>negative</b> label and a <b>sparse</b> feature vector holding the values: <b>1.0, 0.0, 0.0, 4.0, 0.0, 2.0</b>

In [19]:
neg_class = LabeledPoint(features=Vectors.sparse(6, [0, 3, 5], [1.0, 4.0, 2.0]), label=0.0)

Print <b>neg_class</b> and its <b>type</b>

In [20]:
print neg_class
print type(neg_class)

(0.0,(6,[0,3,5],[1.0,4.0,2.0]))
<class 'pyspark.mllib.regression.LabeledPoint'>


---
## Matrix Data Types
In this next section, we will create the following matrices:
<ul>
    <li>Local Matrix</li>
    <li>Row Matrix</li>
    <li>Indexed Row Matrix</li>
    <li>Coordinate Matrix</li>
    <li>Block Matrix</li>
</ul> <br> <br>

Throughout this section, we will be modelling the following matrices: <br>
<center>For a Dense Matrix:</center> <br> <img src="http://imgur.com/O8a4ZS0.png" align="center"> <br>
<center>For a Sparse Matrix:</center> <br> <img src="http://imgur.com/k0XwOfA.png" align="center">

## Local Matrix

Import the following library:
<ul>
    <li>pyspark.mllib.linalg as laMat</li>
</ul>

In [21]:
import pyspark.mllib.linalg as laMat

Create a dense local matrix called <b>dense_LM</b> <br>
The inputs into the <b>laMat.Matrices.dense</b> function are:
<ul>
    <li>1st: Number of Rows</li>
    <li>2nd: Number of Columns</li>
    <li>3rd: The values as a list, in column-major order (down the first column, then down the second, and so on)</li>
</ul>

In [22]:
dense_LM = laMat.Matrices.dense(3, 4, [1.0, 3.0, 9.0, 6.0, 2.0, 4.0, 3.0, 5.0, 0.0, 0.0, 1.0, 3.0])

Print <b>dense_LM</b> and its <b>type</b>

In [23]:
print dense_LM
print type(dense_LM)

DenseMatrix([[ 1.,  6.,  3.,  0.],
             [ 3.,  2.,  5.,  1.],
             [ 9.,  4.,  0.,  3.]])
<class 'pyspark.mllib.linalg.DenseMatrix'>
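Column-major order means the flat list fills the first column top to bottom, then the second column, and so on, so the element in row r, column c sits at position c * numRows + r. The plain-Python sketch below applies that rule to the 12 values passed to <b>laMat.Matrices.dense</b> and rebuilds the three rows printed above. The helper <b>column_major_to_rows</b> is hypothetical, not an MLlib function.

```python
def column_major_to_rows(num_rows, num_cols, values):
    """Rebuild row lists from a column-major list of values.

    Hypothetical helper for illustration only. The element in row r,
    column c lives at position c * num_rows + r of the flat list.
    """
    if len(values) != num_rows * num_cols:
        raise ValueError("expected num_rows * num_cols values")
    return [[values[c * num_rows + r] for c in range(num_cols)]
            for r in range(num_rows)]


dense_rows = column_major_to_rows(
    3, 4, [1.0, 3.0, 9.0, 6.0, 2.0, 4.0, 3.0, 5.0, 0.0, 0.0, 1.0, 3.0])
for row in dense_rows:
    print(row)
```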


Next we will do the same thing with a sparse matrix, calling the output <b>sparse_LM</b>. <br>
The inputs into the <b>laMat.Matrices.sparse</b> function are:
<ul>
    <li>1st: Number of Rows</li>
    <li>2nd: Number of Columns</li>
    <li>3rd: Column Pointers (in a list): entry j is the position in the values list where column j starts, and the last entry is the total number of nonzero values</li>
    <li>4th: Row Indices of the nonzero values (in a list)</li>
    <li>5th: Nonzero Values of the Matrix (in a list)</li>
</ul> <br>
<b>Note</b>: Remember that this format is <b>column-major</b>, so read the original matrix column by column (top down, then left to right) when building these lists

In [27]:
# Use the laMat alias from the import above (that import does not bind the name pyspark)
sparse_LM = laMat.Matrices.sparse(3, 4, [0, 2, 3, 4, 5], [0, 1, 2, 0, 1], [1.0, 3.0, 4.0, 3.0, 1.0])

Print <b>sparse_LM</b> and its <b>type</b>

In [28]:
print sparse_LM
print type(sparse_LM)

3 X 4 CSCMatrix
(0,0) 1.0
(1,0) 3.0
(2,1) 4.0
(0,2) 3.0
(1,3) 1.0
<class 'pyspark.mllib.linalg.SparseMatrix'>


Make sure the output of <b>sparse_LM</b> matches the original matrix.
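One way to do that check is to expand the three lists by hand. In this compressed sparse column (CSC) layout, the nonzeros of column j sit at positions colPtrs[j] through colPtrs[j + 1] - 1 of the row-index and value lists; SciPy's csc_matrix uses the same idea with its data, indices, and index-pointer arrays. The helper <b>csc_to_rows</b> below is a hypothetical plain-Python sketch, not part of MLlib.

```python
def csc_to_rows(num_rows, num_cols, col_ptrs, row_indices, values):
    """Expand CSC arrays (column pointers, row indices, values) to row lists.

    Hypothetical helper for illustration only. The nonzeros of column j are
    stored at positions col_ptrs[j] through col_ptrs[j + 1] - 1 of
    row_indices and values.
    """
    if len(col_ptrs) != num_cols + 1:
        raise ValueError("col_ptrs needs num_cols + 1 entries")
    dense = [[0.0] * num_cols for _ in range(num_rows)]
    for j in range(num_cols):
        for k in range(col_ptrs[j], col_ptrs[j + 1]):
            dense[row_indices[k]][j] = values[k]
    return dense


sparse_rows = csc_to_rows(3, 4, [0, 2, 3, 4, 5], [0, 1, 2, 0, 1],
                          [1.0, 3.0, 4.0, 3.0, 1.0])
for row in sparse_rows:
    print(row)
```

Each printed row should line up with the (row, column) value pairs listed in the sparse_LM output above.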

## RowMatrix

A RowMatrix is a row-oriented distributed matrix without meaningful row indices: its rows are local vectors stored in an RDD.

Import the following library:
<ul>
    <li>RowMatrix from pyspark.mllib.linalg.distributed</li>
</ul>

In [29]:
from pyspark.mllib.linalg.distributed import RowMatrix

Now, let's create an RDD of vectors called <b>rowVecs</b> from the rows of the <b>Dense Matrix</b>, using the parallelize function of the SparkContext <b>sc</b> (which the notebook environment provides).<br>
The input into <b>sc.parallelize</b> is:
<ul>
    <li>A list of the matrix's rows, where each row is itself a list of values</li>
</ul> <br>
<b>Note</b>: An RDD (Resilient Distributed Dataset) is a fault-tolerant collection of elements that can be operated on in parallel. <br>

In [31]:
rowVecs = sc.parallelize([[1.0, 6.0, 3.0, 0.0],
                          [3.0, 2.0, 5.0, 1.0],
                          [9.0, 4.0, 0.0, 3.0]])

Next, create a variable called <b>rowMat</b> by passing the RDD to the <b>RowMatrix</b> constructor.

In [32]:
rowMat = RowMatrix(rowVecs)

Now we will retrieve the <font color="green">number of rows</font> (save it as <font color="green">m</font>) and the <font color="blue">number of columns</font> (save it as <font color="blue">n</font>) from the RowMatrix.
<ul>
    <li>To get the number of rows, use <i>numRows()</i> on rowMat</li>
    <li>To get the number of columns, use <i>numCols()</i> on rowMat</li>
</ul>

In [33]:
m = rowMat.numRows()
n = rowMat.numCols()

Print out <b>m</b> and <b>n</b>. The results should be:
<ul>
    <li>Number of Rows: 3</li>
    <li>Number of Columns: 4</li>
</ul>

In [34]:
print m, n

3 4


## IndexedRowMatrix

Since the RowMatrix we just created had no meaningful row indices, let's create an <b>IndexedRowMatrix</b>, which does have meaningful row indices!

Import the following library:
<ul>
    <li> IndexedRow, IndexedRowMatrix from pyspark.mllib.linalg.distributed</li>
</ul>

In [35]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

Now, create an RDD called <b>indRows</b> by using the SparkContext's parallelize function on the rows of the <b>Dense Matrix</b>. <br>
There are two different inputs you can use to create the RDD:
<ul>
    <li>Method 1: A list containing multiple IndexedRow inputs</li>
    <ul>
        <li>Input into IndexedRow:</li>
        <ul>
            <li>1. Index for the given row (row number)</li>
            <li>2. The values of the matrix row at that index (a list)</li>
        </ul>
        <li>ex. sc.parallelize([IndexedRow(0,[1, 2, 3]), ...])</li>
    </ul> <br>
    <li>Method 2: A list containing multiple tuples</li>
    <ul>
        <li>Values in the tuple:</li>
        <ul>
            <li>1. Index for the given row (row number) (type:long)</li>
            <li>2. List containing the values in the row for the given index (type:vector)</li>
        </ul>
        <li>ex. sc.parallelize([(0, [1, 2, 3]), ...])</li>
    </ul>
</ul>

In [40]:
indRows = sc.parallelize([IndexedRow(0, [1.0, 6.0, 3.0, 0.0]),
                          IndexedRow(1, [3.0, 2.0, 5.0, 1.0]),
                          IndexedRow(2, [9.0, 4.0, 0.0, 3.0])])

Now, create the <b>IndexedRowMatrix</b> called <b>indRowMat</b> by passing the <b>indRows</b> RDD to the IndexedRowMatrix constructor.

In [41]:
indRowMat = IndexedRowMatrix(indRows)

Now we will retrieve the <font color="green">number of rows</font> (save it as <font color="green">m2</font>) and the <font color="blue">number of columns</font> (save it as <font color="blue">n2</font>) from the IndexedRowMatrix.
<ul>
    <li>To get the number of rows, use <i>numRows()</i> on indRowMat</li>
    <li>To get the number of columns, use <i>numCols()</i> on indRowMat</li>
</ul>

In [42]:
m2 = indRowMat.numRows()
n2 = indRowMat.numCols()

Print out <b>m2</b> and <b>n2</b>. The results should be:
<ul>
    <li>Number of Rows: 3</li>
    <li>Number of Columns: 4</li>
</ul>

In [43]:
print m2, n2

3 4
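Why 3 rows? According to the PySpark documentation for IndexedRowMatrix, when no explicit number of rows is passed (the constructor accepts optional numRows and numCols arguments), the number of rows is the largest row index plus one, not a count of the rows, and the number of columns is the size of the first row. The plain-Python sketch below mimics that rule; the helper <b>indexed_row_matrix_shape</b> is hypothetical and is not the Spark implementation. Rows are given as (index, values) tuples, as in Method 2 above.

```python
def indexed_row_matrix_shape(indexed_rows):
    """Mimic how IndexedRowMatrix infers its size when none is given.

    Hypothetical helper for illustration only: indexed_rows is a list of
    (index, values) tuples.
    """
    num_rows = max(index for index, _ in indexed_rows) + 1  # max index + 1
    num_cols = len(indexed_rows[0][1])  # size of the first row
    return num_rows, num_cols


indexed_rows = [(0, [1.0, 6.0, 3.0, 0.0]),
                (1, [3.0, 2.0, 5.0, 1.0]),
                (2, [9.0, 4.0, 0.0, 3.0])]
print(indexed_row_matrix_shape(indexed_rows))

# Same three rows, but the last one is stored under index 5
gapped = indexed_rows[:2] + [(5, [9.0, 4.0, 0.0, 3.0])]
print(indexed_row_matrix_shape(gapped))
```

The second call reports 6 rows even though only 3 are stored: the indices, not the number of elements, determine the row count.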


## CoordinateMatrix

Now it's time to create a different type of matrix, which should be used when both dimensions of the matrix are very large and the data is very sparse. <br>
<b>Note</b>: Here we will use the small sparse matrix above, just to get the idea of how to initialize a CoordinateMatrix.

Import the following libraries:
<ul>
    <li>CoordinateMatrix, MatrixEntry from pyspark.mllib.linalg.distributed</li>
</ul>

In [44]:
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

Now, create an RDD called <b>coordRows</b> by using the SparkContext's parallelize function on the nonzero entries of the <b>Sparse Matrix</b>. There are two different inputs you can use to create the RDD:
<ul>
    <li>Method 1: A list containing multiple MatrixEntry inputs</li>
    <ul>
        <li>Input into MatrixEntry:</li>
        <ul>
            <li>1. Row index of the matrix (row number) (type: long)</li>
            <li>2. Column index of the matrix (column number) (type: long)</li>
            <li>3. Value at the (Row Index, Column Index) entry of the matrix (type: float)</li>
        </ul>
        <li>ex. sc.parallelize([MatrixEntry(0, 0, 1.0), ...])</li>
    </ul> <br>
    <li>Method 2: A list containing multiple tuples</li>
    <ul>
        <li>Values in the tuple:</li>
        <ul>
            <li>1. Row index of the matrix (row number) (type: long)</li>
            <li>2. Column index of the matrix (column number) (type: long)</li>
            <li>3. Value at the (Row Index, Column Index) entry of the matrix (type: float)</li>
        </ul>
        <li>ex. sc.parallelize([(0, 0, 1), ...])</li>
    </ul>
</ul>

In [47]:
coordRows = sc.parallelize([MatrixEntry(0, 0, 1.0),
                           MatrixEntry(1, 0, 3.0),
                           MatrixEntry(2, 1, 4.0),
                           MatrixEntry(0, 2, 3.0),
                           MatrixEntry(1, 3, 1.0)])

Now, create the <b>CoordinateMatrix</b> called <b>coordMat</b> by passing the <b>coordRows</b> RDD to the CoordinateMatrix constructor.

In [49]:
coordMat = CoordinateMatrix(coordRows)

Now we will retrieve the <font color="green">number of rows</font> (save it as <font color="green">m3</font>) and the <font color="blue">number of columns</font> (save it as <font color="blue">n3</font>) from the CoordinateMatrix.
<ul>
    <li>To get the number of rows, use <i>numRows()</i> on coordMat</li>
    <li>To get the number of columns, use <i>numCols()</i> on coordMat</li>
</ul>

In [51]:
m3 = coordMat.numRows()
n3 = coordMat.numCols()

Print out <b>m3</b> and <b>n3</b>. The results should be:
<ul>
    <li>Number of Rows: 3</li>
    <li>Number of Columns: 4</li>
</ul>

In [52]:
print m3, n3

3 4
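A CoordinateMatrix built without explicit sizes does not know its dimensions in advance: according to the PySpark documentation, the number of rows (columns) is then the largest row (column) index among the entries plus one, and the constructor also accepts optional numRows and numCols arguments. The plain-Python sketch below applies that rule to the five entries above and fills in a dense grid. The helper <b>coordinate_entries_to_rows</b> is hypothetical, not the Spark implementation.

```python
def coordinate_entries_to_rows(entries, num_rows=0, num_cols=0):
    """Build dense rows from (row, col, value) entries.

    Hypothetical helper for illustration only. A non-positive size is
    inferred as the largest index seen plus one.
    """
    if num_rows <= 0:
        num_rows = max(i for i, _, _ in entries) + 1
    if num_cols <= 0:
        num_cols = max(j for _, j, _ in entries) + 1
    dense = [[0.0] * num_cols for _ in range(num_rows)]
    for i, j, value in entries:
        dense[i][j] = value  # entries not listed stay 0.0
    return dense


entries = [(0, 0, 1.0), (1, 0, 3.0), (2, 1, 4.0), (0, 2, 3.0), (1, 3, 1.0)]
inferred = coordinate_entries_to_rows(entries)
print("%d x %d" % (len(inferred), len(inferred[0])))

# A fourth, all-zero row has no entries, so it must be requested explicitly
explicit = coordinate_entries_to_rows(entries, num_rows=4, num_cols=4)
print("%d x %d" % (len(explicit), len(explicit[0])))
```

The second call shows the catch: trailing rows or columns that contain only zeros leave no entries behind, so they are only counted when the size is passed in.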


Now, we can get the <b>entries</b> of coordMat through its <b>entries</b> attribute (no parentheses needed). Store this in a variable called <b>coordEnt</b>.

In [53]:
coordEnt = coordMat.entries

Check out the <i>type</i> of coordEnt.

In [54]:
type(coordEnt)

pyspark.rdd.PipelinedRDD

It should be a <b>PipelinedRDD</b>, a subclass of RDD, so all the usual RDD methods are available. One of them is <b>first()</b>, which returns the first element of the RDD. <br> <br>

Run coordEnt.first()

In [55]:
coordEnt.first()

MatrixEntry(0, 0, 1.0)

## BlockMatrix

A BlockMatrix is a distributed matrix stored as an RDD of sub-matrix blocks. Each block is identified by its (block row, block column) index, and together the blocks tile the full matrix.

Import the following libraries:
<ul>
    <li>Matrices from pyspark.mllib.linalg</li>
    <li>BlockMatrix from pyspark.mllib.linalg.distributed</li>
</ul>

In [56]:
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

Now create an <b>RDD</b> of <b>sub-matrix blocks</b>. <br>
This will be done using SparkContext's parallelize function. <br>

The input into <b>sc.parallelize</b> is a <b>list of tuples</b>. Each tuple describes one block and consists of two elements:
<ul>
    <li>1st: A tuple (block row index, block column index) giving the block's position. MLlib multiplies these by the rows per block and columns per block (the <b>BlockMatrix</b> inputs described below) to find the block's top-left element</li>
    <li>2nd: The sub-matrix, which will come from <b>Matrices.dense</b>. The sub-matrix requires 3 inputs:</li>
    <ul>
        <li>1st: Number of rows</li>
        <li>2nd: Number of columns</li>
        <li>3rd: A list containing the elements of the sub-matrix, read in column-major order</li>
    </ul>
</ul> <br>
(ex. ((51, 2), Matrices.dense(2, 2, [61.0, 43.0, 1.0, 74.0])) would be one element (one tuple) of the list.)

The matrix we will be modelling is the <b>Dense Matrix</b> from above. Create the following sub-matrices:
<ul>
    <li>Row: 0, Column: 0, Values: 1.0, 3.0, 6.0, 2.0, with 2 Rows and 2 Columns </li>
    <li>Row: 2, Column: 0, Values: 9.0, 4.0, with 1 Row and 2 Columns</li>
    <li>Row: 0, Column: 2, Values: 3.0, 5.0, 0.0, 0.0, 1.0, 3.0, with 3 Rows and 2 Columns</li>
</ul>

In [57]:
theRDD = sc.parallelize([((0, 0), Matrices.dense(2,2, [1.0, 3.0, 6.0, 2.0])),
                        ((2, 0), Matrices.dense(1,2, [9.0, 4.0])),
                        ((0, 2), Matrices.dense(3,2, [3.0, 5.0, 0.0, 0.0, 1.0, 3.0]))])

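Now that we have the RDD, it's time to create the BlockMatrix called <b>blockMat</b> using the BlockMatrix class. The <b>BlockMatrix</b> class requires 3 inputs:
<ul>
    <li>1st: The RDD of sub-matrices</li>
    <li>2nd: The rows per block. Keep this value at 1</li>
    <li>3rd: The columns per block. Keep this value at 1</li>
</ul>
With both values at 1, each block index is multiplied by 1, so the indices used above act directly as the (row, column) of each block's top-left element. In general, MLlib expects every block to be rowsPerBlock x colsPerBlock in size (only the blocks in the last block row or block column may be smaller), so the unevenly sized blocks in this lab are a shortcut for illustration rather than a typical layout.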
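The sketch below shows how block indices and the rows/columns per block combine, following the MLlib description of a BlockMatrix: a block at index (i, j) starts at element (i * rowsPerBlock, j * colsPerBlock). The helper <b>assemble_blocks</b> is hypothetical plain Python, not the Spark implementation, and each sub-matrix is given as (rows, columns, column-major values) instead of a Matrices.dense object. With 1 row and 1 column per block, the lab's three blocks rebuild the Dense Matrix; the same matrix can also be tiled with uniformly sized 2 x 2 blocks (the last block row only 1 row tall).

```python
def assemble_blocks(blocks, rows_per_block, cols_per_block):
    """Place column-major sub-matrix blocks into one dense matrix.

    Hypothetical helper for illustration only. Each block is
    ((block_row, block_col), (n_rows, n_cols, values)); its top-left
    element lands at (block_row * rows_per_block, block_col * cols_per_block).
    """
    placed = {}
    for (block_row, block_col), (n_rows, n_cols, values) in blocks:
        row_offset = block_row * rows_per_block
        col_offset = block_col * cols_per_block
        for c in range(n_cols):
            for r in range(n_rows):
                placed[(row_offset + r, col_offset + c)] = values[c * n_rows + r]
    num_rows = max(i for i, _ in placed) + 1
    num_cols = max(j for _, j in placed) + 1
    return [[placed.get((i, j), 0.0) for j in range(num_cols)]
            for i in range(num_rows)]


# The lab's blocks, with 1 row and 1 column per block
blocks = [((0, 0), (2, 2, [1.0, 3.0, 6.0, 2.0])),
          ((2, 0), (1, 2, [9.0, 4.0])),
          ((0, 2), (3, 2, [3.0, 5.0, 0.0, 0.0, 1.0, 3.0]))]
for row in assemble_blocks(blocks, 1, 1):
    print(row)

# The same matrix as uniform 2 x 2 blocks (block indices, not element offsets)
uniform = [((0, 0), (2, 2, [1.0, 3.0, 6.0, 2.0])),
           ((0, 1), (2, 2, [3.0, 5.0, 0.0, 1.0])),
           ((1, 0), (1, 2, [9.0, 4.0])),
           ((1, 1), (1, 2, [0.0, 3.0]))]
print(assemble_blocks(uniform, 2, 2) == assemble_blocks(blocks, 1, 1))
```

In the uniform layout, block (1, 1) starts at element (2, 2) because both indices are multiplied by 2; that is the distinction that disappears when rows and columns per block are kept at 1.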

In [58]:
blockMat = BlockMatrix(theRDD, 1, 1)

Now we will retrieve the <font color="green">number of rows</font> (save it as <font color="green">m4</font>) and the <font color="blue">number of columns</font> (save it as <font color="blue">n4</font>) from the BlockMatrix.
<ul>
    <li>To get the number of rows, use <i>numRows()</i> on blockMat</li>
    <li>To get the number of columns, use <i>numCols()</i> on blockMat</li>
</ul>

In [59]:
m4 = blockMat.numRows()
n4 = blockMat.numCols()

Print out <b>m4</b> and <b>n4</b>. The results should be:
<ul>
    <li>Number of Rows: 3</li>
    <li>Number of Columns: 4</li>
</ul>

In [60]:
print m4, n4

3 4


Now, let's check that our matrix is correct. First, convert <b>blockMat</b> into a local matrix with its <b>.toLocalMatrix()</b> method, and store the result in a variable called <b>locBMat</b>.

In [61]:
locBMat = blockMat.toLocalMatrix()

Now print out <b>locBMat</b> and its <b>type</b>. The result should model the original <b>Dense Matrix</b> and the type should be a DenseMatrix.

In [62]:
print locBMat
print type(locBMat)

DenseMatrix([[ 1.,  6.,  3.,  0.],
             [ 3.,  2.,  5.,  1.],
             [ 9.,  4.,  0.,  3.]])
<class 'pyspark.mllib.linalg.DenseMatrix'>


## Bonus - Matrix Conversions

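In this bonus section, we will look at how the different types of matrices relate to each other. You can convert between the distributed matrices we discussed with the following methods, though not every type supports every conversion; the conversions supported by each type are listed below. <br>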
<ul>
    <li>.toRowMatrix() converts the matrix to a RowMatrix</li>
    <li>.toIndexedRowMatrix() converts the matrix to an IndexedRowMatrix</li>
    <li>.toCoordinateMatrix() converts the matrix to a CoordinateMatrix</li>
    <li>.toBlockMatrix() converts the matrix to a BlockMatrix</li>
</ul>

### IndexedRowMatrix Conversions

The following conversions are supported for an IndexedRowMatrix:
<ul>
    <li>IndexedRowMatrix -> RowMatrix</li>
    <li>IndexedRowMatrix -> CoordinateMatrix</li>
    <li>IndexedRowMatrix -> BlockMatrix</li>
</ul>

In [63]:
# Convert to a RowMatrix
rMat = indRowMat.toRowMatrix()
print(type(rMat))

# Convert to a CoordinateMatrix
cMat = indRowMat.toCoordinateMatrix()
print(type(cMat))

# Convert to a BlockMatrix
bMat = indRowMat.toBlockMatrix()
print(type(bMat))

<class 'pyspark.mllib.linalg.distributed.RowMatrix'>
<class 'pyspark.mllib.linalg.distributed.CoordinateMatrix'>
<class 'pyspark.mllib.linalg.distributed.BlockMatrix'>


### CoordinateMatrix Conversions

The following conversions are supported for a CoordinateMatrix:
<ul>
    <li>CoordinateMatrix -> RowMatrix</li>
    <li>CoordinateMatrix -> IndexedRowMatrix</li>
    <li>CoordinateMatrix -> BlockMatrix</li>
</ul>

In [64]:
# Convert to a RowMatrix
rMat2 = coordMat.toRowMatrix()
print(type(rMat2))

# Convert to an IndexedRowMatrix
iRMat = coordMat.toIndexedRowMatrix()
print(type(iRMat))

# Convert to a BlockMatrix
bMat2 = coordMat.toBlockMatrix()
print(type(bMat2))

<class 'pyspark.mllib.linalg.distributed.RowMatrix'>
<class 'pyspark.mllib.linalg.distributed.IndexedRowMatrix'>
<class 'pyspark.mllib.linalg.distributed.BlockMatrix'>


### BlockMatrix Conversions

The following conversions are supported for a BlockMatrix:
<ul>
    <li>BlockMatrix -> LocalMatrix (Can display the Matrix)</li>
    <li>BlockMatrix -> IndexedRowMatrix</li>
    <li>BlockMatrix -> CoordinateMatrix</li>
</ul>

In [65]:
# Convert to a LocalMatrix
lMat = blockMat.toLocalMatrix()
print(type(lMat))

# Convert to an IndexedRowMatrix
iRMat2 = blockMat.toIndexedRowMatrix()
print(type(iRMat2))

# Convert to a CoordinateMatrix
cMat2 = blockMat.toCoordinateMatrix()
print(type(cMat2))

<class 'pyspark.mllib.linalg.DenseMatrix'>
<class 'pyspark.mllib.linalg.distributed.IndexedRowMatrix'>
<class 'pyspark.mllib.linalg.distributed.CoordinateMatrix'>
