In [13]:
import pandas as pd
import numpy as np
import time
import copy

# Scipy
from scipy import linalg

# Pyspark
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix, DenseMatrix

# Initialize or get existing Spark Session
def get_spark_session():
    spark = SparkSession.builder \
        .appName("MatrixInversion") \
        .master("local[*]") \
        .getOrCreate()
    return spark

spark = get_spark_session()
sc = spark.sparkContext

# Set the size of your input data
size = 256

# Set the datatype of your input
data_type = 'float32'

# Assuming numpy array as input
start_time = time.time()
input_arr = np.random.rand(size, size).astype(data_type)
print("--- %s seconds ---" % (time.time() - start_time))

print("Shape of input array is: " + str(input_arr.shape))

# Check the size of input array
from sys import getsizeof
print("Size of input array in GB is: " + str(getsizeof(input_arr)/1e9))

# Assuming numpy array as input
start_time = time.time()
input_arr_scipy_inverse = linalg.inv(input_arr)
print("--- %s seconds ---" % (time.time() - start_time))

print("Inverse using scipy is:")
print(input_arr_scipy_inverse)

# Define block size
block_size = 64

def makeBlocks(arr, nrows, ncols):
    '''
    Return an array of shape (n, nrows, ncols) where 
    n * nrows * ncols = arr.size
    '''
    h, w = arr.shape
    assert h % nrows == 0, "{} rows is not evenly divisible by {}".format(h, nrows)
    assert w % ncols == 0, "{} cols is not evenly divisible by {}".format(w, ncols)

    return (arr.reshape(h // nrows, nrows, -1, ncols)
          .swapaxes(1, 2)
          .reshape(-1, nrows, ncols))

block_arrays = makeBlocks(input_arr, block_size, block_size)
print("Shape of Blocked Array is: {}".format(block_arrays.shape))
print("Blocked Array is: {}".format(block_arrays))

block_arrays_list = []
num_blocks = block_arrays.shape[0]
num_rowIndex = input_arr.shape[0] // block_size
for idx in range(num_blocks):
    block = ((idx // num_rowIndex, idx % num_rowIndex), Matrices.dense(block_size, block_size, block_arrays[idx].flatten(order='F')))
    block_arrays_list.append(block)

print("Blocked Array List is: {}".format(block_arrays_list))

# Parallelize the Block Array List
blocks = sc.parallelize(block_arrays_list)

# Type will be .rdd
print("Type of blocks is: {}".format(type(blocks)))

# Assuming numpy array as input
start_time = time.time()
A = BlockMatrix(blocks, block_size, block_size)
print("--- %s seconds ---" % (time.time() - start_time))
print("Created BlockMatrix from RDD of sub-matrix blocks")
print("Number of rows of BlockMatrix: {}".format(A.numRows()))
print("Number of columns of BlockMatrix: {}".format(A.numCols()))

# Validate the BlockMatrix
A.validate()

# Print the first element of BlockRDD
print("First element of Block RDD is: {}".format(A.blocks.first()))

def MapToPair(block, size):
    ri = block[0][0]
    ci = block[0][1]

    if (ri // size == 0) and (ci // size == 0):
        tag = "A11"
    elif (ri // size == 0) and (ci // size == 1):
        tag = "A12"
    elif (ri // size == 1) and (ci // size == 0):
        tag = "A21"
    else:
        tag = "A22"

    numRows = block[1].numRows
    numCols = block[1].numCols
    matrixValues = block[1].values

    rowIndex = ri % size
    colIndex = ci % size

    newBlock = ((rowIndex, colIndex), Matrices.dense(numRows, numCols, matrixValues))

    return (tag, newBlock)

def breakMat(A, size):
    ARDD = A.blocks
    return ARDD.map(lambda x: MapToPair(x, size))

def function_xy(x, y, pairRDD, block_size):
    tag = 'A' + x + y
    filteredRDD = pairRDD.filter(lambda x: x[0] == tag)
    blocks = filteredRDD.map(lambda x: x[1])
    return BlockMatrix(blocks, block_size, block_size)

def scipy_inverse(block):
    rowIndex = block[0][0]
    colIndex = block[0][1]

    matrixValues = block[1].toArray()
    inverse_matrixValues = linalg.inv(matrixValues)
    inverse_matrixValues = inverse_matrixValues.flatten(order='F')

    inverse_block = ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, inverse_matrixValues))

    return inverse_block

def multiply(mat1, mat2):
    return mat1.multiply(mat2)

def subtract(mat1, mat2):
    return mat1.subtract(mat2)

def scalarMulHelper(block, scalar):
    rowIndex = block[0][0]
    colIndex = block[0][1]

    matrixValues = block[1].values
    matrixValues = matrixValues * scalar

    newBlock = ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, matrixValues))

    return newBlock

def scalarMul(A, scalar, block_size):
    ARDD = A.blocks
    blocks = ARDD.map(lambda x: scalarMulHelper(x, scalar))
    return BlockMatrix(blocks, block_size, block_size)

def map_c12(block, size):
    rowIndex = block[0][0]
    colIndex = block[0][1]
    colIndex = colIndex + size
    return ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, block[1].values))

def map_c21(block, size):
    rowIndex = block[0][0]
    rowIndex = rowIndex + size
    colIndex = block[0][1]
    return ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, block[1].values))

