## Importing Libraries

In [None]:
# Python
import pandas as pd
import numpy as np
import time
import copy

# Scipy
from scipy import linalg

# Pyspark
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
from pyspark.mllib.linalg.distributed import DenseMatrix

## Creating Input

### Reference
https://spark.apache.org/docs/latest/mllib-data-types.html#distributed-matrix

In [None]:
# Set the size of your input data
size = 256

In [None]:
# Set the datatype of your input
data_type = 'float32'

In [None]:
# Assuming numpy array as input
start_time = time.time()
input_arr = np.random.rand(size, size).astype(data_type) 
print("--- %s seconds ---" % (time.time() - start_time))

print("Shape of input array is: " + str(input_arr.shape))

In [None]:
# Check the size of input array
from sys import getsizeof
print("Size of input array in GB is: " + str(getsizeof(input_arr)/1e9))

### Inverse using Scipy

In [None]:
# Assuming numpy array as input
start_time = time.time()
input_arr_scipy_inverse = linalg.inv(input_arr) 
print("--- %s seconds ---" % (time.time() - start_time))

print("Inverse using scipy is:")
print(input_arr_scipy_inverse)

### RDD of sub-matrix blocks

In [None]:
# Define block size
block_size = 64

In [None]:
def makeBlocks(arr, nrows, ncols):
    '''
    Return an array of shape (n, nrows, ncols) where 
    n * nrows * ncols = arr.size

    '''

    h, w = arr.shape
    assert h % nrows == 0, "{} rows is not evenly divisible by {}".format(h, nrows)
    assert w % ncols == 0, "{} cols is not evenly divisible by {}".format(w, cols)

    return (arr.reshape(h//nrows, nrows, -1, ncols)
          .swapaxes(1, 2)
          .reshape(-1, nrows, ncols))

In [None]:
block_arrays = makeBlocks(input_arr, block_size, block_size)
print("Shape of Blocked Array is: {}".format(block_arrays.shape))
print("Blocked Array is: {}".format(block_arrays))

In [None]:
block_arrays_list = []
num_blocks = block_arrays.shape[0]
num_rowIndex = input_arr.shape[0]//block_size
for idx in range(num_blocks):
    block  = ((idx//num_rowIndex, idx%num_rowIndex), Matrices.dense(block_size, block_size, block_arrays[idx].flatten(order='F')))
    block_arrays_list.append(block)

In [None]:
print("Blocked Array List is: {}".format(block_arrays_list))

In [None]:
# Parallelize the Block Array List
blocks = sc.parallelize(block_arrays_list)

# Type will be .rdd
print("Type of blocks is: {}".format(type(blocks)))

### Create BlockMatrix 

In [None]:
# Assuming numpy array as input
start_time = time.time()
A = BlockMatrix(blocks, block_size, block_size) 
print("--- %s seconds ---" % (time.time() - start_time))
print("Created BlockMatrix from RDD of sub-matrix blocks")

In [None]:
print("Number of rows of BlockMatrix: {}".format(A.numRows()))
print("Number of columns of BlockMatrix: {}".format(A.numCols()))

In [None]:
# Validate the BlockMatrix
A.validate()

In [None]:
# Print the first element of BlockRDD
print("First element of Block RDD is: {}".format(A.blocks.first()))

## Inverse using SPIN Algorithm

### MapToPair()

In [None]:
def MapToPair(block, size):
    ri = block[0][0]
    ci = block[0][1]

    if(ri//size == 0) and (ci//size == 0):
    tag = "A11"
    elif(ri//size == 0) and (ci//size == 1):
    tag = "A12"
    elif(ri//size == 1) and (ci//size == 0):
    tag = "A21"
    else(ri//size == 1) and (ci//size == 1):
    tag = "A22"

    # Get the number of rows, number of columns and values to create a new block
    numRows = block[1].numRows
    numCols = block[1].numCols
    matrixValues = block[1].values

    # Row Index and Column Index of the new block
    rowIndex = ri % size
    colIndex = ci % size

    newBlock = ((rowIndex, colIndex), Matrices.dense(numRows, numCols, matrixValues))

    return (tag, newBlock)  

### breakMat()

In [None]:
def breakMat(A, size):
    ARDD = A.blocks
    return ARDD.map(lambda x: MapToPair(x, size))

### xy()

In [None]:
def function_xy(x, y, pairRDD, block_size):
    tag = 'A' + x + y
    filteredRDD = pairRDD.filter(lambda x: x[0] == tag)
    blocks = filteredRDD.map(lambda x: x[1])
    return BlockMatrix(blocks, block_size, block_size)

### scipy inverse()

In [None]:
def scipy_inverse(block):
    # Get the Row Index and Column Index of the block
    rowIndex = block[0][0]
    colIndex = block[0][1]

    # Get values to find the inverse
    matrixValues = block[1].toArray()

    # Find inverse using scipy
    inverse_matrixValues = linalg.inv(matrixValues)

    # Change the inverse matrix to column major order
    inverse_matrixValues = inverse_matrixValues.flatten(order='F')

    inverse_block = ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, inverse_matrixValues))

    return inverse_block

### multiply()

In [None]:
def multiply(mat1, mat2):
    mat1_mat2 = mat1.multiply(mat2)
    return mat1_mat2

### subtract()

In [None]:
def subtract(mat1, mat2):
    mat1_mat2 = mat1.subtract(mat2)
    return mat1_mat2

### Scalar Multiplication

In [None]:
def scalarMulHelper(block, scalar):
    # Get the RowIndex and the ColIndex of the block
    rowIndex = block[0][0]
    colIndex = block[0][1]

    # Get values to multiply with a scalar
    matrixValues = block[1].values
    matrixValues = matrixValues*scalar

    newBlock = ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, matrixValues))

    return newBlock

In [None]:
def scalarMul(A, scalar, block_size):
    ARDD = A.blocks
    blocks = ARDD.map(lambda x: scalarMulHelper(x, scalar))
    return BlockMatrix(blocks, block_size, block_size)

### arrange()

In [None]:
def map_c12(block, size):
    # Get the RowIndex and the ColIndex of the block
    rowIndex = block[0][0]
    colIndex = block[0][1]
    colIndex = colIndex + size
    return ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, block[1].values))

    def map_c21(block, size):
    # Get the RowIndex and the ColIndex of the block
    rowIndex = block[0][0]
    rowIndex = rowIndex + size
    colIndex = block[0][1]
    return ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, block[1].values))

    def map_c22(block, size):
    # Get the RowIndex and the ColIndex of the block
    rowIndex = block[0][0]
    rowIndex = rowIndex + size
    colIndex = block[0][1]
    colIndex = colIndex + size
    return ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, block[1].values))

