In [10]:
#import findspark
#findspark.init()

# Import the required libraries and modules

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.mllib.random import RandomRDDs
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType
from pyspark.mllib.linalg.distributed import RowMatrix, IndexedRowMatrix
from pyspark.ml.regression import LinearRegression
from pyspark.ml.functions import array_to_vector
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
from pyspark.mllib.linalg import DenseMatrix
import time
import numpy as np
import seaborn as sns
import csv
import pandas as pd
from scipy.linalg import lu, lu_factor, lu_solve # is used for LU decomposition
from sklearn.metrics import mean_absolute_error

# Test Runs

Introduction

The following script varies the number of cluster nodes, the number of rows n, the number of columns k, and the regression estimator in order to compare the different regression estimators. The outermost loop iterates over the cluster node counts and configures the Spark session. The inner loops iterate over the different combinations of n and k. Inside these loops the dataset is simulated, and the regression estimators selected by the flags (do_Spark, do_QR, ...) are applied to it.
Finally, still inside the innermost loop, the measured execution times and the parameters are appended to a CSV file.

In [16]:
# Stop a Spark session left over from an earlier run of this notebook.
# On a fresh kernel `spark` has not been created yet (it is built in the
# benchmark cell below), so a missing name is not an error here.
try:
    spark.stop()
except NameError:
    pass

In [21]:
# Select which regression estimators are benchmarked

do_Spark = False
do_QR = False
do_SVD = False
do_LU = True

# Numbers of parallel cluster nodes (executors) to benchmark
n_jobs_a = [64]  # , 32, 8, 1]

# Number of repetitions of each OLS estimation for the same parameters n and k
numtries = 10

