#Spark Implementation For Matrix Inversion

In [None]:
from typing import List, Tuple
import numpy as np
from pyspark.sql import SparkSession
from pyspark import RDD
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, ArrayType

##List of functions to be used

In [None]:
class SparkMatrixInversion:
    def __init__(self, spark_session: SparkSession):
        self.spark = spark_session

    def create_index_row_matrix(self, matrix: np.ndarray) -> RDD:
        pass

    def create_block_matrix(self, matrix: np.ndarray, block_size: int = 64) -> RDD:
        pass

    def block_matrix_multiplication(self, A_rdd: RDD, B_rdd: RDD, block_size: int) -> RDD:
        pass

    def array_permutation(self, matrix_rdd: RDD, perm_array: List[int]) -> RDD:
        pass

    def block_lu_decomposition(self, matrix_rdd: RDD, block_size: int = 64):
        pass

    def block_matrix_inverse(self, matrix: np.ndarray, block_size: int = 64):
        pass

    def lu_decompose(self, A):
        pass


##Index Row Matrix from Numpy Matrix

In [None]:
def create_index_row_matrix(self, matrix: np.ndarray) -> RDD:
        """
        Convert a numpy matrix to a Spark RDD in IndexRowMatrix format

        Args:
            matrix (np.ndarray): Input matrix to be distributed

        Returns:
            RDD of matrix rows with indices
        """
        rows_with_indices = []
        for i, row in enumerate(matrix):
            for j, val in enumerate(row):
                rows_with_indices.append((i, j, val))

        return self.spark.sparkContext.parallelize(rows_with_indices)
SparkMatrixInversion.create_index_row_matrix = create_index_row_matrix

##Block Matrix from numpy Matrix

