In [None]:
import requests
import datetime
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import math
import random
import time
import cvxpy as cp
import sys

In [None]:
def single_price(symbol, comparison_symbol):
    """
        Gets the single price of decentralized value and converts to any centralized value
        symbol: Blockchain Type, sent to the API as "fsym" in upper case
        comparison_symbol: Centralized Coin, sent to the API as "tsyms" in upper case

        Prints the decoded JSON answer and also returns it, so that the caller can use the price.
    """
    api_key = "insert-your-api-key"
    url = "https://min-api.cryptocompare.com/data/price"
    payload = {"fsym": symbol.upper(), "tsyms": comparison_symbol.upper()}
    headers = {"authorization": "Apikey " + api_key}
    result = requests.get(url, headers = headers, params = payload).json()
    print(result)
    # The original only printed the answer; returning it is backward-compatible
    return result

In [None]:
def hour_prices(symbol, comparison_symbol, limit, aggregate = 1):
    """
        Downloads the hourly price history of a coin expressed in another currency
        symbol: Blockchain Type, sent as "fsym" in upper case
        comparison_symbol: Centralized Coin, sent as "tsym" in upper case
        limit: Number of Data, forwarded as "limit"
        aggregate: Grouping the Same Data, forwarded as "aggregate"

        Returns a DataFrame built from the 'Data' entry of the JSON answer.
        When a 'time' column is present, a 'timestamp' column of datetime objects is added.
    """
    endpoint = "https://min-api.cryptocompare.com/data/histohour"
    token = "insert-your-api-key"
    query = dict(fsym = symbol.upper(), tsym = comparison_symbol.upper(), limit = limit, aggregate = aggregate)
    response = requests.get(endpoint, headers = {"authorization": "Apikey " + token}, params = query)
    frame = pd.DataFrame(response.json()['Data'])
    if 'time' in frame:
        # Epoch seconds are converted with fromtimestamp, i.e. into local time
        frame['timestamp'] = [datetime.datetime.fromtimestamp(stamp) for stamp in frame.time]
    return frame

In [None]:
def minute_prices(symbol, comparison_symbol, limit, aggregate = 1):
    """
        Downloads the minute-by-minute price history of a coin expressed in another currency
        symbol: Blockchain Type, sent as "fsym" in upper case
        comparison_symbol: Centralized Coin, sent as "tsym" in upper case
        limit: Number of Data, forwarded as "limit"
        aggregate: Grouping the Same Data, forwarded as "aggregate"

        Returns a DataFrame built from the 'Data' entry of the JSON answer.
        When a 'time' column is present, a 'timestamp' column of datetime objects is added.
    """
    endpoint = "https://min-api.cryptocompare.com/data/histominute"
    token = "insert-your-api-key"
    query = dict(fsym = symbol.upper(), tsym = comparison_symbol.upper(), limit = limit, aggregate = aggregate)
    response = requests.get(endpoint, headers = {"authorization": "Apikey " + token}, params = query)
    frame = pd.DataFrame(response.json()['Data'])
    if 'time' in frame:
        # Epoch seconds are converted with fromtimestamp, i.e. into local time
        frame['timestamp'] = [datetime.datetime.fromtimestamp(stamp) for stamp in frame.time]
    return frame

In [None]:
def draw_graph(df_coin, time_coin, symbol, comparison_symbol):
    """
        Plot the closing price of a coin against its timestamps and show the figure
        df_coin: Frame providing the 'timestamp' and 'close' columns
        time_coin: Name of the sampling interval, used in the title and the x label
        symbol, comparison_symbol: Names of the pair, shown in upper case on the y label
    """
    plt.plot(df_coin.timestamp, df_coin.close)
    heading = 'OHLCV: ' + time_coin
    plt.title(heading, fontsize = 24)
    pair_name = symbol.upper() + ' to ' + comparison_symbol.upper()
    plt.ylabel('Price: ' + pair_name, fontsize = 14)
    axis_name = 'Timestamp as '+ time_coin
    plt.xlabel(axis_name, fontsize = 14)
    # Slanted tick labels keep the timestamps readable
    plt.xticks(rotation = 45)
    plt.show()

In [None]:
def draw_hyperparameter(k_hyperparameter, error_rate, classification = False):
    """
        Draw the graph of the distribution of hyperparameters and error rates
        k_hyperparameter: Values drawn on the x axis
        error_rate: Values drawn on the y axis (accuracies when classification is True)
        classification: When True the title and the y label describe an accuracy instead of an error
    """
    plt.plot(k_hyperparameter, error_rate)
    if classification:
        plt.title('Hyperparameters and Maximum Accuracy', fontsize = 24)
        # The y label used to stay 'Error Rate' even though an accuracy is plotted
        plt.ylabel('Accuracy')
    else:
        plt.title('Hyperparameters and Error Rates', fontsize = 24)
        plt.ylabel('Error Rate')
    plt.xlabel('Hyperparameter')
    plt.xticks(rotation = 45)
    plt.show()

