In [None]:
import numpy as np
import pandas as pd
from math import sqrt

In [None]:
class UnlimitedDataWorks:
    """
    Builds polynomial features from the "lat" and "lon" columns of a
    dataframe, min-max normalizes them, and cuts the rows into
    train / validation / test parts.
    """

    def __init__(self, deg):
        """
        deg :: maximum total degree of the lat/lon monomials

        self.exp holds every exponent pair (i, j) with i + j <= deg,
        i.e. one entry per feature lat**i * lon**j.
        """
        self.exp = [(i, j)
                    for i in range(deg + 1)
                    for j in range(deg + 1 - i)]

    def train_test_split(self, dataframe, train_end=304113, val_end=391088):
        """
        dataframe :: frame with numeric "lat", "lon" and "alt" columns
        train_end :: positional row index where the training part ends
        val_end   :: positional row index where the validation part ends

        The defaults are the boundaries the original code hard-coded for
        its dataset; pass other values for data of a different size.
        Rows are taken in the order given (no shuffling happens here).

        Side effects: stores the feature frame in self.data and the
        number of feature columns in self.count.

        Returns (X, Y, xval, yval, x, y): training, validation and test
        features together with the matching normalized "alt" targets.
        """
        def normalize(values):
            # min-max scaling; a constant column becomes NaN (0 / 0)
            return (values - values.min()) / (values.max() - values.min())

        self.data = pd.DataFrame([])
        for position, (a, b) in enumerate(self.exp):
            res = (dataframe["lat"] ** a) * (dataframe["lon"] ** b)
            self.data.insert(position, "col" + str(a) + str(b), res, True)
        self.count = len(self.exp)

        # only the target column is needed from the input frame
        target = normalize(dataframe["alt"])
        self.data = normalize(self.data)
        # the bias column is constant, so normalizing it gave NaN; reset to 1
        self.data["col00"] = 1.0

        # generate the train / validation / test split on the data:
        X = self.data.iloc[:train_end]
        Y = target.iloc[:train_end]
        xval = self.data.iloc[train_end:val_end]
        yval = target.iloc[train_end:val_end]
        x = self.data.iloc[val_end:]
        y = target.iloc[val_end:]
        return (X, Y, xval, yval, x, y)

