# Data Preparation

In [None]:
import pandas as pd
import numpy as np

data_url = "http://lib.stat.cmu.edu/datasets/boston"
boston_pd = pd.read_csv(data_url, sep="\s+", skiprows=22, header=None)
boston_pd = boston_pd.fillna(boston_pd.mean())

new_column_names = {0: 'x1', 1: 'x2', 2: 'x3', 3: 'x4', 4: 'x5', 5: 'x6', 6: 'x7', 7: 'x8', 8: 'x9', 9: 'x10', 10: 'target'}

boston_pd.rename(columns=new_column_names, inplace=True)

boston_pd.head(5)

In [None]:
# split into data and label arrays 
y = boston_pd['target']
X = boston_pd.drop(['target'], axis=1)

# create training (~80%) and test data sets
X_train = X[:400]
X_test = X[400:]
y_train = y[:400]
y_test = y[400:]

# Single Thread

In [None]:
import time
from sklearn.linear_model import LinearRegression

# Create a LinearRegression model
lr = LinearRegression()

# Measure the time it takes to fit the model
start_time = time.time()
model = lr.fit(X_train, y_train)
end_time = time.time()

# Calculate the time taken for fitting
fit_time = end_time - start_time
print(f"Fitting Time: {fit_time} seconds")

# Measure the time it takes to make predictions
start_time = time.time()
y_pred = model.predict(X_test)
end_time = time.time()

# Calculate the time taken for prediction
predict_time = end_time - start_time
print(f"Prediction Time: {predict_time} seconds")

# Native Spark

In [None]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Regression") \
    .getOrCreate()


In [None]:
from pyspark.ml.feature import VectorAssembler

# convert to a Spark data frame
boston_sp = spark.createDataFrame(boston_pd)
#display(boston_sp.take(5))

# split into training and test spark data frames
boston_train = spark.createDataFrame(boston_pd[:400])
boston_test = spark.createDataFrame(boston_pd[400:])

# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols= boston_train.schema.names[:(boston_pd.shape[1] - 1)],  
                                                                        outputCol="features" )
boston_train = assembler.transform(boston_train).select('features', 'target') 
boston_test = assembler.transform(boston_test).select('features', 'target') 

# display(boston_train.take(5))

# linear regresion with Spark
from pyspark.ml.regression import LinearRegression

# linear regression 
lr = LinearRegression(maxIter=10, regParam=0.1, 
                      elasticNetParam=0.5, labelCol="target")

# Fit the model
start_time = time.time()
model = lr.fit(boston_train)
end_time = time.time()
fit_time = end_time - start_time
print(f"Fitting Time: {fit_time} seconds")

start_time = time.time()
boston_pred = model.transform(boston_test)
end_time = time.time()
predict_time = end_time - start_time
print(f"Prediction Time: {predict_time} seconds")

# calculate results 
r = boston_pred.stat.corr("prediction", "target")
print("R-squared: " + str(r**2))

Source Code: https://towardsdatascience.com/3-methods-for-parallelization-in-spark-6a1a4333b473

In [None]:
import pandas as pd
df = pd.read_csv("/Users/muhamadsyukron/Main Folder/Mac 2023 Files/DeepNeuron/HPC_DL_Collab_Lab/organizations-2000000.csv")

In [None]:
df = df[['Founded', 'Number of employees']]

In [None]:
df = df.rename(columns={'Founded': 'x', 'Number of employees': 'y'})

In [None]:
# import pandas as pd
# df = pd.read_csv("/Users/muhamadsyukron/Main Folder/Mac 2023 Files/DeepNeuron/HPC_DL_Collab_Lab/data.csv")
# data = [(row['x'], row['y']) for _, row in df.iterrows()]

In [1]:
import pandas as pd
df = pd.read_csv('/Users/muhamadsyukron/Main Folder/Mac 2023 Files/DeepNeuron/HPC_DL_Collab_Lab/linear_regression_data.csv')

In [2]:
df = df.rename(columns={'X': 'x', 'Y': 'y'})

In [3]:
from pyspark import SparkContext, SparkConf
import pandas as pd
import time

# Initialize Spark
conf = SparkConf().setAppName("RDDLinearRegression")
sc = SparkContext(conf=conf)

#df = pd.read_csv("/Users/muhamadsyukron/Main Folder/Mac 2023 Files/DeepNeuron/HPC_DL_Collab_Lab/data.csv")
data = [(row['x'], row['y']) for _, row in df.iterrows()]
data_rdd = sc.parallelize(data)

# Define the number of iterations and learning rate
num_iterations = 50
learning_rate = 0.01

# Initialize the weights (slope and intercept)
weights = (0.0, 0.0)