In [None]:
def hoeffding(error_rate, confidence_rate):
    """
        To calculate the minimum number of data needed
        error_rate: Tolerated deviation (epsilon) between the sample mean and the true mean
        confidence_rate: Required probability (1 - delta) that the deviation stays within error_rate

        Hoeffding's inequality gives P(|deviation| >= epsilon) <= 2 * exp(-2 * n * epsilon^2),
        therefore n >= ln(2 / delta) / (2 * epsilon^2). The smallest such integer is returned.
    """
    # The bound needs the natural logarithm; the base 2 logarithm overestimates the sample size
    result = math.log(2 / (1 - confidence_rate)) / (2 * error_rate ** 2)
    # Round up instead of int(result + 1), which adds a sample when result is already an integer
    return int(math.ceil(result))

In [None]:
class LinearRegression:
    """
        This class is created to implement the linear regression from scratch.
        It offers the ordinary least squares, the ridge regression and the lasso.

        train_x, valid_x: 2-D arrays of predictors (one row per sample, one column per feature)
        train_y, valid_y: 1-D arrays of responses
        alpha: Tuning parameter, 0 means that no regularization is used
        regularization_type: "ridge_regression", "lasso" or "" for the vanilla regression
    """
    def __init__(self, train_x, train_y, valid_x, valid_y, alpha = 0, regularization_type = ""):
        self.train_x = train_x
        self.train_y = train_y
        self.valid_x = valid_x
        self.valid_y = valid_y
        self.alpha = alpha # Tuning parameter
        self.regularization_type = regularization_type
    @staticmethod
    def _column_statistics(data_x):
        """
            Return the mean and the standard deviation of every predictor column.
            A standard deviation of zero is replaced by one so that constant columns do not divide by zero.
        """
        data_x = np.asarray(data_x, dtype = float)
        mean_x = np.mean(data_x, axis = 0)
        std_x = np.std(data_x, axis = 0)
        std_x = np.where(std_x == 0, 1.0, std_x)
        return mean_x, std_x
    def least_squares(self, train_x, train_y):
        """
            Use Sum of Least Squares without any Regularization Technique
            Find the beta_ls using the vectorized version of predictor and the response variable
            and return the parameter vector.
            A zero vector of shape (1, number of features) is returned when the predictors are
            rank deficient, because the normal equations have no unique solution in that case.
        """
        train_x = np.asarray(train_x, dtype = float)
        train_y = np.asarray(train_y, dtype = float)
        feature_count = train_x.shape[1]
        # The rank test is more reliable than comparing a floating point determinant with zero
        if np.linalg.matrix_rank(train_x) < feature_count:
            return np.zeros((1, feature_count))
        x_transpose = train_x.transpose()
        # Solve (X'X) beta = X'y instead of forming the inverse explicitly
        return np.linalg.solve(np.matmul(x_transpose, train_x), np.matmul(x_transpose, train_y))
    def ridge_regression(self, train_x, train_y, alpha):
        """
            Standardize the predictors (column by column) and center the response
            Find beta_ridge = (X'X + alpha * I)^-1 X'y and return it
        """
        train_x = np.asarray(train_x, dtype = float)
        train_y = np.asarray(train_y, dtype = float)
        mean_x, std_x = self._column_statistics(train_x)
        x_standard = (train_x - mean_x) / std_x
        y_centered = train_y - np.mean(train_y)
        x_standard_transpose = x_standard.transpose()
        gram = np.matmul(x_standard_transpose, x_standard)
        # The penalty belongs on the diagonal only; adding the scalar alpha changes every entry
        penalty = alpha * np.identity(gram.shape[0])
        return np.linalg.solve(gram + penalty, np.matmul(x_standard_transpose, y_centered))
    def lasso(self, train_x, train_y, alpha):
        """
            Standardize the predictors (column by column) and center the response
            Find beta_lasso using quadratic programming package and return it.
            The returned array has the shape (number of features, 1).
        """
        train_x = np.asarray(train_x, dtype = float)
        train_y = np.asarray(train_y, dtype = float)
        mean_x, std_x = self._column_statistics(train_x)
        x_standard = (train_x - mean_x) / std_x
        # reshape(-1, 1) accepts any number of samples, the size was hard-coded to 1800 before
        y_centered = (train_y - np.mean(train_y)).reshape(-1, 1)
        gamma = cp.Parameter(nonneg=True)
        # Construct the problem.
        beta_lasso = cp.Variable((train_x.shape[1], 1))
        xbeta = x_standard @ beta_lasso # "@" is the matrix product, "*" is deprecated for it in cvxpy
        error = cp.sum_squares(xbeta - y_centered)
        obj = cp.Minimize(error + gamma * cp.norm(beta_lasso, 1))
        prob = cp.Problem(obj)
        gamma.value = alpha
        prob.solve()
        return beta_lasso.value
    def fit_ls(self, train_x, train_y):
        """
            Fits to the training set and returns the root mean squared error on the given data.
            The Ordinary Least Squares Parameter Vector is used when self.alpha is 0,
            otherwise the regularization configured on the instance is used.
        """
        if self.alpha != 0:
            prediction = self.predict(train_x, train_y, self.alpha, self.regularization_type)
        else:
            prediction = self.predict(train_x, train_y, 0, "")
        # predict already returns the RMSE, it must not be divided and rooted a second time
        return prediction[1]
    def cross_validation(self, valid_x, valid_y, alpha = 0, regularization_type = ""):
        """
            Evaluate one hyperparameter on one validation fold.
            The model is fitted on the training data of the instance with the given tuning parameter
            and the root mean squared error on (valid_x, valid_y) is returned.
        """
        if alpha == 0: # Use vanilla regression
            prediction = self.predict(valid_x, valid_y, 0, "")
        elif regularization_type == "ridge_regression":
            prediction = self.predict(valid_x, valid_y, alpha, "ridge_regression")
        else:
            prediction = self.predict(valid_x, valid_y, alpha, "lasso")
        return prediction[1]
    def predict(self, test_x, test_y, alpha, regularization_type = ""):
        """
            Fit the parameter vector on the training data of the instance and estimate the response of test_x.
            alpha == 0 uses the vanilla regression on the raw predictors,
            otherwise the ridge regression ("ridge_regression") or the lasso (any other value) is used
            on predictors standardized with the statistics of the training data.
            Return the estimated responses and the root mean squared error against test_y.
            Raises an Exception when the vanilla regression has no unique solution.
        """
        test_x = np.asarray(test_x, dtype = float)
        if alpha == 0: # Use vanilla regression
            beta = self.least_squares(self.train_x, self.train_y)
            if np.count_nonzero(beta): # Check if it has lots of infinite solutions
                y_predicted = np.matmul(test_x, beta)
            else:
                raise Exception("The parameter beta has infinite solutions !")
        else:
            # Standardize with the training statistics and recover the response with the training mean
            mean_x, std_x = self._column_statistics(self.train_x)
            mean_y = np.mean(self.train_y)
            test_x_standard = (test_x - mean_x) / std_x
            if regularization_type == "ridge_regression":
                beta = self.ridge_regression(self.train_x, self.train_y, alpha)
            else:
                beta = self.lasso(self.train_x, self.train_y, alpha)
            # The lasso returns a column vector; flatten it so the result stays one value per sample
            beta = np.asarray(beta, dtype = float).reshape(-1)
            y_predicted = np.matmul(test_x_standard, beta) + mean_y # Find the estimator of y, recover the data
        # Flatten both sides so that (n, 1) against (n,) never broadcasts into an (n, n) matrix
        residual = np.ravel(y_predicted) - np.ravel(np.asarray(test_y, dtype = float))
        # Divide by the number of predicted samples, not by a global validation set
        prediction = math.sqrt(np.sum(residual ** 2) / test_x.shape[0])
        return y_predicted, prediction

