In [None]:
!pip install pyspark

In [25]:
import numpy as np
from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics

In [26]:
#init sparkcontext
sc = SparkContext.getOrCreate()

#gen synthesized dstaset
def gen_synthetic_dataset():
  np.random.seed(42)
  rows = 1000
  cols = 3
  data = np.random.randn(rows, cols)
  as_strings = [','.join(map(str, row)) for row in data]
  return as_strings

dataset = gen_synthetic_dataset()
rdd = sc.parallelize(dataset)

In [27]:
from pyspark.mllib.regression import LabeledPoint

# Parse each line into a labeled point
def parse_line(line):
    parts = line.split(',')
    label = float(parts[-1])  # Last column as label
    features = Vectors.dense([float(x) for x in parts[:-1]])  # All but the last column as features
    return LabeledPoint(label, features)

vectorized_rdd = rdd.map(parse_line)

In [29]:
#standardize the vectorized dataset
mean_vector = Statistics.colStats(vectorized_rdd.map(lambda lp: lp.features)).mean()
centered_rdd = vectorized_rdd.map(lambda lp: LabeledPoint(lp.label, Vectors.dense(lp.features.toArray() - mean_vector)))
covariance_matrix = Statistics.corr(centered_rdd.map(lambda lp: lp.features), method="pearson")


#compute eigenvectors and eigenvalues
cov_matrix_np = np.array(covariance_matrix)
#eg_vc, eig_val
eig_val, eig_vec = np.linalg.eig(cov_matrix_np)
#sort the eig vcs based on eig vals
sorted_indices = np.argsort(eig_val)[::-1]
sorted_eig_vectors = eig_vec[:, sorted_indices]

In [35]:
#project the data  onto principle components
#rdd is using distributed computations behind the scene
k = 2
top_k_eig_vcs = sorted_eig_vectors[:, :k]
top_k_eig_vcs_broadcast = sc.broadcast(top_k_eig_vcs)


reduced_rdd = centered_rdd .map(lambda lp: LabeledPoint(lp.label, Vectors.dense(np.dot(top_k_eig_vcs_broadcast.value.T, lp.features))))

In [36]:
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics

# Split the data into training and test sets
train_rdd, test_rdd = reduced_rdd.randomSplit([0.8, 0.2], seed=42)

# Train a linear regression model
model = LinearRegressionWithSGD.train(train_rdd, iterations=100, step=0.01)

# Evaluate the model on test data
predictions_and_labels = test_rdd.map(lambda lp: (float(model.predict(lp.features)), lp.label))
metrics = RegressionMetrics(predictions_and_labels)

# Print the model's metrics
print(f"Mean Squared Error: {metrics.meanSquaredError}")
print(f"Root Mean Squared Error: {metrics.rootMeanSquaredError}")
print(f"R^2: {metrics.r2}")




Mean Squared Error: 1.064191499279945
Root Mean Squared Error: 1.031596577776383
R^2: -0.002543615442915259
