In [None]:
import pickle
from typing import List, Callable

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import yfinance as yf
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error

In [None]:
# NOTE(review): no cell visible in this notebook reads this global `x` (every
# other `x` is a function parameter or a comprehension variable), so this looks
# like leftover scratch work — confirm nothing else depends on it before deleting.
x = 5

# DATA DOWNLOADING AND PROCESSING

In [None]:
# Create the yfinance handle for Meta (ticker symbol "META") and inspect it.
GetFacebookInformation = yf.Ticker("META") # Ticker object reused by the data cells below
# Build a DataFrame from the ticker's general `.info` and take its first rows.
# NOTE(review): this is not the last expression of the cell, so Jupyter does not
# display the result; wrap it in display(...) or move it to its own cell if the
# preview is wanted. Presumably `.info` is a dict — confirm that it always
# converts cleanly to a DataFrame.
pd.DataFrame(GetFacebookInformation.info).head()
# Summarise the 5-year price history with DataFrame.info().
# NOTE(review): the next cell requests the same history again; presumably that
# is a second download — consider fetching once and reusing the result.
GetFacebookInformation.history(period='5y').info()

In [None]:
# Pull five years of price history for the ticker created above.
data = GetFacebookInformation.history(period="5y")

# Feature rows: [Open, Close, High, Low] for every day except the most recent
# one (the target cell drops the first day, so row i is paired with day i + 1).
# Each row gets a trailing constant 1 that multiplies the intercept weight.
price_rows = data[['Open', 'Close', 'High', 'Low']].values.tolist()
vectors = [row + [1] for row in price_rows[:-1]]

# Display the first feature row as the cell output.
vectors[0]

In [None]:
# Target values: the mean of Open, Close, High and Low for each day.
y_vectors = data[['Open', 'Close', 'High', 'Low']].values.tolist() # one [Open, Close, High, Low] row per day
y_values = [sum(prices) / 4 for prices in y_vectors]
# Drop the FIRST day (index 0), so y_values[i] is the average price of the day
# that follows the i-th row of price data.
del y_values[0]
len(y_values) # Cell output: number of target values

In [None]:
# 80/20 split without shuffling: the first 80% of the rows (in their original
# order) are used for training, the remaining 20% for testing.
split_index = int(0.8 * len(vectors))

train_vectors = vectors[:split_index]
test_vectors = vectors[split_index:]

# The targets are cut at the same index so features and targets stay aligned.
train_y_values = y_values[:split_index]
test_y_values = y_values[split_index:]

# BASIC FUNCTIONS FOR ALGEBRA CALCULATION

In [None]:
# Type alias used by the annotations in the cells below: a vector is a list of floats.
# NOTE(review): `List` is already imported in the first cell, so this repeated
# import is redundant (harmless, but imports belong in the import cell).
from typing import List
Vector = List[float]

In [None]:
 #define scalar product
def dot(v, w) -> float:
    """Return the scalar (dot) product v[0]*w[0] + v[1]*w[1] + ...

    Elements are paired with zip, so when the inputs differ in length the
    extra trailing elements of the longer one are ignored.
    """
    products = (a * b for a, b in zip(v, w))
    return sum(products)

In [None]:
# Linear model: the prediction is the weighted sum of the features.
def predict(x: Vector, beta: Vector) -> float:
    """Return the model output for feature vector x under coefficients beta.

    This is simply dot(x, beta); an intercept is only included when x carries
    a constant-1 entry that lines up with the intercept weight in beta.
    """
    prediction = dot(x, beta)
    return prediction

In [None]:
# Column-wise total of several equally sized vectors:
# result[i] is the sum of vector[i] over every vector in the input.
def vector_sum(vectors: List[Vector]) -> Vector:
    """Return the element-wise sum of a non-empty list of same-length vectors.

    Raises AssertionError when the list is empty or the vectors differ in length.
    """
    assert vectors, "No vectors provided!"

    # The first vector fixes the expected width for all the others.
    width = len(vectors[0])
    same_width = all(len(v) == width for v in vectors)
    assert same_width, "different sizes!"

    totals = []
    for i in range(width):
        totals.append(sum(vector[i] for vector in vectors))
    return totals
    

In [None]:
# Scale a vector by a constant.
def scalar_multiply(c: float, v: Vector) -> Vector:
    """Return a new list whose i-th element is c * v[i]; v itself is not modified."""
    scaled = []
    for element in v:
        scaled.append(c * element)
    return scaled