# Perform gradient descent to learn the linear regression coefficients
start_time = time.time()
for _ in range(num_iterations):
    # Compute the gradients for the weights
    gradients = data_rdd.map(lambda data_point: (
        -2 * data_point[0] * (data_point[1] - (weights[0] * data_point[0] + weights[1])),
        -2 * (data_point[1] - (weights[0] * data_point[0] + weights[1]))
    )).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))

    # Update the weights using the gradients and learning rate
    weights = (weights[0] - learning_rate * gradients[0], weights[1] - learning_rate * gradients[1])

end_time = time.time()
predict_time = end_time - start_time
print(f"Training Time: {predict_time} seconds")

# The final weights represent the learned linear regression coefficients
slope, intercept = weights
print(f"Linear Regression Model: y = {slope} * x + {intercept}")

# Stop SparkContext
sc.stop()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/12 15:41:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/12 15:41:38 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Training Time: 19.382373094558716 seconds
Linear Regression Model: y = -1.3283691617368033e+242 * x + -2.0057847463742946e+241


In [4]:
import pandas as pd
import time

# Load your CSV data into a Pandas DataFrame
#df = pd.read_csv("/Users/muhamadsyukron/Main Folder/Mac 2023 Files/DeepNeuron/HPC_DL_Collab_Lab/data.csv")

# Convert the DataFrame to a list of tuples
#data = [(row['x'], row['y']) for _, row in df.iterrows()]

# Define the number of iterations and learning rate
num_iterations = 50
learning_rate = 0.01

# Initialize the weights (slope and intercept)
weights = (0.0, 0.0)

# Perform gradient descent to learn the linear regression coefficients
start_time = time.time()
for _ in range(num_iterations):
    # Compute the gradients for the weights
    gradient_sum = [0.0, 0.0]
    for data_point in data:
        gradient_sum[0] += -2 * data_point[0] * (data_point[1] - (weights[0] * data_point[0] + weights[1]))
        gradient_sum[1] += -2 * (data_point[1] - (weights[0] * data_point[0] + weights[1]))

    # Update the weights using the gradients and learning rate
    weights = (weights[0] - learning_rate * gradient_sum[0], weights[1] - learning_rate * gradient_sum[1])

end_time = time.time()
training_time = end_time - start_time
print(f"Training Time: {training_time} seconds")

# The final weights represent the learned linear regression coefficients
slope, intercept


Training Time: 6.557524919509888 seconds


(-1.3283691617368033e+242, -2.0057847463742946e+241)

In [5]:
from pyspark import SparkContext, SparkConf
import pandas as pd
import time

# Initialize Spark
conf = SparkConf().setAppName("RDDLinearRegression2").setMaster("local[4]")
sc = SparkContext(conf=conf)

#df = pd.read_csv("/Users/muhamadsyukron/Main Folder/Mac 2023 Files/DeepNeuron/HPC_DL_Collab_Lab/data.csv")
data = [(row['x'], row['y']) for _, row in df.iterrows()]
data_rdd = sc.parallelize(data)

# Define the number of iterations and learning rate
num_iterations = 50
learning_rate = 0.01

# Initialize the weights (slope and intercept)
weights = (0.0, 0.0)

# Perform gradient descent to learn the linear regression coefficients
start_time = time.time()
for _ in range(num_iterations):
    # Compute the gradients for the weights
    gradients = data_rdd.map(lambda data_point: (
        -2 * data_point[0] * (data_point[1] - (weights[0] * data_point[0] + weights[1])),
        -2 * (data_point[1] - (weights[0] * data_point[0] + weights[1]))
    )).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))

    # Update the weights using the gradients and learning rate
    weights = (weights[0] - learning_rate * gradients[0], weights[1] - learning_rate * gradients[1])

end_time = time.time()
predict_time = end_time - start_time
print(f"Training Time: {predict_time} seconds")

# The final weights represent the learned linear regression coefficients
slope, intercept = weights
print(f"Linear Regression Model: y = {slope} * x + {intercept}")

# Stop SparkContext
sc.stop()