def map_c22(block, size):
    rowIndex = block[0][0]
    rowIndex = rowIndex + size
    colIndex = block[0][1]
    colIndex = colIndex + size
    return ((rowIndex, colIndex), Matrices.dense(block[1].numRows, block[1].numCols, block[1].values))

def arrange(C11, C12, C21, C22, size, block_size):
    C11RDD = C11.blocks
    C12RDD = C12.blocks
    C21RDD = C21.blocks
    C22RDD = C22.blocks

    C1 = C12RDD.map(lambda x: map_c12(x, size // block_size))
    C2 = C21RDD.map(lambda x: map_c21(x, size // block_size))
    C3 = C22RDD.map(lambda x: map_c22(x, size // block_size))

    unionRDD = C11RDD.union(C1).union(C2).union(C3)

    return BlockMatrix(unionRDD, block_size, block_size)

def inverse(A, size, block_size):
    n = size // block_size
    if n == 1:
        A_RDD = A.blocks
        A_Inverse_Block = A_RDD.map(lambda x: scipy_inverse(x))
        return BlockMatrix(A_Inverse_Block, block_size, block_size)
    else:
        size = size // 2
        pairRDD = breakMat(A, size // block_size)
        A11 = function_xy(str(1), str(1), pairRDD, block_size)
        A12 = function_xy(str(1), str(2), pairRDD, block_size)
        A21 = function_xy(str(2), str(1), pairRDD, block_size)
        A22 = function_xy(str(2), str(2), pairRDD, block_size)
        one = inverse(A11, size, block_size)
        two = multiply(A21, one)
        three = multiply(one, A12)
        four = multiply(A21, three)
        five = subtract(four, A22)
        six = inverse(five, size, block_size)
        C12 = multiply(three, six)
        C21 = multiply(six, two)
        seven = multiply(three, C21)
        C11 = subtract(one, seven)
        C22 = scalarMul(six, -1, block_size)
        C = arrange(C11, C12, C21, C22, size, block_size)
        return C

start_time = time.time()
A_inv = inverse(A, size, block_size)
print("--- %s seconds ---" % (time.time() - start_time))

inverse_scipy = np.around(input_arr_scipy_inverse, decimals=6)
inverse_spin = np.around(A_inv.toLocalMatrix().toArray(), decimals=6)

# Debugging: Check some values from both inverses
print("First 5 elements of scipy inverse: ", inverse_scipy.flatten()[:5])
print("First 5 elements of spark inverse: ", inverse_spin.flatten()[:5])

# Check and print differences
differences = np.abs(inverse_scipy - inverse_spin)
max_difference = np.max(differences)
print("Maximum difference: ", max_difference)

print("Differences where they are greater than 1e-5: ", differences[differences > 1e-5])

# Check with higher tolerance
tolerance = 1e-4
print(f"Are the inverses close within tolerance {tolerance}?", np.allclose(inverse_scipy, inverse_spin, atol=tolerance))

# Stop the Spark session
spark.stop()

24/05/31 23:44:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


--- 0.008738040924072266 seconds ---
Shape of input array is: (256, 256)
Size of input array in GB is: 0.000262272
--- 0.2542719841003418 seconds ---
Inverse using scipy is:
[[-0.17137265  0.5657562  -0.5339415  ... -0.3577603   0.22193313
   0.21759176]
 [ 0.09047893 -0.35179698  0.2610833  ... -0.3972826   0.20144528
   0.3800522 ]
 [ 0.12835173 -0.60599244  0.44713542 ... -0.2565006   0.12588957
   0.4274764 ]
 ...
 [ 0.04584032 -0.15261     0.29408392 ...  0.33775318 -0.34918898
  -0.5056786 ]
 [-0.14496937  1.3123677  -1.0479181  ...  0.7005172  -0.37162393
  -0.9374793 ]
 [ 0.11673246 -0.9510629   0.82413197 ... -0.02733673  0.08911656
   0.3330438 ]]
Shape of Blocked Array is: (16, 64, 64)
Blocked Array is: [[[1.67260334e-01 7.29815602e-01 1.88844785e-01 ... 7.11456463e-02
   7.84971118e-01 7.70484686e-01]
  [6.89970136e-01 5.51851749e-01 7.33800650e-01 ... 9.57407117e-01
   8.81738901e-01 1.70476928e-01]
  [1.01578668e-01 5.46984792e-01 7.54010439e-01 ... 7.93465436e-01
   1.73

                                                                                

--- 4.369112014770508 seconds ---
Created BlockMatrix from RDD of sub-matrix blocks
Number of rows of BlockMatrix: 256
Number of columns of BlockMatrix: 256
First element of Block RDD is: ((0, 0), DenseMatrix(64, 64, [0.1673, 0.69, 0.1016, 0.7041, 0.661, 0.9746, 0.0024, 0.0457, ..., 0.3765, 0.5384, 0.8662, 0.3636, 0.2945, 0.9841, 0.2125, 0.7421], 0))


                                                                                

--- 53.06801795959473 seconds ---


                                                                                

First 5 elements of scipy inverse:  [-0.171373  0.565756 -0.533942 -0.86826   0.874771]
First 5 elements of spark inverse:  [-0.171372  0.565775 -0.533959 -0.86828   0.874794]
Maximum difference:  6.80498733520718e-05
Differences where they are greater than 1e-5:  [1.89770699e-05 1.70158234e-05 1.99740219e-05 ... 1.40007639e-05
 1.09954376e-05 1.29934330e-05]
Are the inverses close within tolerance 0.0001? True