In [None]:
# Column-wise average of several equally sized vectors.
def vector_mean(vectors: List[Vector]) -> Vector:
    """Return the element-wise mean: (1 / n) times the element-wise sum of n vectors."""
    count = len(vectors)
    # The weight is computed before the sum, so an empty input fails on the
    # division (ZeroDivisionError) before vector_sum is ever called.
    weight = 1 / count
    column_totals = vector_sum(vectors)
    return scalar_multiply(weight, column_totals)

# FUNCTION FOR CALCULATING GRADIENT DESCENT

In [None]:
# Signed residual of the linear model on a single observation.
def error(x: Vector, y: float, beta: Vector) -> float:
    """Return predict(x, beta) - y, i.e. positive when the model over-predicts."""
    predicted = predict(x, beta)
    return predicted - y

In [None]:
# Squared residual of the linear model on a single observation.
def squared_error(x: Vector, y: float, beta: Vector) -> float:
    """Return the square of error(x, y, beta)."""
    residual = error(x, y, beta)
    return residual ** 2

In [None]:
# Gradient of one observation's squared error with respect to beta.
def sqerror_gradient(x: Vector, y: float, beta: Vector) -> Vector:
    """Return [2 * err * x_i for each x_i in x], where err = error(x, y, beta)."""
    # 2 * err is the same for every component, so compute it once.
    scale = 2 * error(x, y, beta)
    gradient = []
    for x_i in x:
        gradient.append(scale * x_i)
    return gradient

### CALCULATING GRADIENT STEP

In [None]:
# Take one step of size `step_size` against the gradient, starting from v.
def gradient_step(v: Vector, gradient: Vector, step_size: float) -> Vector:
    """Return a new vector with components v_i - step_size * gradient_i.

    v is not modified. Components are paired with zip, so the result is as
    long as the shorter of the two inputs.
    """
    moved = []
    for position, slope in zip(v, gradient):
        moved.append(position - step_size * slope)
    return moved


In [None]:
# Fit linear-regression weights by minibatch gradient descent on the squared error.
def least_squares_fit(xs: List[Vector], ys: List[float], learning_rate: float = 0.00000001, num_steps: int = 10000, batch_size: int = 1) -> Vector:
    """Return the weight vector found by gradient descent, one weight per feature.

    Args:
        xs: feature vectors, all of the same length. No intercept is added
            here, so include a constant-1 entry in every vector if one is wanted.
        ys: target values, one per feature vector.
        learning_rate: step size applied to every gradient update.
        num_steps: number of full passes over xs.
        batch_size: number of consecutive rows averaged into one update; the
            last batch of a pass may be smaller. Batches are visited in the
            order of xs on every pass (no shuffling).

    Returns:
        The weights after the final update, starting from all zeros.

    Raises:
        ValueError: if xs is empty, xs and ys differ in length, or
            batch_size is smaller than 1.
    """
    # Fail early with a clear message instead of an IndexError on xs[0], a
    # silent truncation by zip, or (for a negative batch_size) an empty inner
    # loop that would silently return the all-zero starting guess.
    if not xs:
        raise ValueError("xs must contain at least one feature vector")
    if len(xs) != len(ys):
        raise ValueError(f"xs and ys must have the same length, got {len(xs)} and {len(ys)}")
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    guess = [0.0] * len(xs[0]) # Initial weights: one zero per feature
    for _ in range(num_steps):
        for start in range(0, len(xs), batch_size):
            batch_xs = xs[start:start + batch_size] # Current batch of feature vectors
            batch_ys = ys[start:start + batch_size] # Matching batch of targets
            # Average the per-row gradients, then move against that average.
            gradient = vector_mean([sqerror_gradient(x, y, guess) for x, y in zip(batch_xs, batch_ys)])
            guess = gradient_step(guess, gradient, learning_rate)
    return guess



In [None]:
# Train the custom model: 30000 passes over the training rows, one row per update.
learning_rate = 1e-8
beta = least_squares_fit(
    train_vectors,
    train_y_values,
    learning_rate=learning_rate,
    num_steps=30000,
    batch_size=1,
)
# The printed label is Vietnamese for "Solution found by least_squares_fit:".
print("Nghiệm tìm được bằng least_squares_fit:", beta)

In [None]:
# Reference fit with scikit-learn on the same training rows.
model = LinearRegression() # Create a LinearRegression model instance