In [None]:
class kNN:
    """
        This class is created to implement the slightly different version of kNN because of the higher dependency of datapoints.
        Column 2 of the predictor arrays is the value whose ups and downs are counted.

        train_x, valid_x: 2-D arrays of predictors with at least three columns
        train_y, valid_y: 1-D arrays of responses
        k_hyperparameter: Number of neighbours
        weighted: Falsy for equal votes, truthy for votes weighted by row index * log2(k)
        distance_measure: 0 or 1 for the most recent rows, any other value for rows with the same value
    """
    def __init__(self, train_x, valid_x, train_y, valid_y, k_hyperparameter = 5, weighted = 0, distance_measure = 0):
        self.train_x = train_x
        self.k_hyperparameter = k_hyperparameter
        self.valid_x = valid_x
        self.weighted = weighted
        self.distance_measure = distance_measure
        self.train_y = train_y
        self.valid_y = valid_y
    def _count_votes(self, rows, k_hyperparameter, weighted):
        """
            Add up the up and down votes of the given training rows.
            A row votes up when the value of the following row is higher.
        """
        prob_ups = 0
        prob_downs = 0
        for row in rows:
            # Later rows get a larger weight; the weight used the undefined name k before
            vote = row * math.log(k_hyperparameter, 2) if weighted else 1
            if self.train_x[row + 1, 2] - self.train_x[row, 2] > 0:
                prob_ups += vote
            else:
                prob_downs += vote
        return prob_ups, prob_downs
    def predict_kNN(self, train_x, k_hyperparameter, weighted = 0, distance_measure = 0):
        """
            We have three distance measure and two weight techniques to calculate the optimal result in test data.
            Details are given in the report.
            The votes are always taken from self.train_x; the train_x argument is kept for compatibility only.
            Return [probability of an up move, probability of a down move].
            [0.5, 0.5] is returned when no vote could be cast.
        """
        sample_count = self.train_x.shape[0]
        if distance_measure == 0 or distance_measure == 1:
            # Both measures currently vote with the k most recent adjacent pairs.
            # The start is clamped so that a large k never wraps around through negative indices.
            first_row = max(sample_count - k_hyperparameter - 2, 0)
            rows = range(first_row, sample_count - 2)
        else:
            # The model assumes that all points with the same values are dependent of each other.
            # This comes from the famous phrase "All that has happened before will happen again."
            reference = self.train_x[sample_count - 1, 2]
            rows = []
            # Start below the last row: it always equals itself and has no following row to compare with
            for row in range(sample_count - 2, -1, -1):
                if self.train_x[row, 2] == reference:
                    rows.append(row)
                if len(rows) >= k_hyperparameter:
                    break
        prob_ups, prob_downs = self._count_votes(rows, k_hyperparameter, weighted)
        total = prob_ups + prob_downs
        if total == 0:
            return [0.5, 0.5]
        distribution = [prob_ups / total, prob_downs / total]
        return distribution
    def cross_validation(self):
        """
            Find the validation error using cross validation.
            Classify the results and create a confusion matrix, indexed as [predicted, actual] with 1 = up.
            Calculate precision, recall and accuracy (in percent) using the confusion matrix.
            A metric whose denominator is zero is reported as 0.0.
        """
        predicted_actual = np.zeros((2, 2), dtype = int)
        # The distribution does not depend on the validation row, so it is computed once
        pred = self.predict_kNN(self.valid_x, self.k_hyperparameter, self.weighted, self.distance_measure)
        # Class 1 means up, so it has to be drawn with the probability of an up move
        class_probability = [pred[1], pred[0]]
        for i in range(self.valid_x.shape[0] - 2, -1, -1):
            rando = np.random.choice(np.arange(0, 2), p=class_probability)
            # Only a strictly positive change is an actual up move
            actual = 1 if self.valid_x[i + 1, 2] - self.valid_x[i, 2] > 0 else 0
            predicted_actual[1 if rando else 0, actual] += 1
        def percentage(numerator, denominator):
            return numerator / denominator * 100 if denominator else 0.0
        precision = percentage(predicted_actual[1, 1], predicted_actual[1, 1] + predicted_actual[1, 0])
        recall = percentage(predicted_actual[1, 1], predicted_actual[1, 1] + predicted_actual[0, 1])
        accuracy = percentage(predicted_actual[1, 1] + predicted_actual[0, 0], np.sum(predicted_actual))
        parameters = [precision, recall, accuracy]
        return predicted_actual, parameters
    def predict_test(self, test_x, k_hyperparameter):
        """
            Predict the unseen data.
            Find the training row whose value is closest to the first row of test_x and
            let the rows around it vote. Return 1 when ups win the vote and 0 otherwise.
        """
        sample_count = self.train_x.shape[0]
        # Take the row index of the closest value; a position in a sorted list is not a row index
        index = int(np.argmin(np.abs(self.train_x[:, 2] - test_x[0, 2])))
        half = k_hyperparameter // 2
        if index + half > sample_count:
            low, high = sample_count - k_hyperparameter, sample_count
        elif index - half < 0:
            low, high = 0, k_hyperparameter
        else:
            low, high = index - half, index + half
        # Keep the window inside the training data even when k exceeds its size
        low, high = max(low, 0), min(high, sample_count)
        ups = 0
        downs = 0
        for i in range(low, high):
            if self.train_y[i] - self.train_x[i, 2] > 0:
                ups += 1
            else:
                downs += 1
        if ups > downs:
            predict = 1
        else:
            predict = 0
        return predict