In [None]:
def create_block_matrix(self, matrix: np.ndarray, block_size: int = 64) -> RDD:
        """
        Convert a numpy matrix to a BlockMatrix format

        Args:
            matrix (np.ndarray): Input matrix
            block_size (int): Size of each block

        Returns:
            RDD of BlockMatrix with (block_id, block_values)
        """
        blocks = []
        n = matrix.shape[0]

        for i in range(0, n, block_size):
            for j in range(0, n, block_size):
                block = matrix[i:i+block_size, j:j+block_size]
                block_id = (i // block_size, j // block_size)
                blocks.append((block_id, block))

        return self.spark.sparkContext.parallelize(blocks)
SparkMatrixInversion.create_block_matrix = create_block_matrix

##Spark optimized 1-D Block based Matrix Multiplication

In [None]:
def block_matrix_multiplication(self, A_rdd: RDD, B_rdd: RDD, block_size: int) -> RDD:
        """
        1-D block-based matrix multiplication

        Args:
            A_rdd (RDD): First matrix RDD
            B_rdd (RDD): Second matrix RDD
            block_size (int): Size of blocks

        Returns:
            RDD of multiplied matrix blocks
        """
        flat_A = A_rdd.flatMap(lambda x: [(x[0][0] * len(x[1]) + x[0][1], x[1])])
        flat_B = B_rdd.flatMap(lambda x: [(x[0][1] * len(x[1]) + x[0][0], x[1].T)])

        flat_AB_pair = flat_A.join(flat_B)

        result = flat_AB_pair.map(lambda x: (
            (x[1][0].shape[0], x[1][1].shape[1]),
            np.dot(x[1][0], x[1][1])
        ))
        return result.reduceByKey(lambda a, b: a + b)
SparkMatrixInversion.block_matrix_multiplication = block_matrix_multiplication

##Spark based optimized permutation (Could not use)

In [None]:
def array_permutation(self, matrix_rdd: RDD, perm_array: List[int]) -> RDD:
        """
        Perform row permutation on block-based matrix

        Args:
            matrix_rdd (RDD): Block-based matrix
            perm_array (List[int]): Permutation array

        Returns:
            Permuted matrix RDD
        """
        def flatmap_permute(block):
            block_id, block_data = block
            block_rows, block_cols = block_data.shape

            permuted_rows = []
            for local_row in range(block_rows):
                global_row = block_id[0] * block_rows + local_row
                new_global_row = perm_array.index(global_row)

                new_block_row = new_global_row // block_rows
                new_local_row = new_global_row % block_rows

                new_block_id = (new_block_row, block_id[1])
                permuted_rows.append((new_block_id, block_data[local_row, :]))

            return permuted_rows

        return matrix_rdd.flatMap(flatmap_permute).groupByKey().mapValues(list)
SparkMatrixInversion.array_permutation = array_permutation

##Recursive Function for LU Decomposition

In [None]:
def block_lu_decomposition(self, matrix_rdd: RDD, block_size: int = 64):
        """
        Perform Block LU Decomposition on distributed matrix using Spark

        Args:
            matrix_rdd (RDD): Block-based matrix RDD
            block_size (int): Size of blocks

        Returns:
            Tuple containing LU decomposition components
        """
        matrix_blocks = dict(matrix_rdd.collect())

        max_row_block = max(block[0][0] for block in matrix_blocks.items()) + 1
        max_col_block = max(block[0][1] for block in matrix_blocks.items()) + 1
        n = max_row_block * block_size

        M = np.zeros((n, n))
        for (i, j), block in matrix_blocks.items():
            row_start = i * block_size
            col_start = j * block_size
            block_height, block_width = block.shape
            M[row_start:row_start+block_height, col_start:col_start+block_width] = block

        def recursive_block_lu(M):
            n = M.shape[0]
            mid = n // 2

            if mid <= 64:
                L1, U1, P1 = self.lu_decompose(M)
                L1_inv = np.linalg.inv(L1)
                U1_inv = np.linalg.inv(U1)
                T = U1_inv @ L1_inv

                return {
                    'L1_inv': L1_inv,
                    'U1_inv': U1_inv,
                    'P1': P1,
                    'T': T
                }

            M1 = M[:mid, :mid]
            M2 = M[:mid, mid:]
            M3 = M[mid:, :mid]
            M4 = M[mid:, mid:]

            M1_lu = recursive_block_lu(M1)
            L1_inv = M1_lu['L1_inv']
            U1_inv = M1_lu['U1_inv']
            P1 = M1_lu['P1']
            T = M1_lu['T']

            S_hat = M3 @ T

            M_hat = M4 - S_hat @ (P1 @ M2)

            M_hat_lu = recursive_block_lu(M_hat)

            return {
                'L1_inv': L1_inv,
                'S_hat': S_hat,
                'L3_inv': M_hat_lu['L1_inv'],
                'U1_inv': U1_inv,
                'T': T,
                'P1M2': P1 @ M2,
                'U3_inv': M_hat_lu['U1_inv'],
                'P1': P1,
                'P2': M_hat_lu['P1']
            }

        lu_result = recursive_block_lu(M)
        return lu_result
SparkMatrixInversion.block_lu_decomposition = block_lu_decomposition

##Function that takes numpy array as input and returns it's Inverse

In [None]:
def block_matrix_inverse(self, matrix: np.ndarray, block_size: int = 64):
        """
        Compute matrix inverse using block-based approach with Spark

        Args:
            matrix (np.ndarray): Input matrix
            block_size (int): Size of blocks

        Returns:
            Inverted matrix
        """
        # Convert to block matrix RDD
        block_matrix = self.create_block_matrix(matrix, block_size)

        # Perform block LU decomposition
        lu_decomp = self.block_lu_decomposition(block_matrix, block_size)

        # Extract LU decomposition components
        L1_inv = lu_decomp['L1_inv']
        S_hat = lu_decomp.get('S_hat', np.zeros_like(L1_inv))
        L3_inv = lu_decomp['L3_inv']
        U1_inv = lu_decomp['U1_inv']
        T = lu_decomp['T']
        P1A2 = lu_decomp['P1M2']
        U3_inv = lu_decomp['U3_inv']
        P1 = lu_decomp['P1']
        P2 = lu_decomp['P2']

        # Compute inverse blocks
        n = matrix.shape[0]
        mid = n // 2

        X1 = T @ P1
        X2 = U3_inv @ L3_inv
        Y1 = X1 @ matrix[:mid, mid:]
        Y2 = X2 @ matrix[mid:, :mid]

        A_inv_4 = X2
        A_inv_3 = -Y2 @ X1
        A_inv_2 = -Y1 @ X2
        A_inv_1 = X1 - Y1 @ A_inv_3

        # Reconstruct inverse matrix
        A_inv = np.block([[A_inv_1, A_inv_2],
                        [A_inv_3, A_inv_4]])

        return A_inv
SparkMatrixInversion.block_matrix_inverse = block_matrix_inverse

##Basic function for LU Decomposition

In [None]:
def lu_decompose(self, A):
        """
        Performs LU Decomposition with partial pivoting (original implementation)
        """
        n = A.shape[0]
        L = np.eye(n)
        U = A.copy()
        P = np.eye(n)

        for k in range(n):
            # Pivoting
            max_row = np.argmax(abs(U[k:, k])) + k
            if k != max_row:
                U[[k, max_row]] = U[[max_row, k]]
                P[[k, max_row]] = P[[max_row, k]]
                if k > 0:
                    L[[k, max_row], :k] = L[[max_row, k], :k]

            # LU Decomposition
            for i in range(k + 1, n):
                L[i, k] = U[i, k] / U[k, k]
                U[i, k:] -= L[i, k] * U[k, k:]

        return L, U, P
SparkMatrixInversion.lu_decompose = lu_decompose

##Validating Output

In [None]:
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.appName("MatrixInversion").getOrCreate()

# Create matrix inverter
matrix_inverter = SparkMatrixInversion(spark)

# Example matrix
matrix = np.random.rand(256, 256)
# matrix = np.array([[4,7],[2,6]])

# Perform block matrix inversion
inverse_matrix = matrix_inverter.block_matrix_inverse(matrix)

In [None]:
inverse_matrix

array([[ 0.02071588,  0.14597555, -0.22529224, ...,  0.05853017,
        -0.11440219, -0.17311356],
       [-0.4661433 ,  0.25643795, -0.34069122, ..., -0.18012414,
        -0.02902878, -0.35302252],
       [-0.03244434, -0.01204481,  0.08515246, ...,  0.14569533,
        -0.00846539,  0.09404938],
       ...,
       [ 0.50477678,  0.18524246,  0.08792011, ...,  0.02952585,
         0.33108323, -0.01340607],
       [ 0.18509081,  0.35737958, -0.27436789, ...,  0.18092087,
         0.12201275, -0.18678446],
       [ 0.03079864,  0.81603672, -0.44965053, ...,  0.70355329,
         0.5987302 , -0.60555205]])

In [None]:
inverse_matrix @ matrix

array([[ 1.00000000e+00, -9.94329094e-15, -1.78990974e-15, ...,
        -1.04014840e-14, -2.02708202e-14, -3.84625941e-14],
       [-1.20847785e-15,  1.00000000e+00, -5.15431203e-15, ...,
        -1.11177294e-14,  1.06235415e-14,  1.22446251e-14],
       [ 1.50765819e-14, -6.87109559e-15,  1.00000000e+00, ...,
         4.90777690e-15, -1.18863541e-14,  1.55382757e-14],
       ...,
       [ 3.98760828e-15,  8.10675697e-15,  3.79170118e-15, ...,
         1.00000000e+00,  5.35698944e-15,  3.82557316e-15],
       [ 5.93925011e-16, -2.10775642e-15, -6.86868951e-15, ...,
        -3.53616991e-15,  1.00000000e+00, -2.59096469e-15],
       [ 6.41571543e-15, -2.11505152e-15, -1.18275793e-14, ...,
        -8.34700956e-16, -4.81296735e-15,  1.00000000e+00]])

##Comparion with Identity Matrix

In [None]:
I = np.eye(256)
difference = np.linalg.norm(inverse_matrix @ matrix - I)
tolerance = 1e-10
if difference < tolerance:
    print("The product is almost the identity matrix.")
else:
    print("The product is not close to the identity matrix.")

The product is almost the identity matrix.


#Implementation Without using spark ( Backbone implementation of Algorithm)

##LU Decomposition

In [None]:
def LU_Decompose(A):
    n = A.shape[0]
    L = np.eye(n)
    U = A.copy()
    P = np.eye(n)

    for k in range(n):
        # Pivoting
        max_row = np.argmax(abs(U[k:, k])) + k
        if k != max_row:
            U[[k, max_row]] = U[[max_row, k]]
            P[[k, max_row]] = P[[max_row, k]]
            if k > 0:
                L[[k, max_row], :k] = L[[max_row, k], :k]

        # LU Decomposition
        for i in range(k + 1, n):
            L[i, k] = U[i, k] / U[k, k]
            U[i, k:] -= L[i, k] * U[k, k:]

    return L, U, P


##getLU

In [None]:
def getLU(LU_of_M):
    L1_inv, S_hat, L3_inv, U1_inv, T, P1M2, U3_inv, P1, P2 = LU_of_M

    # Construct the block matrix components for L_inv, U_inv, and P
    S = P2 @ S_hat
    # L_inv = np.block([[L1_inv, np.zeros_like(S_hat)],
    #                   [-np.dot(L3_inv, S_hat), L3_inv]])
    L_inv = np.block([[L1_inv, np.zeros((L1_inv.shape[0], L3_inv.shape[1]))],
                  [-L3_inv @ S, L3_inv]])


    # U_inv = np.block([[U1_inv, -np.dot(T, U3_inv)],
    #                   [np.zeros_like(T), U3_inv]])
    U_inv = np.block([[U1_inv, -T @ P1M2 @ U3_inv],
                  [np.zeros((U3_inv.shape[0], U1_inv.shape[1])), U3_inv]])


    P = np.block([[P1, np.zeros_like(P2)],
                  [np.zeros_like(P1), P2]])

    return L_inv, U_inv, P


##Block Decomposition Algorithm

In [None]:
def BlockLU_Decompose(M):
    n = M.shape[0]
    mid = n // 2

    M1 = M[:mid, :mid]
    M2 = M[:mid, mid:]
    M3 = M[mid:, :mid]
    M4 = M[mid:, mid:]

    M1Mid = M1.shape[0] // 2
    if M1Mid <= 64:
        L1, U1, P1 = LU_Decompose(M1)
        L1_inv = np.linalg.inv(L1)
        U1_inv = np.linalg.inv(U1)
        T = U1_inv @ L1_inv
        S_hat = M3 @ T
        M_hat = M4 - S_hat @ (P1 @ M2)
        L3, U3, P2 = LU_Decompose(M_hat)
        L3_inv = np.linalg.inv(L3)
        U3_inv = np.linalg.inv(U3)

    else:
        LU_of_M1 = BlockLU_Decompose(M1)
        L1_inv, U1_inv, P1 = getLU(LU_of_M1)
        T = U1_inv @ L1_inv
        S_hat = M3 @ T
        M_hat = M4 - S_hat @ (P1 @ M2)
        LU_of_M_hat = BlockLU_Decompose(M_hat)
        L3_inv, U3_inv, P2 = getLU(LU_of_M_hat)
    return L1_inv, S_hat, L3_inv, U1_inv, T, P1@M2, U3_inv, P1, P2


##Inverse Function that takes matrix as input and returns inverse of the Matrix

In [None]:
def BlockInverse(A):
  n = A.shape[0]
  mid = n//2
  if mid <= 64:
    return np.linalg.inv(A)
  else:
    A1 = A[:mid, :mid]
    A2 = A[:mid, mid:]
    A3 = A[mid:, :mid]
    A4 = A[mid:, mid:]

    LU_of_A = BlockLU_Decompose(A)
    L1_inv, S_hat, L3_inv, U1_inv, T, P1A2, U3_inv, P1, P2 = LU_of_A
    X1 = T @ P1
    X2 = U3_inv @ L3_inv
    Y1 = X1 @ A2
    Y2 = X2 @ A3
    A_inv_4 = X2
    A_inv_3 = -Y2 @ X1
    A_inv_2 = -Y1 @ X2
    A_inv_1 = X1 - Y1 @ A_inv_3
    return np.block([[A_inv_1, A_inv_2], [A_inv_3, A_inv_4]])

##Validating Output

In [None]:
A = np.random.rand(256, 256)
I = np.eye(256)
product = A @ BlockInverse(A)

In [None]:
product

array([[ 1.00000000e+00,  1.93498851e-12,  1.74619124e-12, ...,
         5.57222365e-14,  4.56480684e-14, -3.04048939e-13],
       [ 7.88081180e-14,  1.00000000e+00, -1.03016954e-13, ...,
         9.71201291e-14, -1.01485428e-13,  1.05190410e-13],
       [ 5.39660790e-14, -1.44066096e-12,  1.00000000e+00, ...,
         9.47758045e-14, -1.06860243e-13,  5.19931196e-15],
       ...,
       [-1.16629713e+00,  2.09706104e+00, -2.08906342e+00, ...,
         7.87011081e-14,  1.12711443e-13, -3.69846926e-13],
       [ 3.21846719e+00, -1.32779198e+00, -2.67559208e+00, ...,
         7.98493088e-14, -6.71843350e-14,  3.56905486e-14],
       [-1.30033700e+00,  1.83831897e+00,  1.24131002e+00, ...,
         6.85283086e-14, -1.29557734e-14, -2.44953747e-13]])