In [None]:
def arrange(C11, C12, C21, C22, size, block_size):
    C11RDD = C11.blocks
    C12RDD = C12.blocks
    C21RDD = C21.blocks
    C22RDD = C22.blocks

    C1 = C12RDD.map(lambda x: map_c12(x, size//blocksize))
    C2 = C21RDD.map(lambda x: map_c21(x, size//blocksize))
    C3 = C22RDD.map(lambda x: map_c22(x, size//blocksize))

    unionRDD = C11RDD.union(C1.union(C2.union(C3)))

    return BlockMatrix(unionRDD, block_size, block_size)

### inverse()

In [None]:
def inverse(A, size, block_size):
    n = size//block_size
    if n == 1:
        A_RDD = A.blocks
        A_Inverse_Block = A_RDD.map(lambda x: scipy_inverse(x))
        return BlockMatrix(A_Inverse_Block, block_size, block_size)
    else:
        size = size/2
        pairRDD = breakMat(A, size//block_size)
        A11 = function_xy(str(1), str(1), pairRDD, block_size)
        A12 = function_xy(str(1), str(2), pairRDD, block_size)
        A21 = function_xy(str(2), str(1), pairRDD, block_size)
        A22 = function_xy(str(2), str(2), pairRDD, block_size)
        one = inverse(A11, size, block_size)
        two = multiply(A21, one)
        three = multiply(one, A12)
        four = multiply(A21, three)
        five = subtract(four, A22)
        six = inverse(five, size, block_size)
        C12 = multiply(three, six)
        C21 = multiply(six, two)
        seven = multiply(three, C21)
        C11 = subtract(one, seven)
        C22 = scalarMul(six, -1, block_size)
        C = arrange(C11, C12, C21, C22, size, block_size)
        return C

In [None]:
start_time = time.time()
A_inv = inverse(A, size, block_size)
print("--- %s seconds ---" % (time.time() - start_time))

## Test SPIN implementation

In [None]:
inverse_scipy = np.around(input_arr_scipy_inverse, decimals = 6)
inverse_spin = np.around(A_inv.toLocalMatrix().toArray(), decimals = 6)
print(np.allclose(inverse_scipy, inverse_spin, atoi=1e-6))