In [None]:
# Pin PySpark to the version this notebook was last run with (3.4.1, per the log below).
# %pip installs into the running kernel's environment, unlike !pip, which uses whatever pip is on PATH.
%pip install pyspark==3.4.1 yfinance

Collecting PySpark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 1.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: PySpark
  Building wheel for PySpark (setup.py) ... done
  Created wheel for PySpark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=38b13451987a35131a51bb0f58a5463e5ffd706f85f9b3c45d5fc75ae4f0e11a
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built PySpark
Installing collected packages: PySpark
Successfully installed PySpark-3.4.1


In [None]:
from sklearn.preprocessing import MinMaxScaler
from pyspark import SparkContext
import numpy as np
import pandas as pd
import yfinance as yf

class SVMRegression:
    """Linear regressor trained with full-batch gradient descent on a Spark RDD.

    Despite its name, the per-sample update computed in ``_gradient_update``
    is the gradient of a squared-error data term plus an L2 penalty on ``w``
    and ``b`` (ridge-style least squares), not the epsilon-insensitive loss of
    a classic support-vector regressor. The model predicts ``X @ w - b``.

    Each training iteration is one Spark job: every sample's update is
    computed in ``map`` and the updates are summed with ``reduce``. Because
    the penalty term is contributed once per sample, its effective strength
    grows with the number of training samples.

    Parameters
    ----------
    learning_rate : float, default 1e-5
        Step size multiplied into every per-sample update.
    lambda_param : float, default 1e-5
        L2 regularisation strength.
    n_iters : int, default 1000
        Number of full passes (Spark jobs) over the training data.
    """

    def __init__(self, learning_rate=1e-5, lambda_param=1e-5, n_iters=1000):
        self.lr = learning_rate
        self.lambda_param = lambda_param
        self.n_iters = n_iters
        self.w = None  # weight vector, set by fit()
        self.b = None  # bias (subtracted in predict), set by fit()

    def fit(self, X, y, sc=None):
        """Fit ``w`` and ``b`` by distributed gradient descent.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training inputs.
        y : array-like with n_samples elements
            Training targets; flattened to 1-D.
        sc : pyspark.SparkContext, optional
            Context to run the jobs on. When omitted, a new context is
            created for this call and stopped before returning, even if
            training raises. A caller-supplied context is left running.

        Returns
        -------
        SVMRegression
            ``self``, to allow chaining.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D, has no rows, or ``y`` has a different
            number of samples.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float).ravel()
        if X.ndim != 2:
            raise ValueError(f"X must be 2-D, got shape {X.shape}")
        n_samples, n_features = X.shape
        if n_samples == 0:
            raise ValueError("cannot fit on an empty dataset")
        if y.shape[0] != n_samples:
            raise ValueError(f"X has {n_samples} samples but y has {y.shape[0]}")

        # Only stop a context this call created; a caller-provided one may
        # still be in use elsewhere.
        owns_context = sc is None
        if owns_context:
            sc = SparkContext(appName="SVMRegressionMapReduce")

        self.w = np.zeros(n_features)
        self.b = 0.0
        # Plain scalars travel inside the task closure; broadcasting them adds
        # nothing and would leave broadcast variables behind on a shared context.
        lr = self.lr
        lambda_param = self.lambda_param

        data_rdd = None
        try:
            # Parallelize (x, y) pairs together and cache them once, instead of
            # zipping two separate RDDs and recomputing them every iteration.
            data_rdd = sc.parallelize(list(zip(X.tolist(), y.tolist()))).cache()
            for _ in range(self.n_iters):
                # The lambda closes over ``self``; _gradient_update reads the
                # current self.w / self.b for this iteration.
                updates = data_rdd.map(
                    lambda x_y: self._gradient_update(x_y, lr, lambda_param)
                )
                w_step, b_step = updates.reduce(
                    lambda acc, upd: (acc[0] + upd[0], acc[1] + upd[1])
                )
                self.w -= w_step
                self.b -= b_step
        finally:
            if data_rdd is not None:
                data_rdd.unpersist()
            if owns_context:
                sc.stop()
        return self

    def _gradient_update(self, x_y, lr, lambda_param):
        """Return the ``(w_update, b_update)`` step for one training sample.

        ``x_y`` is an ``(x, y)`` pair; ``x`` is converted to a float array.
        ``fit`` *subtracts* the returned values from ``self.w`` / ``self.b``.
        Both penalty terms are ``+2 * lambda_param * param`` so regularisation
        shrinks ``w`` and ``b`` toward zero. The data term for ``b`` is
        ``+error`` because ``b`` enters the prediction with a minus sign
        (``x @ w - b``).
        """
        x_i = np.asarray(x_y[0], dtype=float)
        y_i = x_y[1]
        prediction = np.dot(x_i, self.w) - self.b
        error = y_i - prediction

        w_update = lr * (2 * lambda_param * self.w - x_i * error)
        b_update = lr * (2 * lambda_param * self.b + error)
        return (w_update, b_update)

    def predict(self, X):
        """Return ``X @ w - b`` for inputs of shape (n_samples, n_features).

        ``fit`` must have been called first so that ``w`` and ``b`` are set.
        """
        return np.dot(X, self.w) - self.b

def _make_windows(values, window):
    """Turn a 2-D ``(n_rows, n_cols)`` array into flattened sliding-window samples.

    Sample ``k`` uses rows ``k .. k + window - 1`` (flattened) as features and
    column 0 of row ``k + window`` as its target. Returns ``(X, y)`` with shapes
    ``(n_rows - window, window * n_cols)`` and ``(n_rows - window,)``; both are
    empty when ``n_rows <= window``.
    """
    n_rows, n_cols = values.shape
    if n_rows <= window:
        return np.empty((0, window * n_cols)), np.empty(0)
    X = np.stack([values[i - window:i].reshape(-1) for i in range(window, n_rows)])
    y = np.array(values[window:, 0])
    return X, y


def load_dataset(stock_symbol='AAPL', period='5y', interval='1d',
                 train_fraction=0.70, window=100):
    """Download closing prices and build scaled sliding-window train/test sets.

    The ``Close`` data returned by ``yf.download`` is split chronologically:
    the first ``train_fraction`` of rows for training, the rest for testing.
    A ``MinMaxScaler`` is fitted on the training split only and then applied
    to both splits. No test-period statistics leak into the features, so
    test values may fall outside the 0-1 range.

    Each sample's features are the previous ``window`` scaled closes and its
    target is the next scaled close. The first ``window`` rows of each split
    therefore serve only as context and never appear as targets.

    Parameters
    ----------
    stock_symbol : str, default 'AAPL'
        Ticker to download (the notebook has also been pointed at e.g. AMZN,
        NVDA, CNRG, TAN, BTC, AMD, INTC).
    period : str, default '5y'
        Passed through to ``yf.download``.
    interval : str, default '1d'
        Passed through to ``yf.download``.
    train_fraction : float, default 0.70
        Fraction of rows (chronologically first) used for training; must be
        strictly between 0 and 1.
    window : int, default 100
        Number of past closes per sample; must be >= 1.

    Returns
    -------
    X_train, y_train, X_test, y_test : numpy.ndarray
        ``X_*`` has one row per sample and ``window`` columns per price
        column; ``y_*`` is 1-D.

    Raises
    ------
    ValueError
        If ``train_fraction`` or ``window`` is out of range, or if no price
        data is returned for the ticker.
    """
    if not 0.0 < train_fraction < 1.0:
        raise ValueError(f"train_fraction must be in (0, 1), got {train_fraction}")
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    df = yf.download(tickers=stock_symbol, period=period, interval=interval)
    if df.empty:
        raise ValueError(f"no price data returned for {stock_symbol!r}")

    # Chronological train/test split (no shuffling for time series).
    close = df['Close']
    split = int(len(close) * train_fraction)
    data_training = pd.DataFrame(close.iloc[:split])
    data_testing = pd.DataFrame(close.iloc[split:])

    scaler = MinMaxScaler(feature_range=(0, 1))
    data_training_array = scaler.fit_transform(data_training)
    # Reuse the training fit: refitting on the test split would leak its
    # min/max and put train and test features on different scales.
    data_testing_array = scaler.transform(data_testing)

    X_train, y_train = _make_windows(data_training_array, window)
    X_test, y_test = _make_windows(data_testing_array, window)
    return X_train, y_train, X_test, y_test

def mean_squared_error(y_true, y_pred):
    """Return the mean of squared differences between ``y_true`` and ``y_pred``.

    Both inputs are converted to float arrays and flattened to 1-D. A ``(n,)``
    target and an ``(n, 1)`` prediction are therefore compared element-wise
    instead of silently broadcasting to an ``(n, n)`` matrix. Lists are
    accepted.

    Raises
    ------
    ValueError
        If the inputs are empty or contain different numbers of elements.
    """
    y_true = np.ravel(np.asarray(y_true, dtype=float))
    y_pred = np.ravel(np.asarray(y_pred, dtype=float))
    if y_true.shape != y_pred.shape:
        raise ValueError(f"y_true has {y_true.size} elements but y_pred has {y_pred.size}")
    if y_true.size == 0:
        raise ValueError("cannot compute MSE of empty inputs")
    return np.mean((y_true - y_pred) ** 2)

def mean_absolute_error(y_true, y_pred):
    """Return the mean of absolute differences between ``y_true`` and ``y_pred``.

    Both inputs are converted to float arrays and flattened to 1-D. A ``(n,)``
    target and an ``(n, 1)`` prediction are therefore compared element-wise
    instead of silently broadcasting to an ``(n, n)`` matrix. Lists are
    accepted.

    Raises
    ------
    ValueError
        If the inputs are empty or contain different numbers of elements.
    """
    y_true = np.ravel(np.asarray(y_true, dtype=float))
    y_pred = np.ravel(np.asarray(y_pred, dtype=float))
    if y_true.shape != y_pred.shape:
        raise ValueError(f"y_true has {y_true.size} elements but y_pred has {y_pred.size}")
    if y_true.size == 0:
        raise ValueError("cannot compute MAE of empty inputs")
    return np.mean(np.abs(y_true - y_pred))

def r_squared(y_true, y_pred):
    """Return the coefficient of determination R^2 = 1 - SS_res / SS_tot.

    Both inputs are converted to float arrays and flattened to 1-D, so a
    ``(n,)`` target and an ``(n, 1)`` prediction are compared element-wise.

    When ``y_true`` is constant (SS_tot == 0) the ratio is undefined. Following
    scikit-learn's ``r2_score`` convention, 1.0 is returned for a perfect
    prediction and 0.0 otherwise, instead of nan / -inf.

    Raises
    ------
    ValueError
        If the inputs are empty or contain different numbers of elements.
    """
    y_true = np.ravel(np.asarray(y_true, dtype=float))
    y_pred = np.ravel(np.asarray(y_pred, dtype=float))
    if y_true.shape != y_pred.shape:
        raise ValueError(f"y_true has {y_true.size} elements but y_pred has {y_pred.size}")
    if y_true.size == 0:
        raise ValueError("cannot compute R^2 of empty inputs")

    ss_total = np.sum((y_true - np.mean(y_true)) ** 2)
    ss_residual = np.sum((y_true - y_pred) ** 2)
    if ss_total == 0:
        return 1.0 if ss_residual == 0 else 0.0
    return 1 - ss_residual / ss_total

# Build the windowed datasets, train the Spark-backed regressor and score it on the hold-out split.
X_train, y_train, X_test, y_test = load_dataset()

clf = SVMRegression()
clf.fit(X_train, y_train)
predictions = clf.predict(X_test)

# One line per metric: label followed by its value on the test split.
for _label, _metric in (
    ("MSE regression mean squared error:", mean_squared_error),
    ("MAE regression mean absolute error:", mean_absolute_error),
    ("R2 score:", r_squared),
):
    print(_label, _metric(y_test, predictions))

[*********************100%***********************]  1 of 1 completed
MSE regression mean squared error: 0.018420207803019405
MAE regression mean absolute error: 0.11903219712068351
R2 score: 0.687568098988488
