<a href="https://colab.research.google.com/github/YunlouTeng/Big_Data_Analysis_GCP/blob/main/multiple_linear_regression(Batch_Gradient_Descent).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#pip install --ignore-installed -q pyspark

[K     |████████████████████████████████| 281.4 MB 43 kB/s 
[K     |████████████████████████████████| 199 kB 49.3 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext


if __name__ == "__main__":
    # Exactly two user-supplied arguments are expected after the program
    # name: the input data file and an output location.
    if len(sys.argv) != 3:
        # Report the actual program name instead of a name copied from a
        # different script.
        print("Usage: {} <file> <output>".format(sys.argv[0]), file=sys.stderr)
        # sys.exit is always available; the bare exit() helper is injected by
        # the site module for interactive sessions and is not meant for
        # programs.
        sys.exit(-1)

In [None]:
import requests
import numpy as np

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

from pyspark.sql.types import *
from pyspark.sql import functions as func
from pyspark.sql.functions import *

# Start (or reuse) a Spark session on the local master using "local[*]".
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

# Handles to the underlying SparkContext and the SQLContext built on it.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)



In [None]:
# Path of the input CSV, taken from the first command-line argument.
# NOTE(review): inside a notebook kernel sys.argv presumably holds the
# kernel's own launch arguments rather than a data path - confirm, or use the
# hard-coded path below for interactive runs.
data= sys.argv[1]
#data = '/content/drive/MyDrive/BU/cs777/taxi-data-sorted-small.csv.bz2'

In [None]:
# Explicit schema for the taxi CSV: one nullable field per column, listed in
# the order the columns are declared. The spelling "toal_amount" is kept
# because the label column is selected by that exact name later on.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("Taxi_id", StringType()),
        ("Driver_id", StringType()),
        ("pickup_datetime", TimestampType()),
        ("dropoff_datetime", TimestampType()),
        ("trip_time_in_secs", ShortType()),
        ("trip_distance", FloatType()),
        ("pickup_longitude", FloatType()),
        ("pickup_latitude", FloatType()),
        ("dropoff_longitude", FloatType()),
        ("dropoff_latitude", FloatType()),
        ("payment_type", StringType()),
        ("fare_amount", FloatType()),
        ("surcharge", FloatType()),
        ("mta_tax", FloatType()),
        ("tip_amount", FloatType()),
        ("toll_amount", FloatType()),
        ("toal_amount", FloatType()),
    ]
])

In [None]:
# Load the CSV located at `data` using the explicit schema defined above;
# no other reader options are set.
taxi = spark.read.load(data, format="csv", schema=schema)

### Data Clean-up Step

In [None]:
# Keep only plausible rides. Every bound is exclusive, so a ride sitting
# exactly on a threshold is dropped as well.
taxi = taxi.where(
    # trip duration strictly between 2 minutes and 1 hour (in seconds)
    (func.col("trip_time_in_secs") > 120) & (func.col("trip_time_in_secs") < 3600)
    # fare amount strictly between 3 and 200 dollars
    & (func.col("fare_amount") > 3) & (func.col("fare_amount") < 200)
    # trip distance strictly between 1 and 50 miles
    & (func.col("trip_distance") > 1) & (func.col("trip_distance") < 50)
    # toll amount strictly above 3 dollars
    & (func.col("toll_amount") > 3)
)

In [None]:
# Collect features and label with a single Spark job so the pipeline is not
# executed twice and every feature row stays paired with its own label.
# The first four columns are the features, the last one is the target.
rows = np.array(
    taxi.select(["trip_time_in_secs", "trip_distance", "fare_amount",
                 "toll_amount", "toal_amount"]).collect(),
    dtype=float,
).reshape(-1, 5)
X = rows[:, :4]
y = rows[:, 4:]  # kept two-dimensional, shape (m, 1)

# Standardize each feature column separately (zero mean, unit variance).
# Using one global mean/std over the whole matrix would leave the columns on
# very different scales, which defeats the purpose of scaling before
# gradient descent.
feature_std = X.std(axis=0)
feature_std[feature_std == 0] = 1.0  # constant column: avoid dividing by zero
X = (X - X.mean(axis=0)) / feature_std