23/09/12 15:45:45 WARN TaskSetManager: Stage 0 contains a task of very large size (1009 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:45:45 WARN TaskSetManager: Stage 1 contains a task of very large size (1009 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:45:46 WARN TaskSetManager: Stage 2 contains a task of very large size (1009 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:45:46 WARN TaskSetManager: Stage 3 contains a task of very large size (1009 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:45:46 WARN TaskSetManager: Stage 4 contains a task of very large size (1009 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:45:47 WARN TaskSetManager: Stage 5 contains a task of very large size (1009 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:45:47 WARN TaskSetManager: Stage 6 contains a task of very large size (1009 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:4

Training Time: 17.205900192260742 seconds
Linear Regression Model: y = -1.328369161736794e+242 * x + -2.0057847463742934e+241


In [8]:
from pyspark import SparkContext, SparkConf
import pandas as pd
import time

# Initialize Spark
conf = SparkConf().setAppName("RDDLinearRegression3").setMaster("local[1]")
sc = SparkContext(conf=conf)

#df = pd.read_csv("/Users/muhamadsyukron/Main Folder/Mac 2023 Files/DeepNeuron/HPC_DL_Collab_Lab/data.csv")
data = [(row['x'], row['y']) for _, row in df.iterrows()]
data_rdd = sc.parallelize(data)

# Define the number of iterations and learning rate
num_iterations = 50
learning_rate = 0.01

# Initialize the weights (slope and intercept)
weights = (0.0, 0.0)

# Perform gradient descent to learn the linear regression coefficients
start_time = time.time()
for _ in range(num_iterations):
    # Compute the gradients for the weights
    gradients = data_rdd.map(lambda data_point: (
        -2 * data_point[0] * (data_point[1] - (weights[0] * data_point[0] + weights[1])),
        -2 * (data_point[1] - (weights[0] * data_point[0] + weights[1]))
    )).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))

    # Update the weights using the gradients and learning rate
    weights = (weights[0] - learning_rate * gradients[0], weights[1] - learning_rate * gradients[1])

end_time = time.time()
predict_time = end_time - start_time
print(f"Training Time: {predict_time} seconds")

# The final weights represent the learned linear regression coefficients
slope, intercept = weights
print(f"Linear Regression Model: y = {slope} * x + {intercept}")

# Stop SparkContext
sc.stop()




23/09/12 15:47:42 WARN TaskSetManager: Stage 0 contains a task of very large size (3924 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:47:43 WARN TaskSetManager: Stage 1 contains a task of very large size (3924 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:47:43 WARN TaskSetManager: Stage 2 contains a task of very large size (3924 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:47:44 WARN TaskSetManager: Stage 3 contains a task of very large size (3924 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:47:44 WARN TaskSetManager: Stage 4 contains a task of very large size (3924 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:47:45 WARN TaskSetManager: Stage 5 contains a task of very large size (3924 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:47:45 WARN TaskSetManager: Stage 6 contains a task of very large size (3924 KiB). The maximum recommended task size is 1000 KiB.
23/09/12 15:4

Training Time: 18.465419054031372 seconds
Linear Regression Model: y = -1.3283691617368772e+242 * x + -2.0057847463744057e+241


- 1 cores: 18 s
- 4 cores: 17 s
- 7 cores: 17 s

In [9]:
from pyspark import SparkContext, SparkConf
import pandas as pd
import time

# Initialize Spark
conf = SparkConf().setAppName("RDDLinearRegression2").setMaster("local[7]")
sc = SparkContext(conf=conf)

#df = pd.read_csv("/Users/muhamadsyukron/Main Folder/Mac 2023 Files/DeepNeuron/HPC_DL_Collab_Lab/data.csv")
data = [(row['x'], row['y']) for _, row in df.iterrows()]
data_rdd = sc.parallelize(data)

# Define the number of iterations and learning rate
num_iterations = 50
learning_rate = 0.01

# Initialize the weights (slope and intercept)
weights = (0.0, 0.0)

# Perform gradient descent to learn the linear regression coefficients
start_time = time.time()
for _ in range(num_iterations):
    # Compute the gradients for the weights
    gradients = data_rdd.map(lambda data_point: (
        -2 * data_point[0] * (data_point[1] - (weights[0] * data_point[0] + weights[1])),
        -2 * (data_point[1] - (weights[0] * data_point[0] + weights[1]))
    )).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))

    # Update the weights using the gradients and learning rate
    weights = (weights[0] - learning_rate * gradients[0], weights[1] - learning_rate * gradients[1])

end_time = time.time()
predict_time = end_time - start_time
print(f"Training Time: {predict_time} seconds")

# The final weights represent the learned linear regression coefficients
slope, intercept = weights
print(f"Linear Regression Model: y = {slope} * x + {intercept}")

# Stop SparkContext
sc.stop()




                                                                                

Training Time: 17.27907681465149 seconds
Linear Regression Model: y = -1.328369161736819e+242 * x + -2.0057847463743227e+241


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import time
from pyspark import SparkContext, SparkConf


data = pd.read_csv("data.csv")
# y = mx + b
# E = -2/n sum(yi - (mxi + b))


def grad_desc(m_now, b_now, data, L):
    m_grad = 0
    b_grad = 0

    N = len(data)
    for i in range(N):
        x = data.iloc[i].x
        y = data.iloc[i].y

        m_grad += -(2 / N) * x * (y - (m_now * x + b_now))
        b_grad += -(2 / N) * (y - (m_now * x + b_now))
    m = m_now - m_grad * L
    b = b_now - b_grad * L
    return m, b


m = 0
b = 0
L = 0.0001
epochs = 100

start_time = time.time()

for i in range(epochs):
    m, b = grad_desc(m, b, data, L)


end_time = time.time()
predict_time = end_time - start_time
print(f"Training Time: {predict_time} seconds")

# plt.scatter(data.x, data.y, color="black")
# plt.plot(list(range(50)), ([m * x + b for x in range(50)]), color="red")
# plt.show()