In [1]:
###### from pyspark.sql import SparkSession  
import numpy as np  
from pyspark.sql import SparkSession
# Initialize Spark session with the configurations  
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("SVD") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.cores", "3") \
    .config("spark.driver.maxResultSize","4096") \
    .config("spark.cores.max", "12") \
    .config("spark.executor.instances", "4") \
    .getOrCreate() 

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/19 08:05:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
#Final CODE NOT PARALLEL:
from pyspark.sql import SparkSession
import numpy as np

def generatecorrelatedtestdata(spark, n_samples, n_features, target_r_squared=0.7):
    """
    Generates correlated test data and returns the true coefficients and an RDD.
    """
    # Generate random coefficients
    np.random.seed(31)
    true_coef = np.random.randn(n_features)
    X = np.random.randn(n_samples, n_features)
    X = np.column_stack((X[:,:-1], np.ones(X.shape[0]))) 
    y_clean = np.dot(X, true_coef)
    noise = np.random.randn(n_samples) * np.sqrt(1 - target_r_squared)
    y = y_clean + noise
    scaling_factor = np.sqrt(target_r_squared / (1 - target_r_squared))
    y = y_clean * scaling_factor + noise

    # Create an RDD of numpy arrays
    rdd = spark.sparkContext.parallelize(X)
    return  rdd,y

def svd(arr):
    """
    Function to calculate the SVD of a given nxm Matrix.
    """
    # Compute V and singular values
    eigval_V, V = np.linalg.eigh(arr.T @ arr)
    
    # Sort eigenvalues and eigenvectors in descending order
    idx = eigval_V.argsort()[::-1]
    eigval_V = eigval_V[idx]
    V = V[:, idx]
    
    # Compute singular values
    singval = np.sqrt(np.abs(eigval_V))
    
    # Compute U
    U = np.zeros((arr.shape[0], min(arr.shape[0], arr.shape[1])))
    for i in range(len(singval)):
        if singval[i] > 1e-15:  # Avoid division by zero
            U[:, i] = (arr @ V[:, i]) / singval[i]
    
    # Ensure V is returned as V.T
    Vt = V.T
    
    return U, singval, Vt

def parallelinvers(U, sigma, V):
    # Inverting sigma correctly
    sigma_inv = np.diag(1 / sigma)
    return V.T @ sigma_inv @ U.T

def betacalc(X,Y):
    """
    Calculates the beta values for the input matrix A (X + y).
    """
    
    # Step 1: Compute X^T @ X
    XtX = X.T @ X
    
    # Step 2: SVD decomposition of X^T @ X
    U, Sigma, Vt = svd(XtX)
    
    # Step 3: Calculate the inverse of XtX using SVD components
    parinv = parallelinvers(U, Sigma, Vt)
    
    # Step 4: Compute beta = (X^T X)^(-1) X^T Y
    beta = parinv @ X.T @ Y
    return beta


# Example usage (make sure a SparkSession 'spark' is already created)
rdd,y = generatecorrelatedtestdata(spark, 300, 100)
X = np.array(rdd.collect())  # Collect RDD into a numpy array
beta_values = betacalc(X,y)
XtX = X.T @ X
U, Sigma, Vt = svd(XtX)
print(beta_values)


