In [None]:
import matplotlib
import numpy as np
import pandas as pd
import seaborn as sns

from matplotlib import pyplot as plt
from pylab import rcParams
from sklearn import linear_model
from sklearn.preprocessing import PolynomialFeatures
from sklearn.metrics import mean_squared_error, r2_score, explained_variance_score, median_absolute_error
from sklearn.model_selection import cross_val_score

In [None]:
# Notebook-wide settings: evaluation split first, then the input file.
test_rate = 0.2                        # fraction of rows reserved for testing
src_stock_path = 'indexProcessed.csv'  # CSV read into `df` by the loading cell

In [None]:
# Read the whole comma-separated price table into memory and show it.
df = pd.read_csv(filepath_or_buffer = src_stock_path, sep = ",")
display(df)

In [None]:
# Parse the Date strings into real datetimes.
# Plain column assignment is deliberate: on pandas >= 2.0,
# `df.loc[:, 'Date'] = ...` writes the new values into the existing
# object-dtype column in place, so the column would stay dtype `object`
# instead of becoming datetime64.
df['Date'] = pd.to_datetime(df['Date'], format='%Y-%m-%d')

# Normalise the column headings: lower case, spaces replaced by underscores
# (a heading such as 'Adj Close' becomes 'adj_close').
df.columns = [str(x).lower().replace(' ', '_') for x in df.columns]

display(df)

In [None]:
# Build `data_dict` from indexInfo.csv: for every data row, the third column
# is the key and the last column is the value. The keys are later looked up
# with the values of df['index'].
# NOTE(review): presumably index symbol -> currency; confirm against the
# file's header row.
data_dict = {}

# `with` guarantees the handle is closed even if a malformed row raises
# below; the previous bare open() never closed the file.
with open("indexInfo.csv", 'r') as f:
    data_str = f.read()
data_lines = data_str.split('\n')

for line in data_lines:
    # A blank line (normally the empty tail after the final newline) ends the data.
    if line == '':
        break
    fields = line.split(',')
    # Skip the header row, recognised by 'Region' in its first column.
    if 'Region' in fields[0]:
        continue
    key = fields[2]
    value = fields[-1]
    data_dict[key] = value

print(data_dict)

In [None]:
# Split the full table into one frame per value of the 'index' column and
# keep the group keys and the frames in two parallel lists.
grouped = df.groupby(df['index'])
stock_label, individual_stock = [], []
for name, group in grouped:
    stock_label += [name]
    individual_stock += [group]

# Pie chart of row counts per group; each wedge label reads
# "<group key> : <data_dict entry> : <row count>".
plt.title("Trading days")
slice_sizes = [len(frame.index) for frame in individual_stock]
slice_labels = [
    f'{name} : {data_dict[name]} : {len(individual_stock[index])}'
    for index, name in enumerate(stock_label)
]
plt.pie(slice_sizes, labels = slice_labels)

In [None]:
# Drop the 'J203.JO' series (the original note calls it the minimum dataset).
# Both parallel lists are rebuilt by filtering rather than calling pop()
# while enumerating: popping during iteration shifts the remaining items
# left, so the element right after a removed one is never examined.
kept_pairs = [
    (name, frame)
    for name, frame in zip(stock_label, individual_stock)
    if name != 'J203.JO'
]
stock_label = [name for name, _ in kept_pairs]
individual_stock = [frame for _, frame in kept_pairs]

In [None]:
# Grid of line charts, one panel per entry of stock_label: adj_close over date.
# The grid is fixed at 5 x 3 panels.
nrows, ncols = 5, 3
fig = plt.figure(figsize = (20, 18))

for index, label in enumerate(stock_label):
    stock_data = individual_stock[index]
    ax = fig.add_subplot(nrows, ncols, index + 1)  # subplot slots are 1-based
    ax.plot(stock_data['date'], stock_data['adj_close'], label = label)
    # x label carries the group key, y label its data_dict entry.
    ax.set(xlabel = label, ylabel = data_dict[label])
plt.show()

In [None]:
# 2-D histogram of `high` against `adj_close` over every row of df.
# `palette` and `kde` were dropped: palette only applies together with a
# `hue` variable (seaborn warns that it is ignored otherwise) and kde is
# only used for univariate histograms, so neither affected this plot.
sns.histplot(data=df, x="high", y="adj_close")
plt.title('Adjusted close vs. high')
plt.show()