In [None]:
class LogisticRegression:
    """
        This class is created to implement the logistic regression from scratch.
        The label of a sample is 1 when the response is higher than column 3 of its predictors, else 0.
        Column 0 of the predictors is expected to be the bias column; it is never penalized.

        data_x: Predictors as a 2-D array or DataFrame (one row per sample)
        data_y: Responses as a 1-D array or Series
        alpha: Tuning parameter of the regularization
        regularization_type: "LS" (no regularization), "RR" (L2) or "LR" (L1)
    """
    max_iterations = 10000 # Upper bound of gradient descent steps, can be overridden on an instance
    def __init__(self, data_x, data_y, alpha = 0, regularization_type = "LS"):
        self.data_x = data_x
        self.data_y = data_y
        self.alpha = alpha # The argument used to be ignored
        self.learning_rate = 0.1 # A learning rate of 0 never moves the parameter vector
        self.theta = None
        self.mean_x = None
        self.std_x = None
        self.train_x = None
        self.train_y = None
        self.valid_x = None
        self.valid_y = None
        self.regularization_type = regularization_type
    @staticmethod
    def _feature_scaling(train_x):
        """
            Return the mean and the standard deviation of every column.
            Constant columns (like the bias) get mean 0 and deviation 1 so that they stay unchanged.
        """
        mean_x = np.mean(train_x, axis = 0)
        std_x = np.std(train_x, axis = 0)
        constant = std_x == 0
        return np.where(constant, 0.0, mean_x), np.where(constant, 1.0, std_x)
    def _cross_entropy(self, standard_x, labels, theta):
        """
            Return the mean cross entropy of the labels for already standardized predictors.
        """
        labels = np.asarray(labels, dtype = float).reshape(-1, 1)
        # Clip the probabilities so that the logarithm never receives 0
        probability = np.clip(self.cost_function_derivative(standard_x, theta), 1e-15, 1 - 1e-15)
        return float(np.mean(-1 * labels * np.log(probability) - (1 - labels) * np.log(1 - probability)))
    def create_label(self, train_x, train_y):
        """
            Creates the binary classification label of the training and the validation set.
            Return the label set (1.0 where train_y is higher than column 3 of train_x, else 0.0).
        """
        class_y = np.asarray(train_y, dtype = float) - np.asarray(train_x, dtype = float)[:, 3]
        return np.where(class_y > 0, 1.0, 0.0)
    def cost_function_derivative(self, train_x, theta):
        """
            The implementation of sigmoid function on the software.
            Return the matrix of sigmoid values with one row per sample.
        """
        train_x = np.atleast_2d(np.asarray(train_x, dtype = float))
        theta = np.atleast_2d(np.asarray(theta, dtype = float))
        # Clipping the scores avoids the overflow of exp for extreme values
        scores = np.clip(np.matmul(train_x, theta.transpose()), -500, 500)
        return 1 / (1 + np.exp(-1 * scores))
    def gradient_descent(self, train_x, train_y, alpha = 0, learning_rate = 0.1, max_iterations = None):
        """
            Find the parameter vector using the gradient descent.
            train_y holds the raw responses; the labels are derived with create_label.
            "LS" and "RR" shrink every weight except the bias by learning_rate * alpha / n per step,
            "LR" applies a soft threshold of the same size instead.
            The descent stops when two successive parameter vectors are close or after max_iterations steps
            (self.max_iterations when it is None).
            Return the parameter vector of shape (1, number of features) and the mean cross entropy.
        """
        train_x = np.asarray(train_x, dtype = float)
        train_y = np.asarray(train_y, dtype = float)
        if max_iterations is None:
            max_iterations = self.max_iterations
        sample_count, feature_count = train_x.shape
        labels = self.create_label(train_x, train_y).reshape(-1, 1)
        # The statistics are stored because predict has to scale unseen data in the same way
        self.mean_x, self.std_x = self._feature_scaling(train_x)
        standard_x = (train_x - self.mean_x) / self.std_x
        theta = np.random.rand(1, feature_count)
        theta = theta - np.mean(theta)
        shrink = learning_rate * alpha / sample_count
        for _ in range(max_iterations):
            residual = self.cost_function_derivative(standard_x, theta) - labels
            # One gradient entry per feature, shape (1, number of features)
            gradient = np.matmul(residual.transpose(), standard_x) / sample_count
            # A new array is built on every step, the old code compared an array with itself
            updated = theta - learning_rate * gradient
            if self.regularization_type == "LR":
                weights = updated[0, 1:]
                updated[0, 1:] = np.sign(weights) * np.maximum(np.abs(weights) - shrink, 0)
            else:
                updated[0, 1:] -= shrink * theta[0, 1:]
            converged = np.allclose(theta, updated, atol=1e-04)
            theta = updated
            if converged:
                break
        cost = self._cross_entropy(standard_x, labels, theta)
        return theta, cost
    def cross_validation(self, alphas = None, folds = 10):
        """
            Find the tuning hyperparameter using 10-fold cross validation.
            alphas: Candidates to try, 0.1, 0.2, ..., 99.9 when it is None
            folds: Number of contiguous, non-overlapping folds
            Return the minimum mean validation error and the tuning hyperparameter that reached it.
            Nothing is done and None is returned when no regularization is used ("LS").
        """
        if self.regularization_type == "LS":
            return None
        data_x = np.asarray(self.data_x, dtype = float)
        data_y = np.asarray(self.data_y, dtype = float)
        if alphas is None:
            alphas = [step / 10 for step in range(1, 1000)]
        sample_count = data_x.shape[0]
        borders = [sample_count * fold // folds for fold in range(folds + 1)]
        min_error = float("inf")
        min_alpha = 0
        graph_alpha = []
        graph_error = []
        for alpha in alphas:
            error = 0
            evaluated = 0
            for fold in range(folds):
                start, stop = borders[fold], borders[fold + 1]
                if start == stop: # Fewer samples than folds
                    continue
                self.valid_x = data_x[start:stop]
                self.valid_y = data_y[start:stop]
                self.train_x = np.concatenate((data_x[:start], data_x[stop:]))
                self.train_y = np.concatenate((data_y[:start], data_y[stop:]))
                theta = self.gradient_descent(self.train_x, self.train_y, alpha, 0.3)[0]
                # Validate on the validation fold, scaled with the statistics of its training fold
                valid_x_standard = (self.valid_x - self.mean_x) / self.std_x
                valid_label = self.create_label(self.valid_x, self.valid_y)
                error += self._cross_entropy(valid_x_standard, valid_label, theta)
                evaluated += 1
            if evaluated == 0:
                continue
            graph_alpha.append(alpha)
            graph_error.append(error / evaluated)
            if error / evaluated < min_error:
                min_error = error / evaluated
                min_alpha = alpha
        print(min_error, min_alpha)
        draw_hyperparameter(graph_alpha, graph_error)
        # Store the value that was actually validated and train on the complete data afterwards
        self.alpha = min_alpha
        self.train_x = data_x
        self.train_y = data_y
        return min_error, min_alpha
    def find_min_lr(self):
        """
            Find the fastest learning rate for the software performance.
            The candidates are 0.05, 0.10, ..., 1.00.
            Return the running time and the learning rate.
        """
        train_x = self.train_x if self.train_x is not None else self.data_x
        train_y = self.train_y if self.train_y is not None else self.data_y
        learning_rate = 0.05
        min_time = 999999999
        min_rate = learning_rate
        for i in range(1, 21):
            first_timer = time.perf_counter()
            # The learning rate is the fourth argument of gradient_descent
            self.gradient_descent(train_x, train_y, self.alpha, learning_rate * i)
            last_timer = time.perf_counter()
            if last_timer - first_timer < min_time:
                min_time = last_timer - first_timer
                min_rate = learning_rate * i
        self.learning_rate = min_rate
        return min_time, min_rate
    def fit(self):
        """
            Fit the model into the training set.
            The tuning hyperparameter is chosen with cross_validation unless the type is "LS".
            Return the parameter vector and the error.

        """
        if(self.regularization_type != "LS"):
            self.cross_validation()
            alpha = self.alpha
        else:
            alpha = 0
        self.train_x = np.asarray(self.data_x, dtype = float)
        self.train_y = np.asarray(self.data_y, dtype = float)
        result = self.gradient_descent(self.train_x, self.train_y, alpha, self.learning_rate)
        self.theta, error = result[0], result[1]
        return self.theta, error
    def predict(self, test_x, test_y):
        """
            Predict the price increase-decrease of unseen data using the parameter vector of the training data
            test_x: One sample (1-D) or several samples (2-D) with the same columns as the training data
            test_y: Actual label(s), 1 for ups and 0 for downs
            Return the prediction as ups = 1 and downs = 0, the cross entropy error and the odds
            as [(probability of up, probability of down)].
            Raises ValueError when the model has not been fitted yet.
        """
        if self.theta is None:
            raise ValueError("The model must be fitted before calling predict !")
        test_x = np.atleast_2d(np.asarray(test_x, dtype = 'float64'))
        self.theta = np.asarray(self.theta, dtype = 'float64')
        # Unseen data is scaled with the training statistics, never with its own
        test_x_standard = (test_x - self.mean_x) / self.std_x
        probability_up = self.cost_function_derivative(test_x_standard, self.theta).reshape(-1)
        y_predicted = (probability_up >= 0.5).astype(int)
        clipped = np.clip(probability_up, 1e-15, 1 - 1e-15)
        # The error is measured on the probability, not on the hard 0 / 1 decision
        error = -1 * test_y * np.log(clipped) - (1 - test_y) * np.log(1 - clipped)
        if probability_up.shape[0] == 1:
            # A single sample is reported with plain numbers
            probability_up = float(probability_up[0])
            y_predicted = int(y_predicted[0])
            error = float(np.ravel(error)[0])
        odds = [(probability_up, 1 - probability_up)]
        return y_predicted, error, odds

In [None]:
# Download the minute-level BTC/USD history with minute_prices (limit = 1999).
# NOTE(review): the later cells assume that this yields 2000 rows labelled 0-1999 - confirm against the API answer.
data_btc = minute_prices("BTC", "USD", 1999)

In [None]:
# Print the downloaded frame for a quick visual check.
print(data_btc)

In [None]:
# Plot the closing prices of the downloaded frame against their timestamps.
draw_graph(data_btc, "Minute", "BTC", "USD")

In [None]:
"""
    Split the dataset into a training part and a validation part.
    Rows labelled up to 1799 form the training part, rows labelled from 1800 form the validation part.
    The predictors are the columns 'high' through 'volumeto', the response is the column 'close'.
"""
# .loc slices by label and includes both ends, so :1799 covers the labels 0 to 1799
train_x = data_btc.loc[:1799, 'high':'volumeto']
train_y = data_btc.loc[:1799, 'close']
valid_x = data_btc.loc[1800:, 'high':'volumeto']
print(valid_x)
valid_y = data_btc.loc[1800:, 'close']

In [None]:
"""
    Use the ordinary least squares method.
    The model is fitted on every downloaded row and then applied to a freshly downloaded sample.
"""
error = 0
# The training data is redefined here as the complete frame (labels 0 to shape[0] - 1)
train_x = data_btc.loc[:data_btc.shape[0] - 1, 'high':'volumeto']
train_y = data_btc.loc[:data_btc.shape[0] - 1, 'close']
# valid_x and valid_y still come from the split cell above
ols = LinearRegression(train_x.to_numpy(), train_y.to_numpy(), valid_x.to_numpy(), valid_y.to_numpy(), 0, "")
beta_ols = ols.least_squares(train_x.to_numpy(), train_y.to_numpy())
error = ols.fit_ls(train_x.to_numpy(), train_y.to_numpy())
print(beta_ols, error)
# Download the most recent minutes as unseen data
# NOTE(review): presumably limit = 1 returns two rows - confirm against the API answer
data_btc_test = minute_prices("BTC", "USD", 1)
test_x = data_btc_test.loc[:, 'high':'volumeto']
test_y = data_btc_test.loc[:, 'close']
# alpha = 0 selects the vanilla regression inside predict
test_ols = ols.predict(test_x.to_numpy(), test_y.to_numpy(), 0)
print(test_ols)

In [None]:
"""
    Use the ridge regression method.
    Every tuning parameter from 1 to 499 is evaluated with 10-fold cross validation,
    the one with the minimum mean error is used to predict a freshly downloaded sample.
"""
min_error = 99999999
min_k = 0
ridge = None
graph_k = []
graph_error = []
features = data_btc.loc[:, 'high':'volumeto']
closes = data_btc.loc[:, 'close']
fold_count = 10
# The fold size follows the downloaded data instead of the hard-coded 200 of 2000 rows
fold_size = data_btc.shape[0] // fold_count
for k in range(1, 500):
    error = 0
    for i in range(fold_count):
        # Positional slices exclude the end, so no row is in two folds
        # (the label slice 200 * i: 200 * (i + 1) put its last row into the training data as well)
        valid_x = features.iloc[fold_size * i: fold_size * (i + 1)]
        valid_y = closes.iloc[fold_size * i: fold_size * (i + 1)]
        train_x = features.drop(valid_x.index)
        train_y = closes.drop(valid_y.index)
        ridge = LinearRegression(train_x.to_numpy(), train_y.to_numpy(), valid_x.to_numpy(), valid_y.to_numpy(), k, "ridge_regression")
        beta_ridge = ridge.ridge_regression(train_x.to_numpy(), train_y.to_numpy(), k)
        cv_ridge = ridge.cross_validation(valid_x.to_numpy(), valid_y.to_numpy(), k, "ridge_regression")
        error += cv_ridge
    graph_k.append(k)
    graph_error.append(error / fold_count)
    if error / fold_count < min_error:
        min_error = error / fold_count
        min_k = k
print(min_error, min_k)
draw_hyperparameter(graph_k, graph_error)
# Refit on every row with the chosen parameter; the model of the last fold missed a tenth of the data
ridge = LinearRegression(features.to_numpy(), closes.to_numpy(), valid_x.to_numpy(), valid_y.to_numpy(), min_k, "ridge_regression")
data_btc_test = minute_prices("BTC", "USD", 1)
test_x = data_btc_test.loc[:, 'high':'volumeto']
test_y = data_btc_test.loc[:, 'close']
test_ridge = ridge.predict(test_x.to_numpy(), test_y.to_numpy(), min_k, "ridge_regression")
print(test_ridge)

In [None]:
"""
    Use the lasso method.
    Every tuning parameter from 1 to 499 is evaluated with 10-fold cross validation,
    the one with the minimum mean error is used to predict a freshly downloaded sample.
"""
min_error = 99999999
min_k = 0
lasso = None
graph_k = []
graph_error = []
features = data_btc.loc[:, 'high':'volumeto']
closes = data_btc.loc[:, 'close']
fold_count = 10
# The fold size follows the downloaded data instead of the hard-coded 200 of 2000 rows
fold_size = data_btc.shape[0] // fold_count
for k in range(1, 500):
    error = 0
    for i in range(fold_count):
        # Positional slices exclude the end, so no row is in two folds
        # (the label slice 200 * i: 200 * (i + 1) put its last row into the training data as well)
        valid_x = features.iloc[fold_size * i: fold_size * (i + 1)]
        valid_y = closes.iloc[fold_size * i: fold_size * (i + 1)]
        train_x = features.drop(valid_x.index)
        train_y = closes.drop(valid_y.index)
        lasso = LinearRegression(train_x.to_numpy(), train_y.to_numpy(), valid_x.to_numpy(), valid_y.to_numpy(), k, "lasso")
        beta_lasso = lasso.lasso(train_x.to_numpy(), train_y.to_numpy(), k)
        cv_lasso = lasso.cross_validation(valid_x.to_numpy(), valid_y.to_numpy(), k, "lasso")
        error += cv_lasso
    graph_k.append(k)
    graph_error.append(error / fold_count)
    if error / fold_count < min_error:
        min_error = error / fold_count
        min_k = k
print(min_error, min_k)
draw_hyperparameter(graph_k, graph_error)
data_btc_test = minute_prices("BTC", "USD", 1)
test_x = data_btc_test.loc[:, 'high':'volumeto']
test_y = data_btc_test.loc[:, 'close']
# The model of the last fold is used for the unseen sample
test_lasso = lasso.predict(test_x.to_numpy(), test_y.to_numpy(), min_k, "lasso")
print(test_lasso)

In [None]:
"""
    Use the slightly different version of kNN.
    Every k from 3 to 99 is evaluated with 10-fold cross validation,
    the one with the maximum mean accuracy is used to classify a freshly downloaded sample.
"""
max_k = 0
max_accuracy = -1
graph_k = []
graph_accuracy = []
features = data_btc.loc[:, 'high':'volumeto']
closes = data_btc.loc[:, 'close']
fold_count = 10
# The fold size follows the downloaded data instead of the hard-coded 200 of 2000 rows
fold_size = data_btc.shape[0] // fold_count
for k in range(3, 100):
    accuracy = 0
    for i in range(fold_count):
        # Positional slices exclude the end, so no row is in two folds
        valid_x = features.iloc[fold_size * i: fold_size * (i + 1)]
        valid_y = closes.iloc[fold_size * i: fold_size * (i + 1)]
        train_x = features.drop(valid_x.index)
        train_y = closes.drop(valid_y.index)
        knn = kNN(train_x.to_numpy(), valid_x.to_numpy(), train_y.to_numpy(), valid_y.to_numpy(), k, 1)
        cv_knn = knn.cross_validation()
        accuracy += cv_knn[1][2]
    graph_k.append(k)
    graph_accuracy.append(accuracy / fold_count)
    if accuracy / fold_count > max_accuracy:
        # Store the mean that was compared, not the accuracy of the last fold only
        max_accuracy = accuracy / fold_count
        max_k = k
print(max_accuracy, max_k)
draw_hyperparameter(graph_k, graph_accuracy, True)
data_btc_test = minute_prices("BTC", "USD", 1)
test_x = data_btc_test.loc[:, 'high':'volumeto']
test_y = data_btc_test.loc[:, 'close']
test_knn = knn.predict_test(test_x.to_numpy(), max_k)
# The actual move is up when the close is higher than column 2 of the predictors
if test_y.to_numpy()[0] - test_x.to_numpy()[0, 2] > 0:
    actual = 1
else:
    actual = 0
print(test_knn == actual)

In [None]:
"""
    Use the logistic regression without using any regularization technique.
    The model is fitted on every downloaded row and then applied to the latest freshly downloaded minute.
"""
logistic_ols = None
error = 0
data_x = (data_btc.loc[:, 'high':'volumeto']).to_numpy()
data_y = (data_btc.loc[:, 'close']).to_numpy()
# Prepend the bias column of ones
ones = np.ones((data_x.shape[0], 1))
data_x = np.hstack((ones, data_x))
logistic_ols = LogisticRegression(data_x, data_y)
result = logistic_ols.fit()
cv_log = result[1]
error += cv_log
print(error, result[0])
data_btc_test = minute_prices("BTC", "USD", 1)
# Take the latest row by position instead of relying on the row label 1
latest = data_btc_test.iloc[-1]
test_x = latest.loc['high':'volumeto']
test_y = latest.loc['close']
# Read the opening price by position with iloc; plain [2] on a label-indexed Series is deprecated
if test_y - test_x.iloc[2] > 0:
    test_y = 1
else:
    test_y = 0
# Prepend the bias entry, the result is a flat vector of floats
test_x = np.append(np.ones((1, 1)), test_x.to_numpy()).astype('float64')
test_ols = logistic_ols.predict(test_x, test_y)
print(test_ols)

In [None]:
"""
    Use the logistic regression with using L2 regularization.
    The tuning parameter is chosen inside fit, the model is then applied to the latest freshly downloaded minute.
"""
# The unregularized model in logistic_ols is no longer reset to None by this cell
logistic_rr = None
error = 0
# Work on a copy so that adding the bias column never writes into a slice of data_btc
data_x = (data_btc.loc[:, 'high':'volumeto']).copy()
data_y = (data_btc.loc[:, 'close'])
ones = [1 for i in range(data_x.shape[0])]
data_x["bias"] = ones
data_x = data_x[["bias", "high", "low", "open", "volumefrom", "volumeto"]]
logistic_rr = LogisticRegression(data_x, data_y, 1, "RR")
result = logistic_rr.fit()
cv_log = result[1]
error += cv_log
print(error, result[0])
data_btc_test = minute_prices("BTC", "USD", 1)
# Take the latest row by position instead of relying on the row label 1
latest = data_btc_test.iloc[-1]
test_x = latest.loc['high':'volumeto']
test_y = latest.loc['close']
# Read the opening price by position with iloc; plain [2] on a label-indexed Series is deprecated
if test_y - test_x.iloc[2] > 0:
    test_y = 1
else:
    test_y = 0
# Prepend the bias entry, the result is a flat vector of floats
test_x = np.append(np.ones((1, 1)), test_x.to_numpy()).astype('float64')
# Keep the result under its own name so that the prediction of the other models is not overwritten
test_rr = logistic_rr.predict(test_x, test_y)
print(test_rr)

In [None]:
"""
    Use the logistic regression with using L1 regularization.
    The tuning parameter is chosen inside fit, the model is then applied to the latest freshly downloaded minute.
"""
# The unregularized model in logistic_ols is no longer reset to None by this cell
logistic_lr = None
error = 0
# Work on a copy so that adding the bias column never writes into a slice of data_btc
data_x = (data_btc.loc[:, 'high':'volumeto']).copy()
data_y = (data_btc.loc[:, 'close'])
ones = [1 for i in range(data_x.shape[0])]
data_x["bias"] = ones
data_x = data_x[["bias", "high", "low", "open", "volumefrom", "volumeto"]]
logistic_lr = LogisticRegression(data_x, data_y, 1, "LR")
result = logistic_lr.fit()
cv_log = result[1]
error += cv_log
print(error, result[0])
data_btc_test = minute_prices("BTC", "USD", 1)
# Take the latest row by position instead of relying on the row label 1
latest = data_btc_test.iloc[-1]
test_x = latest.loc['high':'volumeto']
test_y = latest.loc['close']
# Read the opening price by position with iloc; plain [2] on a label-indexed Series is deprecated
if test_y - test_x.iloc[2] > 0:
    test_y = 1
else:
    test_y = 0
# Prepend the bias entry, the result is a flat vector of floats
test_x = np.append(np.ones((1, 1)), test_x.to_numpy()).astype('float64')
# Keep the result under its own name so that the prediction of the other models is not overwritten
test_lr = logistic_lr.predict(test_x, test_y)
print(test_lr)