for n_jobs in n_jobs_a:
    # Build (or fetch) the Spark session for this executor count.
    # The session uses the 'yarn' master; the settings below pin the driver
    # ports and size the driver and executor resources (one core per
    # executor, n_jobs executors).
    spark = (
        SparkSession.builder
        .master('yarn')
        .appName('DBiBD')
        .config("spark.driver.port", "55500")
        .config("spark.driver.memory", "15g")
        .config("spark.driver.blockManager.port", "55501")
        .config("spark.executor.instances", n_jobs)
        .config("spark.executor.cores", 1)
        .config("spark.executor.memory", "15g")
        .config("spark.executor.memoryOverhead", "15g")
        .getOrCreate()
    )

    # Candidate dimensions of the data matrix: every entry pairs a row count n
    # with the list of column counts k to run. Uncomment further entries to
    # widen the sweep. The list is processed in reverse order of definition.
    perms = list(reversed([
        # {"n": 1e2, "k": [2, 5, 10, 20]},
        # {"n": 3e2, "k": [2, 5, 10, 20]},
        {"n": 6e3, "k": [2, 5, 10, 20]},
        # {"n": 1e4, "k": [2, 5, 10, 20]},
        # {"n": 3e4, "k": [2, 5, 10, 20]},
        # {"n": 6e4, "k": [2, 5, 10, 20]},
        # {"n": 1e5, "k": [2, 5, 10, 20]},
        # {"n": 3e5, "k": [2, 5, 10, 20]},
        # {"n": 6e5, "k": [2, 5, 10, 20]},
        # {"n": 1e6, "k": [2, 5, 10, 20]},
        # {"n": 6e6, "k": [2, 5, 10, 20]},
        # {"n": 1e7, "k": [2, 5, 10, 20]},
        # {"n": 6e7, "k": [2, 5, 10, 20]},
        # {"n": 1e9, "k": [2, 5, 10, 20]},
    ]))

    # Loop over the parameter list perms
    for nk in perms:
        # number of rows/samples; the grid stores n as a float literal
        # (e.g. 6e3), so it is cast to int here
        n = int(nk["n"])
        for k in nk["k"]:
            # number of columns/features
            k = int(k)
            # Generate random coefficients (betas) and covariance factors (cov),
            # each scaled from np.random.rand's [0, 1) to [-10, 10).
            # Each vector is drawn exactly once; the earlier version drew both
            # twice and threw the first draw away.
            betas = (np.random.rand(k)-0.5)*20
            cov = (np.random.rand(k)-0.5)*20

            # Generate dataset: an RDD of n random normal vectors of length k
            data = RandomRDDs.normalVectorRDD(spark.sparkContext, n, k)

            def createRow(noise):
                """Turn one noise vector into a (features, target) pair.

                The first feature is ``noise[0]``; every further feature ``i`` is
                ``noise[0] * cov[i] + noise[i]``. The target is the sum of
                ``features[i] * betas[i]`` over all features. ``cov`` and
                ``betas`` are read from the enclosing scope.

                Returns:
                    tuple: ``(x, y)`` where ``x`` is a list of Python floats and
                    ``y`` is a Python float.
                """
                anchor = noise[0]
                x = [float(anchor)]
                x.extend(
                    float((anchor * cov[i]) + noise[i])
                    for i in range(1, len(noise))
                )

                # sum() starts at 0 and adds left to right, like an explicit loop
                y = sum(x[i] * betas[i] for i in range(len(x)))

                return x, float(y)
            # Apply the row construction, then expose the result both as a
            # distributed RowMatrix (features only) and as a DataFrame with
            # a 'features' array column and a 'y' column.
            data = data.map(createRow)
            feature_rows = data.map(lambda pair: pair[0])
            dataMatrix = RowMatrix(feature_rows)

            feature_field = StructField('features', ArrayType(FloatType()), True)
            target_field = StructField('y', FloatType(), True)
            schema = StructType([feature_field, target_field])

            dataDF = data.toDF(schema=schema)
            print("n_jobs =", n_jobs, "n =", n, "k =", k)
            
            # Repeat every measurement numtries times on the same data so that
            # the run-to-run variance of the execution time can be assessed.
            for numtry in range(numtries):

                # PySpark Linear Regression
                if do_Spark:
                    start_time = time.time()
                    lr = LinearRegression(
                        featuresCol="features",
                        labelCol="y",
                        predictionCol="pred_y",
                        regParam=0.001,
                    )
                    # The estimator needs a vector column, so the array column is converted first
                    vector_df = dataDF.withColumn("features", array_to_vector('features'))
                    lr_model = lr.fit(vector_df)
                    time_PySpark = time.time() - start_time
                    
                # QR: factorise X = Q R, then compute R^-1 (Q^T y)
                if do_QR:
                    start_time = time.time()
                    QR = dataMatrix.tallSkinnyQR(True)
                    R = np.asmatrix(QR.R.toArray())
                    R_inv = np.linalg.inv(R)
                    # Re-express Q as (row, column, value) entries so that it can be transposed
                    cm = CoordinateMatrix(
                        QR.Q.rows.zipWithIndex().flatMap(
                            lambda x: [MatrixEntry(x[1], j, v) for j, v in enumerate(x[0])]
                        )
                    )
                    # Keep the row indices: converting straight to a RowMatrix drops
                    # them, and the order in which the rows are collected is then not
                    # guaranteed to be the index order.
                    Q_T = cm.transpose().toIndexedRowMatrix()
                    y = DenseMatrix(n, 1, dataDF.select("y").toPandas().to_numpy().ravel())
                    step1 = sorted(Q_T.multiply(y).rows.collect(), key=lambda row: row.index)
                    step1 = np.array([row.vector.toArray() for row in step1])
                    # step2 holds the estimated coefficients
                    step2 = np.matmul(R_inv, step1)
                    time_QR = time.time() - start_time
                # SVD: factorise X = U S V^T, then compute V S^-1 (U^T y)
                if do_SVD:
                    start_time = time.time()
                    svd = dataMatrix.computeSVD(k, computeU=True)
                    # Re-express U as (row, column, value) entries so that it can be transposed
                    cm = CoordinateMatrix(
                        svd.U.rows.zipWithIndex().flatMap(
                            lambda x: [MatrixEntry(x[1], j, v) for j, v in enumerate(x[0])]
                        )
                    )
                    # Keep the row indices: converting straight to a RowMatrix drops
                    # them, and the order in which the rows are collected is then not
                    # guaranteed to be the index order.
                    U_T = cm.transpose().toIndexedRowMatrix()

                    # Build y here instead of relying on the QR branch having run:
                    # with do_QR switched off, y would otherwise be undefined.
                    y = DenseMatrix(n, 1, dataDF.select("y").toPandas().to_numpy().ravel())
                    step1 = sorted(U_T.multiply(y).rows.collect(), key=lambda row: row.index)
                    step1 = np.array([row.vector.toArray() for row in step1])
                    step2 = (step1.ravel()/svd.s)
                    v = np.matrix(svd.V.toArray())

                    SVD_coeeffs = (v @ step2).ravel()
                    time_SVD = time.time() - start_time
                
                # LU: runs only when do_LU is set; the timer started here is
                # read again after the LU estimate has been collected
                if do_LU:
                    start_time = time.time()
                    def luSpark(part):
                        """Estimate regression coefficients for one group of rows via LU.

                        Solves the normal equations (X^T X) b = X^T y with an LU
                        factorisation, where X is built from the ``features`` column
                        and y from the ``y`` column of ``part``.

                        Args:
                            part: pandas DataFrame with a ``features`` column holding
                                one array per row and a ``y`` column.

                        Returns:
                            A one-row pandas DataFrame with the estimated coefficients
                            in ``betas`` and the number of rows of ``part`` in
                            ``sampleCounts``.
                        """
                        # Stack the per-row feature arrays into one 2-D array so that it can be transposed
                        design = np.array(list(part["features"].to_numpy()))
                        targets = part["y"].to_numpy()

                        # Normal equations: factorise X^T X, then solve against X^T y
                        gram = design.T @ design
                        moment = design.T @ targets
                        coefficients = lu_solve(lu_factor(gram), moment)

                        return pd.DataFrame({"betas": [coefficients], "sampleCounts": [len(part)]})
                    
                    # Schema of the one-row result luSpark returns per group
                    schemaUDF=StructType([
                        StructField("betas",ArrayType(FloatType())),
                        StructField("sampleCounts",IntegerType())
                    ])
                    # Group the rows by their Spark partition id so that luSpark runs once per partition
                    LU_res = dataDF.groupBy(F.spark_partition_id()).applyInPandas(luSpark,schema=schemaUDF)

                    # Reset the estimate first: if the computation fails, a value left
                    # over from an earlier iteration must not be reported as this run's result.
                    LU_betas = None
                    try:
                        # Average the per-partition coefficients, weighted by each partition's share of the n rows
                        weighted = LU_res.rdd.map(lambda x : [(x["sampleCounts"]/n) * xi for xi in x["betas"]]).collect()
                        LU_betas = pd.DataFrame(weighted).sum().to_numpy()
                    except Exception as e:
                        print (e)

                    if LU_betas is None:
                        # A failed run has no meaningful duration; log NaN instead of a misleading time
                        time_LU = float("nan")
                    else:
                        time_LU=time.time() - start_time
                        if numtry == 0:
                            print("LU MAE: ",mean_absolute_error(betas,LU_betas))

                # Append one row per active estimator to results.csv:
                # n_jobs, n, k, repetition index, estimator label, elapsed seconds.
                # newline='' lets the csv module control line endings itself, as its
                # documentation requires for files passed to csv.writer.
                with open('cgi/DAiBD/results.csv', 'a', newline='') as f:
                    writer = csv.writer(f)
                    if do_Spark:
                        writer.writerow([n_jobs, n, k, numtry, "PySpark", time_PySpark])
                    if do_QR:
                        writer.writerow([n_jobs, n, k, numtry, "QR", time_QR])
                    if do_SVD:
                        writer.writerow([n_jobs, n, k, numtry, "SVD", time_SVD])
                    if do_LU:
                        writer.writerow([n_jobs, n, k, numtry, "LU", time_LU])
    
    # Ending the Spark session at the end of each n_jobs iteration.
    # NOTE(review): presumably needed so the next iteration's getOrCreate()
    # builds a session with the new executor count instead of reusing this one — confirm.
    spark.stop()


n_jobs = 64 n = 6000 k = 2


                                                                                

LU MAE:  1.4370528813641842e-05


                                                                                

n_jobs = 64 n = 6000 k = 5


                                                                                

LU MAE:  0.00011964710042418058


                                                                                

n_jobs = 64 n = 6000 k = 10
LU MAE:  1.820609616253188e-05


                                                                                

n_jobs = 64 n = 6000 k = 20


                                                                                

LU MAE:  0.00012059048529337556


                                                                                