In [None]:
# 2-D histogram of `volume` against `adj_close` over every row of df.
# `palette` and `kde` were dropped: palette only applies together with a
# `hue` variable (seaborn warns that it is ignored otherwise) and kde is
# only used for univariate histograms, so neither affected this plot.
sns.histplot(data=df, x="volume", y="adj_close")
plt.title('Adjusted close vs. volume')
plt.show()

In [None]:
# 2-D histogram of `close` against `adj_close` over every row of df.
# `palette` and `kde` were dropped: palette only applies together with a
# `hue` variable (seaborn warns that it is ignored otherwise) and kde is
# only used for univariate histograms, so neither affected this plot.
sns.histplot(data=df, x="close", y="adj_close")
plt.title('Adjusted close vs. close')
plt.show()

## Data processing function

In [None]:
def wash_data(X, test_rate, shift_days=None):
    """Split one frame in row order and build lagged features and targets.

    The last ``int(test_rate * len(X))`` rows become the test split and the
    rows before them the training split (no shuffling). Features are the
    'high', 'low' and 'volume' columns; the target is 'adj_close'.

    Features are shifted forward by ``shift_days`` rows and the target is
    shifted backward by the same amount, then the rows emptied by the shift
    are dropped. The resulting objects have different index labels but the
    same length, and they pair up *by position*: feature row ``k`` holds the
    values of source row ``k`` and target row ``k`` holds ``adj_close`` of
    source row ``k + shift_days``.

    Parameters
    ----------
    X : pandas.DataFrame
        Must contain the columns 'date', 'high', 'low', 'volume', 'adj_close'.
    test_rate : float
        Fraction of rows assigned to the test split.
    shift_days : int, optional
        Row offset between features and target. When omitted, the
        notebook-level global ``day`` is used (the original behaviour).

    Returns
    -------
    tuple
        ``(X_train, Y_train, X_test, Y_test, date_val)``.

    Notes
    -----
    * Missing values in the training split are replaced by the column
      median before shifting. The test split is not filled, so any row that
      contains NaN is dropped there, which can leave X_test and Y_test with
      different lengths.
    * NOTE(review): ``date_val[k]`` is the date of the feature row, i.e.
      ``shift_days`` rows before the row whose adj_close is ``Y_test[k]``.
      Confirm this is the intended x axis for the plots.
    """
    if shift_days is None:
        # Backward compatible fallback to the global set by the driver cells.
        shift_days = day

    # Sizes of the two splits.
    num_test = int(test_rate * len(X))
    num_train = len(X) - num_test

    train_rows, test_rows = X[:num_train], X[num_train:]
    Y_train, Y_test = train_rows.adj_close, test_rows.adj_close

    # Dates used as x values when plotting the test predictions.
    date_val = test_rows['date'].shift(shift_days).dropna()

    # Keep only the feature columns ('close' is intentionally not included here).
    feature_cols = ['high', 'low', 'volume']
    X_train = train_rows[feature_cols]
    X_test = test_rows[feature_cols]

    # Training target: median-fill gaps, then pull values back by shift_days rows.
    Y_train = Y_train.fillna(Y_train.median()).shift(-shift_days).dropna()

    # Training features: median-fill gaps, then push values forward by shift_days rows.
    X_train = X_train.fillna(X_train.median()).shift(shift_days).dropna()

    # Test features and target get the same shifts, without any filling.
    X_test = X_test.shift(shift_days).dropna()
    Y_test = Y_test.shift(-shift_days).dropna()

    return X_train, Y_train, X_test, Y_test, date_val

## Regression switch function (linear, polynomial and Lasso)

In [None]:
from warnings import simplefilter
from sklearn.exceptions import ConvergenceWarning