# LinearRegression estimates its own intercept (read back below through
# model.intercept_), so the constant-1 bias entry appended for the custom model
# must not be passed in as a feature.
# NOTE: new_vectors is the SAME list object as train_vectors, so the bias entry
# is removed from train_vectors in place, and later cells read train_vectors
# after this change. Truncating each row to the four price features (instead
# of pop()) keeps this cell idempotent: re-running it no longer strips a real
# feature from every row.
new_vectors = train_vectors
for vector in new_vectors:
    del vector[4:] # Keep Open, Close, High, Low only

model.fit(new_vectors, train_y_values) # Fit the model
coef = model.coef_.tolist() # Coefficients converted to list
coef.append(model.intercept_) # Append the intercept so the layout matches beta
# The printed label is Vietnamese for "Coefficients of the variables:".
print("Hệ số của các biến:", coef)


# Predict the next day's price (the cell below uses the custom least-squares `beta`, not the sklearn model)

In [None]:
# Forecast the next day's average price from the most recent row of price data.
test = data[['Open', 'Close', 'High', 'Low']].values.tolist() # Every day's prices as a list of rows
last_day_data = test[-1] # Most recent row (the same list object as test[-1])
last_day_data += [1] # In-place: add the constant 1 that multiplies the intercept weight
y = predict(last_day_data, beta) # Weighted sum using the custom-fit coefficients
# The printed label is Vietnamese for "Predicted stock price for the next day (by least square fit):".
print("Giá cổ phiếu dự đoán ngày tiếp theo (bằng least square fit): ", y)


# Evaluation Metrics

In [None]:
# Predictions using custom least squares fit
# beta holds one weight per price feature followed by the intercept weight, so
# every row fed to predict() must look like [features..., 1]. The scikit-learn
# cell strips the trailing 1 from the training rows in place, and dot() would
# then silently drop the intercept from every training prediction. Rebuilding
# each row from its features keeps the intercept in, whichever cells have
# already run.
n_features = len(beta) - 1
train_predictions_custom = [predict(x[:n_features] + [1], beta) for x in train_vectors]
test_predictions_custom = [predict(x[:n_features] + [1], beta) for x in test_vectors]

In [None]:
# Train and predict with sklearn's LinearRegression
# LinearRegression estimates its own intercept, so it is given only the four
# price features (Open, Close, High, Low). The rows are sliced into new lists
# rather than pop()-ing the bias entry out of test_vectors: the in-place pop
# altered test_vectors for every other cell, failed on a second run of this
# cell, and rebound the global name `test`.
train_features = [vector[:4] for vector in train_vectors]
test_features = [vector[:4] for vector in test_vectors]

model = LinearRegression() # create a model
model.fit(train_features, train_y_values) # Fit the model
train_predictions_sklearn = model.predict(train_features) # Predictions on training data
test_predictions_sklearn = model.predict(test_features) # Predictions on test data

In [None]:
# Correlation heatmap of the four price columns and the traded volume.
corr_matrix = data[['Open', 'Close', 'High', 'Low','Volume']].corr()

# One figure with a single axis; the heatmap is drawn onto that axis.
fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(
    corr_matrix,
    annot=True,              # write the coefficient into every cell
    fmt=".4f",               # four decimal places
    cmap='coolwarm',
    ax=ax,
    cbar=False,              # the annotations make a colour bar unnecessary
    annot_kws={"size": 10},
)

# Title and tick labels (column names, unrotated, on both axes).
ax.set_title('Relationship between the independent parameters', fontsize=14)
tick_labels = corr_matrix.columns
ax.set_xticklabels(tick_labels, rotation=0, fontsize=10)
ax.set_yticklabels(tick_labels, rotation=0, fontsize=10)

# Render the figure.
plt.show()

In [None]:
# Save the correlation heatmap as a PNG file (300 dpi, tight bounding box).
# `fig` is the figure object created by the heatmap cell, so that cell must
# have been run before this one.
fig.savefig('relationship_table.png', dpi=300, bbox_inches='tight')

In [None]:
# Calculating evaluation metrics
def adjusted_r2(r2, n, p):
    """Return R-squared adjusted for the number of predictors.

    Computes 1 - (1 - r2) * ((n - 1) / (n - p - 1)), where n is the number of
    observations and p the number of predictors. Raises ZeroDivisionError
    when n - p - 1 is zero.
    """
    unexplained = 1 - r2
    penalty = (n - 1) / (n - p - 1)
    return 1 - unexplained * penalty

# Evaluation-metrics table stored column-wise: every key is a column name and
# every value is a list that receives one entry per evaluated model.
metrics = {
    column: []
    for column in (
        "Model",
        "R-Squared",
        "Adjusted R-Squared",
        "Multiple R",
        "Standard Error",
        "MSE",
    )
}

