In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import col, expr, sqrt
import numpy as np
import math
import time
import pandas as pd
import statsmodels.api as sm
import scipy.stats as stats
from scipy.stats import t, f
import matplotlib.pyplot as plt  # Diesen Import hinzufügen

In [7]:

spark = SparkSession.builder.master("spark://spark-master:7077").appName("Test").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [8]:

def backward_substitution(A_dict, y):
    n = len(A_dict)
    b = np.zeros(n)

    for i in range(n - 1, -1, -1):
        row = A_dict.get(i)
        if row is None:
            raise ValueError(f"No row found in A_rdd for index {i}")
        if row[i] == 0:
            raise ValueError("Matrix A contains a zero on the diagonal; no unique solution possible.")
        
        b[i] = (y[i] - np.dot(row[i+1:], b[i+1:])) / row[i]

    return b

def gram_schmidt(X_df):
    X_rdd = X_df.rdd.cache()
    m = len(X_df.first().features)
    Q = []
    R = np.zeros((m, m))

    for j in range(m):
        v = X_rdd.map(lambda row: row.features[j]).collect()
        
        for i in range(j):
            R[i, j] = np.dot(Q[i], v)
            v -= R[i, j] * Q[i]

        R[j, j] = np.linalg.norm(v)
        Q.append(v / R[j, j])
    
    R_dict = {i: R[i, :] for i in range(m)}

    Q_df = spark.createDataFrame([Row(features=DenseVector(q)) for q in zip(*Q)])

    return Q_df, R_dict

def create_data(n, p, beta_true):
    np.random.seed(42)
    X = np.random.rand(n, p)
    X = np.column_stack([np.ones(X.shape[0]), X])
    y = X @ beta_true + np.random.randn(n) * 0.1
    data = [Row(y=float(y_i), **{f"x{i}": float(x_i) for i, x_i in enumerate(x)}) for x, y_i in zip(X, y)]
    df = spark.createDataFrame(data).repartition(100)
    feature_columns = [f"x{i}" for i in range(p + 1)]  # Include intercept column
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    return assembler.transform(df).select("features", "y").repartition(100)