def regression_model(X, test_rate, mode, name, degree = 2, alpha = 1):
    """Fit one regression on a single frame, score it, and plot it.

    Parameters
    ----------
    X : pandas.DataFrame
        Rows of one index; passed unchanged to ``wash_data``.
    test_rate : float
        Test-split fraction, forwarded to ``wash_data``.
    mode : str
        'linear' fits ``LinearRegression``; 'polynomial' expands the
        features with ``PolynomialFeatures(degree)`` and then fits
        ``LinearRegression``; any other value fits ``Lasso(alpha)``.
    name : str
        Label stored in the result row, used as the x-axis label and as the
        key into ``data_dict`` for the y-axis label.
    degree : int or tuple, optional
        Forwarded to ``PolynomialFeatures`` (polynomial mode only).
    alpha : float, optional
        Forwarded to ``Lasso`` (fallback mode only).

    Returns
    -------
    list
        ``[name, mean squared error, R^2 as a percent string,
        explained variance as a percent string, median absolute error]``.
        The same list is printed before it is returned.

    Notes
    -----
    The function draws into notebook-level state: it reads the globals
    ``fig``, ``nrows``, ``ncols``, ``index`` (the subplot slot is
    ``index + 1``) and ``data_dict``. It must therefore be called from a
    loop whose variable is named ``index`` after ``fig`` has been created.
    """
    # Silence convergence warnings; note this changes the process-wide filter.
    simplefilter("ignore", category=ConvergenceWarning)

    X_train, Y_train, X_test, Y_test, date_val = wash_data(X, test_rate)

    if mode == 'linear':
        regression = linear_model.LinearRegression()
        regression.fit(X_train, Y_train)
        Y_pred = regression.predict(X_test)

    elif mode == 'polynomial':
        poly = PolynomialFeatures(degree = degree)
        x_poly = poly.fit_transform(X_train)
        regression = linear_model.LinearRegression()
        regression.fit(x_poly, Y_train)
        # Reuse the expansion fitted on the training data; the test
        # features are only transformed, never re-fitted.
        Y_pred = regression.predict(poly.transform(X_test))

    else:
        regression = linear_model.Lasso(alpha = alpha)
        regression.fit(X_train, Y_train)
        Y_pred = regression.predict(X_test)

    # The last entry is the *median* absolute error, although the table
    # header printed by the driver cells abbreviates it as "mae".
    median_abs_err = median_absolute_error(Y_test, Y_pred)

    res = [
        name,
        mean_squared_error(Y_test, Y_pred),
        "{:.2%}".format(r2_score(Y_test, Y_pred)),
        "{:.2%}".format(explained_variance_score(Y_test, Y_pred)),
        median_abs_err,
    ]

    # One panel per call: predictions as a red line, actual values as blue dots.
    ax = fig.add_subplot(nrows, ncols, index + 1)
    ax.plot(date_val, Y_pred, color="red")
    ax.scatter(date_val, Y_test, color="blue", marker = '.')
    # Label this panel explicitly rather than through pyplot's "current axes".
    ax.set_xlabel(name)
    ax.set_ylabel(data_dict[name])

    print(res)
    return res

    # ax = plt.scatter(date_val[1: ], Y_test[: len(Y_test) - 1], color="blue", marker = '.')
    # bx = plt.plot(date_val[1: ], Y_pred[1: ], color="red")


## Main function

#### linear

In [None]:
# Settings for this run. `day`, `fig` and `index` are picked up as globals
# by the helper functions, so they have to be set here before the loop.
day = 1 # predict n days price
test_rate = 0.2
model = 'linear' # linear, polynomial and lasso
index = 0
fig = plt.figure(figsize = (20, 18))

print(f'{model} regression to predict {day} days later adj_close price')
print('name    | mean squared error', ' |   r2_score   ', '| explained_variance_score', '| mae')
# The loop variable must be named `index`: regression_model uses it to
# choose the subplot slot.
for index, (label, stock_frame) in enumerate(zip(stock_label, individual_stock)):
    regression_model(stock_frame, test_rate, model, label)

plt.show()

#### polynomial

In [None]:
# Settings for this run. `day`, `fig` and `index` are picked up as globals
# by the helper functions, so they have to be set here before the loop.
day = 1
test_rate = 0.2
model = 'polynomial' # linear, polynomial and lasso
index = 0
fig = plt.figure(figsize = (20, 18))

print(f'{model} regression to predict {day} days later adj_close price')
print('name    | mean squared error', ' |   r2_score   ', '| explained_variance_score', '| mae')
# The loop variable must be named `index`: regression_model uses it to
# choose the subplot slot. The degree tuple is forwarded to PolynomialFeatures.
for index, (label, stock_frame) in enumerate(zip(stock_label, individual_stock)):
    regression_model(stock_frame, test_rate, model, label, degree = (2, 3))

plt.show()

#### lasso

In [None]:
# Settings for this run. `day`, `fig` and `index` are picked up as globals
# by the helper functions, so they have to be set here before the loop.
day = 1
test_rate = 0.2
model = 'lasso' # linear, polynomial and lasso
index = 0
fig = plt.figure(figsize = (20, 18))

print(f'{model} regression to predict {day} days later adj_close price')
print('name    | mean squared error', ' |   r2_score   ', '| explained_variance_score', '| mae')
# The loop variable must be named `index`: regression_model uses it to
# choose the subplot slot. alpha is forwarded to Lasso.
for index, (label, stock_frame) in enumerate(zip(stock_label, individual_stock)):
    regression_model(stock_frame, test_rate, model, label, alpha = 5)

plt.show()