In [None]:
class RegressionModel:
    """
    Linear regression (prediction = features @ weights) trained either
    by gradient descent variants or by solving the normal equations.
    Every trainer prints its progress and returns the final weights.
    """

    def __init__(self, N, X, Y, x, y, xval, yval):
        """
        N :: number of weights (should match the column count of X)
        X :: training data                  (e.g. 304113 x 3)
        x :: testing data                   (e.g. 43786 x 3)
        Y :: training target values         (1-D, one per row of X)
        y :: testing target values          (1-D, one per row of x)
        xval :: validation data             (e.g. 86975 x 3)
        yval :: validation target values    (1-D, one per row of xval)
        """
        self.N = N
        self.X = np.array(X)
        self.Y = np.array(Y)
        self.x = np.array(x)
        self.y = np.array(y)
        self.xval = np.array(xval)
        self.yval = np.array(yval)

    def score(self, weights):
        """
        evaluates the given weights on the testing data (self.x, self.y)
        and returns [R2 (R-squared) as a percentage, RMSE]
        """
        ss_tot = np.sum(np.square(np.mean(self.y) - self.y))
        ss_res = np.sum(np.square((self.x @ weights) - self.y))
        rmse = sqrt(ss_res / len(self.x))
        r2 = 1 - (ss_res / ss_tot)
        return [r2 * 100, rmse]

    def gradient_descent(self, lr=8e-7, tol=1e-4, max_epochs=None):
        """
        batch gradient descent; trains till the error is almost constant

        lr :: learning rate
        tol :: stop once the error changes by at most this much
        max_epochs :: optional cap on the number of epochs (None = no cap)

        returns the trained weights
        """
        prev_err, count = 1e10, 0
        W = np.random.randn(self.N)
        while True:
            diff = (self.X @ W) - self.Y
            err = 0.5 * (diff @ diff)
            grad = self.X.T @ diff
            if count % 500 == 0:
                print("epoch =", count, "| err_diff =", prev_err-err)
                print("error = ", err, "||", W)
                print("score =", self.score(W), end="\n\n")
            W -= lr * grad
            # a non-finite error means divergence; it would never satisfy
            # the tolerance check, so stop instead of looping forever
            if abs(prev_err - err) <= tol or not np.isfinite(err):
                break
            prev_err = err
            count += 1
            if max_epochs is not None and count >= max_epochs:
                break
        print(count, err)
        print(W, self.score(W), end="\n\n")
        return W

    def stocastic_gradient_descent(self, epochs, lr=0.05):
        """
        stochastic gradient descent: one training sample per update,
        for the given number of updates

        epochs :: number of single-sample updates to perform
        lr :: learning rate

        samples are visited in stored order and wrap around to the first
        one when epochs exceeds the number of training rows

        returns the trained weights
        """
        n_samples = len(self.X)
        if epochs > 0 and n_samples == 0:
            raise ValueError("cannot train on an empty training set")
        W = np.random.randn(self.N)
        for count in range(epochs):
            log = (count % 500 == 0)
            if log:
                # the full-batch error is only needed for the report, so it
                # is computed (before the update) on reporting steps only
                diff = (self.X @ W) - self.Y
                err = 0.5 * (diff @ diff)
            i = count % n_samples
            sample = self.X[i]
            W -= lr * (((sample @ W) - self.Y[i]) * sample)
            if log:
                print("epoch =", count)
                print("error =", err, "||", W)
                print("score =", self.score(W), end="\n\n")
        return W

    def gradient_descent_L1_reg(self, lr=5e-7, tol=0.005, max_epochs=None):
        """
        batch gradient descent with an L1 penalty; trains one model per
        candidate penalty strength and keeps the one with the lowest
        error on the validation data (self.xval, self.yval)

        lr :: learning rate
        tol :: stop once the error changes by at most this much
        max_epochs :: optional per-candidate cap on epochs (None = no cap)

        returns the weights of the selected model
        """
        W_fin = np.array([])
        l1_fin = 0
        MVLE = 1e10
        L1_vals = [0.0, 0.05, 0.15, 0.25, 0.35, 0.45,
                   0.55, 0.65, 0.75, 0.85, 0.95, 1.0]
        for l1 in L1_vals:
            prev_err, count = 1e10, 0
            W = np.random.randn(self.N)
            while True:
                diff = (self.X @ W) - self.Y
                err = 0.5 * ((diff @ diff) + l1 * np.sum(np.abs(W)))
                if count % 500 == 0:
                    print("L1 hyperparamter =", l1, end=", ")
                    print("epoch =", count, "| err_diff =", prev_err-err)
                    print("error = ", err, "||", W)
                    print("score =", self.score(W), end="\n\n")
                # np.sign is 0 at w == 0, where w / abs(w) would give NaN
                W -= lr * ((self.X.T @ diff) + 0.5 * l1 * np.sign(W))
                if abs(prev_err - err) <= tol or not np.isfinite(err):
                    break
                prev_err = err
                count += 1
                if max_epochs is not None and count >= max_epochs:
                    break
            VLD = (self.xval @ W) - self.yval
            # NOTE(review): the validation error includes the penalty term,
            # which favours small l1 values - confirm this is intended
            VLE = 0.5 * ((VLD @ VLD) + l1 * np.sum(np.abs(W)))
            if VLE < MVLE:
                W_fin = W
                l1_fin = l1
                MVLE = VLE
        print(MVLE, l1_fin, W_fin)
        return W_fin

    def gradient_descent_L2_reg(self, lr=5e-7, tol=0.005, max_epochs=None):
        """
        batch gradient descent with an L2 penalty; trains one model per
        candidate penalty strength and keeps the one with the lowest
        error on the validation data (self.xval, self.yval)

        lr :: learning rate
        tol :: stop once the error changes by at most this much
        max_epochs :: optional per-candidate cap on epochs (None = no cap)

        returns the weights of the selected model
        """
        W_fin = np.array([])
        l2_fin = 0
        MVLE = 1e10
        L2_vals = [0.0, 0.05, 0.15, 0.25, 0.35, 0.45,
                   0.55, 0.65, 0.75, 0.85, 0.95, 1.0]
        for l2 in L2_vals:
            prev_err, count = 1e10, 0
            W = np.random.randn(self.N)
            while True:
                diff = (self.X @ W) - self.Y
                err = 0.5 * ((diff @ diff) + l2 * (W @ W))
                if count % 500 == 0:
                    print("L2 hyperparamter =", l2, end=", ")
                    print("epoch =", count, "| err_diff =", prev_err-err)
                    print("error = ", err, "||", W)
                    print("score =", self.score(W), end="\n\n")
                W -= lr * ((self.X.T @ diff) + l2 * W)
                if abs(prev_err - err) <= tol or not np.isfinite(err):
                    break
                prev_err = err
                count += 1
                if max_epochs is not None and count >= max_epochs:
                    break
            VLD = (self.xval @ W) - self.yval
            # NOTE(review): the validation error includes the penalty term,
            # which favours small l2 values - confirm this is intended
            VLE = 0.5 * ((VLD @ VLD) + l2 * (W @ W))
            if VLE < MVLE:
                W_fin = W
                l2_fin = l2
                MVLE = VLE
        print(MVLE, l2_fin, W_fin)
        return W_fin

    def fit(self):
        """
        solves for optimal weights using the system of
        N linear equations AW = B, with A = X'X and B = X'Y

        the system is solved directly rather than through an explicit
        inverse of A; raises numpy.linalg.LinAlgError if A is singular

        returns the weights
        """
        B = self.X.T @ self.Y
        A = self.X.T @ self.X
        W = np.linalg.solve(A, B)
        print(W, self.score(W))
        return W

In [None]:
# Load the samples, discard the first ("junk") column and put the rows in
# random order (sample(frac=1)) before they are split positionally below.
# The axis is named via `columns=` because pandas 2.x no longer accepts it
# as a positional argument of drop().
columns = ["junk", "lat", "lon", "alt"]
raw_df = pd.read_csv("3D_spatial_network.txt", sep=',', header=None,
                     names=columns).drop(columns="junk").sample(frac=1)

# deg=1 gives the features 1, lon and lat
pre_processor = UnlimitedDataWorks(deg=1)
X_train, Y_train, x_val, y_val, x_test, y_test = pre_processor.train_test_split(raw_df)

model = RegressionModel(N=pre_processor.count,
                        X=X_train,
                        Y=Y_train,
                        x=x_test,
                        y=y_test,
                        xval=x_val,
                        yval=y_val)

# choose which trainers to run
model.fit()
# model.gradient_descent()
model.stocastic_gradient_descent(50250)
# model.gradient_descent_L1_reg()
# model.gradient_descent_L2_reg()