[-6.55839490e-01 -4.91691210e-01  1.07914010e-01 -1.09752762e+00
 -3.30923572e-01 -1.17692928e+00 -1.13093615e+00  2.83152506e+00
 -1.10071927e+00 -1.22513413e-01  4.46032266e-01 -2.10802072e-01
 -1.52952995e+00 -1.41231571e+00  1.77440527e+00 -5.42673093e-01
 -2.69942196e+00  1.02783801e-01 -8.79170838e-01 -1.04398091e+00
 -1.42822311e+00  2.33527834e+00  3.03088843e+00 -6.46367791e-01
 -1.47830272e-01 -2.93175099e-01 -1.16380227e+00  7.72051879e-02
  1.02085647e+00 -3.51443067e-01  7.77464038e-01  1.06543317e+00
  1.73671000e+00 -1.43123725e-01 -6.62707861e-01 -2.87446994e-01
 -3.72306562e-02 -1.77799508e+00 -1.64898063e+00 -1.71799251e+00
  8.23904062e-03 -1.40547783e-01 -1.09257713e+00  2.38828375e+00
 -6.90495748e-01 -7.91397390e-01 -1.10719577e+00  1.95416017e+00
  2.15168012e+00 -9.21388111e-01  6.12199880e-01  9.68386421e-01
 -2.10319376e+00 -2.78159066e+00 -2.38329995e+00  3.68915648e+00
  3.93165953e-01  4.24978125e-01 -3.65674562e-01 -7.79804654e-01
  2.75540848e+00 -1.39467

In [105]:
#Final CODE PARALLEL
import numpy as np
from numpy.linalg import eigh
import pyspark
from pyspark import SparkContext
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

 
def Gramiam(mat:pyspark.mllib.linalg.distributed.BlockMatrix):
    """
    Compute the inverse of a symmetric positive-definite Gramian matrix using eigenvalue decomposition.

    Args:
        gram_mat (BlockMatrix): The input Gramian matrix to invert.
        chunks (int): Block size for the resulting matrices. Default is 12.

    Returns:
        BlockMatrix: The inverse of the input Gramian matrix.
    """
    return mat.transpose().multiply(mat)


def create_block_matrix_from_numpy(np_array, block_row_size, block_col_size, spark_context):
    """
    Create a BlockMatrix from a NumPy array.
    
    Args:
        np_array (np.ndarray): The NumPy array to be converted to a BlockMatrix.
        block_row_size (int): Number of rows in each block.
        block_col_size (int): Number of columns in each block.
        spark_context (SparkContext): The Spark context used to parallelize the blocks.
        
    Returns:
        BlockMatrix: The resulting BlockMatrix.
    """
    
    # Get the shape of the numpy array
    num_rows, num_cols = np_array.shape
    
    # List to hold the blocks in the format ((blockRowIndex, blockColIndex), denseMatrix)
    blocks = []
    
    # Divide the array into blocks of block_row_size x block_col_size
    for i in range(0, num_rows, block_row_size):
        for j in range(0, num_cols, block_col_size):
            # Extract the block as a NumPy subarray
            block = np_array[i:i + block_row_size, j:j + block_col_size]
            
            # Convert the block to a DenseMatrix
            block_dense_matrix = Matrices.dense(block.shape[0], block.shape[1], block.flatten())
            
            # Store the block with its block indices (i // block_row_size, j // block_col_size)
            blocks.append(((i // block_row_size, j // block_col_size), block_dense_matrix))
    
    # Parallelize the blocks to create an RDD
    blocks_rdd = spark_context.parallelize(blocks)
    
    # Create and return the BlockMatrix
    block_matrix = BlockMatrix(blocks_rdd, block_row_size, block_col_size)
    
    return block_matrix
 
def svd(gram_mat:pyspark.mllib.linalg.distributed.BlockMatrix,chunks=12):
    """
    Perform Singular Value Decomposition (SVD) on a Gramian matrix.
    
    Args:
        gram_mat (BlockMatrix): The input Gramian matrix.
        chunks (int): Block size for the resulting matrices. Default is 12.
    
    Returns:
        tuple[BlockMatrix, BlockMatrix, BlockMatrix]: The U, Sigma-inverse, and V^T matrices as BlockMatrices.
    """
    
    arr = gram_mat.toLocalMatrix().toArray()
    eigval_V, eigvecs = np.linalg.eigh(arr)
        # Sort eigenvalues and eigenvectors in descending order
    idx = eigval_V.argsort()[::-1]
    eigval_V = eigval_V[idx]
    eigvecs = eigvecs[:, idx]
    # Compute eigenvalues
    print("eigval_V",eigval_V)

    print("eigvecs",eigvecs)

    tol = 1e-15
    eigvals_inv = np.array([1/val if val > tol else 0 for val in eigval_V])
    eigvals_inv = np.diag(eigvals_inv)

    print("eigvals_inv",eigvals_inv)

    block_s_inv = create_block_matrix_from_numpy(eigvals_inv,chunks,chunks,spark.sparkContext)
    block_v= create_block_matrix_from_numpy(eigvecs,chunks,chunks,spark.sparkContext)
    block_v = block_v.transpose() # needs to be transposed for V as Result not VT

    print("block_v",block_v.toLocalMatrix().toArray())
    print("block_s_inv",block_s_inv.toLocalMatrix().toArray())
    print("block_v.transpose()",block_v.transpose().toLocalMatrix().toArray())

    print("block_v.multiply(block_s_inv).multiply(block_v.transpose())",block_v.multiply(block_s_inv).multiply(block_v.transpose()).toLocalMatrix().toArray())

    
    
    return block_v.multiply(block_s_inv).multiply(block_v.transpose())

def betacalc(X, Y):
    """
    Calculate the beta values for the input matrices X and Y.
    
    Args:
        X (BlockMatrix): The input feature matrix.
        Y (BlockMatrix): The target values matrix.
    
    Returns:
        np.ndarray: The computed beta values.
    """
    XtX = X.transpose().multiply(X)
    print("XtX",XtX.toLocalMatrix().toArray())
    
    # Step 4: Compute beta = (X^T X)^(-1) X^T Y
    XtX_inv = svd(XtX)
    print("XtX_inv",XtX_inv.toLocalMatrix().toArray())


    Y = Y.toLocalMatrix().toArray()
    X = X.toLocalMatrix().toArray()
    XtX_inv = XtX_inv.toLocalMatrix().toArray()

    beta = XtX_inv @ X.T @ Y

    
    return beta



In [106]:
# BeispielDaten

# Parallel Data
sc = spark.sparkContext

dm1 = Matrices.dense(3, 4, [1, 2, 3, 6, 2, 4,7,1,2])

blocks1 = sc.parallelize([((0, 0), dm1)])

mat1 = BlockMatrix(blocks1, 3, 3)

Y =  Matrices.dense(3, 1, [1, 2, 3])
blocksy = sc.parallelize([((0, 0), Y)])
matY = BlockMatrix(blocksy,3,1)

betacalc(mat1.transpose(),matY)# Nutze mat1 transpose, da mat1 falsch formatiert ist. Bitte MAtrix korrekt formatieren





                                                                                

XtX [[86. 21. 41.]
 [21.  9. 16.]
 [41. 16. 29.]]
eigval_V [1.13899537e+02 1.00445226e+01 5.59408056e-02]
eigvecs [[-0.85533858 -0.51562151 -0.05030282]
 [-0.24116714  0.48222298 -0.84219915]
 [-0.45851317  0.70823404  0.53681489]]
eigvals_inv [[8.77966698e-03 0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 9.95567475e-02 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 1.78760386e+01]]


                                                                                

block_v [[-0.85533858 -0.51562151 -0.05030282]
 [-0.24116714  0.48222298 -0.84219915]
 [-0.45851317  0.70823404  0.53681489]]
block_s_inv [[8.77966698e-03 0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 9.95567475e-02 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 1.78760386e+01]]
block_v.transpose() [[-0.85533858 -0.24116714 -0.45851317]
 [-0.51562151  0.48222298  0.70823404]
 [-0.05030282 -0.84219915  0.53681489]]


                                                                                

block_v.multiply(block_s_inv).multiply(block_v.transpose()) [[ 0.078125  0.734375 -0.515625]
 [ 0.734375 12.703125 -8.046875]
 [-0.515625 -8.046875  5.203125]]
XtX_inv [[ 0.078125  0.734375 -0.515625]
 [ 0.734375 12.703125 -8.046875]
 [-0.515625 -8.046875  5.203125]]


array([[ 0.5],
       [ 2.5],
       [-1.5]])