def linear_regression_manual_qr(X, y):
    start_time = time.time()
    Q, R = gram_schmidt(X)
    Qt_y = np.array(Q.rdd.map(lambda row: sum(row.features[i] * y[i] for i in range(len(row.features)))).collect())
    beta = backward_substitution(R, Qt_y)

    # Calculate additional metrics
    X_mat = np.array(X.rdd.map(lambda row: row.features).collect())  # Collect the features matrix as a numpy array
    n, k = X_mat.shape
    k -= 1  # -1 to account for the intercept column

    df_residuals = n - k - 1  # Degrees of freedom
    XtX = np.dot(X_mat.T, X_mat)
    XtX_inv = np.linalg.inv(XtX)

    # Predicted y values
    y_pred = np.dot(X_mat, beta)
    residuals = y - y_pred

    # R-squared and adjusted R-squared
    r_quadrat = 1 - (np.sum(residuals ** 2) / np.sum((y - np.mean(y)) ** 2))
    adjusted_r_quadrat = 1 - ((n - 1) / df_residuals) * (np.sum(residuals ** 2) / np.sum((y - np.mean(y)) ** 2))

    # Sum of Squared Errors (SSE), Sum of Squared Regression (SSR)
    SSE = np.sum(residuals ** 2)
    SSR = np.sum((y_pred - np.mean(y)) ** 2)
    sigma_hat = np.sqrt(SSE / df_residuals)

    # F-statistics
    f_statistics = (SSR / k) / (SSE / df_residuals)
    f_p_wert = stats.f.sf(f_statistics, k, df_residuals)

    # Log-likelihood, AIC, BIC
    log_likelihood = -n / 2 * (np.log(2 * np.pi) + np.log(sigma_hat ** 2) + SSE / (n * sigma_hat ** 2))
    AIC = 2 * k - 2 * log_likelihood
    BIC = np.log(n) * k - 2 * log_likelihood

    # Covariance matrix and standard errors
    cov_matrix = (sigma_hat ** 2) * XtX_inv
    se = np.sqrt(np.diag(cov_matrix))
    t_werte = beta / se
    p_werte = 2 * (1 - stats.t.cdf(np.abs(t_werte), df_residuals))

    # Confidence intervals
    alpha = 0.05
    t_crit = stats.t.ppf(1 - alpha / 2, df=df_residuals)
    conf_int_lower = beta - t_crit * se
    conf_int_upper = beta + t_crit * se

    # Omnibus test, Durbin-Watson statistic, Jarque-Bera test
    skewness = stats.skew(residuals)
    kurtosis = stats.kurtosis(residuals, fisher=False)
    omnibus_stat = (n / 6) * (skewness**2 + ((kurtosis - 3)**2) / 4)
    prob_omnibus = 1 - stats.chi2.cdf(omnibus_stat, df=2)
    dw_statistic = np.sum(np.diff(residuals) ** 2) / np.sum(residuals ** 2)
    jarque_bera_stat = (n / 6) * (skewness**2 + (kurtosis - 3)**2 / 4)
    prob_jb = 1 - stats.chi2.cdf(jarque_bera_stat, df=2)

    end_time = time.time()
    elapsed_time = end_time - start_time

    result = [
        {'Dep. Variable': 'y',
         'Method': 'Linear Regression with QR Decomposition',
         'Observations': n,
         'df': df_residuals,
         'Variables': k},
        {'R-Squared': r_quadrat,
         'Adj. R-Squared': adjusted_r_quadrat,
         'F-statistic': f_statistics,
         'Prob (F-statistic)': f_p_wert,
         'Log-Likelihood': log_likelihood,
         'AIC': AIC,
         'BIC': BIC},
        {'coef': beta,
         'std err': se,
         't': t_werte,
         'P>abs(t)': p_werte,
         'lower boundary': conf_int_lower,
         'upper boundary': conf_int_upper},
        {'Ombibus': omnibus_stat,
         'Prob(Omnibus)': prob_omnibus,
         'Skew': skewness,
         'Kurtosis': kurtosis,
         'Durbin-Watson': dw_statistic,
         'Jarque-Bera (JB)': jarque_bera_stat,
         'Prob(JB)': prob_jb}
    ]

    return result, elapsed_time

def run_benchmark(n_list, repetitions=5):
    results = []
    beta_true = [-8, -1.6, 4.1, -10, -9.2, 1.3, 1.6, 2.3]
    p = 7

    for n in n_list:
        times = []
        for _ in range(repetitions):
            X_y_df = create_data(n, p, beta_true)
            X = X_y_df.select("features")
            y = X_y_df.select("y").rdd.flatMap(lambda x: x).collect()
            result, elapsed_time = linear_regression_manual_qr(X, y)
            times.append(elapsed_time)

        avg_time = sum(times) / repetitions
        std_time = (sum((x - avg_time) ** 2 for x in times) / repetitions) ** 0.5
        results.append([n, avg_time, std_time, result])
        print("\nData Rows:", n)
        print("Average Run Time:", avg_time)

    return results



In [9]:
n_values = [200000, 500000, 1000000, 5000000, 10000000]
benchmark_results = run_benchmark(n_values)

                                                                                


Data Rows: 200000
Average Run Time: 30.760185050964356


                                                                                


Data Rows: 500000
Average Run Time: 43.66273641586304


                                                                                


Data Rows: 1000000
Average Run Time: 68.14592761993408


24/10/18 13:20:24 ERROR Inbox: An error happened while processing message in the inbox for CoarseGrainedScheduler
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.se

KeyboardInterrupt: 



In [None]:
n_values = [5000000, 10000000]
benchmark_results = run_benchmark(n_values)