# Evaluate model function
def evaluate_model(y_true, y_pred, model_name, num_predictors=None):
    """Score one model's predictions and append the results to the global `metrics`.

    Args:
        y_true: observed target values.
        y_pred: predicted values, in the same order as y_true.
        model_name: label stored in the "Model" column.
        num_predictors: number of predictors p used for the adjusted
            R-squared. When omitted it falls back to len(train_vectors[0]),
            i.e. the width of the first row of the notebook-global
            train_vectors at call time. That width only excludes the bias
            entry once the scikit-learn cell has stripped it in place, so
            pass the value explicitly when that is not guaranteed.

    Returns:
        None. One entry is appended to every list in `metrics`.
    """
    if num_predictors is None:
        num_predictors = len(train_vectors[0])

    r2 = r2_score(y_true, y_pred) # Proportion of variance explained by the predictions
    adj_r2 = adjusted_r2(r2, len(y_true), num_predictors) # R-squared penalised for the predictor count
    mse = mean_squared_error(y_true, y_pred) # Average squared difference between predicted and actual values
    std_error = np.sqrt(mse) # Reported as "Standard Error"; this is the square root of the MSE
    # R-squared can be negative on held-out data; report NaN directly in that
    # case instead of letting np.sqrt emit a RuntimeWarning to produce it.
    multiple_r = np.sqrt(r2) if r2 >= 0 else np.nan

    metrics["Model"].append(model_name)
    metrics["R-Squared"].append(r2)
    metrics["Adjusted R-Squared"].append(adj_r2)
    metrics["Multiple R"].append(multiple_r)
    metrics["Standard Error"].append(std_error)
    metrics["MSE"].append(mse)

# Score both models against the held-out test targets; each call appends one
# row's worth of values to `metrics`.
for predictions, label in (
    (test_predictions_custom, "Custom Least Squares"),
    (test_predictions_sklearn, "Sklearn LinearRegression"),
):
    evaluate_model(test_y_values, predictions, label)

# Turn the column-wise metrics into a table and print it.
evaluation_df = pd.DataFrame(metrics)
print(evaluation_df)

# Save the model
# Persist both fitted models to the working directory: the scikit-learn
# estimator object and the custom model's coefficient list `beta`.
# `pickle` is imported in the notebook's import cell rather than here, so all
# dependencies are visible in one place.
# NOTE: pickle files should only ever be loaded back from a trusted source.
with open('linear_regression_model.pkl', 'wb') as sklearn_file:
    pickle.dump(model, sklearn_file)

with open('custom_least_squares_model.pkl', 'wb') as beta_file:
    pickle.dump(beta, beta_file)


In [None]:
# Draw scatter plots of predicted vs. actual prices, one panel per data split.
plt.figure(figsize=(14, 7))


def _draw_panel(position, actual, custom_predictions, sklearn_predictions,
                custom_label, sklearn_label, x_label, y_label, title):
    """Draw one actual-vs-predicted panel at `position` of the 1x2 subplot grid."""
    plt.subplot(1, 2, position) # Select the panel
    # Custom model first (opaque blue), sklearn on top (translucent yellow).
    plt.scatter(actual, custom_predictions, label=custom_label, alpha=1, color='blue')
    plt.scatter(actual, sklearn_predictions, label=sklearn_label, alpha=0.45, color='yellow')
    # Diagonal reference line: points on it are predicted exactly.
    lowest, highest = min(actual), max(actual)
    plt.plot([lowest, highest], [lowest, highest], color='black')
    plt.xlabel(x_label) # Sets the x-axis label
    plt.ylabel(y_label) # Sets the y-axis label
    plt.title(title) # Set the title
    plt.legend() # Displays a legend


# Scatter plot for the training set
_draw_panel(
    1,
    train_y_values,
    train_predictions_custom,
    train_predictions_sklearn,
    custom_label="Custom Model Predictions - Train",
    sklearn_label="Sklearn Model Predictions - Train",
    x_label='Actual Prices - Train',
    y_label='Predicted Prices - Train',
    title='Training Data',
)

# Scatter plot for the test set
_draw_panel(
    2,
    test_y_values,
    test_predictions_custom,
    test_predictions_sklearn,
    custom_label="Custom Model Predictions - Test",
    sklearn_label="Sklearn Model Predictions - Test",
    x_label='Actual Prices - Test',
    y_label='Predicted Prices - Test',
    title='Testing Data',
)

plt.savefig('model evaluation.png',dpi=100, bbox_inches='tight') # Save the figure
plt.show() # Display the plot