In [None]:
class LinReg:
    """Multiple linear regression trained with batch gradient descent.

    Parameters
    ----------
    lr : float, default 0.0001
        Learning rate applied to every parameter update.
    epochs : int, default 100
        Number of gradient-descent iterations over the full training set.

    Attributes
    ----------
    weights : numpy.ndarray of shape (n, 1)
        Feature coefficients; ``None`` until ``fit`` is called.
    bias : float
        Intercept term; ``None`` until ``fit`` is called.
    """

    def __init__(self, lr=0.0001, epochs=100):
        self.lr = lr
        self.epochs = epochs
        self.weights = None
        self.bias = None

    def fit(self, X, y):
        """Train the model on ``X`` and ``y`` with batch gradient descent.

        Parameters
        ----------
        X : numpy.ndarray of shape (m, n)
            Training examples, one row per example and one column per feature.
        y : numpy.ndarray with m elements
            Targets; reshaped internally to ``(m, 1)``.

        Returns
        -------
        tuple
            ``(weights, bias, weights_history, bias_history, losses)`` where
            ``weights`` is the fitted ``self.weights`` array itself (not a
            copy), ``bias`` is the fitted bias, and the three histories each
            hold one entry per epoch, recorded before that epoch's update.
            ``losses`` contains the sum of squared errors.
        """
        # m: number of training examples, n: number of features.
        m, n = X.shape

        # Weights start at zero; the bias starts at 0.1.
        self.weights = np.zeros((n, 1))
        self.bias = 0.1

        # Force a column vector so (y_hat - y) does not broadcast to (m, m)
        # when y arrives with shape (m,).
        y = y.reshape(m, 1)

        # Per-epoch histories, returned so they can be inspected or plotted.
        losses = []
        weights = []
        bias = []

        for _ in range(self.epochs):
            # Store a copy: self.weights is updated in place below, so
            # appending the array itself would make every history entry
            # alias the same (final) array.
            weights.append(self.weights.copy())
            bias.append(self.bias)

            # Prediction y_hat = X.w + b and its error against the targets.
            y_hat = np.dot(X, self.weights) + self.bias
            residual = y_hat - y

            # Loss reported as the sum of squared errors.
            losses.append(np.sum(residual ** 2))

            # Gradients, averaged over the m training examples.
            dw = (1 / m) * np.dot(X.T, residual)
            db = (1 / m) * np.sum(residual)

            # parameter := parameter - lr * gradient
            self.bias -= self.lr * db
            self.weights -= self.lr * dw

        return self.weights, self.bias, weights, bias, losses

    def predict(self, X):
        """Return predictions ``X.w + b`` using the current parameters."""
        return np.dot(X, self.weights) + self.bias

In [None]:
# Build the regressor with 100 epochs and LinReg's default learning rate.
model = LinReg(epochs=100)

In [None]:
# Train on X and y; fit returns the final weights and bias followed by the
# per-epoch histories of weights, bias and loss.
final_w, final_b, list_w, list_b, list_lost = model.fit(X,y)

In [None]:
# Weight history returned by fit (one entry per epoch).
print(list_w)

[array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 1.09535286],
       [-0.36797617],
       [-0.35121908],
       [-0.37415654]]), array([[ 

In [None]:
# Bias history returned by fit (one entry per epoch).
print(list_b)

[0.1, 0.1064871943661248, 0.11297115201728392, 0.11945187457031509, 0.12592936364124838, 0.13240362084530663, 0.13887464779690578, 0.14534244610965535, 0.1518070173963588, 0.15826836326901395, 0.16472648533881337, 0.17118138521614482, 0.17763306451059158, 0.18408152483093293, 0.1905267677851445, 0.19696879498039865, 0.20340760802306498, 0.20984320851871063, 0.21627559807210067, 0.2227047782871986, 0.22913075076716663, 0.2355535171143662, 0.2419730789303583, 0.24838943781590386, 0.2548025953709642, 0.2612125531947015, 0.2676193128854789, 0.27402287604086134, 0.2804232442576156, 0.28682041913171086, 0.29321440225831896, 0.2996051952318151, 0.3059927996457779, 0.31237721709299, 0.3187584491654383, 0.32513649745431455, 0.33151136355001565, 0.337883049042144, 0.344251555519508, 0.3506168845701223, 0.35697903778120843, 0.3633380167391949, 0.36969382302971787, 0.37604645823762134, 0.3823959239469577, 0.388742221740988, 0.3950853532021824, 0.4014253199122207, 0.40776212345199236, 0.41409576540

In [None]:
# Loss history returned by fit (sum of squared errors, one entry per epoch).
print(list_lost)

[11208843.880479729, 11198173.701413134, 11187514.179842588, 11176865.305123251, 11166227.066620924, 11155599.45371202, 11144982.455783576, 11134376.06223322, 11123780.262469169, 11113195.045910228, 11102620.40198576, 11092056.320135694, 11081502.789810503, 11070959.800471198, 11060427.341589313, 11049905.402646903, 11039393.97313653, 11028893.04256124, 11018402.600434579, 11007922.636280552, 10997453.139633637, 10986994.100038765, 10976545.507051304, 10966107.350237057, 10955679.619172253, 10945262.303443521, 10934855.392647909, 10924458.876392841, 10914072.744296119, 10903696.98598593, 10893331.591100805, 10882976.549289636, 10872631.850211648, 10862297.483536394, 10851973.438943746, 10841659.706123885, 10831356.274777286, 10821063.13461472, 10810780.275357224, 10800507.686736107, 10790245.35849294, 10779993.280379526, 10769751.44215792, 10759519.833600394, 10749298.444489434, 10739087.264617734, 10728886.283788187, 10718695.491813866, 10708514.878518023, 10698344.433734067, 10688184